4 changes: 2 additions & 2 deletions .github/workflows/ci-dgraph-integration2-tests.yml
@@ -24,7 +24,7 @@ jobs:
dgraph-integration2-tests:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 90
steps:
- uses: actions/checkout@v5
with:
@@ -48,7 +48,7 @@ jobs:
# move the binary
cp dgraph/dgraph ~/go/bin/dgraph
# run the tests
go test -v -timeout=30m -failfast -tags=integration2 ./...
go test -v -timeout=90m -failfast -tags=integration2 ./...
# clean up docker containers after test execution
go clean -testcache
# sleep
4 changes: 3 additions & 1 deletion Dockerfile
@@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
bzip2=1.0.8-5+b1 \
git=1:2.39.5-0+deb12u2 \
&& rm -rf /var/lib/apt/lists/*
ARG TARGETARCH=amd64
ARG TARGETOS=linux
WORKDIR /go/src/repo
COPY go.mod go.sum ./
RUN go mod download && go mod verify
COPY . .
RUN CGO_ENABLED=0 make
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make

###################### Stage II ######################
FROM ubuntu:24.04
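Note: the new TARGETOS/TARGETARCH args match the build arguments BuildKit populates automatically during multi-platform builds, while the amd64/linux defaults keep a plain docker build working as before. A hedged usage sketch, where the image tag and platform list are placeholders not taken from this PR:

    docker buildx build --platform linux/amd64,linux/arm64 -t dgraph/dgraph:local .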
44 changes: 33 additions & 11 deletions dgraph/cmd/dgraphimport/import_client.go
@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
if err != nil {
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
}

defer func() {
if _, err := out.CloseAndRecv(); err != nil {
glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
}

glog.Infof("[import] Group [%v]: Received ACK ", groupId)
_ = out.CloseSend()
}()

// Open the BadgerDB instance at the specified directory
opt := badger.DefaultOptions(pdir)
opt.ReadOnly = true
ps, err := badger.OpenManaged(opt)
if err != nil {
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
}

defer func() {
if err := ps.Close(); err != nil {
glog.Warningf("[import] Error closing BadgerDB: %v", err)
@@ -154,17 +149,19 @@
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
groupReq := &api.StreamExtSnapshotRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
groupId, err)
return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err)
}
if _, err := out.Recv(); err != nil {
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
}

glog.Infof("[import] Group [%v]: Received ACK for sending group request", groupId)

// Configure and start the BadgerDB stream
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)

if err := streamBadger(ctx, ps, out, groupId); err != nil {
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
}

return nil
}

@@ -180,6 +177,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
if _, err := out.Recv(); err != nil {
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
}
glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)

return nil
}

@@ -196,5 +198,25 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
}

for {
if ctx.Err() != nil {
return ctx.Err()
}
resp, err := out.Recv()
if errors.Is(err, io.EOF) {
return fmt.Errorf("server closed stream before Finish=true for group [%d]", groupId)
}
if err != nil {
return fmt.Errorf("failed to receive final response for group ID [%v] from the server: %w", groupId, err)
}
if resp.Finish {
glog.Infof("[import] Group [%v]: Received final Finish=true", groupId)
break
}
glog.Infof("[import] Group [%v]: Waiting for Finish=true, got interim ACK", groupId)
}

glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)

return nil
}
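Note: the per-chunk Recv calls and the final Finish loop added above define a simple contract with the receiving side: every group header and data chunk is acknowledged, and Finish=true is sent only once the done packet has been fully applied. The actual server logic lives in worker.InStream (not part of this diff); the sketch below only illustrates that contract, with hypothetical helpers (isFinalPacket, applyChunk), simplified error handling, and the api package assumed to come from the dgo module in go.mod.

    // Illustrative only: one way the receiving side could satisfy the
    // chunk/ACK contract the client above relies on. Not the real worker.InStream.
    func receiveExtSnapshot(stream api.Dgraph_StreamExtSnapshotServer) error {
        for {
            req, err := stream.Recv()
            if err != nil {
                return err
            }
            if req.GetGroupId() != 0 {
                // Group header from the client: just acknowledge it.
                if err := stream.Send(&api.StreamExtSnapshotResponse{}); err != nil {
                    return err
                }
                continue
            }
            if isFinalPacket(req) { // hypothetical helper: detects the client's 'done' signal
                // Finish applying the snapshot, then report completion.
                return stream.Send(&api.StreamExtSnapshotResponse{Finish: true})
            }
            if err := applyChunk(req); err != nil { // hypothetical helper: writes the chunk locally
                return err
            }
            // Interim ACK for the data chunk just received.
            if err := stream.Send(&api.StreamExtSnapshotResponse{}); err != nil {
                return err
            }
        }
    }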
18 changes: 10 additions & 8 deletions dgraph/cmd/dgraphimport/import_test.go
@@ -1,4 +1,4 @@
//go:build integration
//go:build integration2

/*
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
@@ -79,7 +79,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).
WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
c, err := dgraphtest.NewLocalCluster(conf)
require.NoError(t, err)
defer func() { c.Cleanup(t.Failed()) }()
@@ -284,15 +285,14 @@ func runImportTest(t *testing.T, tt testcase) {
require.ErrorContains(t, err, tt.err)
return
}

require.NoError(t, Import(context.Background(), connectionString, outDir))

for group, alphas := range alphaGroups {
for i := 0; i < tt.downAlphas; i++ {
alphaID := alphas[i]
t.Logf("Starting alpha %v from group %v", alphaID, group)
require.NoError(t, targetCluster.StartAlpha(alphaID))
require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 30*time.Second))
require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 60*time.Second))
}
}

@@ -306,7 +306,7 @@ func runImportTest(t *testing.T, tt testcase) {
require.NoError(t, err)
defer cleanup()

require.NoError(t, validateClientConnection(t, gc, 10*time.Second))
require.NoError(t, validateClientConnection(t, gc, 30*time.Second))
verifyImportResults(t, gc, tt.downAlphas)
}
}
@@ -347,7 +347,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest.
}

// setupTargetCluster creates and starts a cluster that will receive the imported data
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (
*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {

conf := dgraphtest.NewClusterConfig().
WithNumAlphas(numAlphas).
WithNumZeros(3).
@@ -366,7 +368,7 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes

// verifyImportResults validates the result of an import operation with retry logic
func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) {
maxRetries := 1
maxRetries := 5
if downAlphas > 0 {
maxRetries = 10
}
@@ -547,7 +549,7 @@ func retryHealthCheck(t *testing.T, cluster *dgraphtest.LocalCluster, timeout ti
// validateClientConnection ensures the client connection is working before use
func validateClientConnection(t *testing.T, gc *dgraphapi.GrpcClient, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
retryDelay := 200 * time.Millisecond
retryDelay := 1 * time.Second

for time.Now().Before(deadline) {
if _, err := gc.Query("schema{}"); err != nil {
12 changes: 12 additions & 0 deletions dgraph/cmd/dgraphimport/run.go
@@ -34,6 +34,7 @@ func init() {

flag := ImportCmd.Cmd.Flags()
flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
flag.StringP("snapshot-dir", "p", "", "Location of p directory")
flag.StringP("schema", "s", "", "Location of DQL schema file.")
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.")
@@ -72,6 +73,17 @@ func run() {
os.Exit(1)
}

// if snapshot p directory is already provided, there is no need to run bulk loader
if ImportCmd.Conf.GetString("snapshot-dir") != "" {
connStr := ImportCmd.Conf.GetString("conn-str")
snapshotDir := ImportCmd.Conf.GetString("snapshot-dir")
if err := Import(context.Background(), connStr, snapshotDir); err != nil {
fmt.Println("Failed to import data:", err)
os.Exit(1)
}
return
}

cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
(70*cacheSize)/100, (30*cacheSize)/100)
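Note: with this change, a p directory already produced by the bulk loader (or an earlier run) can be streamed straight into the target cluster, skipping the bulk load phase. A hedged invocation, where the path and connection string value are placeholders and only the flag names come from the diff above:

    dgraph import --snapshot-dir ./out/0/p --conn-str "<dgraph-connection-string>"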
1 change: 1 addition & 0 deletions dgraph/cmd/root.go
@@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/hypermodeinc/dgraph/v25/acl"

"github.com/hypermodeinc/dgraph/v25/audit"
"github.com/hypermodeinc/dgraph/v25/backup"
checkupgrade "github.com/hypermodeinc/dgraph/v25/check_upgrade"
5 changes: 5 additions & 0 deletions dgraphtest/load.go
@@ -503,6 +503,11 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
}

// dgraphCmdPath := os.Getenv("DGRAPH_CMD_PATH")
// if dgraphCmdPath == "" {
// dgraphCmdPath = filepath.Join(c.tempBinDir, "dgraph")
// }

log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
binaryName := "dgraph"
if os.Getenv("DGRAPH_BINARY") != "" {
8 changes: 6 additions & 2 deletions edgraph/server.go
@@ -1828,6 +1828,7 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context,

groups, err := worker.ProposeDrain(ctx, req)
if err != nil {
glog.Errorf("[import] failed to propose drain mode: %v", err)
return nil, err
}

Expand All @@ -1838,8 +1839,11 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context,

func (s *Server) StreamExtSnapshot(stream api.Dgraph_StreamExtSnapshotServer) error {
defer x.ExtSnapshotStreamingState(false)

return worker.InStream(stream)
if err := worker.InStream(stream); err != nil {
glog.Errorf("[import] failed to stream external snapshot: %v", err)
return err
}
return nil
}

// CommitOrAbort commits or aborts a transaction.
4 changes: 2 additions & 2 deletions go.mod
@@ -7,9 +7,9 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/IBM/sarama v1.46.1
github.com/Masterminds/semver/v3 v3.4.0
github.com/blevesearch/bleve/v2 v2.5.3
github.com/blevesearch/bleve/v2 v2.5.2
github.com/dgraph-io/badger/v4 v4.8.0
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0
github.com/dgraph-io/dgo/v250 v250.0.0-preview7
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
8 changes: 4 additions & 4 deletions go.sum
@@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/bleve/v2 v2.5.3 h1:9l1xtKaETv64SZc1jc4Sy0N804laSa/LeMbYddq1YEM=
github.com/blevesearch/bleve/v2 v2.5.3/go.mod h1:Z/e8aWjiq8HeX+nW8qROSxiE0830yQA071dwR3yoMzw=
github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8=
github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo=
github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y=
github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk=
@@ -132,8 +132,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0 h1:cbGtNKHWe34SYYPtFe/klD+cvJx5LI844Iz/akWvofo=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0/go.mod h1:H3PcQuhmfzSC/1I7FLJYOxntpk3UG6lmZAyv0QxRm+o=
github.com/dgraph-io/dgo/v250 v250.0.0-preview7 h1:cEranHYlUFwacvtksjwXU8qB1NHIhrvPE1gdGS6r+lU=
github.com/dgraph-io/dgo/v250 v250.0.0-preview7/go.mod h1:OVSaapUnuqaY4beLe98CajukINwbVm0JRNp0SRBCz/w=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
2 changes: 1 addition & 1 deletion protos/pb.proto
@@ -599,7 +599,7 @@ service Worker {
rpc DeleteNamespace(DeleteNsRequest) returns (Status) {}
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
rpc UpdateExtSnapshotStreamingState(api.UpdateExtSnapshotStreamingStateRequest) returns (Status) {}
rpc StreamExtSnapshot(stream api.StreamExtSnapshotRequest) returns (api.StreamExtSnapshotResponse) {}
rpc StreamExtSnapshot(stream api.StreamExtSnapshotRequest) returns (stream api.StreamExtSnapshotResponse) {}
}

message TabletResponse {
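Note: making the response a stream turns StreamExtSnapshot into a fully bidirectional RPC, which is what lets the import client above call Recv after every Send for per-chunk ACKs instead of a single CloseAndRecv at the end. Depending on the protoc-gen-go-grpc version, the regenerated stream handles are roughly shaped like the sketch below (illustrative, unqualified type names; the real definitions live in the regenerated pb/api packages):

    // Approximate shape of the generated handles for a bidirectional streaming RPC.
    type Dgraph_StreamExtSnapshotClient interface {
        Send(*StreamExtSnapshotRequest) error
        Recv() (*StreamExtSnapshotResponse, error)
        grpc.ClientStream
    }

    type Dgraph_StreamExtSnapshotServer interface {
        Send(*StreamExtSnapshotResponse) error
        Recv() (*StreamExtSnapshotRequest, error)
        grpc.ServerStream
    }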
7 changes: 4 additions & 3 deletions protos/pb/pb.pb.go

(Generated file; diff not rendered.)

12 changes: 5 additions & 7 deletions protos/pb/pb_grpc.pb.go

(Generated file; diff not rendered.)
