Class: OsCtl::Lib::Zfs::Stream
- Inherits:
-
Object
- Object
- OsCtl::Lib::Zfs::Stream
- Includes:
- Utils::Log, Utils::System
- Defined in:
- lib/libosctl/zfs/stream.rb
Overview
Control class for ZFS send invocation.
Stream uses the `-v` flag of zfs send to monitor how the transfer progresses. The following output from zfs send is parsed:
send from <snap1> to <snap2> estimated size is <n><unit>
send from <snap2> to <snap3> estimated size is <n><unit>
send from <snap3> to <snap4> estimated size is <n><unit>
...
total estimated size is <n><unit> # this is the size that is read
TIME SENT SNAPSHOT
HH:MM:SS <n1><unit> <snap>
HH:MM:SS <n2><unit> <snap>
HH:MM:SS <n3><unit> <snap>
TIME SENT SNAPSHOT
HH:MM:SS <m1><unit> <snap>
HH:MM:SS <m2><unit> <snap>
HH:MM:SS <m3><unit> <snap>
...
A line with column headers separates the transfers of the snapshots listed at the top. There might not be any transferred data between two column headers if the snapshot has zero (or possibly very little) size. The time and snapshot columns are ignored; the sent data is expected to increment and to reset when the next snapshot starts being transferred.
Instance Method Summary collapse
- #approximate_size ⇒ Object protected
- #full_zfs_send_cmd ⇒ Object protected
-
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
constructor
A new instance of Stream.
-
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
- #monitor_progress(io) ⇒ Object protected
- #notify_exec(pipeline) ⇒ Object protected
- #parse_size(str) ⇒ Object protected
-
#parse_total_size(fd) ⇒ Object
protected
Reads from fd until total transfer size can be estimated.
- #parse_transfered(str) ⇒ Object protected
-
#path ⇒ String
Path.
- #pipe_cmd(cmd) ⇒ Object protected
- #pipe_io(io) ⇒ Object protected
- #progress {|total, transfered, sent| ... } ⇒ Object
-
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
-
#send_to(addr, port: nil, timeout: 900) ⇒ Object
Send stream over a socket.
-
#size ⇒ Integer
Get approximate stream size.
-
#spawn ⇒ IO
Spawn zfs send and return IO with its standard output.
- #update_transfered(str) ⇒ Object protected
-
#write_to(io) ⇒ Object
Write stream to `io`.
- #zfs_send(stdout) ⇒ Object protected
- #zfs_send_cmd ⇒ Object protected
Methods included from Utils::System
#repeat_on_failure, #syscmd, #zfs
Methods included from Utils::Log
Constructor Details
#initialize(fs, snapshot, from_snapshot = nil, opts = {}) ⇒ Stream
Returns a new instance of Stream.
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/libosctl/zfs/stream.rb', line 38
#
# Set up a stream for one snapshot of a filesystem.
#
# @param fs [String] stored as-is and returned by {#path}
# @param snapshot [String] snapshot name, appended after `@` when commands
#   are built
# @param from_snapshot [String, nil] when set, commands are built with
#   `-I @<from_snapshot>`
# @param opts [Hash] recognised keys are `:compressed`, `:properties` and
#   `:large_block`; each defaults to `true` when the key is absent
def initialize(fs, snapshot, from_snapshot = nil, opts = {})
  @fs = fs
  @snapshot = snapshot
  @from_snapshot = from_snapshot

  # Merge into a fresh hash so the defaults never leak into the hash the
  # caller passed in. Keys that are present (even with nil/false) win.
  @opts = { compressed: true, properties: true, large_block: true }.merge(opts)

  # Callbacks registered through #progress
  @progress = []
end
Instance Method Details
#approximate_size ⇒ Object (protected)
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/libosctl/zfs/stream.rb', line 199
#
# Run `zfs send` with the `n` and `v` flags (plus `c` when the
# `:compressed` option is set) and extract the value of the
# "total estimated size is ..." line from its output.
#
# @return [Object] whatever {#parse_size} returns for the captured value
# @raise [ArgumentError] when the output has no total size line
def approximate_size
  flags = %w[n v]
  flags << 'c' if @opts[:compressed]

  args = "-#{flags.join}"
  args = "#{args} -I @#{@from_snapshot}" if @from_snapshot

  cmd = zfs(:send, args, "#{path}@#{@snapshot}")

  # `.` does not match a newline, so the capture stops at the end of the
  # "total estimated size" line even if more output follows it.
  m = /^total estimated size is (.+)$/.match(cmd.output)
  raise ArgumentError, 'unable to estimate size' if m.nil?

  parse_size(m[1])
end
#full_zfs_send_cmd ⇒ Object (protected)
260 261 262 263 264 265 266 |
# File 'lib/libosctl/zfs/stream.rb', line 260
#
# Build the complete `zfs send` command line: the base command from
# {#zfs_send_cmd}, the `-v` flag, an optional `-I @<from_snapshot>` and the
# `<path>@<snapshot>` target.
#
# @return [String]
def full_zfs_send_cmd
  parts = [zfs_send_cmd, '-v']
  parts << "-I @#{@from_snapshot}" if @from_snapshot
  parts << "#{path}@#{@snapshot}"
  parts.join(' ')
end
#monitor(send) ⇒ Object
Monitor progress of spawned zfs send, needed only when using #spawn.
61 62 63 64 65 66 67 68 69 |
# File 'lib/libosctl/zfs/stream.rb', line 61
#
# Monitor progress of a spawned zfs send; needed only when using {#spawn}.
#
# @param send [Array] pair of the child pid and the IO to read progress
#   from, i.e. the second element of the array returned by {#spawn}
# @return [Process::Status] exit status of the waited-for child
def monitor(send)
  child_pid, progress_io = send

  monitor_progress(progress_io)
  progress_io.close

  # wait2 reaps the child (setting $? as well) and hands back its status
  _, status = Process.wait2(child_pid)
  status
end
#monitor_progress(io) ⇒ Object (protected)
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/libosctl/zfs/stream.rb', line 172
#
# Read progress lines from `io` and invoke every callback registered via
# {#progress} with the running total, the amount reported for the current
# snapshot and the increment since the previous report.
#
# Any line for which {#parse_transfered} returns nothing (e.g. the column
# header printed before each snapshot) resets the per-snapshot counter.
#
# @param io [#each_line]
def monitor_progress(io)
  overall = 0
  current = 0

  io.each_line do |line|
    sent = parse_transfered(line)

    unless sent
      # Transfer of another snapshot has begun, zfs send is counting from 0
      current = 0
      next
    end

    delta = current == 0 ? sent : sent - current
    current = sent
    overall += delta

    @progress.each { |callback| callback.call(overall, current, delta) }
  end
end
#notify_exec(pipeline) ⇒ Object (protected)
276 277 278 |
# File 'lib/libosctl/zfs/stream.rb', line 276
#
# Hook invoked by {#pipe_cmd} and {#pipe_io} with the list of commands
# forming the pipeline. The implementation here does nothing.
#
# @param pipeline [Array<String>]
# @return [nil]
def notify_exec(pipeline)
  nil
end
#parse_size(str) ⇒ Object (protected)
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/libosctl/zfs/stream.rb', line 242
#
# Convert a human-readable size such as `1.50G`, `300K` or a plain number
# of bytes into mebibytes.
#
# The last non-whitespace character selects the unit (`B`, `K`, `M`, `G`,
# `T`, `P` or `E`, powers of 1024); a trailing digit means the value is
# already in bytes.
#
# @param str [String]
# @return [Integer] size in MiB, rounded
# @raise [RuntimeError] when the suffix is not a known unit
def parse_size(str)
  value = str.strip
  size = value.to_f
  suffix = value[-1]

  unless suffix =~ /\A\d\z/
    exponent = %w[B K M G T P E].index(suffix)
    fail "unsupported suffix '#{suffix}'" if exponent.nil?

    size *= 1024**exponent
  end

  (size / 1024 / 1024).round
end
#parse_total_size(fd) ⇒ Object (protected)
Reads from fd until total transfer size can be estimated.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/libosctl/zfs/stream.rb', line 220
#
# Reads from fd until total transfer size can be estimated.
#
# Reading stops right after the "total estimated size is ..." line, so the
# lines following it are left in `fd` for the caller.
#
# @param fd [IO]
# @return [Object] result of {#parse_size} for the captured value
# @raise [RuntimeError] when `fd` ends before the total size line is seen
def parse_total_size(fd)
  total_rx = /total estimated size is ([^$]+)$/

  fd.each_line do |line|
    found = total_rx.match(line)
    return parse_size(found[1]) if found
  end

  fail 'unable to estimate total transfer size'
end
#parse_transfered(str) ⇒ Object (protected)
235 236 237 238 239 240 |
# File 'lib/libosctl/zfs/stream.rb', line 235
#
# Extract the amount of sent data from one progress line of the form
# `HH:MM:SS <size> <snapshot>`.
#
# @param str [String] one line of output
# @return [Object, nil] result of {#parse_size} for the second column, or
#   nil when the line is blank, has no size column, or does not start with
#   a `HH:MM:SS` timestamp (e.g. the column header line)
def parse_transfered(str)
  time, sent = str.split

  # Blank lines and lines with a single column carry no progress data
  return unless time && sent
  return unless time =~ /\A\d{2}:\d{2}:\d{2}\z/

  parse_size(sent)
end
#path ⇒ String
Returns path.
108 109 110 |
# File 'lib/libosctl/zfs/stream.rb', line 108
#
# The value given as `fs` to the constructor; it is used as the dataset
# part of `<path>@<snapshot>` when commands are built.
#
# @return [String]
def path
  @fs
end
#pipe_cmd(cmd) ⇒ Object (protected)
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/libosctl/zfs/stream.rb', line 113
#
# Run `cmd` in a forked child with its standard input connected to the
# output of zfs send, report progress while the transfer runs and wait for
# both processes.
#
# @param cmd [String] command passed to `Process.exec` in the child
def pipe_cmd(cmd)
  @pipeline = []
  reader, writer = IO.pipe

  # Receiving side: reads the stream from the pipe on its stdin
  receiver_pid = Process.fork do
    STDIN.reopen(reader)
    writer.close
    Process.exec(cmd)
  end

  reader.close

  # Sending side: zfs send writes into the pipe; zfs_send records its
  # command in @pipeline, the receiver command is appended after it
  sender_pid, progress_io = zfs_send(writer)
  @pipeline << cmd

  writer.close

  log(:info, :zfs, @pipeline.join(' | '))
  notify_exec(@pipeline)

  monitor_progress(progress_io)
  progress_io.close

  Process.wait(sender_pid)
  Process.wait(receiver_pid)
end
#pipe_io(io) ⇒ Object (protected)
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/libosctl/zfs/stream.rb', line 139
#
# Run zfs send with its standard output redirected to `io`, report
# progress while it runs and wait for it to exit.
#
# @param io [IO] passed to {#zfs_send} as the child's standard output
def pipe_io(io)
  @pipeline = []

  sender_pid, progress_io = zfs_send(io)

  description = @pipeline.join(' | ')
  log(:info, :zfs, description)
  notify_exec(@pipeline)

  monitor_progress(progress_io)
  progress_io.close

  Process.wait(sender_pid)
end
#progress {|total, transfered, sent| ... } ⇒ Object
103 104 105 |
# File 'lib/libosctl/zfs/stream.rb', line 103
#
# Register a callback that {#monitor_progress} calls for every parsed
# progress line.
#
# @yieldparam total [Integer] running total over all snapshots
# @yieldparam transfered [Integer] amount reported for the current snapshot
# @yieldparam sent [Integer] increment since the previous report
# @return [Array] the list of registered callbacks
def progress(&block)
  @progress.push(block)
end
#send_recv(fs) ⇒ Object
Send stream to a local filesystem.
90 91 92 |
# File 'lib/libosctl/zfs/stream.rb', line 90
#
# Send stream to a local filesystem by piping it into `zfs recv -F`.
#
# @param fs [String] name interpolated into the `zfs recv` command
def send_recv(fs)
  receiver = "zfs recv -F #{fs}"
  pipe_cmd(receiver)
end
#send_to(addr, port: nil, timeout: 900) ⇒ Object
Send stream over a socket.
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/libosctl/zfs/stream.rb', line 78
#
# Send stream over a socket. The zfs send command is executed by `socat`,
# whose combined standard output and standard error is read to obtain the
# total size and the progress.
#
# @param addr [String] interpolated into socat's `TCP:<addr>:<port>`
# @param port [Integer, nil] interpolated as-is; nil yields an empty port
# @param timeout [Integer] value of socat's `-T` option
def send_to(addr, port: nil, timeout: 900)
  exec_addr = "'EXEC:\"#{full_zfs_send_cmd}\"'"
  socat = "socat -u -T #{timeout} #{exec_addr} TCP:#{addr}:#{port}"
  log(:info, :zfs, socat)

  IO.popen("exec #{socat} 2>&1") do |pipe|
    @size = parse_total_size(pipe)
    monitor_progress(pipe)
  end
end
#size ⇒ Integer
Get approximate stream size.
96 97 98 |
# File 'lib/libosctl/zfs/stream.rb', line 96
#
# Get approximate stream size. The value is computed by
# {#approximate_size} on first use unless `@size` has already been set
# (e.g. by {#zfs_send} or {#send_to}), and is cached afterwards.
#
# @return [Integer]
def size
  @size = approximate_size unless @size
  @size
end
#spawn ⇒ IO
Spawn zfs send and return IO with its standard output
52 53 54 55 56 57 |
# File 'lib/libosctl/zfs/stream.rb', line 52
#
# Spawn zfs send and return IO with its standard output.
#
# @return [Array] two elements: the IO to read the stream from, and the
#   array returned by {#zfs_send}, which is what {#monitor} expects
def spawn
  stream_reader, stream_writer = IO.pipe
  child_pid, progress_io = zfs_send(stream_writer)

  # The child holds its own copy of the write end
  stream_writer.close

  [stream_reader, [child_pid, progress_io]]
end
#update_transfered(str) ⇒ Object (protected)
195 196 197 |
# File 'lib/libosctl/zfs/stream.rb', line 195
#
# Parse the amount of sent data from a progress line.
#
# NOTE(review): this method used to call itself unconditionally, so any
# call ended in SystemStackError. It now delegates to {#parse_transfered},
# which is presumably what was intended -- confirm, or remove the method
# if it is unused.
#
# @param str [String] one line of zfs send progress output
# @return [Object, nil] result of {#parse_transfered}
def update_transfered(str)
  parse_transfered(str)
end
#write_to(io) ⇒ Object
Write stream to `io`
73 74 75 |
# File 'lib/libosctl/zfs/stream.rb', line 73
#
# Write stream to `io`. This only starts zfs send through {#zfs_send}; the
# method itself neither monitors progress nor waits for the process.
#
# @param io [IO] used as the standard output of zfs send
# @return [Array] the value returned by {#zfs_send}
def write_to(io)
  zfs_send(io)
end
#zfs_send(stdout) ⇒ Object (protected)
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/libosctl/zfs/stream.rb', line 151
#
# Fork and exec the command from {#full_zfs_send_cmd} with its standard
# output redirected to `stdout` and its standard error connected to a
# pipe. The pipe is read until the total size is known, which is stored in
# `@size`.
#
# @param stdout [IO] target of the child's standard output
# @return [Array] child pid and the read end of the standard error pipe
def zfs_send(stdout)
  err_reader, err_writer = IO.pipe
  command = full_zfs_send_cmd

  # Only record the command when a pipeline is being assembled
  @pipeline << command if @pipeline

  child = Process.fork do
    err_reader.close
    STDOUT.reopen(stdout)
    STDERR.reopen(err_writer)
    Process.exec(command)
  end

  err_writer.close

  @size = parse_total_size(err_reader)

  [child, err_reader]
end
#zfs_send_cmd ⇒ Object (protected)
268 269 270 271 272 273 274 |
# File 'lib/libosctl/zfs/stream.rb', line 268
#
# Build the base `zfs send` command with the flags selected by the
# options: `-c` for `:compressed`, `-p` for `:properties` and `-L` for
# `:large_block`.
#
# @return [String]
def zfs_send_cmd
  flag_for = { compressed: '-c', properties: '-p', large_block: '-L' }
  enabled = flag_for.select { |opt, _flag| @opts[opt] }.values

  ['zfs', 'send', *enabled].join(' ')
end