Class: OsCtl::Lib::Zfs::Stream

Inherits:
Object
  • Object
show all
Includes:
Utils::Log, Utils::System
Defined in:
lib/libosctl/zfs/stream.rb

Overview

Control class for ZFS send invocation.

Stream uses the ‘-v` flag of zfs send to monitor how the transfer progresses. The following output from zfs send is parsed:

send from <snap1> to <snap2> estimated size is <n><unit>
send from <snap2> to <snap3> estimated size is <n><unit>
send from <snap3> to <snap4> estimated size is <n><unit>
...
total estimated size is <n><unit>  # this is the size that is read
TIME        SENT         SNAPSHOT
HH:MM:SS    <n1><unit>   <snap>
HH:MM:SS    <n2><unit>   <snap>
HH:MM:SS    <n3><unit>   <snap>
TIME        SENT         SNAPSHOT
HH:MM:SS    <m1><unit>   <snap>
HH:MM:SS    <m2><unit>   <snap>
HH:MM:SS    <m3><unit>   <snap>
...

A line with column headers separates transfers of snapshots listed at the top. There might not be any transfered data between column headers if the snapshot has zero (or possibly very little) size. Time and snapshot columns are ignored, sent data are expected to increment and reset when the next snapshot is being transfered.

Instance Method Summary collapse

Methods included from Utils::System

#repeat_on_failure, #syscmd, #zfs

Methods included from Utils::Log

included

Constructor Details

#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream

Returns a new instance of Stream.

Parameters:

  • fs (String)

    filesystem

  • snapshot (String)
  • from_snapshot (String) (defaults to: nil)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :intermediary (Boolean)

    defaults to ‘true`

  • :compressed (Boolean)

    defaults to ‘true`

  • :properties (Boolean)

    defaults to ‘true`

  • :large_block (Boolean)

    defaults to ‘true`



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/libosctl/zfs/stream.rb', line 39

def initialize(fs, snapshot, from_snapshot = nil, opts = {})
  @fs = fs
  @snapshot = snapshot
  @from_snapshot = from_snapshot
  @opts = opts
  @progress = []

  @opts[:intermediary] = true unless @opts.has_key?(:intermediary)
  @opts[:compressed] = false unless @opts.has_key?(:compressed)
  @opts[:properties] = true unless @opts.has_key?(:properties)
  @opts[:large_block] = true unless @opts.has_key?(:large_block)
end

Instance Method Details

#approximate_sizeObject (protected)

Raises:

  • (ArgumentError)


236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/libosctl/zfs/stream.rb', line 236

def approximate_size
  opts = %w[n v]
  opts << 'c' if @opts[:compressed]

  cmd = if @from_snapshot
          zfs(:send, "-#{opts.join} -I @#{@from_snapshot}", "#{path}@#{@snapshot}")

        else
          zfs(:send, "-#{opts.join}", "#{path}@#{@snapshot}")
        end

  rx = /^total estimated size is ([^$]+)$/
  m = rx.match(cmd.output)

  raise ArgumentError, 'unable to estimate size' if m.nil?

  parse_size(m[1])
end

#full_zfs_send_cmdObject (protected)



295
296
297
298
299
300
301
302
# File 'lib/libosctl/zfs/stream.rb', line 295

def full_zfs_send_cmd
  if @from_snapshot
    i = @opts[:intermediary] ? 'I' : 'i'
    "#{zfs_send_cmd} -v -#{i} @#{@from_snapshot} #{path}@#{@snapshot}"
  else
    "#{zfs_send_cmd} -v #{path}@#{@snapshot}"
  end
end

#monitor(send) ⇒ Object

Monitor progress of spawned zfs send, needed only when using #spawn.

Parameters:

  • send (Array)

    the second return value from #spawn



86
87
88
89
90
91
92
93
94
# File 'lib/libosctl/zfs/stream.rb', line 86

def monitor(send)
  pids, err = send

  monitor_progress(err)
  err.close

  pids.each { |pid| Process.wait(pid) }
  $?
end

#monitor_progress(io) ⇒ Object (protected)



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/libosctl/zfs/stream.rb', line 209

def monitor_progress(io)
  total = 0
  transfered = 0

  io.each_line do |line|
    n = parse_transfered(line)

    if n
      change = transfered == 0 ? n : n - transfered
      transfered = n
      total += change

    else # Transfer of another snapshot has begun, zfs send is counting from 0
      transfered = 0
      next
    end

    @progress.each do |block|
      block.call(total, transfered, change)
    end
  end
end

#notify_exec(pipeline) ⇒ Object (protected)



312
# File 'lib/libosctl/zfs/stream.rb', line 312

def notify_exec(pipeline); end

#parse_size(str) ⇒ Object (protected)



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/libosctl/zfs/stream.rb', line 279

def parse_size(str)
  size = str.to_f
  suffix = str.strip[-1]

  if suffix !~ /^\d+$/
    units = %w[B K M G T]

    raise "unsupported suffix '#{suffix}'" unless (i = units.index(suffix))

    i.times { size *= 1024 }

  end

  (size / 1024 / 1024).round
end

#parse_total_size(fd) ⇒ Object (protected)

Reads from fd until total transfer size can be estimated.

Parameters:

  • fd (IO)

    IO object to read



257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/libosctl/zfs/stream.rb', line 257

def parse_total_size(fd)
  rx = /total estimated size is ([^$]+)$/

  until fd.eof?
    line = fd.readline

    m = rx.match(line)
    next if m.nil?

    return parse_size(m[1])
  end

  raise 'unable to estimate total transfer size'
end

#parse_transfered(str) ⇒ Object (protected)



272
273
274
275
276
277
# File 'lib/libosctl/zfs/stream.rb', line 272

def parse_transfered(str)
  cols = str.split
  return if /^\d{2}:\d{2}:\d{2}$/ !~ cols[0].strip

  parse_size(cols[1])
end

#pathString

Returns path.

Returns:

  • (String)

    path



144
145
146
# File 'lib/libosctl/zfs/stream.rb', line 144

def path
  @fs
end

#pipe_cmd(cmd) ⇒ Object (protected)



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/libosctl/zfs/stream.rb', line 150

def pipe_cmd(cmd)
  @pipeline = []
  r, w = IO.pipe

  cmd_pid = Process.fork do
    $stdin.reopen(r)
    w.close
    Process.exec(cmd)
  end

  r.close

  zfs_pid, err = zfs_send(w)
  @pipeline << cmd

  w.close

  log(:info, :zfs, @pipeline.join(' | '))
  notify_exec(@pipeline)
  monitor_progress(err)
  err.close

  Process.wait(zfs_pid)
  Process.wait(cmd_pid)
end

#pipe_io(io) ⇒ Object (protected)



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/libosctl/zfs/stream.rb', line 176

def pipe_io(io)
  @pipeline = []
  zfs_pid, err = zfs_send(io)

  log(:info, :zfs, @pipeline.join(' | '))
  notify_exec(@pipeline)
  monitor_progress(err)
  err.close

  Process.wait(zfs_pid)
end

#progress {|total, transfered, sent| ... } ⇒ Object

Yield Parameters:

  • total (Integer)

    total number of transfered data

  • transfered (Integer)

    number transfered data of the current snapshot

  • sent (Integer)

    transfered data since the last call



139
140
141
# File 'lib/libosctl/zfs/stream.rb', line 139

def progress(&block)
  @progress << block
end

#send_recv(fs) ⇒ Object

Send stream to a local filesystem.



126
127
128
# File 'lib/libosctl/zfs/stream.rb', line 126

def send_recv(fs)
  pipe_cmd("zfs recv -F #{fs}")
end

#send_to(addr, port, opts = {}) ⇒ Object

Send stream over a socket.

Parameters:

  • addr (String)
  • port (Integer)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :block_size (String)
  • :buffer_size (String)
  • :log_file (String)
  • :timeout (Integer)


111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/libosctl/zfs/stream.rb', line 111

def send_to(addr, port, opts = {})
  cmd = [
    'mbuffer',
    '-q',
    "-O #{addr}:#{port}",
    "-s #{opts.fetch(:block_size, '128k')}",
    "-m #{opts.fetch(:buffer_size, '64M')}",
    (opts[:log_file] ? "-l #{opts[:log_file]}" : nil),
    "-W #{opts.fetch(:timeout, 900)}"
  ].compact.join(' ')

  pipe_cmd(cmd)
end

#sizeInteger

Get approximate stream size.

Returns:

  • (Integer)

    size in MiB



132
133
134
# File 'lib/libosctl/zfs/stream.rb', line 132

def size
  @size ||= approximate_size
end

#spawnArray<IO, Array>

Spawn zfs send and return IO with its standard output

Returns:

  • (Array<IO, Array>)


54
55
56
57
58
59
# File 'lib/libosctl/zfs/stream.rb', line 54

def spawn
  r, w = IO.pipe
  pid, err = zfs_send(w)
  w.close
  [r, [[pid], err]]
end

#spawn_with_mbuffer(block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>

Spawn zfs send with mbuffer and return IO with its standard output

Returns:

  • (Array<IO, Array>)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/libosctl/zfs/stream.rb', line 63

def spawn_with_mbuffer(block_size: '128k', buffer_size: '256M', start_writing_at: 60)
  send_r, send_w = IO.pipe
  send_pid, send_err = zfs_send(send_w)
  send_w.close

  mbuf_r, mbuf_w = IO.pipe
  mbuf_pid = Process.spawn(
    'mbuffer',
    '-q',
    '-s', block_size.to_s,
    '-m', buffer_size.to_s,
    '-P', start_writing_at.to_s,
    in: send_r,
    out: mbuf_w
  )
  send_r.close
  mbuf_w.close

  [mbuf_r, [[send_pid, mbuf_pid], send_err]]
end

#update_transfered(str) ⇒ Object (protected)



232
233
234
# File 'lib/libosctl/zfs/stream.rb', line 232

def update_transfered(str)
  size = update_transfered(str)
end

#write_to(io) ⇒ Object

Write stream to ‘io`

Parameters:

  • io (IO)

    writable stream



98
99
100
# File 'lib/libosctl/zfs/stream.rb', line 98

def write_to(io)
  zfs_send(io)
end

#zfs_send(stdout) ⇒ Object (protected)



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/libosctl/zfs/stream.rb', line 188

def zfs_send(stdout)
  r_err, w_err = IO.pipe
  cmd = full_zfs_send_cmd

  @pipeline << cmd if @pipeline

  pid = Process.fork do
    r_err.close
    $stdout.reopen(stdout)
    $stderr.reopen(w_err)

    Process.exec(cmd)
  end

  w_err.close

  @size = parse_total_size(r_err)

  [pid, r_err]
end

#zfs_send_cmdObject (protected)



304
305
306
307
308
309
310
# File 'lib/libosctl/zfs/stream.rb', line 304

def zfs_send_cmd
  cmd = %w[zfs send]
  cmd << '-c' if @opts[:compressed]
  cmd << '-p' if @opts[:properties]
  cmd << '-L' if @opts[:large_block]
  cmd.join(' ')
end