Class: OsCtld::ThreadReaper
- Inherits:
-
Object
- Object
- OsCtld::ThreadReaper
- Includes:
- OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log, Singleton
- Defined in:
- lib/osctld/thread_reaper.rb
Overview
Watches over a list of threads and waits for them to gracefully finish
ThreadReaper is used to watch over per-client threads and join them when they finish. When osctld is supposed to shut down, the reaper asks the threads to prematurely exit and waits for all of them to finish their job.
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
protected
Returns the value of attribute queue.
-
#stop_at ⇒ Object
readonly
protected
Returns the value of attribute stop_at.
-
#thread ⇒ Object
readonly
protected
Returns the value of attribute thread.
-
#threads ⇒ Object
readonly
protected
Returns the value of attribute threads.
Instance Method Summary collapse
- #add(thread, manager) ⇒ Object
- #can_stop? ⇒ Boolean protected
- #export ⇒ Object
-
#initialize ⇒ ThreadReaper
constructor
A new instance of ThreadReaper.
- #join_dead_threads ⇒ Object protected
- #request_stop_threads ⇒ Object protected
- #run ⇒ Object protected
- #start ⇒ Object
- #stop ⇒ Object
- #sync ⇒ Object protected
Constructor Details
#initialize ⇒ ThreadReaper
Returns a new instance of ThreadReaper.
23 24 25 26 27 |
# File 'lib/osctld/thread_reaper.rb', line 23 def initialize @queue = OsCtl::Lib::Queue.new @mutex = Mutex.new @threads = [] end |
Instance Attribute Details
#queue ⇒ Object (readonly, protected)
Returns the value of attribute queue.
50 51 52 |
# File 'lib/osctld/thread_reaper.rb', line 50 def queue @queue end |
#stop_at ⇒ Object (readonly, protected)
Returns the value of attribute stop_at.
50 51 52 |
# File 'lib/osctld/thread_reaper.rb', line 50 def stop_at @stop_at end |
#thread ⇒ Object (readonly, protected)
Returns the value of attribute thread.
50 51 52 |
# File 'lib/osctld/thread_reaper.rb', line 50 def thread @thread end |
#threads ⇒ Object (readonly, protected)
Returns the value of attribute threads.
50 51 52 |
# File 'lib/osctld/thread_reaper.rb', line 50 def threads @threads end |
Instance Method Details
#add(thread, manager) ⇒ Object
40 41 42 |
# File 'lib/osctld/thread_reaper.rb', line 40 def add(thread, manager) queue << [thread, manager] end |
#can_stop? ⇒ Boolean (protected)
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/osctld/thread_reaper.rb', line 89 def can_stop? if sync { threads.empty? } true elsif @time.nil? || (Time.now - @time) >= 10 @time = Time.now sync do log( :info, 'threadreaper', "Waiting for #{threads.count} threads to exit" ) if (Time.now - stop_at) >= 30 threads.each_with_index do |v, i| t, m = v log(:info, 'threadreaper', "Thread ##{i + 1}: manager=#{m}") log(:info, 'threadreaper', denixstorify(t.backtrace).join("\n")) end end end false end end |
#export ⇒ Object
44 45 46 |
# File 'lib/osctld/thread_reaper.rb', line 44 def export sync { threads.clone } end |
#join_dead_threads ⇒ Object (protected)
77 78 79 80 81 |
# File 'lib/osctld/thread_reaper.rb', line 77 def join_dead_threads sync do threads.delete_if { |t, _m| !t.alive? && t.join(0.05) } end end |
#request_stop_threads ⇒ Object (protected)
83 84 85 86 87 |
# File 'lib/osctld/thread_reaper.rb', line 83 def request_stop_threads sync do threads.each { |t, m| t.alive? && m && m.request_stop } end end |
#run ⇒ Object (protected)
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/osctld/thread_reaper.rb', line 52 def run do_stop = false loop do v = queue.pop(timeout: 0.1) if v.nil? join_dead_threads elsif v == :stop do_stop = true @stop_at = Time.now request_stop_threads elsif v.is_a?(Array) sync { threads << v } else raise "unknown command '#{v}'" end return if do_stop && can_stop? end end |
#start ⇒ Object
29 30 31 |
# File 'lib/osctld/thread_reaper.rb', line 29 def start @thread = Thread.new { run } end |
#stop ⇒ Object
33 34 35 36 |
# File 'lib/osctld/thread_reaper.rb', line 33 def stop queue << :stop thread && thread.join end |
#sync ⇒ Object (protected)
115 116 117 |
# File 'lib/osctld/thread_reaper.rb', line 115 def sync(&) @mutex.synchronize(&) end |