@@ -31,6 +31,8 @@ import (
31
31
dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
32
32
"google.golang.org/protobuf/types/known/fieldmaskpb"
33
33
34
+ resourcemanager "cloud.google.com/go/resourcemanager/apiv3"
35
+ resourcemanagerpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb"
34
36
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
35
37
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
36
38
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
@@ -106,7 +108,7 @@ type DataflowCfg struct {
106
108
107
109
type StreamingCfg struct {
108
110
DatastreamCfg DatastreamCfg `json:"datastreamCfg"`
109
- GcsCfg GcsCfg `json:"gcsCfg"`
111
+ GcsCfg GcsCfg `json:"gcsCfg"`
110
112
DataflowCfg DataflowCfg `json:"dataflowCfg"`
111
113
TmpDir string `json:"tmpDir"`
112
114
PubsubCfg internal.PubsubResources `json:"pubsubCfg"`
@@ -463,7 +465,8 @@ func createNotificationOnBucket(ctx context.Context, storageClient *storage.Clie
463
465
464
466
// LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream.
465
467
func LaunchStream (ctx context.Context , sourceProfile profiles.SourceProfile , dbList []profiles.LogicalShard , projectID string , datastreamCfg DatastreamCfg ) error {
466
- fmt .Println ("Launching stream " , fmt .Sprintf ("projects/%s/locations/%s" , projectID , datastreamCfg .StreamLocation ))
468
+ projectNumberResource := GetProjectNumberResource (ctx , fmt .Sprintf ("projects/%s" , projectID ))
469
+ fmt .Println ("Launching stream " , fmt .Sprintf ("%s/locations/%s" , projectNumberResource , datastreamCfg .StreamLocation ))
467
470
dsClient , err := datastream .NewClient (ctx )
468
471
if err != nil {
469
472
return fmt .Errorf ("datastream client can not be created: %v" , err )
@@ -478,15 +481,15 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL
478
481
FileFormat : & datastreampb.GcsDestinationConfig_AvroFileFormat {},
479
482
}
480
483
srcCfg := & datastreampb.SourceConfig {
481
- SourceConnectionProfile : fmt .Sprintf ("projects/ %s/locations/%s/connectionProfiles/%s" , projectID , datastreamCfg .SourceConnectionConfig .Location , datastreamCfg .SourceConnectionConfig .Name ),
484
+ SourceConnectionProfile : fmt .Sprintf ("%s/locations/%s/connectionProfiles/%s" , projectNumberResource , datastreamCfg .SourceConnectionConfig .Location , datastreamCfg .SourceConnectionConfig .Name ),
482
485
}
483
486
err = getSourceStreamConfig (srcCfg , sourceProfile , dbList , datastreamCfg )
484
487
if err != nil {
485
488
return fmt .Errorf ("could not get source stream config: %v" , err )
486
489
}
487
490
488
491
dstCfg := & datastreampb.DestinationConfig {
489
- DestinationConnectionProfile : fmt .Sprintf ("projects/ %s/locations/%s/connectionProfiles/%s" , projectID , datastreamCfg .DestinationConnectionConfig .Location , datastreamCfg .DestinationConnectionConfig .Name ),
492
+ DestinationConnectionProfile : fmt .Sprintf ("%s/locations/%s/connectionProfiles/%s" , projectNumberResource , datastreamCfg .DestinationConnectionConfig .Location , datastreamCfg .DestinationConnectionConfig .Name ),
490
493
DestinationStreamConfig : & datastreampb.DestinationConfig_GcsDestinationConfig {GcsDestinationConfig : gcsDstCfg },
491
494
}
492
495
streamInfo := & datastreampb.Stream {
@@ -497,7 +500,7 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL
497
500
BackfillStrategy : & datastreampb.Stream_BackfillAll {BackfillAll : & datastreampb.Stream_BackfillAllStrategy {}},
498
501
}
499
502
createStreamRequest := & datastreampb.CreateStreamRequest {
500
- Parent : fmt .Sprintf ("projects/ %s/locations/%s" , projectID , datastreamCfg .StreamLocation ),
503
+ Parent : fmt .Sprintf ("%s/locations/%s" , projectNumberResource , datastreamCfg .StreamLocation ),
501
504
StreamId : datastreamCfg .StreamId ,
502
505
Stream : streamInfo ,
503
506
}
@@ -518,7 +521,7 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL
518
521
fmt .Println ("Successfully created stream " , datastreamCfg .StreamId )
519
522
520
523
fmt .Print ("Setting stream state to RUNNING..." )
521
- streamInfo .Name = fmt .Sprintf ("projects/ %s/locations/%s/streams/%s" , projectID , datastreamCfg .StreamLocation , datastreamCfg .StreamId )
524
+ streamInfo .Name = fmt .Sprintf ("%s/locations/%s/streams/%s" , projectNumberResource , datastreamCfg .StreamLocation , datastreamCfg .StreamId )
522
525
updateStreamRequest := & datastreampb.UpdateStreamRequest {
523
526
UpdateMask : & fieldmaskpb.FieldMask {Paths : []string {"state" }},
524
527
Stream : streamInfo ,
@@ -756,6 +759,36 @@ func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
756
759
return streamingCfg
757
760
}
758
761
762
+ // Maps Project-Id to ProjectNumber.
763
+ var ProjectNumberResourceCache sync.Map
764
+
765
+ // Returns a string that encodes the project number like `projects/12345`
766
+ func GetProjectNumberResource (ctx context.Context , projectID string ) string {
767
+ projectNumberResource , found := ProjectNumberResourceCache .Load (projectID )
768
+ if found {
769
+ return projectNumberResource .(string )
770
+ }
771
+
772
+ rmClient , err := resourcemanager .NewProjectsClient (ctx )
773
+ if err != nil {
774
+ logger .Log .Warn (fmt .Sprintf ("Could not create resourcemanager client to query project number. Defaulting to ProjectId=%s. error=%v" ,
775
+ projectID , err ))
776
+ return projectID
777
+ }
778
+ defer rmClient .Close ()
779
+ req := resourcemanagerpb.GetProjectRequest {Name : projectID }
780
+ project , err := rmClient .GetProject (ctx , & req )
781
+ if err != nil {
782
+ logger .Log .Warn (fmt .Sprintf ("Could not query resourcemanager to get project number. Defaulting to ProjectId=%s. error=%v" ,
783
+ projectID , err ))
784
+ return projectID
785
+ }
786
+ projectNumberResource = project .GetName ()
787
+ ProjectNumberResourceCache .Store (projectID , projectNumberResource )
788
+ return projectNumberResource .(string )
789
+
790
+ }
791
+
759
792
func StartDatastream (ctx context.Context , streamingCfg StreamingCfg , sourceProfile profiles.SourceProfile , targetProfile profiles.TargetProfile , schemaDetails map [string ]internal.SchemaDetails ) (StreamingCfg , error ) {
760
793
driver := sourceProfile .Driver
761
794
var dbList []profiles.LogicalShard
0 commit comments