Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create/Remove Message Streams #30

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion core/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,27 @@ module.exports = {
const dataFiles = TemplateReader.create({configPath});

const mappingResolver = await endpointDataFileResolver.createResolvers({endpoints, mappings, templates: dataFiles});
const globalState = {
streamingContexts: {}
};

const globalActions = {
"stopStreaming@": async (arg) => {
const contextId = arg["context@"];
const streamingContext = globalState.streamingContexts[contextId]
if(!contextId){
throw new Error("stopStreaming@: Please provide context@ to stop.")
}
if(!streamingContext){
throw new Error(`stopStreaming@: No streaming context found for context@=${contextId}`)
}
delete globalState.streamingContexts[contextId]
await streamingContext.stop();
}
}

const controllers = await Promise.all(endpoints.map((endpoint) => {
return Controllers.create(endpoint, mappingResolver, dataFiles)
return Controllers.create(endpoint, mappingResolver, dataFiles, globalState, globalActions)
}));

const client = recording ? await Client.create({host : remoteHost, port : remotePort, endpoints}) : null;
Expand Down
30 changes: 28 additions & 2 deletions core/src/controllers/duplex-streaming-controller.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const DEFAULT_STREAMING_DELAY = 1000;

module.exports = {
create: async ({endpoint, mappingResolver, dataFiles}) => {
create: async ({endpoint, mappingResolver, dataFiles, globalState, globalActions}) => {
return {
endpointId: endpoint.getId(),
canHandle: (endpointId) => {
Expand All @@ -18,6 +18,29 @@ module.exports = {
const template = await dataFiles.get(responseFile);
const response = template.getResponse().compile({request : {body: request}});

// ** Start of of streaming context handling ****************************************
const stop = async () => {
keepStreaming = false;
};

// Set streaming context
if(response.hasOwnProperty("context")){
globalState.streamingContexts[response.context] = {stop};
}

// Check for actions
if(response.actions){
for(const action in response.actions){
const globalAction = globalActions[action];
if(!globalAction){
throw new Error(`Undefined actions : ${action}`)
}
await globalAction(response.actions[action]);
console.log("Stopped streaming for messages")
}
}
// ** End of streaming context handling ****************************************

if(!response){
return callContext.end();
}
Expand All @@ -27,9 +50,12 @@ module.exports = {
let keepStreaming = !response.doNotRepeat ;

for(let i =0; i < responses.length || keepStreaming; i++){
const nextMessage = responses[i%responses.length];
console.log(`Sending response for ${endpoint.getId()} from ${responseFile} `, nextMessage);
callContext.write(nextMessage);
await new Promise((resolve => setTimeout(resolve, streamingDelay)));
callContext.write(responses[i%responses.length]);
}
console.log("End of streaming for ", request)
};

callContext.on('data', (response) => {
Expand Down
8 changes: 4 additions & 4 deletions core/src/controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const ClientStreamController = require('./duplex-streaming-controller');
const DuplexStreamController = require('./duplex-streaming-controller');

module.exports = {
create: (endpoint, mappingResolver, dataFiles) => {
create: (endpoint, mappingResolver, dataFiles, globalState, globalActions) => {
const clientStream = endpoint.isStreamingRequest();
const serverStream = endpoint.isStreamingResponse();

Expand All @@ -17,13 +17,13 @@ module.exports = {
}

if(isServerStream){
return ServerStreamController.create({endpoint, mappingResolver, dataFiles});
return ServerStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}

if(isClientStream){
return ClientStreamController.create({endpoint, mappingResolver, dataFiles});
return ClientStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}

return DuplexStreamController.create({endpoint, mappingResolver, dataFiles});
return DuplexStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions});
}
}
2 changes: 2 additions & 0 deletions core/src/mappingTemplate.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module.exports = {
if(response['stream@']){
return {
stream: compiled(response['stream@'], variables),
context: compiled(response['context@'], variables),
actions: compiled(response['actions@'], variables),
doNotRepeat: !!response['doNotRepeat@'],
streamInterval: response['streamInterval@'],
}
Expand Down
7 changes: 5 additions & 2 deletions core/src/utils/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ const readYamlFile = (filePath) => {
};

const readYamlFileInDir = (dir, filePath) => {
return readTextFile(path.join(dir, filePath)).then(data => yaml.parse(data));
const yamlPath = path.join(dir, filePath);
return readTextFile(yamlPath).then(data => yaml.parse(data)).catch(e => {
throw `Failed to read file at ${yamlPath}: ${e.message}`
});
};

const writeYaml = (dir, file, content) => {
Expand Down Expand Up @@ -98,4 +101,4 @@ module.exports = {
createTempDir,
writeYaml,
copyFile
};
};
51 changes: 51 additions & 0 deletions core/tests/app/streamingContext.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const appTestHelper = require('./appTestHelper');
const fixtures = require('../fixtures');

describe('app.js', () => {
let app;

beforeAll(async () => {
app = await appTestHelper.launchApp({
port: 50056,
protosPath: fixtures.pricesProject.protosPath,
configPath: fixtures.pricesProject.configPath,
});
});

afterAll(async () => {
await app.closeApp();
});

describe('Streaming context', () => {
const timeout = 60000;

test('should stop messages by streaming context', async () => {
const requestId = "#request:9231";
const expectedPriceMessage = {quote: "quote"};
const expectedStopMessage = {quote: `${requestId} won't be streamed anymore`};

// Create a streaming message
const clientStream = await app.client.openPriceStream();
await clientStream.sendMessage({uic: 501, assetType: 'Stock', requestId});

// // wait for 1 sec and close streaming
await new Promise(resolve => setTimeout(resolve, 1000));
await clientStream.sendMessage({uic: 599, assetType: 'Stock', requestId});

// Wait for another 1 sec to observer if any more messages were streamed
await new Promise(resolve => setTimeout(resolve, 1000));
clientStream.stop();
const streamedMessages = await clientStream.getNext();

// No messages must stream once the subscription is turned off
// No message must follow stop message

const lastMessage = streamedMessages.pop();
const secondLastMessage = streamedMessages.pop();

expect(secondLastMessage).toEqual(expectedPriceMessage);
expect(lastMessage).toEqual(expectedStopMessage);
});

});
});
6 changes: 5 additions & 1 deletion core/tests/fixtures/prices-project/config/mappings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ prices.streaming.Pricing.TwoWaySubscribe : [
"prices/stock-streams/211-stock.yaml",
"prices/stock-streams/212-stock.yaml",
"prices/stock-streams/299-stock-template.yaml",
"prices/stock-streams/300-stock-template-repeat.yaml"
"prices/stock-streams/300-stock-template-repeat.yaml",
],
[
"streaming/stream-with-context.yml",
"streaming/stop-stream-by-context.yml",
],
"prices/two-way-stream.js.yaml",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
request@ : {
uic: "599",
assetType: "any@",
requestId: "any@",
}

response@ : {
"stream@" : [{
"quote": "{{request.body.requestId}} won't be streamed anymore",
}],
"actions@": {
"stopStreaming@": {"context@": "my-endpoint-{{request.body.requestId}}"}
},
"doNotRepeat@": true,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
request@ : {
uic: "501",
assetType: "any@",
requestId: "any@",
}

response@ : {
"stream@" : [{"quote": "quote"}],
"doNotRepeat@": false,
"context@": "my-endpoint-{{request.body.requestId}}",
"streamInterval@": 10,
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package prices.streaming;
message PriceRequest {
string uic = 1;
string assetType = 2;
string requestId = 3;
}

// The response message
message PriceResponse {
string quote = 1;
}
}
38 changes: 38 additions & 0 deletions core/tests/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,44 @@ module.exports = {
setTimeout(() => call.end(), timeout);
}),

openPriceStream : (timeout = 1000) => new Promise(async (resolve, reject) => {
const protoDefinition = grpc.loadPackageDefinition(pricesProto).prices.streaming;
const client = new protoDefinition.Pricing(url, grpc.credentials.createInsecure());

const data = [];
const call = client.TwoWaySubscribe();


call.on('data', function(response) {
data.push(response);
});

call.on('end', function() {
resolve(data);
});

call.on('error', function(error) {
reject({data, error});
});
// setTimeout(() => call.end(), timeout);
resolve({
stop: async () => {
await call.end();
},
sendMessage : (request) => {
call.write(request);
},
getNext : () => {
return new Promise(resolve => {
// return updates received so far and clear saved data
const messages = data.splice(0);
resolve(messages);
})
}
}
)
}),

getClientStreamResponses : (requests, timeout = 1000) => new Promise(async (resolve, reject) => {
const protoDefinition = grpc.loadPackageDefinition(pricesProto).prices.streaming;
const client = new protoDefinition.Pricing(url, grpc.credentials.createInsecure());
Expand Down
3 changes: 2 additions & 1 deletion core/tests/init.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ describe('init project', () => {
const expectedContent = {
'request@': {
assetType: '@any',
uic: '@any'
uic: '@any',
requestId: '@any',
},
'response@': {
'stream@': [{quote: 'string'}],
Expand Down