Class: OsCtld::ExecutionPlan

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Log
Defined in:
lib/osctld/execution_plan.rb

Overview

Parallel executor of queued operations

First, add items to the internal queue using #<<. When done, call #run with a block. ExecutionPlan will start a configured number of threads and let them consume the queued items. The given block is called once for every queued item, and the calls may be made from different threads.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ExecutionPlan

Returns a new instance of ExecutionPlan.



14
15
16
17
18
# File 'lib/osctld/execution_plan.rb', line 14

# Set up an empty plan: the item queue plus the mutex and condition variable
# that the other methods use to coordinate with the execution thread.
def initialize
  @cond = ConditionVariable.new
  @mutex = Mutex.new
  @queue = OsCtl::Lib::Queue.new
end

Instance Method Details

#<<(v) ⇒ Object

Enqueue an item. Cannot be called after the execution has been started.



21
22
23
24
25
# File 'lib/osctld/execution_plan.rb', line 21

# Append an item to the queue of pending work.
#
# @param v [Object] item to enqueue
# @return [Object] whatever the underlying queue's `<<` returns
# @raise [RuntimeError] if the plan is currently running
def <<(v)
  return @queue << v unless running?

  raise 'already in progress'
end

#default_threads ⇒ Integer

Default number of threads, used unless overridden in #run

Returns:

  • (Integer)


107
108
109
# File 'lib/osctld/execution_plan.rb', line 107

# Default thread count: half of the processors reported by
# `Etc.nprocessors`, but never less than one.
#
# The value is computed on first use and cached in `@default_threads`.
#
# @return [Integer]
def default_threads
  return @default_threads if @default_threads

  half = Etc.nprocessors / 2
  @default_threads = half < 1 ? 1 : half
end

#empty? ⇒ Boolean

Return true if the queue is empty

Returns:

  • (Boolean)


101
102
103
# File 'lib/osctld/execution_plan.rb', line 101

# Tell whether there are no queued items; delegates to the queue's `empty?`.
#
# @return [Boolean]
def empty? = @queue.empty?

#get_threads(n) ⇒ Object (protected)



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/osctld/execution_plan.rb', line 122

# Resolve a requested thread count into a concrete number of threads.
#
# @param n [Integer, :all, :half, nil]
#   `nil` selects {#default_threads}, `:all` the number of processors
#   reported by `Etc.nprocessors`, `:half` half of them (at least one), and
#   a positive Integer is used as given
# @return [Integer]
# @raise [RuntimeError] if `n` is none of the supported values
def get_threads(n)
  case n
  when nil
    default_threads
  when :all
    Etc.nprocessors
  when :half
    [Etc.nprocessors / 2, 1].max
  when Integer
    # An explicit count is part of the documented #run interface. Zero or a
    # negative count would start no workers at all, so it is rejected.
    raise "unsupported threads value #{n.inspect}" unless n.positive?

    n
  else
    raise "unsupported threads value #{n.inspect}"
  end
end

#length ⇒ Integer

Return the number of queued items

Returns:

  • (Integer)


95
96
97
# File 'lib/osctld/execution_plan.rb', line 95

# Number of items currently held in the queue; delegates to the queue's
# `length`.
#
# @return [Integer]
def length = @queue.length

#on_done(&block) ⇒ Object

Execute a block when all threads have finished and the queue is empty



33
34
35
# File 'lib/osctld/execution_plan.rb', line 33

# Register the callback stored in `@on_done`, replacing any previous one.
#
# @yield callback; it is stored, not invoked, by this method
# @return [Proc, nil] the stored block, or nil when called without a block
def on_done(&callback)
  @on_done = callback
end

#on_start(&block) ⇒ Object

Execute a block before the execution threads are started



28
29
30
# File 'lib/osctld/execution_plan.rb', line 28

# Register the callback stored in `@on_start`, replacing any previous one.
#
# @yield callback; it is stored, not invoked, by this method
# @return [Proc, nil] the stored block, or nil when called without a block
def on_start(&callback)
  @on_start = callback
end

#queue ⇒ Array

Return the currently enqueued items in an array

Returns:

  • (Array)


89
90
91
# File 'lib/osctld/execution_plan.rb', line 89

# Currently enqueued items, obtained by calling `to_a` on the queue.
#
# @return [Array]
def queue = @queue.to_a

#run(threads: nil) {|queued| ... } ⇒ Object

Start processing of queued items

The given block is called for every executed item. The calls can be made in parallel from different threads.

Parameters:

  • threads (Integer, :all, :half) (defaults to: nil)

    max number of threads to spawn

Yield Parameters:

  • queued (v)

    item



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/osctld/execution_plan.rb', line 44

# Start processing of queued items in the background.
#
# A coordinator thread spawns the workers, joins them, calls the `on_done`
# callback and finally signals the condition variable. The method itself
# returns as soon as the coordinator thread has been created.
#
# @param threads [Integer, :all, :half, nil]
#   requested number of worker threads, resolved by {#get_threads}; never
#   more workers are started than there are queued items
# @yieldparam queued [Object] item taken from the queue; the block may be
#   called from several threads at once
# @return [Thread] the coordinator thread
# @raise [RuntimeError] if the plan is already running
def run(threads: nil, &block)
  raise 'already in progress' if running?

  run_threads = [get_threads(threads), @queue.length].min

  @on_start&.call

  t = Thread.new do
    workers = run_threads.times.map do
      Thread.new { work(block) }
    end
    workers.each(&:join)

    @on_done&.call
  ensure
    # Signal waiters even when a worker or the on_done callback raised;
    # otherwise anyone blocked on the condition variable would never wake up.
    # The mutex is intentionally not taken here: #stop joins this thread
    # while holding it.
    @cond.broadcast
  end

  sync { @thread = t }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/osctld/execution_plan.rb', line 76

# Tell whether the thread stored in `@thread` is alive.
#
# @return [Boolean, nil] result of `Thread#alive?`, or nil when no thread is
#   stored
def running?
  sync { @thread&.alive? }
end

#stop ⇒ Object

Clear the queue and wait for all working threads to finish



64
65
66
67
68
69
70
71
72
73
# File 'lib/osctld/execution_plan.rb', line 64

# Drop all pending items and wait for the execution thread, if there is one.
#
# The queue is cleared first, then the thread stored in `@thread` is joined
# and forgotten, all inside {#sync}.
#
# @return [nil]
def stop
  @queue.clear

  sync do
    if @thread
      @thread.join
      @thread = nil
    end
  end
end

#sync ⇒ Object (protected)



135
136
137
138
139
140
141
142
# File 'lib/osctld/execution_plan.rb', line 135

# Run the given block while holding `@mutex`.
#
# When the calling thread already owns the mutex the block is simply
# yielded to, which lets methods using this helper call each other.
#
# @return [Object] the block's return value
def sync(&)
  return yield if @mutex.owned?

  @mutex.synchronize(&)
end

#wait ⇒ Object

Wait for the execution to finish, if it is running



81
82
83
84
85
# File 'lib/osctld/execution_plan.rb', line 81

# Wait for the execution to finish, if it is running.
#
# The execution thread is joined directly instead of waiting on the
# condition variable. A broadcast sent before the caller starts waiting
# would otherwise be missed, leaving the caller asleep forever.
#
# @return [nil]
# @raise [Exception] whatever terminated the execution thread while waiting,
#   as re-raised by `Thread#join`
def wait
  # Only the lookup needs the mutex; joining outside of it keeps other users
  # of #sync from being blocked for the whole duration of the wait.
  t = sync { @thread }
  t.join if t&.alive?
  nil
end

#work(block) ⇒ Object (protected)



113
114
115
116
117
118
119
120
# File 'lib/osctld/execution_plan.rb', line 113

# Worker loop: take items off the queue and hand each one to `block`.
#
# The loop ends when the queue reports no more items or when `shift` yields
# nil, which can happen if the queue was emptied in the meantime.
# NOTE(review): `shift(block: false)` is presumably a non-blocking pop that
# returns nil on an empty queue; confirm against OsCtl::Lib::Queue.
#
# @param block [#call] callable invoked with every item
# @return [nil]
def work(block)
  while @queue.any?
    item = @queue.shift(block: false)
    return if item.nil?

    block.call(item)
  end
end