Skip to content

Commit

Permalink
fix: handle fetch by uri errors, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Mar 12, 2024
1 parent 753e351 commit c1e9ba2
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 72 deletions.
130 changes: 72 additions & 58 deletions packages/repco-core/src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ export interface DataSource {
* @returns A `Promise` that resolves to the fetched record, or `null` if no record was found.
*/
fetchByUri(uri: string): Promise<SourceRecordForm[] | null>
fetchByUriBatch(uris: string[]): Promise<SourceRecordForm[]>
fetchByUriBatch(
uris: string[],
): Promise<{ sourceRecords: SourceRecordForm[]; errors?: Error[] }>
/**
* Determines whether the data source is capable of fetching records by UID.
*
Expand Down Expand Up @@ -115,9 +117,20 @@ export abstract class BaseDataSource implements DataSource {
return this.definition.uid
}

async fetchByUriBatch(uris: string[]): Promise<SourceRecordForm[]> {
const res = await Promise.all(uris.map((uri) => this.fetchByUri(uri)))
return res.filter(notEmpty).flat()
/**
 * Default batch implementation: resolves every URI through `fetchByUri`
 * concurrently and collects per-URI failures instead of rejecting the batch.
 *
 * @param uris - The URIs to fetch.
 * @returns The flattened source records of all successful fetches, plus one
 *   `Error` for every URI whose fetch threw. This method itself only rejects
 *   if something other than `fetchByUri` fails.
 */
async fetchByUriBatch(
  uris: string[],
): Promise<{ sourceRecords: SourceRecordForm[]; errors: Error[] }> {
  const errors: Error[] = []
  const res = await Promise.all(
    uris.map(async (uri) => {
      try {
        return await this.fetchByUri(uri)
      } catch (err) {
        // Anything can be thrown in JavaScript. Normalize to a real Error so
        // the `Error[]` contract holds for callers instead of asserting it.
        errors.push(err instanceof Error ? err : new Error(String(err)))
        // A failed URI contributes no records.
        return []
      }
    }),
  )
  // `fetchByUri` may resolve to `null`; drop those entries before flattening
  // the per-URI arrays.
  const sourceRecords = res.filter(notEmpty).flat()
  return { sourceRecords, errors }
}

canFetchUri(_uid: string): boolean {
Expand All @@ -126,6 +139,24 @@ export abstract class BaseDataSource implements DataSource {
async fetchUpdates(_cursor: string | null): Promise<FetchUpdatesResult> {
return { cursor: '', records: [] }
}

/**
 * Lists the ingest errors stored for this datasource in the given repo.
 *
 * @param repo - Repo whose `did` scopes the query and whose Prisma client is
 *   used to run it.
 * @param opts - Optional paging. `take` falls back to 100 whenever it is not
 *   provided, including when only `skip` is passed.
 * @returns The matching `ingestError` rows.
 */
async getErrors(repo: Repo, opts: GetErrorOpts = { take: 100 }) {
  const where: Prisma.IngestErrorWhereInput = {
    repoDid: repo.did,
    datasourceUid: this.definition.uid,
  }
  // The parameter default only applies when `opts` is omitted entirely, so
  // resolve `take` per field; otherwise `{ skip: n }` would silently remove
  // the limit.
  const take = opts.take ?? 100
  return repo.prisma.ingestError.findMany({
    take,
    skip: opts.skip,
    where,
  })
}
}

/** Paging options accepted by `getErrors`. */
export type GetErrorOpts = {
/** Forwarded as the `take` argument of the `ingestError.findMany` query. */
take?: number
/** Forwarded as the `skip` argument of the `ingestError.findMany` query. */
skip?: number
}

// Pairs an error with an untyped row.
// NOTE(review): presumably the row is the one that failed to hydrate; usage is not visible here, confirm.
type FailedHydrates = { err: Error; row: any }
Expand Down Expand Up @@ -161,64 +192,42 @@ export class DataSourceRegistry extends Registry<DataSource> {
// checked above
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const ds = this.get(uid)!
const sourceRecords = await ds.fetchByUriBatch(filteredUris)
const { entities, errors } = await persistAndMapSourceRecords(
repo,
ds,
sourceRecords,
)
for (const error of errors) {
await error.persist(repo.prisma)
}
for (const e of entities) {
e.headers?.EntityUris?.forEach((uri) => found.add(uri))
try {
const { err, res } = await tryCatch(
async () => await ds.fetchByUriBatch(filteredUris),
(cause) => IngestError.atFetchUpdates(cause, repo, ds),
)
if (!res) {
throw err
}
const { sourceRecords, errors: fetchErrors } = res
if (fetchErrors) {
for (const cause of fetchErrors) {
const error = IngestError.atFetchUpdates(cause, repo, ds)
await error.persist(repo.prisma)
}
}
const { entities, errors } = await persistAndMapSourceRecords(
repo,
ds,
sourceRecords,
)
for (const error of errors) {
await error.persist(repo.prisma)
}
for (const e of entities) {
e.headers?.EntityUris?.forEach((uri) => found.add(uri))
}
fetched.push(...entities)
} catch (err) {
if (!(err instanceof IngestError)) throw err
await err.persist(repo.prisma)
}
fetched.push(...entities)
}
for (const uri of uris) {
if (!found.has(uri)) notFound.add(uri)
}
return { fetched, notFound: Array.from(notFound) }

//
// let found = false
// for (const datasource of matchingSources) {
// try {
// const sourceRecords = await datasource.fetchByUri(uri)
// if (sourceRecords && sourceRecords.length) {
// const entities = await mapAndPersistSourceRecord(
// repo,
// datasource,
// sourceRecords,
// )
// fetched.push(...entities)
// found = true
// break
// }
// } catch (err) {
// // The datasource failed to fetch the entity.
// // Log the error and proceed.
// const fail = {
// uri,
// datasourceUid: datasource.definition.uid,
// timestamp: new Date(),
// errorMessage: (err as Error).message,
// errorDetails: errToSerializable(err as Error),
// }
// await repo.prisma.failedDatasourceFetches.upsert({
// create: { ...fail },
// update: { ...fail },
// where: {
// uri_datasourceUid: {
// uri: fail.uri,
// datasourceUid: fail.datasourceUid,
// },
// },
// })
// }
// }
// if (!found) notFound.push(uri)
// }
}

registerFromPlugins(
Expand Down Expand Up @@ -363,7 +372,7 @@ export class IngestError extends Error {
cause: any,
repo: Repo,
datasource: DataSource,
cursor: string | null,
cursor?: string | null,
) {
return new IngestError({
scope: IngestErrorScope.FetchUpdates,
Expand Down Expand Up @@ -415,6 +424,10 @@ export class IngestError extends Error {
}
}
const id = createRandomId()
const details: any = {
stack: this.cause?.stack || this.stack,
nextCursor: this.nextCursor,
}
const data = {
id,
repoDid: this.repoDid,
Expand All @@ -424,7 +437,7 @@ export class IngestError extends Error {
sourceRecordId: this.sourceRecordId,
timestamp: this.timestamp,
errorMessage: this.toString(),
errorDetails: JSON.stringify(this),
errorDetails: details,
}
await prisma.ingestError.create({ data })
}
Expand Down Expand Up @@ -546,6 +559,7 @@ export async function ingestUpdatesFromDataSourceAtCursor(
await repo.saveBatch(entities) // TODO: Agent
return { nextCursor, records, entities, errors }
} catch (cause) {
console.log('saveBatch failure', cause)
const err = IngestError.atSaveBatch(
cause,
repo,
Expand Down
33 changes: 25 additions & 8 deletions packages/repco-core/src/datasources/cba.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ export class CbaDataSource implements DataSource {
}

// Fetch a list of URIs in batches
async fetchByUriBatch(uris: string[]): Promise<SourceRecordForm[]> {
async fetchByUriBatch(
uris: string[],
): Promise<{ sourceRecords: SourceRecordForm[]; errors?: Error[] }> {
// Map of (source record type) -> (batch endpoint)
const batchEndpoints: Record<string, string> = {
post: 'post',
Expand Down Expand Up @@ -165,12 +167,14 @@ export class CbaDataSource implements DataSource {
// Map each endpoint bucket to list of promises.
// Each promise in the list fetches a slice of the overall URLs (to respect page limits)
// Collect the list of all bucket promises in a single promise.
const batchedPromises: Promise<SourceRecordForm[][]> = Promise.all(
const batchedPromises: Promise<
{ sourceRecords: SourceRecordForm[]; errors: any[] }[]
> = Promise.all(
Object.entries(buckets).map(async ([endpoint, { type, ids }]) => {
try {
let idx = 0
const pageLimit = this.config.pageLimit
const res = []
const res: SourceRecordForm[] = []
while (idx < ids.length) {
const slice = ids.slice(idx, idx + pageLimit)
idx += slice.length
Expand All @@ -191,18 +195,25 @@ export class CbaDataSource implements DataSource {
}),
)
}
return res
return { sourceRecords: res, errors: [] }
} catch (error) {
// TODO: Persist the error.
log.warn({ msg: `CBA datasource failure`, error })
return []
return { errors: [error as Error], sourceRecords: [] }
}
}),
)
// Map the URIs without batch support to a promise that resolves this URI.
// Combine them all in a single promise.
const unbatchedPromises = Promise.all(
unbatched.map((uri) => this.fetchByUri(uri)),
unbatched.map(async (uri) => {
try {
const sourceRecords = await this.fetchByUri(uri)
return { sourceRecords, errors: [] }
} catch (err) {
return { errors: [err], sourceRecords: [] }
}
}),
)

// Resolve all pending promises-of-promises in a single call
Expand All @@ -211,8 +222,14 @@ export class CbaDataSource implements DataSource {
unbatchedPromises,
])

// Flatten the paged arrays
return [...batchedRes.flat(), ...unbatchedRes.flat()]
const allRes = [...batchedRes, ...unbatchedRes]
const errors: Error[] = []
const sourceRecords: SourceRecordForm[] = []
for (const row of allRes) {
errors.push(...row.errors.filter(notEmpty))
sourceRecords.push(...row.sourceRecords)
}
return { sourceRecords, errors }
}

async fetchByUri(uri: string): Promise<SourceRecordForm[]> {
Expand Down
1 change: 1 addition & 0 deletions packages/repco-core/src/repo/relation-finder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ export class RelationFinder {
const res = stack.map((uid) => this.entities.get(uid)!)
return res
} catch (err) {
console.log('RESOLVE ERROR', err)
if (err instanceof GGraphError) {
console.error('circular relation', {
from: this.entities.get(err.id),
Expand Down
24 changes: 18 additions & 6 deletions packages/repco-core/test/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class TestDataSource extends BaseDataSource implements DataSource {
mapUppercase = false
insertMissing = false
resolveMissing = false
failFetch = false
failMap = false

get definition(): DataSourceDefinition {
return {
Expand All @@ -58,6 +60,9 @@ class TestDataSource extends BaseDataSource implements DataSource {
}

async fetchUpdates(cursor: string | null): Promise<FetchUpdatesResult> {
if (this.failFetch) {
throw new Error('Failed to fetch')
}
if (cursor === '1') {
return { cursor, records: [] }
}
Expand All @@ -83,6 +88,9 @@ class TestDataSource extends BaseDataSource implements DataSource {
}
}
async fetchByUri(uid: string): Promise<SourceRecordForm[] | null> {
if (this.failFetch) {
throw new Error('Failed to fetch')
}
if (uid === 'urn:test:file:1') {
return [
intoSourceRecord({
Expand Down Expand Up @@ -124,6 +132,9 @@ class TestDataSource extends BaseDataSource implements DataSource {
}

async mapSourceRecord(record: SourceRecordForm): Promise<EntityForm[]> {
if (this.failMap) {
throw new Error('Cannot parse data')
}
const form = JSON.parse(record.body) as EntityForm
if (this.mapUppercase) {
if (form.type === 'ContentItem') {
Expand Down Expand Up @@ -198,8 +209,7 @@ test('remap', async (assert) => {
assert.is(entitiesAfter[0].title, 'TEST1')
})

// TODO: This is not working with batching right now.
test.skip('failed fetches', async (assert) => {
test('failed fetches', async (assert) => {
const prisma = await setup(assert)
const repo = await repoRegistry.create(prisma, 'test')

Expand All @@ -210,15 +220,17 @@ test.skip('failed fetches', async (assert) => {
datasource.insertMissing = true

{
const _res = await ingestUpdatesFromDataSource(repo, datasource, false)
const res = await ingestUpdatesFromDataSource(repo, datasource, false)
console.log('res', res)
const fails = await datasource.getErrors(repo)
console.log('fails', fails)
assert.is(fails.length, 1)
// assert.is(res.count, 1)
const revisions = await repo.prisma.revision.count()
assert.is(revisions, 3)
const entities = await repo.prisma.entity.count()
assert.is(entities, 3)
const fails = await repo.prisma.failedDatasourceFetches.findMany()
assert.is(fails.length, 1)
assert.is(fails[0].uri, 'urn:test:media:fail')
// assert.is(fails[0].uri, 'urn:test:media:fail')
}

datasource.resolveMissing = true
Expand Down

0 comments on commit c1e9ba2

Please sign in to comment.