-
Notifications
You must be signed in to change notification settings - Fork 1
/
asketic-orient-sync.ts
100 lines (86 loc) · 3.09 KB
/
asketic-orient-sync.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import * as _ from 'lodash';
import { Node } from 'ancient-mixins/lib/node';
import { asket, IQueryResolver } from 'ancient-asket/lib/asket';
import { OrientSync } from './orient-sync';
/**
 * Bridges asket queries and `OrientSync` live selections.
 *
 * The resolver produced by {@link AsketicOrientSync.resolver} creates one
 * `OrientSync` for every `select` schema node that carries both
 * `options.from` and `options.where`, and re-emits what happens to that sync
 * as events on this node:
 *
 * - `selected` `{ flow, path, sync }` — the initial record set arrived.
 * - `added` / `changed` `{ flow, rid, path, sync, data }` — a record was
 *   reported by the sync and has been run through the same schema.
 * - `removed` `{ flow, rid, path, sync }` — a record was reported as removed.
 * - `unsync` `{ flow, path, sync }` — the sync was destroyed.
 */
class AsketicOrientSync extends Node {
  /**
   * @param db Handle passed unchanged as the first argument of every
   *   `OrientSync` the resolver creates.
   */
  constructor(
    public db,
  ) {
    super();
  }

  /**
   * Builds the asket resolver bound to this instance.
   *
   * @returns A resolver that handles three kinds of flow:
   *   `select` nodes (either annotate an existing record or open a sync),
   *   `@rid` leaves (stringified, sub-fields dropped), and everything else
   *   (only `env.path` is extended with `flow.key`).
   */
  resolver() {
    const resolver: IQueryResolver = (flow) => new Promise((resolve) => {
      if (flow.name === 'select') {
        const from = _.get(flow, 'schema.options.from');
        const where = _.get(flow, 'schema.options.where');

        if (_.isObject(flow.data) || !(from && where)) {
          // Data is already present (or there is nothing to query): make sure
          // `@rid` is always part of the requested fields and, when the record
          // has one, expose it through the environment and the path.
          _.set(flow, 'schema.fields.@rid', {});
          const rid = _.toString(_.get(flow, 'data.@rid'));
          const env = rid
            ? { ...flow.env, rid, path: [ ...flow.env.path, { '@rid': rid } ] }
            : flow.env;
          resolve({ ...flow, env });
          return;
        }

        const parentSync = _.get(flow, 'env.sync');
        const sync = new OrientSync(this.db, from, where);
        const path = [ ...flow.env.path, flow.key ];

        sync.on('destroyed', () => {
          this.emit('unsync', { flow, path, sync });
          sync.unsubscribe();
        });

        if (parentSync) {
          // A nested sync only makes sense while its parent record is intact:
          // tear it down when the parent sync goes away or when the parent
          // record it hangs off is removed or changed.
          // NOTE(review): loose `==` is kept from the original — `rid` may not
          // be the same type as the stringified `flow.env.rid`; confirm before
          // tightening to `===`.
          parentSync.on('destroyed', () => sync.destroy());
          parentSync.on('removed', ({ rid }) => rid == flow.env.rid && sync.destroy());
          parentSync.on('changed', ({ rid }) => rid == flow.env.rid && sync.destroy());
        }

        // Re-resolves a record reported by the sync through the same schema
        // and forwards the result under the same event name.
        // NOTE(review): a rejection from `asket` is not handled here, exactly
        // as in the original; decide how such failures should be surfaced.
        const forwardRecordEvent = (eventName: 'added' | 'changed') => {
          sync.on(eventName, ({ rid, record }) => {
            asket({
              resolver,
              query: { schema: flow.schema },
              data: record,
              env: { ...flow.env, rid, path: [ ...path, rid ] },
            }).then(({ data }) => {
              this.emit(eventName, { flow, rid, path, sync, data });
            });
          });
        };

        // NOTE(review): the promise only settles from this callback; if
        // `select` can fail without invoking it, the flow never resolves.
        sync.select((records) => {
          sync.rids = _.map(records, d => d['@rid'].toString());
          this.emit('selected', { flow, path, sync });
          sync.resubscribe();

          forwardRecordEvent('added');
          forwardRecordEvent('changed');
          sync.on('removed', ({ rid }) => {
            this.emit('removed', { flow, rid, path, sync });
          });

          resolve({ ...flow, env: { ...flow.env, sync, path }, data: records });
        });
      } else if (flow.name === '@rid') {
        // `@rid` is a leaf: drop any requested sub-fields and return a string.
        _.unset(flow, 'schema.fields');
        resolve({ ...flow, data: _.toString(flow.data) });
      } else {
        // Any other node only extends the path. The original followed this
        // with a second `resolve(flow)`, which could never take effect on an
        // already-settled promise and has been removed.
        resolve({ ...flow, env: { ...flow.env, path: [ ...flow.env.path, flow.key ] } });
      }
    });
    return resolver;
  }

  /**
   * Runs `query` through asket with a fresh resolver and an environment whose
   * `path` starts empty.
   *
   * @param query Query object handed to `asket` as-is.
   * @returns Whatever `asket` returns for the query.
   */
  ask(query) {
    return asket({
      resolver: this.resolver(),
      query,
      env: { path: [] },
    });
  }
}
// Public surface of this module.
export { AsketicOrientSync };