Class: OsCtld::ContinuousExecutor
- Inherits:
-
Object
- Object
- OsCtld::ContinuousExecutor
- Includes:
- OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log
- Defined in:
- lib/osctld/continuous_executor.rb
Overview
Continuous execution queue
This class handles parallel execution of queued commands with priorities. Commands can be added, the queue can be cleared and the pool of workers can be resized at runtime.
Defined Under Namespace
Classes: Command
Instance Method Summary collapse
-
#<<(cmd) ⇒ Object
Enqueue command for execution.
-
#clear ⇒ Object
Clear the execution queue.
-
#enqueue(cmd) ⇒ Object
Enqueue command for execution.
- #exec(cmd) ⇒ Object protected
-
#execute(cmd, timeout: nil) ⇒ any
Enqueue command for execution and wait until it is executed.
-
#initialize(size) ⇒ ContinuousExecutor
constructor
A new instance of ContinuousExecutor.
-
#queue ⇒ Array<Command>
Return contents of the current queue.
-
#remove(cmd_id) ⇒ Object
Remove enqueued command identified by its id.
- #resize(new_size) ⇒ Object
- #start ⇒ Object protected
-
#stop ⇒ Object
Stop execution and wait for all workers to finish.
- #sync ⇒ Object protected
-
#wait_until_empty ⇒ Object
Block until the queue is empty.
- #wake_waiters(force: false) ⇒ Object protected
Constructor Details
#initialize(size) ⇒ ContinuousExecutor
Returns a new instance of ContinuousExecutor.
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/osctld/continuous_executor.rb', line 60
#
# Build an executor and immediately launch its dispatcher thread via #start.
#
# @param size [Integer] limit that the number of running workers is compared
#   against before another command is started
def initialize(size)
  @size = size
  @counter = Concurrent::AtomicFixnum.new(0) # source of each command's `order` value
  @mutex = Mutex.new                         # lock taken by #sync
  @front_queue = OsCtl::Lib::Queue.new       # inbox of [action, argument] messages
  @exec_queue = []                           # commands waiting for a free worker
  @workers = []                              # threads currently running commands
  @waiters = []                              # reply queues parked by #wait_until_empty

  start
end
Instance Method Details
#<<(cmd) ⇒ Object
Enqueue command for execution
80 81 82 |
# File 'lib/osctld/continuous_executor.rb', line 80
#
# Operator spelling of #enqueue; the argument is forwarded unchanged.
#
# @param cmd the value handed to #enqueue
# @return whatever #enqueue returns
def <<(cmd)
  enqueue(cmd)
end
#clear ⇒ Object
Clear the execution queue
101 102 103 |
# File 'lib/osctld/continuous_executor.rb', line 101
#
# Ask the dispatcher to clear the execution queue.
#
# Only a `[:clear]` message is posted here; the clearing itself is done by the
# dispatcher thread when it reads the message.
def clear
  @front_queue << %i[clear]
end
#enqueue(cmd) ⇒ Object
Enqueue command for execution
74 75 76 |
# File 'lib/osctld/continuous_executor.rb', line 74
#
# Submit a command for execution.
#
# The command is wrapped in a `[:command, cmd]` message and appended to the
# dispatcher's inbox; nothing is executed on the calling thread.
#
# @param cmd the command to run (the dispatcher handles a single Command as
#   well as an Array of them)
def enqueue(cmd)
  message = [:command, cmd]
  @front_queue << message
end
#exec(cmd) ⇒ Object (protected)
215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/osctld/continuous_executor.rb', line 215
#
# Run a command in a freshly spawned worker thread and track that thread in
# @workers.
#
# Inside the worker:
# * the command's `exec` method is invoked through `send`;
# * a StandardError raised by it is logged (message) and its backtrace is
#   printed, so a failing command does not take the worker down silently;
# * in every case the dispatcher is told the worker is finished with a
#   `[:thread, Thread.current]` message, and the command's `done` method is
#   called with the result (nil when `exec` raised).
#
# @param cmd the command to execute; must respond to `exec` and `done`
def exec(cmd)
  t = Thread.new do
    ret = cmd.send(:exec)
  rescue StandardError => e
    log(:warn, 'cont', "Exception raised during command execution: #{e.message}")
    puts denixstorify(e.backtrace).join("\n")
  ensure
    # Free the worker slot first, then hand the result back to the command.
    @front_queue << [:thread, Thread.current]
    cmd.send(:done, ret)
  end

  @workers << t
end
#execute(cmd, timeout: nil) ⇒ any
Enqueue command for execution and wait until it is executed
88 89 90 91 92 |
# File 'lib/osctld/continuous_executor.rb', line 88
#
# Submit a command and block until its result is available.
#
# The command's return queue is obtained (via `send(:return_queue)`) before the
# command is enqueued, and the caller then waits on that queue.
#
# @param cmd the command to run
# @param timeout passed through to the return queue's `shift`
#   (NOTE(review): nil presumably means "wait indefinitely" - confirm against
#   OsCtl::Lib::Queue)
# @return the value shifted from the command's return queue
def execute(cmd, timeout: nil)
  reply = cmd.send(:return_queue)
  enqueue(cmd)
  reply.shift(timeout: timeout)
end
#queue ⇒ Array<Command>
Return contents of the current queue
118 119 120 |
# File 'lib/osctld/continuous_executor.rb', line 118
#
# Snapshot of the commands that are waiting for a worker.
#
# The copy is taken while holding the executor's mutex, and the caller gets a
# clone of the array rather than the live queue.
#
# @return [Array<Command>]
def queue
  sync do
    @exec_queue.clone
  end
end
#remove(cmd_id) ⇒ Object
Remove enqueued command identified by its id
96 97 98 |
# File 'lib/osctld/continuous_executor.rb', line 96
#
# Ask the dispatcher to drop the queued command(s) with the given id.
#
# Only a `[:remove, cmd_id]` message is posted here; the dispatcher performs
# the removal from the execution queue when it reads the message.
#
# @param cmd_id identifier compared against each queued command's `id`
def remove(cmd_id)
  message = [:remove, cmd_id]
  @front_queue << message
end
#resize(new_size) ⇒ Object
112 113 114 |
# File 'lib/osctld/continuous_executor.rb', line 112
#
# Ask the dispatcher to change the worker limit.
#
# Only a `[:resize, new_size]` message is posted here; the dispatcher applies
# the new limit when it reads the message.
#
# @param new_size the new worker limit
def resize(new_size)
  message = [:resize, new_size]
  @front_queue << message
end
#start ⇒ Object (protected)
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/osctld/continuous_executor.rb', line 132
#
# Spawn the dispatcher thread (stored in @main).
#
# The dispatcher pops `[action, payload]` messages from @front_queue one at a
# time and is the only place where the execution queue is changed and where
# workers are started or forgotten. Handled actions:
#
# * :command - payload is a Command or an Array of them; each gets the next
#              counter value as its `order` and is started or queued
# * :thread  - payload is a finished worker thread
# * :clear   - empty the inbox and the execution queue
# * :remove  - payload is the id of queued commands to drop
# * :stop    - empty the execution queue, join workers, leave the loop
# * :resize  - payload is the new worker limit
# * :wait    - payload is a queue to signal once nothing is running or queued
#
# @return [Thread] the dispatcher thread
def start
  # Start queued commands, in queue order, for as long as a worker slot is
  # free. Every call site below invokes it from inside `sync`.
  launch_pending = lambda do
    while @workers.size < @size && @exec_queue.any?
      exec(@exec_queue.shift)
    end
  end

  @main = Thread.new do
    loop do
      action, payload = @front_queue.pop

      case action
      when :command
        case payload
        when Command
          payload.send(:order=, @counter.increment)

          sync do
            if @workers.size >= @size
              # No free slot: park the command; sort! keeps the queue in the
              # order defined by the commands' own comparison.
              @exec_queue << payload
              @exec_queue.sort!
            else
              exec(payload)
            end
          end

        when Array
          payload.each do |queued|
            queued.send(:order=, @counter.increment)
          end

          sync do
            @exec_queue.concat(payload)
            @exec_queue.sort!
            launch_pending.call
          end
        end

      when :thread
        sync do
          # A worker finished: forget it and reuse its slot.
          @workers.delete(payload)
          launch_pending.call
          wake_waiters
        end

      when :clear
        # NOTE(review): assuming OsCtl::Lib::Queue#clear empties the whole
        # inbox, this also discards any other pending messages (e.g. :thread,
        # :wait or :stop) queued behind this one - confirm that is intended.
        @front_queue.clear

        sync do
          @exec_queue.clear
          wake_waiters
        end

      when :remove
        sync do
          @exec_queue.delete_if { |queued| queued.id == payload }
          wake_waiters
        end

      when :stop
        sync do
          @exec_queue.clear
          @workers.each(&:join)
          wake_waiters(force: true)
        end

        break

      when :resize
        sync do
          @size = payload
          launch_pending.call
        end

      when :wait
        sync do
          busy = !(@workers.empty? && @exec_queue.empty?)

          if busy
            # Signalled later by wake_waiters.
            @waiters << payload
          else
            payload << true
          end
        end
      end
    end
  end
end
#stop ⇒ Object
Stop execution and wait for all workers to finish
106 107 108 109 |
# File 'lib/osctld/continuous_executor.rb', line 106
#
# Shut the executor down.
#
# Posts a `[:stop]` message and then blocks until the dispatcher thread has
# terminated; the dispatcher joins the running workers while handling that
# message.
#
# @return [Thread] the joined dispatcher thread
def stop
  @front_queue << %i[stop]
  @main.join
end
#sync ⇒ Object (protected)
238 239 240 |
# File 'lib/osctld/continuous_executor.rb', line 238
#
# Run the given block while holding the executor's mutex.
#
# @yield the critical section
# @return the block's value
def sync(&block)
  @mutex.synchronize(&block)
end
#wait_until_empty ⇒ Object
Block until the queue is empty
123 124 125 126 127 128 |
# File 'lib/osctld/continuous_executor.rb', line 123
#
# Block the caller until the executor has nothing running and nothing queued.
#
# A private reply queue is handed to the dispatcher in a `[:wait, queue]`
# message; the caller then pops from it, which returns once the dispatcher
# (or wake_waiters) pushes a value.
#
# @return [nil]
def wait_until_empty
  signal = OsCtl::Lib::Queue.new
  @front_queue << [:wait, signal]
  signal.pop
  nil
end
#wake_waiters(force: false) ⇒ Object (protected)
229 230 231 232 233 234 235 236 |
# File 'lib/osctld/continuous_executor.rb', line 229
#
# Release every parked waiter by pushing `true` onto its queue, then forget
# them all.
#
# Nothing happens unless the executor is idle (no workers and an empty
# execution queue) or `force` is set.
#
# @param force [Boolean] wake the waiters even if work is still pending
# @return [Array, nil] the (now empty) waiter list, or nil when nothing was done
def wake_waiters(force: false)
  idle = @workers.empty? && @exec_queue.empty?
  return unless force || idle

  @waiters.each { |waiter| waiter << true }.clear
end