Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ make ci
This will basically run docker-compose command:

```sh
(docker-compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut)
(docker compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut)
```

Alternative approach is to setup a development environment on your machine.
Expand All @@ -1021,6 +1021,7 @@ export AMQP_URL=amqp://guest:guest@localhost:5672/
export REDIS_URL=localhost:6379
export MEMCACHE_URL=localhost:11211
export MONGODB_URL=localhost:27017
export DYNAMODB_URL=localhost:8000
```

To run integration tests against an SQS instance, you will need to create a "test_queue" in SQS and export these environment variables:
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ services:
- memcached
- mongo
- gcppubsub
- dynamodb
links:
- rabbitmq
- redis
- memcached
- mongo
- gcppubsub
- dynamodb
build:
context: .
dockerfile: ./Dockerfile.test
Expand All @@ -35,6 +37,7 @@ services:
GCPPUBSUB_URL: 'gcppubsub://example-project/test_subscription_queue'
GCPPUBSUB_TOPIC: 'test_topic_queue'
PUBSUB_EMULATOR_HOST: 'gcppubsub:8085'
DYNAMODB_URL: 'dynamodb:8000'

rabbitmq:
container_name: machinery_sut_rabbitmq
Expand Down Expand Up @@ -70,3 +73,10 @@ services:
dockerfile: ./Dockerfile.gcppubsub
logging:
driver: none

dynamodb:
command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
image: "amazon/dynamodb-local:latest"
container_name: dynamodb-local
ports:
- "8000:8000"
4 changes: 2 additions & 2 deletions v1/backends/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (b *Backend) PurgeState(taskUUID string) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"TaskUUID": {
N: aws.String(taskUUID),
S: aws.String(taskUUID),
},
},
TableName: aws.String(b.cnf.DynamoDB.TaskStatesTable),
Expand All @@ -308,7 +308,7 @@ func (b *Backend) PurgeGroupMeta(groupUUID string) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"GroupUUID": {
N: aws.String(groupUUID),
S: aws.String(groupUUID),
},
},
TableName: aws.String(b.cnf.DynamoDB.GroupMetasTable),
Expand Down
198 changes: 63 additions & 135 deletions v1/backends/dynamodb/dynamodb_export_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,27 @@
package dynamodb

import (
"errors"
"os"

"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"

"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
)

var (
TestDynamoDBBackend *Backend
TestErrDynamoDBBackend *Backend
TestCnf *config.Config
TestDBClient dynamodbiface.DynamoDBAPI
TestErrDBClient dynamodbiface.DynamoDBAPI
TestGroupMeta *tasks.GroupMeta
TestTask1 map[string]*dynamodb.AttributeValue
TestTask2 map[string]*dynamodb.AttributeValue
TestTask3 map[string]*dynamodb.AttributeValue
TestDynamoDBBackend *Backend
TestCnf *config.Config
)

type TestDynamoDBClient struct {
dynamodbiface.DynamoDBAPI
PutItemOverride func(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
UpdateItemOverride func(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error)
GetItemOverride func(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error)
BatchGetItemOverride func(*dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error)
}

Expand All @@ -41,155 +35,89 @@ func (t *TestDynamoDBClient) PutItem(input *dynamodb.PutItemInput) (*dynamodb.Pu
if t.PutItemOverride != nil {
return t.PutItemOverride(input)
}
return &dynamodb.PutItemOutput{}, nil
return t.DynamoDBAPI.PutItem(input)
}
func (t *TestDynamoDBClient) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
if t.BatchGetItemOverride != nil {
return t.BatchGetItemOverride(input)
}
return &dynamodb.BatchGetItemOutput{}, nil
}

func (t *TestDynamoDBClient) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
if t.GetItemOverride != nil {
return t.GetItemOverride(input)
}
var output *dynamodb.GetItemOutput
switch *input.TableName {
case "group_metas":
output = &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{
"TaskUUIDs": {
L: []*dynamodb.AttributeValue{
{
S: aws.String("testTaskUUID1"),
},
{
S: aws.String("testTaskUUID2"),
},
{
S: aws.String("testTaskUUID3"),
},
},
},
"ChordTriggered": {
BOOL: aws.Bool(false),
},
"GroupUUID": {
S: aws.String("testGroupUUID"),
},
"Lock": {
BOOL: aws.Bool(false),
},
},
}
case "task_states":
if input.Key["TaskUUID"] == nil {
output = &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{
"Error": {
NULL: aws.Bool(false),
},
"State": {
S: aws.String(tasks.StatePending),
},
"TaskUUID": {
S: aws.String("testTaskUUID1"),
},
"Results:": {
NULL: aws.Bool(true),
},
},
}
} else {
if *(input.Key["TaskUUID"].S) == "testTaskUUID1" {
output = &dynamodb.GetItemOutput{
Item: TestTask1,
}
} else if *(input.Key["TaskUUID"].S) == "testTaskUUID2" {
output = &dynamodb.GetItemOutput{
Item: TestTask2,
}

} else if *(input.Key["TaskUUID"].S) == "testTaskUUID3" {
output = &dynamodb.GetItemOutput{
Item: TestTask3,
}
}
}

}
return output, nil
return t.DynamoDBAPI.BatchGetItem(input)
}

func (t *TestDynamoDBClient) DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) {
return &dynamodb.DeleteItemOutput{}, nil
func (t *TestDynamoDBClient) DeleteItem(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) {
return t.DynamoDBAPI.DeleteItem(input)
}

func (t *TestDynamoDBClient) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
if t.UpdateItemOverride != nil {
return t.UpdateItemOverride(input)
}
return &dynamodb.UpdateItemOutput{}, nil
}

func (t *TestDynamoDBClient) ListTables(*dynamodb.ListTablesInput) (*dynamodb.ListTablesOutput, error) {
return &dynamodb.ListTablesOutput{
TableNames: []*string{
aws.String("group_metas"),
aws.String("task_states"),
},
}, nil
}

// Always returns error
type TestErrDynamoDBClient struct {
dynamodbiface.DynamoDBAPI
}

func (t *TestErrDynamoDBClient) PutItem(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
return nil, errors.New("error when putting an item")
}

func (t *TestErrDynamoDBClient) GetItem(*dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
return nil, errors.New("error when getting an item")
}

func (t *TestErrDynamoDBClient) DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) {
return nil, errors.New("error when deleting an item")
}

func (t *TestErrDynamoDBClient) Scan(*dynamodb.ScanInput) (*dynamodb.ScanOutput, error) {
return nil, errors.New("error when scanning an item")
return t.DynamoDBAPI.UpdateItem(input)
}

func (t *TestErrDynamoDBClient) UpdateItem(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
return nil, errors.New("error when updating an item")
}
func init() {
os.Setenv("AWS_ACCESS_KEY", "xxx")
os.Setenv("AWS_SECRET_KEY", "xxx")
cred := credentials.NewEnvCredentials()

func (t *TestErrDynamoDBClient) ListTables(*dynamodb.ListTablesInput) (*dynamodb.ListTablesOutput, error) {
return nil, errors.New("error when listing tables")
}
awsSession, _ := session.NewSession()
dynamodbClient := dynamodb.New(awsSession, &aws.Config{
Region: aws.String("us-west-2"),
Credentials: cred,
Endpoint: aws.String(os.Getenv("DYNAMODB_URL")),
})

func init() {
TestCnf = &config.Config{
ResultBackend: os.Getenv("DYNAMODB_URL"),
ResultsExpireIn: 30,
DynamoDB: &config.DynamoDBConfig{
TaskStatesTable: "task_states",
GroupMetasTable: "group_metas",
Client: dynamodbClient,
},
}
TestDBClient = new(TestDynamoDBClient)
TestDynamoDBBackend = &Backend{cnf: TestCnf, client: TestDBClient}

TestErrDBClient = new(TestErrDynamoDBClient)
TestErrDynamoDBBackend = &Backend{cnf: TestCnf, client: TestErrDBClient}
TestDynamoDBBackend = &Backend{cnf: TestCnf, client: &TestDynamoDBClient{DynamoDBAPI: dynamodbClient}}

TestGroupMeta = &tasks.GroupMeta{
GroupUUID: "testGroupUUID",
TaskUUIDs: []string{"testTaskUUID1", "testTaskUUID2", "testTaskUUID3"},
}
dynamodbClient.CreateTable(&dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("TaskUUID"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("TaskUUID"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(10),
},
TableName: aws.String("task_states"),
})

dynamodbClient.CreateTable(&dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("GroupUUID"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("GroupUUID"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(10),
},
TableName: aws.String("group_metas"),
})
}

func (b *Backend) GetConfig() *config.Config {
Expand Down
Loading