Pubsub notification setup for low downtime migration (#656)
* Pubsub notification setup and cleanup

* reverting config json commit

* ui resources

* Refactoring methods

* addressing comments

* addressing comments

* addressing comments

* Introducing DataflowOutput struct for StartMigration method

* Documenting required permissions

* Documenting architecture

* Adding note

* Moving notes to top

* Updating the note

* updating readme and updating subfolder creation logic

* updating error log message

* using zap logger
darshan-sj authored Oct 18, 2023
1 parent 998c8fe commit 386defc
Showing 20 changed files with 410 additions and 97 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -2,6 +2,11 @@

[![integration-tests-against-emulator](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/integration-tests-against-emulator.yaml/badge.svg)](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/integration-tests-against-emulator.yaml) [![code-coverage-check](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/test-coverage.yaml/badge.svg)](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/test-coverage.yaml) [![codecov](https://codecov.io/gh/GoogleCloudPlatform/spanner-migration-tool/graph/badge.svg?token=HY9RCUlxzm)](https://codecov.io/gh/GoogleCloudPlatform/spanner-migration-tool)


> [!IMPORTANT]
> We have changed the architecture of minimal downtime migrations and added a Pub/Sub notifications component. The new component changes the permissions required to run migrations. Please go through the [permissions page](https://googlecloudplatform.github.io/spanner-migration-tool/permissions.html) and the [design page](https://googlecloudplatform.github.io/spanner-migration-tool/minimal) of the documentation.

## Overview

Spanner migration tool is a stand-alone open source tool for Cloud Spanner evaluation and
19 changes: 16 additions & 3 deletions conversion/conversion.go
@@ -298,10 +298,14 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile,
if err != nil {
return nil, err
}
err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
dfOutput, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
if err != nil {
return nil, err
}
dfJobId := dfOutput.JobID
gcloudCmd := dfOutput.GCloudCmd
streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg)
streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, "")
return bw, nil
}
return performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}), nil
@@ -321,6 +325,7 @@ func dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) {
func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv) (*writer.BatchWriter, error) {
updateShardsWithDataflowConfig(sourceProfile.Config.ShardConfigurationDataflow)
conv.Audit.StreamingStats.ShardToDataStreamNameMap = make(map[string]string)
conv.Audit.StreamingStats.ShardToPubsubIdMap = make(map[string]internal.PubsubCfg)
conv.Audit.StreamingStats.ShardToDataflowInfoMap = make(map[string]internal.ShardedDataflowJobResources)
tableList, err := common.GetIncludedSrcTablesFromConv(conv)
if err != nil {
@@ -348,13 +353,21 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile,
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)

pubsubCfg, err := streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.PubsubCfg = *pubsubCfg
err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
err = streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, targetProfile.Conn.Sp.Project, p.DataShardId)
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
_, err = common.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, true)
3 changes: 3 additions & 0 deletions docs/index.md
@@ -17,6 +17,9 @@ Spanner migration tool (SMT) is a stand-alone open source tool for Cloud Spanner

---

{: .highlight }
We have changed the architecture of minimal downtime migrations and added a Pub/Sub notifications component. The new component changes the permissions required to run migrations. Please go through the [permissions page](./permissions.md) and the [design page](./minimal/minimal.md) of the documentation.

Spanner migration tool is a stand-alone open source tool for Cloud Spanner evaluation and
migration, using data from an existing PostgreSQL, MySQL, SQL Server, Oracle or DynamoDB database.
The tool ingests schema and data from either a pg_dump/mysqldump file or directly
4 changes: 2 additions & 2 deletions docs/minimal/minimal.md
@@ -12,7 +12,7 @@ permalink: /minimal
{: .note }
Minimal downtime migrations are only supported for MySQL, Postgres and Oracle source databases.

A minimal downtime migration consists of two components, migration of existing data from the database and the stream of changes (writes and updates) that are made to the source database during migration, referred to as change database capture (CDC). Using Spanner migration tool, the entire process where Datastream reads data from the source database and writes to a GCS bucket and data flow reads data from GCS bucket and writes to spanner database can be orchestrated using a unified interface. Performing schema changes on the source database during the migration is not supported. This is the suggested mode of migration for most databases.
A minimal downtime migration consists of two components: migration of the existing data in the database, and the stream of changes (writes and updates) made to the source database during the migration, referred to as change data capture (CDC). The migration works as follows: Datastream reads data from the source database and writes it to a GCS bucket; GCS publishes a notification to a Pub/Sub topic for each new file; and a Dataflow job, notified of each new file through the Pub/Sub subscription, reads the data from the GCS bucket and writes it to the Spanner database. With Spanner migration tool, this entire process can be orchestrated through a unified interface. Performing schema changes on the source database during the migration is not supported. This is the suggested mode of migration for most databases.
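
As a rough illustration of the GCS-to-Pub/Sub wiring described above, the snippet below shows an equivalent manual setup; Spanner migration tool creates these resources automatically, and the topic, subscription, and bucket names here are placeholders rather than the tool's actual resource names.

```sh
# Topic that receives object-change notifications from the Datastream output bucket
gcloud pubsub topics create <MIGRATION_TOPIC>

# Subscription the Dataflow job uses to learn about new files
gcloud pubsub subscriptions create <MIGRATION_SUBSCRIPTION> --topic=<MIGRATION_TOPIC>

# Notify the topic whenever Datastream writes a new object to the bucket
gsutil notification create -t <MIGRATION_TOPIC> -f json -e OBJECT_FINALIZE gs://<DATASTREAM_BUCKET>
```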

![](https://services.google.com/fh/files/helpcenter/asset-ripjb7eowf.png)

@@ -23,7 +23,7 @@ Sharded migrations are currently only supported for MySQL.

Spanner migration tool supports sharded migrations for MySQL. It does this by multiplexing a minimal downtime migration across multiple shards, automatically applying the user-configured schema when transforming the data read from each shard and writing it to Spanner. This provides an integrated experience to perform an end-to-end sharded migration. Below is the architecture of how sharded migrations work:

![](https://services.google.com/fh/files/misc/smt_shard_arch.png)
![](https://services.google.com/fh/files/misc/hb_sharded_migrations_with_pubsub_1.png)

### Terminology

22 changes: 21 additions & 1 deletion docs/permissions.md
@@ -39,6 +39,11 @@ Ensure that Datastream and Dataflow apis are enabled on your project.
```sh
gcloud services enable storage.googleapis.com
```
5. Enable the Pub/Sub API by using:

```sh
gcloud services enable pubsub.googleapis.com
```

### Configuring connectivity for `spanner-migration-tool`

@@ -127,6 +132,20 @@ Grant the user **Editor role** to create buckets in the project.

Enable access to Datastream, Dataflow and Spanner using [service accounts](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances).

### Pub/Sub

Grant the user the [**Pub/Sub Editor**](https://cloud.google.com/pubsub/docs/access-control#pubsub.editor) role to create the Pub/Sub topic and subscription used for low downtime migrations.
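
For example, this role could be granted with an IAM policy binding like the one below; the user email is a placeholder.

```sh
gcloud projects add-iam-policy-binding <PROJECT_ID> \
  --member=user:<USER_EMAIL> \
  --role=roles/pubsub.editor
```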

Additionally, the GCS service agent must be granted the Pub/Sub Publisher permission. This enables GCS to push a notification to a Pub/Sub topic whenever a new file is created. Refer to [this](https://cloud.google.com/storage/docs/reporting-changes#before-you-begin) page for more details.
1. Get the GCS service agent ID using the following command:
```sh
gcloud storage service-agent --project=<PROJECT_ID>
```
2. Grant the Pub/Sub Publisher role to the service agent using the following command:
```sh
gcloud projects add-iam-policy-binding <PROJECT_ID> --member=serviceAccount:<GCS_SERVICE_ACCOUNT_ID> --role=roles/pubsub.publisher
```

### Other Permissions

In addition to these, the `DatastreamToSpanner` pipeline created by SMT requires
@@ -142,4 +161,5 @@ the following roles as well:
- Cloud Spanner Database user
- Cloud Spanner Restore Admin
- Cloud Spanner Viewer
- Dataflow Worker
- Dataflow Worker
- Pub/Sub Subscriber
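
As an illustration, a role from this list can be granted to the Dataflow worker service account with a binding like the one below; the service account email is a placeholder and depends on how the Dataflow workers are configured.

```sh
gcloud projects add-iam-policy-binding <PROJECT_ID> \
  --member=serviceAccount:<DATAFLOW_WORKER_SERVICE_ACCOUNT> \
  --role=roles/pubsub.subscriber
```
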
14 changes: 14 additions & 0 deletions internal/convert.go
@@ -205,6 +205,20 @@ type streamingStats struct {
DataflowGcloudCmd string
ShardToDataStreamNameMap map[string]string
ShardToDataflowInfoMap map[string]ShardedDataflowJobResources
PubsubCfg PubsubCfg
ShardToPubsubIdMap map[string]PubsubCfg
}

type PubsubCfg struct {
TopicId string
SubscriptionId string
NotificationId string
BucketName string
}

type DataflowOutput struct {
JobID string
GCloudCmd string
}

// Stores information related to rules during schema conversion
2 changes: 1 addition & 1 deletion sources/common/infoschema.go
@@ -41,7 +41,7 @@ type InfoSchema interface {
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) error
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error)
}

// SchemaAndName contains the schema and name for a table
4 changes: 2 additions & 2 deletions sources/dynamodb/schema.go
@@ -219,7 +219,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
// StartStreamingMigration starts the streaming migration process by creating a separate
// worker thread/goroutine for each table's DynamoDB Stream. It catches Ctrl+C signal if
// customer wants to stop the process.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) {
fmt.Println("Processing of DynamoDB Streams started...")
fmt.Println("Use Ctrl+C to stop the process.")

@@ -243,7 +243,7 @@ func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *s
fillConvWithStreamingStats(streamInfo, conv)

fmt.Println("DynamoDB Streams processed successfully.")
return nil
return internal.DataflowOutput{}, nil
}

func getSchemaIndexStruct(indexName string, keySchema []*dynamodb.KeySchemaElement, colNameIdMap map[string]string) schema.Index {
21 changes: 15 additions & 6 deletions sources/mysql/infoschema.go
@@ -364,10 +364,19 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
mp := make(map[string]interface{})
var (
tableList []string
err error
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Mysql.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -378,15 +387,15 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return internal.DataflowOutput{}, err
}
return nil
return dfOutput, nil
}

func toType(dataType string, columnType string, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type {
21 changes: 15 additions & 6 deletions sources/oracle/infoschema.go
@@ -428,10 +428,19 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
mp := make(map[string]interface{})
var (
tableList []string
err error
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Oracle.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -442,13 +451,13 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
if err != nil {
return err
return internal.DataflowOutput{}, err
}
return nil
return dfOutput, nil
}

func toType(dataType string, typecode, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64) schema.Type {
23 changes: 15 additions & 8 deletions sources/postgres/infoschema.go
@@ -63,13 +63,22 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
mp := make(map[string]interface{})
var (
tableList []string
err error
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
if err != nil {
err = fmt.Errorf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err)
}
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -78,19 +87,17 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return mp, err
}



// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return internal.DataflowOutput{}, err
}
return nil
return dfOutput, nil
}

// GetToDdl function below implement the common.InfoSchema interface.
4 changes: 2 additions & 2 deletions sources/spanner/infoschema.go
@@ -79,8 +79,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
return internal.DataflowOutput{}, nil
}

// GetTableName returns table name.
4 changes: 2 additions & 2 deletions sources/sqlserver/infoschema.go
@@ -58,8 +58,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
return internal.DataflowOutput{}, nil
}

// GetTableName returns table name.