Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: continue ingest after errors #103

Merged
merged 9 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions packages/repco-cli/src/commands/ds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,91 @@ export const ingest = createCommand({
},
})

export const errors = createCommand({
name: 'errors',
help: 'Show ingest error log',
arguments: [],
options: {
repo: {
type: 'string',
required: true,
short: 'r',
help: 'Repo name or DID',
},
ds: {
type: 'string',
required: false,
short: 'd',
help: 'Datasource UID (optional)',
},
offset: {
type: 'string',
short: 'o',
help: 'Offset from latest entries',
default: '0',
},
count: {
type: 'string',
short: 'o',
help: 'Number of entries to show',
default: '100',
},
stack: {
type: 'boolean',
short: 's',
help: 'Show stack trace',
},
json: {
type: 'boolean',
short: 'j',
help: 'Print as JSON',
},
},
async run(opts, _args) {
try {
if (!opts.repo) {
throw new Error('Repo name or did required with -r option.')
}
const query = new URLSearchParams()
if (opts.offset) {
query.set('offset', opts.offset)
}
if (opts.count) {
query.set('count', opts.count)
}
if (opts.ds) {
query.set('datasource', opts.ds)
}
const res = (await request(`/repo/${opts.repo}/ds/errors?${query}`, {
method: 'GET',
})) as any
if (opts.json) {
console.log(JSON.stringify(res.data))
} else {
for (const row of res.data) {
console.log('repo: ', row.repoDid)
console.log('datasource: ', row.datasourceUid)
console.log('timestamp: ', row.timestamp)
console.log('kind: ', row.kind)
console.log('error: ', row.errorMessage)
for (const cause of row.errorDetails?.causes || []) {
if (cause === row.errorMessage) continue
console.log(' caused by: ', cause)
}
console.log('cursor: ', JSON.parse(row.cursor))
console.log('sourcerecord:', row.sourceRecordId)
if (opts.stack) {
console.log('stack: ', row.errorDetails.stack)
}
console.log('---')
}
}
} catch (err) {
console.error(err)
}
},
})

export const remap = createCommand({
name: 'remap',
help: 'Remap all content from a datasource',
Expand All @@ -150,9 +235,12 @@ export const remap = createCommand({
if (!opts.repo) {
throw new Error('Repo name or did required with -r option.')
}
const res = (await request(`/repo/${opts.repo}/ds/${args.datasource}`, {
method: 'GET',
})) as any
const res = (await request(
`/repo/${opts.repo}/ds/${args.datasource}/remap`,
{
method: 'GET',
},
)) as any
console.log(res.result)
} catch (err) {
console.error('Error remapping datasource:', err)
Expand All @@ -163,5 +251,5 @@ export const remap = createCommand({
export const command = createCommandGroup({
name: 'ds',
help: 'Manage datasources',
commands: [add, list, ingest, listPlugins, remap],
commands: [add, list, ingest, listPlugins, remap, errors],
})
18 changes: 9 additions & 9 deletions packages/repco-cli/src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ function ingestAll(prisma: PrismaClient) {
const ingester = new Ingester(defaultDataSourcePlugins, repo)
ingesters.push(ingester)
const queue = ingester.workLoop()
for await (const result of queue) {
if ('error' in result) {
let lastCursor
for await (const outcome of queue) {
if (outcome.didFail()) {
log.error({
error: result.error,
message: `ingest ${result.uid} ERROR: ${result.error}`,
error: outcome.error,
msg: `ingest ${outcome.uid} failed: ${outcome.error?.toString()}`,
})
} else {
const cursor =
'cursor' in result && result.cursor
? JSON.parse(result.cursor).pageNumber
: ''
log.debug(`ingest ${result.uid}: ${result.ok} ${cursor}`)
if (outcome.cursor != lastCursor) {
lastCursor = outcome.cursor
log.debug(`ingest ${outcome.uid}: now at ${outcome.cursor}`)
}
}
}
}
Expand Down
Loading
Loading