import { fast_deep_equal as isEqual, fast_json_stable_stringify as stringify,} from "../../deps.ts";
import { AuthorKeypair, Doc, DocToSet, Path } from "../util/doc-types.ts";import { ReplicaCacheIsClosedError, ReplicaIsClosedError,} from "../util/errors.ts";
import { cleanUpQuery, docMatchesFilter } from "../query/query.ts";import { QueryFollower } from "../query-follower/query-follower.ts";import { Query } from "../query/query-types.ts";import { IReplica, LiveQueryEvent } from "./replica-types.ts";
import { Logger } from "../util/log.ts";
const logger = new Logger("replica-cache", "green");
function justLocalIndex({ _localIndex }: Doc) { return _localIndex;}
function sortAndLimit(query: Query, docs: Doc[]) { const filteredDocs: Doc[] = [];
for (const doc of docs) { if (query.orderBy === "path ASC") { if (query.startAfter !== undefined) { if ( query.startAfter.path !== undefined && doc.path <= query.startAfter.path ) { continue; } } } if (query.orderBy === "path DESC") { if (query.startAfter !== undefined) { if ( query.startAfter.path !== undefined && doc.path >= query.startAfter.path ) { continue; } } } if (query.orderBy === "localIndex ASC") { if (query.startAfter !== undefined) { if ( query.startAfter.localIndex !== undefined && (doc._localIndex || 0) <= query.startAfter.localIndex ) { continue; } } } if (query.orderBy === "localIndex DESC") { if (query.startAfter !== undefined) { if ( query.startAfter.localIndex !== undefined && (doc._localIndex || 0) >= query.startAfter.localIndex ) { continue; } } }
filteredDocs.push(doc);
if (query.limit !== undefined && filteredDocs.length >= query.limit) { break; } }
return filteredDocs;}
type CacheEntry = { docs: Doc[]; follower: QueryFollower; expires: number };
export class ReplicaCache { version = 0;
_replica: IReplica;
_docCache = new Map< string, CacheEntry >();
_timeToLive: number;
_onCacheUpdatedCallbacks = new Set<(entry: string) => void>();
_isClosed = false;
_onFireCacheUpdatedsWrapper = (cb: () => void) => cb();
constructor( replica: IReplica, timeToLive?: number, onCacheUpdatedWrapper?: (cb: () => void) => void, ) { this._replica = replica; this._timeToLive = timeToLive || 1000;
if (onCacheUpdatedWrapper) { this._onFireCacheUpdatedsWrapper = onCacheUpdatedWrapper; } }
async close() { if (this._isClosed) throw new ReplicaCacheIsClosedError(); this._isClosed = true;
await Promise.all( Array.from(this._docCache.values()).map((entry) => entry.follower.close), );
this._docCache.clear(); }
isClosed() { return this._isClosed; }
set(keypair: AuthorKeypair, docToSet: DocToSet) { if (this._isClosed) throw new ReplicaCacheIsClosedError();
return this._replica.set(keypair, docToSet); }
getAllDocs(): Doc[] { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } return this.queryDocs({ historyMode: "all", orderBy: "path DESC", }); }
getLatestDocs(): Doc[] { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } return this.queryDocs({ historyMode: "latest", orderBy: "path DESC", }); }
getAllDocsAtPath(path: Path): Doc[] { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } return this.queryDocs({ historyMode: "all", orderBy: "path DESC", filter: { path: path }, }); }
getLatestDocAtPath(path: Path): Doc | undefined { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } const docs = this.queryDocs({ historyMode: "latest", orderBy: "path DESC", filter: { path: path }, }); if (docs.length === 0) { return undefined; } return docs[0]; }
queryDocs(query: Query = {}): Doc[] { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } const cleanUpQueryResult = cleanUpQuery(query);
if (cleanUpQueryResult.willMatch === "nothing") { return []; }
const queryString = stringify(cleanUpQueryResult.query);
const cachedResult = this._docCache.get(queryString);
if (cachedResult) { if (Date.now() > cachedResult.expires) { this._replica.queryDocs(query).then((docs) => { const localIndexes = docs.map(justLocalIndex).sort(); const cacheLocalIndexes = cachedResult.docs.map(justLocalIndex) .sort();
if (isEqual(localIndexes, cacheLocalIndexes)) { return; }
this._docCache.set(queryString, { follower: cachedResult.follower, docs, expires: Date.now() + this._timeToLive, });
logger.debug("Updated cache because result expired."); this._fireOnCacheUpdateds(queryString); }); }
return cachedResult.docs; }
const follower = new QueryFollower( this._replica, { ...query, historyMode: "all", orderBy: "localIndex ASC" }, );
follower.bus.on((event: LiveQueryEvent) => { if (event.kind === "existing" || event.kind === "success") { logger.debug({ doc: event.doc.path, queryString }); this._updateCache(queryString, event.doc); } });
follower.hatch();
this._docCache.set(queryString, { follower, docs: [], expires: Date.now() + this._timeToLive, });
this._replica.queryDocs(query).then((docs) => { this._docCache.set(queryString, { follower, docs, expires: Date.now() + this._timeToLive, }); logger.debug("Updated cache with a new entry."); this._fireOnCacheUpdateds(queryString); });
return []; }
overwriteAllDocsByAuthor(keypair: AuthorKeypair) { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); } return this._replica.overwriteAllDocsByAuthor(keypair); }
_updateCache(key: string, doc: Doc): void { const entry = this._docCache.get(key);
if (!entry) { return; }
const query: Query = JSON.parse(key);
const appendDoc = () => { const nextDocs = [...entry.docs, doc]; this._docCache.set(key, { ...entry, docs: sortAndLimit(query, nextDocs), }); this._fireOnCacheUpdateds(key); };
const replaceDoc = ({ exact }: { exact: boolean }) => { const nextDocs = entry.docs.map((existingDoc) => { if ( exact && existingDoc.path === doc.path && existingDoc.author === doc.author ) { return doc; } else if (!exact && existingDoc.path === doc.path) { return doc; }
return existingDoc; });
this._docCache.set(key, { ...entry, docs: sortAndLimit(query, nextDocs), }); this._fireOnCacheUpdateds(key); };
const documentsWithSamePath = entry.docs.filter( (existingDoc) => existingDoc.path === doc.path, );
const documentsWithSamePathAndAuthor = entry.docs.filter( (existingDoc) => existingDoc.path === doc.path && existingDoc.author === doc.author, );
if (documentsWithSamePath.length === 0) { if ( (query.filter && docMatchesFilter(doc, query.filter)) || !query.filter ) { logger.debug( "Updated cache after appending a doc to a entry with matching filter.", ); appendDoc(); } return; }
const historyMode = query.historyMode || "latest";
if (historyMode === "all") { if (documentsWithSamePathAndAuthor.length === 0) { logger.debug( "Updated cache after appending a version of a doc to a historyMode: all query.", ); appendDoc(); return; }
logger.debug( "Updated cache after replacing a version of a doc in a historyMode: all query.", ); replaceDoc({ exact: true }); return; }
const latestDoc = documentsWithSamePath[0];
const docIsDifferent = doc.author !== latestDoc?.author || !isEqual(doc, latestDoc);
const docIsLater = doc.timestamp > latestDoc.timestamp;
if (docIsDifferent && docIsLater) { logger.debug( "Updated cache after replacing a doc with its latest version.", ); replaceDoc({ exact: false }); return; } }
_fireOnCacheUpdateds(entry: string) { this.version++;
this._onFireCacheUpdatedsWrapper(() => { this._onCacheUpdatedCallbacks.forEach((cb) => { cb(entry); }); }); }
onCacheUpdated(callback: (entryKey: string) => void): () => void { if (this._isClosed) throw new ReplicaCacheIsClosedError(); if (this._replica.isClosed()) { throw new ReplicaIsClosedError(); }
this._onCacheUpdatedCallbacks.add(callback);
return () => { this._onCacheUpdatedCallbacks.delete(callback); }; }}