@@ -151,9 +151,11 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
151151 if err := out .Send (groupReq ); err != nil {
152152 return fmt .Errorf ("failed to send request for group ID [%v] to the server: %w" , groupId , err )
153153 }
154+ fmt .Println ("waiting here==================================>" )
154155 if _ , err := out .Recv (); err != nil {
155156 return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
156157 }
158+
157159 glog .Infof ("[import] Group [%v]: Received ACK for sending group request" , groupId )
158160
159161 // Configure and start the BadgerDB stream
@@ -197,9 +199,24 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
197199 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
198200 }
199201
200- if _ , err := out .Recv (); err != nil {
201- return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
202+ for {
203+ if ctx .Err () != nil {
204+ return ctx .Err ()
205+ }
206+ resp , err := out .Recv ()
207+ if errors .Is (err , io .EOF ) {
208+ return fmt .Errorf ("server closed stream before Finish=true for group [%d]" , groupId )
209+ }
210+ if err != nil {
211+ return fmt .Errorf ("failed to receive final response for group ID [%v] from the server: %w" , groupId , err )
212+ }
213+ if resp .Finish {
214+ glog .Infof ("[import] Group [%v]: Received final Finish=true" , groupId )
215+ break
216+ }
217+ glog .Infof ("[import] Group [%v]: Waiting for Finish=true, got interim ACK" , groupId )
202218 }
219+
203220 glog .Infof ("[import] Group [%v]: Received ACK for sending completion signal" , groupId )
204221
205222 return nil
0 commit comments