@@ -61,6 +61,17 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
61
61
if (pin == 0 ) { // left
62
62
waitRightFinFlag (param );
63
63
TupleKey leftKey = HashJoinParam .rtrimTupleKey (new TupleKey (leftMapping .revMap (tuple )));
64
+
65
+ if (HashJoinParam .containsNull (leftKey )) {
66
+ if ("inner" .equalsIgnoreCase (param .getJoinType ()) || "right" .equalsIgnoreCase (param .getJoinType ())) {
67
+ return true ;
68
+ } else if ("left" .equalsIgnoreCase (param .getJoinType ()) || "full" .equalsIgnoreCase (param .getJoinType ())) {
69
+ Object [] newTuple = Arrays .copyOf (tuple , leftLength + rightLength );
70
+ Arrays .fill (newTuple , leftLength , leftLength + rightLength , null );
71
+ return pushToNext (param , edge , context , newTuple );
72
+ }
73
+ }
74
+
64
75
boolean isEmpty = isEmpty (leftKey , param );
65
76
if (isEmpty && ("inner" .equalsIgnoreCase (param .getJoinType ()) || "right" .equalsIgnoreCase (param .getJoinType ()))) {
66
77
return true ;
@@ -87,12 +98,23 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
87
98
}
88
99
} else if (pin == 1 ) { //right
89
100
TupleKey rightKey = HashJoinParam .rtrimTupleKey (new TupleKey (rightMapping .revMap (tuple )));
90
- if (isEmpty (rightKey , param ) && "inner" .equalsIgnoreCase (param .getJoinType ())) {
91
- return true ;
101
+ if (HashJoinParam .containsNull (rightKey )) {
102
+ if ("inner" .equalsIgnoreCase (param .getJoinType ()) || "left" .equalsIgnoreCase (param .getJoinType ())) {
103
+ return true ;
104
+ } else if ("right" .equalsIgnoreCase (param .getJoinType ()) || "full" .equalsIgnoreCase (param .getJoinType ())) {
105
+ List <TupleWithJoinFlag > list = param .getHashMap ()
106
+ .computeIfAbsent (rightKey , k -> Collections .synchronizedList (new LinkedList <>()));
107
+ list .add (new TupleWithJoinFlag (tuple ));
108
+ }
109
+ } else {
110
+ if (isEmpty (rightKey , param ) && "inner" .equalsIgnoreCase (param .getJoinType ())) {
111
+ return true ;
112
+ }
113
+ List <TupleWithJoinFlag > list = param .getHashMap ()
114
+ .computeIfAbsent (rightKey , k -> Collections .synchronizedList (new LinkedList <>()));
115
+ list .add (new TupleWithJoinFlag (tuple ));
92
116
}
93
- List <TupleWithJoinFlag > list = param .getHashMap ()
94
- .computeIfAbsent (rightKey , k -> Collections .synchronizedList (new LinkedList <>()));
95
- list .add (new TupleWithJoinFlag (tuple ));
117
+
96
118
}
97
119
return true ;
98
120
} finally {
0 commit comments