Class: OsCtld::ContinuousExecutor
- Inherits: Object
- Inheritance chain: Object → OsCtld::ContinuousExecutor
- Includes:
- OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log
- Defined in:
- lib/osctld/continuous_executor.rb
Overview
Continuous execution queue
This class handles parallel execution of queued commands with priorities. Commands can be added, the queue can be cleared and the pool of workers can be resized at runtime.
Defined Under Namespace
Classes: Command
Instance Method Summary collapse
-
#<<(cmd) ⇒ Object
Enqueue command for execution.
-
#clear ⇒ Object
Clear the execution queue.
-
#enqueue(cmd) ⇒ Object
Enqueue command for execution.
- #exec(cmd) ⇒ Object protected
-
#execute(cmd, timeout: nil) ⇒ any
Enqueue command for execution and wait until it is executed.
-
#initialize(size) ⇒ ContinuousExecutor
constructor
A new instance of ContinuousExecutor.
-
#queue ⇒ Array<Command>
Return contents of the current queue.
-
#remove(cmd_id) ⇒ Object
Remove enqueued command identified by its id.
- #resize(new_size) ⇒ Object
- #start ⇒ Object protected
-
#stop ⇒ Object
Stop execution and wait for all workers to finish.
- #sync ⇒ Object protected
Constructor Details
#initialize(size) ⇒ ContinuousExecutor
Returns a new instance of ContinuousExecutor.
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/osctld/continuous_executor.rb', line 60
#
# Set up the executor state and start the dispatcher thread.
#
# @param size [Integer] number of worker threads allowed to run at once;
#   the dispatcher only starts a command while `@workers.size < @size`
def initialize(size)
  @mutex = Mutex.new                          # taken by #sync
  @size = size
  @workers = []                               # threads currently executing commands
  @front_queue = OsCtl::Lib::Queue.new        # control messages for the dispatcher thread
  @exec_queue = []                            # commands waiting for a free worker
  @counter = Concurrent::AtomicFixnum.new(0)  # source of per-command `order` values

  start
end
Instance Method Details
#<<(cmd) ⇒ Object
Enqueue command for execution
79 80 81 |
# File 'lib/osctld/continuous_executor.rb', line 79
#
# Enqueue command for execution. Operator form of {#enqueue}; the argument
# and the return value are passed through unchanged.
#
# @param cmd [Command, Array<Command>]
def <<(cmd)
  enqueue(cmd)
end
#clear ⇒ Object
Clear the execution queue
100 101 102 |
# File 'lib/osctld/continuous_executor.rb', line 100
#
# Clear the execution queue.
#
# Only posts a `[:clear]` message to the front queue; the dispatcher thread
# performs the actual clearing when it processes the message.
def clear
  @front_queue << [:clear]
end
#enqueue(cmd) ⇒ Object
Enqueue command for execution
73 74 75 |
# File 'lib/osctld/continuous_executor.rb', line 73
#
# Enqueue command for execution.
#
# Posts a `[:command, cmd]` message to the front queue and returns without
# waiting for the command to run.
#
# @param cmd [Command, Array<Command>] a single command or a batch of
#   commands; the dispatcher ignores any other kind of value
def enqueue(cmd)
  @front_queue << [:command, cmd]
end
#exec(cmd) ⇒ Object (protected)
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/osctld/continuous_executor.rb', line 193
#
# Run `cmd` in a new worker thread and register the thread in `@workers`.
#
# The worker calls the command's `exec` method. Whether or not it raised, the
# worker then posts `[:thread, <itself>]` to the front queue, so that the
# dispatcher can free the slot, and calls the command's `done` method with
# the value returned by `exec` (`nil` if `exec` raised).
#
# @param cmd [Command]
# @return [Array<Thread>] `@workers` with the new thread appended
def exec(cmd)
  worker = Thread.new do
    ret = nil

    begin
      ret = cmd.send(:exec)
    rescue Exception => e
      # Exception (not just StandardError) is rescued on purpose: whatever the
      # command raises is logged here instead of escaping the worker thread.
      log(:warn, 'cont', "Exception raised during command execution: #{e.message}")
      puts denixstorify(e.backtrace).join("\n")
    ensure
      # Free the slot first, then report the result to the command.
      @front_queue << [:thread, Thread.current]
      cmd.send(:done, ret)
    end
  end

  @workers << worker
end
#execute(cmd, timeout: nil) ⇒ any
Enqueue command for execution and wait until it is executed
87 88 89 90 91 |
# File 'lib/osctld/continuous_executor.rb', line 87
#
# Enqueue command for execution and wait until it is executed.
#
# The command's return queue is fetched before the command is enqueued, and
# the caller then blocks on `shift` of that queue.
#
# @param cmd [Command]
# @param timeout [Object, nil] passed unchanged as `timeout:` to the return
#   queue's `shift`; NOTE(review): units and the behaviour on expiry are
#   defined by that queue, which is not visible here
# @return [any] whatever the return queue's `shift` yields
def execute(cmd, timeout: nil)
  return_queue = cmd.send(:return_queue)
  enqueue(cmd)
  return_queue.shift(timeout: timeout)
end
#queue ⇒ Array<Command>
Return contents of the current queue
117 118 119 |
# File 'lib/osctld/continuous_executor.rb', line 117
#
# Return contents of the current queue.
#
# The array is a shallow copy taken while holding the mutex, so changing the
# returned array does not change the executor's queue; the command objects
# themselves are shared.
#
# @return [Array<Command>] commands waiting for a free worker
def queue
  sync { @exec_queue.clone }
end
#remove(cmd_id) ⇒ Object
Remove enqueued command identified by its id
95 96 97 |
# File 'lib/osctld/continuous_executor.rb', line 95
#
# Remove enqueued command identified by its id.
#
# Posts a `[:remove, cmd_id]` message; the dispatcher thread later deletes
# every waiting command whose `id` equals `cmd_id`. Commands already handed
# to a worker are not in the waiting queue and are therefore not affected.
#
# @param cmd_id [Object] compared with `Command#id` using `==`
def remove(cmd_id)
  @front_queue << [:remove, cmd_id]
end
#resize(new_size) ⇒ Object
111 112 113 |
# File 'lib/osctld/continuous_executor.rb', line 111
#
# Change the number of commands that may run at the same time.
#
# Posts a `[:resize, new_size]` message; the dispatcher thread stores the new
# limit and starts waiting commands if slots became free. Running workers
# are not interrupted when the limit is lowered.
#
# @param new_size [Integer] new worker limit
def resize(new_size)
  @front_queue << [:resize, new_size]
end
#start ⇒ Object (protected)
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/osctld/continuous_executor.rb', line 122
#
# Spawn the dispatcher thread (`@main`), which processes messages from the
# front queue one at a time until it receives `:stop`.
#
# Messages:
#   [:command, Command | Array<Command>]  run or queue the command(s)
#   [:thread, Thread]                     a worker finished; free its slot
#   [:clear]                              drop pending messages and waiting commands
#   [:remove, id]                         drop waiting commands with this id
#   [:resize, Integer]                    change the worker limit
#   [:stop]                               drop waiting commands, join workers, exit
#
# @return [Thread] the dispatcher thread
def start
  @main = Thread.new do
    loop do
      cmd, arg = @front_queue.pop

      case cmd
      when :command
        accept_commands(arg)

      when :thread
        sync do
          @workers.delete(arg)
          exec_queued
        end

      when :clear
        # NOTE(review): this also discards any `[:thread, ...]` notifications
        # that are already pending in the front queue; confirm that finished
        # workers cannot be left registered in @workers because of it.
        @front_queue.clear
        sync { @exec_queue.clear }

      when :remove
        sync { @exec_queue.delete_if { |c| c.id == arg } }

      when :stop
        sync do
          @exec_queue.clear
          # The mutex is held while waiting for the running commands.
          @workers.each(&:join)
        end

        break

      when :resize
        sync do
          @size = arg
          exec_queued
        end
      end
    end
  end
end

# Assign execution order to the incoming command(s) and either start them or
# put them into the waiting queue. Values that are neither a Command nor an
# Array are ignored.
def accept_commands(arg)
  case arg
  when Command
    arg.send(:order=, @counter.increment)

    sync do
      if @workers.size < @size
        exec(arg)
      else
        @exec_queue << arg
        @exec_queue.sort!
      end
    end

  when Array
    arg.each { |c| c.send(:order=, @counter.increment) }

    sync do
      @exec_queue.concat(arg)
      @exec_queue.sort!
      exec_queued
    end
  end
end

# Start waiting commands while there are free worker slots. Must be called
# from within a `sync` block.
def exec_queued
  exec(@exec_queue.shift) while @workers.size < @size && !@exec_queue.empty?
end
#stop ⇒ Object
Stop execution and wait for all workers to finish
105 106 107 108 |
# File 'lib/osctld/continuous_executor.rb', line 105
#
# Stop execution and wait for all workers to finish.
#
# Posts a `[:stop]` message and blocks until the dispatcher thread exits.
# Messages posted before `:stop` are processed first; on `:stop` the
# dispatcher discards waiting commands and joins the running workers.
#
# @return [Thread] the finished dispatcher thread (result of `Thread#join`)
def stop
  @front_queue << [:stop]
  @main.join
end
#sync ⇒ Object (protected)
211 212 213 |
# File 'lib/osctld/continuous_executor.rb', line 211
#
# Run the given block while holding the executor's mutex.
#
# @yield the block to run with `@mutex` locked
# @return [Object] the block's return value
def sync
  @mutex.synchronize { yield }
end