Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,451 changes: 1,878 additions & 1,573 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
"author": "Streamr Network AG <[email protected]>",
"dependencies": {
"@streamr/config": "^5.3.7",
"@streamr/dht": "102.0.0-beta.0",
"@streamr/sdk": "102.0.0-beta.0",
"@streamr/trackerless-network": "102.0.0-beta.0",
"@streamr/utils": "102.0.0-beta.0",
"@streamr/dht": "102.0.0-beta.3",
"@streamr/sdk": "102.0.0-beta.3",
"@streamr/trackerless-network": "102.0.0-beta.3",
"@streamr/utils": "102.0.0-beta.3",
"@types/node-fetch": "^2.6.3",
"class-validator": "^0.14.1",
"cors": "^2.8.5",
Expand All @@ -36,6 +36,7 @@
"typedi": "^0.10.0"
},
"devDependencies": {
"@streamr/test-utils": "^102.0.0-beta.3",
"@tsconfig/node20": "^20.1.2",
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
Expand Down
2 changes: 1 addition & 1 deletion src/StreamrClientFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class StreamrClientFacade {

searchStreams(owner: string): AsyncIterable<Stream> {
return this.client.searchStreams(undefined, {
user: owner,
userId: owner,
allowPublic: false,
allOf: [StreamPermission.GRANT]
})
Expand Down
16 changes: 8 additions & 8 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { PeerDescriptor, toNodeId } from '@streamr/dht'
import { DhtAddress, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from '@streamr/sdk'
import { DhtAddress, getStreamPartitionCount, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from '@streamr/sdk'
import { Logger, StreamID, StreamPartID, StreamPartIDUtils, binaryToHex, toStreamPartID, wait } from '@streamr/utils'
import { difference, range, sortBy } from 'lodash'
import pLimit from 'p-limit'
Expand Down Expand Up @@ -58,7 +58,7 @@ const createNodeInfoLogOutput = (nodeInfo: NormalizedNodeInfo) => {
controlLayerNeighbors: sp.controlLayerNeighbors.map(toNodeId),
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: any) => toNodeId(n.peerDescriptor)) // TODO better type
})),
version: nodeInfo.version
applicationVersion: nodeInfo.applicationVersion
}
}

Expand Down Expand Up @@ -176,7 +176,7 @@ export class Crawler {
const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
await Promise.all(sortedContractStreams.map((stream: Stream) => {
return workedThreadLimit(async () => {
await this.analyzeStream(stream.id, stream.getMetadata(), topology, subscribeGate)
await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
})
}))

Expand All @@ -191,7 +191,7 @@ export class Crawler {
): Promise<void> {
logger.info(`Analyze ${id}`)
const peersByPartition = new Map<number, Set<DhtAddress>>
for (const partition of range(metadata.partitions)) {
for (const partition of range(getStreamPartitionCount(metadata))) {
peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition)))
}
try {
Expand All @@ -212,7 +212,7 @@ export class Crawler {
logger.info(`Replace ${id}`)
await this.streamRepository.replaceStream({
id,
description: metadata.description ?? null,
description: metadata.description as string ?? null,
peerCount: peerIds.size,
messagesPerSecond: messageRate.messagesPerSecond,
bytesPerSecond: messageRate.bytesPerSecond,
Expand Down Expand Up @@ -253,7 +253,7 @@ export class Crawler {
// is the only publisher and subscriber
await this.streamRepository.replaceStream({
id: payload.streamId,
description: payload.metadata.description ?? null,
description: payload.metadata.description as string ?? null,
peerCount: 0,
messagesPerSecond: 0,
bytesPerSecond: 0,
Expand All @@ -265,14 +265,14 @@ export class Crawler {
await wait(this.config.crawler.newStreamAnalysisDelay)
// the entryPoints may contain duplicates (i.e. same node is an entry point for
// multiple partitions), but crawlTopology can ignore those
const entryPoints = (await Promise.all(range(payload.metadata.partitions)
const entryPoints = (await Promise.all(range(getStreamPartitionCount(payload.metadata))
.map((p) => toStreamPartID(payload.streamId, p))
.map((sp) => localNode.fetchStreamPartEntryPoints(sp)))).flat()
const topology = await crawlTopology(localNode, entryPoints, (nodeInfo: NormalizedNodeInfo) => {
const streamPartitions = nodeInfo.streamPartitions.filter(
(sp) => StreamPartIDUtils.getStreamID(sp.id as StreamPartID) === payload.streamId
)
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!))).flat()
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
}, `stream-${payload.streamId}-${Date.now()}`)
// TODO could add new nodes and neighbors to NodeRepository?
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
Expand Down
2 changes: 1 addition & 1 deletion src/crawler/NetworkNodeFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type NormalizedNodeInfo = Omit<NodeInfo, 'streamPartitions'>
& { streamPartitions: Omit<ArrayElement<NodeInfo['streamPartitions']>, 'deprecatedContentDeliveryLayerNeighbors'>[] }

const toNormalizeNodeInfo = (info: NodeInfo): NormalizedNodeInfo => {
const isLegacyFormat = semver.satisfies(semver.coerce(info.version)!, '< 102.0.0')
const isLegacyFormat = semver.satisfies(semver.coerce(info.applicationVersion)!, '< 102.0.0')
return {
...info,
streamPartitions: info.streamPartitions.map((sp) => ({
Expand Down
2 changes: 1 addition & 1 deletion src/crawler/Topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class Topology {
const streamPartNeighbors: Multimap<StreamPartID, DhtAddress> = new Multimap()
for (const streamPartitionInfo of info.streamPartitions) {
const neighbors = streamPartitionInfo.contentDeliveryLayerNeighbors
.map((n) => toNodeId(n.peerDescriptor!))
.map((n) => toNodeId(n.peerDescriptor))
.filter((id) => nodeIds.has(id))
streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors)
}
Expand Down
4 changes: 2 additions & 2 deletions test/Crawler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('Crawler', () => {
peerDescriptor: n
})) ?? []
}],
version: '102.0.0'
applicationVersion: '102.0.0'
}
}

Expand Down Expand Up @@ -60,7 +60,7 @@ describe('Crawler', () => {
const topology = await crawlTopology(
localNode as any,
[nodes[0], nodes[5]],
(response: NormalizedNodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!),
(response: NormalizedNodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor),
''
)
expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length)
Expand Down
4 changes: 2 additions & 2 deletions test/NetworkNodeFacade.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const NORMAL_INFO = {
rtt: 123
}]
}],
version: '102.0.0-beta.0'
applicationVersion: '102.0.0-beta.2'
}
const LEGACY_INFO = {
peerDescriptor: createTestPeerDescriptor(),
Expand All @@ -43,7 +43,7 @@ const LEGACY_INFO = {
deprecatedContentDeliveryLayerNeighbors: [createTestPeerDescriptor()],
contentDeliveryLayerNeighbors: []
}],
version: '101.1.1-beta.1'
applicationVersion: '101.1.1-beta.1'
}

const createMockNode = (rawNodeInfo: NodeInfo): Partial<NetworkNode> => {
Expand Down
21 changes: 9 additions & 12 deletions test/end-to-end.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import 'reflect-metadata'

import { DhtAddress, NodeType, randomDhtAddress, toDhtAddress, toDhtAddressRaw } from '@streamr/dht'
import StreamrClient, { CONFIG_TEST, NetworkNodeType, PeerDescriptor, StreamID, StreamPermission, StreamrClientConfig } from '@streamr/sdk'
import StreamrClient, { NetworkNodeType, PeerDescriptor, StreamID, StreamPermission, StreamrClientConfig } from '@streamr/sdk'
import { NetworkNode, createNetworkNode } from '@streamr/trackerless-network'
import { StreamPartID, collect, setAbortableInterval, toStreamPartID, waitForCondition } from '@streamr/utils'
import { StreamPartID, collect, setAbortableInterval, toStreamPartID, until } from '@streamr/utils'
import { fetchPrivateKeyWithGas } from '@streamr/test-utils'
import { sample, uniq, without } from 'lodash'
import Container from 'typedi'
import { CONFIG_TOKEN } from '../src/Config'
Expand All @@ -15,8 +16,6 @@ import { Stream } from '../src/entities/Stream'
import { createDatabase, queryAPI } from '../src/utils'
import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils'

const PUBLISHER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000001'
const SUBSCRIBER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000002'
const ENTRY_POINT_PORT = 40501
const PARTITION_COUNT = 3
const ACTIVE_PARTITIONS = [1, 2]
Expand Down Expand Up @@ -50,11 +49,9 @@ const startEntryPoint = async (): Promise<NetworkNode> => {

const createClientConfig = (entryPointPeerDescriptor: PeerDescriptor): StreamrClientConfig => {
return {
...CONFIG_TEST,
environment: 'dev2',
network: {
...CONFIG_TEST.network,
controlLayer: {
...CONFIG_TEST.network!.controlLayer,
entryPoints: [{
nodeId: toDhtAddress(entryPointPeerDescriptor.nodeId),
type: NetworkNodeType.NODEJS,
Expand Down Expand Up @@ -156,7 +153,7 @@ describe('end-to-end', () => {
if (isPublic) {
await stream.grantPermissions({ public: true, permissions })
} else {
await stream.grantPermissions({ user: await subscriber.getAddress(), permissions })
await stream.grantPermissions({ userId: await subscriber.getAddress(), permissions })
}
return stream
}
Expand All @@ -180,7 +177,7 @@ describe('end-to-end', () => {
const waitForTheGraphToIndex = async (streamIds: StreamID[]): Promise<void> => {
const client = createClient(undefined, entryPoint.getPeerDescriptor())
for (const streamId of streamIds) {
await waitForCondition(async () => {
await until(async () => {
const streams = await collect(client.searchStreams(streamId, undefined))
return streams.length > 0
}, 5000, 500)
Expand Down Expand Up @@ -210,8 +207,8 @@ describe('end-to-end', () => {
await dropTestDatabaseIfExists(config.database)
await createDatabase(config.database)
Container.set(CONFIG_TOKEN, config)
publisher = createClient(PUBLISHER_PRIVATE_KEY, entryPoint.getPeerDescriptor())
subscriber = createClient(SUBSCRIBER_PRIVATE_KEY, entryPoint.getPeerDescriptor())
publisher = createClient(await fetchPrivateKeyWithGas(), entryPoint.getPeerDescriptor())
subscriber = createClient(await fetchPrivateKeyWithGas(), entryPoint.getPeerDescriptor())
const server = Container.get(APIServer)
await server.start()
apiPort = Container.get(APIServer).getPort()
Expand Down Expand Up @@ -277,7 +274,7 @@ describe('end-to-end', () => {
const newStream = await createTestStream(false)
await startPublisherAndSubscriberForStream(newStream.id, publishingAbortControler.signal)

await waitForCondition(async () => {
await until(async () => {
const metrics = await queryStreamMetrics(newStream.id, apiPort)
return (metrics !== undefined) && (metrics.peerCount >= 2)
}, 20 * 1000, 1000)
Expand Down
Loading