var through = require('through2'); var speedometer = require('speedometer'); module.exports = function(options, onprogress) { if (typeof options === 'function') return module.exports(null, options); options = options || {}; var length = options.length || 0; var time = options.time || 0; var drain = options.drain || false; var transferred = options.transferred || 0; var nextUpdate = Date.now()+time; var delta = 0; var speed = speedometer(options.speed || 5000); var startTime = Date.now(); var update = { percentage: 0, transferred: transferred, length: length, remaining: length, eta: 0, runtime: 0 }; var emit = function(ended) { update.delta = delta; update.percentage = ended ? 100 : (length ? transferred/length*100 : 0); update.speed = speed(delta); update.eta = Math.round(update.remaining / update.speed); update.runtime = parseInt((Date.now() - startTime)/1000); nextUpdate = Date.now()+time; delta = 0; tr.emit('progress', update); }; var write = function(chunk, enc, callback) { var len = options.objectMode ? 1 : chunk.length; transferred += len; delta += len; update.transferred = transferred; update.remaining = length >= transferred ? length - transferred : 0; if (Date.now() >= nextUpdate) emit(false); callback(null, chunk); }; var end = function(callback) { emit(true); callback(); }; var tr = through(options.objectMode ? {objectMode:true, highWaterMark:16} : {}, write, end); var onlength = function(newLength) { length = newLength; update.length = length; update.remaining = length - update.transferred; tr.emit('length', length); }; // Expose `onlength()` handler as `setLength()` to support custom use cases where length // is not known until after a few chunks have already been pumped, or is // calculated on the fly. tr.setLength = onlength; tr.on('pipe', function(stream) { if (typeof length === 'number') return; // Support http module if (stream.readable && !stream.writable && stream.headers) { return onlength(parseInt(stream.headers['content-length'] || 0)); } // Support streams with a length property if (typeof stream.length === 'number') { return onlength(stream.length); } // Support request module stream.on('response', function(res) { if (!res || !res.headers) return; if (res.headers['content-encoding'] === 'gzip') return; if (res.headers['content-length']) { return onlength(parseInt(res.headers['content-length'])); } }); }); if (drain) tr.resume(); if (onprogress) tr.on('progress', onprogress); tr.progress = function() { update.speed = speed(0); update.eta = Math.round(update.remaining / update.speed); return update; }; return tr; };