Skip to content

Commit

Permalink
- fix: creating a repo fires an event such that repos are registered …
Browse files Browse the repository at this point in the history
…directly after adding

- Moved repo creation logic to repoRegistry
  • Loading branch information
flipsimon committed Nov 20, 2023
1 parent 4fdb505 commit 95f1947
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 107 deletions.
4 changes: 2 additions & 2 deletions packages/repco-cli/src/commands/debug.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import casual from 'casual-browserify'
import prettyMs from 'pretty-ms'
import { SingleBar } from 'cli-progress'
import { EntityForm, Repo } from 'repco-core'
import { EntityForm, repoRegistry } from 'repco-core'
import { request } from '../client.js'
import { createCommand, createCommandGroup } from '../parse.js'

Expand Down Expand Up @@ -43,7 +43,7 @@ export const createContent = createCommand({
},
},
async run(opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const repo = await repoRegistry.openWithDefaults(args.repo)
let count, batch
if (opts.count) count = parseInt(opts.count)
if (!count || isNaN(count)) count = 1000
Expand Down
8 changes: 4 additions & 4 deletions packages/repco-cli/src/commands/ds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
defaultDataSourcePlugins as plugins,
Ingester,
remapDataSource,
Repo,
repoRegistry,
} from 'repco-core'
import { request } from '../client.js'
import { CliError, createCommand, createCommandGroup } from '../parse.js'
Expand Down Expand Up @@ -32,7 +32,7 @@ export const list = createCommand({
json: { type: 'boolean', short: 'j', help: 'Output as JSON' },
},
async run(opts) {
const repo = await Repo.openWithDefaults(opts.repo)
const repo = await repoRegistry.openWithDefaults(opts.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const data = repo.dsr
.all()
Expand Down Expand Up @@ -106,7 +106,7 @@ export const ingest = createCommand({
loop: { type: 'boolean', short: 'l', help: 'Keep running in a loop' },
},
async run(opts, _args) {
const repo = await Repo.openWithDefaults(opts.repo)
const repo = await repoRegistry.openWithDefaults(opts.repo)
const ingester = new Ingester(plugins, repo)
if (opts.loop) {
const queue = ingester.workLoop()
Expand Down Expand Up @@ -134,7 +134,7 @@ export const remap = createCommand({
},
arguments: [{ name: 'datasource', required: true, help: 'Datasource UID' }],
async run(opts, args) {
const repo = await Repo.openWithDefaults(opts.repo)
const repo = await repoRegistry.openWithDefaults(opts.repo)
await repo.dsr.hydrate(repo.prisma, plugins, repo.did)
const ds = repo.dsr.get(args.datasource)
if (!ds) throw new CliError('Datasource does not exist')
Expand Down
12 changes: 6 additions & 6 deletions packages/repco-cli/src/commands/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import speedometer from 'speedometer'
import { Presets, SingleBar } from 'cli-progress'
import { createReadStream } from 'fs'
import { CID } from 'multiformats/cid'
import { ExportProgress, ImportProgress, Repo } from 'repco-core'
import { ExportProgress, ImportProgress, repoRegistry } from 'repco-core'
import { PrismaClient } from 'repco-prisma'
import { pipeline } from 'stream/promises'
import { request } from '../client.js'
Expand Down Expand Up @@ -58,7 +58,7 @@ export const join = createCommand({
] as const,
async run(opts, args) {
const prisma = new PrismaClient()
const repo = await Repo.create(prisma, args.name, args.did)
const repo = await repoRegistry.create(prisma, args.name, args.did)
if (opts.gateway) await repo.setGateways(opts.gateway)
print(`Created mirror repo ${repo.name} and DID`)
print(` ${pc.yellow(repo.did)}`)
Expand All @@ -80,7 +80,7 @@ export const carExport = createCommand({
{ name: 'file', required: true, help: 'File path to export the repo to' },
] as const,
async run(opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const repo = await repoRegistry.openWithDefaults(args.repo)
let from
if (opts.from) from = CID.parse(opts.from)
const format =
Expand Down Expand Up @@ -147,7 +147,7 @@ export const carImport = createCommand({
{ name: 'file', required: true, help: 'File path to export the repo to' },
] as const,
async run(_opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const repo = await repoRegistry.openWithDefaults(args.repo)
print(`Import from: ${args.file === '-' ? 'STDIN' : args.file}`)
print(`Import to: Repo "${repo.name}" (${repo.did})`)
let input: AsyncIterable<Uint8Array>
Expand Down Expand Up @@ -258,7 +258,7 @@ export const logRevisions = createCommand({
},
arguments: [{ name: 'repo', required: true, help: 'DID or name of repo' }],
async run(opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const repo = await repoRegistry.openWithDefaults(args.repo)
let stream
if (opts.content) {
stream = repo.createContentStream()
Expand All @@ -278,7 +278,7 @@ export const syncCommand = createCommand({
{ name: 'repo', required: true, help: 'DID or name of repo' },
] as const,
async run(_opts, args) {
const repo = await Repo.openWithDefaults(args.repo)
const repo = await repoRegistry.openWithDefaults(args.repo)
await repo.pullFromGateways()
},
})
Expand Down
10 changes: 7 additions & 3 deletions packages/repco-cli/src/commands/run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import exitHook from 'async-exit-hook'
import { log, UntilStopped } from 'repco-common'
import { defaultDataSourcePlugins, Ingester, Repo } from 'repco-core'
import { repoRegistry } from 'repco-core/dist/src/repo.js'
import { PrismaClient } from 'repco-prisma'
import { runServer } from 'repco-server'
import { createCommand } from '../parse.js'
Expand Down Expand Up @@ -67,7 +68,7 @@ export const run = createCommand({

function ingestAll(prisma: PrismaClient) {
const ingesters: Ingester[] = []
const tasks = Repo.mapAsync(prisma, async (repo) => {
const onRepo = async (repo: Repo) => {
const ingester = new Ingester(defaultDataSourcePlugins, repo)
ingesters.push(ingester)
const queue = ingester.workLoop()
Expand All @@ -85,9 +86,12 @@ function ingestAll(prisma: PrismaClient) {
log.debug(`ingest ${result.uid}: ${result.ok} ${cursor}`)
}
}
})
}
const tasks = repoRegistry.mapAsync(prisma, onRepo)
repoRegistry.on('create', onRepo)

const shutdown = async () => {
repoRegistry.removeListener('create', onRepo)
ingesters.forEach((ingester) => ingester.stop())
await tasks
}
Expand All @@ -100,7 +104,7 @@ function ingestAll(prisma: PrismaClient) {
function syncAllRepos(prisma: PrismaClient) {
const shutdownSignal = new UntilStopped()

const tasks = Repo.mapAsync(prisma, async (repo) => {
const tasks = repoRegistry.mapAsync(prisma, async (repo) => {
if (repo.writeable) return
try {
while (!shutdownSignal.stopped) {
Expand Down
2 changes: 1 addition & 1 deletion packages/repco-core/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export { repco, form } from './prisma.js'
export * from './datasource.js'
export * from './entity.js'
export * from './file.js'
export { Repo, RepoError } from './repo.js'
export { Repo, RepoError, repoRegistry } from './repo.js'
export * from './repo/blockstore.js'
export { HttpError } from './util/error.js'
export { encodeHeader } from './repo/codec-json.js'
Expand Down
110 changes: 54 additions & 56 deletions packages/repco-core/src/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,30 @@ function defaultBlockStore(
return new PrismaIpldBlockStore(prisma)
}

class RepoRegistry {
class RepoRegistry extends EventEmitter {
repos: Map<string, Repo> = new Map()
opening: Map<string, Promise<void>> = new Map()

public async open(prisma: PrismaClient, didOrName: string): Promise<Repo> {
const did = await Repo.nameToDid(prisma, didOrName)
if (!this.repos.has(did)) {
if (!this.opening.has(did)) {
await this._openInner(prisma, did)
} else {
await this.opening.get(did)
public async open(
prisma: PrismaClient,
didOrName: string,
useCache = true,
): Promise<Repo> {
const did = await this.nameToDid(prisma, didOrName)
if (useCache) {
if (!this.repos.has(did)) {
if (!this.opening.has(did)) {
await this._openInner(prisma, did)
} else {
await this.opening.get(did)
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this.repos.get(did)!
} else {
const did = await this.nameToDid(prisma, didOrName)
return this.load(prisma, did)
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this.repos.get(did)!
}

async _openInner(prisma: PrismaClient, did: string) {
Expand All @@ -124,7 +133,7 @@ class RepoRegistry {
})
this.opening.set(did, promise)
try {
const repo = await Repo.load(prisma, did)
const repo = await this.load(prisma, did)
this.repos.set(did, repo)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
_resolve!()
Expand All @@ -135,40 +144,22 @@ class RepoRegistry {
this.opening.delete(did)
}
}
}

export const repoRegistry = new RepoRegistry()

export class Repo extends EventEmitter {
public dsr: DataSourceRegistry
public blockstore: IpldBlockStore
public prisma: PrismaClient | Prisma.TransactionClient
public ipld: IpldRepo

public record: RepoRecord

private publishingCapability: string | null
private validatedAgents: Set<string> = new Set()

private txlock = new Mutex()

public log: Logger

static async createOrOpen(prisma: PrismaClient, name: string, did?: string) {
async createOrOpen(prisma: PrismaClient, name: string, did?: string) {
try {
return await Repo.open(prisma, did || name)
return await this.open(prisma, did || name)
} catch (_err) {
return await Repo.create(prisma, name)
return await this.create(prisma, name)
}
}

static async openWithDefaults(nameOrDid?: string) {
async openWithDefaults(nameOrDid?: string) {
const prisma = new PrismaClient()
if (!nameOrDid) nameOrDid = process.env.REPCO_REPO || 'default'
return Repo.open(prisma, nameOrDid)
return this.open(prisma, nameOrDid)
}

static async create(
async create(
prisma: PrismaClient,
name: string,
did?: string,
Expand Down Expand Up @@ -201,26 +192,15 @@ export class Repo extends EventEmitter {
name,
},
})
const repo = await Repo.open(prisma, did, useCache)
const repo = await this.open(prisma, did, useCache)
this.emit('create', repo)
if (repo.writeable) {
await repo.saveBatch([], { commitEmpty: true })
}
return repo
}

static async open(
prisma: PrismaClient,
didOrName: string,
useCache = true,
): Promise<Repo> {
if (useCache) return repoRegistry.open(prisma, didOrName)
else {
const did = await Repo.nameToDid(prisma, didOrName)
return Repo.load(prisma, did)
}
}

static async nameToDid(prisma: PrismaClient, name: string): Promise<string> {
async nameToDid(prisma: PrismaClient, name: string): Promise<string> {
if (name.startsWith('did:')) return name
const record = await prisma.repo.findFirst({
where: { name },
Expand All @@ -232,7 +212,7 @@ export class Repo extends EventEmitter {
return record.did
}

static async load(prisma: PrismaClient, did: string): Promise<Repo> {
async load(prisma: PrismaClient, did: string): Promise<Repo> {
const record = await prisma.repo.findFirst({
where: { did },
})
Expand All @@ -246,28 +226,46 @@ export class Repo extends EventEmitter {
return repo
}

static async list(prisma: PrismaClient): Promise<RepoRecord[]> {
async list(prisma: PrismaClient): Promise<RepoRecord[]> {
const list = await prisma.repo.findMany()
return list
}

static async all(prisma: PrismaClient): Promise<Repo[]> {
const list = await Repo.list(prisma)
async all(prisma: PrismaClient): Promise<Repo[]> {
const list = await this.list(prisma)
const repos = await Promise.all(
list.map(({ did }) => Repo.open(prisma, did)),
list.map(({ did }) => this.open(prisma, did)),
)
return repos
}

static async mapAsync<T = void>(
async mapAsync<T = void>(
prisma: PrismaClient,
mapAsync: (repo: Repo) => Promise<T>,
) {
const repos = await Repo.all(prisma)
const repos = await this.all(prisma)
const tasks = repos.map(mapAsync)
const results = await Promise.all(tasks)
return results
}
}

export const repoRegistry = new RepoRegistry()

export class Repo extends EventEmitter {
public dsr: DataSourceRegistry
public blockstore: IpldBlockStore
public prisma: PrismaClient | Prisma.TransactionClient
public ipld: IpldRepo

public record: RepoRecord

private publishingCapability: string | null
private validatedAgents: Set<string> = new Set()

private txlock = new Mutex()

public log: Logger

constructor(
prisma: PrismaClient | Prisma.TransactionClient,
Expand Down
4 changes: 2 additions & 2 deletions packages/repco-core/test/activitypub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import test from 'brittle'
import { fileURLToPath } from 'node:url'
import { assertFixture, mockFetch } from './util/fetch.js'
import { setup } from './util/setup.js'
import { Repo } from '../lib.js'
import { repoRegistry } from '../lib.js'
import { ingestUpdatesFromDataSources } from '../src/datasource.js'
import { ActivityPubDataSourcePlugin } from '../src/datasources/activitypub.js'
import { DataSourcePluginRegistry } from '../src/plugins.js'
Expand All @@ -19,7 +19,7 @@ const fixturePath = (name: string) =>
test('peertube datasource - basic1', async (assert) => {
mockFetch(assert, fixturePath('basic1'))
const prisma = await setup(assert)
const repo = await Repo.create(prisma, 'test')
const repo = await repoRegistry.create(prisma, 'test')
const plugins = new DataSourcePluginRegistry()
const activityPubPlugin = new ActivityPubDataSourcePlugin()
plugins.register(activityPubPlugin)
Expand Down
6 changes: 3 additions & 3 deletions packages/repco-core/test/basic.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import test from 'brittle'
import { setup } from './util/setup.js'
import { EntityForm, Repo } from '../lib.js'
import { EntityForm, repoRegistry } from '../lib.js'

test('smoke', async (assert) => {
const prisma = await setup(assert)
const repo = await Repo.create(prisma, 'default')
const repo = await repoRegistry.create(prisma, 'default')
const input = {
type: 'ContentItem',
content: {
Expand All @@ -25,7 +25,7 @@ test('smoke', async (assert) => {

test('update', async (assert) => {
const prisma = await setup(assert)
const repo = await Repo.create(prisma, 'default')
const repo = await repoRegistry.create(prisma, 'default')
const input: EntityForm = {
type: 'ContentItem',
headers: { EntityUris: ['first'] },
Expand Down
Loading

0 comments on commit 95f1947

Please sign in to comment.