Class: OsCtld::ExecutionPlan

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Log
Defined in:
lib/osctld/execution_plan.rb

Overview

Parallel executor of queued operations

First, add items to the internal queue using #<<. When done, call #run with a block. ExecutionPlan will start a configured number of threads and let them consume the queued items. The given block is called for every executed item, but the call may be done from different threads.

Instance Method Summary collapse

Constructor Details

#initializeExecutionPlan

Returns a new instance of ExecutionPlan



14
15
16
17
18
# File 'lib/osctld/execution_plan.rb', line 14

def initialize
  @queue = OsCtl::Lib::Queue.new
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#<<(v) ⇒ Object

Enqueue item, cannot be called after the execution has been started



21
22
23
24
# File 'lib/osctld/execution_plan.rb', line 21

def <<(v)
  fail 'already in progress' if running?
  @queue << v
end

#on_done(&block) ⇒ Object

Execute a block when all threads have finished and the queue is empty



32
33
34
# File 'lib/osctld/execution_plan.rb', line 32

def on_done(&block)
  @on_done = block
end

#on_start(&block) ⇒ Object

Execute a block before the execution threads are started



27
28
29
# File 'lib/osctld/execution_plan.rb', line 27

def on_start(&block)
  @on_start = block
end

#queueArray

Return the currently enqueued items in an array

Returns:

  • (Array)


85
86
87
# File 'lib/osctld/execution_plan.rb', line 85

def queue
  @queue.to_a
end

#run(threads) {|queued| ... } ⇒ Object

Start processing of queued items

The given block is called for every executed item. The calls can be made in parallel from different threads.

Parameters:

  • threads (Integer)

    number of threads to spawn

Yield Parameters:

  • queued (v)

    item



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/osctld/execution_plan.rb', line 43

def run(threads, &block)
  fail 'already in progress' if running?

  @on_start && @on_start.call

  t = Thread.new do
    threads.times.map do
      Thread.new { work(block) }
    end.map(&:join)

    @on_done && @on_done.call
    @cond.broadcast
  end

  sync { @thread = t }
end

#running?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/osctld/execution_plan.rb', line 72

def running?
  sync { @thread && @thread.alive? }
end

#stopObject

Clear the queue and wait for all working threads to finish



61
62
63
64
65
66
67
68
69
# File 'lib/osctld/execution_plan.rb', line 61

def stop
  @queue.clear

  sync do
    next unless @thread
    @thread.join
    @thread = nil
  end
end

#syncObject (protected)



99
100
101
102
103
104
105
106
# File 'lib/osctld/execution_plan.rb', line 99

def sync
  if @mutex.owned?
    yield

  else
    @mutex.synchronize { yield }
  end
end

#waitObject

Wait for the execution to finish, if it is running



77
78
79
80
81
# File 'lib/osctld/execution_plan.rb', line 77

def wait
  sync do
    @cond.wait(@mutex) if running?
  end
end

#work(block) ⇒ Object (protected)



90
91
92
93
94
95
96
97
# File 'lib/osctld/execution_plan.rb', line 90

def work(block)
  while @queue.any?
    v = @queue.shift(block: false)
    break if v.nil?

    block.call(v)
  end
end