Skip to content

Commit

Permalink
Client configuration add namespace (#750)
Browse files Browse the repository at this point in the history
* clientConfiguration add namespace

* update beta version

* add namespace check

* add namespace

* add namespace

* update version

* Remove empty values from settings

* update format and single quotes

* update format

* update version

* remove codecov

* Change namespace to required

* Change namespace to required

* Change namespace to required

* test adds default parameters

---------

Co-authored-by: zh378814 <[email protected]>
  • Loading branch information
zhaohai666 and zh378814 committed May 17, 2024
1 parent 5790fc4 commit bc2a112
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 63 deletions.
46 changes: 0 additions & 46 deletions .github/workflows/nodejs_coverage.yml

This file was deleted.

1 change: 1 addition & 0 deletions nodejs/examples/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Producer } from '..';

const producer = new Producer({
endpoints: '127.0.0.1:8081',
namespace: ''
});
await producer.startup();

Expand Down
1 change: 1 addition & 0 deletions nodejs/examples/SimpleConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { SimpleConsumer } from '..';
const simpleConsumer = new SimpleConsumer({
consumerGroup: 'nodejs-demo-group',
endpoints: '127.0.0.1:8081',
namespace: '',
subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
await simpleConsumer.startup();
Expand Down
6 changes: 6 additions & 0 deletions nodejs/src/client/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface BaseClientOptions {
* - example.com:8443
*/
endpoints: string;
namespace: string;
sessionCredentials?: SessionCredentials;
requestTimeout?: number;
logger?: ILogger;
Expand All @@ -76,6 +77,7 @@ export abstract class BaseClient {
readonly clientType = ClientType.CLIENT_TYPE_UNSPECIFIED;
readonly sslEnabled: boolean;
readonly #sessionCredentials?: SessionCredentials;
readonly namespace: string;
protected readonly endpoints: Endpoints;
protected readonly isolated = new Map<string, Endpoints>();
protected readonly requestTimeout: number;
Expand All @@ -92,6 +94,7 @@ export abstract class BaseClient {
this.logger = options.logger ?? getDefaultLogger();
this.sslEnabled = options.sslEnabled === true;
this.endpoints = new Endpoints(options.endpoints);
this.namespace = options.namespace;
this.#sessionCredentials = options.sessionCredentials;
// https://rocketmq.apache.org/docs/introduction/03limits/
// Default request timeout is 3000ms
Expand Down Expand Up @@ -288,6 +291,9 @@ export abstract class BaseClient {
metadata.set('x-mq-language', 'HTTP');
// version of client
metadata.set('x-mq-client-version', UserAgent.INSTANCE.version);
if (this.namespace) {
metadata.set('x-mq-namespace', this.namespace);
}
if (this.#sessionCredentials) {
if (this.#sessionCredentials.securityToken) {
metadata.set('x-mq-session-token', this.#sessionCredentials.securityToken);
Expand Down
5 changes: 3 additions & 2 deletions nodejs/src/client/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import { Endpoints } from '../route/Endpoints';
import { RetryPolicy } from '../retry';

export abstract class Settings {
protected readonly namespace: string;
protected readonly clientId: string;
protected readonly clientType: ClientType;
protected readonly accessPoint: Endpoints;
protected retryPolicy?: RetryPolicy;
protected readonly requestTimeout: number;

constructor(clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number,
retryPolicy?: RetryPolicy) {
constructor(namespace: string, clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number, retryPolicy?: RetryPolicy) {
this.clientId = clientId;
this.namespace = namespace;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
Expand Down
2 changes: 1 addition & 1 deletion nodejs/src/consumer/SimpleConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class SimpleConsumer extends Consumer {
}
}
this.#awaitDuration = options.awaitDuration ?? 30000;
this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(this.clientId, this.endpoints,
this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(options.namespace, this.clientId, this.endpoints,
this.consumerGroup, this.requestTimeout, this.#awaitDuration, this.#subscriptionExpressions);
}

Expand Down
5 changes: 2 additions & 3 deletions nodejs/src/consumer/SimpleSubscriptionSettings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ export class SimpleSubscriptionSettings extends Settings {
readonly group: string;
readonly subscriptionExpressions: Map<string, FilterExpression>;

constructor(clientId: string, accessPoint: Endpoints, group: string,
requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map<string, FilterExpression>) {
super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout);
constructor(namespace: string, clientId: string, accessPoint: Endpoints, group: string, requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map<string, FilterExpression>) {
super(namespace, clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout);
this.longPollingTimeout = longPollingTimeout;
this.group = group;
this.subscriptionExpressions = subscriptionExpressions;
Expand Down
6 changes: 4 additions & 2 deletions nodejs/src/message/PublishingMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class PublishingMessage extends Message {
* This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
toProtobuf(mq: MessageQueue) {
toProtobuf(namespace: string, mq: MessageQueue) {
const systemProperties = new SystemProperties()
.setKeysList(this.keys)
.setMessageId(this.messageId)
Expand All @@ -87,8 +87,10 @@ export class PublishingMessage extends Message {
systemProperties.setMessageGroup(this.messageGroup);
}

const resource = createResource(this.topic);
resource.setResourceNamespace(namespace);
const message = new MessagePB()
.setTopic(createResource(this.topic))
.setTopic(resource)
.setBody(this.body)
.setSystemProperties(systemProperties);
if (this.properties) {
Expand Down
10 changes: 7 additions & 3 deletions nodejs/src/producer/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class Producer extends BaseClient {
// https://rocketmq.apache.org/docs/introduction/03limits/
// Default max number of message sending retries is 3
const retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(options.maxAttempts ?? 3);
this.#publishingSettings = new PublishingSettings(this.clientId, this.endpoints, retryPolicy,
this.#publishingSettings = new PublishingSettings(options.namespace, this.clientId, this.endpoints, retryPolicy,
this.requestTimeout, this.topics);
this.#checker = options.checker;
}
Expand All @@ -85,7 +85,7 @@ export class Producer extends BaseClient {
const request = new EndTransactionRequest()
.setMessageId(messageId)
.setTransactionId(transactionId)
.setTopic(createResource(message.topic))
.setTopic(createResource(message.topic).setResourceNamespace(this.namespace))
.setResolution(resolution);
const response = await this.rpcClientManager.endTransaction(endpoints, request, this.requestTimeout);
StatusChecker.check(response.getStatus()?.toObject());
Expand Down Expand Up @@ -187,7 +187,11 @@ export class Producer extends BaseClient {
#wrapSendMessageRequest(pubMessages: PublishingMessage[], mq: MessageQueue) {
const request = new SendMessageRequest();
for (const pubMessage of pubMessages) {
request.addMessages(pubMessage.toProtobuf(mq));
if (this.namespace) {
request.addMessages(pubMessage.toProtobuf(this.namespace, mq));
} else {
request.addMessages(pubMessage.toProtobuf('', mq));
}
}
return request;
}
Expand Down
6 changes: 3 additions & 3 deletions nodejs/src/producer/PublishingSettings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ export class PublishingSettings extends Settings {
#maxBodySizeBytes = 4 * 1024 * 1024;
#validateMessageType = true;

constructor(clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy,
requestTimeout: number, topics: Set<string>) {
super(clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy);
constructor(namespace: string, clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy, requestTimeout: number, topics: Set<string>) {
super(namespace, clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy);
this.#topics = topics;
}

Expand All @@ -54,6 +53,7 @@ export class PublishingSettings extends Settings {
.setValidateMessageType(this.#validateMessageType);
for (const topic of this.#topics) {
publishing.addTopics().setName(topic);
publishing.addTopics().setResourceNamespace(this.namespace);
}
return new SettingsPB()
.setClientType(this.clientType)
Expand Down
9 changes: 7 additions & 2 deletions nodejs/test/consumer/SimpleConsumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
SimpleConsumer, FilterExpression,
Producer,
} from '../../src';
import { topics, endpoints, sessionCredentials } from '../helper';
import { topics, endpoints, sessionCredentials, namespace } from '../helper';

describe('test/consumer/SimpleConsumer.test.ts', () => {
let producer: Producer | null = null;
Expand All @@ -42,6 +42,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
namespace,
sessionCredentials,
consumerGroup: 'nodejs-unittest-group',
subscriptions: new Map().set(topics.delay, FilterExpression.SUB_ALL),
Expand All @@ -53,6 +54,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
namespace,
sessionCredentials: {
...sessionCredentials,
accessKey: 'wrong',
Expand All @@ -69,6 +71,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
namespace,
sessionCredentials: {
...sessionCredentials,
accessSecret: 'wrong',
Expand All @@ -88,11 +91,13 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
sessionCredentials,
namespace,
sessionCredentials
});
await producer.startup();
simpleConsumer = new SimpleConsumer({
endpoints,
namespace,
sessionCredentials,
consumerGroup: `nodejs-unittest-group-${randomUUID()}`,
subscriptions: new Map().set(topic, new FilterExpression(tag)),
Expand Down
1 change: 1 addition & 0 deletions nodejs/test/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import { SessionCredentials } from '../src/client';

export const endpoints = process.env.ROCKETMQ_NODEJS_CLIENT_ENDPOINTS ?? 'localhost:8081';
export const namespace = process.env.ROCKETMQ_NODEJS_CLIENT_NAMESPACE ?? '';
export const topics = {
normal: 'TopicTestForNormal',
fifo: 'TopicTestForFifo',
Expand Down
12 changes: 11 additions & 1 deletion nodejs/test/producer/Producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { strict as assert } from 'node:assert';
import { randomUUID } from 'node:crypto';
import { NotFoundException, Producer, SimpleConsumer } from '../../src';
import { TransactionResolution } from '../../proto/apache/rocketmq/v2/definition_pb';
import { topics, endpoints, sessionCredentials, consumerGroup } from '../helper';
import { topics, endpoints, sessionCredentials, consumerGroup, namespace } from '../helper';

describe('test/producer/Producer.test.ts', () => {
let producer: Producer | null = null;
Expand All @@ -39,6 +39,7 @@ describe('test/producer/Producer.test.ts', () => {
it('should startup success', async () => {
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
Expand Down Expand Up @@ -66,6 +67,7 @@ describe('test/producer/Producer.test.ts', () => {
producer = new Producer({
topic: 'TopicTest-not-exists',
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
Expand All @@ -87,6 +89,7 @@ describe('test/producer/Producer.test.ts', () => {
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
Expand All @@ -108,6 +111,7 @@ describe('test/producer/Producer.test.ts', () => {
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
Expand All @@ -124,6 +128,7 @@ describe('test/producer/Producer.test.ts', () => {
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
Expand All @@ -147,6 +152,7 @@ describe('test/producer/Producer.test.ts', () => {
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
Expand All @@ -166,13 +172,15 @@ describe('test/producer/Producer.test.ts', () => {
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
Expand Down Expand Up @@ -238,6 +246,7 @@ describe('test/producer/Producer.test.ts', () => {
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
checker: {
Expand Down Expand Up @@ -266,6 +275,7 @@ describe('test/producer/Producer.test.ts', () => {
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
Expand Down

0 comments on commit bc2a112

Please sign in to comment.