Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexNode;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -44,6 +46,9 @@ public class DeltaJoinSpec {
public static final String FIELD_NAME_LOOKUP_TABLE = "lookupTable";
public static final String FIELD_NAME_LOOKUP_KEYS = "lookupKeys";
public static final String FIELD_NAME_REMAINING_CONDITION = "remainingCondition";
public static final String FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE =
"projectionOnTemporalTable";
public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";

@JsonProperty(FIELD_NAME_LOOKUP_TABLE)
private final TemporalTableSourceSpec lookupTable;
Expand All @@ -56,15 +61,29 @@ public class DeltaJoinSpec {
@JsonProperty(FIELD_NAME_REMAINING_CONDITION)
private final @Nullable RexNode remainingCondition;

@JsonProperty(FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final @Nullable List<RexNode> projectionOnTemporalTable;

@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final @Nullable RexNode filterOnTemporalTable;

@JsonCreator
public DeltaJoinSpec(
@JsonProperty(FIELD_NAME_LOOKUP_TABLE) TemporalTableSourceSpec lookupTable,
@JsonProperty(FIELD_NAME_LOOKUP_KEYS)
Map<Integer, FunctionCallUtil.FunctionParam> lookupKeyMap,
@JsonProperty(FIELD_NAME_REMAINING_CONDITION) @Nullable RexNode remainingCondition) {
@JsonProperty(FIELD_NAME_REMAINING_CONDITION) @Nullable RexNode remainingCondition,
@JsonProperty(FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE) @Nullable
List<RexNode> projectionOnTemporalTable,
@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE) @Nullable
RexNode filterOnTemporalTable) {
this.lookupKeyMap = lookupKeyMap;
this.lookupTable = lookupTable;
this.remainingCondition = remainingCondition;
this.projectionOnTemporalTable = projectionOnTemporalTable;
this.filterOnTemporalTable = filterOnTemporalTable;
}

@JsonIgnore
Expand All @@ -81,4 +100,14 @@ public Map<Integer, FunctionCallUtil.FunctionParam> getLookupKeyMap() {
public Optional<RexNode> getRemainingCondition() {
return Optional.ofNullable(remainingCondition);
}

@JsonIgnore
public Optional<List<RexNode>> getProjectionOnTemporalTable() {
return Optional.ofNullable(projectionOnTemporalTable);
}

@JsonIgnore
public Optional<RexNode> getFilterOnTemporalTable() {
return Optional.ofNullable(filterOnTemporalTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.StreamingDeltaJoinOperatorFactory;
Expand Down Expand Up @@ -389,6 +391,7 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
boolean treatRightAsLookupTable) {
RelOptTable lookupTable = treatRightAsLookupTable ? rightTempTable : leftTempTable;
RowType streamSideType = treatRightAsLookupTable ? leftStreamSideType : rightStreamSideType;
RowType lookupSideType = treatRightAsLookupTable ? rightStreamSideType : leftStreamSideType;

AsyncTableFunction<?> lookupSideAsyncTableFunction =
getUnwrappedAsyncLookupFunction(lookupTable, lookupKeys.keySet(), classLoader);
Expand Down Expand Up @@ -454,11 +457,36 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
JavaScalaConversionUtil.toScala(newCond));
}

GeneratedFunction<FlatMapFunction<RowData, RowData>> lookupSideGeneratedCalc = null;
if ((treatRightAsLookupTable
&& lookupRightTableJoinSpec.getProjectionOnTemporalTable().isPresent())
|| (!treatRightAsLookupTable
&& lookupLeftTableJoinSpec.getProjectionOnTemporalTable().isPresent())) {
// a projection or filter after lookup table
List<RexNode> projectionOnTemporalTable =
treatRightAsLookupTable
? lookupRightTableJoinSpec.getProjectionOnTemporalTable().get()
: lookupLeftTableJoinSpec.getProjectionOnTemporalTable().get();
RexNode filterOnTemporalTable =
treatRightAsLookupTable
? lookupRightTableJoinSpec.getFilterOnTemporalTable().orElse(null)
: lookupLeftTableJoinSpec.getFilterOnTemporalTable().orElse(null);
lookupSideGeneratedCalc =
LookupJoinCodeGenerator.generateCalcMapFunction(
config,
planner.getFlinkContext().getClassLoader(),
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
lookupSideType,
lookupTableSourceRowType);
}

return new AsyncDeltaJoinRunner(
lookupSideGeneratedFuncWithType.tableFunc(),
(DataStructureConverter<RowData, Object>) lookupSideFetcherConverter,
lookupSideGeneratedCalc,
lookupSideGeneratedResultFuture,
InternalSerializers.create(lookupTableSourceRowType),
InternalSerializers.create(lookupSideType),
leftJoinKeySelector,
leftUpsertKeySelector,
rightJoinKeySelector,
Expand Down
Loading