Class: OsCtl::Lib::Zfs::Stream

Inherits:
Object
  • Object
show all
Includes:
Utils::Log, Utils::System
Defined in:
lib/libosctl/zfs/stream.rb

Overview

Control class for ZFS send invocation.

Stream uses the `-v` flag of zfs send to monitor how the transfer progresses. The following output from zfs send is parsed:

send from <snap1> to <snap2> estimated size is <n><unit>
send from <snap2> to <snap3> estimated size is <n><unit>
send from <snap3> to <snap4> estimated size is <n><unit>
...
total estimated size is <n><unit>  # this is the size that is read
TIME        SENT         SNAPSHOT
HH:MM:SS    <n1><unit>   <snap>
HH:MM:SS    <n2><unit>   <snap>
HH:MM:SS    <n3><unit>   <snap>
TIME        SENT         SNAPSHOT
HH:MM:SS    <m1><unit>   <snap>
HH:MM:SS    <m2><unit>   <snap>
HH:MM:SS    <m3><unit>   <snap>
...

A line with column headers separates the transfers of the individual snapshots listed at the top. There might not be any transferred data between column headers if the snapshot has zero (or possibly very little) size. The time and snapshot columns are ignored; the sent amount is expected to keep increasing and to reset when the transfer of the next snapshot begins.

Instance Method Summary collapse

Methods included from Utils::System

#find_executable!, #read_process_output, #repeat_on_failure, #syscmd, #syscmd_argv, #write_stdin, #zfs

Methods included from Utils::Log

included

Constructor Details

#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream

Returns a new instance of Stream.

Parameters:

  • fs (String)

    filesystem

  • snapshot (String)
  • from_snapshot (String) (defaults to: nil)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :intermediary (Boolean)

    defaults to `true`

  • :compressed (Boolean)

    defaults to `true`

  • :properties (Boolean)

    defaults to `true`

  • :large_block (Boolean)

    defaults to `true`



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/libosctl/zfs/stream.rb', line 41

# Set up a stream that sends +snapshot+ of filesystem +fs+.
#
# Every option the caller does not pass defaults to `true`. Defaults are
# merged into a fresh hash, so the hash given by the caller is not modified.
#
# @param fs [String] filesystem
# @param snapshot [String] name of the snapshot to send
# @param from_snapshot [String, nil] when set, an incremental stream starting
#   at this snapshot is sent instead of a full one
# @param opts [Hash]
# @option opts [Boolean] :intermediary defaults to `true`
# @option opts [Boolean] :compressed defaults to `true`
# @option opts [Boolean] :properties defaults to `true`
# @option opts [Boolean] :large_block defaults to `true`
def initialize(fs, snapshot, from_snapshot = nil, opts = {})
  @fs = fs
  @snapshot = snapshot
  @from_snapshot = from_snapshot
  @progress = []

  # Only missing keys are filled in; an explicit `false` or `nil` passed by
  # the caller is kept as is.
  @opts = {
    intermediary: true,
    compressed: true,
    properties: true,
    large_block: true
  }.merge(opts)
end

Instance Method Details

#approximate_sizeObject (protected)

Raises:

  • (ArgumentError)


255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/libosctl/zfs/stream.rb', line 255

# Estimate the stream size by running zfs send with the `-n` and `-v` flags
# (plus `-c` when the :compressed option is set) and reading the
# "total estimated size is ..." line from its output.
#
# @return [Integer] estimated size as returned by #parse_size (MiB)
# @raise [ArgumentError] if the output contains no total estimated size
def approximate_size
  flags = %w[n v]
  flags << 'c' if @opts[:compressed]

  send_opts = "-#{flags.join}"

  if @from_snapshot
    # Use the same incremental mode as the real transfer built by
    # #full_zfs_send_cmd (-I with intermediary snapshots, -i without),
    # otherwise the estimate would describe a different stream.
    mode = @opts[:intermediary] ? 'I' : 'i'
    send_opts = "#{send_opts} -#{mode} @#{@from_snapshot}"
  end

  cmd = zfs(:send, send_opts, "#{path}@#{@snapshot}")

  m = /^total estimated size is ([^$]+)$/.match(cmd.output)

  raise ArgumentError, 'unable to estimate size' if m.nil?

  parse_size(m[1])
end

#full_zfs_send_cmdObject (protected)



314
315
316
317
318
319
320
321
# File 'lib/libosctl/zfs/stream.rb', line 314

# Build the complete zfs send command line, always including `-v`.
#
# Without a source snapshot a full stream of `path@snapshot` is requested.
# With one, an incremental stream is requested using `-I` when the
# :intermediary option is set and `-i` otherwise.
#
# @return [String]
def full_zfs_send_cmd
  target = "#{path}@#{@snapshot}"
  return "#{zfs_send_cmd} -v #{target}" unless @from_snapshot

  mode = @opts[:intermediary] ? 'I' : 'i'
  "#{zfs_send_cmd} -v -#{mode} @#{@from_snapshot} #{target}"
end

#monitor(send) ⇒ Object

Monitor progress of spawned zfs send, needed only when using #spawn.

Parameters:

  • send (Array)

    the second return value from #spawn



88
89
90
91
92
93
94
95
96
# File 'lib/libosctl/zfs/stream.rb', line 88

# Follow the progress output of a spawned zfs send until it ends, then reap
# all processes of the pipeline.
#
# @param send [Array] the second return value from #spawn, i.e. a pair of
#   process IDs and the IO to read progress from
# @return [Process::Status] status of the process that was waited for last
def monitor(send)
  pids, progress_io = send

  monitor_progress(progress_io)
  progress_io.close

  pids.each do |pid|
    Process.wait(pid)
  end

  Process.last_status
end

#monitor_progress(io) ⇒ Object (protected)



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/libosctl/zfs/stream.rb', line 228

# Read progress lines from `io` and report them to all blocks registered
# with #progress.
#
# Each registered block is called with the running total, the amount sent
# for the snapshot currently being transferred and the increase since the
# previous progress line. Values are in the unit returned by
# #parse_transfered.
#
# @param io [IO] stream to read line by line
def monitor_progress(io)
  total = 0
  current = 0

  io.each_line do |line|
    sent = parse_transfered(line)

    unless sent
      # Not a progress line: transfer of another snapshot has begun and
      # zfs send is counting from 0 again
      current = 0
      next
    end

    delta = sent - current
    current = sent
    total += delta

    @progress.each { |callback| callback.call(total, current, delta) }
  end
end

#notify_exec(pipeline) ⇒ Object (protected)



331
# File 'lib/libosctl/zfs/stream.rb', line 331

# Does nothing. #pipe_cmd and #pipe_io call it with the list of command
# strings right after the pipeline has been logged.
# NOTE(review): presumably an extension point for subclasses -- confirm.
#
# @param pipeline [Array<String>] commands forming the pipeline
def notify_exec(pipeline); end

#parse_size(str) ⇒ Object (protected)



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/libosctl/zfs/stream.rb', line 298

# Convert a size printed by zfs send into mebibytes.
#
# The numeric part is read with `String#to_f`. The last non-blank character
# selects the unit: a digit means plain bytes, otherwise one of `B`, `K`,
# `M`, `G`, `T`, `P` or `E`, each being 1024 times the previous one.
#
# @param str [String] size such as `"512"`, `"12.3M"` or `"1.5G"`; leading
#   and trailing whitespace is tolerated
# @return [Integer] size in MiB, rounded to the nearest integer
# @raise [RuntimeError] if the unit suffix is not recognized
def parse_size(str)
  size = str.to_f
  suffix = str.strip[-1]

  # A trailing digit means the value has no unit and is already in bytes
  if suffix !~ /\A\d\z/
    exponent = %w[B K M G T P E].index(suffix)

    raise "unsupported suffix '#{suffix}'" unless exponent

    size *= 1024**exponent
  end

  (size / 1024 / 1024).round
end

#parse_total_size(fd) ⇒ Object (protected)

Reads from fd until total transfer size can be estimated.

Parameters:

  • fd (IO)

    IO object to read



276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/libosctl/zfs/stream.rb', line 276

# Reads from `fd` line by line until the "total estimated size is ..." line
# is found and returns the size from it. Lines after that one are left
# unread.
#
# @param fd [IO] IO object to read
# @return [Integer] total size as returned by #parse_size
# @raise [RuntimeError] if `fd` ends before the total size line is seen
def parse_total_size(fd)
  pattern = /total estimated size is ([^$]+)$/

  fd.each_line do |line|
    found = pattern.match(line)
    return parse_size(found[1]) if found
  end

  raise 'unable to estimate total transfer size'
end

#parse_transfered(str) ⇒ Object (protected)



291
292
293
294
295
296
# File 'lib/libosctl/zfs/stream.rb', line 291

# Extract the amount of sent data from one line of zfs send progress output.
#
# A progress line has the form `HH:MM:SS <size> <snapshot>`. Anything else,
# including blank lines, column headers and lines without a size column,
# yields `nil`.
#
# @param str [String] one line of output
# @return [Integer, nil] sent amount as returned by #parse_size, or `nil`
#   when the line is not a progress line
def parse_transfered(str)
  time, sent = str.split

  # Blank lines and lines with a single column carry no progress data
  return if time.nil? || sent.nil?
  return if /\A\d{2}:\d{2}:\d{2}\z/ !~ time

  parse_size(sent)
end

#pathString

Returns path.

Returns:

  • (String)

    path



147
148
149
# File 'lib/libosctl/zfs/stream.rb', line 147

# Name used on the zfs send command line in front of `@snapshot`; this is
# the filesystem the stream was created with.
#
# @return [String] path
def path
  @fs
end

#pipe_cmd(cmd) ⇒ Object (protected)



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/libosctl/zfs/stream.rb', line 153

# Run `zfs send | cmd`, report progress while the transfer runs and wait for
# both processes to finish.
#
# @param cmd [Array<String>] command and its arguments that receives the
#   stream on its standard input
# @raise [Exceptions::SystemCommandFailed] if either process does not exit
#   successfully; the status of `cmd` is checked before that of zfs send
def pipe_cmd(cmd)
  @pipeline = []
  r, w = IO.pipe

  # The receiving command reads the stream from the read end of the pipe
  cmd_pid = Process.fork do
    $stdin.reopen(r)
    w.close
    Process.exec(*cmd)
  end

  # The parent does not read from the pipe itself
  r.close

  # zfs_send appends its own command line to @pipeline, so the receiving
  # command is added after it
  zfs_pid, err = zfs_send(w)
  @pipeline << Shellwords.join(cmd)

  # zfs send was forked with the write end as its stdout; the parent closes
  # its own copy
  w.close

  log(:info, :zfs, @pipeline.join(' | '))
  notify_exec(@pipeline)
  monitor_progress(err)
  err.close

  zfs_status = wait_for_pipeline_process(zfs_pid)
  cmd_status = wait_for_pipeline_process(cmd_pid)

  raise_pipeline_failure(cmd_status) unless cmd_status.success?
  raise_pipeline_failure(zfs_status) unless zfs_status.success?
end

#pipe_io(io) ⇒ Object (protected)



182
183
184
185
186
187
188
189
190
191
192
# File 'lib/libosctl/zfs/stream.rb', line 182

# Run zfs send with its standard output connected to `io`, report progress
# while it runs and wait for it to exit. The exit status is not checked
# here.
#
# @param io [IO] writable stream receiving the data
# @return [Integer] the value returned by `Process.wait` for zfs send
def pipe_io(io)
  @pipeline = []
  send_pid, progress_io = zfs_send(io)

  description = @pipeline.join(' | ')
  log(:info, :zfs, description)
  notify_exec(@pipeline)

  monitor_progress(progress_io)
  progress_io.close

  Process.wait(send_pid)
end

#progress {|total, transfered, sent| ... } ⇒ Object

Yield Parameters:

  • total (Integer)

    total amount of transferred data

  • transfered (Integer)

    amount of data transferred for the current snapshot

  • sent (Integer)

    data transferred since the last call



142
143
144
# File 'lib/libosctl/zfs/stream.rb', line 142

# Register a block to be called for every progress line read while the
# stream is being monitored. Several blocks may be registered; they are
# called in the order of registration.
#
# @yieldparam total [Integer] total amount of transferred data
# @yieldparam transfered [Integer] amount transferred for the current
#   snapshot
# @yieldparam sent [Integer] data transferred since the last call
def progress(&block)
  @progress.push(block)
end

#raise_pipeline_failure(status) ⇒ Object (protected)



199
200
201
202
203
204
205
# File 'lib/libosctl/zfs/stream.rb', line 199

# Raise an error describing a failed process of the current pipeline.
#
# The reported code is the exit status of the process or, when there is
# none, 128 plus the number of the terminating signal. The reported output
# is an empty string.
#
# @param status [Process::Status]
# @raise [Exceptions::SystemCommandFailed] always
def raise_pipeline_failure(status)
  exit_code = status.exitstatus || (128 + status.termsig)
  description = @pipeline.join(' | ')

  raise Exceptions::SystemCommandFailed.new(description, exit_code, '')
end

#send_recv(fs) ⇒ Object

Send stream to a local filesystem.



129
130
131
# File 'lib/libosctl/zfs/stream.rb', line 129

# Send stream to a local filesystem by piping it into `zfs recv -F`.
#
# @param fs [String] name passed to zfs recv as the target
def send_recv(fs)
  recv = %w[zfs recv -F]
  recv << fs

  pipe_cmd(recv)
end

#send_to(addr, port, opts = {}) ⇒ Object

Send stream over a socket.

Parameters:

  • addr (String)
  • port (Integer)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :block_size (String)
  • :buffer_size (String)
  • :log_file (String)
  • :timeout (Integer)


113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/libosctl/zfs/stream.rb', line 113

# Send stream over a socket by piping it into mbuffer (or the command given
# in :command), which connects to `addr:port`.
#
# @param addr [String]
# @param port [Integer]
# @param opts [Hash]
# @option opts [String] :command executable to run, defaults to `mbuffer`
# @option opts [String] :block_size passed as `-s`, defaults to `128k`
# @option opts [String] :buffer_size passed as `-m`, defaults to `64M`
# @option opts [String] :log_file passed as `-l` when set
# @option opts [Integer] :timeout passed as `-W`, defaults to `900`
def send_to(addr, port, opts = {})
  log_file = opts[:log_file]

  argv = [opts.fetch(:command, 'mbuffer'), '-q']
  argv.push('-O', "#{addr}:#{port}")
  argv.push('-s', opts.fetch(:block_size, '128k').to_s)
  argv.push('-m', opts.fetch(:buffer_size, '64M').to_s)
  # Both entries are nil without a log file and get dropped by compact
  argv.push(log_file ? '-l' : nil, log_file)
  argv.push('-W', opts.fetch(:timeout, 900).to_s)

  pipe_cmd(argv.compact)
end

#sizeInteger

Get approximate stream size.

Returns:

  • (Integer)

    size in MiB



135
136
137
# File 'lib/libosctl/zfs/stream.rb', line 135

# Get approximate stream size. The value is computed on first use and
# remembered afterwards.
#
# @return [Integer] size in MiB
def size
  @size || (@size = approximate_size)
end

#spawnArray<IO, Array>

Spawn zfs send and return IO with its standard output

Returns:

  • (Array<IO, Array>)


56
57
58
59
60
61
# File 'lib/libosctl/zfs/stream.rb', line 56

# Spawn zfs send and return IO with its standard output.
#
# The second element of the result is meant to be handed to #monitor: it
# holds the list of process IDs and the IO from which progress is read.
#
# @return [Array<IO, Array>]
def spawn
  stream_r, stream_w = IO.pipe
  send_pid, progress_io = zfs_send(stream_w)

  # Only the child keeps the write end open
  stream_w.close

  [stream_r, [[send_pid], progress_io]]
end

#spawn_with_mbuffer(command: 'mbuffer', block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>

Spawn zfs send with mbuffer and return IO with its standard output

Returns:

  • (Array<IO, Array>)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/libosctl/zfs/stream.rb', line 65

# Spawn zfs send with mbuffer and return IO with its standard output.
#
# zfs send writes into mbuffer and the returned IO reads what mbuffer
# outputs. The second element of the result is meant to be handed to
# #monitor.
#
# @param command [String] mbuffer executable
# @param block_size [String] passed as `-s`
# @param buffer_size [String] passed as `-m`
# @param start_writing_at [Integer] passed as `-P`
# @return [Array<IO, Array>]
def spawn_with_mbuffer(command: 'mbuffer', block_size: '128k', buffer_size: '256M', start_writing_at: 60)
  zfs_out_r, zfs_out_w = IO.pipe
  zfs_pid, progress_io = zfs_send(zfs_out_w)
  zfs_out_w.close

  argv = [
    command,
    '-q',
    '-s', block_size.to_s,
    '-m', buffer_size.to_s,
    '-P', start_writing_at.to_s
  ]

  out_r, out_w = IO.pipe
  mbuffer_pid = Process.spawn(*argv, in: zfs_out_r, out: out_w)

  # The children hold their own copies of these descriptors
  zfs_out_r.close
  out_w.close

  [out_r, [[zfs_pid, mbuffer_pid], progress_io]]
end

#update_transfered(str) ⇒ Object (protected)



251
252
253
# File 'lib/libosctl/zfs/stream.rb', line 251

# Parse one line of zfs send progress output.
#
# The method used to call itself unconditionally, so any call ended in
# SystemStackError. It now delegates to #parse_transfered.
# NOTE(review): nothing visible calls this method; delegating to
# #parse_transfered is the presumed intent -- confirm or remove the method.
#
# @param str [String] one line of output
# @return [Integer, nil] sent amount, or `nil` if `str` is not a progress
#   line
def update_transfered(str)
  parse_transfered(str)
end

#wait_for_pipeline_process(pid) ⇒ Object (protected)



194
195
196
197
# File 'lib/libosctl/zfs/stream.rb', line 194

# Wait for a child process to exit.
#
# @param pid [Integer]
# @return [Process::Status] exit status of the process
def wait_for_pipeline_process(pid)
  Process.wait2(pid).last
end

#write_to(io) ⇒ Object

Write stream to `io`

Parameters:

  • io (IO)

    writable stream



100
101
102
# File 'lib/libosctl/zfs/stream.rb', line 100

# Write stream to `io` and wait until zfs send has finished.
#
# The work is done by #pipe_io, which also feeds the blocks registered with
# #progress and reaps the zfs send process. Calling #zfs_send directly, as
# this method did before, left the progress output unread and the child
# process unwaited.
#
# @param io [IO] writable stream
def write_to(io)
  pipe_io(io)
end

#zfs_send(stdout) ⇒ Object (protected)



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/libosctl/zfs/stream.rb', line 207

# Fork zfs send with its standard output connected to `stdout` and its
# standard error connected to a pipe, then read from that pipe until the
# total estimated size is known.
#
# The estimated size is stored in `@size`. The rest of the progress output
# stays unread in the returned IO; reading it, closing it and waiting for
# the process is left to the caller.
#
# @param stdout [IO] where zfs send writes the stream
# @return [Array(Integer, IO)] process ID of zfs send and the read end of
#   its standard error
# @raise [RuntimeError] from #parse_total_size when the output ends before
#   the total size is reported
def zfs_send(stdout)
  r_err, w_err = IO.pipe
  cmd = full_zfs_send_cmd

  # Record the command only when a pipeline is being assembled by the caller
  @pipeline << cmd if @pipeline

  pid = Process.fork do
    # The child only writes to the error pipe
    r_err.close
    $stdout.reopen(stdout)
    $stderr.reopen(w_err)

    # cmd is one string, not an argument list
    Process.exec(cmd)
  end

  # The parent only reads from the error pipe
  w_err.close

  # Blocks until zfs send prints the total estimated size
  @size = parse_total_size(r_err)

  [pid, r_err]
end

#zfs_send_cmdObject (protected)



323
324
325
326
327
328
329
# File 'lib/libosctl/zfs/stream.rb', line 323

# Build the `zfs send` command with the flags selected by the stream
# options: `-c` for :compressed, `-p` for :properties and `-L` for
# :large_block, in this order.
#
# @return [String]
def zfs_send_cmd
  flag_for = { compressed: '-c', properties: '-p', large_block: '-L' }
  enabled = flag_for.select { |option, _flag| @opts[option] }.values

  (%w[zfs send] + enabled).join(' ')
end