Class: OsCtl::Lib::Zfs::Stream
- Inherits:
-
Object
- Object
- OsCtl::Lib::Zfs::Stream
- Includes:
- Utils::Log, Utils::System
- Defined in:
- lib/libosctl/zfs/stream.rb
Overview
Control class for ZFS send invocation.
Stream uses the ‘-v` flag of zfs send to monitor how the transfer progresses. The following output from zfs send is parsed:
send from <snap1> to <snap2> estimated size is <n><unit>
send from <snap2> to <snap3> estimated size is <n><unit>
send from <snap3> to <snap4> estimated size is <n><unit>
...
total estimated size is <n><unit> # this is the size that is read
TIME SENT SNAPSHOT
HH:MM:SS <n1><unit> <snap>
HH:MM:SS <n2><unit> <snap>
HH:MM:SS <n3><unit> <snap>
TIME SENT SNAPSHOT
HH:MM:SS <m1><unit> <snap>
HH:MM:SS <m2><unit> <snap>
HH:MM:SS <m3><unit> <snap>
...
A line with column headers separates transfers of snapshots listed at the top. There might not be any transfered data between column headers if the snapshot has zero (or possibly very little) size. Time and snapshot columns are ignored, sent data are expected to increment and reset when the next snapshot is being transfered.
Instance Method Summary collapse
- #approximate_size ⇒ Object protected
- #full_zfs_send_cmd ⇒ Object protected
-
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
constructor
A new instance of Stream.
-
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
- #monitor_progress(io) ⇒ Object protected
- #notify_exec(pipeline) ⇒ Object protected
- #parse_size(str) ⇒ Object protected
-
#parse_total_size(fd) ⇒ Object
protected
Reads from fd until total transfer size can be estimated.
- #parse_transfered(str) ⇒ Object protected
-
#path ⇒ String
Path.
- #pipe_cmd(cmd) ⇒ Object protected
- #pipe_io(io) ⇒ Object protected
- #progress {|total, transfered, sent| ... } ⇒ Object
-
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
-
#send_to(addr, port, opts = {}) ⇒ Object
Send stream over a socket.
-
#size ⇒ Integer
Get approximate stream size.
-
#spawn ⇒ Array<IO, Array>
Spawn zfs send and return IO with its standard output.
-
#spawn_with_mbuffer(block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>
Spawn zfs send with mbuffer and return IO with its standard output.
- #update_transfered(str) ⇒ Object protected
-
#write_to(io) ⇒ Object
Write stream to ‘io`.
- #zfs_send(stdout) ⇒ Object protected
- #zfs_send_cmd ⇒ Object protected
Methods included from Utils::System
#repeat_on_failure, #syscmd, #zfs
Methods included from Utils::Log
Constructor Details
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
Returns a new instance of Stream.
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/libosctl/zfs/stream.rb', line 39 def initialize(fs, snapshot, from_snapshot = nil, opts = {}) @fs = fs @snapshot = snapshot @from_snapshot = from_snapshot @opts = opts @progress = [] @opts[:intermediary] = true unless @opts.has_key?(:intermediary) @opts[:compressed] = false unless @opts.has_key?(:compressed) @opts[:properties] = true unless @opts.has_key?(:properties) @opts[:large_block] = true unless @opts.has_key?(:large_block) end |
Instance Method Details
#approximate_size ⇒ Object (protected)
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/libosctl/zfs/stream.rb', line 236 def approximate_size opts = %w[n v] opts << 'c' if @opts[:compressed] cmd = if @from_snapshot zfs(:send, "-#{opts.join} -I @#{@from_snapshot}", "#{path}@#{@snapshot}") else zfs(:send, "-#{opts.join}", "#{path}@#{@snapshot}") end rx = /^total estimated size is ([^$]+)$/ m = rx.match(cmd.output) raise ArgumentError, 'unable to estimate size' if m.nil? parse_size(m[1]) end |
#full_zfs_send_cmd ⇒ Object (protected)
295 296 297 298 299 300 301 302 |
# File 'lib/libosctl/zfs/stream.rb', line 295 def full_zfs_send_cmd if @from_snapshot i = @opts[:intermediary] ? 'I' : 'i' "#{zfs_send_cmd} -v -#{i} @#{@from_snapshot} #{path}@#{@snapshot}" else "#{zfs_send_cmd} -v #{path}@#{@snapshot}" end end |
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
86 87 88 89 90 91 92 93 94 |
# File 'lib/libosctl/zfs/stream.rb', line 86 def monitor(send) pids, err = send monitor_progress(err) err.close pids.each { |pid| Process.wait(pid) } $? end |
#monitor_progress(io) ⇒ Object (protected)
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/libosctl/zfs/stream.rb', line 209 def monitor_progress(io) total = 0 transfered = 0 io.each_line do |line| n = parse_transfered(line) if n change = transfered == 0 ? n : n - transfered transfered = n total += change else # Transfer of another snapshot has begun, zfs send is counting from 0 transfered = 0 next end @progress.each do |block| block.call(total, transfered, change) end end end |
#notify_exec(pipeline) ⇒ Object (protected)
312 |
# File 'lib/libosctl/zfs/stream.rb', line 312 def notify_exec(pipeline); end |
#parse_size(str) ⇒ Object (protected)
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/libosctl/zfs/stream.rb', line 279 def parse_size(str) size = str.to_f suffix = str.strip[-1] if suffix !~ /^\d+$/ units = %w[B K M G T] raise "unsupported suffix '#{suffix}'" unless (i = units.index(suffix)) i.times { size *= 1024 } end (size / 1024 / 1024).round end |
#parse_total_size(fd) ⇒ Object (protected)
Reads from fd until total transfer size can be estimated.
257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/libosctl/zfs/stream.rb', line 257 def parse_total_size(fd) rx = /total estimated size is ([^$]+)$/ until fd.eof? line = fd.readline m = rx.match(line) next if m.nil? return parse_size(m[1]) end raise 'unable to estimate total transfer size' end |
#parse_transfered(str) ⇒ Object (protected)
272 273 274 275 276 277 |
# File 'lib/libosctl/zfs/stream.rb', line 272 def parse_transfered(str) cols = str.split return if /^\d{2}:\d{2}:\d{2}$/ !~ cols[0].strip parse_size(cols[1]) end |
#path ⇒ String
Returns path.
144 145 146 |
# File 'lib/libosctl/zfs/stream.rb', line 144 def path @fs end |
#pipe_cmd(cmd) ⇒ Object (protected)
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/libosctl/zfs/stream.rb', line 150 def pipe_cmd(cmd) @pipeline = [] r, w = IO.pipe cmd_pid = Process.fork do $stdin.reopen(r) w.close Process.exec(cmd) end r.close zfs_pid, err = zfs_send(w) @pipeline << cmd w.close log(:info, :zfs, @pipeline.join(' | ')) notify_exec(@pipeline) monitor_progress(err) err.close Process.wait(zfs_pid) Process.wait(cmd_pid) end |
#pipe_io(io) ⇒ Object (protected)
176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/libosctl/zfs/stream.rb', line 176 def pipe_io(io) @pipeline = [] zfs_pid, err = zfs_send(io) log(:info, :zfs, @pipeline.join(' | ')) notify_exec(@pipeline) monitor_progress(err) err.close Process.wait(zfs_pid) end |
#progress {|total, transfered, sent| ... } ⇒ Object
139 140 141 |
# File 'lib/libosctl/zfs/stream.rb', line 139 def progress(&block) @progress << block end |
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
126 127 128 |
# File 'lib/libosctl/zfs/stream.rb', line 126 def send_recv(fs) pipe_cmd("zfs recv -F #{fs}") end |
#send_to(addr, port, opts = {}) ⇒ Object
Send stream over a socket.
111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/libosctl/zfs/stream.rb', line 111 def send_to(addr, port, opts = {}) cmd = [ 'mbuffer', '-q', "-O #{addr}:#{port}", "-s #{opts.fetch(:block_size, '128k')}", "-m #{opts.fetch(:buffer_size, '64M')}", (opts[:log_file] ? "-l #{opts[:log_file]}" : nil), "-W #{opts.fetch(:timeout, 900)}" ].compact.join(' ') pipe_cmd(cmd) end |
#size ⇒ Integer
Get approximate stream size.
132 133 134 |
# File 'lib/libosctl/zfs/stream.rb', line 132 def size @size ||= approximate_size end |
#spawn ⇒ Array<IO, Array>
Spawn zfs send and return IO with its standard output
54 55 56 57 58 59 |
# File 'lib/libosctl/zfs/stream.rb', line 54 def spawn r, w = IO.pipe pid, err = zfs_send(w) w.close [r, [[pid], err]] end |
#spawn_with_mbuffer(block_size: '128k', buffer_size: '256M', start_writing_at: 60) ⇒ Array<IO, Array>
Spawn zfs send with mbuffer and return IO with its standard output
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/libosctl/zfs/stream.rb', line 63 def spawn_with_mbuffer(block_size: '128k', buffer_size: '256M', start_writing_at: 60) send_r, send_w = IO.pipe send_pid, send_err = zfs_send(send_w) send_w.close mbuf_r, mbuf_w = IO.pipe mbuf_pid = Process.spawn( 'mbuffer', '-q', '-s', block_size.to_s, '-m', buffer_size.to_s, '-P', start_writing_at.to_s, in: send_r, out: mbuf_w ) send_r.close mbuf_w.close [mbuf_r, [[send_pid, mbuf_pid], send_err]] end |
#update_transfered(str) ⇒ Object (protected)
232 233 234 |
# File 'lib/libosctl/zfs/stream.rb', line 232 def update_transfered(str) size = update_transfered(str) end |
#write_to(io) ⇒ Object
Write stream to ‘io`
98 99 100 |
# File 'lib/libosctl/zfs/stream.rb', line 98 def write_to(io) zfs_send(io) end |
#zfs_send(stdout) ⇒ Object (protected)
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/libosctl/zfs/stream.rb', line 188 def zfs_send(stdout) r_err, w_err = IO.pipe cmd = full_zfs_send_cmd @pipeline << cmd if @pipeline pid = Process.fork do r_err.close $stdout.reopen(stdout) $stderr.reopen(w_err) Process.exec(cmd) end w_err.close @size = parse_total_size(r_err) [pid, r_err] end |
#zfs_send_cmd ⇒ Object (protected)
304 305 306 307 308 309 310 |
# File 'lib/libosctl/zfs/stream.rb', line 304 def zfs_send_cmd cmd = %w[zfs send] cmd << '-c' if @opts[:compressed] cmd << '-p' if @opts[:properties] cmd << '-L' if @opts[:large_block] cmd.join(' ') end |