Skip to content

Commit

Permalink
Key variable will be shown as Unknown instead of String
Browse files Browse the repository at this point in the history
  • Loading branch information
Szymon Bogusz committed Feb 14, 2025
1 parent d67fd37 commit 3f4bfbf
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.flink.api.process.{
FlinkCustomNodeContext,
FlinkLazyParameterFunctionHelper,
Expand Down Expand Up @@ -43,15 +43,6 @@ object keyed {
)
}

type GenericKeyedValue[K, V] = KeyedValue[K, V]

object GenericKeyedValue {

def apply[K, V](key: K, value: V): GenericKeyedValue[K, V] = GenericKeyedValue(key, value)

def unapply[K, V](keyedValue: GenericKeyedValue[K, V]): Option[(K, V)] = KeyedValue.unapply(keyedValue)
}

abstract class BaseKeyedValueMapper[OutputKey <: AnyRef: TypeTag, OutputValue <: AnyRef: TypeTag]
extends RichFlatMapFunction[Context, ValueWithContext[KeyedValue[OutputKey, OutputValue]]]
with LazyParameterInterpreterFunction {
Expand Down Expand Up @@ -107,26 +98,6 @@ object keyed {

}

class GenericKeyOnlyMapper[K <: AnyRef: TypeTag](
protected val lazyParameterHelper: FlinkLazyParameterFunctionHelper,
key: LazyParameter[K]
) extends RichFlatMapFunction[Context, ValueWithContext[K]]
with LazyParameterInterpreterFunction {

protected implicit def toEvaluateFunctionConverterImpl: ToEvaluateFunctionConverter = toEvaluateFunctionConverter

private lazy val interpreter = toEvaluateFunctionConverter.toEvaluateFunction(key)

protected def interpret(ctx: Context): K = interpreter(ctx)

override def flatMap(ctx: Context, out: Collector[ValueWithContext[K]]): Unit = {
collectHandlingErrors(ctx, out) {
ValueWithContext(interpret(ctx), ctx)
}
}

}

class StringKeyedValueMapper[T <: AnyRef: TypeTag](
protected val lazyParameterHelper: FlinkLazyParameterFunctionHelper,
key: LazyParameter[CharSequence],
Expand Down Expand Up @@ -181,7 +152,7 @@ object keyed {
def contextTransformation(ctx: ValidationContext)(
implicit nodeId: NodeId
): ValidatedNel[ProcessCompilationError, ValidationContext] =
ctx.withVariableOverriden(VariableConstants.KeyVariableName, Typed[String], None)
ctx.withVariableOverriden(VariableConstants.KeyVariableName, Unknown, None)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream, Singl
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
import pl.touk.nussknacker.engine.flink.util.keyed.{GenericKeyOnlyMapper, GenericKeyedValueMapper, StringKeyOnlyMapper}
import pl.touk.nussknacker.engine.flink.util.keyed.{GenericKeyedValueMapper, StringKeyOnlyMapper}
import pl.touk.nussknacker.engine.util.KeyedValue

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

object richflink {
Expand All @@ -24,16 +23,6 @@ object richflink {
)
.keyBy((k: ValueWithContext[String]) => k.value)

def groupBy[K <: AnyRef: TypeTag: ClassTag](
groupBy: LazyParameter[K]
)(implicit ctx: FlinkCustomNodeContext): KeyedStream[ValueWithContext[K], K] =
dataStream
.flatMap(
new GenericKeyOnlyMapper(ctx.lazyParameterHelper, groupBy),
ctx.valueWithContextInfo.forClass[K]
)
.keyBy((k: ValueWithContext[K]) => k.value)

def groupByWithValue[T <: AnyRef: TypeTag, K <: AnyRef: TypeTag](
groupBy: LazyParameter[K],
value: LazyParameter[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
import org.apache.flink.api.common.functions.AggregateFunction
import pl.touk.nussknacker.engine.api.ValueWithContext
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue
import pl.touk.nussknacker.engine.util.KeyedValue

/**
Expand Down

0 comments on commit 3f4bfbf

Please sign in to comment.