Skip to content

Commit

Permalink
Remapping and ingestion of datasources triggered from api now using r…
Browse files Browse the repository at this point in the history
…epco-server
  • Loading branch information
flipsimon committed Nov 20, 2023
1 parent 95f1947 commit 5f0cfb8
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 77 deletions.
139 changes: 80 additions & 59 deletions packages/repco-cli/src/commands/ds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import pc from 'picocolors'
import {

Check warning on line 3 in packages/repco-cli/src/commands/ds.ts

View workflow job for this annotation

GitHub Actions / build-and-test

Replace `⏎··defaultDataSourcePlugins·as·plugins,⏎··Ingester,⏎··repoRegistry,⏎` with `·defaultDataSourcePlugins·as·plugins·`
defaultDataSourcePlugins as plugins,
Ingester,

Check warning on line 5 in packages/repco-cli/src/commands/ds.ts

View workflow job for this annotation

GitHub Actions / build-and-test

'Ingester' is defined but never used. Allowed unused vars must match /^_/u
remapDataSource,
repoRegistry,

Check warning on line 6 in packages/repco-cli/src/commands/ds.ts

View workflow job for this annotation

GitHub Actions / build-and-test

'repoRegistry' is defined but never used. Allowed unused vars must match /^_/u
} from 'repco-core'
import { request } from '../client.js'
import { CliError, createCommand, createCommandGroup } from '../parse.js'
import { createCommand, createCommandGroup } from '../parse.js'

export const listPlugins = createCommand({
name: 'list-plugins',
Expand All @@ -27,35 +26,43 @@ export const listPlugins = createCommand({
export const list = createCommand({
name: 'list',
help: 'Show configured datasources in a repo',
arguments: [{ name: 'repo', required: true, help: 'DID or name of repo' }],
options: {
repo: { type: 'string', short: 'r', help: 'Repo name or DID' },
json: { type: 'boolean', short: 'j', help: 'Output as JSON' },
},
async run(opts) {
const repo = await repoRegistry.openWithDefaults(opts.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const data = repo.dsr
.all()
.map((ds) => ({ ...ds.definition, config: ds.config }))
if (opts.json) {
console.log(JSON.stringify(data))
} else {
for (const row of data) {
const data = []
for (const [key, value] of Object.entries(row)) {
let stringValue = value
if (key === 'config') stringValue = JSON.stringify(value)
data.push([key, stringValue])
async run(opts, args) {
try {
const res = (await request(`/repo/${args.repo}/ds`, {
method: 'GET',
})) as any
const data: {
config: any
uid: string
name: string
pluginUid: string
}[] = res.data
if (opts.json) {
console.log(JSON.stringify(data))
} else {
for (const row of data) {
const data = []
for (const [key, value] of Object.entries(row)) {
let stringValue = value
if (key === 'config') stringValue = JSON.stringify(value)
data.push([key, stringValue])
}
const max = data.reduce((sum, [k]) => Math.max(k.length, sum), 0)
const table = new Table({
wordWrap: true,
wrapOnWordBoundary: false,
colWidths: [max + 2, process.stdout.columns - 6 - max],
})
table.push(...data)
console.log(table.toString())
}
const max = data.reduce((sum, [k]) => Math.max(k.length, sum), 0)
const table = new Table({
wordWrap: true,
wrapOnWordBoundary: false,
colWidths: [max + 2, process.stdout.columns - 6 - max],
})
table.push(...data)
console.log(table.toString())
}
} catch (err) {
console.error('Error listing all repos', err)
}
},
})
Expand All @@ -76,18 +83,22 @@ export const add = createCommand({
},
},
async run(opts, args) {
if (!opts.repo) {
throw new Error(
'Either --repo option or REPCO_REPO environment variable is required.',
try {
if (!opts.repo) {
throw new Error(
'Either --repo option or REPCO_REPO environment variable is required.',
)
}
const res = (await request(`/repo/${opts.repo}/ds`, {
method: 'POST',
body: { pluginUid: args.plugin, config: args.config },
})) as any
console.log(
`Created datasource ${res.uid} in repo ${opts.repo} for plugin ${res.pluginUid}`,
)
} catch (err) {
console.error('Error adding datasource: ', err)
}
const res = (await request(`/repo/${opts.repo}/ds`, {
method: 'POST',
body: { pluginUid: args.plugin, config: args.config },
})) as any
console.log(
`Created datasource ${res.uid} in repo ${opts.repo} for plugin ${res.pluginUid}`,
)
},
})

Expand All @@ -96,7 +107,12 @@ export const ingest = createCommand({
help: 'Ingest content from datasources',
arguments: [],
options: {
repo: { type: 'string', short: 'r', help: 'Repo name or DID' },
repo: {
type: 'string',
required: true,
short: 'r',
help: 'Repo name or DID',
},
ds: {
type: 'string',
required: false,
Expand All @@ -106,22 +122,17 @@ export const ingest = createCommand({
loop: { type: 'boolean', short: 'l', help: 'Keep running in a loop' },
},
async run(opts, _args) {
const repo = await repoRegistry.openWithDefaults(opts.repo)
const ingester = new Ingester(plugins, repo)
if (opts.loop) {
const queue = ingester.workLoop()
for await (const result of queue) {
console.log(result)
}
} else {
if (!opts.ds) {
const result = await ingester.ingestAll()
console.log(result)
} else {
console.log('Ingesting datasource ' + opts.ds)
const result = await ingester.ingest(opts.ds)
console.log(result)
try {
if (!opts.repo) {
throw new Error('Repo name or did required with -r option.')
}
const res = (await request(`/repo/${opts.repo}/ds/ingest`, {
method: 'POST',
body: { ds: opts.ds, loop: opts.loop },
})) as any
console.log(res.result)
} catch (err) {
console.error('Error ingesting from datasource: ', err)
}
},
})
Expand All @@ -130,16 +141,26 @@ export const remap = createCommand({
name: 'remap',
help: 'Remap all content from a datasource',
options: {
repo: { type: 'string', short: 'r', help: 'Repo name or DID' },
repo: {
type: 'string',
short: 'r',
required: true,
help: 'Repo name or DID',
},
},
arguments: [{ name: 'datasource', required: true, help: 'Datasource UID' }],
async run(opts, args) {
const repo = await repoRegistry.openWithDefaults(opts.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const ds = repo.dsr.get(args.datasource)
if (!ds) throw new CliError('Datasource does not exist')
const result = await remapDataSource(repo, ds)
console.log(result)
try {
if (!opts.repo) {
throw new Error('Repo name or did required with -r option.')
}
const res = (await request(`/repo/${opts.repo}/ds/${args.datasource}`, {
method: 'GET',
})) as any
console.log(res.result)
} catch (err) {
console.error('Error remapping datasource:', err)
}
},
})

Expand Down
91 changes: 73 additions & 18 deletions packages/repco-server/src/routes/admin.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import Table from 'cli-table3'
import express from 'express'
import pc from 'picocolors'
import { repoRegistry } from 'repco-core'
import {
defaultDataSourcePlugins as plugins,
Ingester,
remapDataSource,
repoRegistry,
} from 'repco-core'
import { ServerError } from '../error.js'
import { getLocals } from '../lib.js'

Expand Down Expand Up @@ -65,7 +70,7 @@ router.post('/repo', async (req, res, next) => {
}
})

// list repos
// List repos
router.get('/repo', async (req, res) => {
try {
const { prisma } = getLocals(res)
Expand All @@ -85,7 +90,7 @@ router.get('/repo', async (req, res) => {
}
})

// info on a repo
// Print info on a repo
router.get('/repo/:repo', async (req, res) => {
try {
const { prisma } = getLocals(res)
Expand Down Expand Up @@ -136,21 +141,71 @@ router.post('/repo/:repo/ds', async (req, res) => {
}
})

// modify datasource
router.put('/repo/:repodid/ds/:dsuid', async (req, res) => {
const body = req.body
console.log('received body', body)
res.send({ ok: true })
// List datasources in a repo
router.get('/repo/:repo/ds', async (req, res) => {
try {
const { prisma } = getLocals(res)
const repo = await repoRegistry.open(prisma, req.params.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const data = repo.dsr
.all()
.map((ds) => ({ ...ds.definition, config: ds.config }))
res.send({ data })
} catch (err) {
throw new ServerError(
500,
`Failed to list all datasources for repo ${req.params.repo}` + err,
)
}
})
// get datasource
router.get('/repo/:repodid/ds/:dsuid', async (req, res) => {
const body = req.body
console.log('received body', body)
res.send({ ok: true })

// Ingest from a datasource
router.post('/repo/:repo/ds/ingest', async (req, res) => {
try {
const { prisma } = getLocals(res)
const repo = await repoRegistry.open(prisma, req.params.repo)
const ingester = new Ingester(plugins, repo)
if (req.body.loop) {
const queue = ingester.workLoop()
const message = `Started the ingestion workloop for all datasources of repo ${req.params.repo}. \
See server logs for results of the ingestion process.`
res.send({ result: message })
for await (const result of queue) {
console.log(result)
}
} else {
if (!req.body.ds) {
const result = await ingester.ingestAll()
res.send({ result })
} else {
console.log('Ingesting datasource ' + req.body.ds)
const result = await ingester.ingest(req.body.ds)
res.send({ result })
}
}
} catch (err) {
throw new ServerError(
500,
`Failed to ingest from datasources for repo ${req.params.repo}` + err,
)
}
})
// list datasources
router.get('/repo/:repodid/ds', async (req, res) => {
const body = req.body
console.log('received body', body)
res.send({ ok: true })

// Remap a datasource
router.get('/repo/:repo/ds/:dsuid', async (req, res) => {
try {
const { prisma } = getLocals(res)
const repo = await repoRegistry.open(prisma, req.params.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const ds = repo.dsr.get(req.params.dsuid)
if (!ds) throw new ServerError(500, 'Datasource does not exist')
const result = await remapDataSource(repo, ds)
res.send({ result })
} catch (err) {
throw new ServerError(
500,
`Failed to remap datasource ${req.params.dsuid} for repo ${req.params.repo}` +
err,
)
}
})

0 comments on commit 5f0cfb8

Please sign in to comment.