diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index fef9c4a59d5..53f1190242a 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -24,7 +24,7 @@ jobs: dgraph-integration2-tests: if: github.event.pull_request.draft == false runs-on: ubuntu-latest - timeout-minutes: 30 + timeout-minutes: 90 steps: - uses: actions/checkout@v5 with: @@ -48,7 +48,7 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the tests - go test -v -timeout=30m -failfast -tags=integration2 ./... + go test -v -timeout=90m -failfast -tags=integration2 ./... # clean up docker containers after test execution go clean -testcache # sleep diff --git a/Dockerfile b/Dockerfile index 396d543f903..f82b1c26c6e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ bzip2=1.0.8-5+b1 \ git=1:2.39.5-0+deb12u2 \ && rm -rf /var/lib/apt/lists/* +ARG TARGETARCH=amd64 +ARG TARGETOS=linux WORKDIR /go/src/repo COPY go.mod go.sum ./ RUN go mod download && go mod verify COPY . . 
-RUN CGO_ENABLED=0 make +RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make ###################### Stage II ###################### FROM ubuntu:24.04 diff --git a/dgraph/cmd/dgraphimport/import_client.go b/dgraph/cmd/dgraphimport/import_client.go index fc56c96cbce..cb30a973670 100644 --- a/dgraph/cmd/dgraphimport/import_client.go +++ b/dgraph/cmd/dgraphimport/import_client.go @@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin if err != nil { return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err) } - defer func() { - if _, err := out.CloseAndRecv(); err != nil { - glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err) - } - - glog.Infof("[import] Group [%v]: Received ACK ", groupId) + _ = out.CloseSend() }() // Open the BadgerDB instance at the specified directory opt := badger.DefaultOptions(pdir) + opt.ReadOnly = true ps, err := badger.OpenManaged(opt) if err != nil { glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err) return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err) } - defer func() { if err := ps.Close(); err != nil { glog.Warningf("[import] Error closing BadgerDB: %v", err) @@ -154,17 +149,19 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId) groupReq := &api.StreamExtSnapshotRequest{GroupId: groupId} if err := out.Send(groupReq); err != nil { - return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w", - groupId, err) + return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err) + } + if _, err := out.Recv(); err != nil { + return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err) } + glog.Infof("[import] Group [%v]: Received ACK for sending group request", 
groupId) + // Configure and start the BadgerDB stream glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId) - if err := streamBadger(ctx, ps, out, groupId); err != nil { return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err) } - return nil } @@ -180,6 +177,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("failed to send data chunk: %w", err) } + if _, err := out.Recv(); err != nil { + return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err) + } + glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId) + return nil } @@ -196,5 +198,25 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err) } + for { + if ctx.Err() != nil { + return ctx.Err() + } + resp, err := out.Recv() + if errors.Is(err, io.EOF) { + return fmt.Errorf("server closed stream before Finish=true for group [%d]", groupId) + } + if err != nil { + return fmt.Errorf("failed to receive final response for group ID [%v] from the server: %w", groupId, err) + } + if resp.Finish { + glog.Infof("[import] Group [%v]: Received final Finish=true", groupId) + break + } + glog.Infof("[import] Group [%v]: Waiting for Finish=true, got interim ACK", groupId) + } + + glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId) + return nil } diff --git a/dgraph/cmd/dgraphimport/import_test.go b/dgraph/cmd/dgraphimport/import_test.go index 6513c95b3f4..6b96f23b498 100644 --- a/dgraph/cmd/dgraphimport/import_test.go +++ b/dgraph/cmd/dgraphimport/import_test.go @@ -1,4 +1,4 @@ -//go:build integration +//go:build integration2 /* * SPDX-FileCopyrightText: © Hypermode Inc. 
@@ -79,7 +79,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas) + conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas). + WithNumZeros(tt.numZeros).WithReplicas(tt.replicas) c, err := dgraphtest.NewLocalCluster(conf) require.NoError(t, err) defer func() { c.Cleanup(t.Failed()) }() @@ -284,7 +285,6 @@ func runImportTest(t *testing.T, tt testcase) { require.ErrorContains(t, err, tt.err) return } - require.NoError(t, Import(context.Background(), connectionString, outDir)) for group, alphas := range alphaGroups { @@ -292,7 +292,7 @@ func runImportTest(t *testing.T, tt testcase) { alphaID := alphas[i] t.Logf("Starting alpha %v from group %v", alphaID, group) require.NoError(t, targetCluster.StartAlpha(alphaID)) - require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 30*time.Second)) + require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 60*time.Second)) } } @@ -306,7 +306,7 @@ func runImportTest(t *testing.T, tt testcase) { require.NoError(t, err) defer cleanup() - require.NoError(t, validateClientConnection(t, gc, 10*time.Second)) + require.NoError(t, validateClientConnection(t, gc, 30*time.Second)) verifyImportResults(t, gc, tt.downAlphas) } } @@ -347,7 +347,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest. } // setupTargetCluster creates and starts a cluster that will receive the imported data -func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) { +func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) ( + *dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) { + conf := dgraphtest.NewClusterConfig(). WithNumAlphas(numAlphas). WithNumZeros(3). 
@@ -366,7 +368,7 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes // verifyImportResults validates the result of an import operation with retry logic func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) { - maxRetries := 1 + maxRetries := 5 if downAlphas > 0 { maxRetries = 10 } @@ -547,7 +549,7 @@ func retryHealthCheck(t *testing.T, cluster *dgraphtest.LocalCluster, timeout ti // validateClientConnection ensures the client connection is working before use func validateClientConnection(t *testing.T, gc *dgraphapi.GrpcClient, timeout time.Duration) error { deadline := time.Now().Add(timeout) - retryDelay := 200 * time.Millisecond + retryDelay := 1 * time.Second for time.Now().Before(deadline) { if _, err := gc.Query("schema{}"); err != nil { diff --git a/dgraph/cmd/dgraphimport/run.go b/dgraph/cmd/dgraphimport/run.go index 37985ee4233..707931570b3 100644 --- a/dgraph/cmd/dgraphimport/run.go +++ b/dgraph/cmd/dgraphimport/run.go @@ -34,6 +34,7 @@ func init() { flag := ImportCmd.Cmd.Flags() flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.") + flag.StringP("snapshot-dir", "p", "", "Location of p directory") flag.StringP("schema", "s", "", "Location of DQL schema file.") flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.") flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.") @@ -72,6 +73,17 @@ func run() { os.Exit(1) } + // if snapshot p directory is already provided, there is no need to run bulk loader + if ImportCmd.Conf.GetString("snapshot-dir") != "" { + connStr := ImportCmd.Conf.GetString("conn-str") + snapshotDir := ImportCmd.Conf.GetString("snapshot-dir") + if err := Import(context.Background(), connStr, snapshotDir); err != nil { + fmt.Println("Failed to import data:", err) + os.Exit(1) + } + return + } + cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger. 
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ", (70*cacheSize)/100, (30*cacheSize)/100) diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index e70fe49cb50..2e21a74a97a 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/viper" "github.com/hypermodeinc/dgraph/v25/acl" + "github.com/hypermodeinc/dgraph/v25/audit" "github.com/hypermodeinc/dgraph/v25/backup" checkupgrade "github.com/hypermodeinc/dgraph/v25/check_upgrade" diff --git a/edgraph/server.go b/edgraph/server.go index 130939a888b..4c8e6458631 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -1828,6 +1828,7 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context, groups, err := worker.ProposeDrain(ctx, req) if err != nil { + glog.Errorf("[import] failed to propose drain mode: %v", err) return nil, err } @@ -1838,8 +1839,11 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context, func (s *Server) StreamExtSnapshot(stream api.Dgraph_StreamExtSnapshotServer) error { defer x.ExtSnapshotStreamingState(false) - - return worker.InStream(stream) + if err := worker.InStream(stream); err != nil { + glog.Errorf("[import] failed to stream external snapshot: %v", err) + return err + } + return nil } // CommitOrAbort commits or aborts a transaction. 
diff --git a/go.mod b/go.mod index a3faae80669..31f2c55bfc5 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/IBM/sarama v1.46.1 github.com/Masterminds/semver/v3 v3.4.0 - github.com/blevesearch/bleve/v2 v2.5.3 + github.com/blevesearch/bleve/v2 v2.5.2 github.com/dgraph-io/badger/v4 v4.8.0 - github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0 + github.com/dgraph-io/dgo/v250 v250.0.0-preview7 github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.2 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 diff --git a/go.sum b/go.sum index 0daff4a5a8c..12b0724fdcd 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/blevesearch/bleve/v2 v2.5.3 h1:9l1xtKaETv64SZc1jc4Sy0N804laSa/LeMbYddq1YEM= -github.com/blevesearch/bleve/v2 v2.5.3/go.mod h1:Z/e8aWjiq8HeX+nW8qROSxiE0830yQA071dwR3yoMzw= +github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8= +github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo= github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y= github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0= github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk= @@ -132,8 +132,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger/v4 
v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs= github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w= -github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0 h1:cbGtNKHWe34SYYPtFe/klD+cvJx5LI844Iz/akWvofo= -github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0/go.mod h1:H3PcQuhmfzSC/1I7FLJYOxntpk3UG6lmZAyv0QxRm+o= +github.com/dgraph-io/dgo/v250 v250.0.0-preview7 h1:cEranHYlUFwacvtksjwXU8qB1NHIhrvPE1gdGS6r+lU= +github.com/dgraph-io/dgo/v250 v250.0.0-preview7/go.mod h1:OVSaapUnuqaY4beLe98CajukINwbVm0JRNp0SRBCz/w= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis= github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU= diff --git a/protos/pb.proto b/protos/pb.proto index 75cb4996a7a..f5098c0e69e 100644 --- a/protos/pb.proto +++ b/protos/pb.proto @@ -599,7 +599,7 @@ service Worker { rpc DeleteNamespace(DeleteNsRequest) returns (Status) {} rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {} rpc UpdateExtSnapshotStreamingState(api.UpdateExtSnapshotStreamingStateRequest) returns (Status) {} - rpc StreamExtSnapshot(stream api.StreamExtSnapshotRequest) returns (api.StreamExtSnapshotResponse) {} + rpc StreamExtSnapshot(stream api.StreamExtSnapshotRequest) returns (stream api.StreamExtSnapshotResponse) {} } message TabletResponse { diff --git a/protos/pb/pb.pb.go b/protos/pb/pb.pb.go index aafb38698de..1cb0709972c 100644 --- a/protos/pb/pb.pb.go +++ b/protos/pb/pb.pb.go @@ -6944,7 +6944,7 @@ var file_pb_proto_rawDesc = []byte{ 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0a, 0x4d, 0x6f, 0x76, 0x65, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x12, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x6f, 0x76, 0x65, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 
- 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x32, 0xa4, 0x07, 0x0a, 0x06, 0x57, 0x6f, + 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x32, 0xa6, 0x07, 0x0a, 0x06, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x06, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0d, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x0f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x54, 0x78, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x00, @@ -6997,13 +6997,14 @@ var file_pb_proto_rawDesc = []byte{ 0x61, 0x70, 0x69, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x78, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 0x2e, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x12, 0x58, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x1d, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, - 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x30, 0x01, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/protos/pb/pb_grpc.pb.go b/protos/pb/pb_grpc.pb.go index af9438a532e..52bdc19dc35 100644 --- a/protos/pb/pb_grpc.pb.go +++ b/protos/pb/pb_grpc.pb.go @@ -1149,7 +1149,7 @@ func (c *workerClient) StreamExtSnapshot(ctx context.Context, opts ...grpc.CallO type 
Worker_StreamExtSnapshotClient interface { Send(*api.StreamExtSnapshotRequest) error - CloseAndRecv() (*api.StreamExtSnapshotResponse, error) + Recv() (*api.StreamExtSnapshotResponse, error) grpc.ClientStream } @@ -1161,10 +1161,7 @@ func (x *workerStreamExtSnapshotClient) Send(m *api.StreamExtSnapshotRequest) er return x.ClientStream.SendMsg(m) } -func (x *workerStreamExtSnapshotClient) CloseAndRecv() (*api.StreamExtSnapshotResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } +func (x *workerStreamExtSnapshotClient) Recv() (*api.StreamExtSnapshotResponse, error) { m := new(api.StreamExtSnapshotResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -1555,7 +1552,7 @@ func _Worker_StreamExtSnapshot_Handler(srv interface{}, stream grpc.ServerStream } type Worker_StreamExtSnapshotServer interface { - SendAndClose(*api.StreamExtSnapshotResponse) error + Send(*api.StreamExtSnapshotResponse) error Recv() (*api.StreamExtSnapshotRequest, error) grpc.ServerStream } @@ -1564,7 +1561,7 @@ type workerStreamExtSnapshotServer struct { grpc.ServerStream } -func (x *workerStreamExtSnapshotServer) SendAndClose(m *api.StreamExtSnapshotResponse) error { +func (x *workerStreamExtSnapshotServer) Send(m *api.StreamExtSnapshotResponse) error { return x.ServerStream.SendMsg(m) } @@ -1652,6 +1649,7 @@ var Worker_ServiceDesc = grpc.ServiceDesc{ { StreamName: "StreamExtSnapshot", Handler: _Worker_StreamExtSnapshot_Handler, + ServerStreams: true, ClientStreams: true, }, }, diff --git a/worker/import.go b/worker/import.go index 73d915df8b7..f6498836400 100644 --- a/worker/import.go +++ b/worker/import.go @@ -25,25 +25,48 @@ import ( ) type pubSub struct { - subscribers []chan *api.StreamExtSnapshotRequest sync.RWMutex + subscribers []chan *api.StreamExtSnapshotRequest } // Subscribe returns a new channel to receive published messages -func (ps *pubSub) subscribe() <-chan *api.StreamExtSnapshotRequest { +func (ps *pubSub) 
subscribe() chan *api.StreamExtSnapshotRequest { ch := make(chan *api.StreamExtSnapshotRequest, 20) ps.Lock() - defer ps.Unlock() ps.subscribers = append(ps.subscribers, ch) + ps.Unlock() return ch } +// publish sends to all subscribers without blocking on any single subscriber. +// If a subscriber is not draining, skip it (it will be marked failed by its goroutine). func (ps *pubSub) publish(msg *api.StreamExtSnapshotRequest) { - // Send message to all subscribers without dropping any ps.RLock() - defer ps.RUnlock() - for _, ch := range ps.subscribers { - ch <- msg + // copy to avoid holding lock while sending + subs := make([]chan *api.StreamExtSnapshotRequest, len(ps.subscribers)) + copy(subs, ps.subscribers) + ps.RUnlock() + + for _, ch := range subs { + select { + case ch <- msg: + default: + // subscriber is stuck; skip to avoid blocking publisher + } + } +} + +// Unsubscribe removes a subscriber and closes its channel. +func (ps *pubSub) unsubscribe(ch chan *api.StreamExtSnapshotRequest) { + ps.Lock() + defer ps.Unlock() + for i, c := range ps.subscribers { + if c == ch { + // remove from slice + ps.subscribers = append(ps.subscribers[:i], ps.subscribers[i+1:]...) 
+ close(c) + return + } } } @@ -60,31 +83,47 @@ func (ps *pubSub) handlePublisher(ctx context.Context, stream api.Dgraph_StreamE for { select { case <-ctx.Done(): - glog.Info("[import] Context cancelled, stopping receive goroutine.") + glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err()) return ctx.Err() + default: msg, err := stream.Recv() - if err != nil { - if !errors.Is(err, io.EOF) { - glog.Errorf("[import] Error receiving from in stream: %v", err) - return err - } + if err != nil && !errors.Is(err, io.EOF) { + glog.Errorf("[import] error receiving from in stream: %v", err) + return err + } else if err == io.EOF { return nil } + if msg == nil { + continue + } ps.publish(msg) + + if msg.Pkt != nil && msg.Pkt.Done { + glog.Infof("[import] Received Done signal, breaking the loop") + return nil + } + if err := stream.Send(&api.StreamExtSnapshotResponse{Finish: false}); err != nil { + return err + } } } - } func (ps *pubSub) runForwardSubscriber(ctx context.Context, out api.Dgraph_StreamExtSnapshotClient, peerId string) error { + defer func() { + glog.Infof("[import] forward subscriber stopped for peer [%v]", peerId) + }() + buffer := ps.subscribe() - size := 0 + defer ps.unsubscribe(buffer) // ensure publisher won't block on us if we exit + Loop: for { select { case <-ctx.Done(): - return fmt.Errorf("context deadline exceeded") + glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err()) + return ctx.Err() default: msg, ok := <-buffer @@ -92,42 +131,69 @@ Loop: break Loop } - data := &api.StreamExtSnapshotRequest{Pkt: &api.StreamPacket{Data: msg.Pkt.Data}} - if msg.Pkt.Done { glog.Infof("[import] received done signal from [%v]", peerId) d := api.StreamPacket{Done: true} if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: &d}); err != nil { return err } - break Loop + + _ = out.CloseSend() + + for { + if ctx.Err() != nil { + return ctx.Err() + } + r, err := out.Recv() + if errors.Is(err, io.EOF) { + return 
fmt.Errorf("server closed stream before Finish=true for peer [%v]", peerId) + } + if err != nil { + return fmt.Errorf("failed to receive final response from peer [%v]: %w", peerId, err) + } + if r.Finish { + glog.Infof("[import] peer [%v]: Received final Finish=true", peerId) + break Loop + } + glog.Infof("[import] peer [%v]: Waiting for Finish=true, got interim ACK", peerId) + } } + data := &api.StreamExtSnapshotRequest{Pkt: &api.StreamPacket{Data: msg.Pkt.Data}} if err := out.Send(data); err != nil { return err } - size += len(msg.Pkt.Data) + if _, err := out.Recv(); err != nil { + return fmt.Errorf("failed to receive response from peer [%v]: %w", peerId, err) + } } } return nil } -func (ps *pubSub) runLocalSubscriber(ctx context.Context) error { +func (ps *pubSub) runLocalSubscriber(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer) error { + defer func() { + glog.Infof("[import] local subscriber stopped") + }() + buffer := ps.subscribe() - size := 0 + defer ps.unsubscribe(buffer) // ensure publisher won't block on us if we exit glog.Infof("[import:flush] flushing external snapshot in badger db") + sw := pstore.NewStreamWriter() defer sw.Cancel() if err := sw.Prepare(); err != nil { return err } + Loop: for { select { case <-ctx.Done(): - return fmt.Errorf("context deadline exceeded") + glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err()) + return ctx.Err() default: msg, ok := <-buffer @@ -135,11 +201,13 @@ Loop: break Loop } kvs := msg.GetPkt() - if kvs != nil && kvs.Done { + if kvs == nil { + continue + } + if kvs.Done { break } - size += len(kvs.Data) buf := z.NewBufferSlice(kvs.Data) if err := sw.Write(buf); err != nil { return err @@ -150,14 +218,18 @@ Loop: if err := sw.Flush(); err != nil { return err } + glog.Infof("[import:flush] successfully flushed data in badger db") - return postStreamProcessing(ctx) + if err := postStreamProcessing(ctx); err != nil { + return err + } + return nil } func ProposeDrain(ctx 
context.Context, drainMode *api.UpdateExtSnapshotStreamingStateRequest) ([]uint32, error) { - memState := GetMembershipState() + members := GetMembershipState() currentGroups := make([]uint32, 0) - for gid := range memState.GetGroups() { + for gid := range members.GetGroups() { currentGroups = append(currentGroups, gid) } @@ -176,8 +248,8 @@ func ProposeDrain(ctx context.Context, drainMode *api.UpdateExtSnapshotStreaming glog.Errorf("[import:apply-drainmode] unable to connect to the leader of group [%v]", gid) return nil, fmt.Errorf("unable to connect to the leader of group [%v] : %v", gid, conn.ErrNoConnection) } - con := pl.Get() - c := pb.NewWorkerClient(con) + + c := pb.NewWorkerClient(pl.Get()) glog.Infof("[import:apply-drainmode] Successfully connected to leader of group [%v]", gid) if _, err := c.UpdateExtSnapshotStreamingState(ctx, drainMode); err != nil { @@ -201,16 +273,20 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error { return fmt.Errorf("failed to receive initial stream message: %v", err) } + if err := stream.Send(&api.StreamExtSnapshotResponse{Finish: false}); err != nil { + return fmt.Errorf("failed to send initial response: %v", err) + } + groupId := req.GroupId if groupId == groups().Node.gid { - glog.Infof("[import] Streaming external snapshot to current Group [%v]", groupId) + glog.Infof("[import] streaming external snapshot to current group [%v]", groupId) return streamInGroup(stream, true) } - glog.Infof("[import] Streaming external snapshot to other Group [%v]", groupId) + glog.Infof("[import] streaming external snapshot to other group [%v]", groupId) pl := groups().Leader(groupId) if pl == nil { - glog.Errorf("[import] Unable to connect to the leader of group [%v]", groupId) + glog.Errorf("[import] unable to connect to the leader of group [%v]", groupId) return fmt.Errorf("unable to connect to the leader of group [%v] : %v", groupId, conn.ErrNoConnection) } @@ -218,63 +294,89 @@ func InStream(stream 
api.Dgraph_StreamExtSnapshotServer) error { c := pb.NewWorkerClient(con) alphaStream, err := c.StreamExtSnapshot(stream.Context()) if err != nil { + glog.Errorf("[import] failed to establish stream with leader: %v", err) return fmt.Errorf("failed to establish stream with leader: %v", err) } + glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId) glog.Infof("[import] sending forward true to leader of group [%v]", groupId) forwardReq := &api.StreamExtSnapshotRequest{Forward: true} if err := alphaStream.Send(forwardReq); err != nil { - return fmt.Errorf("failed streamInGroupto send forward request: %w", err) + glog.Errorf("[import] failed to send forward request: %v", err) + return fmt.Errorf("failed to send forward request: %v", err) } return pipeTwoStream(stream, alphaStream, groupId) } -func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, - groupId uint32) error { - +func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error { currentGroup := groups().Node.gid - glog.Infof("[import] [forward from group-%v to group-%v] forwarding stream", currentGroup, groupId) + ctx := in.Context() + if err := out.Send(&api.StreamExtSnapshotRequest{GroupId: groupId}); err != nil { + return fmt.Errorf("send groupId downstream(%d): %w", groupId, err) + } + if _, err := out.Recv(); err != nil { + return fmt.Errorf("ack groupId downstream(%d): %w", groupId, err) + } - defer func() { - if err := in.SendAndClose(&api.StreamExtSnapshotResponse{}); err != nil { - glog.Errorf("[import] [forward from group %v to group %v] failed to send close on in"+ - " stream for group [%v]: %v", currentGroup, groupId, groupId, err) + for { + if err := ctx.Err(); err != nil { + return err } - }() - defer func() { - // Wait for ACK from the out stream - _, err := out.CloseAndRecv() 
+ req, err := in.Recv() + if errors.Is(err, io.EOF) { + return nil + } if err != nil { - glog.Errorf("[import] [forward from group %v to group %v] failed to receive ACK from group [%v]: %v", - currentGroup, groupId, groupId, err) + return fmt.Errorf("recv upstream(%d): %w", currentGroup, err) + } + if req.Pkt == nil { + return fmt.Errorf("unexpected empty request") } - }() - ps := &pubSub{} - eg, egCtx := errgroup.WithContext(in.Context()) + if req.Pkt.Done { + // Forward Done, half-close downstream send. + if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("send done downstream(%d): %w", groupId, err) + } + _ = out.CloseSend() - eg.Go(func() error { - if err := ps.runForwardSubscriber(egCtx, out, fmt.Sprintf("%d", groupId)); err != nil { - return err + // Drain downstream and relay upstream until Finish=true. + for { + if err := ctx.Err(); err != nil { + return err + } + resp, err := out.Recv() + if errors.Is(err, io.EOF) { + return fmt.Errorf("downstream(%d) closed before Finish=true", groupId) + } + if err != nil { + return fmt.Errorf("recv final downstream(%d): %w", groupId, err) + } + if err := in.Send(resp); err != nil { + return fmt.Errorf("relay final upstream: %w", err) + } + if resp.Finish { + glog.Infof("[import] [forward %d -> %d] finish", currentGroup, groupId) + return nil + } + } } - return nil - }) - eg.Go(func() error { - if err := ps.handlePublisher(egCtx, in); err != nil { - return err + // Normal data chunk: send -> wait ack -> send upstream ack. 
+ if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil { + return fmt.Errorf("send data downstream(%d): %w", groupId, err) + } + if _, err := out.Recv(); err != nil { + return fmt.Errorf("ack data downstream(%d): %w", groupId, err) + } + if err := in.Send(&api.StreamExtSnapshotResponse{}); err != nil { + return fmt.Errorf("send ack upstream: %w", err) } - return nil - }) - if err := eg.Wait(); err != nil { - return err } - - glog.Infof("[import] [forward from group %v to group %v] Received ACK from group [%v]", currentGroup, groupId, groupId) - return nil } func (w *grpcWorker) UpdateExtSnapshotStreamingState(ctx context.Context, @@ -287,7 +389,7 @@ func (w *grpcWorker) UpdateExtSnapshotStreamingState(ctx context.Context, return nil, errors.New("UpdateExtSnapshotStreamingStateRequest cannot have both Start and Finish set to true") } - glog.Infof("[import] Applying import mode proposal: %v", req) + glog.Infof("[import] Applying import mode proposal: %+v", req) err := groups().Node.proposeAndWait(ctx, &pb.Proposal{ExtSnapshotState: req}) return &pb.Status{}, err @@ -298,8 +400,9 @@ func (w *grpcWorker) UpdateExtSnapshotStreamingState(ctx context.Context, // If the node is the leader (Forward is true), it streams the data to its followers. // Otherwise, it simply writes the data to BadgerDB and flushes it. func (w *grpcWorker) StreamExtSnapshot(stream pb.Worker_StreamExtSnapshotServer) error { - glog.Info("[import] updating import mode to false") + glog.Info("[import] trying to update the import mode to false") defer x.ExtSnapshotStreamingState(false) + // Receive the first message to check the Forward flag. // If Forward is true, this node is the leader and should forward the stream to its followers. // If Forward is false, the node just writes and flushes the data. 
@@ -308,11 +411,8 @@ func (w *grpcWorker) StreamExtSnapshot(stream pb.Worker_StreamExtSnapshotServer) return err } - if err := streamInGroup(stream, forwardReq.Forward); err != nil { - return err - } - - return nil + glog.Infof("[import] received forward flag: %v", forwardReq.Forward) + return streamInGroup(stream, forwardReq.Forward) } // postStreamProcessing handles the post-stream processing of data received from the buffer into the local BadgerDB. @@ -334,7 +434,6 @@ func postStreamProcessing(ctx context.Context) error { groups().applyInitialTypes() ResetGQLSchemaStore() glog.Info("[import:flush] post stream processing done") - return nil } @@ -363,42 +462,46 @@ func postStreamProcessing(ctx context.Context) error { // - error: If there's an issue receiving data or if majority consensus isn't achieved (for leader) func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) error { node := groups().Node - glog.Infof("[import] got stream,forwarding in group [%v]", forward) + glog.Infof("[import] got stream, forwarding in group [%v]", forward) - ps := &pubSub{} - eg, errGCtx := errgroup.WithContext(stream.Context()) // We created this to check the majority successfulNodes := make(map[string]bool) + ps := &pubSub{} + eg, errGCtx := errgroup.WithContext(stream.Context()) for _, member := range groups().state.Groups[node.gid].Members { if member.Addr == node.MyAddr { eg.Go(func() error { - if err := ps.runLocalSubscriber(errGCtx); err != nil { + if err := ps.runLocalSubscriber(errGCtx, stream); err != nil { glog.Errorf("[import:flush] failed to run local subscriber: %v", err) updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) return err } + updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, true) return nil }) continue } - // We are not going to return any error from here because we care about the majority of nodes. - // If the majority of nodes are able to receive the data, the remaining ones can catch up later. 
if forward { + // We are not going to return any error from here because we care about the majority of nodes. + // If the majority of nodes are able to receive the data, the remaining ones can catch up later. + glog.Infof("[import] Streaming external snapshot to [%v] from [%v] forward [%v]", member.Addr, node.MyAddr, forward) eg.Go(func() error { glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr) if member.AmDead { glog.Infof(`[import:forward] [%v] is dead, skipping`, member.Addr) return nil } + pl, err := conn.GetPools().Get(member.Addr) if err != nil { updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) glog.Errorf("connection error to [%v]: %v", member.Addr, err) return nil } + c := pb.NewWorkerClient(pl.Get()) peerStream, err := c.StreamExtSnapshot(errGCtx) if err != nil { @@ -407,9 +510,8 @@ func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) erro return nil } defer func() { - _, err = peerStream.CloseAndRecv() - if err != nil { - glog.Errorf("[import:forward] failed to receive ACK from [%v]: %v", member.Addr, err) + if err := peerStream.CloseSend(); err != nil { + glog.Errorf("[import:forward] failed to close stream with peer [%v]: %v", member.Addr, err) } }() @@ -436,28 +538,32 @@ func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) erro eg.Go(func() error { defer ps.close() defer func() { - if err := stream.SendAndClose(&api.StreamExtSnapshotResponse{}); err != nil { + if err := stream.Send(&api.StreamExtSnapshotResponse{}); err != nil { glog.Errorf("[import] failed to send close on in: %v", err) } }() if err := ps.handlePublisher(errGCtx, stream); err != nil { - return err + return fmt.Errorf("failed to run publisher: %v", err) } - return nil }) if err := eg.Wait(); err != nil { - return err + return fmt.Errorf("failed to run in group streaming: %v", err) + } + // Sends a StreamExtSnapshotResponse with the Finish flag set to true to indicate + // the
completion of the streaming process. If an error occurs during the send + operation, it is logged and streaming proceeds. + if err := stream.Send(&api.StreamExtSnapshotResponse{Finish: true}); err != nil { + glog.Errorf("failed to send done signal: %v", err) } // If this node is the leader and fails to reach a majority of nodes, we return an error. // This ensures that the data is reliably received by enough nodes before proceeding. if forward && !checkMajority(successfulNodes) { glog.Error("[import] Majority of nodes failed to receive data.") - return errors.New("failed to send data to majority of the nodes") + return fmt.Errorf("failed to send data to majority of the nodes") } - return nil }