Class: OsCtld::CpuScheduler

Inherits:
Object
  • Object
show all
Includes:
OsCtl::Lib::Utils::File, OsCtl::Lib::Utils::Log, Lockable, Singleton
Defined in:
lib/osctld/cpu_scheduler.rb

Overview

Schedule containers on CPUs to keep them running on the same package

Defined Under Namespace

Classes: PackageInfo, ScheduleInfo

Constant Summary collapse

STATE_FILE =
File.join(RunState::CPU_SCHEDULER_DIR, 'state.yml')

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Lockable

#exclusively, included, #inclusively, #init_lock, #lock, #unlock

Constructor Details

#initialize ⇒ CpuScheduler

Returns a new instance of CpuScheduler.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/osctld/cpu_scheduler.rb', line 72

# Build the scheduler from the daemon's `cpu_scheduler` configuration and the
# detected CPU topology.
#
# One PackageInfo record is created per CPU package. When the configuration
# has an entry for the package, its CPU mask is intersected with the CPUs the
# package really has and its `enable` flag is used; otherwise the full package
# mask is used and the package is enabled.
def initialize
  init_lock

  cfg = Daemon.get.config.cpu_scheduler

  @enabled = cfg.enable?
  @manual_toggle = false
  @min_package_container_count_percent = cfg.min_package_container_count_percent
  @upkeep_queue = OsCtl::Lib::Queue.new
  @save_queue = OsCtl::Lib::Queue.new
  @topology = OsCtl::Lib::CpuTopology.new
  @control_mutex = Mutex.new
  @package_info = {}
  @scheduled_cts = {}

  log(:info, "#{topology.cpus.length} CPUs in #{topology.packages.length} packages")

  topology.packages.each_value do |package|
    override = cfg.packages[package.id]
    hw_mask = OsCtl::Lib::CpuMask.new(package.cpus.keys.sort)

    # Never hand out CPUs that are not part of the package
    usable_mask =
      if override
        override.cpu_mask & hw_mask
      else
        hw_mask
      end

    log(:info, "CPU package #{package.id}: #{usable_mask.size} CPUs, mask=#{usable_mask}")

    package_enabled =
      if override
        override.enable
      else
        true
      end

    @package_info[package.id] = PackageInfo.new(
      id: package.id,
      cpu_mask: usable_mask,
      usage_score: 0,
      container_count: 0,
      enabled: package_enabled,
    )
  end
end

Instance Attribute Details

#package_info ⇒ Object (readonly, protected)

Returns the value of attribute package_info.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Per-package bookkeeping built by the constructor: a Hash keyed by CPU
# package id whose values are PackageInfo records.
#
# @return [Hash]
def package_info
  @package_info
end

#save_queue ⇒ Object (readonly, protected)

Returns the value of attribute save_queue.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Queue of pending state-save requests: `save_state` pushes `:save` onto it
# and `run_save` pops from it.
#
# @return [OsCtl::Lib::Queue]
def save_queue
  @save_queue
end

#save_thread ⇒ Object (readonly, protected)

Returns the value of attribute save_thread.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Thread running `run_save`. It is created by `setup` and reset to nil by
# `shutdown`, so it is nil before setup and after shutdown.
#
# @return [Thread, nil]
def save_thread
  @save_thread
end

#scheduled_cts ⇒ Object (readonly, protected)

Returns the value of attribute scheduled_cts.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Containers currently known to the scheduler: a Hash keyed by container
# ident whose values are ScheduleInfo records (assignments and reservations).
#
# @return [Hash]
def scheduled_cts
  @scheduled_cts
end

#topology ⇒ OsCtl::Lib::CpuTopology (readonly)

Returns:

  • (OsCtl::Lib::CpuTopology)


70
71
72
# File 'lib/osctld/cpu_scheduler.rb', line 70

# CPU topology object created in the constructor; the scheduler reads its
# `cpus` and `packages` collections.
#
# @return [OsCtl::Lib::CpuTopology]
def topology
  @topology
end

#upkeep_queue ⇒ Object (readonly, protected)

Returns the value of attribute upkeep_queue.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Queue used to wake up or stop the upkeep thread: `upkeep` pushes `:upkeep`,
# `stop_upkeep` pushes `:stop`, and `run_upkeep` pops from it with a timeout.
#
# @return [OsCtl::Lib::Queue]
def upkeep_queue
  @upkeep_queue
end

#upkeep_thread ⇒ Object (readonly, protected)

Returns the value of attribute upkeep_thread.



308
309
310
# File 'lib/osctld/cpu_scheduler.rb', line 308

# Thread running `run_upkeep`. It is set by `start_upkeep` and reset to nil
# by `stop_upkeep`.
#
# @return [Thread, nil]
def upkeep_thread
  @upkeep_thread
end

Instance Method Details

#assets(add) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/osctld/cpu_scheduler.rb', line 107

# Register the scheduler's on-disk assets: the state directory and the
# (optional) state file, both owned by uid/gid 0.
#
# @param add asset collector responding to `directory` and `file`
def assets(add)
  root_owned = { user: 0, group: 0 }

  add.directory(
    RunState::CPU_SCHEDULER_DIR,
    desc: 'CPU scheduler state files',
    **root_owned,
    mode: 0o755,
  )

  # The state file does not exist until the first save, hence optional
  add.file(
    STATE_FILE,
    desc: 'CPU scheduler state file',
    **root_owned,
    mode: 0o400,
    optional: true,
  )
end

#assign_package_for(ct, reservation: false) ⇒ Object (protected)



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/osctld/cpu_scheduler.rb', line 388

# Pick a CPU package for a container and record the decision.
#
# The container's `cpu_package` setting is honoured first: 'none' opts the
# container out, an existing package id pins it, anything else (including
# 'auto' and unknown ids) falls back to automatic selection. An existing
# reservation for the container is consumed instead of selecting again.
#
# @param ct container to schedule
# @param reservation [Boolean] record the assignment as a reservation
# @return [Array, nil] `[package_info, schedule_info]`, or nil when the
#   container opted out, the scheduler is not in use or no package is enabled
def assign_package_for(ct, reservation: false)
  preference = ct.cpu_package
  daily_use = ct.hints.cpu_daily.usage_us
  pinned_pkg_id = nil
  pkg = nil
  sched = nil

  if preference == 'none'
    log(:info, "#{ct.ident} has disabled scheduler by config")
    return
  end

  if preference != 'auto'
    if topology.packages.has_key?(preference)
      pinned_pkg_id = preference
    else
      log(
        :warn,
        "#{ct.ident} prefers package #{preference.inspect}, which does not " \
        'exist on this system; disregarding'
      )
    end
  end

  exclusively do
    return unless use?

    sched = scheduled_cts[ct.ident]

    if sched&.reservation
      # Turn the reservation into a regular assignment
      pkg = package_info[sched.package_id]
      sched.reservation = false
      sched.reserved_at = nil

      log(:info, "Using reservation of #{ct.ident} on CPU package #{pkg.id}")
    else
      pkg =
        if pinned_pkg_id
          # static pin
          get_package_by_preference(pinned_pkg_id, daily_use)
        elsif daily_use != 0 && can_schedule_by_score?
          # choose package based on cpu use
          get_package_by_score(daily_use)
        else
          # no usage stats available, choose package based on number of cts
          get_package_by_count(daily_use)
        end

      if pkg.nil?
        log(:warn, "No enabled package found, unable to schedule #{ct.ident}")
        return
      end

      sched = record_scheduled(ct, reservation, daily_use, pkg)
    end
  end

  save_state

  action = reservation ? 'Preassigning' : 'Assigning'
  log(:info, "#{action} #{ct.ident} to CPU package #{pkg.id}")

  [pkg, sched]
end

#can_schedule_by_score? ⇒ Boolean (protected)

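We can schedule by score if the per-CPU container count of the least-used package is at least `min_package_container_count_percent` percent (a daemon configuration value) of the per-CPU container count of the most-used package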

Returns:

  • (Boolean)


374
375
376
377
378
379
380
381
382
383
384
385
386
# File 'lib/osctld/cpu_scheduler.rb', line 374

# Decide whether containers may be placed by CPU usage score rather than by
# container count.
#
# Score-based scheduling is allowed only while the packages are reasonably
# balanced: the lowest per-CPU container count must be at least
# `@min_package_container_count_percent` percent of the highest one.
#
# @return [Boolean] false when there are no packages or no package has any
#   containers yet, so that the caller falls back to count-based scheduling
def can_schedule_by_score?
  counts = package_info.each_value.map(&:container_count_per_cpu)
  return false if counts.empty?

  min_cnt, max_cnt = counts.minmax

  # Nothing is scheduled anywhere; avoid dividing by zero
  return false if max_cnt.zero?

  (min_cnt.to_f / max_cnt) * 100 >= @min_package_container_count_percent
end

#cancel_preschedule_ct(ct) ⇒ Object

Cancel a reservation in the scheduler

Parameters:



249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/osctld/cpu_scheduler.rb', line 249

# Drop a container's reservation and give its share back to the package.
# Entries that are missing or are regular (non-reservation) assignments are
# left untouched.
#
# @param ct container whose reservation should be cancelled
# @return [nil]
def cancel_preschedule_ct(ct)
  exclusively do
    entry = scheduled_cts[ct.ident]
    return unless entry&.reservation

    scheduled_cts.delete(ct.ident)

    reserved_pkg = package_info[entry.package_id]
    reserved_pkg.usage_score -= entry.usage_score
    reserved_pkg.container_count -= 1
  end

  nil
end

#disable ⇒ Object

Disable and stop the scheduler



161
162
163
164
165
166
167
168
169
# File 'lib/osctld/cpu_scheduler.rb', line 161

# Turn the scheduler off on request: remember that the switch was flipped
# manually, queue a state save and stop the upkeep thread.
def disable
  exclusively do
    @manual_toggle = true
    @enabled = false
  end

  save_state

  sync_control do
    stop_upkeep
  end
end

#disable_package(package_id) ⇒ Boolean

Parameters:

  • package_id (Integer)

Returns:

  • (Boolean)


204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/osctld/cpu_scheduler.rb', line 204

# Exclude a CPU package from scheduling.
#
# @param package_id [Integer]
# @return [Boolean] true when the package exists and was disabled; a state
#   save is queued only in that case
def disable_package(package_id)
  changed = false

  exclusively do
    if package_info.has_key?(package_id)
      package_info[package_id].enabled = false
      changed = true
    end
  end

  save_state if changed
  changed
end

#do_save_state ⇒ Object (protected)



601
602
603
604
605
606
607
608
609
# File 'lib/osctld/cpu_scheduler.rb', line 601

# Write the current scheduler state to STATE_FILE as YAML (mode 0400) and
# make sure the file is owned by uid/gid 0.
def do_save_state
  state = dump_state

  regenerate_file(STATE_FILE, 0o400) do |io|
    io.write(OsCtl::Lib::ConfigFile.dump_yaml(state))
  end

  File.chown(0, 0, STATE_FILE)
end

#do_schedule_ct(ctrc) ⇒ Object (protected)



334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/osctld/cpu_scheduler.rb', line 334

# Assign the container behind a run configuration to a CPU package and apply
# the package's CPU mask to its cpuset.
#
# @param ctrc container run configuration
# @return [Object, nil] the schedule record, or nil when no package was
#   assigned
def do_schedule_ct(ctrc)
  pkg, sched = assign_package_for(ctrc.ct)
  return if pkg.nil?

  ct = ctrc.ct
  base_path = ct.base_cgroup_path

  # cpuset cannot be configured when child groups already exists, so set it
  # as soon as possible.
  CGroup.mkpath('cpuset', base_path.split('/'), leaf: false)

  cpus_file = File.join(CGroup.abs_cgroup_path('cpuset', base_path), 'cpuset.cpus')
  applied = CGroup.set_param(cpus_file, [pkg.cpu_mask.to_s])

  # Even when we fail here, the cpuset configuration is propagated to LXC
  # config and it should still work.
  log(:warn, "Unable to set cpuset for #{ctrc.ident}") unless applied

  # To make sure that LXC also sets it, add it also among the container's
  # cgroup parameters.
  lxc_param = CGroup::Param.import(
    subsystem: 'cpuset',
    parameter: 'cpuset.cpus',
    value: [pkg.cpu_mask.to_s],
    persistent: false,
  )
  ct.cgparams.set([lxc_param])

  ctrc.cpu_package = pkg.id

  Eventd.report(
    :ct_scheduled,
    pool: ctrc.pool.name,
    id: ctrc.id,
    cpu_package_inuse: pkg.id,
  )

  sched
end

#dump_packages ⇒ Object (protected)



564
565
566
567
568
569
570
571
# File 'lib/osctld/cpu_scheduler.rb', line 564

# Serialize the per-package settings that are persisted in the state file.
#
# @return [Array<Hash>] one `{'id' => ..., 'enabled' => ...}` entry per package
def dump_packages
  package_info.each_with_object([]) do |(id, info), dumped|
    dumped << { 'id' => id, 'enabled' => info.enabled }
  end
end

#dump_scheduled ⇒ Object (protected)



573
574
575
576
577
578
579
580
581
582
583
# File 'lib/osctld/cpu_scheduler.rb', line 573

# Serialize all schedule records for the state file. Reservation timestamps
# are stored as integers (via `to_i`) when present.
#
# @return [Array<Hash>]
def dump_scheduled
  scheduled_cts.map do |ident, entry|
    reserved_at = entry.reserved_at
    reserved_at = reserved_at.to_i if reserved_at

    {
      'ctid' => ident,
      'usage_score' => entry.usage_score,
      'package_id' => entry.package_id,
      'reservation' => entry.reservation,
      'reserved_at' => reserved_at,
    }
  end
end

#dump_state ⇒ Object (protected)



585
586
587
588
589
590
591
592
593
594
595
# File 'lib/osctld/cpu_scheduler.rb', line 585

# Build the hash that is written to the state file. The 'enabled' key is
# included only after the scheduler was toggled manually, so that an
# untouched scheduler keeps following the configuration.
#
# @return [Hash]
def dump_state
  inclusively do
    state = @manual_toggle ? { 'enabled' => enabled? } : {}
    state['packages'] = dump_packages
    state['scheduled_cts'] = dump_scheduled
    state
  end
end

#enable ⇒ Object

Enable and start the scheduler



147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/osctld/cpu_scheduler.rb', line 147

# Turn the scheduler on on request: remember that the switch was flipped
# manually, queue a state save and start the upkeep thread unless it is
# already running.
def enable
  exclusively do
    @manual_toggle = true
    @enabled = true
  end

  save_state

  sync_control do
    next if upkeep_running?

    start_upkeep
  end
end

#enable_package(package_id) ⇒ Boolean

Parameters:

  • package_id (Integer)

Returns:

  • (Boolean)


188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/osctld/cpu_scheduler.rb', line 188

# Make a CPU package available for scheduling again.
#
# @param package_id [Integer]
# @return [Boolean] true when the package exists and was enabled; a state
#   save is queued only in that case
def enable_package(package_id)
  changed = false

  exclusively do
    if package_info.has_key?(package_id)
      package_info[package_id].enabled = true
      changed = true
    end
  end

  save_state if changed
  changed
end

#enabled? ⇒ Boolean

Return `true` if the scheduler is enabled by configuration

Returns:

  • (Boolean)


172
173
174
# File 'lib/osctld/cpu_scheduler.rb', line 172

# Read the enabled flag under the shared lock. The flag starts from the
# daemon configuration and is overwritten by `enable`, `disable` and by a
# persisted manual toggle in `load_state`.
#
# @return [Boolean]
def enabled?
  inclusively { @enabled }
end

#export_packages ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/osctld/cpu_scheduler.rb', line 287

# Export the scheduler's view of every CPU package in the topology.
#
# @return [Array<Hash>] one hash per package with keys `:id`, `:cpus`,
#   `:containers`, `:usage_score` and `:enabled`
def export_packages
  exclusively do
    topology.packages.each_value.map do |topo_pkg|
      info = package_info[topo_pkg.id]

      row = { id: topo_pkg.id }
      row[:cpus] = info.cpu_mask.to_a
      row[:containers] = info.container_count
      row[:usage_score] = info.usage_score
      row[:enabled] = info.enabled
      row
    end
  end
end

#export_status ⇒ Object



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/osctld/cpu_scheduler.rb', line 270

# Summarize the scheduler's status.
#
# @return [Hash] keys `:enabled`, `:needed`, `:use`, `:upkeep_running`,
#   `:packages` (number of CPU packages) and `:cpus` (number of CPUs)
def export_status
  status = {}

  exclusively do
    status[:enabled] = enabled?
    status[:needed] = needed?
    status[:use] = use?
  end

  sync_control { status[:upkeep_running] = upkeep_running? }

  status[:packages] = topology.packages.length
  status[:cpus] = topology.cpus.length
  status
end

#get_package_by_count(usage_score) ⇒ Object (protected)



461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/osctld/cpu_scheduler.rb', line 461

# Choose the enabled package with the fewest containers per CPU and account
# the new container on it.
#
# When several packages are equally loaded, the first one in `package_info`
# order wins.
#
# @param usage_score usage score of the container being scheduled
# @return [Object, nil] the chosen package record, or nil when no package is
#   enabled
def get_package_by_count(usage_score)
  # min_by finds the least loaded package in one pass, no full sort needed
  pkg = package_info.values.select(&:enabled).min_by(&:container_count_per_cpu)
  return if pkg.nil?

  pkg.container_count += 1
  pkg.usage_score += usage_score
  pkg
end

#get_package_by_preference(pkg_id, usage_score) ⇒ Object (protected)



454
455
456
457
458
459
# File 'lib/osctld/cpu_scheduler.rb', line 454

# Account a container on the package it is statically pinned to. The
# package's `enabled` flag is not consulted here.
#
# @param pkg_id id of the pinned package
# @param usage_score usage score of the container being scheduled
# @return the package record
def get_package_by_preference(pkg_id, usage_score)
  package_info[pkg_id].tap do |pinned|
    pinned.container_count += 1
    pinned.usage_score += usage_score
  end
end

#get_package_by_score(usage_score) ⇒ Object (protected)



474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/osctld/cpu_scheduler.rb', line 474

# Choose the enabled package with the lowest usage score per CPU and account
# the new container on it.
#
# When several packages have the same score, the first one in `package_info`
# order wins.
#
# @param usage_score usage score of the container being scheduled
# @return [Object, nil] the chosen package record, or nil when no package is
#   enabled
def get_package_by_score(usage_score)
  # min_by finds the least used package in one pass, no full sort needed
  pkg = package_info.values.select(&:enabled).min_by(&:usage_score_per_cpu)
  return if pkg.nil?

  pkg.container_count += 1
  pkg.usage_score += usage_score
  pkg
end

#load_state ⇒ Object (protected)



611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
# File 'lib/osctld/cpu_scheduler.rb', line 611

# Restore scheduler state from STATE_FILE.
#
# A missing file is not an error. A file whose content does not parse to a
# Hash (e.g. an empty or truncated file) is ignored with a warning instead of
# crashing the caller. Packages and schedule entries that reference packages
# unknown to the current topology are skipped.
def load_state
  begin
    data = OsCtl::Lib::ConfigFile.load_yaml_file(STATE_FILE)
  rescue Errno::ENOENT
    return
  end

  unless data.is_a?(Hash)
    log(:warn, "Ignoring state file #{STATE_FILE}: expected a hash, got #{data.class}")
    return
  end

  # 'enabled' is stored only after a manual toggle, see dump_state
  if data.has_key?('enabled')
    @enabled = data['enabled']
    @manual_toggle = true
  end

  data.fetch('packages', []).each do |pkg|
    pkg_id = pkg['id']
    next unless package_info.has_key?(pkg_id)

    package_info[pkg_id].enabled = pkg.fetch('enabled', true)
  end

  data.fetch('scheduled_cts', []).each do |ct|
    sched = ScheduleInfo.new(
      ctid: ct['ctid'],
      usage_score: ct['usage_score'],
      package_id: ct['package_id'],
      reservation: ct['reservation'],
      reserved_at: ct['reserved_at'] && Time.at(ct['reserved_at']),
    )

    next unless package_info.has_key?(sched.package_id)

    scheduled_cts[sched.ctid] = sched

    # Rebuild the package counters from the restored entries
    pkg = package_info[sched.package_id]
    pkg.container_count += 1
    pkg.usage_score += sched.usage_score
  end
end

#log_type ⇒ Object



303
304
305
# File 'lib/osctld/cpu_scheduler.rb', line 303

# Fixed label identifying this component.
# NOTE(review): presumably consumed by the Log mixin to tag log messages;
# confirm against OsCtl::Lib::Utils::Log.
#
# @return [String]
def log_type
  'cpu-scheduler'
end

#needed? ⇒ Boolean

Return `true` if the scheduler is needed by the system

Returns:

  • (Boolean)


177
178
179
# File 'lib/osctld/cpu_scheduler.rb', line 177

# The scheduler only makes sense when there is a choice, i.e. when the
# topology reports more than one CPU package.
#
# @return [Boolean]
def needed?
  topology.packages.length > 1
end

#preschedule_ct(ct) ⇒ Object

Make a reservation in the scheduler

Parameters:



243
244
245
# File 'lib/osctld/cpu_scheduler.rb', line 243

# Reserve a CPU package for a container by running the regular package
# selection with `reservation: true`.
#
# @param ct container to make the reservation for
# @return [Array, nil] whatever `assign_package_for` returns: the
#   `[package, schedule]` pair, or nil when nothing was assigned
def preschedule_ct(ct)
  assign_package_for(ct, reservation: true)
end

#record_scheduled(ct, reservation, usage_score, pkg) ⇒ Object (protected)



487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/osctld/cpu_scheduler.rb', line 487

# Store a schedule record for a container, replacing any previous one.
#
# If the container already has a record, its share is first taken back from
# the package it was accounted on, so that the container is never counted
# twice. The caller is responsible for accounting it on the new package.
#
# @param ct scheduled container
# @param reservation [Boolean] whether this is only a reservation
# @param usage_score usage score accounted for the container
# @param pkg package record the container is now placed on
# @return [ScheduleInfo] the new record
def record_scheduled(ct, reservation, usage_score, pkg)
  ident = ct.ident
  previous = scheduled_cts[ident]

  if previous
    # This container has already been scheduled, so fix the leak
    previous_pkg = package_info[previous.package_id]

    log(:warn, "Fixing schedule leak for #{ident}: scheduling on #{pkg.id}, while already scheduled on #{previous_pkg.id}")

    previous_pkg.container_count -= 1
    previous_pkg.usage_score -= previous.usage_score
  end

  reserved_at = reservation ? Time.now : nil

  scheduled_cts[ident] = ScheduleInfo.new(
    ctid: ident,
    usage_score: usage_score,
    package_id: pkg.id,
    reservation: reservation,
    reserved_at: reserved_at,
  )
end

#run_save ⇒ Object (protected)



552
553
554
555
556
557
558
559
560
561
562
# File 'lib/osctld/cpu_scheduler.rb', line 552

# Body of the save thread: wait for a request on the save queue, write the
# state file, then pause for a second before handling the next request.
# Terminates on a `:stop` message, or right after a save once shutdown has
# been requested.
def run_save
  loop do
    request = save_queue.pop
    break if request == :stop

    do_save_state
    break if @do_shutdown

    sleep(1)
  end
end

#run_upkeep ⇒ Object (protected)



508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/osctld/cpu_scheduler.rb', line 508

# Body of the upkeep thread. Wakes up on a queue message or after five
# minutes at the latest and reconciles the schedule with the containers:
#
# - a stopped container without run configuration that still has a schedule
#   record is a candidate for removal, unless it holds a reservation younger
#   than one hour;
# - a container whose run configuration has no CPU package but which has a
#   schedule record is a candidate for removal as well;
# - a container whose run configuration names a package that the schedule
#   does not reflect is (re)recorded on that package.
#
# A candidate is unscheduled only after it has been seen as such in more than
# three consecutive passes. Every pass ends by queueing a state save. The
# loop ends when `:stop` is received.
def run_upkeep
  # container ident => number of consecutive passes it was a removal candidate
  unschedule_table = {}

  loop do
    v = upkeep_queue.pop(timeout: 60*5)
    return if v == :stop

    now = Time.now

    DB::Containers.get.each do |ct|
      ctrc = ct.run_conf
      stopped = ct.state == :stopped
      should_unschedule = false

      exclusively do
        sched = scheduled_cts[ct.ident]

        if stopped && ctrc.nil? && sched
          if !sched.reservation || sched.reserved_at + 60*60 < now
            should_unschedule = true
          end
        elsif ctrc && ctrc.cpu_package.nil? && sched
          should_unschedule = true
        elsif ctrc && ctrc.cpu_package && (sched.nil? || ctrc.cpu_package != sched.package_id)
          # Read the hint once, so that the score added to the package is
          # exactly the score stored in the schedule record and subtracted
          # again on unschedule.
          usage = ct.hints.cpu_daily.usage_us

          pkg = package_info[ctrc.cpu_package]
          pkg.container_count += 1
          pkg.usage_score += usage
          record_scheduled(ct, false, usage, pkg)
        end

        if should_unschedule
          unschedule_table[ct.ident] ||= 0
          unschedule_table[ct.ident] += 1
          unschedule_ct(ct) if unschedule_table[ct.ident] > 3
        else
          unschedule_table.delete(ct.ident)
        end
      end
    end

    save_state
  end
end

#save_state ⇒ Object (protected)



597
598
599
# File 'lib/osctld/cpu_scheduler.rb', line 597

# Request an asynchronous state save by pushing `:save` onto the save queue;
# the actual write happens in `run_save` on the save thread.
def save_state
  save_queue << :save
end

#schedule_ct(ctrc) ⇒ Object

Assign container to an available CPU package and configure its cpuset

Parameters:



220
221
222
223
224
# File 'lib/osctld/cpu_scheduler.rb', line 220

# Assign the container to an available CPU package, configure its cpuset and
# persist the run configuration when the container was actually scheduled.
#
# @param ctrc container run configuration
# @return [nil]
def schedule_ct(ctrc)
  ctrc.save if do_schedule_ct(ctrc)
  nil
end

#setup ⇒ Object



125
126
127
128
129
130
131
# File 'lib/osctld/cpu_scheduler.rb', line 125

# Bring the scheduler up: restore persisted state, start the thread that
# writes the state file and, when the scheduler is both enabled and needed,
# start the upkeep thread.
def setup
  load_state

  @save_thread = Thread.new { run_save }

  start_upkeep if use?
end

#shutdown ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/osctld/cpu_scheduler.rb', line 133

# Stop the background threads. The upkeep thread is stopped first; the save
# thread, if there is one, is asked for one final save and joined.
def shutdown
  sync_control do
    stop_upkeep
    next unless save_thread

    # run_save exits right after the save it performs once this flag is set
    @do_shutdown = true
    save_queue << :save
    save_thread.join
    @save_thread = nil
  end
end

#start_upkeep ⇒ Object (protected)

Start background container upkeeping



312
313
314
315
316
# File 'lib/osctld/cpu_scheduler.rb', line 312

# Start background container upkeeping: spawn the thread running
# `run_upkeep` while holding the control mutex.
#
# This method does not check whether an upkeep thread already exists; that
# is left to the caller.
def start_upkeep
  sync_control do
    @upkeep_thread = Thread.new { run_upkeep }
  end
end

#stop_upkeep ⇒ Object (protected)

Stop background container upkeeping



319
320
321
322
323
324
325
326
327
# File 'lib/osctld/cpu_scheduler.rb', line 319

# Stop background container upkeeping: when the upkeep thread is running,
# send it `:stop`, wait for it to finish and forget it. Does nothing
# otherwise.
def stop_upkeep
  sync_control do
    if upkeep_running?
      upkeep_queue << :stop
      upkeep_thread.join
      @upkeep_thread = nil
    end
  end
end

#sync_control(&block) ⇒ Object (protected)



649
650
651
652
653
654
655
# File 'lib/osctld/cpu_scheduler.rb', line 649

# Run the block while holding the control mutex. When the current thread
# already owns the mutex the block is called directly, which makes nested
# `sync_control` calls safe.
#
# @return the block's return value
def sync_control(&block)
  return block.call if @control_mutex.owned?

  @control_mutex.synchronize(&block)
end

#unschedule_ct(ct) ⇒ Object

Remove container from the scheduler

Parameters:



228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/osctld/cpu_scheduler.rb', line 228

# Remove a container from the scheduler and give its share back to the
# package it was accounted on. Unknown containers are ignored.
#
# @param ct container to remove
# @return [nil]
def unschedule_ct(ct)
  exclusively do
    removed = scheduled_cts.delete(ct.ident)

    if removed
      owner = package_info[removed.package_id]
      owner.usage_score -= removed.usage_score
      owner.container_count -= 1
    end
  end

  nil
end

#upkeep ⇒ Object



264
265
266
267
268
# File 'lib/osctld/cpu_scheduler.rb', line 264

# Ask the upkeep thread to run a pass now instead of waiting for its
# timeout. Does nothing when the upkeep thread is not running.
def upkeep
  sync_control do
    next unless upkeep_running?

    upkeep_queue << :upkeep
  end
end

#upkeep_running? ⇒ Boolean (protected)

Return `true` if the scheduler is running

Returns:

  • (Boolean)


330
331
332
# File 'lib/osctld/cpu_scheduler.rb', line 330

# Tell whether an upkeep thread has been started and not yet stopped, i.e.
# whether `@upkeep_thread` is set. Checked under the control mutex; the
# thread's liveness itself is not inspected.
#
# @return [Boolean]
def upkeep_running?
  sync_control { !@upkeep_thread.nil? }
end

#use? ⇒ Boolean

Return `true` if the scheduler is both enabled and needed

Returns:

  • (Boolean)


182
183
184
# File 'lib/osctld/cpu_scheduler.rb', line 182

# Tell whether scheduling should actually happen: the scheduler must be
# enabled and the system must have more than one CPU package. Evaluated
# under the shared lock.
#
# @return [Boolean]
def use?
  inclusively { enabled? && needed? }
end