Skip to content

Commit ac957ae

Browse files
matrivxuguangheng
authored and committed
[FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
This closes apache#17962 Co-authored-by: guanghxu <[email protected]>
1 parent fb38c99 commit ac957ae

File tree

6 files changed

+282
-59
lines changed

6 files changed

+282
-59
lines changed

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala

+6-31
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,12 @@ import org.apache.flink.table.planner.JBoolean
2323
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty
2424
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
2525
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
26-
import org.apache.flink.table.planner.plan.nodes.logical._
2726
import org.apache.flink.table.planner.plan.nodes.physical.batch._
2827
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
2928
import org.apache.flink.table.planner.plan.nodes.physical.stream._
30-
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
3129
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil}
3230
import org.apache.flink.table.runtime.operators.rank.RankType
33-
import org.apache.flink.table.sources.TableSource
3431

35-
import org.apache.calcite.plan.RelOptTable
3632
import org.apache.calcite.plan.volcano.RelSubset
3733
import org.apache.calcite.rel.`type`.RelDataType
3834
import org.apache.calcite.rel.convert.Converter
@@ -61,42 +57,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
6157
mq: RelMetadataQuery,
6258
columns: ImmutableBitSet,
6359
ignoreNulls: Boolean): JBoolean = {
64-
areTableColumnsUnique(rel, null, rel.getTable, columns)
65-
}
66-
67-
def areColumnsUnique(
68-
rel: FlinkLogicalLegacyTableSourceScan,
69-
mq: RelMetadataQuery,
70-
columns: ImmutableBitSet,
71-
ignoreNulls: Boolean): JBoolean = {
72-
areTableColumnsUnique(rel, rel.tableSource, rel.getTable, columns)
60+
areTableColumnsUnique(rel, mq.getUniqueKeys(rel, ignoreNulls), columns)
7361
}
7462

7563
private def areTableColumnsUnique(
7664
rel: TableScan,
77-
tableSource: TableSource[_],
78-
relOptTable: RelOptTable,
65+
uniqueKeys: util.Set[ImmutableBitSet],
7966
columns: ImmutableBitSet): JBoolean = {
8067
if (columns.cardinality == 0) {
8168
return false
8269
}
8370

84-
// TODO get uniqueKeys from TableSchema of TableSource
85-
86-
relOptTable match {
87-
case table: FlinkPreparingTableBase => {
88-
val ukOptional = table.uniqueKeysSet
89-
if (ukOptional.isPresent) {
90-
if (ukOptional.get().isEmpty) {
91-
false
92-
} else {
93-
ukOptional.get().exists(columns.contains)
94-
}
95-
} else {
96-
null
97-
}
98-
}
99-
case _ => rel.getTable.isKey(columns)
71+
if (uniqueKeys != null) {
72+
uniqueKeys.exists(columns.contains) || rel.getTable.isKey(columns)
73+
} else {
74+
null
10075
}
10176
}
10277

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
4545
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
4646

4747
import java.util
48-
import java.util.Set
4948

5049
import scala.collection.JavaConversions._
5150

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala

+47
Original file line numberDiff line numberDiff line change
@@ -622,4 +622,51 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
622622
}
623623
}
624624

625+
@Test
626+
def testAreColumnsUniqueOnTableSourceTable(): Unit = {
627+
Array(
628+
tableSourceTableLogicalScan,
629+
tableSourceTableFlinkLogicalScan,
630+
tableSourceTableBatchScan,
631+
tableSourceTableStreamScan
632+
)
633+
.foreach { scan =>
634+
assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
635+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(1, 2)))
636+
assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
637+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
638+
}
639+
}
640+
641+
@Test
642+
def testAreColumnsUniqueOnTablePartiallyProjectedKey(): Unit = {
643+
Array(
644+
tablePartiallyProjectedKeyLogicalScan,
645+
tablePartiallyProjectedKeyFlinkLogicalScan,
646+
tablePartiallyProjectedKeyBatchScan,
647+
tablePartiallyProjectedKeyStreamScan
648+
)
649+
.foreach { scan =>
650+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
651+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
652+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
653+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
654+
}
655+
}
656+
657+
@Test
658+
def testAreColumnsUniqueOntableSourceTableNonKeyNonKey(): Unit = {
659+
Array(
660+
tableSourceTableNonKeyLogicalScan,
661+
tableSourceTableNonKeyFlinkLogicalScan,
662+
tableSourceTableNonKeyBatchScan,
663+
tableSourceTableNonKeyStreamScan
664+
)
665+
.foreach { scan =>
666+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
667+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
668+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
669+
assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
670+
}
671+
}
625672
}

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala

+104-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
4040
import org.apache.flink.table.planner.plan.nodes.logical._
4141
import org.apache.flink.table.planner.plan.nodes.physical.batch._
4242
import org.apache.flink.table.planner.plan.nodes.physical.stream._
43-
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable}
43+
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
4444
import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable
4545
import org.apache.flink.table.planner.plan.utils._
4646
import org.apache.flink.table.planner.utils.Top3
@@ -171,6 +171,37 @@ class FlinkRelMdHandlerTestBase {
171171
protected lazy val empStreamScan: StreamPhysicalDataStreamScan =
172172
createDataStreamScan(ImmutableList.of("emp"), streamPhysicalTraits)
173173

174+
protected lazy val tableSourceTableLogicalScan: LogicalTableScan =
175+
createTableSourceTable(ImmutableList.of("TableSourceTable1"), logicalTraits)
176+
protected lazy val tableSourceTableFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
177+
createTableSourceTable(ImmutableList.of("TableSourceTable1"), flinkLogicalTraits)
178+
protected lazy val tableSourceTableBatchScan: BatchPhysicalBoundedStreamScan =
179+
createTableSourceTable(ImmutableList.of("TableSourceTable1"), batchPhysicalTraits)
180+
protected lazy val tableSourceTableStreamScan: StreamPhysicalDataStreamScan =
181+
createTableSourceTable(ImmutableList.of("TableSourceTable1"), streamPhysicalTraits)
182+
183+
protected lazy val tablePartiallyProjectedKeyLogicalScan: LogicalTableScan =
184+
createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
185+
logicalTraits)
186+
protected lazy val tablePartiallyProjectedKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
187+
createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
188+
flinkLogicalTraits)
189+
protected lazy val tablePartiallyProjectedKeyBatchScan: BatchPhysicalBoundedStreamScan =
190+
createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
191+
batchPhysicalTraits)
192+
protected lazy val tablePartiallyProjectedKeyStreamScan: StreamPhysicalDataStreamScan =
193+
createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
194+
streamPhysicalTraits)
195+
196+
protected lazy val tableSourceTableNonKeyLogicalScan: LogicalTableScan =
197+
createTableSourceTable(ImmutableList.of("TableSourceTable3"), logicalTraits)
198+
protected lazy val tableSourceTableNonKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
199+
createTableSourceTable(ImmutableList.of("TableSourceTable3"), flinkLogicalTraits)
200+
protected lazy val tableSourceTableNonKeyBatchScan: BatchPhysicalBoundedStreamScan =
201+
createTableSourceTable(ImmutableList.of("TableSourceTable3"), batchPhysicalTraits)
202+
protected lazy val tableSourceTableNonKeyStreamScan: StreamPhysicalDataStreamScan =
203+
createTableSourceTable(ImmutableList.of("TableSourceTable3"), streamPhysicalTraits)
204+
174205
private lazy val valuesType = relBuilder.getTypeFactory
175206
.builder()
176207
.add("a", SqlTypeName.BIGINT)
@@ -2727,6 +2758,51 @@ class FlinkRelMdHandlerTestBase {
27272758
}
27282759
}
27292760

2761+
// select * from TableSourceTable1
2762+
// left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b
2763+
protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder
2764+
.scan("TableSourceTable1")
2765+
.scan("TableSourceTable2")
2766+
.join(
2767+
JoinRelType.LEFT,
2768+
relBuilder.call(
2769+
EQUALS,
2770+
relBuilder.field(2, 0, 1),
2771+
relBuilder.field(2, 1, 1)
2772+
)
2773+
)
2774+
.build
2775+
2776+
// select * from TableSourceTable1
2777+
// left join TableSourceTable2 on TableSourceTable1.a = TableSourceTable2.a
2778+
protected lazy val logicalLeftJoinOnDisjointUniqueKeys: RelNode = relBuilder
2779+
.scan("TableSourceTable1")
2780+
.scan("TableSourceTable2")
2781+
.join(
2782+
JoinRelType.LEFT,
2783+
relBuilder.call(
2784+
EQUALS,
2785+
relBuilder.field(2, 0, 0),
2786+
relBuilder.field(2, 1, 0)
2787+
)
2788+
)
2789+
.build
2790+
2791+
// select * from TableSourceTable1
2792+
// left join TableSourceTable3 on TableSourceTable1.a = TableSourceTable3.a
2793+
protected lazy val logicalLeftJoinWithNoneKeyTableUniqueKeys: RelNode = relBuilder
2794+
.scan("TableSourceTable1")
2795+
.scan("TableSourceTable3")
2796+
.join(
2797+
JoinRelType.LEFT,
2798+
relBuilder.call(
2799+
EQUALS,
2800+
relBuilder.field(2, 0, 0),
2801+
relBuilder.field(2, 1, 0)
2802+
)
2803+
)
2804+
.build
2805+
27302806
protected def createDataStreamScan[T](
27312807
tableNames: util.List[String], traitSet: RelTraitSet): T = {
27322808
val table = relBuilder
@@ -2754,6 +2830,33 @@ class FlinkRelMdHandlerTestBase {
27542830
scan.asInstanceOf[T]
27552831
}
27562832

2833+
protected def createTableSourceTable[T](
2834+
tableNames: util.List[String], traitSet: RelTraitSet): T = {
2835+
val table = relBuilder
2836+
.getRelOptSchema
2837+
.asInstanceOf[CalciteCatalogReader]
2838+
.getTable(tableNames)
2839+
.asInstanceOf[TableSourceTable]
2840+
val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE)
2841+
val scan = conventionTrait match {
2842+
case Convention.NONE =>
2843+
relBuilder.clear()
2844+
val scan = relBuilder.scan(tableNames).build()
2845+
scan.copy(traitSet, scan.getInputs)
2846+
case FlinkConventions.LOGICAL =>
2847+
new FlinkLogicalDataStreamTableScan(
2848+
cluster, traitSet, Collections.emptyList[RelHint](), table)
2849+
case FlinkConventions.BATCH_PHYSICAL =>
2850+
new BatchPhysicalBoundedStreamScan(
2851+
cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
2852+
case FlinkConventions.STREAM_PHYSICAL =>
2853+
new StreamPhysicalDataStreamScan(
2854+
cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
2855+
case _ => throw new TableException(s"Unsupported convention trait: $conventionTrait")
2856+
}
2857+
scan.asInstanceOf[T]
2858+
}
2859+
27572860
protected def createLiteralList(
27582861
rowType: RelDataType,
27592862
literalValues: Seq[String]): util.List[RexLiteral] = {

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala

+16
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
314314
assertNull(mq.getUniqueKeys(testRel))
315315
}
316316

317+
@Test
318+
def testGetUniqueKeysOnTableScanTable(): Unit = {
319+
assertEquals(
320+
uniqueKeys(Array(0, 1), Array(0, 1, 5)),
321+
mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet
322+
)
323+
assertEquals(
324+
uniqueKeys(Array(0, 1, 5)),
325+
mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet
326+
)
327+
assertEquals(
328+
uniqueKeys(),
329+
mq.getUniqueKeys(logicalLeftJoinWithNoneKeyTableUniqueKeys).toSet
330+
)
331+
}
332+
317333
private def uniqueKeys(keys: Array[Int]*): Set[ImmutableBitSet] = {
318334
keys.map(k => ImmutableBitSet.of(k: _*)).toSet
319335
}

0 commit comments

Comments
 (0)