diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d1d96ffe95..718794a69ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [v1.8.0] - 2018-07-23 ### Added + + +- **CUMULUS-718** Adds integration test for Kinesis triggering a workflow. + - **GITC-776-3** Added more flexibility for rules. You can now edit all fields on the rule's record We may need to update the api documentation to reflect this. diff --git a/example/app/config.yml b/example/app/config.yml index 3fb716e4554..a882267b0b6 100644 --- a/example/app/config.yml +++ b/example/app/config.yml @@ -7,6 +7,9 @@ default: bucket: '{{buckets.internal.name}}' distributionEndpoint: '{{api_distribution_url}}' ems_provider: CUMULUS + streamName: '{{stackName}}-testStream' + awsAccountId: '{{AWS_ACCOUNT_ID}}' + awsRegion: us-east-1 IngestGranule: IngestGranuleOutput: granules: @@ -244,3 +247,51 @@ jl: shared-2: name: rvl-internal type: shared + + +# Begin-MHS-config +mhs: + prefix: mhs-cumulus + stackName: mhs-cumulus + stackNameNoDash: mhsCumulus + + iams: + ecsRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-ecs' + lambdaApiGatewayRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-lambda-api-gateway' + lambdaProcessingRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-lambda-processing' + stepRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-steprole' + instanceProfile: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:instance-profile/mhs-ecs' + distributionRoleArn: 'arn:aws:iam::{{AWS_ACCOUNT_ID}}:role/mhs-distribution-api-lambda' + + buckets: + internal: + name: mhs-internal + type: internal + private: + name: mhs-private + type: private + protected: + name: mhs-protected + type: protected + public: + name: mhs-public + type: public + protected-2: + name: mhs-protected-2 + type: protected + shared: + name: cumulus-data-shared + type: shared + api_distribution_url: '{{API_DISTRIBUTION_URL}}' + system_bucket: '{{buckets.internal.name}}' + + users: + - username: '{{EARTHDATA_LOGIN_USERNAME}}' + + AutoScalingPolicyConfiguration: + GranulesTable: + enableAutoScaling: false + FilesTable: + enableAutoScaling: false + +# END-mhs-config diff --git a/example/data/collections/L2_HR_PIXC-000.json b/example/data/collections/L2_HR_PIXC-000.json new file mode 100644 index 00000000000..1f9099afe36 --- /dev/null +++ b/example/data/collections/L2_HR_PIXC-000.json @@ -0,0 +1,15 @@ +{ + "name": "L2_HR_PIXC", + "version": "000", + "provider_path": "podaac-cumulus/test-data", + "granuleId": "^.*$", + "granuleIdExtraction": "^(.*)(\\.h5|\\.cmr)", + "sampleFileName": "L2_HR_PIXC_product_0001-of-4154.h5", + "files": [ + { + "bucket": "private", + "regex": ".*.h5$", + "sampleFileName": "L2_HR_PIXC_product_0001-of-4154.h5" + } + ] +} diff --git a/example/data/providers/PODAAC_SWOT.json b/example/data/providers/PODAAC_SWOT.json new file mode 100644 index 00000000000..6e8de6868b6 --- /dev/null +++ b/example/data/providers/PODAAC_SWOT.json @@ -0,0 +1,6 @@ +{ + "globalConnectionLimit": 10, + "host": "replaced with internal bucket in addProviders()", + "id": "PODAAC_SWOT", + "protocol": "s3" +} diff --git a/example/data/records/L2_HR_PIXC_product_0001-of-4154.json b/example/data/records/L2_HR_PIXC_product_0001-of-4154.json new file mode 100644 index 00000000000..3058419d75b --- /dev/null +++ b/example/data/records/L2_HR_PIXC_product_0001-of-4154.json @@ -0,0 +1,20 @@ +{ + "provider": "PODAAC_SWOT", + "collection": "L2_HR_PIXC", + "bucket": "random-bucket", + "identifier": "<>", + "product": { + "name": "L2_HR_PIXC_product_0001-of-4154", + "dataVersion": "001", + "files": [ + { + "type": "data", + "uri": "s3://random-bucket/cumulus-test-data/pdrs/L2_HR_PIXC_product_0001-of-4154.h5", + "name": "L2_HR_PIXC_product_0001-of-4154.h5", + "checksum-type": "md5", + "checksum": "bb802eb3daf7d2c4e641b7551e89527f", + "size": 322904208 + } + ] + } +} diff --git a/example/data/rules/L2_HR_PIXC_kinesisRule.json b/example/data/rules/L2_HR_PIXC_kinesisRule.json new file mode 100644 index 00000000000..b5254e05b09 --- /dev/null +++ b/example/data/rules/L2_HR_PIXC_kinesisRule.json @@ -0,0 +1,15 @@ +{ + "collection": { + "name": "L2_HR_PIXC", + "version": "000" + }, + "name": "L2_HR_PIXC_kinesisRule", + "provider": "PODAAC_SWOT", + "rule": { + "type": "kinesis", + "value": "arn:aws:kinesis:{{awsRegion}}:{{awsAccountId}}:stream/{{streamName}}", + "arn": "arn:aws:kinesis:{{awsRegion}}:{{awsAccountId}}:stream/{{streamName}}" + }, + "state": "ENABLED", + "workflow": "KinesisTriggerTest" +} diff --git a/example/lambdas.yml b/example/lambdas.yml index d39f5153d7a..7f6e54acaf0 100644 --- a/example/lambdas.yml +++ b/example/lambdas.yml @@ -73,7 +73,7 @@ SfSnsReport: handler: index.handler timeout: 300 source: 'node_modules/@cumulus/sf-sns-report/dist' - useMessageAdapter: true + useMessageAdapter: true SyncGranule: handler: index.handler @@ -107,7 +107,7 @@ ModisProcessing: key: deploy/cumulus-process/modis/0.5.2.zip runtime: python2.7 -InRegionS3Policy: +InRegionS3Policy: handler: index.inRegionS3Policy memory: 256 timeout: 300 @@ -117,3 +117,13 @@ SnsS3Test: handler: index.handler source: 'lambdas/snsS3Test/' +CNMToCMA: + handler: 'gov.nasa.cumulus.CnmToGranuleHandler::handleRequestStreams' + timeout: 300 + runtime: java8 + memory: 128 + s3Source: + bucket: '{{buckets.shared.name}}' + key: 'daacs/podaac/CNMToCMA-0.0.1.zip' + useMessageAdapter: false + launchInVpc: true diff --git a/example/spec/helpers/kinesisHelpers.js b/example/spec/helpers/kinesisHelpers.js new file mode 100644 index 00000000000..e1843f5f13b --- /dev/null +++ b/example/spec/helpers/kinesisHelpers.js @@ -0,0 +1,149 @@ +'use strict'; + +const _ = require('lodash'); +const { Kinesis, StepFunctions } = require('aws-sdk'); + +const { + LambdaStep, + getWorkflowArn, + timeout +} = require('@cumulus/integration-tests'); + +const { loadConfig } = require('../helpers/testUtils'); + +const testConfig = loadConfig(); + +const lambdaStep = new LambdaStep(); +const sfn = new StepFunctions({ region: testConfig.awsRegion }); +const kinesis = new Kinesis({ apiVersion: '2013-12-02', region: testConfig.awsRegion }); + +const waitPeriodMs = 1000; + +/** + * returns the most recently executed KinesisTriggerTest workflow + * + * @returns {Object} state function execution . + */ +async function getLastExecution() { + const kinesisTriggerTestStpFnArn = await getWorkflowArn(testConfig.stackName, testConfig.bucket, 'KinesisTriggerTest'); + const data = await sfn.listExecutions({ stateMachineArn: kinesisTriggerTestStpFnArn }).promise(); + return (_.orderBy(data.executions, 'startDate', 'desc')[0]); +} + + +/** + * Wait for a number of periods for a kinesis stream to become active. + * + * @param {string} streamName - name of kinesis stream to wait for + * @param {integer} maxNumberElapsedPeriods - number of periods to wait for stream + * default value 30; duration of period is 1000ms + * @returns {string} current stream status: 'ACTIVE' + * @throws {Error} - Error describing current stream status + */ +async function waitForActiveStream(streamName, maxNumberElapsedPeriods = 60) { + let streamStatus = 'Anything'; + let elapsedPeriods = 0; + let stream; + + /* eslint-disable no-await-in-loop */ + while (streamStatus !== 'ACTIVE' && elapsedPeriods < maxNumberElapsedPeriods) { + await timeout(waitPeriodMs); + stream = await kinesis.describeStream({ StreamName: streamName }).promise(); + streamStatus = stream.StreamDescription.StreamStatus; + elapsedPeriods += 1; + } + /* eslint-enable no-await-in-loop */ + + if (streamStatus === 'ACTIVE') return streamStatus; + throw new Error(`Stream never became active: status: ${streamStatus}`); +} + +/** + * Helper function to delete a stream by name + * + * @param {string} streamName - name of kinesis stream to delete + * @returns {Promise} - a kinesis delete stream proxy object. + */ +async function deleteTestStream(streamName) { + return kinesis.deleteStream({ StreamName: streamName }).promise(); +} + +/** + * returns a active kinesis stream, creating it if necessary. + * + * @param {string} streamName - name of stream to return + * @returns {Object} empty object if stream is created and ready. + * @throws {Error} Kinesis error if stream cannot be created. + */ +async function createOrUseTestStream(streamName) { + let stream; + + try { + stream = await kinesis.describeStream({ StreamName: streamName }).promise(); + } + catch (err) { + if (err.code === 'ResourceNotFoundException') { + console.log('Creating a new stream:', streamName); + stream = await kinesis.createStream({ StreamName: streamName, ShardCount: 1 }).promise(); + } + else { + throw err; + } + } + return stream; +} + +/** + * add a record to the kinesis stream. + * + * @param {string} streamName - kinesis stream name + * @param {Object} record - CNM object to drop on stream + * @returns {Promise} - Kinesis putRecord response proxy object. + */ +async function putRecordOnStream(streamName, record) { + return kinesis.putRecord({ + Data: JSON.stringify(record), + PartitionKey: '1', + StreamName: streamName + }).promise(); +} + +/** + * Wait until an exectution matching the desired execution starts. + * + * @param {string} recordIdentifier - random string identifying correct execution for test + * @param {integer} maxWaitTime - maximum time to wait for the correct execution in milliseconds + * @returns {Object} - {executionArn: , status: } + * @throws {Error} - any AWS error, re-thrown from AWS execution or 'Workflow Never Started'. + */ +async function waitForTestSfStarted(recordIdentifier, maxWaitTime) { + let timeWaited = 0; + let lastExecution; + let workflowExecution; + + /* eslint-disable no-await-in-loop */ + while (timeWaited < maxWaitTime && workflowExecution === undefined) { + await timeout(waitPeriodMs); + timeWaited += waitPeriodMs; + lastExecution = await getLastExecution(); + // getLastExecution returns undefined if no previous execution exists + if (lastExecution && lastExecution.executionArn) { + const taskOutput = await lambdaStep.getStepOutput(lastExecution.executionArn, 'sf2snsStart'); + if (taskOutput !== null && taskOutput.payload.identifier === recordIdentifier) { + workflowExecution = lastExecution; + } + } + } + /* eslint-disable no-await-in-loop */ + if (timeWaited < maxWaitTime) return workflowExecution; + throw new Error('Never found started workflow.'); +} + + +module.exports = { + createOrUseTestStream, + deleteTestStream, + putRecordOnStream, + waitForActiveStream, + waitForTestSfStarted +}; diff --git a/example/spec/helpers/testUtils.js b/example/spec/helpers/testUtils.js index c6a4136f2be..11df3f9a11b 100644 --- a/example/spec/helpers/testUtils.js +++ b/example/spec/helpers/testUtils.js @@ -31,6 +31,8 @@ function loadConfig() { throw new Error('the default deployment cannot be used for integration tests'); } + config.test_configs.buckets = config.buckets; + return config.test_configs; } diff --git a/example/spec/ingestGranule/IngestGranuleSuccessSpec.js b/example/spec/ingestGranule/IngestGranuleSuccessSpec.js index a8b0c8b133b..85371fd2842 100644 --- a/example/spec/ingestGranule/IngestGranuleSuccessSpec.js +++ b/example/spec/ingestGranule/IngestGranuleSuccessSpec.js @@ -27,19 +27,19 @@ const { } = require('../helpers/granuleUtils'); const config = loadConfig(); const lambdaStep = new LambdaStep(); -const taskName = 'IngestGranule'; +const workflowName = 'IngestGranule'; const granuleRegex = '^MOD09GQ\\.A[\\d]{7}\\.[\\w]{6}\\.006.[\\d]{13}$'; const testDataGranuleId = 'MOD09GQ.A2016358.h13v04.006.2016360104606'; const templatedSyncGranuleFilename = templateFile({ inputTemplateFilename: './spec/ingestGranule/SyncGranule.output.payload.template.json', - config: config[taskName].SyncGranuleOutput + config: config[workflowName].SyncGranuleOutput }); const templatedOutputPayloadFilename = templateFile({ inputTemplateFilename: './spec/ingestGranule/IngestGranule.output.payload.template.json', - config: config[taskName].IngestGranuleOutput + config: config[workflowName].IngestGranuleOutput }); /** @@ -103,11 +103,11 @@ describe('The S3 Ingest Granules workflow', () => { // eslint-disable-next-line function-paren-newline workflowExecution = await buildAndExecuteWorkflow( - config.stackName, config.bucket, taskName, collection, provider, inputPayload + config.stackName, config.bucket, workflowName, collection, provider, inputPayload ); failingWorkflowExecution = await buildAndExecuteWorkflow( - config.stackName, config.bucket, taskName, collection, provider, {} + config.stackName, config.bucket, workflowName, collection, provider, {} ); failedExecutionArn = failingWorkflowExecution.executionArn.split(':'); failedExecutionName = failedExecutionArn.pop(); diff --git a/example/spec/kinesisTests/KinesisTestTriggerSpec.js b/example/spec/kinesisTests/KinesisTestTriggerSpec.js new file mode 100644 index 00000000000..119e0728615 --- /dev/null +++ b/example/spec/kinesisTests/KinesisTestTriggerSpec.js @@ -0,0 +1,138 @@ +'use strict'; + +const { s3 } = require('@cumulus/common/aws'); + +jasmine.DEFAULT_TIMEOUT_INTERVAL = 550000; + +const { + LambdaStep, + waitForCompletedExecution +} = require('@cumulus/integration-tests'); +const { randomString } = require('@cumulus/common/test-utils'); + +const { loadConfig } = require('../helpers/testUtils'); +const { + createOrUseTestStream, + putRecordOnStream, + waitForActiveStream, + waitForTestSfStarted +} = require('../helpers/kinesisHelpers'); + +const record = require('../../data/records/L2_HR_PIXC_product_0001-of-4154.json'); + +const granuleId = record.product.name; +const recordIdentifier = randomString(); +record.identifier = recordIdentifier; + +const testConfig = loadConfig(); + +const lambdaStep = new LambdaStep(); +const streamName = testConfig.streamName; + + +const recordFile = record.product.files[0]; +const expectedTranslatePayload = { + granules: [ + { + granuleId: record.product.name, + files: [ + { + path: 'cumulus-test-data/pdrs', + url_path: recordFile.uri, + bucket: record.bucket, + name: recordFile.name, + size: recordFile.size + } + ] + } + ] +}; + +const fileData = expectedTranslatePayload.granules[0].files[0]; +const filePrefix = `file-staging/${testConfig.stackName}/L2_HR_PIXC`; + +const fileDataWithFilename = { + ...fileData, + filename: `s3://${testConfig.buckets.private.name}/${filePrefix}/${recordFile.name}`, + bucket: testConfig.buckets.private.name, + url_path: '', + fileStagingDir: filePrefix +}; + +const expectedSyncGranulesPayload = { + granules: [ + { + granuleId: granuleId, + files: [fileDataWithFilename] + } + ] +}; + +// When kinesis-type rules exist, the Cumulus lambda kinesisConsumer is +// configured to trigger workflows when new records arrive on a Kinesis +// stream. When a record appears on the stream, the kinesisConsumer lambda +// triggers workflows associated with the kinesis-type rules. +describe('The Cloud Notification Mechanism Kinesis workflow', () => { + const maxWaitTime = 1000 * 60 * 4; + let executionStatus; + let s3FileHead; + + afterAll(async () => { + await s3().deleteObject({ + Bucket: testConfig.buckets.private.name, + Key: `${filePrefix}/${fileData.name}` + }).promise(); + }); + + + beforeAll(async () => { + try { + await createOrUseTestStream(streamName); + console.log(`\nwaits for active Stream ${streamName}.`); + await waitForActiveStream(streamName); + console.log(`Drops record onto ${streamName}.`); + await putRecordOnStream(streamName, record); + console.log(`waits for stepfunction to start ${streamName}`); + this.workflowExecution = await waitForTestSfStarted(recordIdentifier, maxWaitTime); + console.log(`waits for completed execution of ${this.workflowExecution.executionArn}.`); + executionStatus = await waitForCompletedExecution(this.workflowExecution.executionArn); + } + catch (error) { + console.log(error); + console.log('Tests conditions can\'t get met...exiting.'); + process.exit(1); + } + }); + + it('executes successfully', () => { + expect(executionStatus).toEqual('SUCCEEDED'); + }); + + describe('the TranslateMessage Lambda', () => { + beforeAll(async () => { + this.lambdaOutput = await lambdaStep.getStepOutput(this.workflowExecution.executionArn, 'CNMToCMA'); + }); + + it('outputs the granules object', () => { + expect(this.lambdaOutput.payload).toEqual(expectedTranslatePayload); + }); + }); + + describe('the SyncGranule Lambda', () => { + beforeAll(async () => { + this.lambdaOutput = await lambdaStep.getStepOutput(this.workflowExecution.executionArn, 'SyncGranule'); + }); + + it('outputs the granules object', () => { + expect(this.lambdaOutput.payload).toEqual(expectedSyncGranulesPayload); + }); + + it('syncs data to s3 target location.', async () => { + s3FileHead = await s3().headObject({ + Bucket: testConfig.buckets.private.name, + Key: `${filePrefix}/${fileData.name}` + }).promise(); + expect(new Date() - s3FileHead.LastModified < maxWaitTime).toBeTruthy(); + }); + }); +}); diff --git a/example/spec/setup/CleansUpTestResources.js b/example/spec/setup/CleansUpTestResources.js new file mode 100644 index 00000000000..adab56920c4 --- /dev/null +++ b/example/spec/setup/CleansUpTestResources.js @@ -0,0 +1,14 @@ +const { loadConfig } = require('../helpers/testUtils'); +const { deleteRules, rulesList } = require('@cumulus/integration-tests'); +const testConfig = loadConfig(); +const ruleDirectory = './data/rules'; + +jasmine.DEFAULT_TIMEOUT_INTERVAL = 550000; + +describe('Cleans up Test Resources', () => { + it('removes the PODAAC_SWOT rule', async () => { + const rules = await rulesList(testConfig.stackName, testConfig.bucket, ruleDirectory); + const deleted = await deleteRules(testConfig.stackName, testConfig.bucket, rules); // + expect(deleted).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/example/spec/setup/CreatesTestResources.js b/example/spec/setup/CreatesTestResources.js new file mode 100644 index 00000000000..2728b6cef3a --- /dev/null +++ b/example/spec/setup/CreatesTestResources.js @@ -0,0 +1,27 @@ +const { loadConfig } = require('../helpers/testUtils'); +const { createOrUseTestStream, waitForActiveStream } = require('../helpers/kinesisHelpers'); +const { Kinesis } = require('aws-sdk'); +const testConfig = loadConfig(); +const kinesis = new Kinesis({ apiVersion: '2013-12-02', region: testConfig.awsRegion }); + + +jasmine.DEFAULT_TIMEOUT_INTERVAL = 550000; + +describe('Creates Necessary Test Resources', () => { + beforeAll(async () => { + try { + await createOrUseTestStream(testConfig.streamName); + await waitForActiveStream(testConfig.streamName); + } + catch (error) { + console.log(error); + console.log('failed to set up necessary test resources...exiting.'); + process.exit(1); + } + }); + + it('Prepares a kinesis stream for integration tests.', async () => { + const stream = await kinesis.describeStream({ StreamName: testConfig.streamName }).promise(); + expect(stream.StreamDescription.StreamStatus).toBe('ACTIVE'); + }); +}); diff --git a/example/spec/setup/PopulateProvidersCollections.js b/example/spec/setup/PopulateProvidersCollections.js index ab4e6946db7..7b6d5010a74 100644 --- a/example/spec/setup/PopulateProvidersCollections.js +++ b/example/spec/setup/PopulateProvidersCollections.js @@ -1,20 +1,24 @@ 'use strict'; const path = require('path'); -const { addProviders, addCollections } = require('@cumulus/integration-tests'); +const { addProviders, addCollections, addRules } = require('@cumulus/integration-tests'); const { s3 } = require('@cumulus/common/aws'); const { loadConfig } = require('../helpers/testUtils'); const fs = require('fs-extra'); const config = loadConfig(); +jasmine.DEFAULT_TIMEOUT_INTERVAL = 550000; + const collectionsDirectory = './data/collections'; const providersDirectory = './data/providers'; +const rulesDirectory = './data/rules'; const s3data = [ '@cumulus/test-data/pdrs/MOD09GQ_1granule_v3.PDR', '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606.hdf.met', '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606.hdf', - '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606_ndvi.jpg' + '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606_ndvi.jpg', + '@cumulus/test-data/granules/L2_HR_PIXC_product_0001-of-4154.h5' ]; /** @@ -45,25 +49,29 @@ function uploadTestDataToBucket(bucket) { return Promise.all(s3data.map((file) => uploadTestDataToS3(file, bucket))); } -describe('Populating providers and collections to database', () => { +describe('Populating providers, collections and rules to database', () => { let collections; let providers; + let rules; + beforeAll(async () => { try { collections = await addCollections(config.stackName, config.bucket, collectionsDirectory); providers = await addProviders(config.stackName, config.bucket, providersDirectory, config.bucket); + rules = await addRules(config, rulesDirectory); console.log(`Uploading test data to S3 bucket: ${config.bucket}`); await uploadTestDataToBucket(config.bucket); } catch (e) { - console.log(e); + console.log(JSON.stringify(e)); throw e; } }); - it('providers and collections are added successfully', async () => { - expect(collections >= 1).toBe(true); - expect(providers >= 1).toBe(true); + it('providers, collections and rules are added successfully', async () => { + expect(providers).toBeGreaterThanOrEqual(1, 'Number of providers incorrect.'); + expect(collections).toBeGreaterThanOrEqual(1, 'Number of collections incorrect.'); + expect(rules).toBeGreaterThanOrEqual(1, 'Number of rules incorrect.'); }); }); diff --git a/example/spec/support/jasmine.json b/example/spec/support/jasmine.json index 10144a126d1..8737034d610 100644 --- a/example/spec/support/jasmine.json +++ b/example/spec/support/jasmine.json @@ -1,12 +1,15 @@ { "spec_dir": "spec", "spec_files": [ + "setup/CreatesTestResources.js", "setup/PopulateProvidersCollections.js", - "**/*[sS]pec.js" + "**/*[sS]pec.js", + "setup/CleansUpTestResources.js" ], "helpers": [ "helpers/**/*.js" ], + "stopOnSpecFailure": true, "stopSpecOnExpectationFailure": false, - "random": false + "random": false } diff --git a/example/workflows.yml b/example/workflows.yml index 18ce7d6faf4..8e092a88c4e 100644 --- a/example/workflows.yml +++ b/example/workflows.yml @@ -172,7 +172,7 @@ DiscoverGranules: CumulusConfig: cumulus_message: input: '{$}' - Next: VpcOrNot + Next: VpcOrNot VpcOrNot: Type: Choice Choices: @@ -183,7 +183,7 @@ DiscoverGranules: StringEquals: https Next: DiscoverGranules - Variable: $.meta.provider.protocol - StringEquals: s3 + StringEquals: s3 Next: DiscoverGranulesNoVpc DiscoverGranules: CumulusConfig: @@ -397,7 +397,7 @@ IngestGranule: CumulusConfig: cumulus_message: input: '{$}' - Next: VpcOrNot + Next: VpcOrNot VpcOrNot: Type: Choice Choices: @@ -408,7 +408,7 @@ IngestGranule: StringEquals: https Next: SyncGranule - Variable: $.meta.provider.protocol - StringEquals: s3 + StringEquals: s3 Next: SyncGranuleNoVpc SyncGranule: CumulusConfig: @@ -433,7 +433,7 @@ IngestGranule: - States.ALL ResultPath: '$.exception' Next: StopStatus - Next: ChooseProcess + Next: ChooseProcess SyncGranuleNoVpc: CumulusConfig: buckets: '{$.meta.buckets}' @@ -457,7 +457,7 @@ IngestGranule: - States.ALL ResultPath: '$.exception' Next: StopStatus - Next: ChooseProcess + Next: ChooseProcess ChooseProcess: Type: Choice Choices: @@ -534,6 +534,66 @@ IngestGranule: Type: Fail Cause: 'Workflow failed' +KinesisTriggerTest: + Comment: 'Tests Workflow from Kinesis Stream' + StartAt: Report + States: + Report: + Type: Task + Resource: ${sf2snsStartLambdaFunction.Arn} + Next: TranslateMessage + TranslateMessage: + Type: Task + Resource: ${CNMToCMALambdaFunction.Arn} + Retry: + - ErrorEquals: + - States.ALL + IntervalSeconds: 10 + MaxAttempts: 2 + Catch: + - ErrorEquals: + - States.ALL + ResultPath: '$.exception' + Next: StopStatus + Next: SyncGranule + SyncGranule: + CumulusConfig: + provider: '{$.meta.provider}' + buckets: '{$.meta.buckets}' + collection: '{$.meta.collection}' + downloadBucket: '{$.meta.buckets.private.name}' + stack: '{$.meta.stack}' + cumulus_message: + outputs: + - source: '{$.granules}' + destination: '{$.meta.input_granules}' + - source: '{$}' + destination: '{$.payload}' + Type: Task + Resource: ${SyncGranuleLambdaFunction.Arn} + Retry: + - ErrorEquals: + - States.ALL + IntervalSeconds: 10 + MaxAttempts: 3 + Catch: + - ErrorEquals: + - States.ALL + ResultPath: '$.exception' + Next: StopStatus + Next: StopStatus + StopStatus: + Type: Task + Resource: ${sf2snsEndLambdaFunction.Arn} + Catch: + - ErrorEquals: + - States.ALL + Next: WorkflowFailed + End: true + WorkflowFailed: + Type: Fail + Cause: 'Workflow failed' + SyncGranule: Comment: 'Sync Granule' StartAt: Report @@ -544,7 +604,7 @@ SyncGranule: CumulusConfig: cumulus_message: input: '{$}' - Next: VpcOrNot + Next: VpcOrNot VpcOrNot: Type: Choice Choices: @@ -555,7 +615,7 @@ SyncGranule: StringEquals: https Next: SyncGranule - Variable: $.meta.provider.protocol - StringEquals: s3 + StringEquals: s3 Next: SyncGranuleNoVpc SyncGranule: CumulusConfig: @@ -580,7 +640,7 @@ SyncGranule: - States.ALL ResultPath: '$.exception' Next: StopStatus - Next: StopStatus + Next: StopStatus SyncGranuleNoVpc: CumulusConfig: buckets: '{$.meta.buckets}' @@ -623,7 +683,7 @@ SyncGranule: End: true WorkflowFailed: Type: Fail - Cause: 'Workflow failed' + Cause: 'Workflow failed' EcsHelloWorldWorkflow: Comment: 'Returns Hello World' diff --git a/packages/api/models/rules.js b/packages/api/models/rules.js index 8a2618166cf..66b458ba51b 100644 --- a/packages/api/models/rules.js +++ b/packages/api/models/rules.js @@ -155,7 +155,7 @@ class Rule extends Manager { * @returns {Promise} updated rule item */ async addKinesisEventSource(item) { - // use the existing event source mapping if it already exists + // use the existing event source mapping if it already exists and is enabled const listParams = { FunctionName: process.env.kinesisConsumer }; const listData = await aws.lambda(listParams).listEventSourceMappings().promise(); if (listData.EventSourceMappings && listData.EventSourceMappings.length > 0) { @@ -164,8 +164,17 @@ class Rule extends Manager { return (mapping.EventSourceArn === item.rule.value); }); if (mappingExists) { - item.rule.arn = mappingExists.UUID; - return item; + const mappingEnabled = listData.EventSourceMappings + .find((mapping) => { // eslint-disable-line arrow-body-style + return (mapping.EventSourceArn === item.rule.value && + mapping.State === 'Enabled'); + }); + + if (mappingEnabled) { + item.rule.arn = mappingEnabled.UUID; + return item; + } + await this.deleteKinesisEventSource({ UUID: mappingExists.UUID }).promise(); } } @@ -173,7 +182,7 @@ class Rule extends Manager { const params = { EventSourceArn: item.rule.value, FunctionName: process.env.kinesisConsumer, - StartingPosition: 'LATEST', + StartingPosition: 'TRIM_HORIZON', Enabled: item.state === 'ENABLED' }; diff --git a/packages/integration-tests/index.js b/packages/integration-tests/index.js index 22581a1ff81..0b10fadcc47 100644 --- a/packages/integration-tests/index.js +++ b/packages/integration-tests/index.js @@ -263,8 +263,8 @@ async function addProviders(stackName, bucketName, dataDirectory, s3Host = null) * @returns {Promise.} number of rules added */ async function addRules(config, dataDirectory) { - const { stackName, bucketName } = config; - const rules = await setupSeedData(stackName, bucketName, dataDirectory); + const { stackName, bucket } = config; + const rules = await setupSeedData(stackName, bucket, dataDirectory); const promises = rules.map((rule) => limit(() => { const ruleTemplate = Handlebars.compile(JSON.stringify(rule)); @@ -276,6 +276,43 @@ async function addRules(config, dataDirectory) { return Promise.all(promises).then((rs) => rs.length); } +/** + * deletes a rule by name + * + * @param {string} name - name of the rule to delete. + * @returns {Promise.} - superclass delete promise + */ +async function _deleteOneRule(name) { + const r = new Rule(); + return r.get({ name: name }).then((item) => r.delete(item)); +} + + +/** + * returns a list of rule objects + * + * @param {string} stackName - Cloud formation stack name + * @param {string} bucketName - S3 internal bucket name + * @param {string} rulesDirectory - The directory continaing rules json files + * @returns {list} - list of rules found in rulesDirectory + */ +async function rulesList(stackName, bucketName, rulesDirectory) { + return setupSeedData(stackName, bucketName, rulesDirectory); +} + +/** + * + * @param {string} stackName - Cloud formation stack name + * @param {string} bucketName - S3 internal bucket name + * @param {Array} rules - List of rules objects to delete + * @returns {Promise.} - Number of rules deleted + */ +async function deleteRules(stackName, bucketName, rules) { + setProcessEnvironment(stackName, bucketName); + const promises = rules.map((rule) => limit(() => _deleteOneRule(rule.name))); + return Promise.all(promises).then((rs) => rs.length); +} + /** * build workflow message * @@ -354,6 +391,8 @@ module.exports = { getOnlineResources: cmr.getOnlineResources, generateCmrFilesForGranules: cmr.generateCmrFilesForGranules, addRules, + deleteRules, + rulesList, timeout, getWorkflowArn }; diff --git a/packages/integration-tests/sfnStep.js b/packages/integration-tests/sfnStep.js index 84ec4605442..65644f73654 100644 --- a/packages/integration-tests/sfnStep.js +++ b/packages/integration-tests/sfnStep.js @@ -109,14 +109,22 @@ class SfnStep { // Use the first passed execution, or last execution if none passed let stepExecution = stepExecutions[stepExecutions.length - 1]; - const passedExecutions = stepExecutions.filter((e) => - e.completeEvent !== null && e.completeEvent.type === this.successEvent); - if (passedExecutions) { + const passedExecutions = stepExecutions.filter((e) => { + if ((e.completeEvent !== null) + && ((e.completeEvent === undefined) || !('type' in e.completeEvent))) { + console.log(`incomplete Execution discovered found e : ${JSON.stringify(e)}`); + } + return (typeof e.completeEvent !== 'undefined' + && e.completeEvent !== null + && e.completeEvent.type === this.successEvent); + }); + if (passedExecutions && passedExecutions.length > 0) { stepExecution = passedExecutions[0]; } - if (stepExecution.completeEvent === null || - stepExecution.completeEvent.type !== this.successEvent) { + if (typeof stepExecution.completeEvent === 'undefined' + || stepExecution.completeEvent === null + || stepExecution.completeEvent.type !== this.successEvent) { console.log(`Step ${stepName} was not successful.`); return null; } diff --git a/packages/test-data/granules/L2_HR_PIXC_product_0001-of-4154.h5 b/packages/test-data/granules/L2_HR_PIXC_product_0001-of-4154.h5 new file mode 100644 index 00000000000..c15812a441d --- /dev/null +++ b/packages/test-data/granules/L2_HR_PIXC_product_0001-of-4154.h5 @@ -0,0 +1 @@ +This is a fake hdf5 file used to test PODAAC_SWOT. \ No newline at end of file