Class: OsCtld::Eventd::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/osctld/eventd/manager.rb

Overview

Event pub/sub server

This class aggregates events from osctld and announces them to subscribed clients.

Subscribing

Clients can subscribe by calling #subscribe, which returns a queue over which the events will be sent. To unsubscribe, call ##unsubscribe with the queue as an argument. An instance of Event is sent over the queue for every reported event.

Announcing events

Events are announced using method #report. Events are classified by a ‘type` and described by `opts`. For possible event types, see below.

Event types

‘:management`

Used for management commands received over the command socket. Options:

{
  id: unique internal command identifier,
  cmd: management command,
  opts: command options,
  state: :run | :done | :failed
}

‘:osctld_shutdown`

Sent when osctld is shutting down.

‘:state`

Used to report changes of container states. Options:

{
  pool: pool name,
  id: container id,
  state: new state
}

‘:db`

Reports about changes in osctld database. Options:

{
  object: pool/user/group/container,
  pool: object's pool name,
  id: object identificator,
  action: add/remove
}

‘:ct_scheduled`

Reports actions by the CPU scheduler Options:

{
  pool: pool name,
  id: container id,
  cpu_package_inuse: package id
}

‘:ct_init_pid`

Reports discovery of container init PID Options:

{
  pool: pool name,
  id: container id,
  init_pid: init PID
}

‘:ct_netif`

Reports about container networking interfaces being added, deleted or coming up and down.

Options:

{
  pool: container's pool name,
  id: container id,
  action: add/remove/rename/up/down,
  name: interface name inside the container,
  new_name: net interface name when action is rename,
  veth: interface name on the host
}

Instance Method Summary collapse

Constructor Details

#initializeManager

Returns a new instance of Manager.



91
92
93
# File 'lib/osctld/eventd/manager.rb', line 91

def initialize
  @workers = []
end

Instance Method Details

#default_worker_countObject (protected)



164
165
166
# File 'lib/osctld/eventd/manager.rb', line 164

def default_worker_count
  [Etc.nprocessors / 32, 2].max
end

#get_workerObject (protected)



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/osctld/eventd/manager.rb', line 146

def get_worker
  ret = nil
  min_size = nil

  @workers.each do |w|
    w_size = w.size

    if min_size.nil? || w_size < min_size
      ret = w
      min_size = w_size
    end
  end

  raise 'programming error: no worker found' if ret.nil?

  ret
end

#report(type, **opts) ⇒ Object

Report an event that should be announced to all subscribers

Parameters:

  • type (Symbol)
  • opts (Hash)


138
139
140
141
142
# File 'lib/osctld/eventd/manager.rb', line 138

def report(type, **opts)
  event = Eventd::Event.new(type, opts)
  @workers.each { |w| w.report(event) }
  nil
end

#shutdownObject



114
115
116
117
118
# File 'lib/osctld/eventd/manager.rb', line 114

def shutdown
  report(:osctld_shutdown)
  stop
  nil
end

#start(num_workers: nil) ⇒ Object

Parameters:

  • num_workers (Integer, nil) (defaults to: nil)


96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/osctld/eventd/manager.rb', line 96

def start(num_workers: nil)
  num_workers ||= default_worker_count
  @workers.clear

  num_workers.times.each do
    w = Eventd::Worker.new
    w.start
    @workers << w
  end

  nil
end

#stopObject



109
110
111
112
# File 'lib/osctld/eventd/manager.rb', line 109

def stop
  @workers.each(&:stop)
  nil
end

#subscribeOsCtl::Lib::Queue

Subscribe a client to all events

Returns:

  • (OsCtl::Lib::Queue)


122
123
124
125
126
# File 'lib/osctld/eventd/manager.rb', line 122

def subscribe
  q = OsCtl::Lib::Queue.new
  get_worker.subscribe(q)
  q
end

#unsubscribe(queue) ⇒ Object

Unsubscribe client represented by ‘queue`

Parameters:

  • queue (OsCtl::Lib::Queue)


130
131
132
133
# File 'lib/osctld/eventd/manager.rb', line 130

def unsubscribe(queue)
  @workers.each { |w| w.unsubscribe(queue) }
  nil
end