Skip to main content
Module

std/node/_stream/writable.ts

Deno standard library
Go to Latest
File
// Copyright Node.js contributors. All rights reserved. MIT License.import { Buffer } from "../buffer.ts";import Stream from "./stream.ts";import { captureRejectionSymbol } from "../events.ts";import { ERR_INVALID_ARG_TYPE, ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, ERR_STREAM_ALREADY_FINISHED, ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, ERR_STREAM_NULL_VALUES, ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING,} from "../_errors.ts";import type { AfterWriteTick, writeV } from "./writable_internal.ts";import { clearBuffer, destroy, errorBuffer, errorOrDestroy, finishMaybe, kOnFinished, nop, onwrite, resetBuffer, writeOrBuffer,} from "./writable_internal.ts";import type { Encodings } from "../_utils.ts";
/**
 * Encoding labels a writable stream works with: the `"buffer"` marker plus
 * every member of the shared `Encodings` union.
 */
type WritableEncodings = "buffer" | Encodings;
/**
 * Options accepted by the `Writable` constructor.
 *
 * The plain-value options are read by `WritableState`; the function-valued
 * options are installed on the stream instance as its `_write`, `_writev`,
 * `_destroy` and `_final` implementations.
 */
export interface WritableOptions {
  // ---- Tuning knobs (consumed by WritableState) ----

  /** Stored on the state as `autoDestroy`; `true` when omitted. */
  autoDestroy?: boolean;
  /**
   * Controls whether string chunks are converted to `Buffer` by `write()`
   * when the stream is not in object mode.
   */
  decodeStrings?: boolean;
  /** Encoding used by `write()` when none is supplied; falls back to `"utf8"`. */
  defaultEncoding?: WritableEncodings;
  /** Stored on the state as `emitClose`; `true` when omitted. */
  emitClose?: boolean;
  /**
   * Must be a non-negative integer. When omitted it is `16` in object mode
   * and `16 * 1024` otherwise.
   */
  highWaterMark?: number;
  /** When true, `write()` skips the chunk type check and conversion. */
  objectMode?: boolean;

  // ---- Implementation hooks (installed on the stream instance) ----

  /** Installed as the stream's `_destroy`. */
  destroy?(
    this: Writable,
    error: Error | null,
    callback: (error: Error | null) => void,
  ): void;
  /** Installed as the stream's `_final`. */
  final?(this: Writable, callback: (error?: Error | null) => void): void;
  /** Installed as the stream's `_write`. */
  write?(
    this: Writable,
    // deno-lint-ignore no-explicit-any
    chunk: any,
    encoding: WritableEncodings,
    callback: (error?: Error | null) => void,
  ): void;
  /** Installed as the stream's `_writev`. */
  writev?(
    this: Writable,
    chunks: {
      // deno-lint-ignore no-explicit-any
      chunk: any;
      encoding: string;
    }[],
    callback: (error?: Error | null) => void,
  ): void;
}
/**
 * Mutable bookkeeping record attached to every `Writable` as
 * `_writableState`.
 *
 * The constructor resolves the user-supplied options into concrete settings
 * (`objectMode`, `highWaterMark`, `decodeStrings`, `defaultEncoding`,
 * `emitClose`, `autoDestroy`); the remaining fields are flags and counters
 * that the stream and the helpers in `writable_internal.ts` read and update.
 */
export class WritableState {
  /** Callbacks passed to `end()` that are waiting for the stream to finish. */
  [kOnFinished]: Array<(error?: Error) => void> = [];
  afterWriteTickInfo: null | AfterWriteTick = null;
  allBuffers = true;
  allNoop = true;
  autoDestroy: boolean;
  /** Queued writes; entries before `bufferedIndex` are not part of the live buffer. */
  buffered: Array<{
    allBuffers?: boolean;
    // deno-lint-ignore no-explicit-any
    chunk: any;
    encoding: string;
    callback: (error: Error) => void;
  }> = [];
  bufferedIndex = 0;
  bufferProcessing = false;
  closed = false;
  closeEmitted = false;
  constructed: boolean;
  /** Nesting depth of `cork()` calls; `uncork()` decrements it. */
  corked = 0;
  decodeStrings: boolean;
  defaultEncoding: WritableEncodings;
  destroyed = false;
  emitClose: boolean;
  ended = false;
  ending = false;
  errored: Error | null = null;
  errorEmitted = false;
  finalCalled = false;
  finished = false;
  highWaterMark: number;
  length = 0;
  needDrain = false;
  objectMode: boolean;
  /** The internal `onwrite` helper pre-bound to the owning stream. */
  onwrite: (error?: Error | null) => void;
  /** Incremented by `write()` for every accepted chunk. */
  pendingcb = 0;
  prefinished = false;
  sync = true;
  writecb: null | ((error: Error) => void) = null;
  /** Backing field for the legacy `Writable.writable` setter. */
  writable = true;
  writelen = 0;
  writing = false;

  /**
   * @param options User options; `undefined` selects every default.
   * @param stream The stream that owns this state (bound into `onwrite`).
   * @throws ERR_INVALID_OPT_VALUE if `highWaterMark` is not a non-negative
   *   integer.
   */
  constructor(options: WritableOptions | undefined, stream: Writable) {
    this.objectMode = !!options?.objectMode;

    // Object-mode streams default to 16, all others to 16 * 1024.
    this.highWaterMark = options?.highWaterMark ??
      (this.objectMode ? 16 : 16 * 1024);

    if (!Number.isInteger(this.highWaterMark) || this.highWaterMark < 0) {
      throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
    }

    // Strings are decoded to Buffers unless the caller explicitly opts out
    // with `decodeStrings: false`. The previous expression
    // (`!options?.decodeStrings === false`) evaluated to `false` when the
    // option was omitted, silently disabling decoding by default.
    this.decodeStrings = options?.decodeStrings !== false;

    this.defaultEncoding = options?.defaultEncoding || "utf8";

    this.onwrite = onwrite.bind(undefined, stream);

    resetBuffer(this);

    this.emitClose = options?.emitClose ?? true;
    this.autoDestroy = options?.autoDestroy ?? true;
    this.constructed = true;
  }

  /** Returns a copy of the queued writes starting at `bufferedIndex`. */
  getBuffer() {
    return this.buffered.slice(this.bufferedIndex);
  }

  /** Number of queued writes at or after `bufferedIndex`. */
  get bufferedRequestCount() {
    return this.buffered.length - this.bufferedIndex;
  }
}
/**
 * A bit simpler than readable streams.
 * Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
 * the drain event emission and buffering.
 */
class Writable extends Stream {
  _final?: (
    this: Writable,
    callback: (error?: Error | null | undefined) => void,
  ) => void;
  _writableState: WritableState;
  _writev?: writeV | null = null;

  /**
   * Creates the stream state and installs any `write`, `writev`, `destroy`
   * and `final` functions from `options` as the instance's `_write`,
   * `_writev`, `_destroy` and `_final`.
   */
  constructor(options?: WritableOptions) {
    super();
    this._writableState = new WritableState(options, this);

    if (options) {
      if (typeof options.write === "function") {
        this._write = options.write;
      }

      if (typeof options.writev === "function") {
        this._writev = options.writev;
      }

      if (typeof options.destroy === "function") {
        this._destroy = options.destroy;
      }

      if (typeof options.final === "function") {
        this._final = options.final;
      }
    }
  }

  /** Rejection-capture hook: destroys the stream with the given error. */
  [captureRejectionSymbol](err?: Error) {
    this.destroy(err);
  }

  static WritableState = WritableState;

  get destroyed() {
    return this._writableState ? this._writableState.destroyed : false;
  }

  set destroyed(value) {
    if (this._writableState) {
      this._writableState.destroyed = value;
    }
  }

  /**
   * True while the stream can still accept writes: it has state, has not
   * been disabled through the `writable` setter, and is neither destroyed,
   * errored, ending nor ended.
   */
  get writable() {
    const w = this._writableState;
    // `w.writable !== false` honours the backwards-compatible setter below;
    // without it, assigning `stream.writable = false` had no observable
    // effect. `!!w` mirrors the state guard used by the other accessors.
    return !!w && w.writable !== false && !w.destroyed && !w.errored &&
      !w.ending && !w.ended;
  }

  set writable(val) {
    if (this._writableState) {
      this._writableState.writable = !!val;
    }
  }

  get writableFinished() {
    return this._writableState ? this._writableState.finished : false;
  }

  get writableObjectMode() {
    return this._writableState ? this._writableState.objectMode : false;
  }

  get writableBuffer() {
    return this._writableState && this._writableState.getBuffer();
  }

  /** Reflects `ending`, i.e. becomes true as soon as `end()` has been accepted. */
  get writableEnded() {
    return this._writableState ? this._writableState.ending : false;
  }

  get writableHighWaterMark() {
    return this._writableState && this._writableState.highWaterMark;
  }

  get writableCorked() {
    return this._writableState ? this._writableState.corked : 0;
  }

  get writableLength() {
    return this._writableState && this._writableState.length;
  }

  /** Resets the lifecycle flags (destroyed/closed/errored/ended/finished...). */
  _undestroy() {
    const w = this._writableState;
    w.constructed = true;
    w.destroyed = false;
    w.closed = false;
    w.closeEmitted = false;
    w.errored = null;
    w.errorEmitted = false;
    w.ended = false;
    w.ending = false;
    w.finalCalled = false;
    w.prefinished = false;
    w.finished = false;
  }

  /** Default destroy implementation: forwards `err` straight to `cb`. */
  _destroy(err: Error | null, cb: (error?: Error | null) => void) {
    cb(err);
  }

  /**
   * Destroys the stream via the internal `destroy` helper. If the stream was
   * not already destroyed, `errorBuffer(state)` is first scheduled on the
   * microtask queue.
   *
   * @returns this stream, for chaining.
   */
  destroy(err?: Error | null, cb?: () => void) {
    const state = this._writableState;
    if (!state.destroyed) {
      queueMicrotask(() => errorBuffer(state));
    }
    destroy.call(this, err, cb);
    return this;
  }

  end(cb?: () => void): void;
  // deno-lint-ignore no-explicit-any
  end(chunk: any, cb?: () => void): void;
  // deno-lint-ignore no-explicit-any
  end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;

  /**
   * Optionally writes a final chunk, fully uncorks the stream and marks it
   * as ending.
   *
   * The callback, if any, is invoked on a microtask when the call produced
   * an error or the stream is already finished; otherwise it is queued on
   * the state's `kOnFinished` list.
   */
  end(
    // deno-lint-ignore no-explicit-any
    x?: any | (() => void),
    y?: WritableEncodings | (() => void),
    z?: () => void,
  ) {
    const state = this._writableState;
    // deno-lint-ignore no-explicit-any
    let chunk: any | null;
    let encoding: WritableEncodings | null;
    let cb: undefined | ((error?: Error) => void);

    // Resolve the (cb) / (chunk, cb) / (chunk, encoding, cb) call shapes.
    if (typeof x === "function") {
      chunk = null;
      encoding = null;
      cb = x;
    } else if (typeof y === "function") {
      chunk = x;
      encoding = null;
      cb = y;
    } else {
      chunk = x;
      encoding = y as WritableEncodings;
      cb = z;
    }

    if (chunk !== null && chunk !== undefined) {
      this.write(chunk, encoding);
    }

    // Collapse any nested cork() calls so a single uncork() releases them.
    if (state.corked) {
      state.corked = 1;
      this.uncork();
    }

    let err: Error | undefined;
    if (!state.errored && !state.ending) {
      state.ending = true;
      finishMaybe(this, state, true);
      state.ended = true;
    } else if (state.finished) {
      err = new ERR_STREAM_ALREADY_FINISHED("end");
    } else if (state.destroyed) {
      err = new ERR_STREAM_DESTROYED("end");
    }

    if (typeof cb === "function") {
      if (err || state.finished) {
        queueMicrotask(() => {
          (cb as (error?: Error | undefined) => void)(err);
        });
      } else {
        state[kOnFinished].push(cb);
      }
    }

    return this;
  }

  /**
   * Default write implementation: delegates a single chunk to `_writev` when
   * one is available.
   *
   * @throws ERR_METHOD_NOT_IMPLEMENTED when no `_writev` is set.
   */
  _write(
    // deno-lint-ignore no-explicit-any
    chunk: any,
    encoding: string,
    cb: (error?: Error | null) => void,
  ): void {
    if (this._writev) {
      this._writev([{ chunk, encoding }], cb);
    } else {
      throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
    }
  }

  //This signature was changed to keep inheritance coherent
  /** A writable cannot be piped from: reports ERR_STREAM_CANNOT_PIPE and returns `dest`. */
  pipe(dest: Writable): Writable {
    errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
    return dest;
  }

  // deno-lint-ignore no-explicit-any
  write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
  write(
    // deno-lint-ignore no-explicit-any
    chunk: any,
    encoding: WritableEncodings | null,
    cb?: (error: Error | null | undefined) => void,
  ): boolean;

  /**
   * Validates and normalises a chunk, then hands it to `writeOrBuffer`.
   *
   * @returns `false` when the write was rejected because the stream is
   *   ending or destroyed; otherwise whatever `writeOrBuffer` returns.
   * @throws ERR_UNKNOWN_ENCODING for an unrecognised encoding.
   * @throws ERR_STREAM_NULL_VALUES when `chunk` is `null`.
   * @throws ERR_INVALID_ARG_TYPE when, outside object mode, `chunk` is not a
   *   string, Buffer or Uint8Array.
   */
  write(
    // deno-lint-ignore no-explicit-any
    chunk: any,
    x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
    y?: ((error: Error | null | undefined) => void),
  ) {
    const state = this._writableState;
    let encoding: WritableEncodings;
    let cb: (error?: Error | null) => void;

    if (typeof x === "function") {
      cb = x;
      encoding = state.defaultEncoding;
    } else {
      if (!x) {
        encoding = state.defaultEncoding;
      } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
        throw new ERR_UNKNOWN_ENCODING(x);
      } else {
        encoding = x;
      }
      if (typeof y !== "function") {
        cb = nop;
      } else {
        cb = y;
      }
    }

    if (chunk === null) {
      throw new ERR_STREAM_NULL_VALUES();
    } else if (!state.objectMode) {
      if (typeof chunk === "string") {
        if (state.decodeStrings !== false) {
          chunk = Buffer.from(chunk, encoding);
          encoding = "buffer";
        }
      } else if (chunk instanceof Buffer) {
        encoding = "buffer";
      } else if (Stream._isUint8Array(chunk)) {
        chunk = Stream._uint8ArrayToBuffer(chunk);
        encoding = "buffer";
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          "chunk",
          ["string", "Buffer", "Uint8Array"],
          chunk,
        );
      }
    }

    let err: Error | undefined;
    if (state.ending) {
      err = new ERR_STREAM_WRITE_AFTER_END();
    } else if (state.destroyed) {
      err = new ERR_STREAM_DESTROYED("write");
    }

    if (err) {
      // The callback is notified asynchronously; the stream error is
      // reported right away.
      queueMicrotask(() => cb(err));
      errorOrDestroy(this, err, true);
      return false;
    }
    state.pendingcb++;
    return writeOrBuffer(this, state, chunk, encoding, cb);
  }

  /** Increments the cork depth. */
  cork() {
    this._writableState.corked++;
  }

  /**
   * Decrements the cork depth (if corked) and, when no write is currently in
   * progress, calls `clearBuffer` for this stream.
   */
  uncork() {
    const state = this._writableState;

    if (state.corked) {
      state.corked--;

      if (!state.writing) {
        clearBuffer(this, state);
      }
    }
  }

  /**
   * Sets the encoding used when `write()` receives none.
   *
   * @throws ERR_UNKNOWN_ENCODING if `Buffer.isEncoding` rejects the value.
   * @returns this stream, for chaining.
   */
  setDefaultEncoding(encoding: string) {
    // node::ParseEncoding() requires lower case.
    if (typeof encoding === "string") {
      encoding = encoding.toLowerCase();
    }
    if (!Buffer.isEncoding(encoding)) {
      throw new ERR_UNKNOWN_ENCODING(encoding);
    }
    this._writableState.defaultEncoding = encoding as WritableEncodings;
    return this;
  }
}

export default Writable;