Class: OsCtl::Image::Operations::Execution::Parallel

Inherits:
Base
  • Object
show all
Includes:
Lib::Utils::Log
Defined in:
lib/osctl/image/operations/execution/parallel.rb

Defined Under Namespace

Classes: Item, Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

run

Constructor Details

#initialize(jobs) ⇒ Parallel

Returns a new instance of Parallel.

Parameters:

  • jobs (Integer)


15
16
17
18
19
20
21
22
# File 'lib/osctl/image/operations/execution/parallel.rb', line 15

# Sets up an executor with an empty work queue and no threads or results.
#
# @param jobs [Integer] stored as-is; `execute` starts this many threads
def initialize(jobs)
  super()

  # Configuration
  @jobs = jobs

  # Shared state: pending items and the lock used by `add_result`
  @queue = OsCtl::Lib::Queue.new
  @mutex = ::Mutex.new

  # Filled in by `execute` / `add_result`
  @threads = []
  @results = []
end

Instance Attribute Details

#jobs ⇒ Integer (readonly)

Returns:

  • (Integer)


12
13
14
# File 'lib/osctl/image/operations/execution/parallel.rb', line 12

# Reader for the value given to the constructor as `jobs`.
#
# @return [Integer]
def jobs
  @jobs
end

#mutex ⇒ Object (readonly, protected)

Returns the value of attribute mutex.



44
45
46
# File 'lib/osctl/image/operations/execution/parallel.rb', line 44

# Reader for the lock created in the constructor; `add_result` holds it
# while appending to `results`.
#
# @return [Mutex]
def mutex
  @mutex
end

#queue ⇒ OsCtl::Lib::Queue (readonly)

Returns:

  • (OsCtl::Lib::Queue)


12
13
14
# File 'lib/osctl/image/operations/execution/parallel.rb', line 12

# Reader for the queue of pending items; the constructor sets it to a new
# `OsCtl::Lib::Queue`.
#
# @return [OsCtl::Lib::Queue]
def queue
  @queue
end

#results ⇒ Object (readonly, protected)

Returns the value of attribute results.



44
45
46
# File 'lib/osctl/image/operations/execution/parallel.rb', line 44

# Reader for the array that `add_result` appends to.
#
# @return [Array]
def results
  @results
end

#threads ⇒ Object (readonly, protected)

Returns the value of attribute threads.



44
45
46
# File 'lib/osctl/image/operations/execution/parallel.rb', line 44

# Reader for the array of worker threads started by `execute`.
#
# @return [Array<Thread>]
def threads
  @threads
end

Instance Method Details

#add(obj, &block) ⇒ Object



24
25
26
# File 'lib/osctl/image/operations/execution/parallel.rb', line 24

# Queues a unit of work.
#
# The block is not called here; it is wrapped together with `obj` in an
# `Item` and appended to the queue.
#
# @param obj [Object] value stored alongside the block in the item
# @yield code stored in the item as its block
# @return whatever `queue <<` returns
def add(obj, &block)
  item = Item.new(obj, block)
  queue << item
end

#add_result(result) ⇒ Object (protected)



76
77
78
# File 'lib/osctl/image/operations/execution/parallel.rb', line 76

# Appends one result to `results` while holding `mutex`, so that appends
# from different threads do not interleave.
#
# @param result [Object] value to append
# @return [Array] the `results` array
def add_result(result)
  mutex.synchronize { results.push(result) }
end

#execute ⇒ Array<Result>

Returns:

  • (Array<Result>)



29
30
31
32
33
34
35
36
# File 'lib/osctl/image/operations/execution/parallel.rb', line 29

# Starts `jobs` threads, waits for all of them and returns `results`.
#
# Every thread runs `work_loop` with its zero-based index and is recorded
# in `threads` before joining starts.
#
# @return [Array<Result>]
def execute
  jobs.times do |worker_id|
    # The index is handed over as a thread argument instead of being captured.
    worker = Thread.new(worker_id) { |id| work_loop(id) }
    threads << worker
  end

  join_threads
  results
end

#join_threads ⇒ Object (protected)



80
81
82
# File 'lib/osctl/image/operations/execution/parallel.rb', line 80

# Waits for every thread in `threads` to finish, in the order they were added.
#
# @return [Array<Thread>] the `threads` array
def join_threads
  threads.each { |thread| thread.join }
end

#stop ⇒ Object



38
39
40
# File 'lib/osctl/image/operations/execution/parallel.rb', line 38

# Discards all items still waiting in the queue.
#
# This only calls `clear` on the queue; it does not touch the worker threads.
#
# @return whatever `queue.clear` returns
def stop
  queue.clear
end

#work_loop(i) ⇒ Object (protected)



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/osctl/image/operations/execution/parallel.rb', line 46

# Body of one worker thread: takes items off the queue until it is empty.
#
# Items are fetched with a non-blocking `shift`; a `nil` item ends the loop.
# For every item its block is called and a `Result` is recorded via
# `add_result`, built from (success flag, item object, block return value,
# exception). A `StandardError` raised by the block is logged and stored in
# the result instead of being propagated.
#
# @param i [Integer] worker number, used only in log messages
# @return [nil]
def work_loop(i)
  loop do
    job = queue.shift(block: false)

    if job.nil?
      log(:info, "Worker ##{i} finished")
      break
    end

    log(:info, "Worker ##{i} executing job for #{job.obj}")

    # Both stay nil unless the block returns / raises respectively.
    value = nil
    error = nil

    begin
      value = job.block.call
    rescue StandardError => e
      log(:info, "Worker ##{i} caught exception: #{e.class}: #{e.message}")
      log(:info, e.backtrace.join("\n"))
      error = e
    end

    add_result(Result.new(error.nil?, job.obj, value, error))
  end
end