From 21cc690fd3d76afe25842593597967c0f8c52568 Mon Sep 17 00:00:00 2001 From: Lorenzo Franco Ranucci Date: Tue, 13 Dec 2022 10:46:25 +0100 Subject: [PATCH] add headers feature --- .devenv/tor-bare-metal.yaml | 2 + .devenv/tor-docker.yaml | 3 + README.md | 2 - adapters/kafka/event_dispatcher.go | 26 ++- adapters/kafka/producer.go | 13 +- .../api-server/cmd/create_outbox_table.sql | 1 + example/api-server/cmd/run.go | 6 +- example/api-server/go.mod | 1 + example/api-server/go.sum | 1 + example/tor/cmd/run.go | 2 + router/pkg/run/event_handler.go | 41 ++++- router/pkg/run/event_handler_test.go | 150 +++++++++++++++++- router/pkg/run/event_mapper.go | 73 +++++++-- router/pkg/run/run_test.go | 8 +- 14 files changed, 302 insertions(+), 27 deletions(-) diff --git a/.devenv/tor-bare-metal.yaml b/.devenv/tor-bare-metal.yaml index 2cf8e3c..bfb791e 100644 --- a/.devenv/tor-bare-metal.yaml +++ b/.devenv/tor-bare-metal.yaml @@ -4,6 +4,8 @@ dbUser: root dbPassword: root dbOutboxTableRef: my_schema.my_outbox_table aggregateTypeRegexp: "(?i)^order$" +dbHeadersColumnsNames: + - uuid kafkaBrokers: localhost:9093 kafkaTopic: outbox_topic diff --git a/.devenv/tor-docker.yaml b/.devenv/tor-docker.yaml index d8bcf68..d06263a 100644 --- a/.devenv/tor-docker.yaml +++ b/.devenv/tor-docker.yaml @@ -4,6 +4,9 @@ dbUser: root dbPassword: root dbOutboxTableRef: my_schema.my_outbox_table aggregateTypeRegexp: "(?i)^order$" +dbHeadersColumnsNames: + - uuid + kafkaBrokers: kafka:9092 kafkaTopic: outbox_topic diff --git a/README.md b/README.md index 96dc01a..c51c8e7 100644 --- a/README.md +++ b/README.md @@ -63,8 +63,6 @@ Tor is composed of several modules so it can be extensible and make dependencies Set up the system: ```shell -cd .devenv - make up ``` diff --git a/adapters/kafka/event_dispatcher.go b/adapters/kafka/event_dispatcher.go index 5a560ba..d8df883 100644 --- a/adapters/kafka/event_dispatcher.go +++ b/adapters/kafka/event_dispatcher.go @@ -1,5 +1,7 @@ package kafka +import "github.com/Shopify/sarama" + func NewEventDispatcher(producer *Producer) *EventDispatcher { return &EventDispatcher{producer: producer} } @@ -8,6 +10,26 @@ type EventDispatcher struct { producer *Producer } -func (k *EventDispatcher) Dispatch(routingKey string, event []byte) error { - return k.producer.Dispatch(routingKey, event) +func (k *EventDispatcher) Dispatch( + routingKey string, + event []byte, + headers []struct { + Key []byte + Value []byte + }, +) error { + return k.producer.Dispatch(routingKey, event, mapHeaders(headers)) +} + +func mapHeaders(h []struct { + Key []byte + Value []byte +}) []sarama.RecordHeader { + r := make([]sarama.RecordHeader, 0, len(h)) + + for _, v := range h { + r = append(r, v) + } + + return r } diff --git a/adapters/kafka/producer.go b/adapters/kafka/producer.go index c6b8a06..b8093ff 100644 --- a/adapters/kafka/producer.go +++ b/adapters/kafka/producer.go @@ -18,12 +18,17 @@ func NewProducer(brokers []string, topic string) (*Producer, error) { return &Producer{syncProducer: syncProducer, topic: topic}, nil } -func (p *Producer) Dispatch(key string, message []byte) error { +func (p *Producer) Dispatch( + key string, + message []byte, + headers []sarama.RecordHeader, +) error { _, _, err := p.syncProducer.SendMessage( &sarama.ProducerMessage{ - Key: sarama.StringEncoder(key), - Topic: p.topic, - Value: sarama.ByteEncoder(message), + Key: sarama.StringEncoder(key), + Topic: p.topic, + Value: sarama.ByteEncoder(message), + Headers: headers, }, ) diff --git a/example/api-server/cmd/create_outbox_table.sql b/example/api-server/cmd/create_outbox_table.sql index 3c0ef35..7e7148a 100644 --- a/example/api-server/cmd/create_outbox_table.sql +++ b/example/api-server/cmd/create_outbox_table.sql @@ -1,6 +1,7 @@ CREATE TABLE IF NOT EXISTS my_schema.my_outbox_table ( id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + uuid CHAR(36) NOT NULL UNIQUE, aggregate_type VARCHAR(255) NOT NULL, aggregate_id VARCHAR(255) NOT NULL, payload LONGBLOB NOT NULL diff --git a/example/api-server/cmd/run.go b/example/api-server/cmd/run.go index 280ea9b..cf704ad 100644 --- a/example/api-server/cmd/run.go +++ b/example/api-server/cmd/run.go @@ -8,6 +8,7 @@ import ( "net/http" _ "github.com/go-sql-driver/mysql" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" @@ -135,12 +136,13 @@ func (h *HTTPHandler) insertEvent(tx *sqlx.Tx, eventName string, orderUUID strin } query := ` -INSERT INTO my_schema.my_outbox_table (aggregate_type, aggregate_id, payload) -VALUES ('order', :aggregate_id, :payload);` +INSERT INTO my_schema.my_outbox_table (uuid, aggregate_type, aggregate_id, payload) +VALUES (:uuid, 'order', :aggregate_id, :payload);` if _, err := tx.NamedExec( h.db.Rebind(query), map[string]interface{}{ + "uuid": uuid.New().String(), "aggregate_id": orderUUID, "payload": payload, }, diff --git a/example/api-server/go.mod b/example/api-server/go.mod index 0394530..51a9603 100644 --- a/example/api-server/go.mod +++ b/example/api-server/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/go-sql-driver/mysql v1.6.0 + github.com/google/uuid v1.1.2 github.com/jmoiron/sqlx v1.3.5 github.com/julienschmidt/httprouter v1.3.0 github.com/sirupsen/logrus v1.9.0 diff --git a/example/api-server/go.sum b/example/api-server/go.sum index 12c92bf..a2a5453 100644 --- a/example/api-server/go.sum +++ b/example/api-server/go.sum @@ -115,6 +115,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/example/tor/cmd/run.go b/example/tor/cmd/run.go index 4ea3125..c206c95 100644 --- a/example/tor/cmd/run.go +++ b/example/tor/cmd/run.go @@ -45,6 +45,7 @@ var runCmd = &cobra.Command{ viper.GetString("dbAggregateIDColumnName"), viper.GetString("dbAggregateTypeColumnName"), viper.GetString("dbPayloadColumnName"), + viper.GetStringSlice("dbHeadersColumnsNames"), aggregateTypeRegexp, ) if err != nil { @@ -66,6 +67,7 @@ func init() { viper.MustBindEnv("dbAggregateIDColumnName", "DB_AGGREGATE_ID_COLUMN_NAME") viper.MustBindEnv("dbAggregateTypeColumnName", "DB_AGGREGATE_TYPE_COLUMN_NAME") viper.MustBindEnv("dbPayloadColumnName", "DB_PAYLOAD_COLUMN_NAME") + viper.MustBindEnv("dbHeadersColumnsNames", "DB_HEADERS_COLUMNS_NAME") viper.MustBindEnv("aggregateTypeRegexp", "AGGREGATE_TYPE_REGEXP_EXPRESSION") viper.MustBindEnv("kafkaBrokers", "KAFKA_BROKERS") diff --git a/router/pkg/run/event_handler.go b/router/pkg/run/event_handler.go index f53cc1f..e0fdf4d 100644 --- a/router/pkg/run/event_handler.go +++ b/router/pkg/run/event_handler.go @@ -22,13 +22,26 @@ type StateHandler interface { SetLastPosition(position mysql.Position) error } -type OutboxEvent struct { +type outboxEvent struct { AggregateID string Payload []byte + Headers []eventHeader +} + +type eventHeader struct { + Key []byte + Value []byte } type EventDispatcher interface { - Dispatch(routingKey string, event []byte) error + Dispatch( + routingKey string, + event []byte, + headers []struct { + Key []byte + Value []byte + }, + ) error } func NewEventHandler( @@ -36,6 +49,7 @@ func NewEventHandler( aggregateIDColumnName string, aggregateTypeColumnName string, payloadColumnName string, + headersColumnsNames []string, aggregateTypeRegexp *regexp.Regexp, ) (*EventHandler, error) { actualAggregateIDColumnName := defaultAggregateIDColumnName @@ -63,6 +77,7 @@ func NewEventHandler( aggregateIDColumnName: actualAggregateIDColumnName, aggregateTypeColumnName: actualAggregateTypeColumnName, payloadColumnName: actualPayloadColumnName, + headersColumnsNames: headersColumnsNames, aggregateTypeRegexp: actualAggregateTypeRegexp, }, eventDispatcher: eventDispatcher, @@ -90,7 +105,7 @@ func (h *EventHandler) OnRow(e *canal.RowsEvent) error { } for _, oe := range oes { - err = h.eventDispatcher.Dispatch(oe.AggregateID, oe.Payload) + err = h.eventDispatcher.Dispatch(oe.AggregateID, oe.Payload, mapHeaders(oe.Headers)) if err != nil { return err } @@ -110,3 +125,23 @@ func (h *EventHandler) OnPosSynced(p mysql.Position, g mysql.GTIDSet, f bool) er func (h *EventHandler) String() string { return "EventHandler" } + +func mapHeaders(h []eventHeader) []struct { + Key []byte + Value []byte +} { + if len(h) == 0 { + return nil + } + + r := make([]struct { + Key []byte + Value []byte + }, 0, len(h)) + + for _, v := range h { + r = append(r, v) + } + + return r +} diff --git a/router/pkg/run/event_handler_test.go b/router/pkg/run/event_handler_test.go index ec8c283..cb4bb12 100644 --- a/router/pkg/run/event_handler_test.go +++ b/router/pkg/run/event_handler_test.go @@ -19,6 +19,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { aggregateIdColumnName string aggregateTypeColumnName string payloadColumnName string + headersColumnsNames []string aggregateTypeRegexp *regexp.Regexp } type args struct { @@ -253,6 +254,67 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { }, }, }, + { + name: "single row-event, with headers", + args: args{ + e: &canal.RowsEvent{ + Table: &schema.Table{ + Schema: "my_schema", + Name: "outbox", + Columns: []schema.TableColumn{ + { + Name: "aggregate_id", + }, + { + Name: "counter", + }, + { + Name: "aggregate_type", + }, + { + Name: "payload", + }, + { + Name: "uuid", + }, + }, + }, + Action: canal.InsertAction, + Rows: [][]interface{}{ + { + "c44ade3e-9394-4e6e-8d2d-20707d61061c", + 1, + "order", + `{"name": "new order"}`, + "b948f9a6-5797-4585-b386-dd8a1a4e30db", + }, + }, + Header: &replication.EventHeader{}, + }, + }, + fields: fields{ + eventDispatcher: &eventDispatcherMock{}, + headersColumnsNames: []string{"uuid", "counter"}, + }, + wantDispatches: []dispatch{ + { + routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", + event: []byte(`{"name": "new order"}`), headers: []struct { + Key []byte + Value []byte + }{ + { + Key: []byte("uuid"), + Value: []byte("b948f9a6-5797-4585-b386-dd8a1a4e30db"), + }, + { + Key: []byte("counter"), + Value: []byte("1"), + }, + }, + }, + }, + }, } for _, tt := range tests { tt := tt @@ -262,6 +324,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { tt.fields.aggregateIdColumnName, tt.fields.aggregateTypeColumnName, tt.fields.payloadColumnName, + tt.fields.headersColumnsNames, tt.fields.aggregateTypeRegexp, ) require.NoError(t, err) @@ -283,6 +346,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { aggregateIdColumnName string aggregateTypeColumnName string payloadColumnName string + headersColumnsNames []string aggregateTypeRegexp *regexp.Regexp } type args struct { @@ -407,6 +471,45 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, wantErr: true, }, + { + name: "when a header's column value is missing then error", + args: args{ + e: &canal.RowsEvent{ + Table: &schema.Table{ + Schema: "my_schema", + Name: "outbox", + Columns: []schema.TableColumn{ + { + Name: "aggregate_id", + }, + { + Name: "aggregate_type", + }, + { + Name: "payload", + }, + { + Name: "uuid", + }, + }, + }, + Action: canal.InsertAction, + Rows: [][]interface{}{ + { + "c44ade3e-9394-4e6e-8d2d-20707d61061c", + "order", + "", + }, + }, + Header: &replication.EventHeader{}, + }, + }, + fields: fields{ + eventDispatcher: &eventDispatcherMock{}, + headersColumnsNames: []string{"uuid"}, + }, + wantErr: true, + }, { name: "when aggregate-id column is missing then error", args: args{ @@ -512,6 +615,42 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, wantErr: true, }, + { + name: "when header column is missing then error", + args: args{ + e: &canal.RowsEvent{ + Table: &schema.Table{ + Schema: "my_schema", + Name: "outbox", + Columns: []schema.TableColumn{ + { + Name: "aggregate_id", + }, + { + Name: "aggregate_type", + }, + { + Name: "payload", + }, + }, + }, + Action: canal.InsertAction, + Rows: [][]interface{}{ + { + "c44ade3e-9394-4e6e-8d2d-20707d61061c", + "order", + `{"name": "new order"}`, + }, + }, + Header: &replication.EventHeader{}, + }, + }, + fields: fields{ + eventDispatcher: &eventDispatcherMock{}, + headersColumnsNames: []string{"uuid"}, + }, + wantErr: true, + }, { name: "when aggregate-id value is not string then error", args: args{ @@ -626,6 +765,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { tt.fields.aggregateIdColumnName, tt.fields.aggregateTypeColumnName, tt.fields.payloadColumnName, + tt.fields.headersColumnsNames, tt.fields.aggregateTypeRegexp, ) if (err != nil) != tt.wantErrOnConstruct { @@ -650,6 +790,10 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { type dispatch struct { routingKey string event []byte + headers []struct { + Key []byte + Value []byte + } } type eventDispatcherMock struct { @@ -657,10 +801,14 @@ type eventDispatcherMock struct { err error } -func (e *eventDispatcherMock) Dispatch(routingKey string, event []byte) error { +func (e *eventDispatcherMock) Dispatch(routingKey string, event []byte, headers []struct { + Key []byte + Value []byte +}) error { e.dispatches = append(e.dispatches, dispatch{ routingKey: routingKey, event: event, + headers: headers, }) return e.err diff --git a/router/pkg/run/event_mapper.go b/router/pkg/run/event_mapper.go index c5c72c5..5981434 100644 --- a/router/pkg/run/event_mapper.go +++ b/router/pkg/run/event_mapper.go @@ -13,29 +13,35 @@ type EventMapper struct { aggregateIDColumnName string aggregateTypeColumnName string payloadColumnName string + headersColumnsNames []string aggregateTypeRegexp *regexp.Regexp } var notInsertError = errors.New("row-event is not an insert") -func (e *EventMapper) Map(event *canal.RowsEvent) ([]OutboxEvent, error) { +func (e *EventMapper) Map(event *canal.RowsEvent) ([]outboxEvent, error) { if event.Action != canal.InsertAction { return nil, notInsertError } - aggregateIDIndex, aggregateTypeIndex, payloadIndex, err := e.getColumnsIndex(event) + aggregateIDIndex, aggregateTypeIndex, payloadIndex, err := e.getMainColumnsIndices(event) if err != nil { return nil, err } - oes := make([]OutboxEvent, 0, len(event.Rows)) + headerColumnsIndices, err := e.getHeadersColumnsIndices(event, e.headersColumnsNames) + if err != nil { + return nil, err + } + + oes := make([]outboxEvent, 0, len(event.Rows)) for _, row := range event.Rows { - err := assertRowSizeIsValid(len(row), []int{aggregateTypeIndex, aggregateIDIndex, payloadIndex}) + err := assertRowSizeIsValid(len(row), []int{aggregateTypeIndex, aggregateIDIndex, payloadIndex}, headerColumnsIndices) if err != nil { return nil, err } - aggregateID, aggregateType, payload, err := getColumnsValue(row, aggregateIDIndex, aggregateTypeIndex, payloadIndex) + aggregateID, aggregateType, payload, err := getMainColumnsValue(row, aggregateIDIndex, aggregateTypeIndex, payloadIndex) if err != nil { return nil, err } @@ -47,26 +53,35 @@ func (e *EventMapper) Map(event *canal.RowsEvent) ([]OutboxEvent, error) { continue } - oes = append(oes, OutboxEvent{ + h := getHeaderColumnsValues(row, headerColumnsIndices) + + oes = append(oes, outboxEvent{ AggregateID: aggregateID, Payload: payload, + Headers: h, }) } return oes, nil } -func assertRowSizeIsValid(rowLen int, columnIndices []int) error { +func assertRowSizeIsValid(rowLen int, columnIndices []int, headerColumnIndices []headerIndex) error { for _, index := range columnIndices { if index >= rowLen { return fmt.Errorf("unexpected event row size") } } + for _, index := range headerColumnIndices { + if index.index >= rowLen { + return fmt.Errorf("unexpected event row size") + } + } + return nil } -func getColumnsValue( +func getMainColumnsValue( row []interface{}, aggregateIDIndex int, aggregateTypeIndex int, @@ -93,7 +108,7 @@ func getColumnsValue( return aggregateID, aggregateType, payload, nil } -func (e *EventMapper) getColumnsIndex(event *canal.RowsEvent) (int, int, int, error) { +func (e *EventMapper) getMainColumnsIndices(event *canal.RowsEvent) (int, int, int, error) { aggregateIDIndex := -1 aggregateTypeIndex := -1 payloadIndex := -1 @@ -128,3 +143,43 @@ func (e *EventMapper) getColumnsIndex(event *canal.RowsEvent) (int, int, int, er return aggregateIDIndex, aggregateTypeIndex, payloadIndex, nil } + +func getHeaderColumnsValues( + row []interface{}, + columnIndicesMap []headerIndex, +) []eventHeader { + r := make([]eventHeader, 0, len(columnIndicesMap)) + for _, i := range columnIndicesMap { + r = append(r, eventHeader{ + Key: []byte(i.name), + Value: []byte(fmt.Sprintf("%v", row[i.index])), + }) + } + + return r +} + +type headerIndex struct { + name string + index int +} + +func (e *EventMapper) getHeadersColumnsIndices(event *canal.RowsEvent, columnNames []string) ([]headerIndex, error) { + r := make([]headerIndex, 0, len(columnNames)) +outerLoop: + for _, cm := range columnNames { + for i, etc := range event.Table.Columns { + if etc.Name == cm { + r = append(r, headerIndex{ + name: cm, + index: i, + }) + continue outerLoop + } + } + + return nil, fmt.Errorf("column not found with name: %s", cm) + } + + return r, nil +} diff --git a/router/pkg/run/run_test.go b/router/pkg/run/run_test.go index 1737f9a..3a16202 100644 --- a/router/pkg/run/run_test.go +++ b/router/pkg/run/run_test.go @@ -13,7 +13,7 @@ import ( ) func TestRunner_RunWhenCanalRunFail(t *testing.T) { - handler, err := run.NewEventHandler(nil, "", "", "", nil) + handler, err := run.NewEventHandler(nil, "", "", "", nil, nil) require.NoError(t, err) expectedErr := errors.New("a") @@ -29,7 +29,7 @@ func TestRunner_RunWhenCanalRunFail(t *testing.T) { } func TestRunner_RunWhenStateHandlerSetFail(t *testing.T) { - handler, err := run.NewEventHandler(nil, "", "", "", nil) + handler, err := run.NewEventHandler(nil, "", "", "", nil, nil) require.NoError(t, err) expectedErr := errors.New("a") @@ -45,7 +45,7 @@ func TestRunner_RunWhenStateHandlerSetFail(t *testing.T) { } func TestRunner_RunWhenStateHandlerGetFail(t *testing.T) { - handler, err := run.NewEventHandler(nil, "", "", "", nil) + handler, err := run.NewEventHandler(nil, "", "", "", nil, nil) require.NoError(t, err) expectedErr := errors.New("a") @@ -61,7 +61,7 @@ func TestRunner_RunWhenStateHandlerGetFail(t *testing.T) { } func TestRunner_RunWhenNoError(t *testing.T) { - handler, err := run.NewEventHandler(nil, "", "", "", nil) + handler, err := run.NewEventHandler(nil, "", "", "", nil, nil) require.NoError(t, err) r := run.NewRunner(