Class: OsCtl::Lib::Zfs::Stream
- Inherits:
-
Object
- Object
- OsCtl::Lib::Zfs::Stream
- Includes:
- Utils::Log, Utils::System
- Defined in:
- lib/libosctl/zfs/stream.rb
Overview
Control class for ZFS send invocation.
Stream uses the ‘-v` flag of zfs send to monitor how the transfer progresses. The following output from zfs send is parsed:
send from <snap1> to <snap2> estimated size is <n><unit>
send from <snap2> to <snap3> estimated size is <n><unit>
send from <snap3> to <snap4> estimated size is <n><unit>
...
total estimated size is <n><unit> # this is the size that is read
TIME SENT SNAPSHOT
HH:MM:SS <n1><unit> <snap>
HH:MM:SS <n2><unit> <snap>
HH:MM:SS <n3><unit> <snap>
TIME SENT SNAPSHOT
HH:MM:SS <m1><unit> <snap>
HH:MM:SS <m2><unit> <snap>
HH:MM:SS <m3><unit> <snap>
...
A line with column headers separates transfers of snapshots listed at the top. There might not be any transfered data between column headers if the snapshot has zero (or possibly very little) size. Time and snapshot columns are ignored, sent data are expected to increment and reset when the next snapshot is being transfered.
Instance Method Summary collapse
- #approximate_size ⇒ Object protected
- #full_zfs_send_cmd ⇒ Object protected
-
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
constructor
A new instance of Stream.
-
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
- #monitor_progress(io) ⇒ Object protected
- #notify_exec(pipeline) ⇒ Object protected
- #parse_size(str) ⇒ Object protected
-
#parse_total_size(fd) ⇒ Object
protected
Reads from fd until total transfer size can be estimated.
- #parse_transfered(str) ⇒ Object protected
-
#path ⇒ String
Path.
- #pipe_cmd(cmd) ⇒ Object protected
- #pipe_io(io) ⇒ Object protected
- #progress {|total, transfered, sent| ... } ⇒ Object
- #raise_pipeline_failure(status) ⇒ Object protected
-
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
-
#send_to(addr, port, opts = {}) ⇒ Object
Send stream over a socket.
-
#size ⇒ Integer
Get approximate stream size.
-
#spawn ⇒ Array<IO, Array>
Spawn zfs send and return IO with its standard output.
-
#spawn_with_mbuffer(command: 'mbuffer', block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>
Spawn zfs send with mbuffer and return IO with its standard output.
- #update_transfered(str) ⇒ Object protected
- #wait_for_pipeline_process(pid) ⇒ Object protected
-
#write_to(io) ⇒ Object
Write stream to ‘io`.
- #zfs_send(stdout) ⇒ Object protected
- #zfs_send_cmd ⇒ Object protected
Methods included from Utils::System
#find_executable!, #read_process_output, #repeat_on_failure, #syscmd, #syscmd_argv, #write_stdin, #zfs
Methods included from Utils::Log
Constructor Details
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
Returns a new instance of Stream.
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/libosctl/zfs/stream.rb', line 41 def initialize(fs, snapshot, from_snapshot = nil, opts = {}) @fs = fs @snapshot = snapshot @from_snapshot = from_snapshot @opts = opts @progress = [] @opts[:intermediary] = true unless @opts.has_key?(:intermediary) @opts[:compressed] = false unless @opts.has_key?(:compressed) @opts[:properties] = true unless @opts.has_key?(:properties) @opts[:large_block] = true unless @opts.has_key?(:large_block) end |
Instance Method Details
#approximate_size ⇒ Object (protected)
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/libosctl/zfs/stream.rb', line 255 def approximate_size opts = %w[n v] opts << 'c' if @opts[:compressed] cmd = if @from_snapshot zfs(:send, "-#{opts.join} -I @#{@from_snapshot}", "#{path}@#{@snapshot}") else zfs(:send, "-#{opts.join}", "#{path}@#{@snapshot}") end rx = /^total estimated size is ([^$]+)$/ m = rx.match(cmd.output) raise ArgumentError, 'unable to estimate size' if m.nil? parse_size(m[1]) end |
#full_zfs_send_cmd ⇒ Object (protected)
314 315 316 317 318 319 320 321 |
# File 'lib/libosctl/zfs/stream.rb', line 314 def full_zfs_send_cmd if @from_snapshot i = @opts[:intermediary] ? 'I' : 'i' "#{zfs_send_cmd} -v -#{i} @#{@from_snapshot} #{path}@#{@snapshot}" else "#{zfs_send_cmd} -v #{path}@#{@snapshot}" end end |
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
88 89 90 91 92 93 94 95 96 |
# File 'lib/libosctl/zfs/stream.rb', line 88 def monitor(send) pids, err = send monitor_progress(err) err.close pids.each { |pid| Process.wait(pid) } $? end |
#monitor_progress(io) ⇒ Object (protected)
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/libosctl/zfs/stream.rb', line 228 def monitor_progress(io) total = 0 transfered = 0 io.each_line do |line| n = parse_transfered(line) if n change = transfered == 0 ? n : n - transfered transfered = n total += change else # Transfer of another snapshot has begun, zfs send is counting from 0 transfered = 0 next end @progress.each do |block| block.call(total, transfered, change) end end end |
#notify_exec(pipeline) ⇒ Object (protected)
331 |
# File 'lib/libosctl/zfs/stream.rb', line 331 def notify_exec(pipeline); end |
#parse_size(str) ⇒ Object (protected)
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/libosctl/zfs/stream.rb', line 298 def parse_size(str) size = str.to_f suffix = str.strip[-1] if suffix !~ /^\d+$/ units = %w[B K M G T] raise "unsupported suffix '#{suffix}'" unless (i = units.index(suffix)) i.times { size *= 1024 } end (size / 1024 / 1024).round end |
#parse_total_size(fd) ⇒ Object (protected)
Reads from fd until total transfer size can be estimated.
276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/libosctl/zfs/stream.rb', line 276 def parse_total_size(fd) rx = /total estimated size is ([^$]+)$/ until fd.eof? line = fd.readline m = rx.match(line) next if m.nil? return parse_size(m[1]) end raise 'unable to estimate total transfer size' end |
#parse_transfered(str) ⇒ Object (protected)
291 292 293 294 295 296 |
# File 'lib/libosctl/zfs/stream.rb', line 291 def parse_transfered(str) cols = str.split return if /^\d{2}:\d{2}:\d{2}$/ !~ cols[0].strip parse_size(cols[1]) end |
#path ⇒ String
Returns path.
147 148 149 |
# File 'lib/libosctl/zfs/stream.rb', line 147 def path @fs end |
#pipe_cmd(cmd) ⇒ Object (protected)
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/libosctl/zfs/stream.rb', line 153 def pipe_cmd(cmd) @pipeline = [] r, w = IO.pipe cmd_pid = Process.fork do $stdin.reopen(r) w.close Process.exec(*cmd) end r.close zfs_pid, err = zfs_send(w) @pipeline << Shellwords.join(cmd) w.close log(:info, :zfs, @pipeline.join(' | ')) notify_exec(@pipeline) monitor_progress(err) err.close zfs_status = wait_for_pipeline_process(zfs_pid) cmd_status = wait_for_pipeline_process(cmd_pid) raise_pipeline_failure(cmd_status) unless cmd_status.success? raise_pipeline_failure(zfs_status) unless zfs_status.success? end |
#pipe_io(io) ⇒ Object (protected)
182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/libosctl/zfs/stream.rb', line 182 def pipe_io(io) @pipeline = [] zfs_pid, err = zfs_send(io) log(:info, :zfs, @pipeline.join(' | ')) notify_exec(@pipeline) monitor_progress(err) err.close Process.wait(zfs_pid) end |
#progress {|total, transfered, sent| ... } ⇒ Object
142 143 144 |
# File 'lib/libosctl/zfs/stream.rb', line 142 def progress(&block) @progress << block end |
#raise_pipeline_failure(status) ⇒ Object (protected)
199 200 201 202 203 204 205 |
# File 'lib/libosctl/zfs/stream.rb', line 199 def raise_pipeline_failure(status) raise Exceptions::SystemCommandFailed.new( @pipeline.join(' | '), status.exitstatus || (128 + status.termsig), '' ) end |
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
129 130 131 |
# File 'lib/libosctl/zfs/stream.rb', line 129 def send_recv(fs) pipe_cmd(['zfs', 'recv', '-F', fs]) end |
#send_to(addr, port, opts = {}) ⇒ Object
Send stream over a socket.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/libosctl/zfs/stream.rb', line 113 def send_to(addr, port, opts = {}) cmd = [ opts.fetch(:command, 'mbuffer'), '-q', '-O', "#{addr}:#{port}", '-s', opts.fetch(:block_size, '128k').to_s, '-m', opts.fetch(:buffer_size, '64M').to_s, ('-l' if opts[:log_file]), opts[:log_file], '-W', opts.fetch(:timeout, 900).to_s ].compact pipe_cmd(cmd) end |
#size ⇒ Integer
Get approximate stream size.
135 136 137 |
# File 'lib/libosctl/zfs/stream.rb', line 135 def size @size ||= approximate_size end |
#spawn ⇒ Array<IO, Array>
Spawn zfs send and return IO with its standard output
56 57 58 59 60 61 |
# File 'lib/libosctl/zfs/stream.rb', line 56 def spawn r, w = IO.pipe pid, err = zfs_send(w) w.close [r, [[pid], err]] end |
#spawn_with_mbuffer(command: 'mbuffer', block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>
Spawn zfs send with mbuffer and return IO with its standard output
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/libosctl/zfs/stream.rb', line 65 def spawn_with_mbuffer(command: 'mbuffer', block_size: '128k', buffer_size: '256M', start_writing_at: 60) send_r, send_w = IO.pipe send_pid, send_err = zfs_send(send_w) send_w.close mbuf_r, mbuf_w = IO.pipe mbuf_pid = Process.spawn( command, '-q', '-s', block_size.to_s, '-m', buffer_size.to_s, '-P', start_writing_at.to_s, in: send_r, out: mbuf_w ) send_r.close mbuf_w.close [mbuf_r, [[send_pid, mbuf_pid], send_err]] end |
#update_transfered(str) ⇒ Object (protected)
251 252 253 |
# File 'lib/libosctl/zfs/stream.rb', line 251 def update_transfered(str) size = update_transfered(str) end |
#wait_for_pipeline_process(pid) ⇒ Object (protected)
194 195 196 197 |
# File 'lib/libosctl/zfs/stream.rb', line 194 def wait_for_pipeline_process(pid) _, status = Process.wait2(pid) status end |
#write_to(io) ⇒ Object
Write stream to ‘io`
100 101 102 |
# File 'lib/libosctl/zfs/stream.rb', line 100 def write_to(io) zfs_send(io) end |
#zfs_send(stdout) ⇒ Object (protected)
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/libosctl/zfs/stream.rb', line 207 def zfs_send(stdout) r_err, w_err = IO.pipe cmd = full_zfs_send_cmd @pipeline << cmd if @pipeline pid = Process.fork do r_err.close $stdout.reopen(stdout) $stderr.reopen(w_err) Process.exec(cmd) end w_err.close @size = parse_total_size(r_err) [pid, r_err] end |
#zfs_send_cmd ⇒ Object (protected)
323 324 325 326 327 328 329 |
# File 'lib/libosctl/zfs/stream.rb', line 323 def zfs_send_cmd cmd = %w[zfs send] cmd << '-c' if @opts[:compressed] cmd << '-p' if @opts[:properties] cmd << '-L' if @opts[:large_block] cmd.join(' ') end |