import { ObjectId } from "../../deps.ts";import { Collection } from "../collection/collection.ts";import { FindCursor } from "../collection/commands/find.ts";import { Database } from "../database.ts";import { MongoRuntimeError } from "../error.ts";import { Filter } from "../types.ts";import { Chunk, File, FileId, GridFSBucketOptions, GridFSFindOptions, GridFSUploadOptions,} from "../types/gridfs.ts";import { checkIndexes } from "./indexes.ts";import { createUploadStream } from "./upload.ts";
export class GridFSBucket { #chunksCollection: Collection<Chunk>; #filesCollection: Collection<File>; #chunkSizeBytes: number; #checkedIndexes = false;
private readonly getBucketData = () => ({ filesCollection: this.#filesCollection, chunksCollection: this.#chunksCollection, chunkSizeBytes: this.#chunkSizeBytes, });
constructor(db: Database, options: GridFSBucketOptions = {}) { const newLocal = options.bucketName ?? "fs"; this.#chunksCollection = db.collection(`${newLocal}.chunks`); this.#filesCollection = db.collection(`${newLocal}.files`); this.#chunkSizeBytes = options.chunkSizeBytes ?? 255 * 1024; }
openUploadStream( filename: string, options?: GridFSUploadOptions, ): Promise<WritableStream<Uint8Array>> { return this.openUploadStreamWithId( new ObjectId(), filename, options, ); }
async openUploadStreamWithId( id: FileId, filename: string, options?: GridFSUploadOptions, ): Promise<WritableStream<Uint8Array>> { if (!this.#checkedIndexes) await this.#checkIndexes(); return createUploadStream(this.getBucketData(), filename, id, options); }
async uploadFromStream( filename: string, source: ReadableStream, options?: GridFSUploadOptions, ): Promise<ObjectId> { const objectid = new ObjectId(); await source.pipeTo( await this.openUploadStreamWithId(objectid, filename, options), ); return objectid; }
async uploadFromStreamWithId( id: FileId, filename: string, source: ReadableStream, options: GridFSUploadOptions, ): Promise<void> { await source.pipeTo( await this.openUploadStreamWithId(id, filename, options), ); }
async openDownloadStream(id: FileId): Promise<ReadableStream<Uint8Array>> { if (!this.#checkedIndexes) await this.#checkIndexes();
return new ReadableStream<Uint8Array>({ start: async (controller) => { const collection = this.#chunksCollection.find({ files_id: id }); await collection.forEach((value) => controller.enqueue(value?.data.buffer) ); controller.close(); }, }); }
async downloadToStream(id: FileId, destination: WritableStream<Uint8Array>) { await (await this.openDownloadStream(id)).pipeTo(destination); }
async delete(id: FileId) { await this.#filesCollection.deleteOne({ _id: id }); const response = await this.#chunksCollection.deleteMany({ files_id: id }); if (!response) { throw new MongoRuntimeError(`File not found for id ${id}`); } }
find( filter: Filter<File>, options: GridFSFindOptions = {}, ): FindCursor<File> { return this.#filesCollection.find(filter ?? {}, options); }
async drop() { await this.#filesCollection.drop(); await this.#chunksCollection.drop(); }
#checkIndexes = () => checkIndexes( this.#filesCollection, this.#chunksCollection, (value) => (this.#checkedIndexes = value), );}