Class: OsCtld::ThreadReaper

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Log, Singleton
Defined in:
lib/osctld/thread_reaper.rb

Overview

Watches over a list of threads and waits for them to finish gracefully.

ThreadReaper is used to watch over per-client threads and join them when they finish. When osctld is supposed to shut down, the reaper asks the threads to prematurely exit and waits for all of them to finish their job.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ThreadReaper

Returns a new instance of ThreadReaper



22
23
24
25
26
# File 'lib/osctld/thread_reaper.rb', line 22

# Sets up an empty reaper: no watched threads yet, the mutex that guards
# the thread list, and the queue through which commands reach the reaper.
def initialize
  @threads = []
  @mutex = Mutex.new
  @queue = OsCtl::Lib::Queue.new
end

Instance Attribute Details

#queue ⇒ Object (readonly, protected)

Returns the value of attribute queue



48
49
50
# File 'lib/osctld/thread_reaper.rb', line 48

# Queue that #add and #stop push onto and that #run pops from. It carries
# `[thread, manager]` pairs and the `:stop` symbol.
# @return [OsCtl::Lib::Queue]
def queue
  @queue
end

#thread ⇒ Object (readonly, protected)

Returns the value of attribute thread



48
49
50
# File 'lib/osctld/thread_reaper.rb', line 48

# Thread created by #start to execute #run. It is not assigned in
# #initialize, so it is nil until #start has been called.
# @return [Thread, nil]
def thread
  @thread
end

#threads ⇒ Object (readonly, protected)

Returns the value of attribute threads



48
49
50
# File 'lib/osctld/thread_reaper.rb', line 48

# List of watched threads, stored as `[thread, manager]` pairs (the manager
# may be nil). The other methods of this class access it inside #sync.
# @return [Array<Array>]
def threads
  @threads
end

Instance Method Details

#add(thread, manager) ⇒ Object

Parameters:

  • thread (Thread)
  • manager (Object, nil)


39
40
41
# File 'lib/osctld/thread_reaper.rb', line 39

# Hands a thread over to the reaper by pushing it onto the command queue;
# the reaper loop later moves it into the list of watched threads.
# @param thread [Thread]
# @param manager [Object, nil] when set, it is sent `request_stop` on shutdown
def add(thread, manager)
  entry = [thread, manager]
  queue << entry
end

#can_stop? ⇒ Boolean (protected)

Returns:

  • (Boolean)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/osctld/thread_reaper.rb', line 86

# Tells the reaper loop whether it may exit, i.e. whether no watched threads
# remain. While threads are still present, a progress message is logged the
# first time and then at most once every 10 seconds.
#
# Always returns a strict boolean, including between two log messages.
# @return [Boolean]
def can_stop?
  # Take a single snapshot so the emptiness check and the logged number agree
  remaining = sync { threads.count }
  return true if remaining.zero?

  if @time.nil? || (Time.now - @time) >= 10
    @time = Time.now
    log(
      :info,
      'threadreaper',
      "Waiting for #{remaining} threads to exit"
    )
  end

  false
end

#export ⇒ Object



43
44
45
# File 'lib/osctld/thread_reaper.rb', line 43

# Returns a copy of the watched thread list, taken while holding the lock.
# The copy is shallow: the `[thread, manager]` pairs themselves are shared.
# @return [Array<Array>]
def export
  sync do
    threads.clone
  end
end

#join_dead_threads ⇒ Object (protected)



74
75
76
77
78
# File 'lib/osctld/thread_reaper.rb', line 74

# Joins watched threads that have already finished and removes them from the
# list. Threads that are still alive are left untouched.
#
# Thread#join re-raises the exception that terminated the joined thread. Such
# an exception is logged here and the thread is reaped anyway, so that one
# crashed thread cannot abort the reaper loop and strand the others.
# @return [Array<Array>] the list of threads that are still being watched
def join_dead_threads
  sync do
    threads.delete_if do |t, _m|
      next false if t.alive?

      begin
        # nil (join timed out) keeps the entry for the next pass
        t.join(0.05)
      rescue StandardError => e
        log(
          :warn,
          'threadreaper',
          "Thread exited with #{e.class}: #{e.message}"
        )
        true
      end
    end
  end
end

#request_stop_threads ⇒ Object (protected)



80
81
82
83
84
# File 'lib/osctld/thread_reaper.rb', line 80

# Sends `request_stop` to the manager of every watched thread that is still
# alive. Entries without a manager and threads that already finished are
# skipped.
# @return [Array<Array>] the list of watched threads
def request_stop_threads
  sync do
    threads.each do |thr, mgr|
      next unless thr.alive? && mgr

      mgr.request_stop
    end
  end
end

#run ⇒ Object (protected)



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/osctld/thread_reaper.rb', line 50

# Main loop of the reaper. Takes one item from the queue at a time and
# dispatches on it:
#
#   - nil: join threads that have finished (presumably what `pop` returns
#     when the 0.1 s timeout expires -- confirm against OsCtl::Lib::Queue)
#   - :stop: remember that shutdown was requested and ask threads to stop
#   - Array: a `[thread, manager]` pair to start watching
#
# Once shutdown was requested, the loop ends as soon as #can_stop? agrees.
# @raise [RuntimeError] when an unknown item is found in the queue
# @return [nil]
def run
  stopping = false

  loop do
    cmd = queue.pop(timeout: 0.1)

    case cmd
    when nil
      join_dead_threads
    when :stop
      stopping = true
      request_stop_threads
    when Array
      sync { threads << cmd }
    else
      raise "unknown command '#{cmd}'"
    end

    break if stopping && can_stop?
  end
end

#start ⇒ Object



28
29
30
# File 'lib/osctld/thread_reaper.rb', line 28

# Spawns the thread that executes the reaper loop (#run) and remembers it
# in @thread.
# @return [Thread] the newly created thread
def start
  @thread = Thread.new do
    run
  end
end

#stop ⇒ Object



32
33
34
35
# File 'lib/osctld/thread_reaper.rb', line 32

# Asks the reaper to shut down by queueing `:stop`, then waits for the
# reaper thread to finish. When no reaper thread exists, only the command
# is queued.
# @return [Thread, nil] the joined reaper thread, if there was one
def stop
  queue << :stop
  thread&.join
end

#sync(&block) ⇒ Object (protected)



101
102
103
# File 'lib/osctld/thread_reaper.rb', line 101

# Runs the given block while holding @mutex, the lock under which the other
# methods of this class access the thread list.
# @yield block executed with the mutex held
# @return [Object] result of the block, as returned by Mutex#synchronize
def sync(&block)
  @mutex.synchronize(&block)
end