import { Cmp } from "../util-types.ts";import { DocBase, LocalIndex, Path, ShareAddress,} from "../../util/doc-types.ts";import { Query } from "../../query/query-types.ts";import { IReplicaDocDriver } from "../replica-types.ts";import { isErr, ReplicaIsClosedError, ValidationError,} from "../../util/errors.ts";
import { compareArrays, compareByObjKey, sortedInPlace } from "../compare.ts";import { cleanUpQuery, docIsExpired, docMatchesFilter,} from "../../query/query.ts";
import { Logger } from "../../util/log.ts";import { checkShareIsValid } from "../../core-validators/addresses.ts";
// Shared logger for every function and method in this module.
// Declared `const` because the binding is never reassigned.
const logger = new Logger("storage driver async memory", "yellow");
/**
 * Builds the composite key used to index a document by its path and author.
 *
 * @param doc - The document whose `path` and `author` form the key.
 * @returns The string `<path>|<author>`.
 */
function combinePathAndAuthor<DocType extends DocBase<string>>(doc: DocType) {
  const { path, author } = doc;
  return `${path}|${author}`;
}
/**
 * Comparator that orders documents by path ascending, then by timestamp
 * descending (newest first), then by signature ascending as a tie-breaker.
 *
 * @param a - Left-hand document.
 * @param b - Right-hand document.
 * @returns The result of `compareArrays` over the three sort keys.
 */
function docComparePathASCthenNewestFirst<DocType extends DocBase<string>>(
  a: DocType,
  b: DocType,
): Cmp {
  return compareArrays(
    [a.path, a.timestamp, a.signature],
    // The tie-breaker must use b's signature; comparing a.signature with
    // itself would always report equality.
    [b.path, b.timestamp, b.signature],
    ["ASC", "DESC", "ASC"],
  );
}
/**
 * Comparator that orders documents by path descending, then by timestamp
 * descending (newest first), then by signature ascending as a tie-breaker.
 *
 * @param a - Left-hand document.
 * @param b - Right-hand document.
 * @returns The result of `compareArrays` over the three sort keys.
 */
function docComparePathDESCthenNewestFirst<DocType extends DocBase<string>>(
  a: DocType,
  b: DocType,
): Cmp {
  return compareArrays(
    [a.path, a.timestamp, a.signature],
    // The tie-breaker must use b's signature; comparing a.signature with
    // itself would always report equality.
    [b.path, b.timestamp, b.signature],
    ["DESC", "DESC", "ASC"],
  );
}
/**
 * Document driver that keeps every document of one share in memory.
 *
 * All state lives in plain fields and three maps which `upsert` and
 * `eraseExpiredDocs` keep in step:
 * - `docByPathAndAuthor`: one document per `path|author` key,
 * - `docsByPathNewestFirst`: every document at a path, sorted newest first,
 * - `latestDocsByPath`: the first entry of that sorted list for each path.
 */
export class DocDriverMemory implements IReplicaDocDriver {
  share: ShareAddress;
  _maxLocalIndex: LocalIndex = -1;
  _isClosed: boolean = false;
  _configKv: Record<string, string> = {};

  docByPathAndAuthor: Map<string, DocBase<string>> = new Map();
  docsByPathNewestFirst: Map<Path, DocBase<string>[]> = new Map();
  latestDocsByPath: Map<string, DocBase<string>> = new Map();

  /**
   * @param share - Address of the share this driver stores documents for.
   * @throws Whatever error value `checkShareIsValid` returns for an invalid
   *   address.
   */
  constructor(share: ShareAddress) {
    logger.debug("constructor");

    const addressIsValidResult = checkShareIsValid(share);
    if (isErr(addressIsValidResult)) {
      throw addressIsValidResult;
    }

    this.share = share;
  }

  //--------------------------------------------------
  // LIFECYCLE

  /** Reports whether `close` has already been called. */
  isClosed(): boolean {
    return this._isClosed;
  }

  /**
   * Marks the driver as closed, optionally wiping everything it holds.
   *
   * @param erase - When true, config, the local index counter and all
   *   document indexes are reset before closing.
   * @returns A resolved promise.
   * @throws ReplicaIsClosedError synchronously if the driver is already closed.
   */
  close(erase: boolean): Promise<void> {
    logger.debug("close");
    if (this._isClosed) throw new ReplicaIsClosedError();

    if (erase) {
      logger.debug("...close: and erase");
      this._configKv = {};
      this._maxLocalIndex = -1;
      this.docsByPathNewestFirst.clear();
      this.docByPathAndAuthor.clear();
      // The latest-doc index must be wiped too, otherwise erased documents
      // stay reachable through it.
      this.latestDocsByPath.clear();
    }

    this._isClosed = true;
    logger.debug("...close is done.");

    return Promise.resolve();
  }

  //--------------------------------------------------
  // CONFIG

  /** Reads a config value; resolves to `undefined` when the key is unset. */
  async getConfig(key: string): Promise<string | undefined> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    return this._configKv[key];
  }

  /** Stores `value` under `key`, replacing any previous value. */
  async setConfig(key: string, value: string): Promise<void> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    this._configKv[key] = value;
  }

  /** Lists all config keys, passed through `sortedInPlace`. */
  async listConfigKeys(): Promise<string[]> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    return sortedInPlace(Object.keys(this._configKv));
  }

  /**
   * Removes a config key.
   *
   * @returns `true` if the key was present before deletion.
   */
  async deleteConfig(key: string): Promise<boolean> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    const had = key in this._configKv;
    delete this._configKv[key];
    return had;
  }

  //--------------------------------------------------
  // GET

  /**
   * Returns the highest local index handed out so far (-1 before any upsert).
   *
   * @throws ReplicaIsClosedError synchronously if the driver is closed.
   */
  getMaxLocalIndex(): Promise<LocalIndex> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    logger.debug(`getMaxLocalIndex(): it's ${this._maxLocalIndex}`);
    return Promise.resolve(this._maxLocalIndex);
  }

  /** Returns every stored document (one per path and author), unsorted. */
  async _getAllDocs(): Promise<DocBase<string>[]> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    return [...this.docByPathAndAuthor.values()];
  }

  /** Returns the latest document of every path, unsorted. */
  async _getLatestDocs(): Promise<DocBase<string>[]> {
    if (this._isClosed) throw new ReplicaIsClosedError();
    return Array.from(this.latestDocsByPath.values());
  }

  /**
   * Runs a query against the stored documents.
   *
   * The query is first normalised by `cleanUpQuery`. Candidates are then
   * gathered (using the per-path indexes when the filter names an exact
   * path), filtered by `startAfter`, expiry and the query filter, sorted by
   * `orderBy`, and finally truncated to `limit`.
   *
   * @param queryToClean - The raw query.
   * @returns The matching documents; expired documents are never included.
   * @throws ReplicaIsClosedError if the driver is closed.
   * @throws ValidationError if `orderBy` is set to an unrecognised value.
   */
  async queryDocs(
    queryToClean: Query<string[]>,
  ): Promise<DocBase<string>[]> {
    logger.debug("queryDocs", queryToClean);
    if (this._isClosed) throw new ReplicaIsClosedError();

    const { query, willMatch } = cleanUpQuery(queryToClean);
    logger.debug(` cleanUpQuery. willMatch = ${willMatch}`);
    if (willMatch === "nothing") {
      return [];
    }

    // Gather candidates. An exact-path filter lets us read the per-path
    // indexes instead of scanning everything, but the candidates still go
    // through the full pipeline below so that the rest of the filter,
    // startAfter, orderBy and limit are honoured.
    logger.debug(` getting docs; historyMode = ${query.historyMode}`);
    const exactPath = query.filter?.path;
    let docs: DocBase<string>[];
    if (exactPath) {
      if (query.historyMode === "all") {
        docs = this.docsByPathNewestFirst.get(exactPath) ?? [];
      } else {
        const latest = this.latestDocsByPath.get(exactPath);
        docs = latest ? [latest] : [];
      }
    } else {
      docs = query.historyMode === "all"
        ? await this._getAllDocs()
        : await this._getLatestDocs();
    }

    // True when the doc sits at or before the startAfter cursor for the
    // active ordering and must therefore be skipped.
    const startAfter = query.startAfter;
    const isAtOrBeforeCursor = (doc: DocBase<string>): boolean => {
      if (startAfter === undefined) return false;
      switch (query.orderBy) {
        case "path ASC":
          return startAfter.path !== undefined &&
            doc.path <= startAfter.path;
        case "path DESC":
          return startAfter.path !== undefined &&
            doc.path >= startAfter.path;
        case "localIndex ASC":
          return startAfter.localIndex !== undefined &&
            (doc._localIndex ?? 0) <= startAfter.localIndex;
        case "localIndex DESC":
          return startAfter.localIndex !== undefined &&
            (doc._localIndex ?? 0) >= startAfter.localIndex;
        default:
          return false;
      }
    };

    logger.debug(` filtering docs`);
    // deleteAfter is compared against the current time in microseconds.
    const microNow = Date.now() * 1000;
    const filteredDocs: DocBase<string>[] = [];
    for (const doc of docs) {
      if (isAtOrBeforeCursor(doc)) continue;
      if (doc.deleteAfter && doc.deleteAfter < microNow) continue;
      if (query.filter && !docMatchesFilter(doc, query.filter)) continue;
      filteredDocs.push(doc);
    }

    // filteredDocs is a fresh array, so sorting it never disturbs the
    // arrays held in the indexes.
    logger.debug(` ordering docs: ${query.orderBy}`);
    if (query.orderBy === "path ASC") {
      filteredDocs.sort(docComparePathASCthenNewestFirst);
    } else if (query.orderBy === "path DESC") {
      filteredDocs.sort(docComparePathDESCthenNewestFirst);
    } else if (query.orderBy === "localIndex ASC") {
      filteredDocs.sort(compareByObjKey("_localIndex", "ASC"));
    } else if (query.orderBy === "localIndex DESC") {
      filteredDocs.sort(compareByObjKey("_localIndex", "DESC"));
    } else if (query.orderBy) {
      throw new ValidationError(
        "unrecognized query orderBy: " + JSON.stringify(query.orderBy),
      );
    }

    const result = query.limit !== undefined
      ? filteredDocs.slice(0, query.limit)
      : filteredDocs;

    logger.debug(
      ` queryDocs is done: found ${result.length} docs.`,
    );
    return result;
  }

  //--------------------------------------------------
  // SET

  /**
   * Stores a document, replacing any earlier one with the same path and
   * author.
   *
   * The input is shallow-copied, given the next local index, and frozen; the
   * caller's object is left untouched.
   *
   * @param doc - The document to store.
   * @returns A promise resolving to the stored (frozen) copy.
   * @throws ReplicaIsClosedError synchronously if the driver is closed.
   */
  upsert<DocType extends DocBase<string>>(
    doc: DocType,
  ): Promise<DocType> {
    if (this._isClosed) throw new ReplicaIsClosedError();

    const stored = { ...doc };
    this._maxLocalIndex += 1;
    stored._localIndex = this._maxLocalIndex;
    Object.freeze(stored);
    logger.debug("upsert", stored);

    this.docByPathAndAuthor.set(combinePathAndAuthor(stored), stored);

    // Rebuild this path's list: drop the author's previous doc, add the new
    // one, and re-sort so that index 0 is the newest.
    const docsAtPath = (this.docsByPathNewestFirst.get(stored.path) ?? [])
      .filter((d) => d.author !== stored.author);
    docsAtPath.push(stored);
    docsAtPath.sort(docComparePathASCthenNewestFirst);
    this.docsByPathNewestFirst.set(stored.path, docsAtPath);

    this.latestDocsByPath.set(stored.path, docsAtPath[0]);

    return Promise.resolve(stored);
  }

  /**
   * Removes every document for which `docIsExpired` returns true.
   *
   * Only the expired documents are dropped: other documents at the same path
   * stay indexed, and the path's latest document is recomputed from what
   * remains.
   *
   * @returns A promise resolving to the documents that were removed.
   */
  eraseExpiredDocs(): Promise<DocBase<string>[]> {
    const expiredDocs: DocBase<string>[] = [];
    for (const doc of this.docByPathAndAuthor.values()) {
      if (docIsExpired(doc)) {
        expiredDocs.push(doc);
      }
    }

    for (const expiredDoc of expiredDocs) {
      this.docByPathAndAuthor.delete(combinePathAndAuthor(expiredDoc));

      // Each author has at most one doc per path, so filtering by author
      // removes exactly the expired doc and keeps the sort order intact.
      const remaining = (this.docsByPathNewestFirst.get(expiredDoc.path) ?? [])
        .filter((d) => d.author !== expiredDoc.author);

      if (remaining.length === 0) {
        this.docsByPathNewestFirst.delete(expiredDoc.path);
        this.latestDocsByPath.delete(expiredDoc.path);
      } else {
        this.docsByPathNewestFirst.set(expiredDoc.path, remaining);
        this.latestDocsByPath.set(expiredDoc.path, remaining[0]);
      }
    }

    return Promise.resolve(expiredDocs);
  }
}