Class: OsCtld::ContinuousExecutor

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log
Defined in:
lib/osctld/continuous_executor.rb

Overview

Continuous execution queue

This class handles parallel execution of queued commands with priorities. Commands can be added, the queue can be cleared and the pool of workers can be resized at runtime.

Defined Under Namespace

Classes: Command

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ ContinuousExecutor

Returns a new instance of ContinuousExecutor.

Parameters:

  • size (Integer)

    initial number of workers



60
61
62
63
64
65
66
67
68
69
# File 'lib/osctld/continuous_executor.rb', line 60

# Set up the executor state and start the dispatcher thread.
#
# @param size [Integer] initial number of workers
def initialize(size)
  # Worker pool: upper bound, running worker threads, and the mutex used by #sync
  @size = size
  @workers = []
  @mutex = Mutex.new

  # Incoming control messages, and commands waiting for a free worker
  @front_queue = OsCtl::Lib::Queue.new
  @exec_queue = []

  # Source of the order number assigned to every enqueued command
  @counter = Concurrent::AtomicFixnum.new(0)

  # Kept last: the dispatcher thread reads the state initialised above
  start
end

Instance Method Details

#<<(cmd) ⇒ Object

Enqueue command for execution

Parameters:

  • cmd (Command, Array<Command>)



79
80
81
# File 'lib/osctld/continuous_executor.rb', line 79

# Enqueue command for execution.
#
# Operator form of {#enqueue}: the argument is forwarded unchanged and the
# return value of {#enqueue} is returned.
#
# @param cmd what to enqueue, see {#enqueue}
def <<(cmd)
  enqueue(cmd)
end

#clear ⇒ Object

Clear the execution queue



100
101
102
# File 'lib/osctld/continuous_executor.rb', line 100

# Clear the execution queue.
#
# Only posts a `:clear` message to the front queue; the clearing itself is
# done by the dispatcher loop in #start when it pops that message.
def clear
  clear_message = [:clear]
  @front_queue << clear_message
end

#enqueue(cmd) ⇒ Object

Enqueue command for execution

Parameters:

  • cmd (Command, Array<Command>)



73
74
75
# File 'lib/osctld/continuous_executor.rb', line 73

# Enqueue command for execution.
#
# The command is not started here. It is wrapped in a `:command` message and
# appended to the front queue, from which the dispatcher loop in #start takes
# it. That loop accepts either a single Command or an Array of commands.
#
# @param cmd [Command, Array<Command>]
def enqueue(cmd)
  command_message = [:command, cmd]
  @front_queue << command_message
end

#exec(cmd) ⇒ Object (protected)



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/osctld/continuous_executor.rb', line 193

# Run a command in a new worker thread and add that thread to the pool.
#
# The thread calls the command's `exec` method. Whatever the outcome, it then
# posts a `:thread` message carrying itself to the front queue and passes the
# result to the command's `done` method (`nil` when `exec` raised).
#
# @param cmd [Command] command to run
# @return [Array<Thread>] the worker list, as returned by `@workers <<`
def exec(cmd)
  worker = Thread.new do
    result = cmd.send(:exec)
  rescue Exception => e
    # NOTE(review): Exception (not just StandardError) is rescued here, so
    # nothing raised by the command propagates out of the worker thread.
    log(:warn, 'cont', "Exception raised during command execution: #{e.message}")
    puts denixstorify(e.backtrace).join("\n")
  ensure
    # Announce that this worker has finished before handing the result over
    @front_queue << [:thread, Thread.current]
    cmd.send(:done, result)
  end

  @workers << worker
end

#execute(cmd, timeout: nil) ⇒ any

Enqueue command for execution and wait until it is executed

Parameters:

  • cmd (Command)
  • timeout (Integer, nil) (defaults to: nil)

    how long to wait for the command to execute

Returns:

  • (any)

    return value from the executed command



87
88
89
90
91
# File 'lib/osctld/continuous_executor.rb', line 87

# Enqueue command for execution and wait until it is executed.
#
# @param cmd [Command]
# @param timeout [Integer, nil] how long to wait for the command to execute
# @return [any] whatever the command's return queue yields from `shift`
def execute(cmd, timeout: nil)
  # Order matters: the return queue is obtained first, only then is the
  # command enqueued, and finally we wait on that queue.
  cmd.send(:return_queue)
     .tap { enqueue(cmd) }
     .shift(timeout: timeout)
end

#queue ⇒ Array<Command>

Return contents of the current queue

Returns:

  • (Array<Command>)



117
118
119
# File 'lib/osctld/continuous_executor.rb', line 117

# Return contents of the current queue.
#
# The copy is taken while holding the executor mutex (via #sync), so callers
# get a separate array rather than the live `@exec_queue`.
#
# @return [Array<Command>]
def queue
  sync do
    @exec_queue.clone
  end
end

#remove(cmd_id) ⇒ Object

Remove enqueued command identified by its id

Parameters:

  • cmd_id (any)


95
96
97
# File 'lib/osctld/continuous_executor.rb', line 95

# Remove enqueued command identified by its id.
#
# Posts a `:remove` message to the front queue. The dispatcher loop in #start
# then deletes every waiting command whose `id` equals `cmd_id`.
#
# @param cmd_id [any]
def remove(cmd_id)
  removal_message = [:remove, cmd_id]
  @front_queue << removal_message
end

#resize(new_size) ⇒ Object

Parameters:

  • new_size (Integer)

    new number of workers



111
112
113
# File 'lib/osctld/continuous_executor.rb', line 111

# Change the number of workers.
#
# Posts a `:resize` message to the front queue; the new size takes effect
# once the dispatcher loop in #start pops that message.
#
# @param new_size [Integer] new number of workers
def resize(new_size)
  resize_message = [:resize, new_size]
  @front_queue << resize_message
end

#start ⇒ Object (protected)



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/osctld/continuous_executor.rb', line 122

# Start the dispatcher thread.
#
# The thread pops `[type, arg]` messages from the front queue one at a time
# and is the only place where the worker list and the execution queue are
# modified. Handled message types:
#
# - `:command` -- `arg` is a Command or an Array of commands. Each command gets
#   the next order number. A single command is started right away if a worker
#   slot is free, otherwise it is queued; an array is always queued first.
#   The queue is re-sorted after every insertion. Any other `arg` is ignored.
# - `:thread` -- `arg` is a worker thread that has finished; its slot is freed.
# - `:clear` -- drop pending front-queue messages and all waiting commands.
# - `:remove` -- `arg` is a command id; waiting commands with that id are dropped.
# - `:resize` -- `arg` is the new maximum number of workers.
# - `:stop` -- drop waiting commands, join running workers and end the loop.
#
# Messages of any other type are ignored.
def start
  @main = Thread.new do
    loop do
      cmd, arg = @front_queue.pop

      case cmd
      when :command
        if arg.is_a?(Command)
          arg.send(:'order=', @counter.increment)

          sync do
            if @workers.size < @size
              exec(arg)
            else
              @exec_queue << arg
              @exec_queue.sort!
            end
          end

        elsif arg.is_a?(Array)
          arg.each { |c| c.send(:'order=', @counter.increment) }

          sync do
            @exec_queue.concat(arg)
            @exec_queue.sort!
            run_queued_commands
          end
        end

      when :thread
        sync do
          @workers.delete(arg)
          run_queued_commands
        end

      when :clear
        # NOTE(review): this discards every message still waiting in the front
        # queue, presumably including `:thread` notifications from workers that
        # have already finished; such workers would then never be removed from
        # @workers. Confirm against OsCtl::Lib::Queue#clear.
        @front_queue.clear
        sync { @exec_queue.clear }

      when :remove
        sync do
          @exec_queue.delete_if { |c| c.id == arg }
        end

      when :stop
        # The mutex stays held while the running workers are joined
        sync do
          @exec_queue.clear
          @workers.each(&:join)
        end

        break

      when :resize
        sync do
          @size = arg
          run_queued_commands
        end
      end
    end
  end
end

# Start waiting commands for as long as a worker slot is free.
#
# Takes commands from the head of the execution queue. Every call site in
# #start invokes it inside a `sync` block, i.e. with the mutex held.
def run_queued_commands
  exec(@exec_queue.shift) while @workers.size < @size && @exec_queue.any?
end

#stop ⇒ Object

Stop execution and wait for all workers to finish



105
106
107
108
# File 'lib/osctld/continuous_executor.rb', line 105

# Stop execution and wait for all workers to finish.
#
# Posts a `:stop` message to the front queue and then blocks until the
# dispatcher thread created by #start has ended.
def stop
  stop_message = [:stop]
  @front_queue << stop_message

  @main.join
end

#sync ⇒ Object (protected)



211
212
213
# File 'lib/osctld/continuous_executor.rb', line 211

# Run the given block while holding the executor mutex.
#
# @yield block to run with `@mutex` locked
# @return [Object] value of the block
def sync
  @mutex.synchronize do
    yield
  end
end