import { fast_json_stable_stringify as stringify, shallowEqualArrays, shallowEqualObjects,} from "../../deps.ts";import { AuthorAddress, DocAttachment, DocBase, DocWithAttachment, Path,} from "../util/doc-types.ts";import { isErr, ReplicaCacheIsClosedError, ReplicaIsClosedError, ValidationError,} from "../util/errors.ts";import { cleanUpQuery, docMatchesFilter } from "../query/query.ts";import { Query } from "../query/query-types.ts";import { AttachmentIngestEvent, ExpireEvent, IngestEvent, IngestEventSuccess, QuerySourceEvent,} from "./replica-types.ts";import { Logger } from "../util/log.ts";import { CallbackSink } from "../streams/stream_utils.ts";import { Replica } from "./replica.ts";
import { DEFAULT_FORMAT, DEFAULT_FORMATS, getFormatLookup,} from "../formats/util.ts";import { DefaultFormat, DefaultFormats, FormatArg, FormatDocType, FormatInputType, FormatsArg,} from "../formats/format_types.ts";import { AuthorKeypair } from "../crypto/crypto-types.ts";
const logger = new Logger("replica-cache", "green");
function justLocalIndex({ _localIndex }: DocBase<string>) { return _localIndex;}
function sortAndLimit<DocType extends DocBase<string>>( query: Query<string[]>, docs: DocType[],) { const filteredDocs: DocType[] = [];
for (const doc of docs) { if (query.orderBy === "path ASC") { if (query.startAfter !== undefined) { if ( query.startAfter.path !== undefined && doc.path <= query.startAfter.path ) { continue; } } } if (query.orderBy === "path DESC") { if (query.startAfter !== undefined) { if ( query.startAfter.path !== undefined && doc.path >= query.startAfter.path ) { continue; } } } if (query.orderBy === "localIndex ASC") { if (query.startAfter !== undefined) { if ( query.startAfter.localIndex !== undefined && (doc._localIndex || 0) <= query.startAfter.localIndex ) { continue; } } } if (query.orderBy === "localIndex DESC") { if (query.startAfter !== undefined) { if ( query.startAfter.localIndex !== undefined && (doc._localIndex || 0) >= query.startAfter.localIndex ) { continue; } } }
filteredDocs.push(doc);
if (query.limit !== undefined && filteredDocs.length >= query.limit) { break; } }
return filteredDocs;}
type DocsCacheEntry<DocType> = { docs: DocType[]; stream: ReadableStream<QuerySourceEvent<DocBase<string>>>; expires: number; close: () => void;};
type AttachmentCacheEntry<F> = { expires: number; attachment: DocAttachment | undefined | ValidationError; format: FormatArg<F>;};
export class ReplicaCache { version = 0;
private replica: Replica;
private docCache = new Map< string, DocsCacheEntry<DocBase<string>> >();
private timeToLive: number;
private onCacheUpdatedCallbacks = new Set<() => void>();
private closed = false;
private onFireCacheUpdatedsWrapper = (cb: () => void) => cb();
constructor( replica: Replica, timeToLive?: number, onCacheUpdatedWrapper?: (cb: () => void) => void, ) { this.replica = replica; this.timeToLive = timeToLive || 1000;
if (onCacheUpdatedWrapper) { this.onFireCacheUpdatedsWrapper = onCacheUpdatedWrapper; }
const onReplicaEvent = this.onReplicaEvent.bind(this); const close = this.close.bind(this); const isClosed = this.isClosed.bind(this);
this.replica.getEventStream("*").pipeTo( new WritableStream({ async write(event) { if ( event.kind === "attachment_ingest" || event.kind === "success" || event.kind === "expire" ) { await onReplicaEvent(event); } else if (event.kind === "willClose" && !isClosed()) { await close(); } }, }), ).catch(() => {}); }
async close() { if (this.closed) throw new ReplicaCacheIsClosedError(); this.closed = true;
await Promise.all( Array.from(this.docCache.values()).map((entry) => entry.close()), );
this.docCache.clear();
this.onCacheUpdatedCallbacks.clear(); }
isClosed() { return this.closed; }
getAllDocs<F = DefaultFormats>( formats?: FormatsArg<F>, ): FormatDocType<F>[] { return this.queryDocs({ historyMode: "all", orderBy: "path DESC", }, formats); }
getLatestDocs<F = DefaultFormats>( formats?: FormatsArg<F>, ): FormatDocType<F>[] { return this.queryDocs({ historyMode: "latest", orderBy: "path DESC", }, formats); }
getAllDocsAtPath<F = DefaultFormats>( path: Path, formats?: FormatsArg<F>, ): FormatDocType<F>[] { return this.queryDocs({ historyMode: "all", orderBy: "path DESC", filter: { path: path }, }, formats); }
getLatestDocAtPath<F = DefaultFormat>( path: Path, format?: FormatArg<F>, ): FormatDocType<F> | undefined { const docs = this.queryDocs({ historyMode: "latest", orderBy: "path DESC", filter: { path: path }, }, format ? [format] : undefined); if (docs.length === 0) { return undefined; } return docs[0] as FormatDocType<F>; }
queryDocs<F = DefaultFormats>( query: Omit<Query<[string]>, "formats"> = {}, formats?: FormatsArg<F>, ): FormatDocType<F>[] { if (this.closed) throw new ReplicaCacheIsClosedError(); if (this.replica.isClosed()) { throw new ReplicaIsClosedError(); }
const f = formats ? formats : DEFAULT_FORMATS; const queryWithFormats = { ...query, formats: f.map((f) => f.id), };
const cleanUpQueryResult = cleanUpQuery(queryWithFormats);
if (cleanUpQueryResult.willMatch === "nothing") { return []; }
const queryString = stringify(cleanUpQueryResult.query);
const cachedResult = this.docCache.get(queryString);
if (cachedResult) { if (Date.now() > cachedResult.expires) { this.replica.queryDocs(query, formats).then((docs) => { const localIndexes = docs.map(justLocalIndex).sort(); const cacheLocalIndexes = cachedResult.docs.map(justLocalIndex) .sort();
if (shallowEqualArrays(localIndexes, cacheLocalIndexes)) { return; }
this.docCache.set(queryString, { stream: cachedResult.stream, close: cachedResult.close, docs: docs, expires: Date.now() + this.timeToLive, });
logger.debug("Updated cache because result expired."); this.fireOnCacheUpdateds(); }); }
return cachedResult.docs as FormatDocType<F>[]; }
const stream = this.replica.getQueryStream(query, "new", formats);
const callbackSink = new CallbackSink< QuerySourceEvent<DocBase<string>> >();
const unsub = callbackSink.onWrite((event) => { if (event.kind === "existing" || event.kind === "success") { logger.debug({ doc: event.doc.path, queryString }); this._updateCache(queryString, event.doc); } });
const callbackStream = new WritableStream(callbackSink);
const abortController = new AbortController();
stream.pipeTo(callbackStream, { signal: abortController.signal });
const close = () => { unsub(); };
this.docCache.set(queryString, { stream, docs: [], expires: Date.now() + this.timeToLive, close, });
this.replica.queryDocs(queryWithFormats).then((docs) => { this.docCache.set(queryString, { stream, close, docs: docs, expires: Date.now() + this.timeToLive, }); logger.debug("Updated cache with a new entry."); this.fireOnCacheUpdateds(); });
return []; }
queryPaths<F = DefaultFormats>( query: Omit<Query<[string]>, "formats"> = {}, formats?: FormatsArg<F>, ): Path[] { const docs = this.queryDocs(query, formats); const pathsSet = new Set(docs.map(({ path }) => path)); return Array.from(pathsSet).sort(); }
queryAuthors<F = DefaultFormats>( query: Omit<Query<[string]>, "formats"> = {}, formats?: FormatsArg<F>, ): AuthorAddress[] { const docs = this.queryDocs(query, formats); const authorsSet = new Set(docs.map(({ author }) => author)); return Array.from(authorsSet).sort(); }
_updateCache(key: string, doc: DocBase<string>): void { const entry = this.docCache.get(key);
if (!entry) { return; }
const query: Query<string[]> = JSON.parse(key);
const appendDoc = () => { const nextDocs = [...entry.docs, doc]; this.docCache.set(key, { ...entry, docs: sortAndLimit(query, nextDocs), }); this.fireOnCacheUpdateds(); };
const replaceDoc = ({ exact }: { exact: boolean }) => { const nextDocs = entry.docs.map((existingDoc) => { if ( exact && existingDoc.path === doc.path && existingDoc.author === doc.author ) { return doc; } else if (!exact && existingDoc.path === doc.path) { return doc; }
return existingDoc; });
this.docCache.set(key, { ...entry, docs: sortAndLimit(query, nextDocs), }); this.fireOnCacheUpdateds(); };
const documentsWithSamePath = entry.docs.filter( (existingDoc) => existingDoc.path === doc.path, );
const documentsWithSamePathAndAuthor = entry.docs.filter( (existingDoc) => existingDoc.path === doc.path && existingDoc.author === doc.author, );
if (documentsWithSamePath.length === 0) { if ( (query.filter && docMatchesFilter(doc, query.filter)) || !query.filter ) { logger.debug( "Updated cache after appending a doc to a entry with matching filter.", ); appendDoc(); } return; }
const historyMode = query.historyMode || "latest";
if (historyMode === "all") { if (documentsWithSamePathAndAuthor.length === 0) { logger.debug( "Updated cache after appending a version of a doc to a historyMode: all query.", ); appendDoc(); return; }
logger.debug( "Updated cache after replacing a version of a doc in a historyMode: all query.", ); replaceDoc({ exact: true }); return; }
const latestDoc = documentsWithSamePath[0];
const docIsDifferent = doc.author !== latestDoc?.author || !shallowEqualObjects(doc, latestDoc);
const docIsLater = doc.timestamp > latestDoc.timestamp;
if (docIsDifferent && docIsLater) { logger.debug( "Updated cache after replacing a doc with its latest version.", ); replaceDoc({ exact: false }); return; } }
private fireOnCacheUpdateds() { this.version++;
this.onFireCacheUpdatedsWrapper(() => { this.onCacheUpdatedCallbacks.forEach((cb) => { cb(); }); }); }
onCacheUpdated(callback: () => void): () => void { if (this.closed) throw new ReplicaCacheIsClosedError(); if (this.replica.isClosed()) { throw new ReplicaIsClosedError(); }
this.onCacheUpdatedCallbacks.add(callback);
return () => { this.onCacheUpdatedCallbacks.delete(callback); }; }
set<F = DefaultFormat>( keypair: AuthorKeypair, docToSet: Omit<FormatInputType<F>, "format">, format?: FormatArg<F>, ): Promise< IngestEvent<FormatDocType<F>> | ValidationError > { if (this.closed) throw new ReplicaCacheIsClosedError();
return this.replica.set(keypair, docToSet, format); }
overwriteAllDocsByAuthor<F = DefaultFormat>( keypair: AuthorKeypair, format?: FormatArg<F>, ) { if (this.closed) throw new ReplicaCacheIsClosedError(); if (this.replica.isClosed()) { throw new ReplicaIsClosedError(); } return this.replica.overwriteAllDocsByAuthor( keypair, format, ); }
wipeDocAtPath<F = DefaultFormat>( keypair: AuthorKeypair, path: string, format: FormatArg<F> = DEFAULT_FORMAT as unknown as FormatArg<F>, ): Promise<IngestEvent<FormatDocType<F>> | ValidationError> { if (this.closed) throw new ReplicaCacheIsClosedError();
return this.replica.wipeDocAtPath(keypair, path, format); }
private attachmentCache = new Map< string, AttachmentCacheEntry<any> >();
private async onReplicaEvent( event: | ExpireEvent<DocBase<string>> | IngestEventSuccess<DocBase<string>> | AttachmentIngestEvent<DocBase<string>>, ) { const cacheKey = `${event.doc.path}|${event.doc.author}`; const cacheEntry = this.attachmentCache.get(cacheKey);
if (!cacheEntry) { return; }
const res = await this.replica.getAttachment(event.doc, cacheEntry.format);
this.attachmentCache.set( cacheKey, { expires: Date.now() + this.timeToLive, attachment: res, format: cacheEntry.format, }, );
this.fireOnCacheUpdateds(); }
getAttachment<F = DefaultFormat>( doc: FormatDocType<F>, format: FormatArg<F> = DEFAULT_FORMAT as unknown as FormatArg<F>, ): DocAttachment | undefined | ValidationError { if (this.closed) throw new ReplicaCacheIsClosedError(); if (this.replica.isClosed()) { throw new ReplicaIsClosedError(); }
const attachmentInfo = format.getAttachmentInfo(doc);
if (isErr(attachmentInfo)) { return attachmentInfo; }
const cacheKey = `${doc.path}|${doc.author}`;
const cachedResult = this.attachmentCache.get(cacheKey);
if (cachedResult) { if (Date.now() > cachedResult.expires) { this.replica.getAttachment(doc, format).then((res) => { this.attachmentCache.set( cacheKey, { expires: Date.now() + this.timeToLive, attachment: res, format, }, );
this.fireOnCacheUpdateds(); }); }
if (doc.deleteAfter && Date.now() * 1000 > doc.deleteAfter) { return new ValidationError("This document has expired"); }
return cachedResult.attachment; }
this.attachmentCache.set(cacheKey, { attachment: undefined, expires: Date.now() + this.timeToLive, format, });
this.replica.getAttachment(doc, format).then((res) => {
this.attachmentCache.set( cacheKey, { expires: Date.now() + this.timeToLive, attachment: res, format, }, );
this.fireOnCacheUpdateds(); });
return undefined; }
addAttachments<F = DefaultFormats>( docs: FormatDocType<F>[], formats?: FormatsArg<F>, ): DocWithAttachment<FormatDocType<F>>[] { const result: DocWithAttachment<FormatDocType<F>>[] = [];
const lookup = getFormatLookup(formats);
for (const doc of docs) { const cachedResult = this.getAttachment(doc, lookup[doc.format]);
result.push({ ...doc, attachment: cachedResult, }); }
return result; }}