diff --git a/README.md b/README.md index fc84fe185..879b6359b 100644 --- a/README.md +++ b/README.md @@ -1009,7 +1009,7 @@ make ci This will basically run docker-compose command: ```sh -(docker-compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut) +(docker compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut) ``` Alternative approach is to setup a development environment on your machine. @@ -1021,6 +1021,7 @@ export AMQP_URL=amqp://guest:guest@localhost:5672/ export REDIS_URL=localhost:6379 export MEMCACHE_URL=localhost:11211 export MONGODB_URL=localhost:27017 +export DYNAMODB_URL=localhost:8000 ``` To run integration tests against an SQS instance, you will need to create a "test_queue" in SQS and export these environment variables: diff --git a/docker-compose.test.yml b/docker-compose.test.yml index e9eb1965b..5e0aaa54c 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -12,12 +12,14 @@ services: - memcached - mongo - gcppubsub + - dynamodb links: - rabbitmq - redis - memcached - mongo - gcppubsub + - dynamodb build: context: . dockerfile: ./Dockerfile.test @@ -35,6 +37,7 @@ services: GCPPUBSUB_URL: 'gcppubsub://example-project/test_subscription_queue' GCPPUBSUB_TOPIC: 'test_topic_queue' PUBSUB_EMULATOR_HOST: 'gcppubsub:8085' + DYNAMODB_URL: 'dynamodb:8000' rabbitmq: container_name: machinery_sut_rabbitmq @@ -70,3 +73,10 @@ services: dockerfile: ./Dockerfile.gcppubsub logging: driver: none + + dynamodb: + command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data" + image: "amazon/dynamodb-local:latest" + container_name: dynamodb-local + ports: + - "8000:8000" diff --git a/v1/backends/dynamodb/dynamodb.go b/v1/backends/dynamodb/dynamodb.go index a2a4a9bfb..b0e73ae18 100644 --- a/v1/backends/dynamodb/dynamodb.go +++ b/v1/backends/dynamodb/dynamodb.go @@ -290,7 +290,7 @@ func (b *Backend) PurgeState(taskUUID string) error { input := &dynamodb.DeleteItemInput{ Key: map[string]*dynamodb.AttributeValue{ "TaskUUID": { - N: aws.String(taskUUID), + S: aws.String(taskUUID), }, }, TableName: aws.String(b.cnf.DynamoDB.TaskStatesTable), @@ -308,7 +308,7 @@ func (b *Backend) PurgeGroupMeta(groupUUID string) error { input := &dynamodb.DeleteItemInput{ Key: map[string]*dynamodb.AttributeValue{ "GroupUUID": { - N: aws.String(groupUUID), + S: aws.String(groupUUID), }, }, TableName: aws.String(b.cnf.DynamoDB.GroupMetasTable), diff --git a/v1/backends/dynamodb/dynamodb_export_test.go b/v1/backends/dynamodb/dynamodb_export_test.go index 38e2935b1..d9b4cf520 100644 --- a/v1/backends/dynamodb/dynamodb_export_test.go +++ b/v1/backends/dynamodb/dynamodb_export_test.go @@ -1,33 +1,27 @@ package dynamodb import ( - "errors" "os" - "github.com/RichardKnop/machinery/v1/config" - "github.com/RichardKnop/machinery/v1/tasks" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + + "github.com/RichardKnop/machinery/v1/config" + "github.com/RichardKnop/machinery/v1/tasks" ) var ( - TestDynamoDBBackend *Backend - TestErrDynamoDBBackend *Backend - TestCnf *config.Config - TestDBClient dynamodbiface.DynamoDBAPI - TestErrDBClient dynamodbiface.DynamoDBAPI - TestGroupMeta *tasks.GroupMeta - TestTask1 map[string]*dynamodb.AttributeValue - TestTask2 map[string]*dynamodb.AttributeValue - TestTask3 map[string]*dynamodb.AttributeValue + TestDynamoDBBackend *Backend + TestCnf *config.Config ) type TestDynamoDBClient struct { dynamodbiface.DynamoDBAPI PutItemOverride func(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) UpdateItemOverride func(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) - GetItemOverride func(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) BatchGetItemOverride func(*dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) } @@ -41,155 +35,89 @@ func (t *TestDynamoDBClient) PutItem(input *dynamodb.PutItemInput) (*dynamodb.Pu if t.PutItemOverride != nil { return t.PutItemOverride(input) } - return &dynamodb.PutItemOutput{}, nil + return t.DynamoDBAPI.PutItem(input) } func (t *TestDynamoDBClient) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) { if t.BatchGetItemOverride != nil { return t.BatchGetItemOverride(input) } - return &dynamodb.BatchGetItemOutput{}, nil -} - -func (t *TestDynamoDBClient) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { - if t.GetItemOverride != nil { - return t.GetItemOverride(input) - } - var output *dynamodb.GetItemOutput - switch *input.TableName { - case "group_metas": - output = &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{ - "TaskUUIDs": { - L: []*dynamodb.AttributeValue{ - { - S: aws.String("testTaskUUID1"), - }, - { - S: aws.String("testTaskUUID2"), - }, - { - S: aws.String("testTaskUUID3"), - }, - }, - }, - "ChordTriggered": { - BOOL: aws.Bool(false), - }, - "GroupUUID": { - S: aws.String("testGroupUUID"), - }, - "Lock": { - BOOL: aws.Bool(false), - }, - }, - } - case "task_states": - if input.Key["TaskUUID"] == nil { - output = &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{ - "Error": { - NULL: aws.Bool(false), - }, - "State": { - S: aws.String(tasks.StatePending), - }, - "TaskUUID": { - S: aws.String("testTaskUUID1"), - }, - "Results:": { - NULL: aws.Bool(true), - }, - }, - } - } else { - if *(input.Key["TaskUUID"].S) == "testTaskUUID1" { - output = &dynamodb.GetItemOutput{ - Item: TestTask1, - } - } else if *(input.Key["TaskUUID"].S) == "testTaskUUID2" { - output = &dynamodb.GetItemOutput{ - Item: TestTask2, - } - - } else if *(input.Key["TaskUUID"].S) == "testTaskUUID3" { - output = &dynamodb.GetItemOutput{ - Item: TestTask3, - } - } - } - - } - return output, nil + return t.DynamoDBAPI.BatchGetItem(input) } -func (t *TestDynamoDBClient) DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) { - return &dynamodb.DeleteItemOutput{}, nil +func (t *TestDynamoDBClient) DeleteItem(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) { + return t.DynamoDBAPI.DeleteItem(input) } func (t *TestDynamoDBClient) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { if t.UpdateItemOverride != nil { return t.UpdateItemOverride(input) } - return &dynamodb.UpdateItemOutput{}, nil -} - -func (t *TestDynamoDBClient) ListTables(*dynamodb.ListTablesInput) (*dynamodb.ListTablesOutput, error) { - return &dynamodb.ListTablesOutput{ - TableNames: []*string{ - aws.String("group_metas"), - aws.String("task_states"), - }, - }, nil -} - -// Always returns error -type TestErrDynamoDBClient struct { - dynamodbiface.DynamoDBAPI -} - -func (t *TestErrDynamoDBClient) PutItem(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { - return nil, errors.New("error when putting an item") -} - -func (t *TestErrDynamoDBClient) GetItem(*dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { - return nil, errors.New("error when getting an item") -} - -func (t *TestErrDynamoDBClient) DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) { - return nil, errors.New("error when deleting an item") -} - -func (t *TestErrDynamoDBClient) Scan(*dynamodb.ScanInput) (*dynamodb.ScanOutput, error) { - return nil, errors.New("error when scanning an item") + return t.DynamoDBAPI.UpdateItem(input) } -func (t *TestErrDynamoDBClient) UpdateItem(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { - return nil, errors.New("error when updating an item") -} +func init() { + os.Setenv("AWS_ACCESS_KEY", "xxx") + os.Setenv("AWS_SECRET_KEY", "xxx") + cred := credentials.NewEnvCredentials() -func (t *TestErrDynamoDBClient) ListTables(*dynamodb.ListTablesInput) (*dynamodb.ListTablesOutput, error) { - return nil, errors.New("error when listing tables") -} + awsSession, _ := session.NewSession() + dynamodbClient := dynamodb.New(awsSession, &aws.Config{ + Region: aws.String("us-west-2"), + Credentials: cred, + Endpoint: aws.String(os.Getenv("DYNAMODB_URL")), + }) -func init() { TestCnf = &config.Config{ ResultBackend: os.Getenv("DYNAMODB_URL"), ResultsExpireIn: 30, DynamoDB: &config.DynamoDBConfig{ TaskStatesTable: "task_states", GroupMetasTable: "group_metas", + Client: dynamodbClient, }, } - TestDBClient = new(TestDynamoDBClient) - TestDynamoDBBackend = &Backend{cnf: TestCnf, client: TestDBClient} - TestErrDBClient = new(TestErrDynamoDBClient) - TestErrDynamoDBBackend = &Backend{cnf: TestCnf, client: TestErrDBClient} + TestDynamoDBBackend = &Backend{cnf: TestCnf, client: &TestDynamoDBClient{DynamoDBAPI: dynamodbClient}} - TestGroupMeta = &tasks.GroupMeta{ - GroupUUID: "testGroupUUID", - TaskUUIDs: []string{"testTaskUUID1", "testTaskUUID2", "testTaskUUID3"}, - } + dynamodbClient.CreateTable(&dynamodb.CreateTableInput{ + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String("TaskUUID"), + AttributeType: aws.String("S"), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String("TaskUUID"), + KeyType: aws.String("HASH"), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + TableName: aws.String("task_states"), + }) + + dynamodbClient.CreateTable(&dynamodb.CreateTableInput{ + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String("GroupUUID"), + AttributeType: aws.String("S"), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String("GroupUUID"), + KeyType: aws.String("HASH"), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + TableName: aws.String("group_metas"), + }) } func (b *Backend) GetConfig() *config.Config { diff --git a/v1/backends/dynamodb/dynamodb_test.go b/v1/backends/dynamodb/dynamodb_test.go index 41c3ea153..54fc47d8d 100644 --- a/v1/backends/dynamodb/dynamodb_test.go +++ b/v1/backends/dynamodb/dynamodb_test.go @@ -1,7 +1,6 @@ package dynamodb_test import ( - "fmt" "strconv" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/tasks" "github.com/aws/aws-sdk-go/aws" + "github.com/google/uuid" "github.com/stretchr/testify/assert" awsdynamodb "github.com/aws/aws-sdk-go/service/dynamodb" @@ -22,15 +22,12 @@ func TestNew(t *testing.T) { } func TestInitGroup(t *testing.T) { - groupUUID := "testGroupUUID" - taskUUIDs := []string{"testTaskUUID1", "testTaskUUID2", "testTaskUUID3"} + groupUUID := uuid.New().String() + taskUUIDs := []string{uuid.New().String(), uuid.New().String(), uuid.New().String()} log.INFO.Println(dynamodb.TestDynamoDBBackend.GetConfig()) err := dynamodb.TestDynamoDBBackend.InitGroup(groupUUID, taskUUIDs) - assert.Nil(t, err) - - err = dynamodb.TestErrDynamoDBBackend.InitGroup(groupUUID, taskUUIDs) - assert.NotNil(t, err) + assert.NoError(t, err) // assert proper TTL value is set in InitGroup() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 3 * 3600 // results should expire after 3 hours @@ -48,80 +45,75 @@ func TestInitGroup(t *testing.T) { return &awsdynamodb.PutItemOutput{}, nil } err = dynamodb.TestDynamoDBBackend.InitGroup(groupUUID, taskUUIDs) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isPutItemCalled) client.ResetOverrides() } func assertTTLValue(t *testing.T, expectedTTLTime time.Time, actualEncodedTTLValue string) { actualTTLTimestamp, err := strconv.ParseInt(actualEncodedTTLValue, 10, 64) - assert.Nil(t, err) + assert.NoError(t, err) actualTTLTime := time.Unix(actualTTLTimestamp, 0) assert.WithinDuration(t, expectedTTLTime, actualTTLTime, time.Second) } func TestGroupCompleted(t *testing.T) { - client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) - tableName := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable - // Override DynamoDB BatchGetItem() behavior - var isBatchGetItemCalled bool - client.BatchGetItemOverride = func(input *awsdynamodb.BatchGetItemInput) (*awsdynamodb.BatchGetItemOutput, error) { - isBatchGetItemCalled = true - assert.NotNil(t, input) - assert.Nil(t, input.Validate()) + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess}, t) - return &awsdynamodb.BatchGetItemOutput{ - Responses: map[string][]map[string]*awsdynamodb.AttributeValue{ - tableName: { - {"State": {S: aws.String(tasks.StateSuccess)}}, - {"State": {S: aws.String(tasks.StateSuccess)}}, - {"State": {S: aws.String(tasks.StateFailure)}}, - }, - }, - }, nil - } - groupUUID := "testGroupUUID" - isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted(groupUUID, 3) - assert.Nil(t, err) + isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted(groupUUID, 2) + assert.NoError(t, err) assert.True(t, isCompleted) - assert.True(t, isBatchGetItemCalled) - client.ResetOverrides() -} -func TestGroupCompletedReturnsError(t *testing.T) { - client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) - client.BatchGetItemOverride = func(input *awsdynamodb.BatchGetItemInput) (*awsdynamodb.BatchGetItemOutput, error) { - return nil, fmt.Errorf("Simulating error from AWS") - } - isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted("test", 3) - assert.NotNil(t, err) - assert.False(t, isCompleted) - client.ResetOverrides() } // TestGroupCompletedReturnsFalse tests that the GroupCompleted() returns false when some tasks have not yet finished. func TestGroupCompletedReturnsFalse(t *testing.T) { - client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) - tableName := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable - // Override DynamoDB BatchGetItem() behavior - client.BatchGetItemOverride = func(_ *awsdynamodb.BatchGetItemInput) (*awsdynamodb.BatchGetItemOutput, error) { - return &awsdynamodb.BatchGetItemOutput{ - Responses: map[string][]map[string]*awsdynamodb.AttributeValue{ - tableName: { - {"State": {S: aws.String(tasks.StateSuccess)}}, - {"State": {S: aws.String(tasks.StateFailure)}}, - {"State": {S: aws.String(tasks.StatePending)}}, - }, - }, - }, nil - } - isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted("testGroup", 3) - assert.Nil(t, err) + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess, tasks.StatePending}, t) + + isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted(groupUUID, 3) + assert.NoError(t, err) assert.False(t, isCompleted) - client.ResetOverrides() +} + +func createGroupWithSucceedTask(taskStates []string, t *testing.T) (string, []string) { + groupUUID := uuid.New().String() + taskUUIDs := make([]string, 0, len(taskStates)) + for i := 0; i < len(taskStates); i++ { + taskUUIDs = append(taskUUIDs, uuid.New().String()) + + switch taskStates[i] { + case tasks.StateSuccess: + err := dynamodb.TestDynamoDBBackend.SetStateSuccess(&tasks.Signature{ + UUID: taskUUIDs[i], + }, []*tasks.TaskResult{}) + assert.NoError(t, err) + case tasks.StateFailure: + err := dynamodb.TestDynamoDBBackend.SetStateFailure(&tasks.Signature{ + UUID: taskUUIDs[i]}, "unexpected failure") + assert.NoError(t, err) + case tasks.StatePending: + err := dynamodb.TestDynamoDBBackend.SetStatePending(&tasks.Signature{ + UUID: taskUUIDs[i], + }) + assert.NoError(t, err) + case tasks.StateStarted: + err := dynamodb.TestDynamoDBBackend.SetStateStarted(&tasks.Signature{ + UUID: taskUUIDs[i], + }) + assert.NoError(t, err) + } + + } + log.INFO.Println(dynamodb.TestDynamoDBBackend.GetConfig()) + err := dynamodb.TestDynamoDBBackend.InitGroup(groupUUID, taskUUIDs) + assert.NoError(t, err) + + return groupUUID, taskUUIDs } // TestGroupCompletedReturnsFalse tests that the GroupCompleted() retries the the request until MaxFetchAttempts before returning an error func TestGroupCompletedRetries(t *testing.T) { + groupUUID, taskUUIDs := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess}, t) + client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) tableName := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable // Override DynamoDB BatchGetItem() behavior @@ -138,21 +130,23 @@ func TestGroupCompletedRetries(t *testing.T) { UnprocessedKeys: map[string]*awsdynamodb.KeysAndAttributes{ tableName: { Keys: []map[string]*awsdynamodb.AttributeValue{ - {"TaskUUID": {S: aws.String("unfetchedTaskUUID1")}}, - {"TaskUUID": {S: aws.String("unfetchedTaskUUID2")}}, + {"TaskUUID": {S: aws.String(taskUUIDs[0])}}, + {"TaskUUID": {S: aws.String(taskUUIDs[1])}}, }, }, }, }, nil } - _, err := dynamodb.TestDynamoDBBackend.GroupCompleted("testGroup", 3) - assert.NotNil(t, err) + + _, err := dynamodb.TestDynamoDBBackend.GroupCompleted(groupUUID, 3) + assert.Error(t, err) assert.Equal(t, dynamodb.MaxFetchAttempts, countBatchGetItemAPICalls) client.ResetOverrides() } // TestGroupCompletedReturnsFalse tests that the GroupCompleted() retries the the request and returns success if all keys are fetched on retries. func TestGroupCompletedRetrieSuccess(t *testing.T) { + groupID, taskUUIDs := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess, tasks.StateSuccess}, t) client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) tableName := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable // Override DynamoDB BatchGetItem() behavior @@ -169,9 +163,9 @@ func TestGroupCompletedRetrieSuccess(t *testing.T) { UnprocessedKeys: map[string]*awsdynamodb.KeysAndAttributes{ tableName: { Keys: []map[string]*awsdynamodb.AttributeValue{ - {"TaskUUID": {S: aws.String("unfetchedTaskUUID1")}}, - {"TaskUUID": {S: aws.String("unfetchedTaskUUID2")}}, - {"TaskUUID": {S: aws.String("unfetchedTaskUUID3")}}, + {"TaskUUID": {S: aws.String(taskUUIDs[0])}}, + {"TaskUUID": {S: aws.String(taskUUIDs[1])}}, + {"TaskUUID": {S: aws.String(taskUUIDs[2])}}, }, }, }, @@ -190,30 +184,27 @@ func TestGroupCompletedRetrieSuccess(t *testing.T) { }, }, nil } - isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted("testGroup", 3) - assert.Nil(t, err) + isCompleted, err := dynamodb.TestDynamoDBBackend.GroupCompleted(groupID, 3) + assert.NoError(t, err) assert.True(t, isCompleted) assert.Equal(t, 2, countBatchGetItemAPICalls) client.ResetOverrides() } func TestPrivateFuncGetGroupMeta(t *testing.T) { - groupUUID := "testGroupUUID" + groupUUID, taskUUIDs := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess, tasks.StateSuccess}, t) meta, err := dynamodb.TestDynamoDBBackend.GetGroupMetaForTest(groupUUID) + assert.NoError(t, err) + item := tasks.GroupMeta{ - GroupUUID: "testGroupUUID", + GroupUUID: groupUUID, Lock: false, ChordTriggered: false, - TaskUUIDs: []string{ - "testTaskUUID1", - "testTaskUUID2", - "testTaskUUID3", - }, + TaskUUIDs: taskUUIDs, + CreatedAt: meta.CreatedAt, + TTL: meta.TTL, } - assert.Nil(t, err) assert.EqualValues(t, item, *meta) - _, err = dynamodb.TestErrDynamoDBBackend.GetGroupMetaForTest(groupUUID) - assert.NotNil(t, err) } func TestPrivateFuncUnmarshalTaskStateGetItemResult(t *testing.T) { @@ -257,97 +248,22 @@ func TestPrivateFuncUnmarshalTaskStateGetItemResult(t *testing.T) { State: tasks.StatePending, Error: "", } - state, err := dynamodb.TestErrDynamoDBBackend.UnmarshalTaskStateGetItemResultForTest(&result) - assert.Nil(t, err) + state, err := dynamodb.TestDynamoDBBackend.UnmarshalTaskStateGetItemResultForTest(&result) + assert.NoError(t, err) assert.EqualValues(t, item, *state) _, err = dynamodb.TestDynamoDBBackend.UnmarshalTaskStateGetItemResultForTest(nil) - assert.NotNil(t, err) + assert.Error(t, err) _, err = dynamodb.TestDynamoDBBackend.UnmarshalTaskStateGetItemResultForTest(&invalidResult) - assert.NotNil(t, err) - -} - -func TestPrivateFuncUnmarshalGroupMetaGetItemResult(t *testing.T) { - result := awsdynamodb.GetItemOutput{ - Item: map[string]*awsdynamodb.AttributeValue{ - "TaskUUIDs": { - L: []*awsdynamodb.AttributeValue{ - { - S: aws.String("testTaskUUID1"), - }, - { - S: aws.String("testTaskUUID2"), - }, - { - S: aws.String("testTaskUUID3"), - }, - }, - }, - "ChordTriggered": { - BOOL: aws.Bool(false), - }, - "GroupUUID": { - S: aws.String("testGroupUUID"), - }, - "Lock": { - BOOL: aws.Bool(false), - }, - }, - } - - invalidResult := awsdynamodb.GetItemOutput{ - Item: map[string]*awsdynamodb.AttributeValue{ - "TaskUUIDs": { - L: []*awsdynamodb.AttributeValue{ - { - S: aws.String("testTaskUUID1"), - }, - { - S: aws.String("testTaskUUID2"), - }, - { - S: aws.String("testTaskUUID3"), - }, - }, - }, - "ChordTriggered": { - S: aws.String("false"), // this attribute is invalid - }, - "GroupUUID": { - S: aws.String("testGroupUUID"), - }, - "Lock": { - BOOL: aws.Bool(false), - }, - }, - } - - item := tasks.GroupMeta{ - GroupUUID: "testGroupUUID", - Lock: false, - ChordTriggered: false, - TaskUUIDs: []string{ - "testTaskUUID1", - "testTaskUUID2", - "testTaskUUID3", - }, - } - meta, err := dynamodb.TestErrDynamoDBBackend.UnmarshalGroupMetaGetItemResultForTest(&result) - assert.Nil(t, err) - assert.EqualValues(t, item, *meta) - _, err = dynamodb.TestErrDynamoDBBackend.UnmarshalGroupMetaGetItemResultForTest(nil) - assert.NotNil(t, err) - - _, err = dynamodb.TestErrDynamoDBBackend.UnmarshalGroupMetaGetItemResultForTest(&invalidResult) - assert.NotNil(t, err) + assert.Error(t, err) } func TestPrivateFuncSetTaskState(t *testing.T) { signature := &tasks.Signature{ Name: "Test", + UUID: uuid.New().String(), Args: []tasks.Arg{ { Type: "int64", @@ -356,10 +272,8 @@ func TestPrivateFuncSetTaskState(t *testing.T) { }, } state := tasks.NewPendingTaskState(signature) - err := dynamodb.TestErrDynamoDBBackend.SetTaskStateForTest(state) - assert.NotNil(t, err) - err = dynamodb.TestDynamoDBBackend.SetTaskStateForTest(state) - assert.Nil(t, err) + err := dynamodb.TestDynamoDBBackend.SetTaskStateForTest(state) + assert.NoError(t, err) } // verifyUpdateInput is a helper function to verify valid dynamoDB update input. @@ -380,7 +294,7 @@ func verifyUpdateInput(t *testing.T, input *awsdynamodb.UpdateItemInput, expecte } func TestSetStateSuccess(t *testing.T) { - signature := &tasks.Signature{UUID: "testTaskUUID"} + signature := &tasks.Signature{UUID: uuid.New().String()} // assert correct task ID, state and TTL value is set in SetStateSuccess() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 3 * 3600 // results should expire after 3 hours @@ -394,13 +308,13 @@ func TestSetStateSuccess(t *testing.T) { } err := dynamodb.TestDynamoDBBackend.SetStateSuccess(signature, nil) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isUpdateItemCalled) client.ResetOverrides() } func TestSetStateFailure(t *testing.T) { - signature := &tasks.Signature{UUID: "testTaskUUID"} + signature := &tasks.Signature{UUID: uuid.New().String()} // assert correct task ID, state and TTL value is set in SetStateFailure() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 2 * 3600 // results should expire after 2 hours @@ -414,13 +328,13 @@ func TestSetStateFailure(t *testing.T) { } err := dynamodb.TestDynamoDBBackend.SetStateFailure(signature, "Some error occurred") - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isUpdateItemCalled) client.ResetOverrides() } func TestSetStateReceived(t *testing.T) { - signature := &tasks.Signature{UUID: "testTaskUUID"} + signature := &tasks.Signature{UUID: uuid.New().String()} // assert correct task ID, state and *no* TTL value is set in SetStateReceived() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 2 * 3600 // results should expire after 2 hours (ignored for this state) @@ -433,13 +347,13 @@ func TestSetStateReceived(t *testing.T) { } err := dynamodb.TestDynamoDBBackend.SetStateReceived(signature) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isUpdateItemCalled) client.ResetOverrides() } func TestSetStateStarted(t *testing.T) { - signature := &tasks.Signature{UUID: "testTaskUUID"} + signature := &tasks.Signature{UUID: uuid.New().String()} // assert correct task ID, state and *no* TTL value is set in SetStateStarted() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 2 * 3600 // results should expire after 2 hours (ignored for this state) @@ -452,13 +366,13 @@ func TestSetStateStarted(t *testing.T) { } err := dynamodb.TestDynamoDBBackend.SetStateStarted(signature) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isUpdateItemCalled) client.ResetOverrides() } func TestSetStateRetry(t *testing.T) { - signature := &tasks.Signature{UUID: "testTaskUUID"} + signature := &tasks.Signature{UUID: uuid.New().String()} // assert correct task ID, state and *no* TTL value is set in SetStateStarted() dynamodb.TestDynamoDBBackend.GetConfig().ResultsExpireIn = 2 * 3600 // results should expire after 2 hours (ignored for this state) @@ -471,156 +385,109 @@ func TestSetStateRetry(t *testing.T) { } err := dynamodb.TestDynamoDBBackend.SetStateRetry(signature) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, isUpdateItemCalled) client.ResetOverrides() } func TestGroupTaskStates(t *testing.T) { + groupUUID, taskUUIDs := createGroupWithSucceedTask([]string{tasks.StatePending, tasks.StateStarted, tasks.StateSuccess}, t) expectedStates := map[string]*tasks.TaskState{ - "testTaskUUID1": { - TaskUUID: "testTaskUUID1", + taskUUIDs[0]: { + TaskUUID: taskUUIDs[0], Results: nil, State: tasks.StatePending, Error: "", }, - "testTaskUUID2": { - TaskUUID: "testTaskUUID2", + taskUUIDs[1]: { + TaskUUID: taskUUIDs[1], Results: nil, State: tasks.StateStarted, Error: "", }, - "testTaskUUID3": { - TaskUUID: "testTaskUUID3", + taskUUIDs[2]: { + TaskUUID: taskUUIDs[2], Results: nil, State: tasks.StateSuccess, Error: "", }, } - client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) - tableName := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable - client.BatchGetItemOverride = func(input *awsdynamodb.BatchGetItemInput) (*awsdynamodb.BatchGetItemOutput, error) { - assert.Nil(t, input.Validate()) - return &awsdynamodb.BatchGetItemOutput{ - Responses: map[string][]map[string]*awsdynamodb.AttributeValue{ - tableName: { - { - "TaskUUID": {S: aws.String("testTaskUUID1")}, - "Results:": {NULL: aws.Bool(true)}, - "State": {S: aws.String(tasks.StatePending)}, - "Error": {NULL: aws.Bool(true)}, - }, - { - "TaskUUID": {S: aws.String("testTaskUUID2")}, - "Results:": {NULL: aws.Bool(true)}, - "State": {S: aws.String(tasks.StateStarted)}, - "Error": {NULL: aws.Bool(true)}, - }, - { - "TaskUUID": {S: aws.String("testTaskUUID3")}, - "Results:": {NULL: aws.Bool(true)}, - "State": {S: aws.String(tasks.StateSuccess)}, - "Error": {NULL: aws.Bool(true)}, - }, - }, - }, - }, nil - } - defer client.ResetOverrides() - states, err := dynamodb.TestDynamoDBBackend.GroupTaskStates("testGroupUUID", 3) - assert.Nil(t, err) + states, err := dynamodb.TestDynamoDBBackend.GroupTaskStates(groupUUID, 3) + assert.NoError(t, err) for _, s := range states { + expectedStates[s.TaskUUID].TTL = s.TTL + expectedStates[s.TaskUUID].CreatedAt = s.CreatedAt assert.EqualValues(t, *s, *expectedStates[s.TaskUUID]) } } func TestTriggerChord(t *testing.T) { - groupUUID := "testGroupUUID" + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess, tasks.StateSuccess, tasks.StateSuccess}, t) triggered, err := dynamodb.TestDynamoDBBackend.TriggerChord(groupUUID) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, triggered) } func TestGetState(t *testing.T) { - taskUUID := "testTaskUUID1" - expectedState := &tasks.TaskState{ - TaskUUID: "testTaskUUID1", - Results: nil, - State: tasks.StatePending, - Error: "", - } - client := dynamodb.TestDynamoDBBackend.GetClient().(*dynamodb.TestDynamoDBClient) - client.GetItemOverride = func(input *awsdynamodb.GetItemInput) (*awsdynamodb.GetItemOutput, error) { - return &awsdynamodb.GetItemOutput{ - Item: map[string]*awsdynamodb.AttributeValue{ - "TaskUUID": {S: aws.String("testTaskUUID1")}, - "Results:": {NULL: aws.Bool(true)}, - "State": {S: aws.String(tasks.StatePending)}, - "Error": {NULL: aws.Bool(false)}, - }, - }, nil - } - defer client.ResetOverrides() + taskUUID := uuid.New().String() + err := dynamodb.TestDynamoDBBackend.SetStatePending(&tasks.Signature{ + UUID: taskUUID, + }) state, err := dynamodb.TestDynamoDBBackend.GetState(taskUUID) - assert.Nil(t, err) + assert.NoError(t, err) + + expectedState := &tasks.TaskState{ + TaskUUID: taskUUID, + Results: nil, + State: tasks.StatePending, + Error: "", + CreatedAt: state.CreatedAt, + } assert.EqualValues(t, expectedState, state) } func TestPurgeState(t *testing.T) { - taskUUID := "testTaskUUID1" - err := dynamodb.TestDynamoDBBackend.PurgeState(taskUUID) - assert.Nil(t, err) - - err = dynamodb.TestErrDynamoDBBackend.PurgeState(taskUUID) - assert.NotNil(t, err) + _, taskUUIDs := createGroupWithSucceedTask([]string{tasks.StateSuccess}, t) + err := dynamodb.TestDynamoDBBackend.PurgeState(taskUUIDs[0]) + assert.NoError(t, err) } func TestPurgeGroupMeta(t *testing.T) { - groupUUID := "GroupUUID" + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess}, t) err := dynamodb.TestDynamoDBBackend.PurgeGroupMeta(groupUUID) - assert.Nil(t, err) - - err = dynamodb.TestErrDynamoDBBackend.PurgeGroupMeta(groupUUID) - assert.NotNil(t, err) + assert.NoError(t, err) } func TestPrivateFuncLockGroupMeta(t *testing.T) { - groupUUID := "GroupUUID" + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess}, t) err := dynamodb.TestDynamoDBBackend.LockGroupMetaForTest(groupUUID) - assert.Nil(t, err) - err = dynamodb.TestErrDynamoDBBackend.LockGroupMetaForTest(groupUUID) - assert.NotNil(t, err) + assert.NoError(t, err) } func TestPrivateFuncUnLockGroupMeta(t *testing.T) { groupUUID := "GroupUUID" err := dynamodb.TestDynamoDBBackend.UnlockGroupMetaForTest(groupUUID) - assert.Nil(t, err) - err = dynamodb.TestErrDynamoDBBackend.UnlockGroupMetaForTest(groupUUID) - assert.NotNil(t, err) + assert.NoError(t, err) } func TestPrivateFuncChordTriggered(t *testing.T) { - groupUUID := "GroupUUID" + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess}, t) err := dynamodb.TestDynamoDBBackend.ChordTriggeredForTest(groupUUID) - assert.Nil(t, err) - err = dynamodb.TestErrDynamoDBBackend.ChordTriggeredForTest(groupUUID) - assert.NotNil(t, err) + assert.NoError(t, err) } func TestDynamoDBPrivateFuncUpdateGroupMetaLock(t *testing.T) { - groupUUID := "GroupUUID" + groupUUID, _ := createGroupWithSucceedTask([]string{tasks.StateSuccess}, t) err := dynamodb.TestDynamoDBBackend.UpdateGroupMetaLockForTest(groupUUID, true) - assert.Nil(t, err) - err = dynamodb.TestErrDynamoDBBackend.UpdateGroupMetaLockForTest(groupUUID, true) - assert.NotNil(t, err) + assert.NoError(t, err) } func TestPrivateFuncUpdateToFailureStateWithError(t *testing.T) { signature := &tasks.Signature{ Name: "Test", + UUID: uuid.New().String(), Args: []tasks.Arg{ { Type: "int64", @@ -631,7 +498,7 @@ func TestPrivateFuncUpdateToFailureStateWithError(t *testing.T) { state := tasks.NewFailureTaskState(signature, "This is an error") err := dynamodb.TestDynamoDBBackend.UpdateToFailureStateWithErrorForTest(state) - assert.Nil(t, err) + assert.NoError(t, err) } func TestPrivateFuncTableExistsForTest(t *testing.T) { @@ -642,17 +509,18 @@ func TestPrivateFuncTableExistsForTest(t *testing.T) { func TestPrivateFuncCheckRequiredTablesIfExistForTest(t *testing.T) { err := dynamodb.TestDynamoDBBackend.CheckRequiredTablesIfExistForTest() - assert.Nil(t, err) - taskTable := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable - groupTable := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.GroupMetasTable - err = dynamodb.TestErrDynamoDBBackend.CheckRequiredTablesIfExistForTest() - assert.NotNil(t, err) + assert.NoError(t, err) + dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable = "foo" err = dynamodb.TestDynamoDBBackend.CheckRequiredTablesIfExistForTest() - assert.NotNil(t, err) + assert.Error(t, err) + + taskTable := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable + groupTable := dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.GroupMetasTable dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.TaskStatesTable = taskTable dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.GroupMetasTable = "foo" err = dynamodb.TestDynamoDBBackend.CheckRequiredTablesIfExistForTest() - assert.NotNil(t, err) + assert.Error(t, err) + dynamodb.TestDynamoDBBackend.GetConfig().DynamoDB.GroupMetasTable = groupTable }