
Commit ab94bf1

Make dgraph import work over the internet
This PR fixes the timeout that occurs when Cloudflare sits in front of the cluster and the import only sends data one way. It adds application-level ACKs to work around the 120s timeout for an HTTP response. Additionally, it adds an argument so the dgraph import command can take an existing p directory as input.
1 parent 883880d commit ab94bf1
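
The workaround is a per-message application-level ACK on the bidirectional gRPC stream: instead of pushing chunks one way and waiting for a single final response, the client blocks on Recv after every Send, so the server answers well inside Cloudflare's 120-second window. The sketch below illustrates that pattern using the request type visible in the diff; the helper name sendWithAck and the apiv2 import path are assumptions for illustration, not code from this commit, which inlines the same logic in streamSnapshotForGroup and streamBadger.

package dgraphimport

import (
	"fmt"

	// Import path assumed from go.mod; the diff only shows the "apiv2" alias.
	apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
)

// sendWithAck is a hypothetical helper showing the per-chunk ACK pattern:
// send one request on the bidirectional stream, then block until the server
// acknowledges it. The round trip per message keeps proxies such as Cloudflare
// from seeing a silent one-way upload and closing the connection at their
// response timeout.
func sendWithAck(out apiv2.Dgraph_StreamExtSnapshotClient, req *apiv2.StreamExtSnapshotRequest, groupId uint32) error {
	if err := out.Send(req); err != nil {
		return fmt.Errorf("failed to send request for group [%v]: %w", groupId, err)
	}
	if _, err := out.Recv(); err != nil {
		return fmt.Errorf("failed to receive ACK for group [%v]: %w", groupId, err)
	}
	return nil
}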

File tree

16 files changed: +151 -116 lines


.github/workflows/ci-dgraph-integration2-tests.yml

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ jobs:
   dgraph-integration2-tests:
     if: github.event.pull_request.draft == false
     runs-on: warp-ubuntu-latest-x64-4x
-    timeout-minutes: 30
+    timeout-minutes: 60
     steps:
       - uses: actions/checkout@v4
         with:

Dockerfile

Lines changed: 3 additions & 1 deletion
@@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
     bzip2=1.0.8-5+b1 \
     git=1:2.39.5-0+deb12u2 \
     && rm -rf /var/lib/apt/lists/*
+ARG TARGETARCH=amd64
+ARG TARGETOS=linux
 WORKDIR /go/src/repo
 COPY go.mod go.sum ./
 RUN go mod download && go mod verify
 COPY . .
-RUN CGO_ENABLED=0 make
+RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make

 ###################### Stage II ######################
 FROM ubuntu:24.04

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 17 additions & 11 deletions
@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
 	if err != nil {
 		return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
 	}
-
 	defer func() {
-		if _, err := out.CloseAndRecv(); err != nil {
-			glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
-		}
-
-		glog.Infof("[import] Group [%v]: Received ACK ", groupId)
+		_ = out.CloseSend()
 	}()

 	// Open the BadgerDB instance at the specified directory
 	opt := badger.DefaultOptions(pdir)
+	opt.ReadOnly = true
 	ps, err := badger.OpenManaged(opt)
 	if err != nil {
 		glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
 		return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
 	}
-
 	defer func() {
 		if err := ps.Close(); err != nil {
 			glog.Warningf("[import] Error closing BadgerDB: %v", err)
@@ -154,17 +149,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
 	glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
 	groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
 	if err := out.Send(groupReq); err != nil {
-		return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
-			groupId, err)
+		return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err)
+	}
+	if _, err := out.Recv(); err != nil {
+		return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
 	}
+	glog.Infof("[import] Group [%v]: Received ACK for sending group request", groupId)

 	// Configure and start the BadgerDB stream
 	glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)
-
 	if err := streamBadger(ctx, ps, out, groupId); err != nil {
 		return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
 	}
-
 	return nil
 }

@@ -180,6 +176,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
 		if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
 			return fmt.Errorf("failed to send data chunk: %w", err)
 		}
+		if _, err := out.Recv(); err != nil {
+			return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
+		}
+		glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)
+
 		return nil
 	}

@@ -196,5 +197,10 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
 		return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
 	}

+	if _, err := out.Recv(); err != nil {
+		return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
+	}
+	glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)
+
 	return nil
 }

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 6 additions & 6 deletions
@@ -1,4 +1,4 @@
-//go:build integration
+//go:build integration2

 /*
  * SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
@@ -75,7 +75,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
+			conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).
+				WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
 			c, err := dgraphtest.NewLocalCluster(conf)
 			require.NoError(t, err)
 			defer func() { c.Cleanup(t.Failed()) }()
@@ -268,14 +269,12 @@ func runImportTest(t *testing.T, tt testcase) {
 			require.NoError(t, targetCluster.StopAlpha(alphaID))
 		}
 	}
-
 	if tt.err != "" {
 		err := Import(context.Background(), connectionString, outDir)
 		require.Error(t, err)
 		require.ErrorContains(t, err, tt.err)
 		return
 	}
-
 	require.NoError(t, Import(context.Background(), connectionString, outDir))

 	for group, alphas := range alphaGroups {
@@ -287,7 +286,6 @@ func runImportTest(t *testing.T, tt testcase) {
 	}

 	require.NoError(t, targetCluster.HealthCheck(false))
-
 	t.Log("Import completed")

 	for i := 0; i < tt.targetAlphas; i++ {
@@ -330,7 +328,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest.
 }

 // setupTargetCluster creates and starts a cluster that will receive the imported data
-func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
+func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (
+	*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
+
 	conf := dgraphtest.NewClusterConfig().
 		WithNumAlphas(numAlphas).
 		WithNumZeros(3).

dgraph/cmd/dgraphimport/run.go

Lines changed: 12 additions & 0 deletions
@@ -34,6 +34,7 @@ func init() {

 	flag := ImportCmd.Cmd.Flags()
 	flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
+	flag.StringP("snapshot-dir", "p", "", "Location of p directory")
 	flag.StringP("schema", "s", "", "Location of DQL schema file.")
 	flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
 	flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.")
@@ -72,6 +73,17 @@ func run() {
 		os.Exit(1)
 	}

+	// if snapshot p directory is already provided, there is no need to run bulk loader
+	if ImportCmd.Conf.GetString("snapshot-dir") != "" {
+		connStr := ImportCmd.Conf.GetString("conn-str")
+		snapshotDir := ImportCmd.Conf.GetString("snapshot-dir")
+		if err := Import(context.Background(), connStr, snapshotDir); err != nil {
+			fmt.Println("Failed to import data:", err)
+			os.Exit(1)
+		}
+		return
+	}
+
 	cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
 	cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
 		(70*cacheSize)/100, (30*cacheSize)/100)
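
With the new --snapshot-dir / -p flag, the command skips the bulk loader and streams an existing p directory straight to the target cluster via Import. A hypothetical programmatic equivalent of that code path is sketched below; the module import path, the connection-string value, and the p-directory path are assumptions for illustration only.

package main

import (
	"context"
	"log"

	// Module path assumed; the commit only shows the dgraphimport package itself.
	"github.com/hypermodeinc/dgraph/v25/dgraph/cmd/dgraphimport"
)

func main() {
	connStr := "dgraph://localhost:9080" // assumed connection-string format
	pdir := "./out/0/p"                  // an existing p directory, e.g. produced by the bulk loader
	// Stream the p directory to the running cluster, group by group, with per-chunk ACKs.
	if err := dgraphimport.Import(context.Background(), connStr, pdir); err != nil {
		log.Fatalf("Failed to import data: %v", err)
	}
}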

dgraphtest/load.go

Lines changed: 5 additions & 1 deletion
@@ -503,8 +503,12 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
 		args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
 	}

+	dgraphCmdPath := os.Getenv("DGRAPH_CMD_PATH")
+	if dgraphCmdPath == "" {
+		dgraphCmdPath = filepath.Join(c.tempBinDir, "dgraph")
+	}
 	log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
-	cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
+	cmd := exec.Command(dgraphCmdPath, args...)
 	if out, err := cmd.CombinedOutput(); err != nil {
 		return errors.Wrapf(err, "error running bulk loader: %v", string(out))
 	} else {

edgraph/server.go

Lines changed: 7 additions & 5 deletions
@@ -1829,18 +1829,20 @@ func (s *ServerV25) UpdateExtSnapshotStreamingState(ctx context.Context,

 	groups, err := worker.ProposeDrain(ctx, req)
 	if err != nil {
+		glog.Errorf("[import] failed to propose drain mode: %v", err)
 		return nil, err
 	}

-	resp := &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}
-
-	return resp, nil
+	return &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}, nil
 }

 func (s *ServerV25) StreamExtSnapshot(stream apiv2.Dgraph_StreamExtSnapshotServer) error {
 	defer x.ExtSnapshotStreamingState(false)
-
-	return worker.InStream(stream)
+	if err := worker.InStream(stream); err != nil {
+		glog.Errorf("[import] failed to stream external snapshot: %v", err)
+		return err
+	}
+	return nil
 }

 // CommitOrAbort commits or aborts a transaction.

go.mod

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ require (
 	github.com/Masterminds/semver/v3 v3.4.0
 	github.com/blevesearch/bleve/v2 v2.5.2
 	github.com/dgraph-io/badger/v4 v4.7.0
-	github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d
+	github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0
 	github.com/dgraph-io/gqlgen v0.13.2
 	github.com/dgraph-io/gqlparser/v2 v2.2.2
 	github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15

go.sum

Lines changed: 2 additions & 2 deletions
@@ -128,8 +128,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
 github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
-github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d h1:9PLyvZY1Nih05g+2womk+kNnX3Gb20kx5BsK3foA5a8=
-github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
+github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0 h1:gvNB/M+LrjkX9c4QJCPdODVoxgMt3zlIBW9i8xVfoYo=
+github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250709182152-32901102e0d0/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
 github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
 github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
 github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=

protos/Makefile

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ regenerate: tidy-deps copy-protos check clean
 	@protoc \
 		--proto_path=/usr/local/include \
 		--proto_path=/usr/include \
+		--proto_path=/opt/homebrew/include/google/protobuf \
 		--proto_path=${PROTO_PATH} \
 		--go_out=pb --go-grpc_out=pb \
 		--go_opt=paths=source_relative \
