Skip to main content
Module

std/node/internal/streams/from.mjs

Deno standard library
Go to Latest
File
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.// Copyright Joyent and Node contributors. All rights reserved. MIT license.// deno-lint-ignore-file
import { Buffer } from "../../buffer.ts";import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../errors.ts";import * as process from "../../_process/process.ts";
function _from(Readable, iterable, opts) { let iterator; if (typeof iterable === "string" || iterable instanceof Buffer) { return new Readable({ objectMode: true, ...opts, read() { this.push(iterable); this.push(null); }, }); }
let isAsync; if (iterable && iterable[Symbol.asyncIterator]) { isAsync = true; iterator = iterable[Symbol.asyncIterator](); } else if (iterable && iterable[Symbol.iterator]) { isAsync = false; iterator = iterable[Symbol.iterator](); } else { throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable); }
const readable = new Readable({ objectMode: true, highWaterMark: 1, // TODO(ronag): What options should be allowed? ...opts, });
// Flag to protect against _read // being called before last iteration completion. let reading = false;
readable._read = function () { if (!reading) { reading = true; next(); } };
readable._destroy = function (error, cb) { close(error).then( () => process.nextTick(cb, error), // nextTick is here in case cb throws (e) => process.nextTick(cb, e || error), ); };
async function close(error) { const hadError = (error !== undefined) && (error !== null); const hasThrow = typeof iterator.throw === "function"; if (hadError && hasThrow) { const { value, done } = await iterator.throw(error); await value; if (done) { return; } } if (typeof iterator.return === "function") { const { value } = await iterator.return(); await value; } }
async function next() { for (;;) { try { const { value, done } = isAsync ? await iterator.next() : iterator.next();
if (done) { readable.push(null); } else { const res = (value && typeof value.then === "function") ? await value : value; if (res === null) { reading = false; throw new ERR_STREAM_NULL_VALUES(); } else if (readable.push(res)) { continue; } else { reading = false; } } } catch (err) { readable.destroy(err); } break; } } return readable;}
export default _from;