Class: OsCtld::ThreadReaper
- Inherits:
-
Object
- Object
- OsCtld::ThreadReaper
- Includes:
- OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log, Singleton
- Defined in:
- lib/osctld/thread_reaper.rb
Overview
Watches over a list of threads and waits for them to finish gracefully.
ThreadReaper is used to watch over per-client threads and join them when they finish. When osctld is supposed to shut down, the reaper asks the threads to prematurely exit and waits for all of them to finish their job.
Constant Summary collapse
- DEFAULT_GROUP =
:default
Instance Attribute Summary collapse
-
#drains ⇒ Object
readonly
protected
Returns the value of attribute drains.
-
#queue ⇒ Object
readonly
protected
Returns the value of attribute queue.
-
#stop_at ⇒ Object
readonly
protected
Returns the value of attribute stop_at.
-
#thread ⇒ Object
readonly
protected
Returns the value of attribute thread.
-
#threads ⇒ Object
readonly
protected
Returns the value of attribute threads.
Instance Method Summary collapse
- #add(thread, manager, group: DEFAULT_GROUP) ⇒ Object
- #can_stop? ⇒ Boolean protected
-
#drain(group: nil, groups: nil, stop: true) ⇒ Object
Drain selected thread groups while the reaper remains active.
- #export ⇒ Object
- #finish_drains ⇒ Object protected
- #groups_empty?(groups) ⇒ Boolean protected
-
#initialize ⇒ ThreadReaper
constructor
A new instance of ThreadReaper.
- #join_dead_threads ⇒ Object protected
- #normalize_groups(group:, groups:) ⇒ Object protected
- #request_stop_thread(thread, manager) ⇒ Object protected
- #request_stop_threads(groups: nil) ⇒ Object protected
- #run ⇒ Object protected
- #start ⇒ Object
- #stop ⇒ Object
- #stop_thread_group?(group) ⇒ Boolean protected
- #sync ⇒ Object protected
- #thread_group(group) ⇒ Object protected
Constructor Details
#initialize ⇒ ThreadReaper
Returns a new instance of ThreadReaper.
25 26 27 28 29 30 |
# File 'lib/osctld/thread_reaper.rb', line 25
# Prepare an idle reaper: no tracked threads, no pending drains, a mutex
# guarding the thread list and an empty command queue. The reaper thread
# itself is not created here; see #start.
def initialize
  @threads = []
  @drains = []
  @mutex = Mutex.new
  @queue = OsCtl::Lib::Queue.new
end
Instance Attribute Details
#drains ⇒ Object (readonly, protected)
Returns the value of attribute drains.
76 77 78 |
# File 'lib/osctld/thread_reaper.rb', line 76
# Pending drain requests. Each entry is a `[groups, stop, ack]` triple
# appended by #run and removed by #finish_drains.
# @return [Array]
def drains = @drains
#queue ⇒ Object (readonly, protected)
Returns the value of attribute queue.
76 77 78 |
# File 'lib/osctld/thread_reaper.rb', line 76
# Command queue consumed by #run; created in #initialize.
# @return [OsCtl::Lib::Queue]
def queue = @queue
#stop_at ⇒ Object (readonly, protected)
Returns the value of attribute stop_at.
76 77 78 |
# File 'lib/osctld/thread_reaper.rb', line 76
# Time at which #run processed the `:stop` command; nil until then.
# @return [Time, nil]
def stop_at = @stop_at
#thread ⇒ Object (readonly, protected)
Returns the value of attribute thread.
76 77 78 |
# File 'lib/osctld/thread_reaper.rb', line 76
# The reaper's own thread as created by #start; nil before #start is called.
# @return [Thread, nil]
def thread = @thread
#threads ⇒ Object (readonly, protected)
Returns the value of attribute threads.
76 77 78 |
# File 'lib/osctld/thread_reaper.rb', line 76
# Tracked threads. Each entry is a `[thread, manager, group]` triple
# appended by #run.
# @return [Array]
def threads = @threads
Instance Method Details
#add(thread, manager, group: DEFAULT_GROUP) ⇒ Object
66 67 68 |
# File 'lib/osctld/thread_reaper.rb', line 66
# Hand a thread over to the reaper. The request is only enqueued here;
# the reaper loop (#run) records the thread when it processes the command.
#
# @param thread [Thread] thread to watch
# @param manager [#request_stop, nil] object asked to stop the thread
# @param group [Symbol] thread group the thread belongs to
def add(thread, manager, group: DEFAULT_GROUP)
  command = [:add, thread, manager, group]
  queue << command
end
#can_stop? ⇒ Boolean (protected)
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/osctld/thread_reaper.rb', line 155
# Tell whether the reaper loop may exit, i.e. no tracked thread remains.
#
# While threads remain, a progress message is logged at most once every
# 10 seconds. Once 30 seconds have passed since #stop_at, every remaining
# thread is additionally logged with its manager, group and backtrace.
#
# @return [Boolean] true when the thread list is empty, false otherwise
def can_stop?
  return true if sync { threads.empty? }

  # Rate-limit the progress report to one per 10 seconds
  return false unless @time.nil? || (Time.now - @time) >= 10

  @time = Time.now

  sync do
    log(
      :info,
      'threadreaper',
      "Waiting for #{threads.count} threads to exit"
    )

    if (Time.now - stop_at) >= 30
      threads.each_with_index do |(t, m, group), i|
        log(:info, 'threadreaper', "Thread ##{i + 1}: manager=#{m}")
        log(:info, 'threadreaper', "Thread ##{i + 1}: group=#{group}")

        # Thread#backtrace returns nil for a thread that has already
        # finished but has not been removed from the list yet
        log(:info, 'threadreaper', denixstorify(t.backtrace || []).join("\n"))
      end
    end
  end

  false
end
#drain(group: nil, groups: nil, stop: true) ⇒ Object
Drain selected thread groups while the reaper remains active.
Threads in other groups are left running and newly added threads in those groups are not asked to stop. This is used during osctld shutdown to drain public management clients while user-control callbacks needed by those commands are still accepted.
53 54 55 56 57 58 59 60 61 |
# File 'lib/osctld/thread_reaper.rb', line 53
# Drain selected thread groups while the reaper remains active.
#
# The request is sent to the reaper loop together with a private
# acknowledgement queue; the caller blocks until the loop pushes onto it,
# which #finish_drains does once no tracked thread belongs to the selected
# groups. Does nothing when the reaper has not been started.
#
# @param group [Symbol, nil] single group to drain
# @param groups [Array<Symbol>, nil] groups to drain; wins over `group`
# @param stop [Boolean] ask the threads in those groups to stop
# @raise [ArgumentError] when no group is selected
# @return [Object, nil] the acknowledgement value, or nil if not started
def drain(group: nil, groups: nil, stop: true)
  return unless thread

  selected = normalize_groups(group: group, groups: groups)
  done = Queue.new
  request = [:drain, selected, stop, done]

  queue << request
  done.pop
end
#export ⇒ Object
70 71 72 |
# File 'lib/osctld/thread_reaper.rb', line 70
# Snapshot of the tracked threads without their group.
#
# @return [Array<Array>] list of `[thread, manager]` pairs
def export
  sync do
    threads.map do |(worker, manager, _group)|
      [worker, manager]
    end
  end
end
#finish_drains ⇒ Object (protected)
140 141 142 143 144 145 146 147 |
# File 'lib/osctld/thread_reaper.rb', line 140
# Complete every pending drain whose groups no longer contain any tracked
# thread: the waiting caller is acknowledged and the request is dropped.
# Drains that still have threads are kept for a later pass.
def finish_drains
  drains.delete_if do |drained_groups, _stop, ack|
    if groups_empty?(drained_groups)
      ack << true
      true
    else
      false
    end
  end
end
#groups_empty?(groups) ⇒ Boolean (protected)
149 150 151 152 153 |
# File 'lib/osctld/thread_reaper.rb', line 149
# Check that no tracked thread belongs to any of the given groups.
#
# @param groups [Array<Symbol>]
# @return [Boolean]
def groups_empty?(groups)
  sync do
    occupied = threads.any? do |(_worker, _manager, grp)|
      groups.include?(thread_group(grp))
    end

    !occupied
  end
end
#join_dead_threads ⇒ Object (protected)
114 115 116 117 118 |
# File 'lib/osctld/thread_reaper.rb', line 114
# Remove finished threads from the tracked list, joining each one with a
# 50 ms limit. Threads that are still alive, or whose join timed out, stay
# in the list.
#
# NOTE(review): Thread#join re-raises the exception that terminated the
# joined thread; that would propagate out of this method. Confirm that
# tracked threads rescue their own errors.
def join_dead_threads
  sync do
    threads.delete_if do |(worker, _manager, _group)|
      next false if worker.alive?

      worker.join(0.05)
    end
  end
end
#normalize_groups(group:, groups:) ⇒ Object (protected)
186 187 188 189 190 191 192 193 |
# File 'lib/osctld/thread_reaper.rb', line 186
# Turn the `group`/`groups` selection into a list of groups without nils.
#
# `groups` takes precedence over `group` whenever it is given. The result
# is always a new array: Kernel#Array returns an Array argument as-is, so
# a destructive `compact!` would modify the caller's array and raise
# FrozenError on a frozen one.
#
# @param group [Symbol, nil] single group
# @param groups [Array<Symbol>, Symbol, nil] one or more groups
# @raise [ArgumentError] when the selection is empty
# @return [Array<Symbol>]
def normalize_groups(group:, groups:)
  ret = Array(groups || group).compact
  raise ArgumentError, 'no thread groups selected' if ret.empty?

  ret
end
#request_stop_thread(thread, manager) ⇒ Object (protected)
130 131 132 |
# File 'lib/osctld/thread_reaper.rb', line 130
# Ask a thread's manager to stop it, provided the thread is still alive
# and a manager was given.
#
# @param thread [Thread]
# @param manager [#request_stop, nil]
# @return [Object, false, nil] result of `manager.request_stop`, false when
#   the thread is not alive, or the falsy manager itself
def request_stop_thread(thread, manager)
  return false unless thread.alive?
  return manager unless manager

  manager.request_stop
end
#request_stop_threads(groups: nil) ⇒ Object (protected)
120 121 122 123 124 125 126 127 128 |
# File 'lib/osctld/thread_reaper.rb', line 120
# Ask tracked threads to stop.
#
# @param groups [Array<Symbol>, nil] restrict the request to threads in
#   these groups; every tracked thread is asked when nil
def request_stop_threads(groups: nil)
  sync do
    threads.each do |(worker, manager, grp)|
      request_stop_thread(worker, manager) if !groups || groups.include?(thread_group(grp))
    end
  end
end
#run ⇒ Object (protected)
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/osctld/thread_reaper.rb', line 78
# Main loop of the reaper thread.
#
# Commands are read from the queue with a 0.1 s timeout:
#
# - nil: finished threads are joined (presumably the value returned by the
#   queue when the timeout expires; confirm against OsCtl::Lib::Queue)
# - `:stop`: the reaper enters stopping mode, records the time and asks all
#   tracked threads to stop
# - `[:drain, groups, stop, ack]`: the drain is registered and, if `stop`
#   is set, threads in the selected groups are asked to stop
# - `[:add, thread, manager, group]`: the thread is tracked; it is asked to
#   stop right away when its group is being stopped
#
# After every command, completed drains are acknowledged. The loop ends
# once stopping was requested and #can_stop? allows it.
#
# @raise [RuntimeError] on an unrecognized command
def run
  @stopping = false

  loop do
    cmd = queue.pop(timeout: 0.1)

    case cmd
    when nil
      join_dead_threads

    when :stop
      @stopping = true
      @stop_at = Time.now
      request_stop_threads

    else
      verb = cmd.is_a?(Array) ? cmd[0] : nil

      case verb
      when :drain
        _verb, groups, stop, ack = cmd
        drains << [groups, stop, ack]
        request_stop_threads(groups: groups) if stop

      when :add
        _verb, worker, manager, group = cmd
        # Ask for the stop before the thread becomes visible in the list
        request_stop_thread(worker, manager) if stop_thread_group?(group)
        sync { threads << [worker, manager, group] }

      else
        raise "unknown command '#{cmd}'"
      end
    end

    finish_drains

    break if @stopping && can_stop?
  end
end
#start ⇒ Object
32 33 34 |
# File 'lib/osctld/thread_reaper.rb', line 32
# Launch the reaper loop (#run) in a new thread and remember that thread.
#
# @return [Thread] the reaper thread
def start
  reaper = Thread.new do
    run
  end

  @thread = reaper
end
#stop ⇒ Object
36 37 38 39 40 41 |
# File 'lib/osctld/thread_reaper.rb', line 36
# Send the `:stop` command to the reaper loop and wait for the reaper
# thread to finish. Does nothing when the reaper has not been started.
def stop
  if thread
    queue << :stop
    thread.join
  end
end
#stop_thread_group?(group) ⇒ Boolean (protected)
134 135 136 137 138 |
# File 'lib/osctld/thread_reaper.rb', line 134
# Decide whether a thread in the given group must be asked to stop: either
# the reaper is stopping, or a pending drain with `stop` set selects the
# thread's group.
#
# @param group [Symbol, nil]
# @return [Boolean]
def stop_thread_group?(group)
  return @stopping if @stopping

  wanted = thread_group(group)

  drains.any? { |groups, stop, _ack| stop && groups.include?(wanted) }
end
#sync ⇒ Object (protected)
182 183 184 |
# File 'lib/osctld/thread_reaper.rb', line 182
# Run the given block while holding the reaper's mutex.
#
# @return [Object] the block's return value
def sync(&block) = @mutex.synchronize(&block)
#thread_group(group) ⇒ Object (protected)
195 196 197 |
# File 'lib/osctld/thread_reaper.rb', line 195
# Resolve the effective group of a tracked thread: the given group, or
# DEFAULT_GROUP when none was set.
#
# @param group [Symbol, nil]
# @return [Symbol]
def thread_group(group)
  return group if group

  DEFAULT_GROUP
end