diff --git a/events/firehose.go b/events/firehose.go index 85b8fd18..350c5b31 100644 --- a/events/firehose.go +++ b/events/firehose.go @@ -25,6 +25,15 @@ const ( KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed" ) +// KinesisFirehoseOTFOperation represents the operation to apply on the record during on-the-fly record routing. +type KinesisFirehoseOTFOperation string + +const ( + KinesisFirehoseOTFOperationInsert KinesisFirehoseOTFOperation = "insert" + KinesisFirehoseOTFOperationUpdate KinesisFirehoseOTFOperation = "update" + KinesisFirehoseOTFOperationDelete KinesisFirehoseOTFOperation = "delete" +) + type KinesisFirehoseResponse struct { Records []KinesisFirehoseResponseRecord `json:"records"` } @@ -37,7 +46,14 @@ type KinesisFirehoseResponseRecord struct { } type KinesisFirehoseResponseRecordMetadata struct { - PartitionKeys map[string]string `json:"partitionKeys"` + PartitionKeys map[string]string `json:"partitionKeys"` + OTFMetadata KinesisFirehoseResponseRecordOTFMetadata `json:"otfMetadata"` +} + +type KinesisFirehoseResponseRecordOTFMetadata struct { + DestinationDatabaseName string `json:"destinationDatabaseName"` + DestinationTableName string `json:"destinationTableName"` + Operation KinesisFirehoseOTFOperation `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete. } type KinesisFirehoseRecordMetadata struct { diff --git a/events/testdata/kinesis-firehose-response.json b/events/testdata/kinesis-firehose-response.json index c7c4466c..4ad248d4 100644 --- a/events/testdata/kinesis-firehose-response.json +++ b/events/testdata/kinesis-firehose-response.json @@ -1,31 +1,46 @@ { - "records": [ - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record1", - "result": "TRANSFORMED_STATE_OK", - "metadata": { - "partitionKeys": {} - } - }, - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record2", - "result": "TRANSFORMED_STATE_DROPPED", - "metadata": { - "partitionKeys": {} - } - }, - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record3", - "result": "TransformedStateOk", - "metadata": { - "partitionKeys": { - "iamKey1": "iamValue1", - "iamKey2": "iamValue2" - } - } - } - ] - } + "records": [ + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record1", + "result": "TRANSFORMED_STATE_OK", + "metadata": { + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } + } + }, + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record2", + "result": "TRANSFORMED_STATE_DROPPED", + "metadata": { + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } + } + }, + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record3", + "result": "TransformedStateOk", + "metadata": { + "partitionKeys": { + "iamKey1": "iamValue1", + "iamKey2": "iamValue2" + }, + "otfMetadata": { + "destinationTableName": "table1", + "destinationDatabaseName": "database1", + "operation": "update" + } + } + } + ] +}