@@ -289,18 +289,16 @@ public void visitTezOp(TezOperator tezOp) throws VisitorException {
289289 if (storeVertexGroupOps [i ] != null ) {
290290 continue ;
291291 }
292- }
293- if (existingVertexGroup != null ) {
294- storeVertexGroupOps [i ] = existingVertexGroup ;
295- existingVertexGroup .getVertexGroupMembers ().remove (unionOp .getOperatorKey ());
296- existingVertexGroup .getVertexGroupMembers ().addAll (unionOp .getUnionMembers ());
297- existingVertexGroup .getVertexGroupInfo ().removeInput (unionOp .getOperatorKey ());
298- } else {
299292 storeVertexGroupOps [i ] = new TezOperator (OperatorKey .genOpKey (scope ));
300293 storeVertexGroupOps [i ].setVertexGroupInfo (new VertexGroupInfo (unionStoreOutputs .get (i )));
301294 storeVertexGroupOps [i ].getVertexGroupInfo ().setSFile (unionStoreOutputs .get (i ).getSFile ());
302295 storeVertexGroupOps [i ].setVertexGroupMembers (new ArrayList <OperatorKey >(unionOp .getUnionMembers ()));
303296 tezPlan .add (storeVertexGroupOps [i ]);
297+ } else {
298+ storeVertexGroupOps [i ] = existingVertexGroup ;
299+ existingVertexGroup .getVertexGroupMembers ().remove (unionOp .getOperatorKey ());
300+ existingVertexGroup .getVertexGroupMembers ().addAll (unionOp .getUnionMembers ());
301+ existingVertexGroup .getVertexGroupInfo ().removeInput (unionOp .getOperatorKey ());
304302 }
305303 }
306304
@@ -320,19 +318,36 @@ public void visitTezOp(TezOperator tezOp) throws VisitorException {
320318 TezOperator [] outputVertexGroupOps = new TezOperator [unionOutputKeys .size ()];
321319 String [] newOutputKeys = new String [unionOutputKeys .size ()];
322320 for (int i =0 ; i < outputVertexGroupOps .length ; i ++) {
323- for (int j = 0 ; j < i ; j ++) {
324- if (unionOutputKeys .get (i ).equals (unionOutputKeys .get (j ))) {
325- outputVertexGroupOps [i ] = outputVertexGroupOps [j ];
326- break ;
321+ TezOperator existingVertexGroup = null ;
322+ if (successors != null ) {
323+ for (TezOperator succ : successors ) {
324+ if (succ .isVertexGroup ()
325+ && unionOutputKeys .get (i ).equals (succ .getVertexGroupInfo ().getOutput ()) ) {
326+ existingVertexGroup = succ ;
327+ break ;
328+ }
327329 }
328330 }
329- if (outputVertexGroupOps [i ] != null ) {
330- continue ;
331+ if (existingVertexGroup == null ) {
332+ for (int j = 0 ; j < i ; j ++) {
333+ if (unionOutputKeys .get (i ).equals (unionOutputKeys .get (j ))) {
334+ outputVertexGroupOps [i ] = outputVertexGroupOps [j ];
335+ break ;
336+ }
337+ }
338+ if (outputVertexGroupOps [i ] != null ) {
339+ continue ;
340+ }
341+ outputVertexGroupOps [i ] = new TezOperator (OperatorKey .genOpKey (scope ));
342+ outputVertexGroupOps [i ].setVertexGroupInfo (new VertexGroupInfo ());
343+ outputVertexGroupOps [i ].getVertexGroupInfo ().setOutput (unionOutputKeys .get (i ));
344+ outputVertexGroupOps [i ].setVertexGroupMembers (new ArrayList <OperatorKey >(unionOp .getUnionMembers ()));
345+ } else {
346+ outputVertexGroupOps [i ] = existingVertexGroup ;
347+ existingVertexGroup .getVertexGroupMembers ().remove (unionOp .getOperatorKey ());
348+ existingVertexGroup .getVertexGroupMembers ().addAll (unionOp .getUnionMembers ());
349+ existingVertexGroup .getVertexGroupInfo ().removeInput (unionOp .getOperatorKey ());
331350 }
332- outputVertexGroupOps [i ] = new TezOperator (OperatorKey .genOpKey (scope ));
333- outputVertexGroupOps [i ].setVertexGroupInfo (new VertexGroupInfo ());
334- outputVertexGroupOps [i ].getVertexGroupInfo ().setOutput (unionOutputKeys .get (i ));
335- outputVertexGroupOps [i ].setVertexGroupMembers (new ArrayList <OperatorKey >(unionOp .getUnionMembers ()));
336351 newOutputKeys [i ] = outputVertexGroupOps [i ].getOperatorKey ().toString ();
337352 tezPlan .add (outputVertexGroupOps [i ]);
338353 }
@@ -619,18 +634,6 @@ private void connectVertexGroupsToSuccessors(TezOperator unionOp,
619634 // Connect to outputVertexGroupOps
620635 for (Entry <OperatorKey , TezEdgeDescriptor > entry : unionOp .outEdges .entrySet ()) {
621636 TezOperator succOp = tezPlan .getOperator (entry .getKey ());
622- // Case of union followed by union.
623- // unionOp.outEdges will not point to vertex group, but to its output.
624- // So find the vertex group if there is one.
625- TezOperator succOpVertexGroup = null ;
626- for (TezOperator succ : successors ) {
627- if (succ .isVertexGroup ()
628- && succOp .getOperatorKey ().toString ()
629- .equals (succ .getVertexGroupInfo ().getOutput ())) {
630- succOpVertexGroup = succ ;
631- break ;
632- }
633- }
634637 TezEdgeDescriptor edge = entry .getValue ();
635638 // Edge cannot be one to one as it will get input from two or
636639 // more union predecessors. Change it to SCATTER_GATHER
@@ -641,26 +644,14 @@ private void connectVertexGroupsToSuccessors(TezOperator unionOp,
641644 edge .inputClassName = UnorderedKVInput .class .getName ();
642645 }
643646 TezOperator vertexGroupOp = outputVertexGroupOps [unionOutputKeys .indexOf (entry .getKey ().toString ())];
644- for (OperatorKey predKey : vertexGroupOp . getVertexGroupMembers ()) {
647+ for (OperatorKey predKey : unionOp . getUnionMembers ()) {
645648 TezOperator pred = tezPlan .getOperator (predKey );
646649 // Keep the output edge directly to successor
647650 // Don't need to keep output edge for vertexgroup
648651 pred .outEdges .put (entry .getKey (), edge );
649652 succOp .inEdges .put (predKey , edge );
650- if (succOpVertexGroup != null ) {
651- succOpVertexGroup .getVertexGroupMembers ().add (predKey );
652- succOpVertexGroup .getVertexGroupInfo ().addInput (predKey );
653- // Connect directly to the successor vertex group
654- tezPlan .disconnect (pred , vertexGroupOp );
655- tezPlan .connect (pred , succOpVertexGroup );
656- }
657653 }
658- if (succOpVertexGroup != null ) {
659- succOpVertexGroup .getVertexGroupMembers ().remove (unionOp .getOperatorKey ());
660- succOpVertexGroup .getVertexGroupInfo ().removeInput (unionOp .getOperatorKey ());
661- //Discard the new vertex group created
662- tezPlan .remove (vertexGroupOp );
663- } else {
654+ if (!tezPlan .pathExists (vertexGroupOp , succOp )) {
664655 tezPlan .connect (vertexGroupOp , succOp );
665656 }
666657 }
0 commit comments