Class: OsCtld::ExecutionPlan
- Inherits:
- Object
  - Object
    - OsCtld::ExecutionPlan
- Includes:
- OsCtl::Lib::Utils::Log
- Defined in:
- lib/osctld/execution_plan.rb
Overview
Parallel executor of queued operations
First, add items to the internal queue using #<<. When done, call #run with a block. ExecutionPlan starts a configured number of threads and lets them consume the queued items. The given block is called once for every item, and the calls may come from different threads.
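The workflow described above can be sketched with a small stand-in. The example is illustrative only: MiniPlan and square_all are hypothetical names, and Ruby's built-in Queue takes the place of OsCtl::Lib::Queue so the snippet runs without osctld. Because the block may be called from several threads, the shared results array is guarded by a Mutex.

```ruby
# Hypothetical stand-in for OsCtld::ExecutionPlan, for illustration only.
# It uses Ruby's built-in Queue instead of OsCtl::Lib::Queue.
class MiniPlan
  def initialize
    @queue = Queue.new
    @thread = nil
  end

  # Enqueue an item before the run starts
  def <<(v)
    @queue << v
    self
  end

  # Start `threads` workers; the block is called once per item,
  # possibly from different threads
  def run(threads: 2, &block)
    @queue.close # no more items; pop returns nil once the queue is drained
    @thread = Thread.new do
      Array.new(threads) do
        Thread.new do
          while (v = @queue.pop)
            block.call(v)
          end
        end
      end.each(&:join)
    end
  end

  # Block until all workers have finished
  def wait
    @thread&.join
  end
end

# Enqueue, run, wait: the same three steps the overview describes
def square_all(items, threads: 2)
  results = []
  lock = Mutex.new

  plan = MiniPlan.new
  items.each { |i| plan << i }
  plan.run(threads: threads) do |i|
    lock.synchronize { results << (i * i) }
  end
  plan.wait

  results.sort
end

p square_all([1, 2, 3, 4]) # => [1, 4, 9, 16]
```

The results are sorted before being returned because the order in which workers finish items is not deterministic.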
Instance Method Summary
- #<<(v) ⇒ Object
  Enqueue item, cannot be called after the execution has been started.
- #default_threads ⇒ Integer
  Default number of threads, used unless overridden in #run.
- #get_threads(n) ⇒ Object (protected)
- #initialize ⇒ ExecutionPlan (constructor)
  A new instance of ExecutionPlan.
- #length ⇒ Integer
  Return the number of queued items.
- #on_done(&block) ⇒ Object
  Execute a block when all threads have finished and the queue is empty.
- #on_start(&block) ⇒ Object
  Execute a block before the execution threads are started.
- #queue ⇒ Array
  Return the currently enqueued items in an array.
- #run(threads: nil) {|queued| ... } ⇒ Object
  Start processing of queued items.
- #running? ⇒ Boolean
- #stop ⇒ Object
  Clear the queue and wait for all working threads to finish.
- #sync ⇒ Object (protected)
- #wait ⇒ Object
  Wait for the execution to finish, if it is running.
- #work(block) ⇒ Object (protected)
Constructor Details
#initialize ⇒ ExecutionPlan
Returns a new instance of ExecutionPlan.
# File 'lib/osctld/execution_plan.rb', line 15
def initialize
  @queue = OsCtl::Lib::Queue.new
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end
Instance Method Details
#<<(v) ⇒ Object
Enqueue an item. Cannot be called after execution has started; doing so fails with 'already in progress'.
# File 'lib/osctld/execution_plan.rb', line 22
def <<(v)
  fail 'already in progress' if running?
  @queue << v
end
#default_threads ⇒ Integer
Default number of threads, used unless overridden in #run: half of the available processors, but at least one
# File 'lib/osctld/execution_plan.rb', line 100
def default_threads
  @default_threads ||= [Etc.nprocessors / 2, 1].max
end
#get_threads(n) ⇒ Object (protected)
# File 'lib/osctld/execution_plan.rb', line 114
def get_threads(n)
  case n
  when nil
    default_threads
  when :all
    Etc.nprocessors
  when :half
    [Etc.nprocessors / 2, 1].max
  else
    fail "unsupported threads value #{n.inspect}"
  end
end
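The mapping from the threads argument to a worker count can be tried in isolation. The following is a minimal sketch, not the library's code: resolve_threads is a hypothetical name, and the processor count is passed in as a parameter (an assumption made here) so the results do not depend on the host.

```ruby
require 'etc'

# Hypothetical standalone version of the mapping shown above.
# `nprocs` is injectable so the result is predictable on any machine.
def resolve_threads(n, nprocs = Etc.nprocessors)
  half = [nprocs / 2, 1].max # never fewer than one thread

  case n
  when nil, :half
    half
  when :all
    nprocs
  else
    raise "unsupported threads value #{n.inspect}"
  end
end

p resolve_threads(nil, 8)   # => 4
p resolve_threads(:all, 8)  # => 8
p resolve_threads(:half, 1) # => 1
```

As in the listing, only nil, :all and :half are accepted; any other value, including a plain Integer such as 4, reaches the else branch and raises.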
#length ⇒ Integer
Return the number of queued items
# File 'lib/osctld/execution_plan.rb', line 94
def length
  @queue.length
end
#on_done(&block) ⇒ Object
Execute a block when all threads have finished and the queue is empty
# File 'lib/osctld/execution_plan.rb', line 33
def on_done(&block)
  @on_done = block
end
#on_start(&block) ⇒ Object
Execute a block before the execution threads are started
# File 'lib/osctld/execution_plan.rb', line 28
def on_start(&block)
  @on_start = block
end
#queue ⇒ Array
Return the currently enqueued items in an array
# File 'lib/osctld/execution_plan.rb', line 88
def queue
  @queue.to_a
end
#run(threads: nil) {|queued| ... } ⇒ Object
Start processing of queued items
The given block is called for every executed item. The calls can be made in parallel from different threads.
# File 'lib/osctld/execution_plan.rb', line 44
def run(threads: nil, &block)
  fail 'already in progress' if running?

  run_threads = [get_threads(threads), @queue.length].min

  @on_start && @on_start.call

  t = Thread.new do
    run_threads.times.map do
      Thread.new { work(block) }
    end.map(&:join)

    @on_done && @on_done.call
    @cond.broadcast
  end

  sync { @thread = t }
end
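Two details of the listing are easy to miss: the worker count is capped at the number of queued items, and the hooks run at fixed points (on_start in the caller before any thread exists, on_done in the coordinating thread after every worker has been joined). The sketch below reproduces only that ordering. run_with_hooks and hook_order are hypothetical helpers, and Ruby's built-in Queue is used as the work queue and as a thread-safe event log.

```ruby
# Hypothetical helper mirroring the ordering of hooks in the listing above
def run_with_hooks(items, threads:, on_start: nil, on_done: nil, &block)
  queue = Queue.new
  items.each { |i| queue << i }
  queue.close # pop returns nil once all items are taken

  # Never start more workers than there are items
  run_threads = [threads, items.length].min

  on_start&.call # runs in the calling thread, before any worker starts

  Thread.new do
    Array.new(run_threads) do
      Thread.new do
        while (v = queue.pop)
          block.call(v)
        end
      end
    end.each(&:join)

    on_done&.call # runs once, after every worker has finished
  end
end

# Record the order in which the hooks and the item block fire
def hook_order
  events = Queue.new

  coordinator = run_with_hooks(
    [1, 2, 3],
    threads: 8,
    on_start: -> { events << :start },
    on_done: -> { events << :done }
  ) { |_item| events << :item }
  coordinator.join

  log = []
  log << events.pop until events.empty?
  log
end

p hook_order # => [:start, :item, :item, :item, :done]
```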
#running? ⇒ Boolean
# File 'lib/osctld/execution_plan.rb', line 75
def running?
  sync { @thread && @thread.alive? }
end
#stop ⇒ Object
Clear the queue and wait for all working threads to finish
# File 'lib/osctld/execution_plan.rb', line 64
def stop
  @queue.clear

  sync do
    next unless @thread
    @thread.join
    @thread = nil
  end
end
#sync ⇒ Object (protected)
# File 'lib/osctld/execution_plan.rb', line 127
def sync
  if @mutex.owned?
    yield
  else
    @mutex.synchronize { yield }
  end
end
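The owned? check makes the helper safe to nest: Ruby's Mutex is not reentrant, so a second synchronize from the thread that already holds the lock raises ThreadError. In this class the nesting occurs in #wait, which calls #running? from inside a sync block. A minimal sketch of the same pattern follows, using a hypothetical Guarded class that is not part of osctld.

```ruby
# Hypothetical class demonstrating the reentrant sync helper
class Guarded
  def initialize
    @mutex = Mutex.new
    @value = 0
  end

  def increment
    sync { @value += 1 }
  end

  # Calls increment while already holding the lock. With a plain
  # @mutex.synchronize in increment this would raise ThreadError.
  def increment_twice
    sync do
      increment
      increment
    end
  end

  def value
    sync { @value }
  end

  protected

  # Take the lock only if the current thread does not hold it yet
  def sync
    if @mutex.owned?
      yield
    else
      @mutex.synchronize { yield }
    end
  end
end

g = Guarded.new
g.increment_twice
p g.value # => 2
```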
#wait ⇒ Object
Wait for the execution to finish, if it is running
# File 'lib/osctld/execution_plan.rb', line 80
def wait
  sync do
    @cond.wait(@mutex) if running?
  end
end
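#wait pairs with the @cond.broadcast call at the end of #run: the waiter releases the mutex while sleeping on the condition variable and is woken once the coordinating thread signals completion. The sketch below shows the general wait/broadcast pattern with a hypothetical Latch class. It is a variation rather than a copy: it tracks completion with an explicit flag, re-checks it in a loop (the conventional guard against spurious wakeups), and broadcasts while holding the mutex, whereas the class above checks running? once.

```ruby
# Hypothetical one-shot latch built on Mutex and ConditionVariable
class Latch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @done = false
  end

  # Mark the work as finished and wake every waiter
  def finish
    @mutex.synchronize do
      @done = true
      @cond.broadcast
    end
  end

  # Block until finish has been called; returns at once if it already was
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) until @done
    end
  end
end

latch = Latch.new
worker = Thread.new do
  sleep 0.05 # simulated work
  latch.finish
end

latch.wait
worker.join
puts 'finished'
```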
#work(block) ⇒ Object (protected)
# File 'lib/osctld/execution_plan.rb', line 105
def work(block)
  while @queue.any?
    v = @queue.shift(block: false)
    break if v.nil?

    block.call(v)
  end
end
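The nil check after the shift matters when several workers share the queue: between @queue.any? and the shift, another thread may take the last item, in which case the non-blocking shift presumably returns nil and the worker exits. The same drain loop can be sketched with Ruby's built-in Queue, whose non-blocking pop raises ThreadError on an empty queue instead of returning nil. drain is a hypothetical helper, not part of osctld.

```ruby
# Hypothetical helper: take items until the queue is empty, without blocking.
# Returns the block's results for the items this caller processed.
def drain(queue)
  processed = []

  loop do
    begin
      v = queue.pop(true) # non-blocking; raises ThreadError when empty
    rescue ThreadError
      break # another worker took the last item, or the queue is drained
    end

    processed << yield(v)
  end

  processed
end

q = Queue.new
[1, 2, 3].each { |i| q << i }
p drain(q) { |v| v * 2 } # => [2, 4, 6]
```

With several threads calling drain on the same queue, each item is handed to exactly one of them, so the per-thread result arrays together cover the whole queue.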