Class: OsCtld::ContinuousExecutor

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::Exception, OsCtl::Lib::Utils::Log
Defined in:
lib/osctld/continuous_executor.rb

Overview

Continuous execution queue

This class handles parallel execution of queued commands with priorities. Commands can be added, the queue can be cleared and the pool of workers can be resized at runtime.

Defined Under Namespace

Classes: Command

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ ContinuousExecutor

Returns a new instance of ContinuousExecutor.

Parameters:

  • size (Integer)

    initial number of workers



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/osctld/continuous_executor.rb', line 60

# Set up an empty executor and immediately launch its dispatcher thread.
#
# @param size [Integer] initial number of workers
def initialize(size)
  # Worker pool: upper bound and the threads currently running commands
  @size = size
  @workers = []

  # Commands waiting for a free worker, and queues of callers blocked in
  # #wait_until_empty
  @exec_queue = []
  @waiters = []

  # Public methods post messages here; the dispatcher thread consumes them
  @front_queue = OsCtl::Lib::Queue.new

  # Source of the sequence number given to every accepted command
  @counter = Concurrent::AtomicFixnum.new(0)

  # Guards the state above, see #sync
  @mutex = Mutex.new

  start
end

Instance Method Details

#<<(cmd) ⇒ Object

Enqueue command for execution

Parameters:



80
81
82
# File 'lib/osctld/continuous_executor.rb', line 80

# Enqueue command for execution. Operator shorthand that simply delegates
# to #enqueue.
#
# @param cmd passed unchanged to #enqueue
# @return whatever #enqueue returns
def <<(cmd)
  enqueue(cmd)
end

#clear ⇒ Object

Clear the execution queue



101
102
103
# File 'lib/osctld/continuous_executor.rb', line 101

# Ask the dispatcher thread to clear the execution queue.
#
# The request is only posted to the front queue here; the clearing itself
# is carried out later by the dispatcher loop.
def clear
  request = [:clear]
  @front_queue << request
end

#enqueue(cmd) ⇒ Object

Enqueue command for execution

Parameters:



74
75
76
# File 'lib/osctld/continuous_executor.rb', line 74

# Enqueue command for execution.
#
# The command is wrapped in a :command message and handed to the dispatcher
# thread, which accepts either a single Command or an Array of commands.
#
# @param cmd command, or array of commands, to submit
def enqueue(cmd)
  message = [:command, cmd]
  @front_queue << message
end

#exec(cmd) ⇒ Object (protected)



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/osctld/continuous_executor.rb', line 220

# Run a command in a freshly spawned worker thread and register that thread
# in the worker list.
#
# Whatever happens inside the command, the thread always reports its own
# completion to the dispatcher (a :thread message) and then hands the
# command's return value -- nil when the command raised -- to the command's
# `done` method.
#
# @param cmd object responding to `exec` and `done`
def exec(cmd)
  worker = Thread.new do
    result = nil

    begin
      result = cmd.send(:exec)
    rescue Exception => err
      # Any failure is logged and swallowed so that it cannot escape the
      # worker thread
      log(:warn, 'cont', "Exception raised during command execution: #{err.message}")
      puts denixstorify(err.backtrace).join("\n")
    ensure
      @front_queue << [:thread, Thread.current]
      cmd.send(:done, result)
    end
  end

  @workers << worker
end

#execute(cmd, timeout: nil) ⇒ any

Enqueue command for execution and wait until it is executed

Parameters:

  • cmd (Command)
  • timeout (Integer, nil) (defaults to: nil)

    how long to wait for the command to execute

Returns:

  • (any)

    return value from the executed command



88
89
90
91
92
# File 'lib/osctld/continuous_executor.rb', line 88

# Enqueue command for execution and wait until it is executed.
#
# The command's return queue is fetched before the command is submitted and
# is then read with the given timeout.
#
# @param cmd [Command]
# @param timeout [Integer, nil] how long to wait for the command to execute
# @return [any] value taken from the command's return queue
def execute(cmd, timeout: nil)
  reply_queue = cmd.send(:return_queue)
  enqueue(cmd)
  reply_queue.shift(timeout: timeout)
end

#queue ⇒ Array<Command>

Return contents of the current queue

Returns:



118
119
120
# File 'lib/osctld/continuous_executor.rb', line 118

# Return contents of the current queue.
#
# The copy is taken while holding the executor's mutex, and callers get a
# clone rather than the internal array itself.
#
# @return [Array<Command>] commands waiting for execution
def queue
  sync do
    @exec_queue.clone
  end
end

#remove(cmd_id) ⇒ Object

Remove enqueued command identified by its id

Parameters:

  • cmd_id (any)


96
97
98
# File 'lib/osctld/continuous_executor.rb', line 96

# Remove enqueued command identified by its id.
#
# Only posts a :remove request; the dispatcher thread later deletes every
# queued command whose id equals the given one.
#
# @param cmd_id [any] id of the command(s) to drop from the queue
def remove(cmd_id)
  request = [:remove, cmd_id]
  @front_queue << request
end

#resize(new_size) ⇒ Object

Parameters:

  • new_size (Integer)

    new number of workers



112
113
114
# File 'lib/osctld/continuous_executor.rb', line 112

# Change the number of workers at runtime.
#
# Only posts a :resize request; the new limit takes effect once the
# dispatcher thread processes it.
#
# @param new_size [Integer] new number of workers
def resize(new_size)
  request = [:resize, new_size]
  @front_queue << request
end

#start ⇒ Object (protected)



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/osctld/continuous_executor.rb', line 131

# Start the dispatcher thread (kept in @main). It consumes [type, argument]
# messages from @front_queue one at a time and is the only place where
# commands are scheduled.
#
# Messages handled:
#   :command - a Command, or an Array of commands, to run or queue
#   :thread  - a worker thread reporting that it has finished
#   :clear   - drop everything that is waiting to be executed
#   :remove  - drop queued commands whose id equals the argument
#   :stop    - drop queued commands, join running workers and leave the loop
#   :resize  - change the maximum number of concurrent workers
#   :wait    - register a queue to be signalled once nothing runs or waits
#
# Messages of any other type are ignored.
def start
  @main = Thread.new do
    loop do
      cmd, arg = @front_queue.pop

      case cmd
      when :command
        # An argument that is neither a Command nor an Array is silently
        # ignored.
        if arg.is_a?(Command)
          # Stamp the command with the next sequence number.
          # NOTE(review): presumably used by Command's comparison as a
          # tie-breaker between equal priorities -- confirm in Command.
          arg.send(:'order=', @counter.increment)

          sync do
            if @workers.size < @size
              # A worker slot is free, run right away
              exec(arg)
            else
              # All slots taken: queue it and keep the queue sorted
              @exec_queue << arg
              @exec_queue.sort!
            end
          end

        elsif arg.is_a?(Array)
          arg.each { |c| c.send(:'order=', @counter.increment) }

          sync do
            # The whole batch is queued and sorted first; only then are free
            # worker slots filled from the head of the queue.
            @exec_queue.concat(arg)
            @exec_queue.sort!

            while @workers.size < @size && @exec_queue.any?
              exec(@exec_queue.shift)
            end
          end
        end

      when :thread
        # arg is the worker thread that has just finished (sent from #exec)
        sync do
          @workers.delete(arg)

          # Fill the freed capacity with queued commands
          while @workers.size < @size && @exec_queue.any?
            exec(@exec_queue.shift)
          end

          wake_waiters
        end

      when :clear
        # Discards every message still pending in the front queue as well.
        # NOTE(review): this looks like it would also drop pending :thread,
        # :wait and :stop messages, not just :command ones -- confirm that a
        # worker-completion notification cannot be lost here.
        @front_queue.clear
        sync do
          @exec_queue.clear
          wake_waiters
        end

      when :remove
        sync do
          # Only commands still waiting in the queue are affected
          @exec_queue.delete_if { |c| c.id == arg }
          wake_waiters
        end

      when :stop
        sync do
          # Nothing new is started; running workers are joined while the
          # mutex is held, then all waiters are released unconditionally.
          @exec_queue.clear
          @workers.each(&:join)
          wake_waiters(force: true)
        end

        # Leave the loop, which ends the dispatcher thread
        break

      when :resize
        sync do
          @size = arg

          # A larger pool can start queued commands immediately; nothing here
          # interrupts workers that are already running.
          while @workers.size < @size && @exec_queue.any?
            exec(@exec_queue.shift)
          end
        end

      when :wait
        sync do
          q = arg

          # Answer right away when idle, otherwise remember the queue so
          # that #wake_waiters can signal it later.
          if @workers.empty? && @exec_queue.empty?
            q << true
          else
            @waiters << q
          end
        end
      end
    end
  end
end

#stop ⇒ Object

Stop execution and wait for all workers to finish



106
107
108
109
# File 'lib/osctld/continuous_executor.rb', line 106

# Stop execution and wait for all workers to finish.
#
# Posts a :stop request and then blocks until the dispatcher thread has
# terminated.
#
# @return [Thread] the joined dispatcher thread
def stop
  shutdown_request = [:stop]
  @front_queue << shutdown_request
  @main.join
end

#sync ⇒ Object (protected)



247
248
249
# File 'lib/osctld/continuous_executor.rb', line 247

# Run the given block while holding the executor's mutex.
#
# @yield code to run under the lock
# @return value of the block
def sync
  @mutex.synchronize do
    yield
  end
end

#wait_until_empty ⇒ Object

Block until the queue is empty



123
124
125
126
127
128
# File 'lib/osctld/continuous_executor.rb', line 123

# Block until the queue is empty.
#
# A private reply queue is handed to the dispatcher thread in a :wait
# message; this method then sleeps until something is pushed to it.
#
# @return [nil]
def wait_until_empty
  notifier = OsCtl::Lib::Queue.new
  @front_queue << [:wait, notifier]

  # Blocks here until the dispatcher signals the reply queue
  notifier.pop
  nil
end

#wake_waiters(force: false) ⇒ Object (protected)



238
239
240
241
242
243
244
245
# File 'lib/osctld/continuous_executor.rb', line 238

# Signal and forget every registered waiter.
#
# Without `force`, this is a no-op unless there are no workers and no queued
# commands left.
#
# @param force [Boolean] wake the waiters regardless of the executor's state
# @return [Array, nil] the (now empty) waiter list, or nil when nothing was
#   done
def wake_waiters(force: false)
  idle = @workers.empty? && @exec_queue.empty?
  return unless force || idle

  # Each waiter is signalled and removed from the list in the same pass
  @waiters.delete_if do |waiter|
    waiter << true
    true
  end
end