
Commit 38b4732: Better cache restore and dump handling

1 parent 2aaf134

6 files changed (+85 -86 lines)

codegenerator/cli/npm/envio/index.d.ts

Lines changed: 2 additions & 21 deletions
@@ -35,26 +35,6 @@ type UnknownToOutput<T> = T extends Sury.Schema<unknown>
     >
   : T;
 
-type UnknownToInput<T> = T extends Sury.Schema<unknown>
-  ? Sury.Input<T>
-  : T extends (...args: any[]) => any
-  ? T
-  : T extends unknown[]
-  ? { [K in keyof T]: UnknownToInput<T[K]> }
-  : T extends { [k in keyof T]: unknown }
-  ? Flatten<
-      {
-        [k in keyof T as HasUndefined<UnknownToInput<T[k]>> extends true
-          ? k
-          : never]?: UnknownToInput<T[k]>;
-      } & {
-        [k in keyof T as HasUndefined<UnknownToInput<T[k]>> extends true
-          ? never
-          : k]: UnknownToInput<T[k]>;
-      }
-    >
-  : T;
-
 type HasUndefined<T> = [T] extends [undefined]
   ? true
   : undefined extends T
@@ -92,7 +72,7 @@ type Flatten<T> = T extends object
 export function experimental_createEffect<
   IS,
   OS,
-  I = UnknownToInput<IS>,
+  I = UnknownToOutput<IS>,
   O = UnknownToOutput<OS>,
   // A hack to enforce that the inferred return type
   // matches the output schema type
@@ -130,6 +110,7 @@ export declare namespace S {
   // Don't expose recursive for now, since it's too advanced
   // export const recursive: typeof Sury.recursive;
   export const transform: typeof Sury.transform;
+  export const shape: typeof Sury.shape;
   export const refine: typeof Sury.refine;
   export const schema: typeof Sury.schema;
   export const record: typeof Sury.record;
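
The swap from UnknownToInput to UnknownToOutput means the effect handler's input parameter is typed from the input schema's parsed (output) side, and S.shape joins the exposed Sury surface. A minimal TypeScript sketch of how the exposed API fits together; the effect name, schemas, and handler body are illustrative assumptions, not code from this commit:

import { experimental_createEffect, S } from "envio";

// S.shape (newly exposed above) reshapes a schema's parsed value;
// here it derives a `{ name: string }` object from a plain string schema.
// Assumed Sury signature: S.shape(schema, value => newShape).
const metadataSchema = S.shape(S.string, (name) => ({ name }));

// Hypothetical effect: with I = UnknownToOutput<IS>, the handler's
// `input` is the parsed output of the input schema (a string here),
// and the return value must match the output schema's shape.
export const getTokenName = experimental_createEffect(
  {
    name: "getTokenName",
    input: S.string,
    output: metadataSchema,
  },
  async ({ input }) => {
    return { name: `Token at ${input}` };
  },
);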

codegenerator/cli/npm/envio/index.js

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ exports.S = {
   // Don't expose recursive for now, since it's too advanced
   // recursive: Sury.recursive,
   transform: Sury.transform,
+  shape: Sury.shape,
   refine: Sury.refine,
   schema: Sury.schema,
   record: Sury.record,

codegenerator/cli/npm/envio/src/Persistence.res

Lines changed: 2 additions & 2 deletions
@@ -139,7 +139,7 @@ let init = {
         ~enums=persistence.allEnums,
       )
 
-      Logging.info(`The indexer storage is ready. Restoring cache...`)
+      Logging.info(`The indexer storage is ready. Uploading cache...`)
       persistence.storageStatus = Ready({
         cleanRun: true,
         cache: await loadInitialCache(persistence, ~withUpload=true),
@@ -152,7 +152,7 @@ let init = {
         | _ => false
         }
       ) {
-        Logging.info(`The indexer storage is initialized. Restoring cache...`)
+        Logging.info(`The indexer storage is ready.`)
        persistence.storageStatus = Ready({
          cleanRun: false,
          cache: await loadInitialCache(persistence, ~withUpload=false),

codegenerator/cli/npm/envio/src/PgStorage.res

Lines changed: 74 additions & 62 deletions
@@ -475,7 +475,10 @@ let getPsqlExec = {
         `${binary} --version`,
         (~error, ~stdout as _, ~stderr as _) => {
           switch error {
-          | Value(_) => resolve(Error("Failed to find psql binary"))
+          | Value(_) =>
+            resolve(
+              Error(`Please check if "psql" binary is installed or docker-compose is running for the local indexer.`),
+            )
           | Null => resolve(Ok(binary))
           }
         },
@@ -496,20 +499,20 @@
     }
   }
 }
-let psqlExecMissingErrorMessage = `Please check if "psql" binary is installed or docker-compose is running for the local indexer.`
 
 let make = (
   ~sql: Postgres.sql,
   ~pgHost,
   ~pgSchema,
+  ~pgPort,
   ~pgUser,
   ~pgDatabase,
   ~pgPassword,
   ~onInitialize=?,
   ~onNewTables=?,
 ): Persistence.storage => {
   let psqlExecOptions: NodeJs.ChildProcess.execOptions = {
-    env: Js.Dict.fromArray([("PGPASSWORD", pgPassword)]),
+    env: Js.Dict.fromArray([("PGPASSWORD", pgPassword), ("PATH", %raw(`process.env.PATH`))]),
   }
 
   let cacheDirPath = NodeJs.Path.resolve([
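
Forwarding PATH alongside PGPASSWORD matters because Node's child_process functions replace the child's entire environment when an env option is supplied, so the shell spawned for the psql command would otherwise fail to locate the binary. A standalone Node/TypeScript sketch of that behavior (the command and password are placeholders):

import { exec } from "node:child_process";

// Passing `env` to exec REPLACES the child's environment instead of
// extending it, so PATH must be forwarded explicitly or binary lookup fails.
exec(
  "psql --version",
  { env: { PGPASSWORD: "secret", PATH: process.env.PATH ?? "" } },
  (error, stdout) => {
    if (error) {
      console.error("psql lookup failed:", error.message);
    } else {
      console.log(stdout.trim());
    }
  },
);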
@@ -709,73 +712,79 @@
   }
 
   let dumpEffectCache = async () => {
-    let cacheTableInfo: array<schemaCacheTableInfo> =
-      (await sql
-      ->Postgres.unsafe(makeSchemaCacheTableInfoQuery(~pgSchema)))
-      ->Js.Array2.filter(i => i.count > 0)
-
-    if cacheTableInfo->Utils.Array.notEmpty {
-      // Create .envio/cache directory if it doesn't exist
-      try {
-        await NodeJs.Fs.Promises.access(cacheDirPath)
-      } catch {
-      | _ =>
-        // Create directory if it doesn't exist
-        await NodeJs.Fs.Promises.mkdir(~path=cacheDirPath, ~options={recursive: true})
-      }
+    try {
+      let cacheTableInfo: array<schemaCacheTableInfo> =
+        (await sql
+        ->Postgres.unsafe(makeSchemaCacheTableInfoQuery(~pgSchema)))
+        ->Js.Array2.filter(i => i.count > 0)
+
+      if cacheTableInfo->Utils.Array.notEmpty {
+        // Create .envio/cache directory if it doesn't exist
+        try {
+          await NodeJs.Fs.Promises.access(cacheDirPath)
+        } catch {
+        | _ =>
+          // Create directory if it doesn't exist
+          await NodeJs.Fs.Promises.mkdir(~path=cacheDirPath, ~options={recursive: true})
+        }
 
-      // Command for testing. Run from generated
-      // docker-compose exec -T -u postgres envio-postgres psql -d envio-dev -c 'COPY "public"."envio_effect_getTokenMetadata" TO STDOUT (FORMAT text, HEADER);' > ../.envio/cache/getTokenMetadata.tsv
+        // Command for testing. Run from generated
+        // docker-compose exec -T -u postgres envio-postgres psql -d envio-dev -c 'COPY "public"."envio_effect_getTokenMetadata" TO STDOUT (FORMAT text, HEADER);' > ../.envio/cache/getTokenMetadata.tsv
 
-      switch await getPsqlExec(~pgUser, ~pgHost) {
-      | Ok(psqlExec) => {
-          Logging.info(
-            `Dumping cache: ${cacheTableInfo
-              ->Js.Array2.map(({tableName, count}) =>
-                tableName ++ " (" ++ count->Belt.Int.toString ++ " rows)"
-              )
-              ->Js.Array2.joinWith(", ")}`,
-          )
+        switch await getPsqlExec(~pgUser, ~pgHost) {
+        | Ok(psqlExec) => {
+            Logging.info(
+              `Dumping cache: ${cacheTableInfo
+                ->Js.Array2.map(({tableName, count}) =>
+                  tableName ++ " (" ++ count->Belt.Int.toString ++ " rows)"
+                )
+                ->Js.Array2.joinWith(", ")}`,
+            )
 
-          let promises = cacheTableInfo->Js.Array2.map(async ({tableName}) => {
-            let cacheName = tableName->Js.String2.sliceToEnd(~from=cacheTablePrefixLength)
-            let outputFile =
-              NodeJs.Path.join(cacheDirPath, cacheName ++ ".tsv")->NodeJs.Path.toString
-
-            let command = `${psqlExec} -h ${pgHost} -U ${pgUser} -d ${pgDatabase} -c 'COPY "${pgSchema}"."${tableName}" TO STDOUT WITH (FORMAT text, HEADER);' > ${outputFile}`
-
-            Promise.make((resolve, reject) => {
-              NodeJs.ChildProcess.execWithOptions(
-                command,
-                psqlExecOptions,
-                (~error, ~stdout, ~stderr as _) => {
-                  switch error {
-                  | Value(error) => reject(error)
-                  | Null => resolve(stdout)
-                  }
-                },
-              )
-            })
-          })
+            let promises = cacheTableInfo->Js.Array2.map(async ({tableName}) => {
+              let cacheName = tableName->Js.String2.sliceToEnd(~from=cacheTablePrefixLength)
+              let outputFile =
+                NodeJs.Path.join(cacheDirPath, cacheName ++ ".tsv")->NodeJs.Path.toString
+
+              let command = `${psqlExec} -h ${pgHost} -p ${pgPort->Js.Int.toString} -U ${pgUser} -d ${pgDatabase} -c 'COPY "${pgSchema}"."${tableName}" TO STDOUT WITH (FORMAT text, HEADER);' > ${outputFile}`
+
+              Promise.make((resolve, reject) => {
+                NodeJs.ChildProcess.execWithOptions(
+                  command,
+                  psqlExecOptions,
+                  (~error, ~stdout, ~stderr as _) => {
+                    switch error {
+                    | Value(error) => reject(error)
+                    | Null => resolve(stdout)
+                    }
+                  },
+                )
+              })
+            })
 
-          let _ = await promises->Promise.all
-          Logging.info(`Successfully dumped cache to ${cacheDirPath->NodeJs.Path.toString}`)
+            let _ = await promises->Promise.all
+            Logging.info(`Successfully dumped cache to ${cacheDirPath->NodeJs.Path.toString}`)
+          }
+        | Error(message) => Logging.error(`Failed to dump cache. ${message}`)
         }
-      | Error(_) => Logging.error(`Failed to dump cache. ${psqlExecMissingErrorMessage}`)
       }
+    } catch {
+    | exn => Logging.errorWithExn(exn->Internal.prettifyExn, `Failed to dump cache.`)
     }
   }
 
   let restoreEffectCache = async (~withUpload) => {
     if withUpload {
       // Try to restore cache tables from binary files
-      let (entries, psqlExecResult) = await Promise.all2((
-        NodeJs.Fs.Promises.readdir(cacheDirPath),
-        getPsqlExec(~pgUser, ~pgHost),
-      ))
+      let nothingToUploadErrorMessage = "Nothing to upload."
 
-      switch psqlExecResult {
-      | Ok(psqlExec) => {
+      switch await Promise.all2((
+        NodeJs.Fs.Promises.readdir(cacheDirPath)
+        ->Promise.thenResolve(e => Ok(e))
+        ->Promise.catch(_ => Promise.resolve(Error(nothingToUploadErrorMessage))),
+        getPsqlExec(~pgUser, ~pgHost),
+      )) {
+      | (Ok(entries), Ok(psqlExec)) => {
           let cacheFiles = entries->Js.Array2.filter(entry => {
             entry->Js.String2.endsWith(".tsv")
           })
@@ -799,7 +808,7 @@
           ->Promise.then(() => {
             let inputFile = NodeJs.Path.join(cacheDirPath, entry)->NodeJs.Path.toString
 
-            let command = `${psqlExec} -h ${pgHost} -U ${pgUser} -d ${pgDatabase} -c 'COPY "${pgSchema}"."${tableName}" FROM STDIN WITH (FORMAT text, HEADER);' < ${inputFile}`
+            let command = `${psqlExec} -h ${pgHost} -p ${pgPort->Js.Int.toString} -U ${pgUser} -d ${pgDatabase} -c 'COPY "${pgSchema}"."${tableName}" FROM STDIN WITH (FORMAT text, HEADER);' < ${inputFile}`
 
             Promise.make(
               (resolve, reject) => {
@@ -819,17 +828,20 @@
           })
           ->Promise.all
         }
-      | Error(_) =>
-        Logging.error(
-          `Failed to restore cache, continuing without it. ${psqlExecMissingErrorMessage}`,
-        )
+      | (Error(message), _)
+      | (_, Error(message)) =>
+        if message === nothingToUploadErrorMessage {
+          Logging.info("No cache found to upload.")
+        } else {
+          Logging.error(`Failed to upload cache, continuing without it. ${message}`)
+        }
       }
     }
 
     let cacheTableInfo: array<schemaCacheTableInfo> =
       await sql->Postgres.unsafe(makeSchemaCacheTableInfoQuery(~pgSchema))
 
-    if withUpload {
+    if withUpload && cacheTableInfo->Utils.Array.notEmpty {
       switch onNewTables {
       | Some(onNewTables) =>
         await onNewTables(
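
The restore path above wraps the possible readdir rejection (no .envio/cache directory yet) in a Result-like value, so Promise.all2 settles instead of throwing, and the "Nothing to upload." sentinel can be matched to log an info message rather than an error. The same pattern in a standalone TypeScript sketch; the messages mirror the diff, but the helper itself is illustrative:

import { readdir } from "node:fs/promises";

type Result<T> = { ok: true; value: T } | { ok: false; message: string };

const nothingToUpload = "Nothing to upload.";

// Convert a rejected readdir into an Error value so a combined
// Promise.all settles normally and the caller can pattern-match.
const safeReaddir = (dir: string): Promise<Result<string[]>> =>
  readdir(dir).then(
    (entries): Result<string[]> => ({ ok: true, value: entries }),
    (): Result<string[]> => ({ ok: false, message: nothingToUpload }),
  );

safeReaddir(".envio/cache").then((res) => {
  if (!res.ok && res.message === nothingToUpload) {
    console.info("No cache found to upload.");
  } else if (res.ok) {
    console.info(`Found ${res.value.length} cache files`);
  }
});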

codegenerator/cli/templates/static/codegen/src/Config.res

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ let codegenPersistence = Persistence.make(
   ~pgUser=Env.Db.user,
   ~pgDatabase=Env.Db.database,
   ~pgPassword=Env.Db.password,
+  ~pgPort=Env.Db.port,
   ~onInitialize=() => {
     if Env.Hasura.enabled {
       Hasura.trackDatabase(

codegenerator/cli/templates/static/codegen/src/Env.res

Lines changed: 5 additions & 1 deletion
@@ -19,6 +19,8 @@ let maxPartitionConcurrency =
   envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)
 let indexingBlockLag = envSafe->EnvSafe.get("ENVIO_INDEXING_BLOCK_LAG", S.option(S.int))
 
+// We want to be able to set this to 0.0.0.0
+// to allow port passthrough from a Docker container
 let serverHost = envSafe->EnvSafe.get("ENVIO_INDEXER_HOST", S.string, ~fallback="localhost")
 let serverPort =
   envSafe->EnvSafe.get(
@@ -117,7 +119,7 @@ Logging.setLogger(
 )
 
 module Db = {
-  let host = envSafe->EnvSafe.get("ENVIO_PG_HOST", S.string, ~devFallback="envio-postgres")
+  let host = envSafe->EnvSafe.get("ENVIO_PG_HOST", S.string, ~devFallback="localhost")
   let port = envSafe->EnvSafe.get("ENVIO_PG_PORT", S.int->S.port, ~devFallback=5433)
   let user = envSafe->EnvSafe.get("ENVIO_PG_USER", S.string, ~devFallback="postgres")
   let password = envSafe->EnvSafe.get(
@@ -139,6 +141,8 @@ module Db = {
 }
 
 module Hasura = {
+  // Disable it on HS indexer run, since we don't have Hasura credentials anyway
+  // Also, it might be useful for some users who don't care about Hasura
   let enabled = envSafe->EnvSafe.get("ENVIO_HASURA", S.bool, ~fallback=true)
 
   let responseLimit = switch envSafe->EnvSafe.get("ENVIO_HASURA_RESPONSE_LIMIT", S.option(S.int)) {
