Skip to content

Commit

Permalink
- fix: Ingestion starts directly when a datasource is added.
Browse files Browse the repository at this point in the history
- Included repco-server in bundling
- Added RepoRegistry with cache
- Added repo info cli command to admin api
  • Loading branch information
flipsimon committed Nov 17, 2023
1 parent 1e60eb4 commit 4fdb505
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 55 deletions.
7 changes: 6 additions & 1 deletion packages/repco-cli/scripts/bundle.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ async function main() {
watch: watch && { onRebuild },
sourcemap: true,
platform: 'node',
external: ['@prisma/client', 'classic-level', 'repco-server'],
external: [
'@prisma/client',
'classic-level',
'pg-native',
'graphile-build-pg',
],
banner: {
js: 'import {createRequire as __createRequire } from "module";const require=__createRequire(import.meta.url);',
},
Expand Down
33 changes: 16 additions & 17 deletions packages/repco-cli/src/commands/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,28 +226,27 @@ export const list = createCommand({
},
})

//TODO
export const info = createCommand({
name: 'info',
help: 'Info on a repo',
arguments: [{ name: 'repo', required: true, help: 'DID or name of repo' }],
async run(_opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const table = new Table()
const revisionCount = await repo.prisma.revision.count({
where: { repoDid: repo.did },
})
const commitCount = await repo.prisma.commit.count({
where: { repoDid: repo.did },
})
table.push(['DID', repo.did])
table.push(['Name', repo.name || ''])
table.push(['Writable', JSON.stringify(repo.writeable)])
table.push(['Head (commit)', ((await repo.getHead()) || '-').toString()])
table.push(['Head (revision)', (await repo.getCursor()) || '-'])
table.push(['Revisions', String(revisionCount)])
table.push(['Commits', String(commitCount)])
print(table.toString())
try {
const res = (await request(`/repo/${args.repo}`, {
method: 'GET',
})) as any
const table = new Table()
table.push(['Did', res.info.did])
table.push(['Name', res.info.name])
table.push(['Writeable', JSON.stringify(res.info.writeable)])
table.push(['Head (commit)', res.info.headCommit])
table.push(['Head (revisions)', res.info.headRevisions])
table.push(['Revisions', String(res.info.revisions)])
table.push(['Commits', String(res.info.commits)])
print(table.toString())
} catch (err) {
console.error('got error', err)
}
},
})

Expand Down
3 changes: 2 additions & 1 deletion packages/repco-cli/src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import exitHook from 'async-exit-hook'
import { log, UntilStopped } from 'repco-common'
import { defaultDataSourcePlugins, Ingester, Repo } from 'repco-core'
import { PrismaClient } from 'repco-prisma'
import { runServer } from 'repco-server'
import { createCommand } from '../parse.js'
import { startPostgres } from '../util/postgres.js'

Expand All @@ -17,6 +18,7 @@ export const run = createCommand({
},
async run(opts) {
const shutdown: Array<() => Promise<void>> = []
log.debug('start')
if (opts.temp) {
log.warn(
'Running in temp mode with inmemory PostgreSQL - all changes will be lost',
Expand Down Expand Up @@ -46,7 +48,6 @@ export const run = createCommand({
shutdown.push(ingest.shutdown)

// start server
const { runServer } = await import('repco-server')
const server = runServer(prisma, port)
shutdown.push(server.shutdown)

Expand Down
2 changes: 1 addition & 1 deletion packages/repco-cli/src/commands/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PrismaClient } from 'repco-prisma'
import { runServer } from 'repco-server'
import { createCommand } from '../parse.js'

export const server = createCommand({
Expand All @@ -10,7 +11,6 @@ export const server = createCommand({
async run(opts) {
const prisma = new PrismaClient()
const port = Number(opts.port) || Number(process.env.PORT) || 8765
const { runServer } = await import('repco-server')
runServer(prisma, port)
},
})
98 changes: 69 additions & 29 deletions packages/repco-core/src/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,47 @@ function defaultBlockStore(
return new PrismaIpldBlockStore(prisma)
}

class RepoRegistry {
repos: Map<string, Repo> = new Map()
opening: Map<string, Promise<void>> = new Map()

public async open(prisma: PrismaClient, didOrName: string): Promise<Repo> {
const did = await Repo.nameToDid(prisma, didOrName)
if (!this.repos.has(did)) {
if (!this.opening.has(did)) {
await this._openInner(prisma, did)
} else {
await this.opening.get(did)
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this.repos.get(did)!
}

async _openInner(prisma: PrismaClient, did: string) {
let _resolve: (v: void | PromiseLike<void>) => void
let _reject: (e: any) => void
const promise = new Promise<void>((resolve, reject) => {
_resolve = resolve
_reject = reject
})
this.opening.set(did, promise)
try {
const repo = await Repo.load(prisma, did)
this.repos.set(did, repo)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
_resolve!()
} catch (err) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
_reject!(err)
} finally {
this.opening.delete(did)
}
}
}

export const repoRegistry = new RepoRegistry()

export class Repo extends EventEmitter {
public dsr: DataSourceRegistry
public blockstore: IpldBlockStore
Expand All @@ -109,9 +150,6 @@ export class Repo extends EventEmitter {
private publishingCapability: string | null
private validatedAgents: Set<string> = new Set()

public static CACHE: Map<string, Repo> = new Map()
public static cache = true

private txlock = new Mutex()

public log: Logger
Expand All @@ -130,7 +168,12 @@ export class Repo extends EventEmitter {
return Repo.open(prisma, nameOrDid)
}

static async create(prisma: PrismaClient, name: string, did?: string) {
static async create(
prisma: PrismaClient,
name: string,
did?: string,
useCache = true,
) {
if (!name.match(/[a-zA-Z0-9-]{3,64}/)) {
throw new Error(
'Repo name is invalid. Repo names must be between 3 and 64 alphanumerical characters',
Expand Down Expand Up @@ -158,47 +201,44 @@ export class Repo extends EventEmitter {
name,
},
})
const repo = await Repo.open(prisma, did)
const repo = await Repo.open(prisma, did, useCache)
if (repo.writeable) {
await repo.saveBatch([], { commitEmpty: true })
}
return repo
}

static async open(prisma: PrismaClient, didOrName: string): Promise<Repo> {
if (Repo.cache && Repo.CACHE.has(didOrName)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return Repo.CACHE.get(didOrName)!
static async open(
prisma: PrismaClient,
didOrName: string,
useCache = true,
): Promise<Repo> {
if (useCache) return repoRegistry.open(prisma, didOrName)
else {
const did = await Repo.nameToDid(prisma, didOrName)
return Repo.load(prisma, did)
}
}

const isDid = didOrName.startsWith('did:')
const params: OpenParams = {}
if (isDid) params.did = didOrName
else params.name = didOrName

const repo = await Repo.load(prisma, params)

if (Repo.cache) {
Repo.CACHE.set(repo.did, repo)
if (repo.name) Repo.CACHE.set(repo.name, repo)
static async nameToDid(prisma: PrismaClient, name: string): Promise<string> {
if (name.startsWith('did:')) return name
const record = await prisma.repo.findFirst({
where: { name },
select: { did: true },
})
if (!record) {
throw new RepoError(ErrorCode.NOT_FOUND, `Repo not found`)
}

return repo
return record.did
}

static async load(prisma: PrismaClient, params: OpenParams): Promise<Repo> {
if (!params.did && !params.name) {
throw new Error(
'Invalid open params: One of `did` or `name` is required.',
)
}
static async load(prisma: PrismaClient, did: string): Promise<Repo> {
const record = await prisma.repo.findFirst({
where: { OR: [{ did: params.did }, { name: params.name }] },
where: { did },
})
if (!record) {
throw new RepoError(ErrorCode.NOT_FOUND, `Repo not found`)
}
const did = record.did
const cap = await getPublishingUcanForInstance(prisma, did).catch(
(_) => null,
)
Expand Down
5 changes: 2 additions & 3 deletions packages/repco-core/test/bench/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ for (const batch of batchSizes) {
assert.timeout(1000 * 1000)
const count = 500
const [prisma1, prisma2] = await setup2(assert)
Repo.cache = false
const repo = await Repo.create(prisma1, 'test')
const repo = await Repo.create(prisma1, 'test', undefined, false)
const group = `${count} (batch ${batch})`
bench(`create ${group}`, async (b: any) => {
const batches = count / batch
Expand All @@ -35,7 +34,7 @@ for (const batch of batchSizes) {
await fs.writeFile(carfile, stream)
b.end()
})
const repo2 = await Repo.create(prisma2, 'test-clone', repo.did)
const repo2 = await Repo.create(prisma2, 'test-clone', repo.did, false)
bench(`import ${group}`, async (b: any) => {
b.start()
const readStream = createReadStream(carfile)
Expand Down
5 changes: 2 additions & 3 deletions packages/repco-core/test/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import { Repo } from '../lib.js'

test('simple sync', async (assert) => {
const [prisma1, prisma2] = await setup2(assert)
Repo.cache = false
const repo1 = await Repo.create(prisma1, 'default')
const repo2 = await Repo.create(prisma2, 'synced', repo1.did)
const repo1 = await Repo.create(prisma1, 'default', undefined, false)
const repo2 = await Repo.create(prisma2, 'synced', repo1.did, false)
assert.is(repo1.did, repo2.did)
assert.is(repo1.writeable, true)
assert.is(repo2.writeable, false)
Expand Down
30 changes: 30 additions & 0 deletions packages/repco-server/src/routes/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ router.post('/repo', async (req, res, next) => {
throw new ServerError(500, `Failed to create repo ${body.name}: ` + err)
}
})

// list repos
router.get('/repo', async (req, res) => {
try {
Expand All @@ -84,6 +85,35 @@ router.get('/repo', async (req, res) => {
}
})

// info on a repo
router.get('/repo/:repo', async (req, res) => {
try {
const { prisma } = getLocals(res)
const repo = await Repo.open(prisma, req.params.repo)
const revisionCount = await repo.prisma.revision.count({
where: { repoDid: repo.did },
})
const commitCount = await repo.prisma.commit.count({
where: { repoDid: repo.did },
})
const info = {
did: repo.did,
name: repo.name || '',
writeable: repo.writeable,
headCommit: ((await repo.getHead()) || '-').toString(),
headRevisions: (await repo.getCursor()) || '-',
revisions: revisionCount,
commits: commitCount,
}
res.send({ info })
} catch (err) {
throw new ServerError(
500,
`Failed to get information for repo ${req.params.repo}` + err,
)
}
})

// create datasource
router.post('/repo/:repo/ds', async (req, res) => {
try {
Expand Down

0 comments on commit 4fdb505

Please sign in to comment.