import { Cmp } from "./util-types.ts";import { AuthorAddress, DocAttachment, DocBase, DocWithAttachment, FormatName, Path, ShareAddress,} from "../util/doc-types.ts";import { Query } from "../query/query-types.ts";import { IngestEvent, IReplicaDriver, MultiFormatReplicaOpts, QuerySourceEvent, QuerySourceMode, ReplicaEvent, ReplicaId,} from "./replica-types.ts";import { isErr, ReplicaIsClosedError, ValidationError,} from "../util/errors.ts";import { microsecondNow, randomId } from "../util/misc.ts";import { compareArrays } from "./compare.ts";import { checkShareIsValid } from "../core-validators/addresses.ts";
import { Logger } from "../util/log.ts";import { CallbackSink, ChannelMultiStream, LockStream, OrCh,} from "../streams/stream_utils.ts";import { DefaultFormat, DefaultFormats, FormatArg, FormatDocType, FormatInputType, FormatsArg,} from "../formats/format_types.ts";import { DEFAULT_FORMAT, DEFAULT_FORMATS, getFormatLookup, getFormatsWithFallback, getFormatWithFallback,} from "../formats/util.ts";
import { docMatchesFilter } from "../query/query.ts";import { deferred } from "../../deps.ts";import { AuthorKeypair } from "../crypto/crypto-types.ts";
// Shorthand for serialising values into debug log lines.
const J = JSON.stringify;

// One logger per area of activity: general replica work, `set` calls and
// `ingest` calls.
const logger = new Logger("replica", "gold");
const loggerSet = new Logger("replica set", "gold");
const loggerIngest = new Logger("replica ingest", "gold");
/**
 * Comparator that orders documents newest first.
 *
 * Documents are compared by `timestamp` descending; ties are broken by
 * `signature` ascending so that the ordering is deterministic.
 *
 * @param a - First document.
 * @param b - Second document.
 * @returns The `Cmp` value produced by `compareArrays` for the two sort keys.
 */
function docCompareNewestFirst<
  FormatType extends FormatName,
  DocA extends DocBase<FormatType>,
  DocB extends DocBase<FormatType>,
>(
  a: DocA,
  b: DocB,
): Cmp {
  return compareArrays(
    [a.timestamp, a.signature],
    // The tie-break must use b's own signature; comparing a.signature with
    // itself made the second sort key a no-op.
    [b.timestamp, b.signature],
    ["DESC", "ASC"],
  );
}
/**
 * A replica of a single share's documents and attachments, backed by a
 * replica driver and able to work with several document formats.
 */
export class MultiformatReplica {
  /** Identifier of this instance: "replica-" followed by a random id. */
  replicaId: ReplicaId;
  /** Address of the share, as reported by the doc driver. */
  share: ShareAddress;
  /** The driver whose doc and attachment drivers this replica delegates to. */
  replicaDriver: IReplicaDriver;
  /** Per-format configuration, looked up by format id. */
  formatsConfig: Record<string, unknown>;

  private _isClosed = false;
  /** Runs the protected region of `ingest`. */
  private ingestLockStream = new LockStream();
  /** Fan-out stream of replica events, channelled by the event's `kind`. */
  private eventMultiStream: ChannelMultiStream<
    ReplicaEvent<DocBase<string>>["kind"],
    "kind",
    ReplicaEvent<DocBase<string>>
  > = new ChannelMultiStream("kind", true);
  /** Writer used to publish events into `eventMultiStream`. */
  private eventWriter: WritableStreamDefaultWriter<
    ReplicaEvent<DocBase<string>>
  >;
  /** Sink that hands every event to the callbacks added via `onEvent`. */
  private callbackSink = new CallbackSink<
    ReplicaEvent<DocBase<string>>
  >();
  /** Handle of the periodic prune timer. */
  private eraseInterval: number;
  /** Pending "expire" event timers, keyed by `${path}|${author}`. */
  private expireEventTimeouts = new Map<string, number>();

  /**
   * @param opts - The driver to use and an optional per-format config.
   * @throws The error returned by `checkShareIsValid` when the driver's
   * share address is not valid.
   */
  constructor({ driver, config }: MultiFormatReplicaOpts) {
    const shareCheck = checkShareIsValid(driver.docDriver.share);
    if (isErr(shareCheck)) {
      throw shareCheck;
    }

    logger.debug(
      `constructor. driver = ${(driver as any)?.constructor?.name}`,
    );

    this.replicaId = "replica-" + randomId();
    this.share = driver.docDriver.share;
    this.replicaDriver = driver;
    this.formatsConfig = config ?? {};

    this.eventWriter = this.eventMultiStream.getWritableStream().getWriter();

    // Forward every event (all channels) to the callback sink.
    this.eventMultiStream.getReadableStream("*").pipeTo(
      new WritableStream(this.callbackSink),
    );

    // Once an hour, prune expired docs and attachments; the timer removes
    // itself the first time it fires after the replica has been closed.
    const oneHourMs = 1000 * 60 * 60;
    this.eraseInterval = setInterval(() => {
      if (this.isClosed()) {
        clearInterval(this.eraseInterval);
        return;
      }
      this.pruneExpiredDocsAndAttachments();
    }, oneHourMs);
  }
/** Reports whether this replica has been marked as closed. */
isClosed(): boolean {
  const closed = this._isClosed;
  return closed;
}
/**
 * Shuts the replica down.
 *
 * Closes the ingest lock, cancels pending expire timers, emits `willClose`,
 * prunes expired data (only when not erasing), clears attachment staging,
 * marks the replica closed, closes both drivers and finally emits `didClose`.
 *
 * @param erase - Passed through to both drivers' `close`.
 * @throws ReplicaIsClosedError if the replica is already closed.
 */
async close(erase: boolean): Promise<void> {
  logger.debug("closing...");
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }

  await this.ingestLockStream.close();

  // No expire events should fire once we start closing.
  this.expireEventTimeouts.forEach((handle) => {
    clearTimeout(handle);
  });

  logger.debug(" sending willClose blockingly...");
  await this.eventWriter.write({ kind: "willClose" });

  logger.debug(" marking self as closed...");
  if (erase === false) {
    await this.pruneExpiredDocsAndAttachments();
  }

  await this.replicaDriver.attachmentDriver.clearStaging();

  this._isClosed = true;

  logger.debug(` closing ReplicaDriver (erase = ${erase})...`);
  const { docDriver, attachmentDriver } = this.replicaDriver;
  await docDriver.close(erase);
  await attachmentDriver.close(erase);

  clearInterval(this.eraseInterval);

  logger.debug(" sending didClose nonblockingly...");
  await this.eventWriter.write({ kind: "didClose" });
  logger.debug("...closing done");

  return Promise.resolve();
}
/**
 * Reads a config value from the doc driver.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async getConfig(key: string): Promise<string | undefined> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }
  const value = await this.replicaDriver.docDriver.getConfig(key);
  return value;
}

/**
 * Stores a config value through the doc driver.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async setConfig(key: string, value: string): Promise<void> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }
  return await this.replicaDriver.docDriver.setConfig(key, value);
}

/**
 * Lists the config keys known to the doc driver.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async listConfigKeys(): Promise<string[]> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }
  const keys = await this.replicaDriver.docDriver.listConfigKeys();
  return keys;
}

/**
 * Deletes a config key through the doc driver and returns the driver's
 * boolean result.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async deleteConfig(key: string): Promise<boolean> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }
  const result = await this.replicaDriver.docDriver.deleteConfig(key);
  return result;
}
/**
 * Returns the doc driver's current maximum local index.
 *
 * Note that the closed check throws synchronously (this method is not
 * `async`), rather than producing a rejected promise.
 *
 * @throws ReplicaIsClosedError if the replica is closed.
 */
getMaxLocalIndex(): Promise<number> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }
  const { docDriver } = this.replicaDriver;
  return docDriver.getMaxLocalIndex();
}
/**
 * Queries every stored document version (history mode "all"), ordered by
 * path ascending.
 *
 * @param formats - Formats to query; forwarded to `queryDocs`.
 */
getAllDocs<F = DefaultFormats>(
  formats?: FormatsArg<F>,
): Promise<FormatDocType<F>[]> {
  logger.debug(`getAllDocs()`);

  return this.queryDocs(
    { historyMode: "all", orderBy: "path ASC" },
    formats,
  );
}

/**
 * Queries documents in history mode "latest", ordered by path ascending.
 *
 * @param formats - Formats to query; forwarded to `queryDocs`.
 */
getLatestDocs<F = DefaultFormats>(
  formats?: FormatsArg<F>,
): Promise<FormatDocType<F>[]> {
  logger.debug(`getLatestDocs()`);

  return this.queryDocs(
    { historyMode: "latest", orderBy: "path ASC" },
    formats,
  );
}

/**
 * Queries every stored document version (history mode "all") at one path.
 *
 * @param path - The path to filter on.
 * @param formats - Formats to query; forwarded to `queryDocs`.
 */
getAllDocsAtPath<F = DefaultFormats>(
  path: Path,
  formats?: FormatsArg<F>,
): Promise<FormatDocType<F>[]> {
  logger.debug(`getAllDocsAtPath("${path}")`);

  return this.queryDocs(
    { historyMode: "all", orderBy: "path ASC", filter: { path } },
    formats,
  );
}

/**
 * Queries history mode "latest" at one path and returns the first result.
 *
 * @param path - The path to filter on.
 * @param format - Single format to query; when omitted, `queryDocs` falls
 * back to its default.
 * @returns The first matching document, or `undefined` when there is none.
 */
async getLatestDocAtPath<F = DefaultFormat>(
  path: Path,
  format?: FormatArg<F>,
): Promise<FormatDocType<F> | undefined> {
  logger.debug(`getLatestDocsAtPath("${path}")`);

  const matches = await this.queryDocs(
    { historyMode: "latest", orderBy: "path ASC", filter: { path } },
    format ? [format] : undefined,
  );

  return matches.length === 0
    ? undefined
    : matches[0] as FormatDocType<F>;
}
/**
 * Runs a query against the doc driver, restricted to the given formats.
 *
 * @param query - The query, without its `formats` field.
 * @param formats - Formats to query; resolved via `getFormatsWithFallback`.
 * @returns The documents returned by the doc driver.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async queryDocs<F = DefaultFormats>(
  query: Omit<Query<[string]>, "formats"> = {},
  formats?: FormatsArg<F>,
): Promise<FormatDocType<F>[]> {
  logger.debug(`queryDocs`, query);
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }

  const resolvedFormats = getFormatsWithFallback(formats);

  // The driver takes format ids rather than format objects.
  const docs = await this.replicaDriver.docDriver.queryDocs({
    ...query,
    formats: resolvedFormats.map((fmt) => fmt.id),
  });

  return docs as FormatDocType<F>[];
}
/**
 * Returns the distinct paths of the documents matching a query, sorted with
 * the default array sort.
 *
 * @param query - The query, without its `formats` field.
 * @param formats - Formats to query; forwarded to `queryDocs`.
 */
async queryPaths<F = DefaultFormats>(
  query: Omit<Query<[string]>, "formats"> = {},
  formats?: FormatsArg<F>,
): Promise<Path[]> {
  const matchingDocs = await this.queryDocs(query, formats);

  const uniquePaths = new Set<Path>();
  for (const doc of matchingDocs) {
    uniquePaths.add(doc.path);
  }

  return [...uniquePaths].sort();
}
/**
 * Returns the distinct authors of the documents matching a query, sorted
 * with the default array sort.
 *
 * @param query - The query, without its `formats` field.
 * @param formats - Formats to query; forwarded to `queryDocs`.
 */
async queryAuthors<F = DefaultFormats>(
  query: Omit<Query<[string]>, "formats"> = {},
  formats?: FormatsArg<F>,
): Promise<AuthorAddress[]> {
  const matchingDocs = await this.queryDocs(query, formats);

  const uniqueAuthors = new Set<AuthorAddress>();
  for (const doc of matchingDocs) {
    uniqueAuthors.add(doc.author);
  }

  return [...uniqueAuthors].sort();
}
/**
 * Generates a new signed document from `docToSet` and ingests it locally.
 *
 * When `docToSet.timestamp` is not a number, the timestamp is the current
 * time, bumped to one past the latest existing doc at the same path so the
 * new doc wins. If the format produces an attachment, it is staged, the
 * doc's attachment fields are updated, the attachment is committed and an
 * `attachment_ingest` event is emitted before the doc itself is ingested.
 *
 * @param keypair - Keypair used by the format to generate the document.
 * @param docToSet - Document input, without its `format` field.
 * @param format - Format used to generate the document (defaults to
 * `DEFAULT_FORMAT`).
 * @returns The ingest event, or a `failure` event with reason
 * `invalid_document` if generation, staging or the attachment-field update
 * fails.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async set<F = DefaultFormat>(
  keypair: AuthorKeypair,
  docToSet: Omit<FormatInputType<F>, "format">,
  format: FormatArg<F> = DEFAULT_FORMAT as unknown as FormatArg<F>,
): Promise<
  IngestEvent<FormatDocType<F>>
> {
  loggerSet.debug(`set`, docToSet);
  if (this._isClosed) throw new ReplicaIsClosedError();

  loggerSet.debug(
    "...deciding timestamp: getting latest doc at the same path (from any author)",
  );

  // Look the previous doc up in the format being written. `ingest` compares
  // only against docs of the same format, and this doc is handed to
  // `format.removeExtraFields` / `format.generateDocument` below.
  const latestDocSamePath = await this.getLatestDocAtPath(
    docToSet.path,
    format,
  );

  let timestamp: number;
  if (typeof docToSet.timestamp === "number") {
    timestamp = docToSet.timestamp;
  } else if (latestDocSamePath === undefined) {
    timestamp = microsecondNow();
  } else {
    // Make sure the new doc is newer than the one it replaces.
    timestamp = Math.max(
      microsecondNow(),
      latestDocSamePath.timestamp + 1,
    );
  }

  loggerSet.debug("...generating doc");

  // The previous doc is only handed to the format if it could be cleaned.
  let cleanedDoc;
  if (latestDocSamePath) {
    const res = format.removeExtraFields(latestDocSamePath);
    if (!isErr(res)) {
      cleanedDoc = res.doc;
    }
  }

  const result = await format.generateDocument({
    keypair,
    input: { ...docToSet, format: format.id },
    share: this.share,
    timestamp,
    prevLatestDoc: cleanedDoc,
    config: this.formatsConfig[format.id] as Record<string, unknown>,
  });

  if (isErr(result)) {
    return {
      kind: "failure",
      reason: "invalid_document",
      err: result,
    };
  }

  loggerSet.debug("...signature =", result.doc.signature);

  let docToIngest = result.doc as FormatDocType<F>;

  if (result.attachment) {
    const stageResult = await this.replicaDriver.attachmentDriver.stage(
      format.id,
      result.attachment,
    );

    if (isErr(stageResult)) {
      return {
        kind: "failure",
        reason: `invalid_document`,
        err: stageResult,
      };
    }

    const updatedDocRes = await format.updateAttachmentFields(
      keypair,
      result.doc,
      stageResult.size,
      stageResult.hash,
      this.formatsConfig[format.id] as Record<string, unknown>,
    );

    if (isErr(updatedDocRes)) {
      // Do not leave the staged attachment behind.
      await stageResult.reject();
      return {
        kind: "failure",
        reason: `invalid_document`,
        err: updatedDocRes,
      };
    }

    loggerSet.debug("...ingesting attachment");
    loggerSet.debug("-----------------------");
    await stageResult.commit();

    await this.eventWriter.write({
      kind: "attachment_ingest",
      doc: updatedDocRes,
      hash: stageResult.hash,
      size: stageResult.size,
      sourceId: "local",
    });

    loggerSet.debug("...done ingesting attachment");

    docToIngest = updatedDocRes as FormatDocType<F>;
  }

  loggerSet.debug("...ingesting");
  loggerSet.debug("-----------------------");
  const ingestEvent = await this.ingest(format, docToIngest, "local");
  loggerSet.debug("...done ingesting");

  loggerSet.debug("...set is done.");

  return ingestEvent;
}
/**
 * Validates a document and, inside the ingest lock, writes it to the doc
 * driver unless the same author already has an equal or newer doc at that
 * path.
 *
 * On success a `success` event is emitted and, if the written doc has a
 * `deleteAfter`, a timer is scheduled to emit an `expire` event for it.
 *
 * @param format - Format used to clean and validate the document.
 * @param docToIngest - The document to ingest.
 * @param sourceId - Where the document came from ("local" or another id).
 * @returns A `success`, `nothing_happened` or `failure` ingest event.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async ingest<F = DefaultFormat>(
  format: FormatArg<F>,
  docToIngest: FormatDocType<F>,
  sourceId: "local" | string,
): Promise<
  IngestEvent<FormatDocType<F>>
> {
  loggerIngest.debug(`ingest`, docToIngest);
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }

  loggerIngest.debug("...removing extra fields");

  const stripped = format.removeExtraFields(docToIngest);
  if (isErr(stripped)) {
    return { kind: "failure", reason: "invalid_document", err: stripped };
  }
  docToIngest = stripped.doc as FormatDocType<F>;

  const extraFields = stripped.extras;
  if (Object.keys(extraFields).length > 0) {
    loggerIngest.debug(`...extra fields found: ${J(extraFields)}`);
  }

  const validity = await format.checkDocumentIsValid(docToIngest);
  if (isErr(validity)) {
    return { kind: "failure", reason: "invalid_document", err: validity };
  }

  // Resolved from inside the locked region with the outcome of the ingest.
  const outcome = deferred<IngestEvent<FormatDocType<F>>>();

  await this.ingestLockStream.run(async () => {
    loggerIngest.debug(" >> ingest: start of protected region");
    loggerIngest.debug(
      " > getting other history docs at the same path by any author",
    );
    const docsAtPath = await this.getAllDocsAtPath(
      docToIngest.path,
      [format],
    );
    loggerIngest.debug(` > ...got ${docsAtPath.length}`);

    loggerIngest.debug(" > getting prevLatest and prevSameAuthor");
    const prevLatest = docsAtPath[0] ?? null;
    const prevSameAuthor =
      docsAtPath.find((d) => d.author === docToIngest.author) ?? null;

    loggerIngest.debug(
      " > checking if new doc is latest at this path",
    );
    // Add the candidate to the history and see whether it sorts first.
    docsAtPath.push(docToIngest);
    docsAtPath.sort(docCompareNewestFirst);
    const isLatest = docsAtPath[0] === docToIngest;
    loggerIngest.debug(` > ...isLatest: ${isLatest}`);

    if (!isLatest && prevSameAuthor !== null) {
      loggerIngest.debug(
        " > new doc is not latest and there is another one from the same author...",
      );
      const docComp = docCompareNewestFirst(docToIngest, prevSameAuthor);

      if (docComp === Cmp.GT) {
        loggerIngest.debug(
          " > new doc is GT prevSameAuthor, so it is obsolete",
        );
        outcome.resolve({
          kind: "nothing_happened",
          reason: "obsolete_from_same_author",
          doc: docToIngest,
        });
        return;
      }

      if (docComp === Cmp.EQ) {
        loggerIngest.debug(
          " > new doc is EQ prevSameAuthor, so it is redundant (already_had_it)",
        );
        outcome.resolve({
          kind: "nothing_happened",
          reason: "already_had_it",
          doc: docToIngest,
        });
        return;
      }
    }

    loggerIngest.debug(" > upserting into ReplicaDriver...");
    const docAsWritten = await this.replicaDriver.docDriver.upsert(
      docToIngest,
    );
    loggerIngest.debug(" > ...done upserting into ReplicaDriver");

    loggerIngest.debug(" > ...getting ReplicaDriver maxLocalIndex...");
    const maxLocalIndex = await this.replicaDriver.docDriver
      .getMaxLocalIndex();

    loggerIngest.debug(
      " >> ingest: end of protected region, returning a WriteEvent from the lock",
    );

    const event = {
      kind: "success" as const,
      maxLocalIndex,
      doc: docAsWritten,
      docIsLatest: isLatest,
      prevDocFromSameAuthor: prevSameAuthor as FormatDocType<F>,
      prevLatestDoc: prevLatest as FormatDocType<F>,
      sourceId,
    };

    // The caller gets its answer first; then listeners are notified.
    outcome.resolve(event);
    await this.eventWriter.write(event);

    if (docAsWritten.deleteAfter) {
      const key = `${docAsWritten.path}|${docAsWritten.author}`;

      // Only one pending expire timer per path and author.
      const pending = this.expireEventTimeouts.get(key);
      if (pending) {
        clearTimeout(pending);
      }

      // `deleteAfter` is divided by 1000 to compare with Date.now().
      const waitFor = (docAsWritten.deleteAfter / 1000) - Date.now();

      const handle = setTimeout(() => {
        this.eventWriter.write({
          kind: "expire",
          doc: docAsWritten,
        });
      }, waitFor);
      this.expireEventTimeouts.set(key, handle);
    }
  });

  return outcome;
}
/**
 * Wipes every document (all history) written by the keypair's author.
 *
 * @param keypair - Keypair of the author whose documents are wiped.
 * @param format - Format to operate on; resolved via `getFormatWithFallback`.
 * @returns The number of documents whose wipe was successfully ingested, or
 * a `ValidationError` as soon as one wipe fails.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async overwriteAllDocsByAuthor<F = DefaultFormat>(
  keypair: AuthorKeypair,
  format?: FormatArg<F>,
): Promise<number | ValidationError> {
  // Resolve the format once; it is the same for every document.
  const f = getFormatWithFallback(format);

  logger.debug(`overwriteAllDocsByAuthor("${keypair.address}")`);
  if (this._isClosed) throw new ReplicaIsClosedError();

  const docsToOverwrite = await this.queryDocs({
    filter: { author: keypair.address },
    historyMode: "all",
  }, [f]);
  logger.debug(
    ` ...found ${docsToOverwrite.length} docs to overwrite`,
  );

  let numOverwritten = 0;
  // NOTE(review): nothing in this method detects already-empty docs, so this
  // count is always 0; it is kept only so the log line below is unchanged.
  const numAlreadyEmpty = 0;

  for (const doc of docsToOverwrite as FormatDocType<F>[]) {
    const wipeEvent = await this.wipeDocument(keypair, doc, f);

    // `wipeDocument` reports problems as a "failure" event, not as an error
    // object, so the event kind has to be inspected.
    if (wipeEvent.kind === "failure") {
      return wipeEvent.err instanceof ValidationError
        ? wipeEvent.err
        : new ValidationError(
          `Could not wipe document at path "${doc.path}"`,
        );
    }

    // "nothing_happened" events did not overwrite anything; don't count them.
    if (wipeEvent.kind === "success") {
      numOverwritten += 1;
    }
  }

  logger.debug(
    ` ...done; ${numOverwritten} overwritten to be empty; ${numAlreadyEmpty} were already empty; out of total ${docsToOverwrite.length} docs`,
  );
  return numOverwritten;
}
/**
 * Wipes the latest document at a path.
 *
 * @param keypair - Keypair used to write the wiped document.
 * @param path - Path of the document to wipe.
 * @param format - Format to operate on (defaults to `DEFAULT_FORMAT`).
 * @returns The ingest event of the wiped document, or a `ValidationError`
 * when no document exists at the path.
 */
async wipeDocAtPath<F = DefaultFormat>(
  keypair: AuthorKeypair,
  path: string,
  format: FormatArg<F> = DEFAULT_FORMAT as unknown as FormatArg<F>,
): Promise<IngestEvent<FormatDocType<F>> | ValidationError> {
  const target = await this.getLatestDocAtPath(path, format);

  if (target) {
    return this.wipeDocument(
      keypair,
      target as FormatDocType<F>,
      format,
    );
  }

  return new ValidationError("No document exists at that path");
}
/**
 * Has the format wipe a document, ingests the wiped version and, if that
 * succeeded, erases the original document's attachment.
 *
 * @param keypair - Keypair that becomes the author of the wiped document.
 * @param doc - The document to wipe.
 * @param format - Format used to wipe the document.
 * @returns The ingest event, or a `failure` event with reason
 * `invalid_document` when the format could not wipe the document.
 */
private async wipeDocument<F>(
  keypair: AuthorKeypair,
  doc: FormatDocType<F>,
  format: FormatArg<F>,
): Promise<IngestEvent<FormatDocType<F>>> {
  // The wiped version must be newer than the doc it replaces.
  const nextTimestamp = Math.max(doc.timestamp + 1, Date.now() * 1000);
  const docToWipe: FormatDocType<F> = {
    ...doc,
    timestamp: nextTimestamp,
    author: keypair.address,
  };

  const wipedDoc = await format.wipeDocument(
    keypair,
    docToWipe,
    this.formatsConfig[format.id] as Record<string, unknown>,
  );
  if (isErr(wipedDoc)) {
    return { kind: "failure", err: wipedDoc, reason: "invalid_document" };
  }

  const didIngest = await this.ingest(
    format,
    wipedDoc as FormatDocType<F>,
    "local",
  );
  if (didIngest.kind !== "success") {
    return didIngest;
  }

  // Attachment info is read from the original doc, not the wiped one.
  const attachmentInfo = format.getAttachmentInfo(doc);
  if (isErr(attachmentInfo)) {
    return didIngest;
  }

  const eraseRes = await this.replicaDriver.attachmentDriver.erase(
    format.id,
    attachmentInfo.hash,
  );
  if (!isErr(eraseRes)) {
    await this.eventWriter.write({
      kind: "attachment_prune",
      format: format.id,
      hash: attachmentInfo.hash,
    });
  }

  return didIngest;
}
/**
 * Erases expired documents, then asks the attachment driver to keep only
 * the attachments still referenced by a stored document.
 *
 * Emits an `expire` event per erased document and an `attachment_prune`
 * event per erased attachment.
 *
 * @param formats - Formats whose documents are scanned for attachment
 * hashes (defaults to `DEFAULT_FORMATS`).
 */
private async pruneExpiredDocsAndAttachments<F>(
  formats: FormatsArg<F> = DEFAULT_FORMATS as unknown as FormatsArg<F>,
) {
  const expiredDocs = await this.replicaDriver.docDriver.eraseExpiredDocs();
  for (const doc of expiredDocs) {
    await this.eventWriter.write({ kind: "expire", doc });
  }

  const formatLookup = getFormatLookup(formats);

  // Format id -> attachment hashes referenced by the remaining documents.
  const allowedHashes: Record<string, Set<string>> = {};

  await this.getQueryStream(
    { historyMode: "all", orderBy: "localIndex ASC" },
    "existing",
    formats,
  ).pipeTo(
    new WritableStream({
      write(event) {
        if (event.kind !== "existing") {
          return;
        }

        const format = formatLookup[event.doc.format];
        const attachmentInfo = format.getAttachmentInfo(event.doc);
        if (isErr(attachmentInfo)) {
          return;
        }

        const known = allowedHashes[format.id];
        if (known) {
          known.add(attachmentInfo.hash);
        } else {
          allowedHashes[format.id] = new Set([attachmentInfo.hash]);
        }
      },
    }),
  );

  const erasedAttachments = await this.replicaDriver.attachmentDriver
    .filter(allowedHashes);

  for (const { format, hash } of erasedAttachments) {
    await this.eventWriter.write({
      kind: "attachment_prune",
      format,
      hash,
    });
  }
}
/**
 * Returns a readable stream of replica events for one channel.
 *
 * @param channel - Event kind to subscribe to; defaults to "*".
 */
getEventStream(
  channel: OrCh<ReplicaEvent<DocBase<string>>["kind"]> = "*",
): ReadableStream<ReplicaEvent<DocBase<string>>> {
  const events = this.eventMultiStream.getReadableStream(channel);
  return events;
}
/**
 * Returns a stream of documents matching a query.
 *
 * For modes "existing" and "everything" the stream starts with one
 * `existing` event per document currently matching the query. A
 * `processed_all_existing` event always follows. In mode "existing" the
 * stream then closes; otherwise it continues with live `success` and
 * `expire` events whose document passes `query.filter`, and closes when the
 * replica emits `didClose` or the event stream ends.
 *
 * NOTE(review): live events are matched against `query.filter` only; they
 * are not checked against `formats` or the other query fields.
 *
 * @param query - The query, without its `formats` field.
 * @param mode - Which documents to emit (existing, new or both).
 * @param formats - Formats used for the initial query.
 */
getQueryStream<F = DefaultFormats>(
  query: Omit<Query<[string]>, "formats"> = {},
  mode?: QuerySourceMode,
  formats?: FormatsArg<F>,
): ReadableStream<QuerySourceEvent<FormatDocType<F>>> {
  // `start` below is not an arrow function, so bind what it needs.
  const queryDocs = this.queryDocs.bind(this);
  const getEventStream = this.getEventStream.bind(this);

  return new ReadableStream({
    async start(controller) {
      if (mode === "existing" || mode === "everything") {
        const docs = await queryDocs(query, formats);

        for (const doc of docs) {
          controller.enqueue({
            kind: "existing",
            doc: doc,
          });
        }
      }

      controller.enqueue({ kind: "processed_all_existing" });

      if (mode === "existing") {
        controller.close();
        return;
      }

      const eventStream = getEventStream();
      const reader = eventStream.getReader();

      while (true) {
        const { done, value: event } = await reader.read();

        if (done) {
          // The source has ended; end this stream too so that consumers
          // are not left waiting forever.
          controller.close();
          return;
        }

        if (event.kind === "expire" || event.kind === "success") {
          // Only forward documents that pass the filter. Previously a
          // non-matching document fell through and was enqueued anyway.
          if (!query.filter || docMatchesFilter(event.doc, query.filter)) {
            controller.enqueue(
              event as QuerySourceEvent<FormatDocType<F>>,
            );
          }
          continue;
        }

        if (event.kind === "didClose") {
          controller.close();
          // Stop reading: enqueueing on a closed controller would throw.
          return;
        }
      }
    },
  }) as ReadableStream<
    QuerySourceEvent<FormatDocType<F>>
  >;
}
/**
 * Registers a callback on the replica's callback sink, which receives every
 * replica event.
 *
 * @param callback - Called with each event; may be async.
 * @returns Whatever `CallbackSink.onWrite` returns.
 */
onEvent(
  callback: (
    event: ReplicaEvent<DocBase<FormatName>>,
  ) => void | Promise<void>,
) {
  const registration = this.callbackSink.onWrite(callback);
  return registration;
}
/**
 * Stores an attachment for a document after checking that its hash and size
 * match what the document declares.
 *
 * @param format - Format used to validate the document and read its
 * attachment info.
 * @param doc - The document the attachment belongs to.
 * @param attachment - The attachment data, as bytes or a byte stream.
 * @param sourceId - Where the attachment came from ("local" or another id).
 * @returns `true` when the attachment was committed, `false` when one was
 * already stored for this document, or a `ValidationError` when the
 * document or the attachment is rejected.
 * @throws ReplicaIsClosedError if the replica is closed.
 */
async ingestAttachment<F = DefaultFormat>(
  format: FormatArg<F>,
  doc: FormatDocType<F>,
  attachment: Uint8Array | ReadableStream<Uint8Array>,
  sourceId: "local" | string,
): Promise<
  true | false | ValidationError
> {
  if (this._isClosed) {
    throw new ReplicaIsClosedError();
  }

  const stripped = format.removeExtraFields(doc);
  if (isErr(stripped)) {
    return stripped;
  }
  doc = stripped.doc as FormatDocType<F>;

  const validity = await format.checkDocumentIsValid(doc);
  if (isErr(validity)) {
    return validity;
  }

  // Nothing to do if we already hold this document's attachment.
  const alreadyStored = await this.getAttachment(doc, format);
  if (alreadyStored && !isErr(alreadyStored)) {
    return false;
  }

  const expected = format.getAttachmentInfo(doc);
  if (isErr(expected)) {
    return expected;
  }

  const staged = await this.replicaDriver.attachmentDriver.stage(
    doc.format,
    attachment,
  );
  if (isErr(staged)) {
    return staged;
  }

  // The staged data must be exactly what the document promised.
  if (staged.hash !== expected.hash) {
    await staged.reject();
    return new ValidationError(
      "Attachment's hash did not match the document's",
    );
  }
  if (staged.size !== expected.size) {
    await staged.reject();
    return new ValidationError(
      "Attachment's size did not match the document's",
    );
  }

  await staged.commit();

  await this.eventWriter.write({
    kind: "attachment_ingest",
    doc,
    hash: staged.hash,
    size: staged.size,
    sourceId,
  });

  return true;
}
/**
 * Fetches the attachment a document refers to from the attachment driver.
 *
 * @param doc - The document whose attachment is wanted.
 * @param format - Format used to read the document's attachment info
 * (defaults to `DEFAULT_FORMAT`).
 * @returns A promise of the driver's result, or of a `ValidationError` when
 * the document has expired or the format returns an error for its
 * attachment info.
 */
getAttachment<F = DefaultFormat>(
  doc: FormatDocType<F>,
  format: FormatArg<F> = DEFAULT_FORMAT as unknown as FormatArg<F>,
): Promise<DocAttachment | undefined | ValidationError> {
  // `deleteAfter` is compared against Date.now() scaled by 1000.
  const hasExpired = doc.deleteAfter && doc.deleteAfter < Date.now() * 1000;
  if (hasExpired) {
    return Promise.resolve(new ValidationError("This document has expired"));
  }

  const attachmentInfo = format.getAttachmentInfo(doc);
  if (isErr(attachmentInfo)) {
    return Promise.resolve(attachmentInfo);
  }

  return this.replicaDriver.attachmentDriver.getAttachment(
    doc.format,
    attachmentInfo.hash,
  );
}
/**
 * Returns copies of the given documents, each with an `attachment` property.
 *
 * The property holds the attachment driver's result for the document's
 * attachment hash, or the error returned by the format's
 * `getAttachmentInfo` when that call fails.
 *
 * If fetching any attachment rejects, the returned promise rejects too.
 *
 * @param docs - Documents to decorate.
 * @param formats - Formats used to read attachment info; resolved via
 * `getFormatsWithFallback`.
 */
addAttachments<F = DefaultFormats>(
  docs: FormatDocType<F>[],
  formats?: FormatsArg<F>,
): Promise<
  Awaited<
    DocWithAttachment<FormatDocType<F>>
  >[]
> {
  const f = getFormatsWithFallback(formats);

  const formatLookup: Record<string, FormatArg<F>> = {};

  for (const format of f) {
    formatLookup[format.id] = format as typeof formatLookup[string];
  }

  // An async mapper (instead of a hand-built `new Promise`) lets a rejection
  // from the attachment driver propagate, rather than leaving a promise that
  // never settles and stalls `Promise.all` forever.
  const promises = docs.map(
    async (doc): Promise<
      FormatDocType<F> & {
        attachment: ValidationError | DocAttachment | undefined;
      }
    > => {
      const format = formatLookup[doc.format];

      const attachmentInfo = format.getAttachmentInfo(doc);

      if (isErr(attachmentInfo)) {
        return { ...doc, attachment: attachmentInfo };
      }

      const attachment = await this.replicaDriver.attachmentDriver
        .getAttachment(
          doc.format,
          attachmentInfo.hash,
        );

      return { ...doc, attachment };
    },
  );

  return Promise.all(promises);
}
}