Skip to content

Commit 9c64df7

Browse files
Merge pull request #1 from hypermodeinc/main
make dgraph import work across the internet (dgraph-io#9456)
2 parents df26d43 + e6980be commit 9c64df7

File tree

14 files changed

+283
-130
lines changed

14 files changed

+283
-130
lines changed

.github/workflows/ci-dgraph-integration2-tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
dgraph-integration2-tests:
2525
if: github.event.pull_request.draft == false
2626
runs-on: ubuntu-latest
27-
timeout-minutes: 30
27+
timeout-minutes: 90
2828
steps:
2929
- uses: actions/checkout@v5
3030
with:
@@ -48,7 +48,7 @@ jobs:
4848
# move the binary
4949
cp dgraph/dgraph ~/go/bin/dgraph
5050
# run the tests
51-
go test -v -timeout=30m -failfast -tags=integration2 ./...
51+
go test -v -timeout=90m -failfast -tags=integration2 ./...
5252
# clean up docker containers after test execution
5353
go clean -testcache
5454
# sleep

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
44
bzip2=1.0.8-5+b1 \
55
git=1:2.39.5-0+deb12u2 \
66
&& rm -rf /var/lib/apt/lists/*
7+
ARG TARGETARCH=amd64
8+
ARG TARGETOS=linux
79
WORKDIR /go/src/repo
810
COPY go.mod go.sum ./
911
RUN go mod download && go mod verify
1012
COPY . .
11-
RUN CGO_ENABLED=0 make
13+
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make
1214

1315
###################### Stage II ######################
1416
FROM ubuntu:24.04

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
127127
if err != nil {
128128
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
129129
}
130-
131130
defer func() {
132-
if _, err := out.CloseAndRecv(); err != nil {
133-
glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
134-
}
135-
136-
glog.Infof("[import] Group [%v]: Received ACK ", groupId)
131+
_ = out.CloseSend()
137132
}()
138133

139134
// Open the BadgerDB instance at the specified directory
140135
opt := badger.DefaultOptions(pdir)
136+
opt.ReadOnly = true
141137
ps, err := badger.OpenManaged(opt)
142138
if err != nil {
143139
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
144140
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
145141
}
146-
147142
defer func() {
148143
if err := ps.Close(); err != nil {
149144
glog.Warningf("[import] Error closing BadgerDB: %v", err)
@@ -154,17 +149,19 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
154149
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
155150
groupReq := &api.StreamExtSnapshotRequest{GroupId: groupId}
156151
if err := out.Send(groupReq); err != nil {
157-
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
158-
groupId, err)
152+
return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err)
153+
}
154+
if _, err := out.Recv(); err != nil {
155+
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
159156
}
160157

158+
glog.Infof("[import] Group [%v]: Received ACK for sending group request", groupId)
159+
161160
// Configure and start the BadgerDB stream
162161
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)
163-
164162
if err := streamBadger(ctx, ps, out, groupId); err != nil {
165163
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
166164
}
167-
168165
return nil
169166
}
170167

@@ -180,6 +177,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
180177
if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
181178
return fmt.Errorf("failed to send data chunk: %w", err)
182179
}
180+
if _, err := out.Recv(); err != nil {
181+
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
182+
}
183+
glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)
184+
183185
return nil
184186
}
185187

@@ -196,5 +198,25 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
196198
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
197199
}
198200

201+
for {
202+
if ctx.Err() != nil {
203+
return ctx.Err()
204+
}
205+
resp, err := out.Recv()
206+
if errors.Is(err, io.EOF) {
207+
return fmt.Errorf("server closed stream before Finish=true for group [%d]", groupId)
208+
}
209+
if err != nil {
210+
return fmt.Errorf("failed to receive final response for group ID [%v] from the server: %w", groupId, err)
211+
}
212+
if resp.Finish {
213+
glog.Infof("[import] Group [%v]: Received final Finish=true", groupId)
214+
break
215+
}
216+
glog.Infof("[import] Group [%v]: Waiting for Finish=true, got interim ACK", groupId)
217+
}
218+
219+
glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)
220+
199221
return nil
200222
}

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build integration
1+
//go:build integration2
22

33
/*
44
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
@@ -79,7 +79,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
7979

8080
for _, tt := range tests {
8181
t.Run(tt.name, func(t *testing.T) {
82-
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
82+
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).
83+
WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
8384
c, err := dgraphtest.NewLocalCluster(conf)
8485
require.NoError(t, err)
8586
defer func() { c.Cleanup(t.Failed()) }()
@@ -284,15 +285,14 @@ func runImportTest(t *testing.T, tt testcase) {
284285
require.ErrorContains(t, err, tt.err)
285286
return
286287
}
287-
288288
require.NoError(t, Import(context.Background(), connectionString, outDir))
289289

290290
for group, alphas := range alphaGroups {
291291
for i := 0; i < tt.downAlphas; i++ {
292292
alphaID := alphas[i]
293293
t.Logf("Starting alpha %v from group %v", alphaID, group)
294294
require.NoError(t, targetCluster.StartAlpha(alphaID))
295-
require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 30*time.Second))
295+
require.NoError(t, waitForAlphaReady(t, targetCluster, alphaID, 60*time.Second))
296296
}
297297
}
298298

@@ -306,7 +306,7 @@ func runImportTest(t *testing.T, tt testcase) {
306306
require.NoError(t, err)
307307
defer cleanup()
308308

309-
require.NoError(t, validateClientConnection(t, gc, 10*time.Second))
309+
require.NoError(t, validateClientConnection(t, gc, 30*time.Second))
310310
verifyImportResults(t, gc, tt.downAlphas)
311311
}
312312
}
@@ -347,7 +347,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest.
347347
}
348348

349349
// setupTargetCluster creates and starts a cluster that will receive the imported data
350-
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
350+
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (
351+
*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
352+
351353
conf := dgraphtest.NewClusterConfig().
352354
WithNumAlphas(numAlphas).
353355
WithNumZeros(3).
@@ -366,7 +368,7 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes
366368

367369
// verifyImportResults validates the result of an import operation with retry logic
368370
func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) {
369-
maxRetries := 1
371+
maxRetries := 5
370372
if downAlphas > 0 {
371373
maxRetries = 10
372374
}
@@ -547,7 +549,7 @@ func retryHealthCheck(t *testing.T, cluster *dgraphtest.LocalCluster, timeout ti
547549
// validateClientConnection ensures the client connection is working before use
548550
func validateClientConnection(t *testing.T, gc *dgraphapi.GrpcClient, timeout time.Duration) error {
549551
deadline := time.Now().Add(timeout)
550-
retryDelay := 200 * time.Millisecond
552+
retryDelay := 1 * time.Second
551553

552554
for time.Now().Before(deadline) {
553555
if _, err := gc.Query("schema{}"); err != nil {

dgraph/cmd/dgraphimport/run.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func init() {
3434

3535
flag := ImportCmd.Cmd.Flags()
3636
flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
37+
flag.StringP("snapshot-dir", "p", "", "Location of p directory")
3738
flag.StringP("schema", "s", "", "Location of DQL schema file.")
3839
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
3940
flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.")
@@ -72,6 +73,17 @@ func run() {
7273
os.Exit(1)
7374
}
7475

76+
// if snapshot p directory is already provided, there is no need to run bulk loader
77+
if ImportCmd.Conf.GetString("snapshot-dir") != "" {
78+
connStr := ImportCmd.Conf.GetString("conn-str")
79+
snapshotDir := ImportCmd.Conf.GetString("snapshot-dir")
80+
if err := Import(context.Background(), connStr, snapshotDir); err != nil {
81+
fmt.Println("Failed to import data:", err)
82+
os.Exit(1)
83+
}
84+
return
85+
}
86+
7587
cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
7688
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
7789
(70*cacheSize)/100, (30*cacheSize)/100)

dgraph/cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/spf13/viper"
2323

2424
"github.com/hypermodeinc/dgraph/v25/acl"
25+
2526
"github.com/hypermodeinc/dgraph/v25/audit"
2627
"github.com/hypermodeinc/dgraph/v25/backup"
2728
checkupgrade "github.com/hypermodeinc/dgraph/v25/check_upgrade"

dgraphtest/load.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,11 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
503503
args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
504504
}
505505

506+
// dgraphCmdPath := os.Getenv("DGRAPH_CMD_PATH")
507+
// if dgraphCmdPath == "" {
508+
// dgraphCmdPath = filepath.Join(c.tempBinDir, "dgraph")
509+
// }
510+
506511
log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
507512
binaryName := "dgraph"
508513
if os.Getenv("DGRAPH_BINARY") != "" {

edgraph/server.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,7 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context,
18281828

18291829
groups, err := worker.ProposeDrain(ctx, req)
18301830
if err != nil {
1831+
glog.Errorf("[import] failed to propose drain mode: %v", err)
18311832
return nil, err
18321833
}
18331834

@@ -1838,8 +1839,11 @@ func (s *Server) UpdateExtSnapshotStreamingState(ctx context.Context,
18381839

18391840
func (s *Server) StreamExtSnapshot(stream api.Dgraph_StreamExtSnapshotServer) error {
18401841
defer x.ExtSnapshotStreamingState(false)
1841-
1842-
return worker.InStream(stream)
1842+
if err := worker.InStream(stream); err != nil {
1843+
glog.Errorf("[import] failed to stream external snapshot: %v", err)
1844+
return err
1845+
}
1846+
return nil
18431847
}
18441848

18451849
// CommitOrAbort commits or aborts a transaction.

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ require (
77
github.com/HdrHistogram/hdrhistogram-go v1.1.2
88
github.com/IBM/sarama v1.46.1
99
github.com/Masterminds/semver/v3 v3.4.0
10-
github.com/blevesearch/bleve/v2 v2.5.3
10+
github.com/blevesearch/bleve/v2 v2.5.2
1111
github.com/dgraph-io/badger/v4 v4.8.0
12-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0
12+
github.com/dgraph-io/dgo/v250 v250.0.0-preview7
1313
github.com/dgraph-io/gqlgen v0.13.2
1414
github.com/dgraph-io/gqlparser/v2 v2.2.2
1515
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
7777
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
7878
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
7979
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
80-
github.com/blevesearch/bleve/v2 v2.5.3 h1:9l1xtKaETv64SZc1jc4Sy0N804laSa/LeMbYddq1YEM=
81-
github.com/blevesearch/bleve/v2 v2.5.3/go.mod h1:Z/e8aWjiq8HeX+nW8qROSxiE0830yQA071dwR3yoMzw=
80+
github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8=
81+
github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo=
8282
github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y=
8383
github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
8484
github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk=
@@ -132,8 +132,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
132132
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
133133
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
134134
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
135-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0 h1:cbGtNKHWe34SYYPtFe/klD+cvJx5LI844Iz/akWvofo=
136-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20251001031539-c5607f0af3d0/go.mod h1:H3PcQuhmfzSC/1I7FLJYOxntpk3UG6lmZAyv0QxRm+o=
135+
github.com/dgraph-io/dgo/v250 v250.0.0-preview7 h1:cEranHYlUFwacvtksjwXU8qB1NHIhrvPE1gdGS6r+lU=
136+
github.com/dgraph-io/dgo/v250 v250.0.0-preview7/go.mod h1:OVSaapUnuqaY4beLe98CajukINwbVm0JRNp0SRBCz/w=
137137
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
138138
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
139139
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=

0 commit comments

Comments
 (0)