Class: OsCtl::Image::Operations::Execution::Parallel

Inherits:
Base
  • Object
show all
Includes:
Lib::Utils::Log
Defined in:
lib/osctl/image/operations/execution/parallel.rb

Defined Under Namespace

Classes: Item, Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

run

Constructor Details

#initialize(jobs) ⇒ Parallel

Returns a new instance of Parallel.

Parameters:

  • jobs (Integer)


16
17
18
19
20
21
22
# File 'lib/osctl/image/operations/execution/parallel.rb', line 16

# Prepares an executor with an empty job queue, no worker threads and no
# collected results.
#
# @param jobs [Integer] how many worker threads #execute starts
def initialize(jobs)
  @jobs = jobs

  # Pending work and the bookkeeping filled in while it runs.
  @queue = OsCtl::Lib::Queue.new
  @results = []
  @threads = []

  # Serializes appends to @results made by the workers.
  @mutex = ::Mutex.new
end

Instance Attribute Details

#jobs ⇒ Integer (readonly)

Returns:

  • (Integer)


13
14
15
# File 'lib/osctl/image/operations/execution/parallel.rb', line 13

# @return [Integer] the worker count passed to the constructor
def jobs = @jobs

#mutex ⇒ Object (readonly, protected)

Returns the value of attribute mutex.



43
44
45
# File 'lib/osctl/image/operations/execution/parallel.rb', line 43

# @return [Mutex] the lock created in the constructor; #add_result holds it
#   while appending to the result list
def mutex = @mutex

#queue ⇒ OsCtl::Lib::Queue (readonly)

Returns:

  • (OsCtl::Lib::Queue)


13
14
15
# File 'lib/osctl/image/operations/execution/parallel.rb', line 13

# @return [OsCtl::Lib::Queue] the queue of pending jobs created in the
#   constructor
def queue = @queue

#results ⇒ Object (readonly, protected)

Returns the value of attribute results.



43
44
45
# File 'lib/osctl/image/operations/execution/parallel.rb', line 43

# @return [Array] results collected so far; starts out empty
def results = @results

#threads ⇒ Object (readonly, protected)

Returns the value of attribute threads.



43
44
45
# File 'lib/osctl/image/operations/execution/parallel.rb', line 43

# @return [Array] worker threads started so far; starts out empty
def threads = @threads

Instance Method Details

#add(obj, &block) ⇒ Object



24
25
26
# File 'lib/osctl/image/operations/execution/parallel.rb', line 24

# Queues a job for later execution by a worker.
#
# @param obj [Object] label for the job; it is interpolated into worker log
#   messages and handed to Result.new together with the outcome
# @yield the work to perform; a worker invokes it via `call`
# @return [Object] whatever the queue's `<<` returns
def add(obj, &block)
  job = Item.new(obj, block)
  queue << job
end

#add_result(result) ⇒ Object (protected)



75
76
77
# File 'lib/osctl/image/operations/execution/parallel.rb', line 75

# Appends one job outcome to the result list while holding the mutex, so
# several workers can report without interleaving.
#
# @param result [Result]
# @return [Array] the result list
def add_result(result)
  mutex.synchronize do
    results.push(result)
  end
end

#execute ⇒ Array<Result>

Returns:

  • (Array<Result>)


29
30
31
32
33
34
35
36
# File 'lib/osctl/image/operations/execution/parallel.rb', line 29

# Starts `jobs` worker threads, waits until all of them have finished and
# hands back the collected results.
#
# Each worker runs #work_loop with its zero-based index.
#
# @return [Array<Result>] the instance's result list
def execute
  jobs.times { |worker_id| threads << Thread.new { work_loop(worker_id) } }

  join_threads
  results
end

#join_threads ⇒ Object (protected)



79
80
81
# File 'lib/osctl/image/operations/execution/parallel.rb', line 79

# Blocks until every worker thread has terminated.
#
# @return [Array] the thread list
def join_threads
  threads.each { |worker| worker.join }
end

#stop ⇒ Object



38
39
40
# File 'lib/osctl/image/operations/execution/parallel.rb', line 38

# Drops all jobs that are still waiting in the queue. Workers exit once they
# find the queue empty; this method itself does not touch the threads.
#
# @return [Object] whatever the queue's `clear` returns
def stop = queue.clear

#work_loop(i) ⇒ Object (protected)



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/osctl/image/operations/execution/parallel.rb', line 45

# Body of one worker thread: takes jobs off the queue one at a time and runs
# them until the queue yields nil.
#
# A StandardError raised by a job is logged and stored in that job's Result
# instead of being propagated, so one failing job does not stop the worker.
#
# @param i [Integer] worker index, used only in log messages
# @return [nil]
def work_loop(i)
  # shift is called with block: false; a nil item ends the loop.
  until (item = queue.shift(block: false)).nil?
    log(:info, "Worker ##{i} executing job for #{item.obj}")

    ret, exception =
      begin
        [item.block.call, nil]
      rescue => e
        log(:info, "Worker ##{i} caught exception: #{e.class}: #{e.message}")
        log(:info, e.backtrace.join("\n"))
        [nil, e]
      end

    # First argument is the success flag: true when nothing was raised.
    add_result(Result.new(exception.nil?, item.obj, ret, exception))
  end

  log(:info, "Worker ##{i} finished")
  nil
end