Class: TestRunner::Executor
- Inherits:
-
Object
- Object
- TestRunner::Executor
- Defined in:
- lib/test-runner/executor.rb
Constant Summary collapse
- DEFAULT_RESOURCE_REFRESH_INTERVAL =
15
Instance Attribute Summary collapse
-
#mutex ⇒ Object
readonly
protected
Returns the value of attribute mutex.
- #opts ⇒ Hash readonly
-
#pending ⇒ Object
readonly
protected
Returns the value of attribute pending.
-
#resource_monitor_cv ⇒ Object
readonly
protected
Returns the value of attribute resource_monitor_cv.
-
#resource_monitor_mutex ⇒ Object
readonly
protected
Returns the value of attribute resource_monitor_mutex.
-
#resource_pool ⇒ Object
readonly
protected
Returns the value of attribute resource_pool.
-
#resource_refresh_interval ⇒ Object
readonly
protected
Returns the value of attribute resource_refresh_interval.
- #results ⇒ Array<TestResult> readonly
-
#scheduler_cv ⇒ Object
readonly
protected
Returns the value of attribute scheduler_cv.
-
#scheduler_mutex ⇒ Object
readonly
protected
Returns the value of attribute scheduler_mutex.
- #test_scripts ⇒ Array<TestScript> readonly
-
#workers ⇒ Object
readonly
protected
Returns the value of attribute workers.
Instance Method Summary collapse
- #build_accumulated_test_result(test, scripts, latest_script_results, elapsed_time, last_result) ⇒ Object protected
- #fill_queue ⇒ Object protected
-
#initialize(test_scripts, **opts) ⇒ Executor
constructor
A new instance of Executor.
- #last_nonempty_line(path, max_bytes: 8192) ⇒ Object protected
- #log(msg = '') ⇒ Object protected
- #log_reserved_resources(test, resources) ⇒ Object protected
- #log_resource_wait ⇒ Object protected
- #parse_resource_refresh_interval(value) ⇒ Object protected
- #refresh_resource_capacity ⇒ Object protected
- #refresh_resource_capacity_locked ⇒ Object protected
- #release_test_resources(resources) ⇒ Object protected
- #reserve_next_test ⇒ Object protected
- #run ⇒ Array<TestResult>
- #run_resource_monitor ⇒ Object protected
- #run_test(test, scripts, prefix:) ⇒ Object protected
- #run_test_attempt(i, test, scripts, attempt) ⇒ Object protected
- #run_test_with_retries(i, test, scripts) ⇒ Object protected
- #run_worker(_w_i) ⇒ Object protected
- #schedulable_test_index ⇒ Object protected
- #start_resource_monitor ⇒ Object protected
- #start_worker(i) ⇒ Object protected
- #state_dir ⇒ Object protected
- #stop_resource_monitor ⇒ Object protected
- #stop_work! ⇒ Object protected
- #stop_work? ⇒ Boolean protected
- #test_sock_dir ⇒ Object protected
- #test_state_dir(test) ⇒ Object protected
- #test_state_key(test) ⇒ Object protected
- #wait_for_workers ⇒ Object protected
Constructor Details
#initialize(test_scripts, **opts) ⇒ Executor
Returns a new instance of Executor.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/test-runner/executor.rb', line 29
#
# Sets up the bookkeeping state, the scheduler and resource-monitor
# synchronization primitives, and queues the tests to run.
#
# @param test_scripts [Array<TestScript>] scripts to execute
# @param opts [Hash] executor options; :resource_refresh_interval is read
#   here and the whole hash is handed to the resource pool
def initialize(test_scripts, **opts)
  @test_scripts = test_scripts
  @opts = opts

  # Worker bookkeeping; @mutex guards log output and the results list.
  @workers, @pending, @results = [], [], []
  @stop_work = false
  @mutex = Mutex.new

  # Scheduler state.
  @scheduler_mutex = Mutex.new
  @scheduler_cv = ConditionVariable.new
  # NOTE(review): `ResourcePool.(opts)` is the `.call` shorthand; the method
  # name may have been lost when this listing was extracted. Confirm against
  # the real source file.
  @resource_pool = ResourcePool.(opts)
  @last_resource_wait_log_at = nil

  # Resource monitor state.
  @resource_refresh_interval = parse_resource_refresh_interval(
    opts[:resource_refresh_interval]
  )
  @resource_monitor_mutex = Mutex.new
  @resource_monitor_cv = ConditionVariable.new
  @resource_monitor_stop = false
  @resource_monitor = nil

  fill_queue
end
Instance Attribute Details
#mutex ⇒ Object (readonly, protected)
Returns the value of attribute mutex.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @mutex.
def mutex = @mutex
#opts ⇒ Hash (readonly)
14 15 16 |
# File 'lib/test-runner/executor.rb', line 14
#
# Reader for @opts.
#
# @return [Hash]
def opts = @opts
#pending ⇒ Object (readonly, protected)
Returns the value of attribute pending.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @pending.
def pending = @pending
#resource_monitor_cv ⇒ Object (readonly, protected)
Returns the value of attribute resource_monitor_cv.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @resource_monitor_cv.
def resource_monitor_cv = @resource_monitor_cv
#resource_monitor_mutex ⇒ Object (readonly, protected)
Returns the value of attribute resource_monitor_mutex.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @resource_monitor_mutex.
def resource_monitor_mutex = @resource_monitor_mutex
#resource_pool ⇒ Object (readonly, protected)
Returns the value of attribute resource_pool.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @resource_pool.
def resource_pool = @resource_pool
#resource_refresh_interval ⇒ Object (readonly, protected)
Returns the value of attribute resource_refresh_interval.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @resource_refresh_interval.
def resource_refresh_interval = @resource_refresh_interval
#results ⇒ Array<TestResult> (readonly)
17 18 19 |
# File 'lib/test-runner/executor.rb', line 17
#
# Reader for @results.
#
# @return [Array<TestResult>]
def results = @results
#scheduler_cv ⇒ Object (readonly, protected)
Returns the value of attribute scheduler_cv.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @scheduler_cv.
def scheduler_cv = @scheduler_cv
#scheduler_mutex ⇒ Object (readonly, protected)
Returns the value of attribute scheduler_mutex.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @scheduler_mutex.
def scheduler_mutex = @scheduler_mutex
#test_scripts ⇒ Array<TestScript> (readonly)
11 12 13 |
# File 'lib/test-runner/executor.rb', line 11
#
# Reader for @test_scripts.
#
# @return [Array<TestScript>]
def test_scripts = @test_scripts
#workers ⇒ Object (readonly, protected)
Returns the value of attribute workers.
139 140 141 |
# File 'lib/test-runner/executor.rb', line 139
#
# Reader for @workers.
def workers = @workers
Instance Method Details
#build_accumulated_test_result(test, scripts, latest_script_results, elapsed_time, last_result) ⇒ Object (protected)
388 389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/test-runner/executor.rb', line 388
#
# Combines the newest per-script results of all attempts into one TestResult.
#
# Scripts without a recorded result get a failed placeholder
# (TestScriptResult.new(script, false, -1)). Success and state directory are
# taken from the last attempt; without one the result is unsuccessful and the
# test's default state directory is used.
def build_accumulated_test_result(test, scripts, latest_script_results, elapsed_time, last_result)
  per_script = scripts.map do |script|
    if latest_script_results.key?(script)
      latest_script_results[script]
    else
      TestScriptResult.new(script, false, -1)
    end
  end

  succeeded = last_result&.successful? || false
  dir = last_result&.state_dir || test_state_dir(test)

  TestResult.new(test, per_script, succeeded, elapsed_time, dir)
end
#fill_queue ⇒ Object (protected)
149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/test-runner/executor.rb', line 149
#
# Groups the scripts by their test, then queues the tests in random order.
# Each @pending entry is [index, test, scripts], with the scripts of a test
# shuffled as well. Records the number of distinct tests in @test_count.
def fill_queue
  scripts_by_test = test_scripts.group_by(&:test)

  scripts_by_test.to_a.shuffle.each_with_index do |(test, scripts), position|
    @pending.push([position, test, scripts.shuffle])
  end

  @test_count = scripts_by_test.size
end
#last_nonempty_line(path, max_bytes: 8192) ⇒ Object (protected)
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 |
# File 'lib/test-runner/executor.rb', line 586
#
# Returns the last non-blank line of a file, looking only at its tail.
#
# The tail is read in binary mode, then tagged as UTF-8 and scrubbed so the
# result can be interpolated into log messages safely: invalid bytes, and a
# multi-byte character cut in half by the max_bytes window, become U+FFFD
# instead of producing a broken or ASCII-8BIT string.
#
# @param path [String] file to inspect
# @param max_bytes [Integer] how many trailing bytes to read at most
# @return [String, nil] the stripped line; nil if the path is not a regular
#   file, the file is empty or has only blank lines in the tail, or it
#   cannot be read (Errno::ENOENT, Errno::EACCES)
def last_nonempty_line(path, max_bytes: 8192)
  return nil unless File.file?(path)

  size = File.size(path)
  return nil if size <= 0

  offset = [size - max_bytes, 0].max
  data = File.open(path, 'rb') do |f|
    f.seek(offset)
    f.read
  end

  text = data.force_encoding(Encoding::UTF_8).scrub

  text.lines.reverse_each do |line|
    stripped = line.strip
    return stripped unless stripped.empty?
  end

  nil
rescue Errno::ENOENT, Errno::EACCES
  # The file may vanish or be unreadable between the checks and the read.
  nil
end
#log(msg = '') ⇒ Object (protected)
608 609 610 |
# File 'lib/test-runner/executor.rb', line 608
#
# Prints a timestamped message to standard output while holding the shared
# mutex, so output from concurrent workers does not interleave.
#
# @param msg [#to_s] text to print; defaults to an empty message
def log(msg = '')
  mutex.synchronize do
    puts format('[%s] %s', Time.now, msg)
  end
end
#log_reserved_resources(test, resources) ⇒ Object (protected)
297 298 299 300 301 302 |
# File 'lib/test-runner/executor.rb', line 297
#
# Logs which resources were reserved for a test, followed by the current
# status of the resource pool.
def log_reserved_resources(test, resources)
  reservation = "Reserved resources for '#{test.path}': #{resources.summary}"
  log("#{reservation}; pool: #{resource_pool.status}")
end
#log_resource_wait ⇒ Object (protected)
304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/test-runner/executor.rb', line 304
#
# Logs that the scheduler is waiting for resources, at most once every
# 60 seconds. The message shows the pool status and the first three pending
# tests with their resource requirements.
def log_resource_wait
  now = Time.now
  last_logged = @last_resource_wait_log_at
  throttled = last_logged && now - last_logged < 60
  return if throttled

  @last_resource_wait_log_at = now

  heads = pending.first(3).map do |(_, test, _)|
    "#{test.path} (#{test.resources.summary})"
  end

  log("Waiting for resources: pool: #{resource_pool.status}; pending: #{heads.join(', ')}")
end
#parse_resource_refresh_interval(value) ⇒ Object (protected)
288 289 290 291 292 293 294 295 |
# File 'lib/test-runner/executor.rb', line 288
#
# Converts the configured refresh interval to a positive, finite Float.
#
# nil and the empty string select DEFAULT_RESOURCE_REFRESH_INTERVAL.
#
# @param value [nil, String, Numeric] raw option value
# @return [Float] interval, used as the monitor's wait timeout
# @raise [ArgumentError] if the value is not numeric, is zero or negative,
#   or is NaN/infinite
# @raise [TypeError] if Float() cannot convert the value's type
def parse_resource_refresh_interval(value)
  value = DEFAULT_RESOURCE_REFRESH_INTERVAL if value.nil? || value.to_s == ''

  ret = Float(value)
  raise ArgumentError, 'resource refresh interval must be positive' if ret <= 0
  # NaN slips through the comparison above and +Infinity is positive; neither
  # is a usable timeout.
  raise ArgumentError, 'resource refresh interval must be a finite number' unless ret.finite?

  ret
end
#refresh_resource_capacity ⇒ Object (protected)
239 240 241 242 243 |
# File 'lib/test-runner/executor.rb', line 239
#
# Refreshes the resource pool capacity while holding the scheduler mutex.
def refresh_resource_capacity
  scheduler_mutex.synchronize { refresh_resource_capacity_locked }
end
#refresh_resource_capacity_locked ⇒ Object (protected)
245 246 247 248 249 250 251 252 253 |
# File 'lib/test-runner/executor.rb', line 245
#
# Asks the resource pool to refresh its capacity. When the pool reports a
# refresh, waiting workers are woken up and, if the status text changed, the
# new limits are logged. The name indicates the caller holds the scheduler
# mutex (refresh_resource_capacity does).
def refresh_resource_capacity_locked
  status_before = resource_pool.status
  refreshed = resource_pool.refresh_capacity
  return unless refreshed

  status_after = resource_pool.status
  scheduler_cv.broadcast

  log("Resource limits updated: #{status_after}") if status_before != status_after
end
#release_test_resources(resources) ⇒ Object (protected)
232 233 234 235 236 237 |
# File 'lib/test-runner/executor.rb', line 232
#
# Returns a test's resources to the pool and wakes up workers waiting on the
# scheduler, all under the scheduler mutex.
def release_test_resources(resources)
  scheduler_mutex.synchronize do
    pool = resource_pool
    pool.release(resources)
    scheduler_cv.broadcast
  end
end
#reserve_next_test ⇒ Object (protected)
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/test-runner/executor.rb', line 192
#
# Takes the next runnable test off the pending queue and reserves its
# resources, blocking on the scheduler condition variable while nothing can
# be scheduled.
#
# @return [Array, nil] [index, test, scripts, resources], or nil when work
#   was stopped or the queue is empty
def reserve_next_test
  scheduler_mutex.synchronize do
    until stop_work? || pending.empty?
      index = schedulable_test_index

      if index.nil?
        # Nothing fits right now; sleep until resources are released, the
        # capacity changes or work is stopped.
        log_resource_wait
        scheduler_cv.wait(scheduler_mutex)
        next
      end

      entry = pending.delete_at(index)
      test = entry[1]
      claimed = test.resources
      resource_pool.reserve(claimed)
      log_reserved_resources(test, claimed)

      return [*entry, claimed]
    end

    nil
  end
end
#run ⇒ Array<TestResult>
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/test-runner/executor.rb', line 51
#
# Runs all queued tests with opts[:jobs] worker threads and logs a summary.
#
# The resource monitor runs for the duration of the workers and is stopped
# even when starting or joining a worker raises. Afterwards the results are
# classified by expectation versus outcome, and the scripts behind
# unexpected results are listed.
#
# @return [Array<TestResult>]
def run
  auto_note = opts[:jobs_auto] ? ' (auto)' : ''
  log(
    "Running #{test_scripts.length} scripts of #{@test_count} tests, " \
    "at most #{opts[:jobs]} tests at a time#{auto_note}"
  )
  log("Resource limits: #{resource_pool.status}")
  log("State directory is #{state_dir}")

  started_at = Time.now

  begin
    start_resource_monitor
    opts[:jobs].times { |worker_i| start_worker(worker_i) }
    wait_for_workers
  ensure
    stop_resource_monitor
  end

  script_total = results.inject(0) { |sum, r| sum + r.script_results.length }
  log("Run #{script_total} test scripts of #{@test_count} tests in #{(Time.now - started_at).round(2)} seconds")

  expected_successful = results.select { |r| r.expected_to_succeed? && r.successful? }
  expected_failed = results.select { |r| r.expected_to_fail? && r.failed? }
  unexpected_failed = results.select { |r| r.expected_to_succeed? && r.failed? }
  unexpected_successful = results.select { |r| r.expected_to_fail? && r.successful? }

  log("#{expected_successful.length} tests successful") if expected_successful.any?
  log("#{expected_failed.length} tests failed as expected") if expected_failed.any?
  log("#{unexpected_failed.length} tests should have succeeded, but failed") if unexpected_failed.any?
  log("#{unexpected_successful.length} tests should have failed, but succeeded") if unexpected_successful.any?

  # Lists the scripts with an unexpected result under a heading, followed by
  # a blank line; prints nothing for an empty group.
  list_offenders = lambda do |heading, offenders|
    next unless offenders.any?

    log(heading)
    offenders.each do |test_result|
      test_result.script_results.each do |script_result|
        log(" #{script_result.test_script.path}") unless script_result.expected_result?
      end
    end
    puts
  end

  list_offenders.call('Unexpectedly failed test scripts:', unexpected_failed)
  list_offenders.call('Unexpectedly successful test scripts:', unexpected_successful)

  results
end
#run_resource_monitor ⇒ Object (protected)
273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/test-runner/executor.rb', line 273
#
# Body of the resource monitor thread: refreshes the resource pool capacity
# every resource_refresh_interval seconds until asked to stop.
#
# The wait is done on the monitor's own condition variable so that
# stop_resource_monitor can end it early. A failing refresh is logged and
# the loop carries on; it does not terminate the monitor.
def run_resource_monitor
  loop do
    resource_monitor_mutex.synchronize do
      return if @resource_monitor_stop

      resource_monitor_cv.wait(resource_monitor_mutex, resource_refresh_interval)
      # Check again: the wake-up may be the stop signal, not the timeout.
      return if @resource_monitor_stop
    end

    refresh_resource_capacity
  rescue StandardError => e
    log("Resource monitor failed: #{e.class}: #{e.message}")
  end
end
#run_test(test, scripts, prefix:) ⇒ Object (protected)
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 |
# File 'lib/test-runner/executor.rb', line 402
#
# Runs one attempt of a test in a forked child process and collects its
# results.
#
# The child redirects its output to <state dir>/test-runner.log, runs the
# scripts through TestRunner::TestEvaluator and reports every result as one
# JSON line over a pipe. The parent reads those lines, logs progress, and
# emits a heartbeat message whenever nothing was received for 300 seconds.
# Scripts the child never reported get a failed placeholder result. The
# overall outcome is written to <state dir>/test-result.txt.
#
# @param test the test to run
# @param scripts [Array] scripts of the test to run in this attempt
# @param prefix [String] progress prefix used in log messages
# @return [TestResult]
def run_test(test, scripts, prefix:)
  t1 = Time.now
  dir = test_state_dir(test)
  r, w = IO.pipe

  # 4 ports for use with boot.qemu.networks.[i].socket.mcast.port
  mcast_ports = OsVm::PortReservation.get_ports(key: "test:#{test.path}", size: 4)

  # NOTE(review): tests are forked from several worker threads, so a child
  # presumably also inherits pipe ends belonging to tests that are running
  # at the time of the fork - confirm whether this can delay EOF for them.
  pid = Process.fork do
    r.close

    FileUtils.mkdir_p(dir)

    out = File.open(File.join(dir, 'test-runner.log'), 'w')
    $stdout.reopen(out)
    $stderr.reopen(out)
    $stdin.close

    OsVm::PortReservation.reset_to_ports(mcast_ports)

    ev = TestRunner::TestEvaluator.new(
      test,
      scripts,
      system: opts[:system],
      test_config_path: opts[:test_config_path],
      state_dir: dir,
      sock_dir: test_sock_dir,
      default_timeout: opts[:default_timeout],
      destructive: opts[:destructive],
      recreate_disks: opts[:recreate_disks]
    )
    ev.run do |result_hash|
      w.puts(result_hash.to_json)
    end
  end

  # Only the child writes; keeping our copy open would prevent EOF.
  w.close

  script_results = []
  test_runner_log = File.join(dir, 'test-runner.log')
  heartbeat_interval = 300
  next_heartbeat_at = Time.now + heartbeat_interval

  begin
    loop do
      timeout = [next_heartbeat_at - Time.now, 0].max
      ready = r.wait_readable(timeout)

      if ready.nil?
        # Nothing arrived before the heartbeat deadline.
        elapsed = (Time.now - t1).round(2)
        msg = "#{prefix} Test '#{test.path}' still running after #{elapsed} seconds, log: #{test_runner_log}"
        last_line = last_nonempty_line(test_runner_log)
        msg += ", last output: #{last_line}" if last_line
        log(msg)
        next_heartbeat_at = Time.now + heartbeat_interval
        next
      end

      line = r.gets
      break if line.nil?

      next_heartbeat_at = Time.now + heartbeat_interval

      begin
        result_hash = JSON.parse(line)
      rescue JSON::ParserError
        warn "Unable to parse test script result json: #{line.inspect}"
        next
      end

      case result_hash['type']
      when 'script'
        test_script = test.test_scripts[result_hash['script']]
        script_result = TestScriptResult.from_h(test_script, result_hash)
        script_results << script_result

        next if test_script.singleton?

        secs = script_result.elapsed_time.round(2)

        if script_result.expected_result?
          if script_result.successful?
            log("#{prefix} Script '#{test_script.path}' successful in #{secs} seconds")
          else
            log("#{prefix} Script '#{test_script.path}' failed as expected in #{secs} seconds")
          end
        else # unexpected result
          if script_result.successful?
            log("#{prefix} Script '#{test_script.path}' unexpectedly succeeded in #{secs} seconds")
          else
            log("#{prefix} Script '#{test_script.path}' failed after #{secs} seconds")
          end

          stop_work! if opts[:stop_on_failure]
        end

      when 'example'
        status =
          if result_hash['success']
            if result_hash['pending']
              'pending'
            elsif result_hash['skip']
              'skipped'
            else
              'succeeded'
            end
          elsif result_hash['pending']
            'unexpectedly succeeded'
          else
            'failed'
          end

        log("#{prefix} Example [#{result_hash['progress']}/#{result_hash['total']}] '#{result_hash['example']}' #{status} in #{result_hash['elapsed_time'].round(2)} seconds")
      end
    end
  rescue EOFError
    # pass
  ensure
    # Release the read end; otherwise every test attempt leaks a descriptor.
    r.close
  end

  # Take the exit status from the return value instead of the $? global.
  _waited_pid, status = Process.wait2(pid)

  OsVm::PortReservation.release_ports(key: "test:#{test.path}")

  # Complement script results if some are missing
  scripts.each do |script|
    next if script_results.any? { |sr| sr.test_script == script }

    script_results << TestScriptResult.new(script, false, -1)
  end

  result = TestResult.new(
    test,
    script_results,
    status.exitstatus == 0,
    Time.now - t1,
    dir
  )

  File.open(File.join(dir, 'test-result.txt'), 'w') do |f|
    str =
      if result.expected_result?
        if result.successful?
          'expected_success'
        else
          'expected_failure'
        end
      elsif result.successful?
        'unexpected_success'
      else
        'unexpected_failure'
      end

    f.puts(str)
  end

  result
end
#run_test_attempt(i, test, scripts, attempt) ⇒ Object (protected)
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 |
# File 'lib/test-runner/executor.rb', line 354
#
# Runs a single attempt of a test and logs its start and its outcome.
# An unexpected outcome stops all further work when opts[:stop_on_failure]
# is set.
#
# @param i [Integer] zero-based index of the test, used for the progress prefix
# @param test the test to run
# @param scripts [Array] scripts to run in this attempt
# @param attempt [Integer] zero-based attempt number
# @return the result returned by run_test
def run_test_attempt(i, test, scripts, attempt)
  prefix = "[#{i + 1}/#{@test_count}]"
  script_list = scripts.map { |script| "##{script.name}" }.join(', ')
  max_attempts = scripts.map(&:attempts).max

  announcement =
    if attempt > 0
      "Retrying test '#{test.path}' (#{script_list}) (attempt #{attempt + 1}/#{max_attempts})"
    else
      "Running test '#{test.path}' (#{script_list})"
    end
  log("#{prefix} #{announcement}")

  result = run_test(test, scripts, prefix:)

  secs = result.elapsed_time.round(2)
  expected = result.expected_result?
  succeeded = result.successful?

  verdict =
    if expected
      succeeded ? "successful in #{secs} seconds" : "failed as expected in #{secs} seconds"
    elsif succeeded
      "unexpectedly succeeded in #{secs} seconds, see #{result.state_dir}"
    else
      "failed after #{secs} seconds, see #{result.state_dir}"
    end
  log("#{prefix} Test '#{test.path}' #{verdict}")

  stop_work! if !expected && opts[:stop_on_failure]

  result
end
#run_test_with_retries(i, test, scripts) ⇒ Object (protected)
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/test-runner/executor.rb', line 316
#
# Runs a test, re-running the scripts that ended with an unexpected result
# as long as they have attempts left.
#
# Only the newest result of each script is kept. Retrying ends when work is
# stopped or no script qualifies for another attempt; attempts are 5 seconds
# apart. Elapsed time is summed over all attempts.
#
# @return the accumulated result built by build_accumulated_test_result
def run_test_with_retries(i, test, scripts)
  newest_by_script = {}
  to_run = scripts
  total_elapsed = 0
  outcome = nil

  (0..).each do |attempt|
    outcome = run_test_attempt(i, test, to_run, attempt)
    total_elapsed += outcome.elapsed_time

    outcome.script_results.each do |script_result|
      newest_by_script[script_result.test_script] = script_result
    end

    break if stop_work?

    to_run = to_run.select do |script|
      newest_by_script.fetch(script).unexpected_result? && attempt + 1 < script.attempts
    end
    break if to_run.empty?

    sleep(5)
  end

  build_accumulated_test_result(test, scripts, newest_by_script, total_elapsed, outcome)
end
#run_worker(_w_i) ⇒ Object (protected)
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/test-runner/executor.rb', line 172
#
# Worker loop: keeps reserving and running tests until work is stopped or no
# test can be reserved any more.
#
# A test's resources are released even when running it raises. Results are
# appended to the shared list under the mutex.
#
# @param _w_i [Integer] worker index (unused)
def run_worker(_w_i)
  until stop_work?
    claim = reserve_next_test
    break if claim.nil?

    index, test, scripts, resources = claim

    outcome =
      begin
        run_test_with_retries(index, test, scripts)
      ensure
        release_test_resources(resources)
      end

    next if outcome.nil?

    mutex.synchronize { results << outcome }
  end
end
#schedulable_test_index ⇒ Object (protected)
217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/test-runner/executor.rb', line 217
#
# Finds the position in the pending queue of the first test whose resources
# can be reserved right now.
#
# @return [Integer, nil] queue position, or nil when nothing can be scheduled
def schedulable_test_index
  fitting = pending.find_index do |(_, test, _)|
    resource_pool.can_reserve?(test.resources)
  end
  return fitting if fitting

  # Never deadlock the suite just because one test is larger than the
  # detected capacity. Run it alone and let QEMU or the host enforce the
  # real limit.
  0 if resource_pool.running == 0
end
#start_resource_monitor ⇒ Object (protected)
255 256 257 258 |
# File 'lib/test-runner/executor.rb', line 255
#
# Clears the stop flag and starts the resource monitor in its own thread.
def start_resource_monitor
  @resource_monitor_stop = false
  @resource_monitor = Thread.new do
    run_resource_monitor
  end
end
#start_worker(i) ⇒ Object (protected)
164 165 166 |
# File 'lib/test-runner/executor.rb', line 164
#
# Starts a worker thread and remembers it so it can be joined later.
#
# @param i [Integer] worker index handed to run_worker
def start_worker(i)
  thread = Thread.new(i) { |worker_i| run_worker(worker_i) }
  workers << thread
end
#state_dir ⇒ Object (protected)
577 578 579 |
# File 'lib/test-runner/executor.rb', line 577
#
# The configured state directory, taken from opts[:state_dir].
def state_dir = opts[:state_dir]
#stop_resource_monitor ⇒ Object (protected)
260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/test-runner/executor.rb', line 260
#
# Asks the resource monitor thread to stop, waits for it to finish and
# forgets it. Does nothing when no monitor was started.
def stop_resource_monitor
  monitor = @resource_monitor
  return if monitor.nil?

  # Set the flag and wake the monitor under its mutex so that it cannot miss
  # the request between checking the flag and going to sleep.
  resource_monitor_mutex.synchronize do
    @resource_monitor_stop = true
    resource_monitor_cv.signal
  end

  monitor.join
  @resource_monitor = nil
end
#stop_work! ⇒ Object (protected)
560 561 562 563 |
# File 'lib/test-runner/executor.rb', line 560
#
# Flags that no further tests should be started and wakes up every worker
# waiting on the scheduler so it can notice the flag.
def stop_work!
  @stop_work = true

  scheduler_mutex.synchronize do
    scheduler_cv.broadcast
  end
end
#stop_work? ⇒ Boolean (protected)
565 566 567 |
# File 'lib/test-runner/executor.rb', line 565
#
# Whether stop_work! has been called.
#
# @return [Boolean]
def stop_work? = @stop_work
#test_sock_dir ⇒ Object (protected)
573 574 575 |
# File 'lib/test-runner/executor.rb', line 573
#
# Path of the 'socks' directory inside the state directory.
def test_sock_dir = File.join(state_dir, 'socks')
#test_state_dir(test) ⇒ Object (protected)
569 570 571 |
# File 'lib/test-runner/executor.rb', line 569
#
# Path of a test's own directory inside the state directory, named
# "os-test-<state key>".
def test_state_dir(test)
  dir_name = "os-test-#{test_state_key(test)}"
  File.join(state_dir, dir_name)
end
#test_state_key(test) ⇒ Object (protected)
581 582 583 584 |
# File 'lib/test-runner/executor.rb', line 581
#
# Builds a file-system friendly key for a test from its path.
#
# Every run of characters other than letters, digits, '_', '.' and '-' is
# replaced by '__'. The first 8 hex digits of the path's SHA-256 are
# appended so that different paths with the same slug stay distinct.
def test_state_key(test)
  path = test.path
  slug = path.gsub(/[^A-Za-z0-9_.-]+/, '__')
  fingerprint = Digest::SHA256.hexdigest(path)[0, 8]

  "#{slug}-#{fingerprint}"
end
#wait_for_workers ⇒ Object (protected)
168 169 170 |
# File 'lib/test-runner/executor.rb', line 168
#
# Blocks until every started worker thread has finished.
def wait_for_workers
  workers.each { |thread| thread.join }
end