Skip to content

Commit

Permalink
Pass X-Api-Key for GRPC streaming client (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
denisbsu and denis4gold committed Jun 15, 2023
1 parent 290cfc3 commit 50dd1ba
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 5 deletions.
8 changes: 7 additions & 1 deletion src/client/proximaStreamClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ export class ProximaStreamClient {
private readonly registry: StreamRegistry;
private readonly clients: Record<string, StreamDBConsumerClient>;
private readonly offsetsCache: NodeCache;
private readonly apiKey?: string;

public constructor(
private readonly options: StreamClientOptions = {
registry: new StreamRegistryClient(),
}
) {
this.registry = options.registry ?? new StreamRegistryClient();
this.apiKey = options.apiKey ?? this.registry.getApiKey();
this.clients = {};
this.offsetsCache = new NodeCache({ maxKeys: 10 * 1000 * 1000 });
}
Expand Down Expand Up @@ -133,11 +135,15 @@ export class ProximaStreamClient {
private getStreamConsumerClient(endpoint: string) {
return (
this.clients[endpoint] ??
(this.clients[endpoint] = new StreamDBConsumerClient(endpoint))
(this.clients[endpoint] = new StreamDBConsumerClient(
endpoint,
this.apiKey
))
);
}
}

export interface StreamClientOptions {
registry?: StreamRegistry;
apiKey?: string;
}
4 changes: 4 additions & 0 deletions src/client/singleStreamDbRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { Offset, StreamEndpoint, StreamStats } from "../model";
export class SingleStreamDbRegistry implements StreamRegistry {
public constructor(private readonly streamDbUrl: string) {}

public getApiKey(): string | undefined {
return undefined;
}

public async getStreamEndpoints(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
streamName: string,
Expand Down
1 change: 1 addition & 0 deletions src/client/streamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ import { Offset, StreamEndpoint } from "../model";

export interface StreamRegistry {
getStreamEndpoints(stream: string, offset: Offset): Promise<StreamEndpoint[]>;
getApiKey(): string | undefined;
}
6 changes: 6 additions & 0 deletions src/client/streamRegistryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const DefaultRegistryClientOptions: StreamRegistryOptions = {

export class StreamRegistryClient implements StreamRegistry {
private readonly client: Axios;
private readonly apiKey?: string;

public constructor(
private readonly options: StreamRegistryOptions = DefaultRegistryClientOptions
Expand All @@ -32,6 +33,11 @@ export class StreamRegistryClient implements StreamRegistry {
(status >= 200 && status < 300) || status == 404,
headers: options.apiKey ? { "x-api-key": options.apiKey } : {},
});
this.apiKey = options.apiKey;
}

public getApiKey(): string | undefined {
return this.apiKey;
}

public async getStreamEndpoints(
Expand Down
22 changes: 18 additions & 4 deletions src/streamdb/consumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import { PausableStream, StreamController } from "../lib/pausableStream";
export class StreamDBConsumerClient {
private consumer: StreamConsumerServiceClient;

constructor(uri: string) {
const secure = uri.includes(":443");
const credentials = secure
constructor(uri: string, apiKey?: string) {
const isSecure = uri.includes(":443");
const channelCredentials = isSecure
? grpc.credentials.createSsl()
: grpc.credentials.createInsecure();
this.consumer = new StreamConsumerServiceClient(uri, credentials, {
const callCredentials = grpc.credentials.createFromMetadataGenerator(
(opts, callback) => {
const metadata = new grpc.Metadata();
if (apiKey !== undefined) {
metadata.add("x-api-key", apiKey);
}
callback(null, metadata);
}
);
const mergedCredentials = grpc.credentials.combineChannelCredentials(
channelCredentials,
callCredentials
);

this.consumer = new StreamConsumerServiceClient(uri, mergedCredentials, {
"grpc.keepalive_timeout_ms": 100 * 1000,
"grpc.keepalive_time_ms": 100 * 1000,
"grpc.keepalive_permit_without_calls": 1,
Expand Down

0 comments on commit 50dd1ba

Please sign in to comment.