Class: OsCtld::CpuScheduler

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::File, OsCtl::Lib::Utils::Log, Lockable, Singleton
Defined in:
lib/osctld/cpu_scheduler.rb

Overview

Schedule containers on CPUs to keep them running on the same package

Defined Under Namespace

Classes: PackageInfo, ScheduleInfo

Constant Summary collapse

STATE_FILE =
File.join(RunState::CPU_SCHEDULER_DIR, 'state.yml')

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Lockable

#exclusively, included, #inclusively, #init_lock, #lock, #unlock

Constructor Details

#initialize ⇒ CpuScheduler

Returns a new instance of CpuScheduler.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/osctld/cpu_scheduler.rb', line 74

# Build the scheduler's in-memory state from the daemon configuration and the
# detected CPU topology.
#
# One PackageInfo is registered per package found in the topology. When the
# daemon configuration has an entry for a package, its CPU mask is intersected
# with the CPUs the package has and its `enable` value is used; otherwise
# every CPU of the package is usable and the package is enabled.
def initialize
  init_lock

  scheduler_cfg = Daemon.get.config.cpu_scheduler

  @enabled = scheduler_cfg.enable?
  @manual_toggle = false
  @min_package_container_count_percent = scheduler_cfg.min_package_container_count_percent
  @upkeep_queue = OsCtl::Lib::Queue.new
  @save_queue = OsCtl::Lib::Queue.new
  @topology = OsCtl::Lib::CpuTopology.new
  @control_mutex = Mutex.new
  @package_info = {}
  @scheduled_cts = {}

  log(:info, "#{topology.cpus.length} CPUs in #{topology.packages.length} packages")

  topology.packages.each_value do |package|
    override = scheduler_cfg.packages[package.id]
    all_cpus = OsCtl::Lib::CpuMask.new(package.cpus.keys.sort)

    # A configured mask can only narrow down the CPUs the package really has
    cpu_mask =
      if override
        override.cpu_mask & all_cpus
      else
        all_cpus
      end

    log(:info, "CPU package #{package.id}: #{cpu_mask.size} CPUs, mask=#{cpu_mask}")

    @package_info[package.id] = PackageInfo.new(
      id: package.id,
      cpu_mask: cpu_mask,
      usage_score: 0,
      container_count: 0,
      enabled: override ? override.enable : true
    )
  end
end

Instance Attribute Details

#package_info ⇒ Object (readonly, protected)

Returns the value of attribute package_info.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Hash] per-package bookkeeping, keyed by package id
def package_info = @package_info

#save_queue ⇒ Object (readonly, protected)

Returns the value of attribute save_queue.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Object] queue through which state saves are requested
def save_queue = @save_queue

#save_thread ⇒ Object (readonly, protected)

Returns the value of attribute save_thread.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Thread, nil] thread running {#run_save}, nil until {#setup} starts it
def save_thread = @save_thread

#scheduled_cts ⇒ Object (readonly, protected)

Returns the value of attribute scheduled_cts.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Hash] schedule records, keyed by container ident
def scheduled_cts = @scheduled_cts

#topology ⇒ OsCtl::Lib::CpuTopology (readonly)

Returns:

  • (OsCtl::Lib::CpuTopology)


72
73
74
# File 'lib/osctld/cpu_scheduler.rb', line 72

# @return [OsCtl::Lib::CpuTopology] CPU topology created in the constructor
def topology = @topology

#upkeep_queue ⇒ Object (readonly, protected)

Returns the value of attribute upkeep_queue.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Object] queue used to wake up or stop the upkeep thread
def upkeep_queue = @upkeep_queue

#upkeep_thread ⇒ Object (readonly, protected)

Returns the value of attribute upkeep_thread.



332
333
334
# File 'lib/osctld/cpu_scheduler.rb', line 332

# @return [Thread, nil] thread running {#run_upkeep}, nil while upkeep is stopped
def upkeep_thread = @upkeep_thread

Instance Method Details

#assets(add) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/osctld/cpu_scheduler.rb', line 109

# Register the scheduler's runtime directory and state file as assets.
#
# @param add [#directory, #file] asset registrator
# @return [Object] whatever `add.file` returns
def assets(add)
  # Both the directory and the state file are registered with user 0, group 0
  root_owned = { user: 0, group: 0 }

  add.directory(
    RunState::CPU_SCHEDULER_DIR,
    desc: 'CPU scheduler state files',
    **root_owned,
    mode: 0o755
  )

  # The state file does not have to exist, hence `optional`
  add.file(
    STATE_FILE,
    desc: 'CPU scheduler state file',
    **root_owned,
    mode: 0o400,
    optional: true
  )
end

#assign_package_for(ct, reservation: false) ⇒ Object (protected)



412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/osctld/cpu_scheduler.rb', line 412

# Pick a CPU package for a container and record the decision.
#
# The container's `cpu_package` setting is honoured first: `'none'` opts out
# of scheduling, an id known to the topology pins the container to that
# package, and an unknown id is logged and ignored. An existing reservation
# made for the container is consumed; otherwise the package is selected by
# pin, priority start, container count or usage score, in that order.
#
# @param ct [Container]
# @param reservation [Boolean] record the assignment as a reservation
# @return [Array(PackageInfo, ScheduleInfo), nil] nil when the container is
#   not scheduled (opted out, scheduler not in use, or no package available)
def assign_package_for(ct, reservation: false)
  preference = ct.cpu_package
  daily_use = ct.hints.cpu_daily.usage_us
  pinned_pkg_id = nil
  pkg = nil
  sched = nil

  case preference
  when 'auto'
    # nothing is pinned, the scheduler decides
  when 'none'
    log(:info, "#{ct.ident} has disabled scheduler by config")
    return
  else
    if topology.packages.has_key?(preference)
      pinned_pkg_id = preference
    else
      log(
        :warn,
        "#{ct.ident} prefers package #{preference.inspect}, which does not " \
        'exist on this system; disregarding'
      )
    end
  end

  exclusively do
    return unless use?

    sched = scheduled_cts[ct.ident]

    if sched&.reservation
      # Turn the earlier reservation into a regular schedule record
      sched.reservation = false
      sched.reserved_at = nil
      pkg = package_info[sched.package_id]

      log(:info, "Using reservation of #{ct.ident} on CPU package #{pkg.id}")
    else
      if pinned_pkg_id
        # static pin
        pkg = get_package_by_preference(pinned_pkg_id, daily_use)
      elsif use_sequential_start_stop? && priority_start?(ct)
        # prioritized containers are always put on the first package
        first_pkg_id = package_info.keys.min
        log(:debug, "Priority start for #{ct.ident}, using CPU package #{first_pkg_id}")
        pkg = get_package_by_preference(first_pkg_id, daily_use)
      elsif daily_use == 0 || !can_schedule_by_score?
        # no usage stats available, choose package based on number of cts
        pkg = get_package_by_count(daily_use)
      else
        # choose package based on cpu use
        pkg = get_package_by_score(daily_use)
      end

      sched = record_scheduled(ct, reservation, daily_use, pkg) if pkg
    end

    unless pkg
      log(:warn, "No enabled package found, unable to schedule #{ct.ident}")
      return
    end
  end

  save_state

  message =
    if reservation
      "Preassigning #{ct.ident} to CPU package #{pkg.id}"
    else
      "Assigning #{ct.ident} to CPU package #{pkg.id}"
    end

  log(:info, message)

  [pkg, sched]
end

#can_schedule_by_score? ⇒ Boolean (protected)

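We can schedule by score if no package has fewer containers per CPU than `min_package_container_count_percent` percent (75 % in the original description) of the per-CPU container count of the most-populated package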

Returns:

  • (Boolean)


398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/osctld/cpu_scheduler.rb', line 398

# Decide whether containers may be placed by CPU usage score.
#
# Scheduling by score is allowed only while the packages are reasonably
# balanced by container count: the lowest per-CPU container count must be at
# least `@min_package_container_count_percent` percent of the highest one.
#
# @return [Boolean] false when there are no packages or no package has any
#   containers yet, so that placement falls back to container counts
def can_schedule_by_score?
  min_cnt, max_cnt = package_info.each_value.map(&:container_count_per_cpu).minmax

  # Without packages there is nothing to compare, and with a zero maximum the
  # ratio below would be 0/0; neither situation permits scheduling by score.
  return false if max_cnt.nil? || max_cnt.zero?

  (min_cnt.to_f / max_cnt) * 100 >= @min_package_container_count_percent
end

#cancel_preschedule_ct(ct) ⇒ Object

Cancel a reservation in the scheduler

Parameters:



272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/osctld/cpu_scheduler.rb', line 272

# Drop a reservation made for the container, if there is one.
#
# Regular (non-reservation) schedule records are left untouched. The
# package's container count and usage score are reduced by what the
# reservation had added.
#
# @param ct [Container]
# @return [nil]
def cancel_preschedule_ct(ct)
  exclusively do
    reserved = scheduled_cts[ct.ident]
    return unless reserved&.reservation

    scheduled_cts.delete(ct.ident)

    package_info[reserved.package_id].tap do |pkg|
      pkg.container_count -= 1
      pkg.usage_score -= reserved.usage_score
    end
  end

  nil
end

#disable ⇒ Object

Disable and stop the scheduler



163
164
165
166
167
168
169
170
171
# File 'lib/osctld/cpu_scheduler.rb', line 163

# Disable the scheduler, remember that this was a manual decision, request a
# state save and stop the upkeep thread.
#
# @return [Object] result of the `sync_control` call that stops upkeep
def disable
  exclusively do
    @manual_toggle = true
    @enabled = false
  end

  save_state

  sync_control do
    stop_upkeep
  end
end

#disable_package(package_id) ⇒ Boolean

Parameters:

  • package_id (Integer)

Returns:

  • (Boolean)


217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/osctld/cpu_scheduler.rb', line 217

# Exclude a CPU package from scheduling.
#
# @param package_id [Integer]
# @return [Boolean] true if the package is known and was disabled, false if
#   there is no such package
def disable_package(package_id)
  known = false

  exclusively do
    if package_info.has_key?(package_id)
      package_info[package_id].enabled = false
      known = true
    end
  end

  # Request a state save only when something was actually changed
  save_state if known
  known
end

#do_save_state ⇒ Object (protected)



637
638
639
640
641
642
643
644
645
# File 'lib/osctld/cpu_scheduler.rb', line 637

# Write the current scheduler state to STATE_FILE as YAML.
#
# The file is produced through `regenerate_file` with mode 0400 and then
# chowned to uid 0 / gid 0.
def do_save_state
  snapshot = dump_state

  regenerate_file(STATE_FILE, 0o400) { |io| io.write(OsCtl::Lib::ConfigFile.dump_yaml(snapshot)) }

  File.chown(0, 0, STATE_FILE)
end

#do_schedule_ct(ctrc) ⇒ Object (protected)



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/osctld/cpu_scheduler.rb', line 358

# Assign a package to the container behind `ctrc` and apply it: write the
# cpuset, add the cpuset to the container's cgroup parameters, store the
# package id in the run configuration and report the `:ct_scheduled` event.
#
# @param ctrc [Container::RunConfiguration]
# @return [ScheduleInfo, nil] nil when no package was assigned
def do_schedule_ct(ctrc)
  ct = ctrc.ct

  pkg, sched = assign_package_for(ct)
  return unless pkg

  # cpuset cannot be configured when child groups already exists, so set it
  # as soon as possible.
  CGroup.mkpath('cpuset', ct.base_cgroup_path.split('/'), leaf: false)

  cpuset_file = File.join(
    CGroup.abs_cgroup_path('cpuset', ct.base_cgroup_path),
    'cpuset.cpus'
  )

  # Even when we fail here, the cpuset configuration is propagated to LXC
  # config and it should still work.
  log(:warn, "Unable to set cpuset for #{ctrc.ident}") unless CGroup.set_param(cpuset_file, [pkg.cpu_mask.to_s])

  # To make sure that LXC also sets it, add it also among the container's
  # cgroup parameters.
  lxc_param = CGroup::Param.import(
    subsystem: 'cpuset',
    parameter: 'cpuset.cpus',
    value: [pkg.cpu_mask.to_s],
    persistent: false
  )
  ct.cgparams.set([lxc_param])

  ctrc.cpu_package = pkg.id

  Eventd.report(
    :ct_scheduled,
    pool: ctrc.pool.name,
    id: ctrc.id,
    cpu_package_inuse: pkg.id
  )

  sched
end

#dump_packages ⇒ Object (protected)



600
601
602
603
604
605
606
607
# File 'lib/osctld/cpu_scheduler.rb', line 600

# Serialize the per-package settings that are kept in the state file.
#
# @return [Array<Hash>] one `{'id' => ..., 'enabled' => ...}` hash per package
def dump_packages
  package_info.each_with_object([]) do |(id, info), entries|
    entries << { 'id' => id, 'enabled' => info.enabled }
  end
end

#dump_scheduled ⇒ Object (protected)



609
610
611
612
613
614
615
616
617
618
619
# File 'lib/osctld/cpu_scheduler.rb', line 609

# Serialize the schedule records for the state file.
#
# @return [Array<Hash>] one hash per scheduled container; `reserved_at` is
#   stored as an integer timestamp when set
def dump_scheduled
  scheduled_cts.each_with_object([]) do |(ctid, info), entries|
    reserved_at = info.reserved_at

    entries << {
      'ctid' => ctid,
      'usage_score' => info.usage_score,
      'package_id' => info.package_id,
      'reservation' => info.reservation,
      'reserved_at' => reserved_at ? reserved_at.to_i : reserved_at
    }
  end
end

#dump_state ⇒ Object (protected)



621
622
623
624
625
626
627
628
629
630
631
# File 'lib/osctld/cpu_scheduler.rb', line 621

# Build the complete state document that is written to the state file.
#
# The `'enabled'` key is included only after the scheduler was toggled
# manually.
#
# @return [Hash]
def dump_state
  inclusively do
    state = @manual_toggle ? { 'enabled' => enabled? } : {}
    state['packages'] = dump_packages
    state['scheduled_cts'] = dump_scheduled
    state
  end
end

#enable ⇒ Object

Enable and start the scheduler



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/osctld/cpu_scheduler.rb', line 149

# Enable the scheduler, remember that this was a manual decision, request a
# state save and start the upkeep thread unless it already runs.
#
# @return [Object] result of the `sync_control` call that starts upkeep
def enable
  exclusively do
    @manual_toggle = true
    @enabled = true
  end

  save_state

  sync_control { start_upkeep unless upkeep_running? }
end

#enable_package(package_id) ⇒ Boolean

Parameters:

  • package_id (Integer)

Returns:

  • (Boolean)


201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/osctld/cpu_scheduler.rb', line 201

# Make a CPU package available for scheduling again.
#
# @param package_id [Integer]
# @return [Boolean] true if the package is known and was enabled, false if
#   there is no such package
def enable_package(package_id)
  known = false

  exclusively do
    if package_info.has_key?(package_id)
      package_info[package_id].enabled = true
      known = true
    end
  end

  # Request a state save only when something was actually changed
  save_state if known
  known
end

#enabled? ⇒ Boolean

Return `true` if the scheduler is enabled by configuration

Returns:

  • (Boolean)


174
175
176
# File 'lib/osctld/cpu_scheduler.rb', line 174

# @return [Boolean] value of the enabled flag, read inside `inclusively`
def enabled? = inclusively { @enabled }

#export_packages ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/osctld/cpu_scheduler.rb', line 310

# Describe every CPU package of the topology together with its scheduler
# bookkeeping.
#
# @return [Array<Hash>] hashes with `:id`, `:cpus`, `:containers`,
#   `:usage_score` and `:enabled`
def export_packages
  exclusively do
    topology.packages.each_value.each_with_object([]) do |pkg, exported|
      info = package_info[pkg.id]

      exported << {
        id: pkg.id,
        cpus: info.cpu_mask.to_a,
        containers: info.container_count,
        usage_score: info.usage_score,
        enabled: info.enabled
      }
    end
  end
end

#export_status ⇒ Object



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/osctld/cpu_scheduler.rb', line 293

# Summarize the scheduler status.
#
# @return [Hash] `:enabled`, `:needed`, `:use`, `:upkeep_running` flags and
#   the `:packages` / `:cpus` counts taken from the topology
def export_status
  status = {}

  exclusively do
    status[:enabled] = enabled?
    status[:needed] = needed?
    status[:use] = use?
  end

  sync_control { status[:upkeep_running] = upkeep_running? }

  status[:packages] = topology.packages.length
  status[:cpus] = topology.cpus.length
  status
end

#get_package_by_count(usage_score) ⇒ Object (protected)



490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/osctld/cpu_scheduler.rb', line 490

# Choose the enabled package with the fewest containers per CPU and account
# the new container on it.
#
# When several packages have the same per-CPU container count, the one that
# comes first in `package_info` is chosen.
#
# @param usage_score [Integer] usage score of the container being placed
# @return [PackageInfo, nil] nil when no package is enabled
def get_package_by_count(usage_score)
  # min_by finds the least loaded package in a single pass; a full sort is
  # not needed to pick one element.
  pkg = package_info.each_value.select(&:enabled).min_by(&:container_count_per_cpu)
  return if pkg.nil?

  pkg.container_count += 1
  pkg.usage_score += usage_score
  pkg
end

#get_package_by_preference(pkg_id, usage_score) ⇒ Object (protected)



483
484
485
486
487
488
# File 'lib/osctld/cpu_scheduler.rb', line 483

# Account a new container on the package with the given id.
#
# @param pkg_id [Integer] id of a package present in `package_info`
# @param usage_score [Integer] usage score of the container being placed
# @return [PackageInfo]
def get_package_by_preference(pkg_id, usage_score)
  package_info[pkg_id].tap do |pkg|
    pkg.container_count += 1
    pkg.usage_score += usage_score
  end
end

#get_package_by_score(usage_score) ⇒ Object (protected)



503
504
505
506
507
508
509
510
511
512
513
514
# File 'lib/osctld/cpu_scheduler.rb', line 503

# Choose the enabled package with the lowest usage score per CPU and account
# the new container on it.
#
# When several packages have the same per-CPU usage score, the one that comes
# first in `package_info` is chosen.
#
# @param usage_score [Integer] usage score of the container being placed
# @return [PackageInfo, nil] nil when no package is enabled
def get_package_by_score(usage_score)
  # min_by finds the least used package in a single pass; a full sort is not
  # needed to pick one element.
  pkg = package_info.each_value.select(&:enabled).min_by(&:usage_score_per_cpu)
  return if pkg.nil?

  pkg.container_count += 1
  pkg.usage_score += usage_score
  pkg
end

#get_preschedule_package_id(ct) ⇒ Integer?

Get the id of the CPU package recorded for the container in the scheduler, e.g. by a reservation

Parameters:

Returns:

  • (Integer, nil)


263
264
265
266
267
268
# File 'lib/osctld/cpu_scheduler.rb', line 263

# Look up the package recorded for the container.
#
# @param ct [Container]
# @return [Integer, nil] package id, nil when the container has no record
def get_preschedule_package_id(ct)
  exclusively { scheduled_cts[ct.ident]&.package_id }
end

#load_state ⇒ Object (protected)



647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
# File 'lib/osctld/cpu_scheduler.rb', line 647

# Restore scheduler state from STATE_FILE.
#
# A missing state file is not an error. A state file that does not contain a
# YAML mapping (e.g. an empty file) is ignored with a warning instead of
# raising, so that the scheduler can still be set up. Packages and containers
# that refer to package ids unknown to `package_info` are skipped.
def load_state
  begin
    data = OsCtl::Lib::ConfigFile.load_yaml_file(STATE_FILE)
  rescue Errno::ENOENT
    return
  end

  unless data.is_a?(Hash)
    log(:warn, "Ignoring malformed state file #{STATE_FILE}")
    return
  end

  if data.has_key?('enabled')
    @enabled = data['enabled']
    @manual_toggle = true
  end

  # `|| []` also covers keys that are present but have a null value
  (data['packages'] || []).each do |pkg|
    pkg_id = pkg['id']
    next unless package_info.has_key?(pkg_id)

    package_info[pkg_id].enabled = pkg.fetch('enabled', true)
  end

  (data['scheduled_cts'] || []).each do |ct|
    sched = ScheduleInfo.new(
      ctid: ct['ctid'],
      usage_score: ct['usage_score'],
      package_id: ct['package_id'],
      reservation: ct['reservation'],
      reserved_at: ct['reserved_at'] && Time.at(ct['reserved_at'])
    )

    next unless package_info.has_key?(sched.package_id)

    scheduled_cts[sched.ctid] = sched

    # Rebuild the per-package accounting from the loaded records
    pkg = package_info[sched.package_id]
    pkg.container_count += 1
    pkg.usage_score += sched.usage_score
  end
end

#log_type ⇒ Object



326
327
328
# File 'lib/osctld/cpu_scheduler.rb', line 326

# @return [String] tag identifying this component in log messages
def log_type = 'cpu-scheduler'

#needed? ⇒ Boolean

Return `true` if the scheduler is needed by the system

Returns:

  • (Boolean)


179
180
181
# File 'lib/osctld/cpu_scheduler.rb', line 179

# @return [Boolean] true when the topology has more than one CPU package
def needed?
  package_count = topology.packages.length
  package_count > 1
end

#preschedule_ct(ct) ⇒ Object

Make a reservation in the scheduler

Parameters:



256
257
258
# File 'lib/osctld/cpu_scheduler.rb', line 256

# Reserve a CPU package for the container ahead of its start.
#
# @param ct [Container]
# @return [Array(PackageInfo, ScheduleInfo), nil] see {#assign_package_for}
def preschedule_ct(ct) = assign_package_for(ct, reservation: true)

#priority_start?(ct) ⇒ Boolean (protected)

Returns:

  • (Boolean)


537
538
539
540
541
542
# File 'lib/osctld/cpu_scheduler.rb', line 537

# Tell whether the container is auto-started with a priority below the
# configured `sequential_start_priority_threshold`.
#
# @param ct [Container]
# @return [Boolean] false for containers without autostart
def priority_start?(ct)
  threshold_cfg = Daemon.get.config.cpu_scheduler
  autostart = ct.autostart
  return false unless autostart

  autostart.priority < threshold_cfg.sequential_start_priority_threshold
end

#record_scheduled(ct, reservation, usage_score, pkg) ⇒ Object (protected)



516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
# File 'lib/osctld/cpu_scheduler.rb', line 516

# Store a schedule record for the container.
#
# If the container already has a record, the accounting of its previous
# package is reverted first, so that the container is not counted twice.
#
# @param ct [Container]
# @param reservation [Boolean]
# @param usage_score [Integer]
# @param pkg [PackageInfo] package the container is being scheduled on
# @return [ScheduleInfo] the new record
def record_scheduled(ct, reservation, usage_score, pkg)
  previous = scheduled_cts[ct.ident]

  if previous
    # This container has already been scheduled, so fix the leak
    previous_pkg = package_info[previous.package_id]

    log(:warn, "Fixing schedule leak for #{ct.ident}: scheduling on #{pkg.id}, while already scheduled on #{previous_pkg.id}")

    previous_pkg.usage_score -= previous.usage_score
    previous_pkg.container_count -= 1
  end

  scheduled_cts[ct.ident] = ScheduleInfo.new(
    ctid: ct.ident,
    usage_score: usage_score,
    package_id: pkg.id,
    reservation: reservation,
    reserved_at: reservation ? Time.now : nil
  )
end

#run_save ⇒ Object (protected)



588
589
590
591
592
593
594
595
596
597
598
# File 'lib/osctld/cpu_scheduler.rb', line 588

# Body of the save thread: write the state file each time something is
# pushed to the save queue.
#
# The loop ends when `:stop` is popped, or after the first save made once
# `@do_shutdown` is set. Otherwise it sleeps for a second after each save.
def run_save
  loop do
    break if save_queue.pop == :stop

    do_save_state
    break if @do_shutdown

    sleep(1)
  end
end

#run_upkeep ⇒ Object (protected)



544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
# File 'lib/osctld/cpu_scheduler.rb', line 544

# Body of the upkeep thread: periodically reconcile the schedule records
# with the actual state of containers.
#
# A pass is made whenever something is popped from the upkeep queue or the
# pop times out after five minutes; popping `:stop` ends the loop. During a
# pass:
#
# - stopped containers without a run configuration lose their record, unless
#   it is a reservation younger than one hour (a reservation without a
#   timestamp is treated as expired);
# - containers whose run configuration has no package lose their record;
# - containers whose run configuration names a package that differs from the
#   record, or that have no record at all, are (re)recorded on that package.
#   Packages unknown to `package_info` are logged and skipped.
#
# A record is removed only after the container was found unschedulable in
# more than three consecutive passes. State is saved after each pass.
def run_upkeep
  unschedule_table = {}

  loop do
    v = upkeep_queue.pop(timeout: 60 * 5)
    return if v == :stop

    now = Time.now

    DB::Containers.get.each do |ct|
      ctrc = ct.run_conf
      stopped = ct.state == :stopped
      should_unschedule = false

      exclusively do
        sched = scheduled_cts[ct.ident]

        if stopped && ctrc.nil? && sched
          # Reservations are kept for an hour; one without a timestamp cannot
          # be aged and is therefore considered expired.
          should_unschedule =
            !sched.reservation ||
            sched.reserved_at.nil? ||
            sched.reserved_at + (60 * 60) < now
        elsif ctrc && ctrc.cpu_package.nil? && sched
          should_unschedule = true
        elsif ctrc && ctrc.cpu_package && (sched.nil? || ctrc.cpu_package != sched.package_id)
          pkg = package_info[ctrc.cpu_package]

          if pkg
            usage = ct.hints.cpu_daily.usage_us
            pkg.container_count += 1
            pkg.usage_score += usage
            record_scheduled(ct, false, usage, pkg)
          else
            # Do not let one stale run configuration take the upkeep down
            log(
              :warn,
              "#{ct.ident} runs on unknown CPU package #{ctrc.cpu_package.inspect}, " \
              'unable to track it'
            )
          end
        end

        if should_unschedule
          unschedule_table[ct.ident] ||= 0
          unschedule_table[ct.ident] += 1
          unschedule_ct(ct) if unschedule_table[ct.ident] > 3
        else
          unschedule_table.delete(ct.ident)
        end
      end
    end

    save_state
  end
end

#save_state ⇒ Object (protected)



633
634
635
# File 'lib/osctld/cpu_scheduler.rb', line 633

# Ask the save thread to write the state file by pushing `:save` to the save
# queue.
def save_state = save_queue << :save

#schedule_ct(ctrc) ⇒ Object

Assign container to an available CPU package and configure its cpuset

Parameters:



233
234
235
236
237
# File 'lib/osctld/cpu_scheduler.rb', line 233

# Schedule the container behind the run configuration and save the run
# configuration when scheduling took place.
#
# @param ctrc [Container::RunConfiguration]
# @return [nil]
def schedule_ct(ctrc)
  ctrc.save if do_schedule_ct(ctrc)
  nil
end

#setup ⇒ Object



127
128
129
130
131
132
133
# File 'lib/osctld/cpu_scheduler.rb', line 127

# Load persisted state, start the save thread and, when the scheduler is in
# use, the upkeep thread as well.
def setup
  load_state

  @save_thread = Thread.new do
    run_save
  end

  return unless use?

  start_upkeep
end

#shutdown ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/osctld/cpu_scheduler.rb', line 135

# Stop the upkeep thread and, if the save thread exists, have it write the
# state one last time and wait for it to finish.
def shutdown
  sync_control do
    stop_upkeep
    next unless save_thread

    # run_save leaves its loop after the save triggered below
    @do_shutdown = true
    save_queue << :save
    save_thread.join
    @save_thread = nil
  end
end

#start_upkeep ⇒ Object (protected)

Start background container upkeeping



336
337
338
339
340
# File 'lib/osctld/cpu_scheduler.rb', line 336

# Spawn the thread running {#run_upkeep} and remember it in `@upkeep_thread`.
#
# @return [Object] result of `sync_control`
def start_upkeep
  sync_control do
    worker = Thread.new { run_upkeep }
    @upkeep_thread = worker
  end
end

#stop_upkeep ⇒ Object (protected)

Stop background container upkeeping



343
344
345
346
347
348
349
350
351
# File 'lib/osctld/cpu_scheduler.rb', line 343

# Ask the upkeep thread to stop and wait until it has finished. Does nothing
# when upkeep is not running.
def stop_upkeep
  sync_control do
    if upkeep_running?
      upkeep_queue << :stop
      upkeep_thread.join
      @upkeep_thread = nil
    end
  end
end

#sync_control(&block) ⇒ Object (protected)



685
686
687
688
689
690
691
# File 'lib/osctld/cpu_scheduler.rb', line 685

# Run the block while holding `@control_mutex`.
#
# When the current thread already owns the mutex, the block is called
# directly, which makes nested `sync_control` calls possible.
#
# @yield code to run with the control mutex held
# @return [Object] value of the block
def sync_control(&block)
  return block.call if @control_mutex.owned?

  @control_mutex.synchronize(&block)
end

#unschedule_ct(ct) ⇒ Object

Remove container from the scheduler

Parameters:



241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/osctld/cpu_scheduler.rb', line 241

# Forget the container's schedule record and take the container off its
# package's accounting.
#
# @param ct [Container]
# @return [nil]
def unschedule_ct(ct)
  exclusively do
    removed = scheduled_cts.delete(ct.ident)

    unless removed.nil?
      package_info[removed.package_id].tap do |pkg|
        pkg.container_count -= 1
        pkg.usage_score -= removed.usage_score
      end
    end
  end

  nil
end

#upkeep ⇒ Object



287
288
289
290
291
# File 'lib/osctld/cpu_scheduler.rb', line 287

# Trigger an upkeep pass right away by pushing `:upkeep` to the upkeep queue.
# Does nothing when upkeep is not running.
def upkeep
  sync_control do
    next unless upkeep_running?

    upkeep_queue << :upkeep
  end
end

#upkeep_running? ⇒ Boolean (protected)

Return `true` if the scheduler is running

Returns:

  • (Boolean)


354
355
356
# File 'lib/osctld/cpu_scheduler.rb', line 354

# @return [Boolean] true while an upkeep thread is registered
def upkeep_running? = sync_control { !@upkeep_thread.nil? }

#use? ⇒ Boolean

Return `true` if the scheduler is both enabled and needed

Returns:

  • (Boolean)


184
185
186
# File 'lib/osctld/cpu_scheduler.rb', line 184

# @return [Boolean] whether the scheduler is both enabled and needed
def use?
  inclusively do
    enabled? && needed?
  end
end

#use_sequential_start_stop? ⇒ Boolean

Return `true` if containers should be started/stopped in order of CPU packages

If there are two packages, then containers are first started on package0 and then on package1. Stop goes in reverse order: package1, package0.

This setting interferes with container start priorities, see cpu scheduler config option `sequential_start_priority_threshold`.

Returns:

  • (Boolean)


195
196
197
# File 'lib/osctld/cpu_scheduler.rb', line 195

# @return [Boolean] true when the scheduler is in use and the topology has
#   exactly two CPU packages
def use_sequential_start_stop?
  inclusively do
    use? && topology.packages.length == 2
  end
end