From bc2a112b092018061561966f8bbacefc9f111a5b Mon Sep 17 00:00:00 2001 From: zhaohai <33314633+zhaohai666@users.noreply.github.com> Date: Fri, 17 May 2024 15:19:10 +0800 Subject: [PATCH] Client configuration add namespace (#750) * clientConfiguration add namespace * update beta version * add namespace check * add namespace * add namespace * update version * Remove empty values from settings * update format and single quotes * update format * update version * remove codecov * Change namespace to required * Change namespace to required * Change namespace to required * test adds default parameters --------- Co-authored-by: zh378814 --- .github/workflows/nodejs_coverage.yml | 46 ------------------- nodejs/examples/Producer.ts | 1 + nodejs/examples/SimpleConsumer.ts | 1 + nodejs/src/client/BaseClient.ts | 6 +++ nodejs/src/client/Settings.ts | 5 +- nodejs/src/consumer/SimpleConsumer.ts | 2 +- .../consumer/SimpleSubscriptionSettings.ts | 5 +- nodejs/src/message/PublishingMessage.ts | 6 ++- nodejs/src/producer/Producer.ts | 10 ++-- nodejs/src/producer/PublishingSettings.ts | 6 +-- nodejs/test/consumer/SimpleConsumer.test.ts | 9 +++- nodejs/test/helper.ts | 1 + nodejs/test/producer/Producer.test.ts | 12 ++++- 13 files changed, 47 insertions(+), 63 deletions(-) delete mode 100644 .github/workflows/nodejs_coverage.yml diff --git a/.github/workflows/nodejs_coverage.yml b/.github/workflows/nodejs_coverage.yml deleted file mode 100644 index 2a29a5b28..000000000 --- a/.github/workflows/nodejs_coverage.yml +++ /dev/null @@ -1,46 +0,0 @@ -name: Node.js Coverage -on: - pull_request: - types: [opened, reopened, synchronize] - paths: - - 'nodejs/**' - push: - branches: - - master - -jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - node-version: [16.19.0, 16.x, 18.x, 20.x] - steps: - - name: Checkout Git Source - uses: actions/checkout@v3 - with: - submodules: recursive - - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v3 - with: - node-version: ${{ matrix.node-version }} - - - name: Install dependencies - working-directory: ./nodejs - run: npm i && npm run init - - - name: Start RocketMQ Server - working-directory: ./nodejs - run: npm run start-rocketmq - - - name: Run test - working-directory: ./nodejs - run: npm run ci - - - name: Code Coverage - uses: codecov/codecov-action@v3 - with: - files: ./nodejs/coverage/coverage-final.json - flags: nodejs - fail_ci_if_error: true - verbose: true diff --git a/nodejs/examples/Producer.ts b/nodejs/examples/Producer.ts index c90c761f0..6b8be4c1f 100644 --- a/nodejs/examples/Producer.ts +++ b/nodejs/examples/Producer.ts @@ -19,6 +19,7 @@ import { Producer } from '..'; const producer = new Producer({ endpoints: '127.0.0.1:8081', + namespace: '' }); await producer.startup(); diff --git a/nodejs/examples/SimpleConsumer.ts b/nodejs/examples/SimpleConsumer.ts index c58d9d170..fb36c38d8 100644 --- a/nodejs/examples/SimpleConsumer.ts +++ b/nodejs/examples/SimpleConsumer.ts @@ -20,6 +20,7 @@ import { SimpleConsumer } from '..'; const simpleConsumer = new SimpleConsumer({ consumerGroup: 'nodejs-demo-group', endpoints: '127.0.0.1:8081', + namespace: '', subscriptions: new Map().set('TopicTest', 'nodejs-demo'), }); await simpleConsumer.startup(); diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts index 9fa12daef..77f790b95 100644 --- a/nodejs/src/client/BaseClient.ts +++ b/nodejs/src/client/BaseClient.ts @@ -57,6 +57,7 @@ export interface BaseClientOptions { * - example.com:8443 */ endpoints: string; + namespace: string; sessionCredentials?: SessionCredentials; requestTimeout?: number; logger?: ILogger; @@ -76,6 +77,7 @@ export abstract class BaseClient { readonly clientType = ClientType.CLIENT_TYPE_UNSPECIFIED; readonly sslEnabled: boolean; readonly #sessionCredentials?: SessionCredentials; + readonly namespace: string; protected readonly endpoints: Endpoints; protected readonly isolated = new Map(); protected readonly requestTimeout: number; @@ -92,6 +94,7 @@ export abstract class BaseClient { this.logger = options.logger ?? getDefaultLogger(); this.sslEnabled = options.sslEnabled === true; this.endpoints = new Endpoints(options.endpoints); + this.namespace = options.namespace; this.#sessionCredentials = options.sessionCredentials; // https://rocketmq.apache.org/docs/introduction/03limits/ // Default request timeout is 3000ms @@ -288,6 +291,9 @@ export abstract class BaseClient { metadata.set('x-mq-language', 'HTTP'); // version of client metadata.set('x-mq-client-version', UserAgent.INSTANCE.version); + if (this.namespace) { + metadata.set('x-mq-namespace', this.namespace); + } if (this.#sessionCredentials) { if (this.#sessionCredentials.securityToken) { metadata.set('x-mq-session-token', this.#sessionCredentials.securityToken); diff --git a/nodejs/src/client/Settings.ts b/nodejs/src/client/Settings.ts index 34f7eebdc..d67b51ae3 100644 --- a/nodejs/src/client/Settings.ts +++ b/nodejs/src/client/Settings.ts @@ -20,15 +20,16 @@ import { Endpoints } from '../route/Endpoints'; import { RetryPolicy } from '../retry'; export abstract class Settings { + protected readonly namespace: string; protected readonly clientId: string; protected readonly clientType: ClientType; protected readonly accessPoint: Endpoints; protected retryPolicy?: RetryPolicy; protected readonly requestTimeout: number; - constructor(clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number, - retryPolicy?: RetryPolicy) { + constructor(namespace: string, clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number, retryPolicy?: RetryPolicy) { this.clientId = clientId; + this.namespace = namespace; this.clientType = clientType; this.accessPoint = accessPoint; this.retryPolicy = retryPolicy; diff --git a/nodejs/src/consumer/SimpleConsumer.ts b/nodejs/src/consumer/SimpleConsumer.ts index 916f362cb..b5b045a6e 100644 --- a/nodejs/src/consumer/SimpleConsumer.ts +++ b/nodejs/src/consumer/SimpleConsumer.ts @@ -60,7 +60,7 @@ export class SimpleConsumer extends Consumer { } } this.#awaitDuration = options.awaitDuration ?? 30000; - this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(this.clientId, this.endpoints, + this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(options.namespace, this.clientId, this.endpoints, this.consumerGroup, this.requestTimeout, this.#awaitDuration, this.#subscriptionExpressions); } diff --git a/nodejs/src/consumer/SimpleSubscriptionSettings.ts b/nodejs/src/consumer/SimpleSubscriptionSettings.ts index 116a613bc..f12c4e285 100644 --- a/nodejs/src/consumer/SimpleSubscriptionSettings.ts +++ b/nodejs/src/consumer/SimpleSubscriptionSettings.ts @@ -30,9 +30,8 @@ export class SimpleSubscriptionSettings extends Settings { readonly group: string; readonly subscriptionExpressions: Map; - constructor(clientId: string, accessPoint: Endpoints, group: string, - requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map) { - super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout); + constructor(namespace: string, clientId: string, accessPoint: Endpoints, group: string, requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map) { + super(namespace, clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout); this.longPollingTimeout = longPollingTimeout; this.group = group; this.subscriptionExpressions = subscriptionExpressions; diff --git a/nodejs/src/message/PublishingMessage.ts b/nodejs/src/message/PublishingMessage.ts index b6015bf37..959d2f286 100644 --- a/nodejs/src/message/PublishingMessage.ts +++ b/nodejs/src/message/PublishingMessage.ts @@ -68,7 +68,7 @@ export class PublishingMessage extends Message { * This method should be invoked before each message sending, because the born time is reset before each * invocation, which means that it should not be invoked ahead of time. */ - toProtobuf(mq: MessageQueue) { + toProtobuf(namespace: string, mq: MessageQueue) { const systemProperties = new SystemProperties() .setKeysList(this.keys) .setMessageId(this.messageId) @@ -87,8 +87,10 @@ export class PublishingMessage extends Message { systemProperties.setMessageGroup(this.messageGroup); } + const resource = createResource(this.topic); + resource.setResourceNamespace(namespace); const message = new MessagePB() - .setTopic(createResource(this.topic)) + .setTopic(resource) .setBody(this.body) .setSystemProperties(systemProperties); if (this.properties) { diff --git a/nodejs/src/producer/Producer.ts b/nodejs/src/producer/Producer.ts index 623ecf461..8484167fb 100644 --- a/nodejs/src/producer/Producer.ts +++ b/nodejs/src/producer/Producer.ts @@ -66,7 +66,7 @@ export class Producer extends BaseClient { // https://rocketmq.apache.org/docs/introduction/03limits/ // Default max number of message sending retries is 3 const retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(options.maxAttempts ?? 3); - this.#publishingSettings = new PublishingSettings(this.clientId, this.endpoints, retryPolicy, + this.#publishingSettings = new PublishingSettings(options.namespace, this.clientId, this.endpoints, retryPolicy, this.requestTimeout, this.topics); this.#checker = options.checker; } @@ -85,7 +85,7 @@ export class Producer extends BaseClient { const request = new EndTransactionRequest() .setMessageId(messageId) .setTransactionId(transactionId) - .setTopic(createResource(message.topic)) + .setTopic(createResource(message.topic).setResourceNamespace(this.namespace)) .setResolution(resolution); const response = await this.rpcClientManager.endTransaction(endpoints, request, this.requestTimeout); StatusChecker.check(response.getStatus()?.toObject()); @@ -187,7 +187,11 @@ export class Producer extends BaseClient { #wrapSendMessageRequest(pubMessages: PublishingMessage[], mq: MessageQueue) { const request = new SendMessageRequest(); for (const pubMessage of pubMessages) { - request.addMessages(pubMessage.toProtobuf(mq)); + if (this.namespace) { + request.addMessages(pubMessage.toProtobuf(this.namespace, mq)); + } else { + request.addMessages(pubMessage.toProtobuf('', mq)); + } } return request; } diff --git a/nodejs/src/producer/PublishingSettings.ts b/nodejs/src/producer/PublishingSettings.ts index fb72d66dd..8b413808f 100644 --- a/nodejs/src/producer/PublishingSettings.ts +++ b/nodejs/src/producer/PublishingSettings.ts @@ -35,9 +35,8 @@ export class PublishingSettings extends Settings { #maxBodySizeBytes = 4 * 1024 * 1024; #validateMessageType = true; - constructor(clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy, - requestTimeout: number, topics: Set) { - super(clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy); + constructor(namespace: string, clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy, requestTimeout: number, topics: Set) { + super(namespace, clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy); this.#topics = topics; } @@ -54,6 +53,7 @@ export class PublishingSettings extends Settings { .setValidateMessageType(this.#validateMessageType); for (const topic of this.#topics) { publishing.addTopics().setName(topic); + publishing.addTopics().setResourceNamespace(this.namespace); } return new SettingsPB() .setClientType(this.clientType) diff --git a/nodejs/test/consumer/SimpleConsumer.test.ts b/nodejs/test/consumer/SimpleConsumer.test.ts index 8e375a90d..3962a2ea1 100644 --- a/nodejs/test/consumer/SimpleConsumer.test.ts +++ b/nodejs/test/consumer/SimpleConsumer.test.ts @@ -21,7 +21,7 @@ import { SimpleConsumer, FilterExpression, Producer, } from '../../src'; -import { topics, endpoints, sessionCredentials } from '../helper'; +import { topics, endpoints, sessionCredentials, namespace } from '../helper'; describe('test/consumer/SimpleConsumer.test.ts', () => { let producer: Producer | null = null; @@ -42,6 +42,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => { if (!sessionCredentials) return; simpleConsumer = new SimpleConsumer({ endpoints, + namespace, sessionCredentials, consumerGroup: 'nodejs-unittest-group', subscriptions: new Map().set(topics.delay, FilterExpression.SUB_ALL), @@ -53,6 +54,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => { if (!sessionCredentials) return; simpleConsumer = new SimpleConsumer({ endpoints, + namespace, sessionCredentials: { ...sessionCredentials, accessKey: 'wrong', @@ -69,6 +71,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => { if (!sessionCredentials) return; simpleConsumer = new SimpleConsumer({ endpoints, + namespace, sessionCredentials: { ...sessionCredentials, accessSecret: 'wrong', @@ -88,11 +91,13 @@ describe('test/consumer/SimpleConsumer.test.ts', () => { const tag = `nodejs-unittest-tag-${randomUUID()}`; producer = new Producer({ endpoints, - sessionCredentials, + namespace, + sessionCredentials }); await producer.startup(); simpleConsumer = new SimpleConsumer({ endpoints, + namespace, sessionCredentials, consumerGroup: `nodejs-unittest-group-${randomUUID()}`, subscriptions: new Map().set(topic, new FilterExpression(tag)), diff --git a/nodejs/test/helper.ts b/nodejs/test/helper.ts index 4a3deec26..4cd23fa4b 100644 --- a/nodejs/test/helper.ts +++ b/nodejs/test/helper.ts @@ -18,6 +18,7 @@ import { SessionCredentials } from '../src/client'; export const endpoints = process.env.ROCKETMQ_NODEJS_CLIENT_ENDPOINTS ?? 'localhost:8081'; +export const namespace = process.env.ROCKETMQ_NODEJS_CLIENT_NAMESPACE ?? ''; export const topics = { normal: 'TopicTestForNormal', fifo: 'TopicTestForFifo', diff --git a/nodejs/test/producer/Producer.test.ts b/nodejs/test/producer/Producer.test.ts index 005cd3b94..66deec777 100644 --- a/nodejs/test/producer/Producer.test.ts +++ b/nodejs/test/producer/Producer.test.ts @@ -19,7 +19,7 @@ import { strict as assert } from 'node:assert'; import { randomUUID } from 'node:crypto'; import { NotFoundException, Producer, SimpleConsumer } from '../../src'; import { TransactionResolution } from '../../proto/apache/rocketmq/v2/definition_pb'; -import { topics, endpoints, sessionCredentials, consumerGroup } from '../helper'; +import { topics, endpoints, sessionCredentials, consumerGroup, namespace } from '../helper'; describe('test/producer/Producer.test.ts', () => { let producer: Producer | null = null; @@ -39,6 +39,7 @@ describe('test/producer/Producer.test.ts', () => { it('should startup success', async () => { producer = new Producer({ endpoints, + namespace, sessionCredentials, maxAttempts: 2, }); @@ -66,6 +67,7 @@ describe('test/producer/Producer.test.ts', () => { producer = new Producer({ topic: 'TopicTest-not-exists', endpoints, + namespace, sessionCredentials, maxAttempts: 2, }); @@ -87,6 +89,7 @@ describe('test/producer/Producer.test.ts', () => { const tag = `nodejs-unittest-tag-${randomUUID()}`; producer = new Producer({ endpoints, + namespace, sessionCredentials, maxAttempts: 2, }); @@ -108,6 +111,7 @@ describe('test/producer/Producer.test.ts', () => { simpleConsumer = new SimpleConsumer({ consumerGroup, endpoints, + namespace, sessionCredentials, subscriptions: new Map().set(topic, tag), awaitDuration: 3000, @@ -124,6 +128,7 @@ describe('test/producer/Producer.test.ts', () => { const tag = `nodejs-unittest-tag-${randomUUID()}`; producer = new Producer({ endpoints, + namespace, sessionCredentials, maxAttempts: 2, }); @@ -147,6 +152,7 @@ describe('test/producer/Producer.test.ts', () => { simpleConsumer = new SimpleConsumer({ consumerGroup, endpoints, + namespace, sessionCredentials, subscriptions: new Map().set(topic, tag), awaitDuration: 3000, @@ -166,6 +172,7 @@ describe('test/producer/Producer.test.ts', () => { const tag = `nodejs-unittest-tag-${randomUUID()}`; producer = new Producer({ endpoints, + namespace, sessionCredentials, maxAttempts: 2, }); @@ -173,6 +180,7 @@ describe('test/producer/Producer.test.ts', () => { simpleConsumer = new SimpleConsumer({ consumerGroup, endpoints, + namespace, sessionCredentials, subscriptions: new Map().set(topic, tag), awaitDuration: 3000, @@ -238,6 +246,7 @@ describe('test/producer/Producer.test.ts', () => { const tag = `nodejs-unittest-tag-${randomUUID()}`; producer = new Producer({ endpoints, + namespace, sessionCredentials, maxAttempts: 2, checker: { @@ -266,6 +275,7 @@ describe('test/producer/Producer.test.ts', () => { simpleConsumer = new SimpleConsumer({ consumerGroup, endpoints, + namespace, sessionCredentials, subscriptions: new Map().set(topic, tag), awaitDuration: 3000,