Skip to content

Commit 8f4fea9

Browse files
committed
ts: first draft of rxjs-based runtime
1 parent 78d7116 commit 8f4fea9

File tree

4 files changed

+643
-512
lines changed

4 files changed

+643
-512
lines changed

examples/typescript/src/subscribe-token-transactions.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,23 +108,24 @@ async function main() {
108108

109109
console.log(`Starting subscription for group ${groupName}...`);
110110

111-
let subscription: DragonsmouthAdapterSession;
112-
113-
subscription = await client.dragonsmouthSubscribeWithConfig(
111+
const session = await client.dragonsmouthSubscribeWithConfig(
114112
groupName,
115113
request,
116114
subscribeConfig,
117-
FUMAROLE_X_TOKEN
118115
);
119116

120-
const { sink, source, fumaroleHandle } = subscription;
117+
session.startWith(async (next) => {
121118

122-
// await fumaroleHandle;
119+
})
123120

124-
fumaroleHandle.catch((e) => {
125-
console.log("caught in fumarole handle");
126-
console.log(e);
127-
});
121+
// const { sink, source, fumaroleHandle } = subscription;
122+
123+
// // await fumaroleHandle;
124+
125+
// fumaroleHandle.catch((e) => {
126+
// console.log("caught in fumarole handle");
127+
// console.log(e);
128+
// });
128129

129130
// Handle fumarole connection closure in background
130131
fumaroleHandle.then((res) => {

typescript-sdk/src/connectivity.ts

Lines changed: 2 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export class FumaroleGrpcConnector {
119119
if (this.config.xToken !== undefined) {
120120
this.logger.debug("Adding x-token to metadata");
121121
metadata.add(X_TOKEN_HEADER, this.config.xToken);
122+
// TODO remove this
122123
metadata.add("x-subscription-id", this.config.xToken);
123124
}
124125
return callback(null, metadata);
@@ -183,104 +184,4 @@ export class FumaroleGrpcConnector {
183184
throw error;
184185
}
185186
}
186-
}
187-
188-
// Helper function to create a gRPC channel (for backward compatibility)
189-
export function createGrpcChannel(
190-
endpoint: string,
191-
xToken?: string,
192-
compression?: any,
193-
...grpcOptions: { [key: string]: any }[]
194-
): Client {
195-
console.debug(`Creating gRPC channel for endpoint: ${endpoint}`);
196-
197-
const defaultOptions: { [key: string]: any } = {
198-
"grpc.max_receive_message_length": 111111110,
199-
"grpc.keepalive_time_ms": 10000,
200-
"grpc.keepalive_timeout_ms": 5000,
201-
"grpc.http2.min_time_between_pings_ms": 10000,
202-
"grpc.keepalive_permit_without_calls": 1,
203-
};
204-
205-
const channelOptions: { [key: string]: any } = {
206-
...defaultOptions,
207-
};
208-
209-
// Add additional options
210-
grpcOptions.forEach((opt) => {
211-
Object.entries(opt).forEach(([key, value]) => {
212-
console.debug(`Setting channel option: ${key} = ${value}`);
213-
channelOptions[key] = value;
214-
});
215-
});
216-
217-
try {
218-
const endpointURL = new URL(endpoint);
219-
console.debug(
220-
`Parsed URL - protocol: ${endpointURL.protocol}, hostname: ${endpointURL.hostname}, port: ${endpointURL.port}`
221-
);
222-
223-
let port = endpointURL.port;
224-
if (port === "") {
225-
switch (endpointURL.protocol) {
226-
case "https:":
227-
port = "443";
228-
break;
229-
case "http:":
230-
port = "80";
231-
break;
232-
}
233-
console.debug(`No port specified, using default port: ${port}`);
234-
}
235-
236-
let channelCredentials: ChannelCredentials;
237-
238-
// Check if we need to use TLS.
239-
if (endpointURL.protocol === "https:") {
240-
console.debug("HTTPS detected, setting up SSL credentials");
241-
const sslCreds = credentials.createSsl();
242-
console.debug("SSL credentials created");
243-
244-
const callCreds = credentials.createFromMetadataGenerator(
245-
(_params, callback) => {
246-
const metadata = new Metadata();
247-
if (xToken !== undefined) {
248-
console.debug("Adding x-token to metadata");
249-
metadata.add(X_TOKEN_HEADER, xToken);
250-
metadata.add("x-subscription-id", xToken);
251-
}
252-
return callback(null, metadata);
253-
}
254-
);
255-
console.debug("Call credentials created");
256-
257-
channelCredentials = credentials.combineChannelCredentials(
258-
sslCreds,
259-
callCreds
260-
);
261-
console.debug("Combined credentials created for secure channel");
262-
} else {
263-
channelCredentials = credentials.createInsecure();
264-
console.debug("Using insecure channel without authentication");
265-
}
266-
267-
const finalEndpoint = `${endpointURL.hostname}:${port}`;
268-
console.debug(`Creating gRPC client with endpoint: ${finalEndpoint}`);
269-
270-
const client = new Client(
271-
finalEndpoint,
272-
channelCredentials,
273-
channelOptions
274-
);
275-
276-
console.debug("gRPC client created successfully");
277-
return client;
278-
} catch (error) {
279-
console.debug(
280-
`Error creating gRPC channel: ${
281-
error instanceof Error ? error.message : String(error)
282-
}`
283-
);
284-
throw error;
285-
}
286-
}
187+
}

typescript-sdk/src/index.ts

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ import {
4646
} from "./types";
4747
import { FumaroleSM } from "./runtime/state-machine";
4848
import { downloadSlotObserverFactory, GrpcSlotDownloader } from "./runtime/grpc-slot-downloader";
49-
import { DownloadTaskArgs, DownloadTaskResult, FumeDragonsmouthRuntime, RuntimeEvent } from "./runtime/runtime";
49+
import { DownloadTaskArgs, DownloadTaskResult, fumaroleObservable, FumaroleRuntimeArgs, RuntimeEvent } from "./runtime/runtime";
5050
import { firstValueFrom, from, Observable, Observer, share, Subject } from "rxjs";
51-
import { combineHostPort } from "@grpc/grpc-js/build/src/uri-parser";
5251
import { createDeferred } from "./utils/promise";
5352

5453
(BigInt.prototype as any).toJSON = function () {
@@ -146,32 +145,26 @@ export class FumaroleClient {
146145
async dragonsmouthSubscribe(
147146
consumerGroupName: string,
148147
request: SubscribeRequest,
149-
xToken: string
150-
): Promise<void> {
148+
): Promise<Observable<SubscribeUpdate>> {
151149
return this.dragonsmouthSubscribeWithConfig(
152150
consumerGroupName,
153151
request,
154152
getDefaultFumaroleSubscribeConfig(),
155-
xToken
156153
);
157154
}
158155

159156
public async dragonsmouthSubscribeWithConfig(
160157
consumerGroupName: string,
161-
request: SubscribeRequest,
158+
initialSubscribeRequest: SubscribeRequest,
162159
config: FumaroleSubscribeConfig,
163-
xToken: string
164-
): Promise<void> {
160+
): Promise<Observable<SubscribeUpdate>> {
165161

166162
const initialJoin: JoinControlPlane = { consumerGroupName };
167163
const initialJoinCommand: ControlCommand = { initialJoin };
168164
const controlPlaneCommandSubject = new Subject<ControlCommand>();
169165
const fumaroleRuntimeEventSubject = new Subject<RuntimeEvent>();
170166

171167
const metadata = new Metadata();
172-
// TODO remove the x-subscription-id
173-
metadata.add("x-subscription-id", xToken);
174-
metadata.add("x-token", xToken);
175168

176169
console.log("SUBSCRIBE METADATA");
177170
console.log(metadata.getMap());
@@ -214,44 +207,32 @@ export class FumaroleClient {
214207
const sm = new FumaroleSM(lastCommittedOffset, config.slotMemoryRetention);
215208

216209
const dragonsmouthOutlet = new Subject<SubscribeUpdate>();
217-
const downloadTaskSubject = new Subject<DownloadTaskResult>();
210+
const downloadTaskResultSubject = new Subject<DownloadTaskResult>();
218211
// // Connect data plane and create slot downloader
219212
const dataPlaneClient = await this.connector.connect();
220213
const grpcSlotDownloadCtx: GrpcSlotDownloader = {
221214
client: dataPlaneClient,
222215
client_metadata: metadata,
223216
dragonsmouthOutlet: dragonsmouthOutlet,
224-
downloadTaskResultObserver: downloadTaskSubject,
217+
downloadTaskResultObserver: downloadTaskResultSubject,
225218
}
226219
const grpcSlotDownloader: Observer<DownloadTaskArgs> = downloadSlotObserverFactory(
227220
grpcSlotDownloadCtx
228221
)
229-
230222

231223

232-
// Create Fume runtime
233-
// const rt = new FumeDragonsmouthRuntime(
234-
// sm,
235-
// grpcSlotDownloader,
236-
// subscribeRequestQueue,
237-
// request,
238-
// consumerGroupName,
239-
// fumeControlPlaneQ,
240-
// fumeControlPlaneRxQ,
241-
// dragonsmouthOutlet,
242-
// config.commitInterval,
243-
// config.gcInterval,
244-
// config.concurrentDownloadLimit
245-
// );
246-
247-
// const fumaroleHandle = rt.run();
248-
// console.log(`Fumarole handle created:`, fumaroleHandle);
249-
250-
// return {
251-
// sink: subscribeRequestQueue,
252-
// source: dragonsmouthOutlet,
253-
// fumaroleHandle,
254-
// };
224+
const runtimeArgs: FumaroleRuntimeArgs = {
225+
downloadTaskObserver: grpcSlotDownloader,
226+
downloadTaskResultObservable: downloadTaskResultSubject.asObservable(),
227+
controlPlaneObserver: controlPlaneCommandSubject,
228+
controlPlaneResponseObservable: ctrlPlaneResponseObservable,
229+
sm,
230+
commitIntervalMillis: config.commitInterval,
231+
maxConcurrentDownload: config.concurrentDownloadLimit,
232+
initialSubscribeRequest,
233+
}
234+
235+
return fumaroleObservable(runtimeArgs)
255236
}
256237

257238
async listConsumerGroups(): Promise<ListConsumerGroupsResponse> {

0 commit comments

Comments
 (0)