Skip to main content
Module

x/pg_mem/transforms/join.ts

An in-memory Postgres DB instance for your unit tests
Go to Latest
File
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
import { _ISelection, IValue, _IIndex, _IDb, setId, getId, _Transaction, _ISchema, _SelectExplanation, _Explainer, IndexExpression, IndexOp, IndexKey, _IndexExplanation, Stats, _IAlias, TR, _IStatement } from '../interfaces-private.ts';import { buildBinaryValue, buildValue, uncache } from '../parser/expression-builder.ts';import { QueryError, ColumnNotFound, NotSupported, nil, DataType } from '../interfaces.ts';import { DataSourceBase, TransformBase } from './transform-base.ts';import { Expr, ExprRef, JoinClause, Name, SelectedColumn } from 'https://deno.land/x/pgsql_ast_parser@10.5.2/mod.ts';import { colToStr, nullIsh, SRecord } from '../utils.ts';import { Types } from '../datatypes/index.ts';import { SELECT_ALL } from '../execution/clean-results.ts';import { CustomAlias, Selection } from './selection.ts';import { withSelection, buildCtx } from '../parser/context.ts';
/**
 * Module-wide counter: each JoinSelection takes the next value as its `joinId`,
 * which is embedded in the ids assigned to the rows it builds.
 */
let jCnt = 0;
/** Row produced by a join: one row of each side, stored under these two keys. */
interface JoinRaw<TLeft, TRight> {
    '>restrictive': TLeft;
    '>joined': TRight;
}
/**
 * Index-based way of evaluating a join: iterate the rows of one side and,
 * for each of them, look up the matching rows of the other side via `joinIndex`.
 */
interface JoinStrategy {
    /** Selection whose rows are iterated. */
    iterate: _ISelection;
    /** Which side of the join `iterate` is. */
    iterateSide: 'joined' | 'restrictive';
    /** Index of the other side's column, queried with an 'eq' lookup on the value of `onValue`. */
    joinIndex: _IIndex<any>;
    /** Expression evaluated on each iterated row to obtain the lookup key. */
    onValue: IValue;
    /** Remaining join conditions (ANDed), checked on each combined row, if any. */
    othersPredicate?: IValue<any>;
}
/** One `left = right` equality found in a join's ON or USING clause. */
interface Equality {
    /** Left operand of the equality. */
    left: IValue;
    /** Right operand of the equality. */
    right: IValue;
    /** The whole equality expression, evaluated against combined join rows. */
    eq: IValue;
}
/**
 * Flattens a tree of `AND` binary expressions into its operands.
 *
 * Operands are yielded in left-to-right order; any expression that is not an
 * `AND` binary node (including `OR`, comparisons, refs...) is yielded as-is.
 */
function* extractAnds(this: void, on: Expr): Iterable<Expr> {
    const pending: Expr[] = [on];
    let current: Expr | undefined;
    while ((current = pending.pop()) !== undefined) {
        const isAnd = current.type === 'binary' && current.op === 'AND';
        if (!isAnd) {
            yield current;
            continue;
        }
        // push the right operand first so that the left one is visited first
        pending.push((current as any).right, (current as any).left);
    }
}
/**
 * Picks the strategy whose iterated selection reports the lowest entropy.
 *
 * The input array is NOT reordered (the previous in-place `sort` mutated the
 * caller's array, e.g. `JoinSelection.strategies`, on every call), and each
 * candidate's `entropy(t)` is evaluated exactly once. On ties, the earliest
 * candidate wins.
 *
 * @param t transaction passed to `entropy()`
 * @param strategies candidate strategies (left untouched)
 * @returns the chosen strategy, or `undefined` when `strategies` is empty
 */
function chooseStrategy(this: void, t: _Transaction, strategies: JoinStrategy[]): JoinStrategy | undefined {
    let best: JoinStrategy | undefined;
    let bestEntropy = Infinity;
    for (const candidate of strategies) {
        const entropy = candidate.iterate.entropy(t);
        // `best === undefined` guarantees a pick even if entropies are NaN/Infinity
        if (best === undefined || entropy < bestEntropy) {
            best = candidate;
            bestEntropy = entropy;
        }
    }
    return best;
}
/**
 * Data source producing the rows of a JOIN between two selections.
 *
 * Each produced row stores the row of the `restrictive` side under
 * `'>restrictive'` and the row of the `joined` side under `'>joined'`.
 * When `innerJoin` is false, restrictive rows without any match are still
 * emitted, with a null joined side.
 *
 * Evaluation uses an indexed {@link JoinStrategy} when the join clause contains
 * an equality between a column of each side and the looked-up column has a
 * single-expression index; otherwise it falls back to a nested-loop scan
 * (the "catastrophic" path) over `seqScanExpression`.
 */
export class JoinSelection<TLeft = any, TRight = any> extends DataSourceBase<JoinRaw<TLeft, TRight>> {

    get isExecutionWithNoResult(): boolean {
        return false;
    }

    /** Wrapped columns (restrictive side first); record-typed columns are not listed. */
    private _columns: IValue<any>[] = [];
    /** Boolean join condition evaluated on combined rows when no strategy applies. */
    private seqScanExpression!: IValue<any>;
    private joinId: number;
    /** Parent column -> same column wrapped to read from a join row. */
    private columnsMappingParentToThis = new Map<IValue, IValue>();
    /** Wrapped column -> parent column and the side it comes from. */
    private columnsMappingThisToParent = new Map<IValue, { side: 'joined' | 'restrictive'; col: IValue; }>();
    private indexCache = new Map<IValue, _IIndex>();
    /** Indexed strategies usable to evaluate this join (also read by JoinIndex). */
    strategies: JoinStrategy[] = [];
    /** While true, getColumn() returns parent (unwrapped) columns. */
    private building = false;
    /** USING columns of the restrictive side, excluded from the `SELECT *` merge. */
    private ignoreDupes?: Set<IValue>;
    private mergeSelect?: Selection;

    isOriginOf(a: IValue<any>): boolean {
        return this.joined.isOriginOf(a) || this.restrictive.isOriginOf(a);
    }

    get columns(): IValue<any>[] {
        return this._columns;
    }

    entropy(t: _Transaction): number {
        const strategy = chooseStrategy(t, this.strategies);
        if (!strategy) {
            // catastrophic join: every restrictive row is tested against every joined row
            return this.restrictive.entropy(t) * this.joined.entropy(t);
        }
        // todo: multiply that by the mean count per keys in strategy.joinIndex ?
        return strategy.iterate.entropy(t);
    }

    constructor(readonly restrictive: _ISelection<TLeft>
        , readonly joined: _ISelection<TRight>
        , on: JoinClause
        , private innerJoin: boolean) {
        super(buildCtx().schema);

        this.joinId = jCnt++;
        this.mapSideColumns(this.restrictive, 'restrictive');
        this.mapSideColumns(this.joined, 'joined');

        withSelection(this, () => {
            if (on.on) {
                this.fetchOnStrategies(on.on);
            } else if (on.using?.length) {
                this.fetchUsingStrategies(on.using);
            } else {
                throw new Error('Unspecified join ON clause');
            }
        });
    }

    /**
     * Registers, for every selectable column of one side, a wrapped column that
     * reads the value from that side's key of a join row.
     */
    private mapSideColumns(source: _ISelection, side: 'restrictive' | 'joined') {
        const key = side === 'restrictive' ? '>restrictive' : '>joined';
        for (const c of source.listSelectableIdentities()) {
            const nc = c.setWrapper(this, x => (x as any)[key]);
            this.columnsMappingParentToThis.set(c, nc);
            // record-typed columns remain reachable via the mapping, but are not listed in `columns`
            if (c.type.primary === DataType.record) {
                continue;
            }
            this._columns.push(nc);
            this.columnsMappingThisToParent.set(nc, { col: c, side });
        }
    }

    /** Maps a parent column to its wrapped counterpart; throws if unknown. */
    private wrap(v: IValue) {
        const ret = this.columnsMappingParentToThis.get(v);
        if (!ret) {
            throw new Error('Corrupted join (unknown column)');
        }
        return ret;
    }

    listSelectableIdentities(): Iterable<IValue> {
        return this.columnsMappingParentToThis.values();
    }

    /**
     * Runs `build` with `building` set (so getColumn() returns parent columns),
     * resetting the flag even if `build` throws.
     */
    private buildUnwrapped<R>(build: () => R): R {
        this.building = true;
        try {
            return build();
        } finally {
            this.building = false;
        }
    }

    /** Computes strategies and the seq-scan expression for an `ON <expr>` clause. */
    private fetchOnStrategies(_on: Expr) {
        // build equalities eligible to a strategy
        const ands: Equality[] = [];
        const others: IValue[] = [];
        for (const on of extractAnds(_on)) {
            if (on.type !== 'binary' || on.op !== '=') {
                // join 'ON' clause not compatible with an indexed strategy
                others.push(buildValue(on));
                continue;
            }
            const { left: leftExpr, right: rightExpr } = on;
            const { left, right } = this.buildUnwrapped(() => ({
                left: buildValue(leftExpr),
                right: buildValue(rightExpr),
            }));
            // necessary because of the 'building' hack: drop values cached while it was set
            uncache(this);
            ands.push({
                left,
                right,
                eq: buildValue(on),
            });
        }

        // compute strategies
        this.fetchAndStrategies(ands, others);

        // build seq-scan expression
        this.seqScanExpression = buildValue(_on).cast(Types.bool);
    }

    /** Computes strategies and the seq-scan expression for a `USING (...)` clause. */
    private fetchUsingStrategies(_using: Name[]) {
        // build equalities eligible to a strategy
        const ands = _using.map<Equality>(n => {
            const left = this.restrictive.getColumn(n.name);
            const right = this.joined.getColumn(n.name);
            return {
                left,
                right,
                eq: buildBinaryValue(this.wrap(left), '=', this.wrap(right)),
            };
        });
        this.ignoreDupes = new Set(ands.map(x => this.wrap(x.left)));

        // compute strategies
        this.fetchAndStrategies(ands, []);

        // build seq-scan expression: all equalities ANDed together
        this.seqScanExpression = ands.slice(1)
            .reduce((a, b) => buildBinaryValue(a, 'AND', b.eq), ands[0].eq);
    }

    /**
     * For each equality usable as an indexed strategy, registers its strategies,
     * attaching every other condition (other equalities + `otherPredicates`),
     * ANDed together, as `othersPredicate`.
     */
    private fetchAndStrategies(ands: Equality[], otherPredicates: IValue[]) {
        for (let i = 0; i < ands.length; i++) {
            const { left, right } = ands[i];
            const strats = [...this.fetchEqStrategyOn(left, right)];
            if (!strats.length) {
                continue;
            }
            const others = [
                ...ands.slice(0, i).map(x => x.eq),
                ...ands.slice(i + 1).map(x => x.eq),
                ...otherPredicates,
            ];
            if (others.length) {
                const and = others.slice(1)
                    .reduce<IValue>((v, c) => buildBinaryValue(c, 'AND', v), others[0]);
                for (const s of strats) {
                    s.othersPredicate = and;
                }
            }
            this.strategies.push(...strats);
        }
    }

    /**
     * Yields the indexed strategies for `a = b` when one operand comes from each
     * side: iterate the restrictive side and look up the joined column's index,
     * plus (inner joins only) the reverse. A lookup side is only usable if its
     * column has an index with exactly one expression.
     */
    private *fetchEqStrategyOn(a: IValue, b: IValue): Iterable<JoinStrategy> {
        let restrictedVal: IValue | undefined = undefined;
        let joinedVal: IValue | undefined = undefined;

        if (this.restrictive.isOriginOf(a) && this.joined.isOriginOf(b)) {
            restrictedVal = a;
            joinedVal = b;
        } else if (this.restrictive.isOriginOf(b) && this.joined.isOriginOf(a)) {
            restrictedVal = b;
            joinedVal = a;
        }

        let processInner = this.innerJoin;
        let iterateSide: 'restrictive' | 'joined' = 'restrictive';
        while (restrictedVal && joinedVal) {
            // can always iterate on the restricted value & use the joined table's index
            const jindex = joinedVal.index;
            if (jindex && jindex.expressions.length === 1) {
                yield {
                    iterate: iterateSide === 'restrictive' ? this.restrictive : this.joined,
                    iterateSide,
                    onValue: restrictedVal,
                    joinIndex: jindex,
                };
            }
            if (!processInner) {
                break;
            }
            // for an inner join, both sides can be swapped
            processInner = false;
            const t = restrictedVal;
            restrictedVal = joinedVal;
            joinedVal = t;
            iterateSide = 'joined';
        }
    }

    getColumn(column: string | ExprRef): IValue;
    getColumn(column: string | ExprRef, nullIfNotFound?: boolean): IValue | nil;
    getColumn(column: string | ExprRef, nullIfNotFound?: boolean): IValue<any> | nil {
        const onLeft = this.restrictive.getColumn(column, true);
        const onRight = this.joined.getColumn(column, true);
        if (!onLeft && !onRight) {
            if (nullIfNotFound) {
                return null;
            }
            throw new ColumnNotFound(colToStr(column));
        }
        if (onLeft && onRight) {
            throw new QueryError(`column reference "${colToStr(column)}" is ambiguous`);
        }
        const on = onLeft ?? onRight;
        // while building ON-clause operands, hand out the parent column itself
        if (this.building) {
            return on;
        }
        const mapped = this.columnsMappingParentToThis.get(on!);
        if (mapped) {
            return mapped;
        }
        throw new Error('Corrupted join');
    }

    stats(t: _Transaction): Stats | null {
        return null;
    }

    *enumerate(t: _Transaction): Iterable<any> {
        const strategy = chooseStrategy(t, this.strategies);
        if (strategy) {
            // iterate the side chosen by the strategy, and find
            // matching rows of the other side using its index
            for (const l of strategy.iterate.enumerate(t)) {
                yield* this.iterateStrategyItem(l, strategy, t);
            }
        } else {
            // no usable index: nested-loop scan
            this.db.raiseGlobal('catastrophic-join-optimization');
            const others = [...this.joined.enumerate(t)];
            for (const l of this.restrictive.enumerate(t)) {
                yield* this.iterateCatastrophicItem(l, others, 'restrictive', t);
            }
        }
    }

    selectAlias(alias: string): _IAlias | nil {
        const onLeft = this.restrictive.selectAlias(alias);
        const onRight = this.joined.selectAlias(alias);
        if (!onLeft && !onRight) {
            return null;
        }
        if (onLeft && onRight) {
            throw new QueryError(`alias "${alias}" is ambiguous`);
        }
        return new JoinMapAlias(this, onLeft ?? onRight!, onLeft ? '>restrictive' : '>joined');
    }

    /**
     * Combines `item` (a row of `side`) with every row of `others` (the other
     * side), yielding combinations satisfying the seq-scan join condition.
     *
     * For a non-inner join, an unmatched RESTRICTIVE row is yielded once with a
     * null joined side. Unmatched joined-side rows are never padded: doing so
     * (as happened when JoinIndex enumerates a joined-side index) would invent
     * rows with a null restrictive side.
     */
    *iterateCatastrophicItem(item: any, others: any[], side: 'joined' | 'restrictive', t: _Transaction) {
        const { template, buildItem } = this.builder(item, side);
        let yielded = false;
        for (const cr of others) {
            const combined = buildItem(cr);
            const result = this.seqScanExpression.get(combined, t);
            if (result) {
                yielded = true;
                yield combined;
            }
        }
        if (!this.innerJoin && !yielded && side === 'restrictive') {
            yield template;
        }
    }

    /**
     * Returns helpers combining `item` (a row of `side`) with a row of the other
     * side, in the right key order: `buildItem(other)` builds the combined row and
     * `template` is the combination with a null other side.
     */
    private builder(item: any, side: 'joined' | 'restrictive') {
        let template: any;
        let buildItem: (x: any) => any;
        if (side === 'joined') {
            buildItem = x => this.buildItem(x, item);
            template = this.buildItem(null as any, item);
        } else {
            buildItem = x => this.buildItem(item, x);
            template = this.buildItem(item, null as any);
        }
        return { buildItem, template };
    }

    /**
     * Finds, through `strategy.joinIndex`, the rows of the other side matching
     * `item` (a row of `strategy.iterateSide`), and yields the combined rows
     * that also satisfy `strategy.othersPredicate`. For a non-inner join, an
     * unmatched item is yielded once with a null other side (only
     * restrictive-side strategies exist for non-inner joins).
     */
    *iterateStrategyItem(item: any, strategy: JoinStrategy, t: _Transaction) {
        const { template, buildItem } = this.builder(item, strategy.iterateSide);

        const joinValue = strategy.onValue.get(item, t);
        let yielded = false;
        // a null key is never looked up
        if (!nullIsh(joinValue)) {
            // get corresponding row(s) of the other side
            for (const o of strategy.joinIndex.enumerate({
                type: 'eq',
                key: [joinValue],
                t,
            })) {
                const combined = buildItem(o);

                // check other predicates (in case the join has an AND statement)
                if (strategy.othersPredicate && !strategy.othersPredicate.get(combined, t)) {
                    continue;
                }

                yielded = true;
                yield combined;
            }
        }

        if (!this.innerJoin && !yielded) {
            yield template;
        }
    }

    /** Builds a join row from one row of each side, and assigns it a join-specific id. */
    buildItem(l: TLeft, r: TRight) {
        const ret = {
            '>joined': r,
            '>restrictive': l,
            [SELECT_ALL]: () => this.merge(ret),
        };
        setId(ret, `join${this.joinId}-${getId(l)}-${getId(r)}`);
        return ret;
    }

    /** Builds the `SELECT *` projection of a join row (lazily creating the selection). */
    private merge(item: any) {
        if (!this.mergeSelect) {
            let sel = this.columns.map<CustomAlias>(val => ({ val }));
            const ignored = this.ignoreDupes;
            if (ignored) {
                sel = sel.filter(t => !ignored.has(t.val));
            }
            this.mergeSelect = new Selection(this, sel);
        }

        // nb: the second argument is not a real transaction... this is a hack: we KNOW it won't use the transaction.
        const ret = this.mergeSelect.build(item, Symbol('hack') as any);
        return ret;
    }

    hasItem(value: JoinRaw<TLeft, TRight>): boolean {
        throw new NotSupported('lookups on joins');
    }

    /** Exposes the parent column's index (if any) as a JoinIndex; results are cached per column. */
    getIndex(forValue: IValue<any>): _IIndex<any> | nil {
        if (this.indexCache.has(forValue)) {
            return this.indexCache.get(forValue);
        }
        // todo: filter using indexes of tables (index propagation)
        const mapped = this.columnsMappingThisToParent.get(forValue);
        if (!mapped) {
            return null;
        }
        const originIndex = mapped.col.index;
        if (!originIndex) {
            return null;
        }
        const ret = new JoinIndex(this, originIndex, mapped.side);
        this.indexCache.set(forValue, ret);
        return ret;
    }

    explain(e: _Explainer): _SelectExplanation {
        const strategy = chooseStrategy(e.transaction, this.strategies);
        return {
            id: e.idFor(this),
            _: 'join',
            restrictive: this.restrictive.explain(e),
            joined: this.joined.explain(e),
            inner: this.innerJoin,
            on: strategy ? {
                iterate: e.idFor(strategy.iterate),
                iterateSide: strategy.iterateSide,
                joinIndex: strategy.joinIndex.explain(e),
                matches: strategy.onValue.explain(e),
                ...strategy.othersPredicate ? { filtered: true } : {},
            } : {
                seqScan: this.seqScanExpression.explain(e),
            },
        };
    }
}

/**
 * Alias exposed by a join for a table alias living on one of its sides.
 * Lists the target alias' columns re-wrapped so that they read their value
 * from the `map` key (e.g. `'>restrictive'`) of a join row.
 */
class JoinMapAlias implements _IAlias {

    constructor(private owner: JoinSelection, private target: _IAlias, private map: string) { }

    *listColumns(): Iterable<IValue<any>> {
        const sourceColumns = this.target.listColumns();
        for (const column of sourceColumns) {
            const wrapped = column.setWrapper(this.owner, (row: any) => row[this.map]);
            yield wrapped;
        }
    }
}
/**
 * Index exposed by a join for one of its columns, backed by the index `base`
 * of the corresponding parent column on `side`.
 *
 * A lookup enumerates the parent rows matching the key, then expands each of
 * them into join rows: through one of the owner's strategies iterating that
 * side when available, otherwise by scanning all rows of the other side.
 */
export class JoinIndex<T> implements _IIndex<T> {
    constructor(readonly owner: JoinSelection<T>, private base: _IIndex, private side: 'restrictive' | 'joined') {
    }

    get expressions(): IndexExpression[] {
        return this.base.expressions;
    }

    stats(t: _Transaction, key?: IndexKey): Stats | null {
        return null;
    }

    iterateKeys() {
        return null;
    }

    entropy(op: IndexOp): number {
        const strategy = this.pickStrategy(op.t);
        // todo: multiply that by the mean count per keys in strategy.joinIndex ?
        // without a strategy, every matched row is checked against the whole
        // other side (catastrophic join), hence the multiplied entropy
        return strategy
            ? this.base.entropy(op)
            : this.base.entropy(op) * this.otherSide.entropy(op.t);
    }

    eqFirst(rawKey: IndexKey, t: _Transaction): T | null {
        const matches = this.enumerate({ type: 'eq', key: rawKey, t });
        for (const first of matches) {
            return first;
        }
        return null;
    }

    /** Best owner strategy iterating this index's side, or null when there is none. */
    private pickStrategy(t: _Transaction) {
        const candidates = this.owner.strategies.filter(s => s.iterateSide === this.side);
        return candidates.length ? chooseStrategy(t, candidates) : null;
    }

    /** Selection on the opposite side of the join from this index. */
    private get otherSide() {
        return this.side === 'joined' ? this.owner.restrictive : this.owner.joined;
    }

    *enumerate(op: IndexOp): Iterable<T> {
        const strategy = this.pickStrategy(op.t);
        if (!strategy) {
            // no index to reach the other side: materialize it once, scan it per row
            this.owner.db.raiseGlobal('catastrophic-join-optimization');
            const otherRows = [...this.otherSide.enumerate(op.t)];
            for (const row of this.base.enumerate(op)) {
                yield* this.owner.iterateCatastrophicItem(row, otherRows, this.side, op.t);
            }
            return;
        }
        for (const row of this.base.enumerate(op)) {
            yield* this.owner.iterateStrategyItem(row, strategy, op.t);
        }
    }

    explain(e: _Explainer): _IndexExplanation {
        const strategy = this.pickStrategy(e.transaction);
        // keep evaluation order: base index first, then the strategy's join index
        const index = this.base.explain(e);
        const joinIndexExplanation = strategy?.joinIndex?.explain(e);
        return {
            _: 'indexOnJoin',
            index,
            strategy: joinIndexExplanation ?? 'catastrophic',
        };
    }
}