Skip to content

Commit 36cd685

Browse files
authored
Effect Calls Cache (#633)
* Use persistence module in load layer instead of storage * The most hacky cache persistence wip * Make the cache work for uni v4 locally * Improve cache dump logic * Track Effect Cache size * Fix cache count retrieval * Use TSV, await dump and restore, upload cache only on the clean start, support dev console sync cache * Expose GraphQl endpoint in TUI * Dynamically choose psql exec * Allow -alpha sufixed versions * Fix wrong assumption about schema existence * Call psql with host, user and password * Fixes * More env variables * Better cache restore and dump handling * Don't dump hash on HS, allow cache minification via schema, fix psql for local dev * Fix create effect schema internally * Enable BigInt json serialization for effect output * Use default host from express * Remove debug logs from init * Write cache once per batch * Fix effect cache performance downsides * Use postgres:17.5 * Fix tests * Fixes after review
1 parent 5533b68 commit 36cd685

39 files changed

+1045
-280
lines changed

codegenerator/cli/npm/envio/index.d.ts

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type {
1313
} from "./src/Envio.gen.ts";
1414

1515
import { schema as bigDecimalSchema } from "./src/bindings/BigDecimal.gen.ts";
16+
import { schema as bigintSchema } from "./src/bindings/BigInt.gen.ts";
1617
import * as Sury from "rescript-schema";
1718

1819
type UnknownToOutput<T> = T extends Sury.Schema<unknown>
@@ -35,26 +36,6 @@ type UnknownToOutput<T> = T extends Sury.Schema<unknown>
3536
>
3637
: T;
3738

38-
type UnknownToInput<T> = T extends Sury.Schema<unknown>
39-
? Sury.Input<T>
40-
: T extends (...args: any[]) => any
41-
? T
42-
: T extends unknown[]
43-
? { [K in keyof T]: UnknownToInput<T[K]> }
44-
: T extends { [k in keyof T]: unknown }
45-
? Flatten<
46-
{
47-
[k in keyof T as HasUndefined<UnknownToInput<T[k]>> extends true
48-
? k
49-
: never]?: UnknownToInput<T[k]>;
50-
} & {
51-
[k in keyof T as HasUndefined<UnknownToInput<T[k]>> extends true
52-
? never
53-
: k]: UnknownToInput<T[k]>;
54-
}
55-
>
56-
: T;
57-
5839
type HasUndefined<T> = [T] extends [undefined]
5940
? true
6041
: undefined extends T
@@ -92,7 +73,7 @@ type Flatten<T> = T extends object
9273
export function experimental_createEffect<
9374
IS,
9475
OS,
95-
I = UnknownToInput<IS>,
76+
I = UnknownToOutput<IS>,
9677
O = UnknownToOutput<OS>,
9778
// A hack to enforce that the inferred return type
9879
// matches the output schema type
@@ -121,7 +102,7 @@ export declare namespace S {
121102
export const boolean: typeof Sury.boolean;
122103
export const int32: typeof Sury.int32;
123104
export const number: typeof Sury.number;
124-
export const bigint: typeof Sury.bigint;
105+
export const bigint: typeof bigintSchema;
125106
export const never: typeof Sury.never;
126107
export const union: typeof Sury.union;
127108
export const object: typeof Sury.object;
@@ -130,6 +111,7 @@ export declare namespace S {
130111
// Don't expose recursive for now, since it's too advanced
131112
// export const recursive: typeof Sury.recursive;
132113
export const transform: typeof Sury.transform;
114+
export const shape: typeof Sury.shape;
133115
export const refine: typeof Sury.refine;
134116
export const schema: typeof Sury.schema;
135117
export const record: typeof Sury.record;

codegenerator/cli/npm/envio/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ exports.S = {
1313
boolean: Sury.boolean,
1414
int32: Sury.int32,
1515
number: Sury.number,
16-
bigint: Sury.bigint,
16+
bigint: require("./src/bindings/BigInt.res.js").schema,
1717
never: Sury.never,
1818
union: Sury.union,
1919
object: Sury.object,
@@ -22,6 +22,7 @@ exports.S = {
2222
// Don't expose recursive for now, since it's too advanced
2323
// recursive: Sury.recursive,
2424
transform: Sury.transform,
25+
shape: Sury.shape,
2526
refine: Sury.refine,
2627
schema: Sury.schema,
2728
record: Sury.record,

codegenerator/cli/npm/envio/src/Envio.res

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ let experimental_createEffect = (
4242
handler: effectArgs<'input> => promise<'output>,
4343
) => {
4444
Prometheus.EffectCallsCount.set(~callsCount=0, ~effectName=options.name)
45+
let outputSchema =
46+
S.schema(_ => options.output)->(Utils.magic: S.t<S.t<'output>> => S.t<Internal.effectOutput>)
4547
{
4648
name: options.name,
4749
handler: handler->(
@@ -50,6 +52,28 @@ let experimental_createEffect = (
5052
>
5153
),
5254
callsCount: 0,
53-
cache: options.cache->Belt.Option.getWithDefault(false),
55+
// This is the way to make the createEffect API
56+
// work without the need for users to call S.schema themselves,
57+
// but simply pass the desired object/tuple/etc.
58+
// If they pass a schem, it'll also work.
59+
input: S.schema(_ => options.input)->(
60+
Utils.magic: S.t<S.t<'input>> => S.t<Internal.effectInput>
61+
),
62+
output: outputSchema,
63+
cache: switch options.cache {
64+
| Some(true) =>
65+
let itemSchema = S.schema((s): Internal.effectCacheItem => {
66+
id: s.matches(S.string),
67+
output: s.matches(outputSchema),
68+
})
69+
Some({
70+
table: Internal.makeCacheTable(~effectName=options.name),
71+
rowsSchema: S.array(itemSchema),
72+
itemSchema,
73+
})
74+
| None
75+
| Some(false) =>
76+
None
77+
},
5478
}->(Utils.magic: Internal.effect => effect<'input, 'output>)
5579
}

codegenerator/cli/npm/envio/src/Internal.res

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,31 @@ type effectArgs = {
194194
context: effectContext,
195195
cacheKey: string,
196196
}
197+
type effectCacheItem = {id: string, output: effectOutput}
198+
type effectCacheMeta = {
199+
itemSchema: S.t<effectCacheItem>,
200+
rowsSchema: S.t<array<effectCacheItem>>,
201+
table: Table.table,
202+
}
197203
type effect = {
198204
name: string,
199205
handler: effectArgs => promise<effectOutput>,
200-
cache: bool,
206+
cache: option<effectCacheMeta>,
207+
output: S.t<effectOutput>,
208+
input: S.t<effectInput>,
201209
mutable callsCount: int,
202210
}
211+
let cacheTablePrefix = "envio_effect_"
212+
let makeCacheTable = (~effectName) => {
213+
Table.mkTable(
214+
cacheTablePrefix ++ effectName,
215+
~fields=[
216+
Table.mkField("id", Text, ~fieldSchema=S.string, ~isPrimaryKey=true),
217+
Table.mkField("output", JsonB, ~fieldSchema=S.json(~validate=false)),
218+
],
219+
~compositeIndices=[],
220+
)
221+
}
203222

204223
@genType.import(("./Types.ts", "Invalid"))
205224
type noEventFilters

codegenerator/cli/npm/envio/src/Persistence.res

Lines changed: 97 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@
55
// Currently there are quite many code spread across
66
// DbFunctions, Db, Migrations, InMemoryStore modules which use codegen code directly.
77

8-
// The type reflects an effect cache table in the db
8+
// The type reflects an cache table in the db
99
// It might be present even if the effect is not used in the application
10-
type effectCache = {
11-
name: string,
10+
type effectCacheRecord = {
11+
effectName: string,
1212
// Number of rows in the table
13-
mutable size: int,
14-
// Lazily attached table definition when effect is used in the application
15-
mutable table: option<Table.table>,
13+
mutable count: int,
1614
}
1715

1816
type operator = [#">" | #"="]
@@ -28,7 +26,6 @@ type storage = {
2826
~generalTables: array<Table.table>=?,
2927
~enums: array<Internal.enumConfig<Internal.enum>>=?,
3028
) => promise<unit>,
31-
loadEffectCaches: unit => promise<array<effectCache>>,
3229
@raises("StorageError")
3330
loadByIdsOrThrow: 'item. (
3431
~ids: array<string>,
@@ -50,14 +47,26 @@ type storage = {
5047
~table: Table.table,
5148
~itemSchema: S.t<'item>,
5249
) => promise<unit>,
50+
@raises("StorageError")
51+
setEffectCacheOrThrow: (
52+
~effect: Internal.effect,
53+
~items: array<Internal.effectCacheItem>,
54+
~initialize: bool,
55+
) => promise<unit>,
56+
// This is to download cache from the database to .envio/cache
57+
dumpEffectCache: unit => promise<unit>,
58+
// This is not good, but the function does two things:
59+
// - Gets info about existing cache tables
60+
// - if withUpload is true, it also populates the cache from .envio/cache to the database
61+
restoreEffectCache: (~withUpload: bool) => promise<array<effectCacheRecord>>,
5362
}
5463

5564
exception StorageError({message: string, reason: exn})
5665

5766
type storageStatus =
5867
| Unknown
5968
| Initializing(promise<unit>)
60-
| Ready({cleanRun: bool, effectCaches: dict<effectCache>})
69+
| Ready({cleanRun: bool, cache: dict<effectCacheRecord>})
6170

6271
type t = {
6372
userEntities: array<Internal.entityConfig>,
@@ -66,7 +75,6 @@ type t = {
6675
allEnums: array<Internal.enumConfig<Internal.enum>>,
6776
mutable storageStatus: storageStatus,
6877
storage: storage,
69-
onStorageInitialize: option<unit => promise<unit>>,
7078
}
7179

7280
let entityHistoryActionEnumConfig: Internal.enumConfig<EntityHistory.RowAction.t> = {
@@ -83,7 +91,6 @@ let make = (
8391
~allEnums,
8492
~staticTables,
8593
~storage,
86-
~onStorageInitialize=?,
8794
) => {
8895
let allEntities = userEntities->Js.Array2.concat([dcRegistryEntityConfig])
8996
let allEnums =
@@ -95,62 +102,70 @@ let make = (
95102
allEnums,
96103
storageStatus: Unknown,
97104
storage,
98-
onStorageInitialize,
99105
}
100106
}
101107

102-
let init = async (persistence, ~reset=false) => {
103-
try {
104-
let shouldRun = switch persistence.storageStatus {
105-
| Unknown => true
106-
| Initializing(promise) => {
107-
await promise
108-
reset
109-
}
110-
| Ready(_) => reset
111-
}
112-
if shouldRun {
113-
let resolveRef = ref(%raw(`null`))
114-
let promise = Promise.make((resolve, _) => {
115-
resolveRef := resolve
116-
})
117-
persistence.storageStatus = Initializing(promise)
118-
if reset || !(await persistence.storage.isInitialized()) {
119-
let _ = await persistence.storage.initialize(
120-
~entities=persistence.allEntities,
121-
~generalTables=persistence.staticTables,
122-
~enums=persistence.allEnums,
123-
)
108+
let init = {
109+
let loadInitialCache = async (persistence, ~withUpload) => {
110+
let effectCacheRecords = await persistence.storage.restoreEffectCache(~withUpload)
111+
let cache = Js.Dict.empty()
112+
effectCacheRecords->Js.Array2.forEach(record => {
113+
Prometheus.EffectCacheCount.set(~count=record.count, ~effectName=record.effectName)
114+
cache->Js.Dict.set(record.effectName, record)
115+
})
116+
cache
117+
}
124118

125-
persistence.storageStatus = Ready({
126-
cleanRun: true,
127-
effectCaches: Js.Dict.empty(),
128-
})
129-
switch persistence.onStorageInitialize {
130-
| Some(onStorageInitialize) => await onStorageInitialize()
131-
| None => ()
119+
async (persistence, ~reset=false) => {
120+
try {
121+
let shouldRun = switch persistence.storageStatus {
122+
| Unknown => true
123+
| Initializing(promise) => {
124+
await promise
125+
reset
132126
}
133-
} else if (
134-
// In case of a race condition,
135-
// we want to set the initial status to Ready only once.
136-
switch persistence.storageStatus {
137-
| Initializing(_) => true
138-
| _ => false
139-
}
140-
) {
141-
let effectCaches = Js.Dict.empty()
142-
(await persistence.storage.loadEffectCaches())->Js.Array2.forEach(effectCache => {
143-
effectCaches->Js.Dict.set(effectCache.name, effectCache)
144-
})
145-
persistence.storageStatus = Ready({
146-
cleanRun: false,
147-
effectCaches,
127+
| Ready(_) => reset
128+
}
129+
if shouldRun {
130+
let resolveRef = ref(%raw(`null`))
131+
let promise = Promise.make((resolve, _) => {
132+
resolveRef := resolve
148133
})
134+
persistence.storageStatus = Initializing(promise)
135+
if reset || !(await persistence.storage.isInitialized()) {
136+
Logging.info(`Initializing the indexer storage...`)
137+
138+
await persistence.storage.initialize(
139+
~entities=persistence.allEntities,
140+
~generalTables=persistence.staticTables,
141+
~enums=persistence.allEnums,
142+
)
143+
144+
Logging.info(`The indexer storage is ready. Uploading cache...`)
145+
persistence.storageStatus = Ready({
146+
cleanRun: true,
147+
cache: await loadInitialCache(persistence, ~withUpload=true),
148+
})
149+
} else if (
150+
// In case of a race condition,
151+
// we want to set the initial status to Ready only once.
152+
switch persistence.storageStatus {
153+
| Initializing(_) => true
154+
| _ => false
155+
}
156+
) {
157+
Logging.info(`The indexer storage is ready.`)
158+
persistence.storageStatus = Ready({
159+
cleanRun: false,
160+
cache: await loadInitialCache(persistence, ~withUpload=false),
161+
})
162+
}
163+
resolveRef.contents()
149164
}
150-
resolveRef.contents()
165+
} catch {
166+
| exn =>
167+
exn->ErrorHandling.mkLogAndRaise(~msg=`EE800: Failed to initialize the indexer storage.`)
151168
}
152-
} catch {
153-
| exn => exn->ErrorHandling.mkLogAndRaise(~msg=`EE800: Failed to initialize the indexer storage.`)
154169
}
155170
}
156171

@@ -162,3 +177,27 @@ let getInitializedStorageOrThrow = persistence => {
162177
| Ready(_) => persistence.storage
163178
}
164179
}
180+
181+
let setEffectCacheOrThrow = async (persistence, ~effect: Internal.effect, ~items) => {
182+
switch persistence.storageStatus {
183+
| Unknown
184+
| Initializing(_) =>
185+
Js.Exn.raiseError(`Failed to access the indexer storage. The Persistence layer is not initialized.`)
186+
| Ready({cache}) => {
187+
let storage = persistence.storage
188+
let effectName = effect.name
189+
let effectCacheRecord = switch cache->Utils.Dict.dangerouslyGetNonOption(effectName) {
190+
| Some(c) => c
191+
| None => {
192+
let c = {effectName, count: 0}
193+
cache->Js.Dict.set(effectName, c)
194+
c
195+
}
196+
}
197+
let initialize = effectCacheRecord.count === 0
198+
await storage.setEffectCacheOrThrow(~effect, ~items, ~initialize)
199+
effectCacheRecord.count = effectCacheRecord.count + items->Js.Array2.length
200+
Prometheus.EffectCacheCount.set(~count=effectCacheRecord.count, ~effectName)
201+
}
202+
}
203+
}

0 commit comments

Comments
 (0)