Skip to content

Commit

Permalink
Add support to stop streaing messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
nishants committed Mar 28, 2021
1 parent ea20cb7 commit 109af64
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
21 changes: 20 additions & 1 deletion core/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,27 @@ module.exports = {
const dataFiles = TemplateReader.create({configPath});

const mappingResolver = await endpointDataFileResolver.createResolvers({endpoints, mappings, templates: dataFiles});
const globalState = {
streamingContexts: {}
};

const globalActions = {
"stopStreaming@": async (arg) => {
const contextId = arg["context@"];
const streamingContext = globalState.streamingContexts[contextId]
if(!contextId){
throw new Error("stopStreaming@: Please provide context@ to stop.")
}
if(!streamingContext){
throw new Error(`stopStreaming@: No streaming context found for context@=${contextId}`)
}
delete globalState.streamingContexts[contextId]
await streamingContext.stop();
}
}

const controllers = await Promise.all(endpoints.map((endpoint) => {
return Controllers.create(endpoint, mappingResolver, dataFiles)
return Controllers.create(endpoint, mappingResolver, dataFiles, globalState, globalActions)
}));

const client = recording ? await Client.create({host : remoteHost, port : remotePort, endpoints}) : null;
Expand Down
28 changes: 26 additions & 2 deletions core/src/controllers/duplex-streaming-controller.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const DEFAULT_STREAMING_DELAY = 1000;

module.exports = {
create: async ({endpoint, mappingResolver, dataFiles}) => {
create: async ({endpoint, mappingResolver, dataFiles, globalState, globalActions}) => {
return {
endpointId: endpoint.getId(),
canHandle: (endpointId) => {
Expand All @@ -18,6 +18,29 @@ module.exports = {
const template = await dataFiles.get(responseFile);
const response = template.getResponse().compile({request : {body: request}});

// ** Start of of streaming context handling ****************************************
const stop = async () => {
keepStreaming = false;
};

// Set streaming context
if(response.hasOwnProperty("context")){
globalState.streamingContexts[response.context] = {stop};
}

// Check for actions
if(response.actions){
for(const action in response.actions){
const globalAction = globalActions[action];
if(!globalAction){
throw new Error(`Undefined actions : ${action}`)
}
await globalAction(response.actions[action]);
console.log("Stopped streaming for messages")
}
}
// ** End of streaming context handling ****************************************

if(!response){
return callContext.end();
}
Expand All @@ -27,11 +50,12 @@ module.exports = {
let keepStreaming = !response.doNotRepeat ;

for(let i =0; i < responses.length || keepStreaming; i++){
await new Promise((resolve => setTimeout(resolve, streamingDelay)));
const nextMessage = responses[i%responses.length];
console.log(`Sending response for ${endpoint.getId()} from ${responseFile} `, nextMessage);
callContext.write(nextMessage);
await new Promise((resolve => setTimeout(resolve, streamingDelay)));
}
console.log("End of streaming for ", request)
};

callContext.on('data', (response) => {
Expand Down
8 changes: 4 additions & 4 deletions core/src/controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const ClientStreamController = require('./duplex-streaming-controller');
const DuplexStreamController = require('./duplex-streaming-controller');

module.exports = {
create: (endpoint, mappingResolver, dataFiles) => {
create: (endpoint, mappingResolver, dataFiles, globalState, globalActions) => {
const clientStream = endpoint.isStreamingRequest();
const serverStream = endpoint.isStreamingResponse();

Expand All @@ -17,13 +17,13 @@ module.exports = {
}

if(isServerStream){
return ServerStreamController.create({endpoint, mappingResolver, dataFiles});
return ServerStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}

if(isClientStream){
return ClientStreamController.create({endpoint, mappingResolver, dataFiles});
return ClientStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}

return DuplexStreamController.create({endpoint, mappingResolver, dataFiles});
return DuplexStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}
}
2 changes: 2 additions & 0 deletions core/src/mappingTemplate.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module.exports = {
if(response['stream@']){
return {
stream: compiled(response['stream@'], variables),
context: compiled(response['context@'], variables),
actions: compiled(response['actions@'], variables),
doNotRepeat: !!response['doNotRepeat@'],
streamInterval: response['streamInterval@'],
}
Expand Down

0 comments on commit 109af64

Please sign in to comment.