Class: OsCtld::ThreadReaper

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log, Singleton
Defined in:
lib/osctld/thread_reaper.rb

Overview

Watches over a list of threads and waits for them to gracefully finish

ThreadReaper is used to watch over per-client threads and join them when they finish. When osctld is supposed to shut down, the reaper asks the threads to prematurely exit and waits for all of them to finish their job.

Constant Summary collapse

# Group a thread is filed under when no explicit group is given.
DEFAULT_GROUP = :default

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ThreadReaper

Returns a new instance of ThreadReaper.



25
26
27
28
29
30
# File 'lib/osctld/thread_reaper.rb', line 25

# Prepares an idle reaper: no tracked threads, no pending drain requests,
# an empty command queue and the mutex that guards the thread list.
def initialize
  @threads = []
  @drains = []
  @queue = OsCtl::Lib::Queue.new
  @mutex = Mutex.new
end

Instance Attribute Details

#drains ⇒ Object (readonly, protected)

Returns the value of attribute drains.



76
77
78
# File 'lib/osctld/thread_reaper.rb', line 76

# @return [Array<Array>] pending drain requests, each stored as
#   +[groups, stop, ack]+
def drains = @drains

#queue ⇒ Object (readonly, protected)

Returns the value of attribute queue.



76
77
78
# File 'lib/osctld/thread_reaper.rb', line 76

# @return [OsCtl::Lib::Queue] command queue the reaper loop pops from
def queue = @queue

#stop_at ⇒ Object (readonly, protected)

Returns the value of attribute stop_at.



76
77
78
# File 'lib/osctld/thread_reaper.rb', line 76

# @return [Time, nil] moment the +:stop+ command was processed by the reaper
#   loop; nil before that
def stop_at = @stop_at

#thread ⇒ Object (readonly, protected)

Returns the value of attribute thread.



76
77
78
# File 'lib/osctld/thread_reaper.rb', line 76

# @return [Thread, nil] the reaper's own thread; nil until {#start} is called
def thread = @thread

#threads ⇒ Object (readonly, protected)

Returns the value of attribute threads.



76
77
78
# File 'lib/osctld/thread_reaper.rb', line 76

# @return [Array<Array>] tracked threads, each stored as
#   +[thread, manager, group]+
def threads = @threads

Instance Method Details

#add(thread, manager, group: DEFAULT_GROUP) ⇒ Object

Parameters:

  • thread (Thread)
  • manager (Object, nil)
  • group (Symbol) (defaults to: DEFAULT_GROUP)


66
67
68
# File 'lib/osctld/thread_reaper.rb', line 66

# Hands a thread over to the reaper by queueing an +:add+ command; the
# reaper loop starts tracking the thread when it processes the command.
#
# @param thread [Thread]
# @param manager [Object, nil] object asked to +request_stop+ when the
#   thread should exit
# @param group [Symbol]
def add(thread, manager, group: DEFAULT_GROUP)
  request = [:add, thread, manager, group]
  queue << request
end

#can_stop? ⇒ Boolean (protected)

Returns:

  • (Boolean)


155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/osctld/thread_reaper.rb', line 155

# Decides whether the reaper loop may exit, i.e. whether no tracked
# threads remain.
#
# While threads remain, a progress message is logged at most once every
# 10 seconds. Once 30 seconds have passed since {#stop_at}, every remaining
# thread is additionally logged with its manager, group and backtrace.
#
# @return [Boolean] true when no threads are tracked, false otherwise
def can_stop?
  return true if sync { threads.empty? }

  # Throttle reporting: stay quiet until 10 seconds passed since the last report
  return false unless @time.nil? || (Time.now - @time) >= 10

  @time = Time.now

  sync do
    log(
      :info,
      'threadreaper',
      "Waiting for #{threads.count} threads to exit"
    )

    if (Time.now - stop_at) >= 30
      threads.each_with_index do |(t, m, group), i|
        log(:info, 'threadreaper', "Thread ##{i + 1}: manager=#{m}")
        log(:info, 'threadreaper', "Thread ##{i + 1}: group=#{group}")

        # Thread#backtrace returns nil for a thread that has already
        # finished but has not been joined and removed from the list yet
        backtrace = t.backtrace
        next if backtrace.nil?

        log(:info, 'threadreaper', denixstorify(backtrace).join("\n"))
      end
    end
  end

  false
end

#drain(group: nil, groups: nil, stop: true) ⇒ Object

Drain selected thread groups while the reaper remains active.

Threads in other groups are left running and newly added threads in those groups are not asked to stop. This is used during osctld shutdown to drain public management clients while user-control callbacks needed by those commands are still accepted.

Parameters:

  • group (Symbol, nil) (defaults to: nil)
  • groups (Array<Symbol>, nil) (defaults to: nil)
  • stop (Boolean) (defaults to: true)

    ask matching managers to stop while draining



53
54
55
56
57
58
59
60
61
# File 'lib/osctld/thread_reaper.rb', line 53

# Drain selected thread groups while the reaper remains active.
#
# Queues a +:drain+ command for the reaper loop and blocks until the loop
# acknowledges that no tracked thread belongs to the selected groups.
#
# @param group [Symbol, nil] single group to drain
# @param groups [Array<Symbol>, nil] groups to drain; takes precedence
#   over +group+
# @param stop [Boolean] ask matching managers to stop while draining
# @raise [ArgumentError] when no group is selected
# @return [Object, nil] the acknowledgement sent by the reaper loop, nil
#   when the reaper is not running
def drain(group: nil, groups: nil, stop: true)
  # A reaper that was never started, or whose loop has already returned,
  # will never acknowledge the request and the caller would block forever.
  # The check is best-effort: the loop may still exit after it.
  return unless thread&.alive?

  target_groups = normalize_groups(group: group, groups: groups)
  ack = Queue.new

  queue << [:drain, target_groups, stop, ack]
  ack.pop
end

#export ⇒ Object



70
71
72
# File 'lib/osctld/thread_reaper.rb', line 70

# Snapshot of the tracked threads without their group.
#
# @return [Array<Array>] list of +[thread, manager]+ pairs
def export
  sync do
    threads.map { |entry| entry.values_at(0, 1) }
  end
end

#finish_drains ⇒ Object (protected)



140
141
142
143
144
145
146
147
# File 'lib/osctld/thread_reaper.rb', line 140

# Completes every pending drain whose groups no longer contain any tracked
# thread: the waiting caller is acknowledged with +true+ and the request is
# dropped from {#drains}.
#
# @return [Array] the remaining drain requests
def finish_drains
  drains.delete_if do |groups, _stop, ack|
    drained = groups_empty?(groups)
    ack << true if drained
    drained
  end
end

#groups_empty?(groups) ⇒ Boolean (protected)

Returns:

  • (Boolean)


149
150
151
152
153
# File 'lib/osctld/thread_reaper.rb', line 149

# Checks that no tracked thread belongs to any of the given groups.
#
# @param groups [Array<Symbol>]
# @return [Boolean]
def groups_empty?(groups)
  sync do
    threads.all? do |_thread, _manager, group|
      !groups.include?(thread_group(group))
    end
  end
end

#join_dead_threads ⇒ Object (protected)



114
115
116
117
118
# File 'lib/osctld/thread_reaper.rb', line 114

# Joins threads that are no longer alive and stops tracking them.
#
# @return [Array] the remaining tracked threads
def join_dead_threads
  sync do
    threads.delete_if do |t, _manager, _group|
      next false if t.alive?

      # A thread whose join does not finish within the timeout stays tracked
      t.join(0.05)
    end
  end
end

#normalize_groups(group:, groups:) ⇒ Object (protected)

Raises:

  • (ArgumentError)


186
187
188
189
190
191
192
193
# File 'lib/osctld/thread_reaper.rb', line 186

# Turns the +group+/+groups+ arguments into a list of group names.
#
# +groups+ takes precedence over +group+ whenever it is given. The result is
# always a new array, so the caller's +groups+ argument is never modified
# or aliased.
#
# @param group [Symbol, nil]
# @param groups [Array<Symbol>, nil]
# @raise [ArgumentError] when no group remains after dropping nils
# @return [Array<Symbol>]
def normalize_groups(group:, groups:)
  # Array() returns the very same object when given an array, hence the
  # non-destructive compact.
  ret = Array(groups || group).compact

  raise ArgumentError, 'no thread groups selected' if ret.empty?

  ret
end

#request_stop_thread(thread, manager) ⇒ Object (protected)



130
131
132
# File 'lib/osctld/thread_reaper.rb', line 130

# Asks a thread's manager to stop it, provided the thread is still alive
# and has a manager.
#
# @param thread [Thread]
# @param manager [Object, nil] object responding to +request_stop+
# @return [Object, false, nil] result of +manager.request_stop+, false for
#   a dead thread, nil when there is no manager
def request_stop_thread(thread, manager)
  return false unless thread.alive?

  manager && manager.request_stop
end

#request_stop_threads(groups: nil) ⇒ Object (protected)



120
121
122
123
124
125
126
127
128
# File 'lib/osctld/thread_reaper.rb', line 120

# Asks tracked threads to stop.
#
# @param groups [Array<Symbol>, nil] limit the request to threads in these
#   groups; every tracked thread is asked when omitted
# @return [Array] the tracked threads
def request_stop_threads(groups: nil)
  sync do
    threads.each do |t, mgr, grp|
      request_stop_thread(t, mgr) if !groups || groups.include?(thread_group(grp))
    end
  end
end

#run ⇒ Object (protected)



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/osctld/thread_reaper.rb', line 78

# Main loop of the reaper thread.
#
# Pops commands from {#queue} with a 0.1 s timeout and handles them:
#
# - timeout (+nil+): join threads that have finished
# - +:stop+: enter the stopping state, record {#stop_at} and ask all tracked
#   threads to stop
# - +[:drain, groups, stop, ack]+: remember the drain request and, when
#   +stop+ is set, ask threads in +groups+ to stop
# - +[:add, thread, manager, group]+: start tracking the thread; it is asked
#   to stop right away when its group is being stopped
#
# After each iteration, finished drains are acknowledged. The loop returns
# once stopping was requested and {#can_stop?} agrees.
#
# @raise [RuntimeError] on an unrecognized command
def run
  @stopping = false

  loop do
    msg = queue.pop(timeout: 0.1)

    case msg
    when nil
      join_dead_threads

    when :stop
      @stopping = true
      @stop_at = Time.now
      request_stop_threads

    when Array
      case msg[0]
      when :drain
        _cmd, groups, stop, ack = msg

        drains << [groups, stop, ack]
        request_stop_threads(groups: groups) if stop

      when :add
        _cmd, client, manager, group = msg

        # Ask before tracking, so a thread joining a group that is already
        # being stopped is told immediately
        request_stop_thread(client, manager) if stop_thread_group?(group)
        sync { threads << [client, manager, group] }

      else
        raise "unknown command '#{msg}'"
      end

    else
      raise "unknown command '#{msg}'"
    end

    finish_drains

    return if @stopping && can_stop?
  end
end

#start ⇒ Object



32
33
34
# File 'lib/osctld/thread_reaper.rb', line 32

# Launches the reaper loop in a new thread and remembers that thread.
#
# @return [Thread] the reaper thread
def start
  @thread = Thread.new do
    run
  end
end

#stop ⇒ Object



36
37
38
39
40
41
# File 'lib/osctld/thread_reaper.rb', line 36

# Asks the reaper loop to shut down and waits for the reaper thread to
# finish. Does nothing when the reaper was never started.
#
# @return [Thread, nil] the joined reaper thread, nil when not started
def stop
  if thread
    queue << :stop
    thread.join
  end
end

#stop_thread_group?(group) ⇒ Boolean (protected)

Returns:

  • (Boolean)


134
135
136
137
138
# File 'lib/osctld/thread_reaper.rb', line 134

# Tells whether threads of the given group are currently being asked to
# stop: either the whole reaper is stopping, or a pending drain with
# +stop+ set covers the group.
#
# @param group [Symbol, nil]
# @return [Boolean]
def stop_thread_group?(group)
  return @stopping if @stopping

  drains.any? { |groups, stop, _ack| stop && groups.include?(thread_group(group)) }
end

#sync ⇒ Object (protected)



182
183
184
# File 'lib/osctld/thread_reaper.rb', line 182

# Runs the given block while holding the reaper's mutex.
#
# @yield block executed under the lock
# @return [Object] the block's return value
def sync(&block)
  @mutex.synchronize(&block)
end

#thread_group(group) ⇒ Object (protected)



195
196
197
# File 'lib/osctld/thread_reaper.rb', line 195

# Resolves the effective group of a thread.
#
# @param group [Symbol, nil]
# @return [Symbol] +group+, or DEFAULT_GROUP when none is set
def thread_group(group)
  return DEFAULT_GROUP unless group

  group
end