_stream_writable.js 13 KB


  1. // Copyright Joyent, Inc. and other Node contributors.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the
  5. // "Software"), to deal in the Software without restriction, including
  6. // without limitation the rights to use, copy, modify, merge, publish,
  7. // distribute, sublicense, and/or sell copies of the Software, and to permit
  8. // persons to whom the Software is furnished to do so, subject to the
  9. // following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included
  12. // in all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  15. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  17. // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  18. // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  19. // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  20. // USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. // A bit simpler than readable streams.
  22. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  23. // the drain event emission and buffering.
  24. module.exports = Writable;
  25. /*<replacement>*/
  26. var Buffer = require('buffer').Buffer;
  27. /*</replacement>*/
  28. Writable.WritableState = WritableState;
  29. /*<replacement>*/
  30. var util = require('core-util-is');
  31. util.inherits = require('inherits');
  32. /*</replacement>*/
  33. var Stream = require('stream');
  34. util.inherits(Writable, Stream);
  /**
   * A single queued write: the chunk, the encoding it should be written
   * with, and the callback to invoke once that chunk has been handled.
   *
   * @param {*} chunk - data waiting to be written
   * @param {string} encoding - encoding recorded for the chunk
   * @param {Function} cb - completion callback for this write
   */
  function WriteReq(chunk, encoding, cb) {
    this.chunk = chunk;
    this.encoding = encoding;
    this.callback = cb;
  }
  40. function WritableState(options, stream) {
  41. var Duplex = require('./_stream_duplex');
  42. options = options || {};
  43. // the point at which write() starts returning false
  44. // Note: 0 is a valid value, means that we always return false if
  45. // the entire buffer is not flushed immediately on write()
  46. var hwm = options.highWaterMark;
  47. var defaultHwm = options.objectMode ? 16 : 16 * 1024;
  48. this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
  49. // object stream flag to indicate whether or not this stream
  50. // contains buffers or objects.
  51. this.objectMode = !!options.objectMode;
  52. if (stream instanceof Duplex)
  53. this.objectMode = this.objectMode || !!options.writableObjectMode;
  54. // cast to ints.
  55. this.highWaterMark = ~~this.highWaterMark;
  56. this.needDrain = false;
  57. // at the start of calling end()
  58. this.ending = false;
  59. // when end() has been called, and returned
  60. this.ended = false;
  61. // when 'finish' is emitted
  62. this.finished = false;
  63. // should we decode strings into buffers before passing to _write?
  64. // this is here so that some node-core streams can optimize string
  65. // handling at a lower level.
  66. var noDecode = options.decodeStrings === false;
  67. this.decodeStrings = !noDecode;
  68. // Crypto is kind of old and crusty. Historically, its default string
  69. // encoding is 'binary' so we have to make this configurable.
  70. // Everything else in the universe uses 'utf8', though.
  71. this.defaultEncoding = options.defaultEncoding || 'utf8';
  72. // not an actual buffer we keep track of, but a measurement
  73. // of how much we're waiting to get pushed to some underlying
  74. // socket or file.
  75. this.length = 0;
  76. // a flag to see when we're in the middle of a write.
  77. this.writing = false;
  78. // when true all writes will be buffered until .uncork() call
  79. this.corked = 0;
  80. // a flag to be able to tell if the onwrite cb is called immediately,
  81. // or on a later tick. We set this to true at first, because any
  82. // actions that shouldn't happen until "later" should generally also
  83. // not happen before the first write call.
  84. this.sync = true;
  85. // a flag to know if we're processing previously buffered items, which
  86. // may call the _write() callback in the same tick, so that we don't
  87. // end up in an overlapped onwrite situation.
  88. this.bufferProcessing = false;
  89. // the callback that's passed to _write(chunk,cb)
  90. this.onwrite = function(er) {
  91. onwrite(stream, er);
  92. };
  93. // the callback that the user supplies to write(chunk,encoding,cb)
  94. this.writecb = null;
  95. // the amount that is being written when _write is called.
  96. this.writelen = 0;
  97. this.buffer = [];
  98. // number of pending user-supplied write callbacks
  99. // this must be 0 before 'finish' can be emitted
  100. this.pendingcb = 0;
  101. // emit prefinish if the only thing we're waiting for is _write cbs
  102. // This is relevant for synchronous Transform streams
  103. this.prefinished = false;
  104. // True if the error was already emitted and should not be thrown again
  105. this.errorEmitted = false;
  106. }
  107. function Writable(options) {
  108. var Duplex = require('./_stream_duplex');
  109. // Writable ctor is applied to Duplexes, though they're not
  110. // instanceof Writable, they're instanceof Readable.
  111. if (!(this instanceof Writable) && !(this instanceof Duplex))
  112. return new Writable(options);
  113. this._writableState = new WritableState(options, this);
  114. // legacy.
  115. this.writable = true;
  116. Stream.call(this);
  117. }
  /**
   * Writable streams are not readable, so piping *from* one is a mistake.
   * Rather than piping, emit an 'error' event on the stream.
   */
  Writable.prototype.pipe = function() {
    var notReadable = new Error('Cannot pipe. Not readable.');
    this.emit('error', notReadable);
  };
  /**
   * Report a write() issued after end(): emit 'error' on the stream right
   * away and hand the same error to the write callback on the next tick.
   *
   * @param {Object} stream - stream on which 'error' is emitted
   * @param {Object} state - writable state (not used here)
   * @param {Function} cb - write callback, invoked asynchronously with the error
   */
  function writeAfterEnd(stream, state, cb) {
    var er = new Error('write after end');

    // TODO: defer error events consistently everywhere, not just the cb
    stream.emit('error', er);

    var notifyCaller = function() {
      cb(er);
    };
    process.nextTick(notifyCaller);
  }
  /**
   * Decide whether `chunk` may be written to this stream.
   *
   * In objectMode every value is accepted; chunks are then all considered
   * to be of length=1, and the watermarks determine how many objects to
   * keep in the buffer, rather than how many bytes or characters.
   *
   * Outside objectMode only strings and buffers are accepted. null and
   * undefined are rejected here as well: they carry no `length`, so letting
   * them through would make the write path throw while measuring the chunk
   * instead of reporting a regular stream error.
   *
   * On rejection a TypeError is emitted as 'error' on the stream and passed
   * to `cb` on the next tick.
   *
   * @param {Object} stream - stream on which 'error' is emitted
   * @param {Object} state - writable state; only `objectMode` is read
   * @param {*} chunk - value passed to write()
   * @param {Function} cb - write callback, invoked with the error on failure
   * @returns {boolean} true when the chunk may be written
   */
  function validChunk(stream, state, chunk, cb) {
    if (state.objectMode)
      return true;

    var missing = chunk === null || chunk === undefined;
    if (!missing && (util.isBuffer(chunk) || util.isString(chunk)))
      return true;

    var er = new TypeError('Invalid non-string/buffer chunk');
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
    return false;
  }
  /**
   * Queue or perform a write.
   *
   * @param {*} chunk - data to write
   * @param {string|Function} [encoding] - string encoding, or the callback
   *   when no encoding is given
   * @param {Function} [cb] - called once this chunk has been handled
   * @returns {*} false when the stream has already ended or the chunk is
   *   rejected; otherwise whatever writeOrBuffer returns
   */
  Writable.prototype.write = function(chunk, encoding, cb) {
    var state = this._writableState;

    // write(chunk, cb) form: shift the callback into place.
    if (util.isFunction(encoding)) {
      cb = encoding;
      encoding = null;
    }

    if (util.isBuffer(chunk)) {
      encoding = 'buffer';
    } else if (!encoding) {
      encoding = state.defaultEncoding;
    }

    if (!util.isFunction(cb)) {
      cb = function() {};
    }

    if (state.ended) {
      writeAfterEnd(this, state, cb);
      return false;
    }

    if (!validChunk(this, state, chunk, cb)) {
      return false;
    }

    state.pendingcb++;
    return writeOrBuffer(this, state, chunk, encoding, cb);
  };
  /**
   * Raise the cork counter by one. While the counter is non-zero,
   * writeOrBuffer queues writes instead of dispatching them.
   */
  Writable.prototype.cork = function() {
    this._writableState.corked++;
  };
  /**
   * Lower the cork counter by one (never below zero). When that leaves the
   * stream fully uncorked, idle, unfinished and with queued writes that are
   * not already being drained, flush the queue.
   */
  Writable.prototype.uncork = function() {
    var state = this._writableState;

    if (!state.corked) {
      return;
    }
    state.corked--;

    var mustWait = state.writing ||
                   state.corked ||
                   state.finished ||
                   state.bufferProcessing;
    if (!mustWait && state.buffer.length) {
      clearBuffer(this, state);
    }
  };
  /**
   * Convert a string chunk to a Buffer unless the stream is in objectMode
   * or string decoding has been switched off (`decodeStrings === false`).
   * Anything that is not a string is returned untouched.
   *
   * @param {Object} state - writable state; reads `objectMode`, `decodeStrings`
   * @param {*} chunk - value being written
   * @param {string} encoding - encoding used when converting a string
   * @returns {*} a Buffer for decoded strings, otherwise `chunk` itself
   */
  function decodeChunk(state, chunk, encoding) {
    if (state.objectMode ||
        state.decodeStrings === false ||
        !util.isString(chunk)) {
      return chunk;
    }

    // `new Buffer(string, encoding)` is deprecated in current Node.js
    // releases. Prefer Buffer.from when the runtime has a real
    // implementation (some old releases only inherit Uint8Array.from, which
    // does not take an encoding); otherwise fall back to the constructor.
    var hasBufferFrom = typeof Buffer.from === 'function' &&
                        Buffer.from !== Uint8Array.from;
    return hasBufferFrom ?
        Buffer.from(chunk, encoding) :
        new Buffer(chunk, encoding);
  }
  /**
   * Account for a chunk and either dispatch it or queue it.
   *
   * If a write is already in flight, or the stream is corked, the chunk is
   * appended to the queue to wait its turn; otherwise _write is invoked via
   * doWrite. The chunk's size (1 in objectMode, `chunk.length` otherwise) is
   * added to `state.length` either way.
   *
   * @returns {boolean} true while the buffered amount is still below the
   *   high water mark; false means the caller should wait for 'drain'
   */
  function writeOrBuffer(stream, state, chunk, encoding, cb) {
    chunk = decodeChunk(state, chunk, encoding);
    if (util.isBuffer(chunk)) {
      encoding = 'buffer';
    }

    var size = state.objectMode ? 1 : chunk.length;
    state.length += size;

    var belowMark = state.length < state.highWaterMark;
    // Only ever raise needDrain here, so that a previously set needDrain
    // is not reset to false.
    if (!belowMark) {
      state.needDrain = true;
    }

    var mustQueue = state.writing || state.corked;
    if (mustQueue) {
      state.buffer.push(new WriteReq(chunk, encoding, cb));
    } else {
      doWrite(stream, state, false, size, chunk, encoding, cb);
    }

    return belowMark;
  }
  /**
   * Hand one chunk (or, for writev, a batch of queued entries) to the
   * stream implementation and record the in-flight write on `state`.
   *
   * `state.sync` is true only for the duration of the `_write`/`_writev`
   * call, which lets onwrite tell a same-tick completion from a later one.
   *
   * @param {Object} stream - stream whose _write/_writev is invoked
   * @param {Object} state - writable state to update
   * @param {boolean} writev - use stream._writev instead of stream._write
   * @param {number} len - amount accounted to this write
   * @param {*} chunk - chunk, or the list of queued entries when `writev`
   * @param {string} encoding - encoding passed to _write (unused for writev)
   * @param {Function} cb - user callback stored as `state.writecb`
   */
  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
    state.writelen = len;
    state.writecb = cb;
    state.writing = true;

    state.sync = true;
    if (writev) {
      stream._writev(chunk, state.onwrite);
    } else {
      stream._write(chunk, encoding, state.onwrite);
    }
    state.sync = false;
  }
  /**
   * Failure path of a completed write: settle the user's callback with the
   * error (on the next tick if the write completed synchronously, right
   * away otherwise), then flag and emit the error on the stream.
   *
   * @param {Object} stream - stream on which 'error' is emitted
   * @param {Object} state - writable state; `pendingcb` is decremented
   * @param {boolean} sync - whether the write callback fired in the same tick
   * @param {*} er - the error reported by _write/_writev
   * @param {Function} cb - user callback for the failed write
   */
  function onwriteError(stream, state, sync, er, cb) {
    var settle = function() {
      state.pendingcb--;
      cb(er);
    };

    if (sync) {
      process.nextTick(settle);
    } else {
      settle();
    }

    stream._writableState.errorEmitted = true;
    stream.emit('error', er);
  }
  /**
   * Clear the in-flight write from `state`: no write is active any more,
   * the stored callback is dropped, and the amount that was being written
   * is subtracted from the buffered length.
   *
   * @param {Object} state - writable state to reset
   */
  function onwriteStateUpdate(state) {
    var completedLen = state.writelen;

    state.writing = false;
    state.writecb = null;
    state.length -= completedLen;
    state.writelen = 0;
  }
  /**
   * Completion handler handed to _write/_writev (through `state.onwrite`).
   *
   * Captures the sync flag and user callback, resets the in-flight write
   * state, then either reports the error or continues with queued writes
   * and post-write bookkeeping. When the write completed in the same tick,
   * the bookkeeping (afterWrite) is deferred to the next tick.
   *
   * @param {Object} stream - the stream whose write completed
   * @param {*} er - error passed by the implementation, if any
   */
  function onwrite(stream, er) {
    var state = stream._writableState;
    // Read these before the state reset below clears them.
    var sync = state.sync;
    var cb = state.writecb;

    onwriteStateUpdate(state);

    if (er) {
      onwriteError(stream, state, sync, er, cb);
      return;
    }

    // Check if we're actually ready to finish, but don't emit yet
    var finished = needFinish(stream, state);

    var shouldFlush = !finished &&
                      !state.corked &&
                      !state.bufferProcessing &&
                      state.buffer.length;
    if (shouldFlush) {
      clearBuffer(stream, state);
    }

    var complete = function() {
      afterWrite(stream, state, finished, cb);
    };
    if (sync) {
      process.nextTick(complete);
    } else {
      complete();
    }
  }
  /**
   * Bookkeeping after a successful write: possibly emit 'drain' (skipped
   * when the stream is about to finish), settle the user's callback, then
   * check whether the stream can finish.
   *
   * @param {Object} stream - the stream that was written to
   * @param {Object} state - writable state; `pendingcb` is decremented
   * @param {boolean} finished - needFinish() result captured by onwrite
   * @param {Function} cb - user callback for the completed write
   */
  function afterWrite(stream, state, finished, cb) {
    if (!finished) {
      onwriteDrain(stream, state);
    }

    state.pendingcb--;
    cb();

    finishMaybe(stream, state);
  }
  /**
   * Emit 'drain' once everything buffered has been flushed and a previous
   * write() signalled back-pressure (`needDrain`).
   *
   * For writes that complete synchronously, onwrite defers the call chain
   * leading here to the next tick, so 'drain' is not emitted before the
   * write() consumer gets the 'false' return value and has a chance to
   * attach a 'drain' listener.
   *
   * @param {Object} stream - stream on which 'drain' is emitted
   * @param {Object} state - writable state; `needDrain` is cleared on emit
   */
  function onwriteDrain(stream, state) {
    var stillBuffered = state.length !== 0;
    if (stillBuffered || !state.needDrain) {
      return;
    }

    state.needDrain = false;
    stream.emit('drain');
  }
  /**
   * Process whatever is waiting in `state.buffer`.
   *
   * With a `_writev` implementation and more than one queued entry, the
   * whole queue is handed over in a single call. Otherwise entries are
   * written one at a time until one of them does not complete in the same
   * tick; the entries not yet dispatched stay queued.
   * `state.bufferProcessing` is set for the duration of the call.
   *
   * @param {Object} stream - stream whose _write/_writev is used
   * @param {Object} state - writable state holding the queue
   */
  function clearBuffer(stream, state) {
    state.bufferProcessing = true;

    var useWritev = stream._writev && state.buffer.length > 1;
    if (useWritev) {
      // Fast case, write everything using _writev()
      var callbacks = state.buffer.map(function(queued) {
        return queued.callback;
      });

      // count the one we are adding, as well.
      // TODO(isaacs) clean this up
      state.pendingcb++;
      var settleAll = function(err) {
        for (var i = 0; i < callbacks.length; i++) {
          state.pendingcb--;
          callbacks[i](err);
        }
      };
      doWrite(stream, state, true, state.length, state.buffer, '', settleAll);

      // Clear buffer
      state.buffer = [];
    } else {
      // Slow case, write chunks one-by-one
      var next = 0;
      while (next < state.buffer.length) {
        var entry = state.buffer[next];
        var len = state.objectMode ? 1 : entry.chunk.length;

        doWrite(stream, state, false, len,
                entry.chunk, entry.encoding, entry.callback);

        // This entry has been dispatched, so step past it in either case.
        next++;

        // if we didn't call the onwrite immediately, then
        // it means that we need to wait until it does.
        if (state.writing) {
          break;
        }
      }

      if (next < state.buffer.length) {
        state.buffer = state.buffer.slice(next);
      } else {
        state.buffer.length = 0;
      }
    }

    state.bufferProcessing = false;
  }
  /**
   * Default low-level write hook: reports that no implementation was
   * supplied by passing an error to the callback.
   *
   * @param {*} chunk - data to write (ignored here)
   * @param {string} encoding - encoding of `chunk` (ignored here)
   * @param {Function} cb - completion callback; receives the error
   */
  Writable.prototype._write = function(chunk, encoding, cb) {
    var notImplemented = new Error('not implemented');
    cb(notImplemented);
  };

  // No batched-write hook by default; clearBuffer only takes its writev
  // path when `_writev` is truthy.
  Writable.prototype._writev = null;
  /**
   * Signal that no more data will be written, optionally writing one last
   * chunk first.
   *
   * @param {*} [chunk] - final chunk, or the callback when called as end(cb)
   * @param {string|Function} [encoding] - encoding of `chunk`, or the
   *   callback when called as end(chunk, cb)
   * @param {Function} [cb] - passed on to endWritable; ignored when the
   *   stream is already ending or finished
   */
  Writable.prototype.end = function(chunk, encoding, cb) {
    var state = this._writableState;

    // Normalise the end(cb) and end(chunk, cb) call forms.
    if (util.isFunction(chunk)) {
      cb = chunk;
      chunk = null;
      encoding = null;
    } else if (util.isFunction(encoding)) {
      cb = encoding;
      encoding = null;
    }

    var hasChunk = !util.isNullOrUndefined(chunk);
    if (hasChunk) {
      this.write(chunk, encoding);
    }

    // .end() fully uncorks
    if (state.corked) {
      state.corked = 1;
      this.uncork();
    }

    // ignore unnecessary end() calls.
    var alreadyClosing = state.ending || state.finished;
    if (!alreadyClosing) {
      endWritable(this, state, cb);
    }
  };
  /**
   * Whether the stream is ready to finish: end() has been started, nothing
   * is buffered or in flight, and 'finish' has not been emitted yet.
   *
   * @param {Object} stream - unused
   * @param {Object} state - writable state to inspect
   * @returns {*} `state.ending` itself when it is falsy, otherwise a boolean
   */
  function needFinish(stream, state) {
    var nothingLeft = !state.writing &&
                      !state.finished &&
                      state.length === 0;
    return state.ending && nothingLeft;
  }
  /**
   * Emit 'prefinish' at most once per stream; the `prefinished` flag is
   * set before the event is emitted.
   *
   * @param {Object} stream - stream on which 'prefinish' is emitted
   * @param {Object} state - writable state carrying the `prefinished` flag
   */
  function prefinish(stream, state) {
    if (state.prefinished) {
      return;
    }

    state.prefinished = true;
    stream.emit('prefinish');
  }
  /**
   * If the stream is ready to finish, emit 'prefinish', and, when no
   * user write callbacks are still pending, mark the stream finished and
   * emit 'finish'.
   *
   * @param {Object} stream - stream on which the events are emitted
   * @param {Object} state - writable state
   * @returns {*} the needFinish() result
   */
  function finishMaybe(stream, state) {
    var need = needFinish(stream, state);
    if (!need) {
      return need;
    }

    // Sample the counter before 'prefinish' listeners run, as the decision
    // was originally taken ahead of that event.
    var noCallbacksPending = state.pendingcb === 0;

    prefinish(stream, state);
    if (noCallbacksPending) {
      state.finished = true;
      stream.emit('finish');
    }

    return need;
  }
  /**
   * Begin ending the stream: flag it as ending, try to finish right away,
   * and arrange for `cb` (if given) to run once the stream has finished:
   * on the next tick when it already has, otherwise on 'finish'.
   * `state.ended` is set last.
   *
   * @param {Object} stream - stream being ended
   * @param {Object} state - writable state
   * @param {Function} [cb] - optional completion callback
   */
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);

    if (cb && state.finished) {
      process.nextTick(cb);
    } else if (cb) {
      stream.once('finish', cb);
    }

    state.ended = true;
  }