import { ShareAddress } from "../util/doc-types.ts";import { IPeer } from "./peer-types.ts";
import { Logger } from "../util/log.ts";
import { Replica } from "../replica/replica.ts";import { Syncer } from "../syncer/syncer.ts";
import { BlockingBus } from "../streams/stream_utils.ts";import { PartnerWebServer } from "../syncer/partner_web_server.ts";import { PartnerLocal } from "../syncer/partner_local.ts";import { FormatsArg } from "../formats/format_types.ts";import { SyncerManager } from "../syncer/syncer_manager.ts";import { ISyncPartner } from "../syncer/syncer_types.ts";import { DiscoveryEvent, DiscoveryService } from "../discovery/types.ts";
// Module-level logger for this file, created with the tag "peer".
// NOTE(review): the second argument ("orangeRed") looks like a display colour — confirm against Logger.
const logger = new Logger("peer", "orangeRed");

// Short alias for JSON.stringify, used when embedding share addresses in log and error messages.
const J = JSON.stringify;
/**
 * Holds a collection of replicas keyed by share address and delegates
 * syncing to a `SyncerManager` created for this peer.
 *
 * Subscribers can be notified when the set of replicas changes
 * (`onReplicasChange`) or when the set of syncers changes (`onSyncersChange`).
 */
export class Peer implements IPeer {
  /** Bus that receives the replica map every time a replica is added or removed. */
  private replicaEventBus = new BlockingBus<Map<ShareAddress, Replica>>();
  private syncerManager: SyncerManager;

  /** All replicas currently held by this peer, keyed by their share address. */
  replicaMap: Map<ShareAddress, Replica> = new Map();

  constructor() {
    logger.debug("constructor");
    this.syncerManager = new SyncerManager(this);
  }

  //--------------------------------------------------
  // getters

  /** Returns true if this peer holds a replica for the given share. */
  hasShare(share: ShareAddress): boolean {
    return this.replicaMap.has(share);
  }

  /** Returns the addresses of all held shares, sorted with the default array sort. */
  shares(): ShareAddress[] {
    const keys = [...this.replicaMap.keys()];
    keys.sort();
    return keys;
  }

  /** Returns all held replicas, ordered by their sorted share address. */
  replicas(): Replica[] {
    const keys = [...this.replicaMap.keys()];
    keys.sort();
    // Every key was just read from the map, so the lookup cannot miss.
    return keys.map((key) => this.replicaMap.get(key) as Replica);
  }

  /** Returns the number of replicas held by this peer. */
  size(): number {
    return this.replicaMap.size;
  }

  /** Returns the replica for the given share, or `undefined` if there is none. */
  getReplica(ws: ShareAddress): Replica | undefined {
    return this.replicaMap.get(ws);
  }

  //--------------------------------------------------
  // setters

  /**
   * Adds a replica to this peer and notifies replica-change subscribers.
   *
   * @throws Error if this peer already holds a replica with the same share.
   */
  async addReplica(replica: Replica): Promise<void> {
    logger.debug(`addReplica(${J(replica.share)})`);

    if (this.replicaMap.has(replica.share)) {
      logger.debug(`already had a replica with that share`);
      throw new Error(
        `Peer.addReplica: already has a replica with share ${
          J(replica.share)
        }. Don't add another one.`,
      );
    }

    this.replicaMap.set(replica.share, replica);
    await this.replicaEventBus.send(this.replicaMap);
    logger.debug(` ...addReplica: done`);
  }

  /**
   * Removes whatever replica is registered under the given share.
   *
   * Subscribers are notified unconditionally, even when no replica was
   * registered for that share.
   */
  async removeReplicaByShare(share: ShareAddress): Promise<void> {
    logger.debug(`removeReplicaByShare(${J(share)})`);
    this.replicaMap.delete(share);
    await this.replicaEventBus.send(this.replicaMap);
  }

  /**
   * Removes the given replica, but only if it is the exact instance currently
   * registered for its share. A different instance with the same share is
   * left in place.
   */
  async removeReplica(
    replica: Replica,
  ): Promise<void> {
    const existingReplica = this.replicaMap.get(replica.share);

    if (replica === existingReplica) {
      logger.debug(`removeReplica(${J(replica.share)})`);
      await this.removeReplicaByShare(replica.share);
    } else {
      logger.debug(
        `removeReplica(${
          J(replica.share)
        }) -- same share but it's a different instance now; ignoring`,
      );
    }
  }

  //--------------------------------------------------
  // syncing

  /**
   * Starts syncing with another peer.
   *
   * @param target - Either a string, which is handed to `PartnerWebServer` as
   *   its `url`, or a `Peer` instance to sync with via `PartnerLocal`.
   * @param continuous - When true the partner is created with the
   *   `"continuous"` appetite, otherwise with `"once"`.
   * @param formats - Optional formats forwarded to the syncer manager (and to
   *   `PartnerLocal` for local targets).
   * @returns The syncer returned by the syncer manager. If the target cannot
   *   be used (a string for which creating or registering the web partner
   *   throws, or an object that is not a `Peer` instance), an error is printed
   *   with `console.error` and `undefined` is returned despite the declared
   *   return type.
   */
  sync<F>(
    target: IPeer | string,
    continuous = false,
    formats?: FormatsArg<F>,
  ): Syncer<undefined, F> {
    const appetite = continuous ? "continuous" : "once";

    // Dispatch on the kind of target explicitly. A peer object is never
    // passed to PartnerWebServer, so local syncing does not depend on that
    // constructor throwing for non-string input.
    if (typeof target === "string") {
      try {
        const partner = new PartnerWebServer({ url: target, appetite });

        return this.syncerManager.addPartner(partner, target, formats);
      } catch {
        // Creating or registering the web partner failed; this is reported
        // below as an invalid target, as it always has been.
      }
    } else if (target instanceof Peer) {
      const partner = new PartnerLocal(target, this, appetite, formats);

      return this.syncerManager.addPartner(partner, "Local", formats);
    }

    console.error(
      "Provided an invalid target for syncing to a peer:",
      target,
    );
    return undefined as never;
  }

  /**
   * Registers an already-constructed sync partner with the syncer manager.
   *
   * @param partner - The partner to sync with.
   * @param description - Label passed to the syncer manager for this partner.
   * @param formats - Optional formats forwarded to the syncer manager.
   * @returns Whatever `SyncerManager.addPartner` returns.
   */
  addSyncPartner<I, F>(
    partner: ISyncPartner<I>,
    description: string,
    formats?: FormatsArg<F>,
  ) {
    return this.syncerManager.addPartner(partner, description, formats);
  }

  /** Returns the syncers currently tracked by the syncer manager. */
  getSyncers() {
    return this.syncerManager.getSyncers();
  }

  //--------------------------------------------------
  // subscriptions

  /**
   * Subscribes to changes of the replica map. The callback receives the
   * peer's replica map after a replica is added or removed.
   *
   * @returns The result of `replicaEventBus.on` (presumably an unsubscribe
   *   function — confirm against BlockingBus).
   */
  onReplicasChange(
    callback: (map: Map<ShareAddress, Replica>) => void | Promise<void>,
  ) {
    return this.replicaEventBus.on(callback);
  }

  /**
   * Subscribes to changes of the syncer manager's syncers.
   *
   * @returns The function returned by `SyncerManager.onSyncersChange`
   *   (presumably unsubscribes the callback — confirm against SyncerManager).
   */
  onSyncersChange(
    callback: (
      map: Map<
        string,
        { description: string; syncer: Syncer<unknown, unknown> }
      >,
    ) => void | Promise<void>,
  ): () => void {
    return this.syncerManager.onSyncersChange(callback);
  }

  //--------------------------------------------------
  // discovery

  /**
   * Wraps the events of a discovery service in an async iterable bound to
   * this peer.
   *
   * - `SERVICE_STOPPED` ends the iteration.
   * - `PEER_EXITED` events are passed through unchanged.
   * - `PEER_INITIATED_SYNC` events are begun against this peer right away and
   *   yielded together with the resulting syncer.
   * - Any other event is yielded as `PEER_DISCOVERED` with a `sync` function
   *   that begins syncing with this peer when called.
   */
  discover(
    service: DiscoveryService,
  ): AsyncIterable<DiscoveryEvent> {
    // Inside the generator method below, `this` is the returned iterable
    // object rather than the Peer, so keep a reference to the Peer here.
    const peer = this;

    return {
      async *[Symbol.asyncIterator]() {
        for await (const event of service.events) {
          if (event.kind === "SERVICE_STOPPED") {
            break;
          }

          if (event.kind === "PEER_EXITED") {
            yield event;
            continue;
          }

          if (event.kind === "PEER_INITIATED_SYNC") {
            const syncer = await event.begin(peer);

            yield {
              kind: "PEER_INITIATED_SYNC",
              description: event.description,
              syncer: syncer,
            };
            continue;
          }

          yield {
            kind: "PEER_DISCOVERED",
            description: event.description,
            sync: async (opts) => {
              const syncer = await event.begin(
                peer,
                opts?.syncContinuously ? "continuous" : "once",
              );

              return syncer;
            },
          };
        }
      },
    };
  }
}