Class: OsCtld::ThreadReaper

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log, Singleton
Defined in:
lib/osctld/thread_reaper.rb

Overview

Watches over a list of threads and waits for them to gracefully finish

ThreadReaper is used to watch over per-client threads and join them when they finish. When osctld is supposed to shut down, the reaper asks the threads to prematurely exit and waits for all of them to finish their job.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ThreadReaper

Returns a new instance of ThreadReaper.



23
24
25
26
27
# File 'lib/osctld/thread_reaper.rb', line 23

# Prepares the reaper's state: the list of watched threads, the mutex that
# guards access to that list, and the queue through which commands and new
# threads are handed over to the reaper loop.
def initialize
  @threads = []
  @mutex = Mutex.new
  @queue = OsCtl::Lib::Queue.new
end

Instance Attribute Details

#queue ⇒ Object (readonly, protected)

Returns the value of attribute queue.



50
51
52
# File 'lib/osctld/thread_reaper.rb', line 50

# @return [OsCtl::Lib::Queue] queue created in the constructor; it carries
#   `:stop` commands and `[thread, manager]` pairs to the reaper loop
def queue = @queue

#stop_at ⇒ Object (readonly, protected)

Returns the value of attribute stop_at.



50
51
52
# File 'lib/osctld/thread_reaper.rb', line 50

# @return [Time, nil] time at which the reaper loop processed the `:stop`
#   command; nil until that happens
def stop_at = @stop_at

#thread ⇒ Object (readonly, protected)

Returns the value of attribute thread.



50
51
52
# File 'lib/osctld/thread_reaper.rb', line 50

# @return [Thread, nil] the reaper's own thread; nil until the reaper is
#   started
def thread = @thread

#threads ⇒ Object (readonly, protected)

Returns the value of attribute threads.



50
51
52
# File 'lib/osctld/thread_reaper.rb', line 50

# @return [Array<Array>] list of watched `[thread, manager]` pairs
def threads = @threads

Instance Method Details

#add(thread, manager) ⇒ Object

Parameters:

  • thread (Thread)
  • manager (Object, nil)


40
41
42
# File 'lib/osctld/thread_reaper.rb', line 40

# Hands a thread over to the reaper.
#
# The pair is only pushed onto the queue here; the reaper loop appends it to
# the list of watched threads when it processes the queue.
#
# @param thread [Thread]
# @param manager [Object, nil] when given, it is sent `request_stop` on
#   shutdown if the thread is still alive
def add(thread, manager)
  pair = [thread, manager]
  queue << pair
end

#can_stop? ⇒ Boolean (protected)

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/osctld/thread_reaper.rb', line 89

# Tells the reaper loop whether every watched thread is gone.
#
# While threads remain, a progress message is logged at most once every
# 10 seconds. Once 30 seconds have passed since the stop was requested, each
# progress message is followed by the manager and backtrace of every
# remaining thread.
#
# @return [Boolean] true when no threads are left, false otherwise
def can_stop?
  return true if sync { threads.empty? }

  # Throttle the report; still answer with a real boolean rather than
  # falling off the end of the conditional with nil.
  return false unless @time.nil? || (Time.now - @time) >= 10

  @time = Time.now
  sync do
    log(
      :info,
      'threadreaper',
      "Waiting for #{threads.count} threads to exit"
    )

    if (Time.now - stop_at) >= 30
      threads.each_with_index do |(t, m), i|
        log(:info, 'threadreaper', "Thread ##{i + 1}: manager=#{m}")

        # Thread#backtrace is nil for a thread that has already died but
        # has not been joined and removed from the list yet.
        backtrace = t.backtrace
        next if backtrace.nil?

        log(:info, 'threadreaper', denixstorify(backtrace).join("\n"))
      end
    end
  end

  false
end

#export ⇒ Object



44
45
46
# File 'lib/osctld/thread_reaper.rb', line 44

# Returns a snapshot of the watched threads.
#
# The list is copied while holding the lock. The copy is shallow: the
# `[thread, manager]` pairs themselves are shared with the reaper.
#
# @return [Array<Array>]
def export
  sync do
    snapshot = threads.clone
    snapshot
  end
end

#join_dead_threads ⇒ Object (protected)



77
78
79
80
81
# File 'lib/osctld/thread_reaper.rb', line 77

# Drops finished threads from the watched list.
#
# A pair is removed only when its thread is no longer alive and joining it
# (with a 0.05 second timeout) succeeds. Runs under the lock.
#
# @return [Array<Array>] the list of watched pairs after removal
def join_dead_threads
  sync do
    threads.delete_if do |pair|
      candidate = pair.first
      next false if candidate.alive?

      candidate.join(0.05)
    end
  end
end

#request_stop_threads ⇒ Object (protected)



83
84
85
86
87
# File 'lib/osctld/thread_reaper.rb', line 83

# Asks every watched thread that is still alive to stop, by calling
# `request_stop` on its manager. Pairs without a manager are skipped.
# Runs under the lock.
#
# @return [Array<Array>] the list of watched pairs
def request_stop_threads
  sync do
    threads.each do |t, m|
      next unless t.alive?

      m.request_stop if m
    end
  end
end

#run ⇒ Object (protected)



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/osctld/thread_reaper.rb', line 52

# Main loop of the reaper.
#
# Each iteration pops one item from the queue, waiting up to 0.1 seconds, and
# dispatches on it:
#
# - nil (presumably a pop timeout -- confirm against OsCtl::Lib::Queue):
#   joins threads that have finished
# - `:stop`: records the stop time and asks watched threads to stop
# - an Array: appends it to the list of watched threads
#
# After a stop was requested, the loop ends as soon as {#can_stop?} is true.
#
# @raise [RuntimeError] when the queue yields anything else
def run
  stopping = false

  loop do
    cmd = queue.pop(timeout: 0.1)

    case cmd
    when nil
      join_dead_threads
    when :stop
      stopping = true
      @stop_at = Time.now
      request_stop_threads
    when Array
      sync { threads << cmd }
    else
      raise "unknown command '#{cmd}'"
    end

    break if stopping && can_stop?
  end
end

#start ⇒ Object



29
30
31
# File 'lib/osctld/thread_reaper.rb', line 29

# Spawns the reaper's background thread, which executes `run`, and remembers
# it in `@thread`.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new do
    run
  end
end

#stop ⇒ Object



33
34
35
36
# File 'lib/osctld/thread_reaper.rb', line 33

# Asks the reaper to shut down and blocks until its thread has exited.
#
# The `:stop` command is always queued; when the reaper thread was never
# started there is nothing to join and the method returns right away.
#
# @return [Thread, nil] the joined reaper thread, or nil if there was none
def stop
  queue << :stop
  thread&.join
end

#sync ⇒ Object (protected)



115
116
117
# File 'lib/osctld/thread_reaper.rb', line 115

# Runs the given block while holding the reaper's mutex.
#
# @yield executed with the mutex locked
# @return [Object] whatever the block returns
def sync(&block)
  @mutex.synchronize(&block)
end