Skip to content

Commit

Permalink
Merge branch 'master' into release-version-1.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Lauren Frederick committed Jul 24, 2018
2 parents 86f3a9f + 8ad5077 commit 2dea593
Show file tree
Hide file tree
Showing 20 changed files with 616 additions and 37 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v1.8.0] - 2018-07-23

### Added


- **CUMULUS-718** Adds integration test for Kinesis triggering a workflow.

- **GITC-776-3** Added more flexibility for rules. You can now edit all fields on the rule's record.
We may need to update the API documentation to reflect this.

Expand Down
51 changes: 51 additions & 0 deletions example/app/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ default:
bucket: '{{buckets.internal.name}}'
distributionEndpoint: '{{api_distribution_url}}'
ems_provider: CUMULUS
streamName: '{{stackName}}-testStream'
awsAccountId: '{{AWS_ACCOUNT_ID}}'
awsRegion: us-east-1
IngestGranule:
IngestGranuleOutput:
granules:
Expand Down Expand Up @@ -244,3 +247,51 @@ jl:
shared-2:
name: rvl-internal
type: shared


# Begin-MHS-config
mhs:
prefix: mhs-cumulus
stackName: mhs-cumulus
stackNameNoDash: mhsCumulus

iams:
ecsRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-ecs'
lambdaApiGatewayRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-lambda-api-gateway'
lambdaProcessingRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-lambda-processing'
stepRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-steprole'
instanceProfile: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:instance-profile/mhs-ecs'
distributionRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-distribution-api-lambda'

buckets:
internal:
name: mhs-internal
type: internal
private:
name: mhs-private
type: private
protected:
name: mhs-protected
type: protected
public:
name: mhs-public
type: public
protected-2:
name: mhs-protected-2
type: protected
shared:
name: cumulus-data-shared
type: shared
api_distribution_url: '{{API_DISTRIBUTION_URL}}'
system_bucket: '{{buckets.internal.name}}'

users:
- username: '{{EARTHDATA_LOGIN_USERNAME}}'

AutoScalingPolicyConfiguration:
GranulesTable:
enableAutoScaling: false
FilesTable:
enableAutoScaling: false

# END-mhs-config
15 changes: 15 additions & 0 deletions example/data/collections/L2_HR_PIXC-000.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "L2_HR_PIXC",
"version": "000",
"provider_path": "podaac-cumulus/test-data",
"granuleId": "^.*$",
"granuleIdExtraction": "^(.*)(\\.h5|\\.cmr)",
"sampleFileName": "L2_HR_PIXC_product_0001-of-4154.h5",
"files": [
{
"bucket": "private",
"regex": ".*.h5$",
"sampleFileName": "L2_HR_PIXC_product_0001-of-4154.h5"
}
]
}
6 changes: 6 additions & 0 deletions example/data/providers/PODAAC_SWOT.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"globalConnectionLimit": 10,
"host": "replaced with internal bucket in addProviders()",
"id": "PODAAC_SWOT",
"protocol": "s3"
}
20 changes: 20 additions & 0 deletions example/data/records/L2_HR_PIXC_product_0001-of-4154.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"provider": "PODAAC_SWOT",
"collection": "L2_HR_PIXC",
"bucket": "random-bucket",
"identifier": "<<REPLACED with random string>>",
"product": {
"name": "L2_HR_PIXC_product_0001-of-4154",
"dataVersion": "001",
"files": [
{
"type": "data",
"uri": "s3://random-bucket/cumulus-test-data/pdrs/L2_HR_PIXC_product_0001-of-4154.h5",
"name": "L2_HR_PIXC_product_0001-of-4154.h5",
"checksum-type": "md5",
"checksum": "bb802eb3daf7d2c4e641b7551e89527f",
"size": 322904208
}
]
}
}
15 changes: 15 additions & 0 deletions example/data/rules/L2_HR_PIXC_kinesisRule.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"collection": {
"name": "L2_HR_PIXC",
"version": "000"
},
"name": "L2_HR_PIXC_kinesisRule",
"provider": "PODAAC_SWOT",
"rule": {
"type": "kinesis",
"value": "arn:aws:kinesis:{{awsRegion}}:{{awsAccountId}}:stream/{{streamName}}",
"arn": "arn:aws:kinesis:{{awsRegion}}:{{awsAccountId}}:stream/{{streamName}}"
},
"state": "ENABLED",
"workflow": "KinesisTriggerTest"
}
14 changes: 12 additions & 2 deletions example/lambdas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ SfSnsReport:
handler: index.handler
timeout: 300
source: 'node_modules/@cumulus/sf-sns-report/dist'
useMessageAdapter: true
useMessageAdapter: true

SyncGranule:
handler: index.handler
Expand Down Expand Up @@ -107,7 +107,7 @@ ModisProcessing:
key: deploy/cumulus-process/modis/0.5.2.zip
runtime: python2.7

InRegionS3Policy:
InRegionS3Policy:
handler: index.inRegionS3Policy
memory: 256
timeout: 300
Expand All @@ -117,3 +117,13 @@ SnsS3Test:
handler: index.handler
source: 'lambdas/snsS3Test/'

CNMToCMA:
handler: 'gov.nasa.cumulus.CnmToGranuleHandler::handleRequestStreams'
timeout: 300
runtime: java8
memory: 128
s3Source:
bucket: '{{buckets.shared.name}}'
key: 'daacs/podaac/CNMToCMA-0.0.1.zip'
useMessageAdapter: false
launchInVpc: true
149 changes: 149 additions & 0 deletions example/spec/helpers/kinesisHelpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
'use strict';

const _ = require('lodash');
const { Kinesis, StepFunctions } = require('aws-sdk');

const {
LambdaStep,
getWorkflowArn,
timeout
} = require('@cumulus/integration-tests');

const { loadConfig } = require('../helpers/testUtils');

// Deployment-specific test configuration (stack name, bucket, region, ...).
const testConfig = loadConfig();

const lambdaStep = new LambdaStep();
// AWS service clients scoped to the region under test.
const sfn = new StepFunctions({ region: testConfig.awsRegion });
const kinesis = new Kinesis({ apiVersion: '2013-12-02', region: testConfig.awsRegion });

// Duration of one polling period, in milliseconds.
const waitPeriodMs = 1000;

/**
 * Fetches the most recently started execution of the KinesisTriggerTest
 * state machine.
 *
 * @returns {Promise<Object>} the newest Step Functions execution object,
 * or undefined when the state machine has no executions yet.
 */
async function getLastExecution() {
  const stateMachineArn = await getWorkflowArn(testConfig.stackName, testConfig.bucket, 'KinesisTriggerTest');
  const { executions } = await sfn.listExecutions({ stateMachineArn }).promise();
  const newestFirst = _.orderBy(executions, 'startDate', 'desc');
  return newestFirst[0];
}


/**
 * Polls a kinesis stream until its status becomes ACTIVE, giving up after a
 * fixed number of one-second polling periods.
 *
 * @param {string} streamName - name of the kinesis stream to wait for
 * @param {integer} maxNumberElapsedPeriods - maximum number of polling periods
 *                  to wait (default 60; each period lasts 1000 ms)
 * @returns {string} the stream status, always 'ACTIVE' on success
 * @throws {Error} when the stream has not become ACTIVE within the allotted time
 */
async function waitForActiveStream(streamName, maxNumberElapsedPeriods = 60) {
  let status = 'Anything';
  let periods = 0;

  /* eslint-disable no-await-in-loop */
  while (status !== 'ACTIVE' && periods < maxNumberElapsedPeriods) {
    await timeout(waitPeriodMs);
    const description = await kinesis.describeStream({ StreamName: streamName }).promise();
    status = description.StreamDescription.StreamStatus;
    periods += 1;
  }
  /* eslint-enable no-await-in-loop */

  if (status !== 'ACTIVE') {
    throw new Error(`Stream never became active: status: ${status}`);
  }
  return status;
}

/**
 * Deletes the named kinesis stream.
 *
 * @param {string} streamName - name of the kinesis stream to delete
 * @returns {Promise<Object>} - a kinesis delete stream proxy object.
 */
async function deleteTestStream(streamName) {
  const params = { StreamName: streamName };
  return kinesis.deleteStream(params).promise();
}

/**
 * Ensures a kinesis stream exists, creating it when it does not.
 *
 * @param {string} streamName - name of the stream to use or create
 * @returns {Promise<Object>} the describeStream result for an existing stream,
 * or the createStream response when a new stream had to be created
 * @throws {Error} any kinesis error other than ResourceNotFoundException
 */
async function createOrUseTestStream(streamName) {
  try {
    return await kinesis.describeStream({ StreamName: streamName }).promise();
  }
  catch (err) {
    // Only a missing stream is recoverable; anything else is a real failure.
    if (err.code !== 'ResourceNotFoundException') {
      throw err;
    }
    console.log('Creating a new stream:', streamName);
    return kinesis.createStream({ StreamName: streamName, ShardCount: 1 }).promise();
  }
}

/**
 * Publishes a single CNM record onto the named kinesis stream.
 *
 * @param {string} streamName - kinesis stream name
 * @param {Object} record - CNM object to drop on stream
 * @returns {Promise<Object>} - Kinesis putRecord response proxy object.
 */
async function putRecordOnStream(streamName, record) {
  const params = {
    Data: JSON.stringify(record),
    PartitionKey: '1',
    StreamName: streamName
  };
  return kinesis.putRecord(params).promise();
}

/**
 * Wait until an execution matching the desired record identifier starts.
 *
 * Polls the most recent KinesisTriggerTest execution once per period and
 * inspects its 'sf2snsStart' step output for the expected identifier.
 *
 * @param {string} recordIdentifier - random string identifying correct execution for test
 * @param {integer} maxWaitTime - maximum time to wait for the correct execution in milliseconds
 * @returns {Object} - {executionArn: <arn>, status: <status>}
 * @throws {Error} - any AWS error, re-thrown from AWS execution, or 'Never found started workflow.'
 */
async function waitForTestSfStarted(recordIdentifier, maxWaitTime) {
  let timeWaited = 0;
  let lastExecution;
  let workflowExecution;

  /* eslint-disable no-await-in-loop */
  while (timeWaited < maxWaitTime && workflowExecution === undefined) {
    await timeout(waitPeriodMs);
    timeWaited += waitPeriodMs;
    lastExecution = await getLastExecution();
    // getLastExecution returns undefined if no previous execution exists
    if (lastExecution && lastExecution.executionArn) {
      const taskOutput = await lambdaStep.getStepOutput(lastExecution.executionArn, 'sf2snsStart');
      if (taskOutput !== null && taskOutput.payload.identifier === recordIdentifier) {
        workflowExecution = lastExecution;
      }
    }
  }
  /* eslint-enable no-await-in-loop */

  // Success is determined by whether a match was found, not by elapsed time:
  // a match found on the final poll (timeWaited === maxWaitTime) still counts.
  if (workflowExecution !== undefined) return workflowExecution;
  throw new Error('Never found started workflow.');
}


// Public kinesis test helpers consumed by the integration specs.
module.exports = {
  createOrUseTestStream,
  deleteTestStream,
  putRecordOnStream,
  waitForActiveStream,
  waitForTestSfStarted
};
2 changes: 2 additions & 0 deletions example/spec/helpers/testUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ function loadConfig() {
throw new Error('the default deployment cannot be used for integration tests');
}

config.test_configs.buckets = config.buckets;

return config.test_configs;
}

Expand Down
10 changes: 5 additions & 5 deletions example/spec/ingestGranule/IngestGranuleSuccessSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ const {
} = require('../helpers/granuleUtils');
const config = loadConfig();
const lambdaStep = new LambdaStep();
const taskName = 'IngestGranule';
const workflowName = 'IngestGranule';

const granuleRegex = '^MOD09GQ\\.A[\\d]{7}\\.[\\w]{6}\\.006.[\\d]{13}$';
const testDataGranuleId = 'MOD09GQ.A2016358.h13v04.006.2016360104606';

const templatedSyncGranuleFilename = templateFile({
inputTemplateFilename: './spec/ingestGranule/SyncGranule.output.payload.template.json',
config: config[taskName].SyncGranuleOutput
config: config[workflowName].SyncGranuleOutput
});

const templatedOutputPayloadFilename = templateFile({
inputTemplateFilename: './spec/ingestGranule/IngestGranule.output.payload.template.json',
config: config[taskName].IngestGranuleOutput
config: config[workflowName].IngestGranuleOutput
});

/**
Expand Down Expand Up @@ -103,11 +103,11 @@ describe('The S3 Ingest Granules workflow', () => {

// eslint-disable-next-line function-paren-newline
workflowExecution = await buildAndExecuteWorkflow(
config.stackName, config.bucket, taskName, collection, provider, inputPayload
config.stackName, config.bucket, workflowName, collection, provider, inputPayload
);

failingWorkflowExecution = await buildAndExecuteWorkflow(
config.stackName, config.bucket, taskName, collection, provider, {}
config.stackName, config.bucket, workflowName, collection, provider, {}
);
failedExecutionArn = failingWorkflowExecution.executionArn.split(':');
failedExecutionName = failedExecutionArn.pop();
Expand Down
Loading

0 comments on commit 2dea593

Please sign in to comment.