Class: OsCtld::ExecutionPlan
- Inherits:
-
Object
- Object
- OsCtld::ExecutionPlan
- Includes:
- OsCtl::Lib::Utils::Log
- Defined in:
- lib/osctld/execution_plan.rb
Overview
Parallel executor of queued operations
First, add items to the internal queue using #<<. When done, call #run with a block. ExecutionPlan will start a configured number of threads and let them consume the queued items. The given block is called for every executed item, but the call may be done from different threads.
Instance Method Summary collapse
-
#<<(v) ⇒ Object
Enqueue item, cannot be called after the execution has been started.
-
#default_threads ⇒ Integer
Number of default threads unless overriden in #run.
-
#empty? ⇒ Boolean
Return true if the queue is empty.
- #get_threads(n) ⇒ Object protected
-
#initialize ⇒ ExecutionPlan
constructor
A new instance of ExecutionPlan.
-
#length ⇒ Integer
Return the number of queued items.
-
#on_done(&block) ⇒ Object
Execute a block when all threads have finished and the queue is empty.
-
#on_start(&block) ⇒ Object
Execute a block before the execution threads are started.
-
#queue ⇒ Array
Return the currently enqueued items in an array.
-
#run(threads: nil) {|queued| ... } ⇒ Object
Start processing of queued items.
- #running? ⇒ Boolean
-
#stop ⇒ Object
Clear the queue and wait for all working threads to finish.
- #sync ⇒ Object protected
-
#wait ⇒ Object
Wait for the execution to finish, if it is running.
- #work(block) ⇒ Object protected
Constructor Details
#initialize ⇒ ExecutionPlan
Returns a new instance of ExecutionPlan.
14 15 16 17 18 |
# File 'lib/osctld/execution_plan.rb', line 14 def initialize @queue = OsCtl::Lib::Queue.new @mutex = Mutex.new @cond = ConditionVariable.new end |
Instance Method Details
#<<(v) ⇒ Object
Enqueue item, cannot be called after the execution has been started
21 22 23 24 25 |
# File 'lib/osctld/execution_plan.rb', line 21 def <<(v) raise 'already in progress' if running? @queue << v end |
#default_threads ⇒ Integer
Number of default threads unless overriden in #run
107 108 109 |
# File 'lib/osctld/execution_plan.rb', line 107 def default_threads @default_threads ||= [Etc.nprocessors / 2, 1].max end |
#empty? ⇒ Boolean
Return true if the queue is empty
101 102 103 |
# File 'lib/osctld/execution_plan.rb', line 101 def empty? @queue.empty? end |
#get_threads(n) ⇒ Object (protected)
122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/osctld/execution_plan.rb', line 122 def get_threads(n) case n when nil default_threads when :all Etc.nprocessors when :half [Etc.nprocessors / 2, 1].max else raise "unsupported threads value #{n.inspect}" end end |
#length ⇒ Integer
Return the number of queued items
95 96 97 |
# File 'lib/osctld/execution_plan.rb', line 95 def length @queue.length end |
#on_done(&block) ⇒ Object
Execute a block when all threads have finished and the queue is empty
33 34 35 |
# File 'lib/osctld/execution_plan.rb', line 33 def on_done(&block) @on_done = block end |
#on_start(&block) ⇒ Object
Execute a block before the execution threads are started
28 29 30 |
# File 'lib/osctld/execution_plan.rb', line 28 def on_start(&block) @on_start = block end |
#queue ⇒ Array
Return the currently enqueued items in an array
89 90 91 |
# File 'lib/osctld/execution_plan.rb', line 89 def queue @queue.to_a end |
#run(threads: nil) {|queued| ... } ⇒ Object
Start processing of queued items
The given block is called for every executed item. The calls can be made in parallel from different threads.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/osctld/execution_plan.rb', line 44 def run(threads: nil, &block) raise 'already in progress' if running? run_threads = [get_threads(threads), @queue.length].min @on_start && @on_start.call t = Thread.new do run_threads.times.map do Thread.new { work(block) } end.map(&:join) @on_done && @on_done.call @cond.broadcast end sync { @thread = t } end |
#running? ⇒ Boolean
76 77 78 |
# File 'lib/osctld/execution_plan.rb', line 76 def running? sync { @thread && @thread.alive? } end |
#stop ⇒ Object
Clear the queue and wait for all working threads to finish
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/osctld/execution_plan.rb', line 64 def stop @queue.clear sync do next unless @thread @thread.join @thread = nil end end |
#sync ⇒ Object (protected)
135 136 137 138 139 140 141 142 |
# File 'lib/osctld/execution_plan.rb', line 135 def sync(&) if @mutex.owned? yield else @mutex.synchronize(&) end end |
#wait ⇒ Object
Wait for the execution to finish, if it is running
81 82 83 84 85 |
# File 'lib/osctld/execution_plan.rb', line 81 def wait sync do @cond.wait(@mutex) if running? end end |
#work(block) ⇒ Object (protected)
113 114 115 116 117 118 119 120 |
# File 'lib/osctld/execution_plan.rb', line 113 def work(block) while @queue.any? v = @queue.shift(block: false) break if v.nil? block.call(v) end end |