From 81e811d01b9bc5af05852a690f6324675b779cd9 Mon Sep 17 00:00:00 2001 From: Alexey Novakov Date: Wed, 13 Sep 2023 16:11:56 +0200 Subject: [PATCH] add savepoint rewrite example --- build.sbt | 6 +- src/main/avro/WordCountState.avsc | 10 +- .../com/github/novakovalexey/WordCount.scala | 107 +++++++++++++++--- 3 files changed, 101 insertions(+), 22 deletions(-) diff --git a/build.sbt b/build.sbt index c529250..1f58062 100644 --- a/build.sbt +++ b/build.sbt @@ -3,14 +3,14 @@ Global / onChangedBuildSource := ReloadOnSourceChanges ThisBuild / organization := "com.github.novakov-alexey" ThisBuild / scalaVersion := "2.12.7" - lazy val flinkVersion = "1.15.2" -//lazy val flinkVersion = "1.14.6" +lazy val flinkVersion = "1.15.2" +// lazy val flinkVersion = "1.14.6" lazy val root = (project in file(".")).settings( name := "scala-212-savepoint", libraryDependencies ++= Seq( "org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided, - //"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, + "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, "org.apache.flink" % "flink-state-processor-api" % flinkVersion % Provided, "org.apache.flink" % "flink-clients" % flinkVersion % Provided, "org.apache.flink" % "flink-avro" % flinkVersion, diff --git a/src/main/avro/WordCountState.avsc b/src/main/avro/WordCountState.avsc index 6a0578f..67938cb 100644 --- a/src/main/avro/WordCountState.avsc +++ b/src/main/avro/WordCountState.avsc @@ -13,7 +13,15 @@ }, { "name": "changed", - "type": "long" + "type": "long" + }, + { + "name": "lastCount", + "type": [ + "null", + "int" + ], + "default": null } ] } \ No newline at end of file diff --git a/src/main/scala/com/github/novakovalexey/WordCount.scala b/src/main/scala/com/github/novakovalexey/WordCount.scala index 6b330af..fbfcb3b 100644 --- a/src/main/scala/com/github/novakovalexey/WordCount.scala +++ b/src/main/scala/com/github/novakovalexey/WordCount.scala @@ -2,19 +2,35 @@ package com.github.novakovalexey import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.operators.DataSource +import org.apache.flink.runtime.state.{ + FunctionInitializationContext, + FunctionSnapshotContext +} +import org.apache.flink.state.api.{ + BootstrapTransformation, + SavepointReader, + SavepointWriter +} +import org.apache.flink.state.api.functions.{ + KeyedStateBootstrapFunction, + StateBootstrapFunction +} +import org.apache.flink.api.scala.ExecutionEnvironment +// import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -//import org.apache.flink.streaming.api.scala._ +//import org.apache.flink.streaming.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.source.FromIteratorFunction import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.state.hashmap.HashMapStateBackend import org.apache.flink.state.api.Savepoint +import org.apache.flink.state.api.OperatorTransformation import org.apache.flink.state.api.functions.KeyedStateReaderFunction import org.apache.flink.state.api.functions.KeyedStateReaderFunction.Context import org.apache.flink.util.Collector import WordCounter._ -import org.apache.flink.api.java.functions.KeySelector import scala.util.Random import scala.collection.JavaConverters._ @@ -36,12 +52,18 @@ object FakeSource { object WordCounter { val stateDescriptor = - new MapStateDescriptor("wordCounter", classOf[Int], classOf[WordCountState]) + new MapStateDescriptor( + "wordCounter", + TypeInformation.of(classOf[Int]), + // createTypeInformation[Int], + TypeInformation.of(classOf[WordCountState]) + // createTypeInformation[WordCountState] + ) } class WordCounter extends KeyedProcessFunction[Int, Event, Event] { - // MapState is used here to check serialization. ValueState would be more efficient - @transient var countState: MapState[Int, WordCountState] = _ + // MapState is used here to check serialization. ValueState would be enough to count numbers + private var countState: MapState[Int, WordCountState] = _ override def open(parameters: Configuration): Unit = countState = getRuntimeContext.getMapState(stateDescriptor) @@ -60,7 +82,8 @@ class WordCounter extends KeyedProcessFunction[Int, Event, Event] { WordCountState( event.number, event.count + count, - System.currentTimeMillis + System.currentTimeMillis, + Some(count) ) ) collector.collect(event.copy(count = count)) @@ -69,13 +92,17 @@ class WordCounter extends KeyedProcessFunction[Int, Event, Event] { object Main extends App { val conf = new Configuration() - //conf.setString("state.savepoints.dir", "file:///tmp/savepoints") - // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf) - val env = StreamExecutionEnvironment.getExecutionEnvironment + conf.setString("state.savepoints.dir", "file:///tmp/savepoints") + val env = StreamExecutionEnvironment.getExecutionEnvironment(conf) + // env.getConfig.disableGenericTypes() + + val eventTi = TypeInformation.of(classOf[Event]) + env .addSource(FakeSource.iterator) - .returns(TypeInformation.of(classOf[Event])) - .keyBy(_.number, TypeInformation.of(classOf[Int])) + .returns(eventTi) + .keyBy((e: Event) => e.number, TypeInformation.of(classOf[Int])) + // .keyBy(_.number, createTypeInformation[Int]) .process(new WordCounter()) .uid("word-count") .print() @@ -93,25 +120,69 @@ class ReaderFunction extends KeyedStateReaderFunction[Int, WordCountState] { key: Int, ctx: Context, out: Collector[WordCountState] - ): Unit = + ): Unit = { + val state = countState.get(key) out.collect( - WordCountState(key, countState.get(key).count, System.currentTimeMillis) + WordCountState( + key, + state.count, + System.currentTimeMillis, + state.lastCount + ) ) + } } object ReadState extends App { - val env = ExecutionEnvironment.getExecutionEnvironment - val savepoint = Savepoint.load( + val env = StreamExecutionEnvironment.getExecutionEnvironment + val oldSavepointPath = "/tmp/flink-savepoints/savepoint-7fb950-384cc7627885" + val savepoint = SavepointReader.read( env, - "/tmp/flink-savepoints/savepoint-5103b1-065fc210b104", + oldSavepointPath, new HashMapStateBackend() ) + val keyedState = savepoint.readKeyedState( "word-count", new ReaderFunction(), TypeInformation.of(classOf[Int]), + // createTypeInformation[Int], // comes from flink-scala-api TypeInformation.of(classOf[WordCountState]) + // createTypeInformation[WordCountState] // comes from flink-scala-api ) - val res = keyedState.collect().asScala + val res = keyedState.executeAndCollect().asScala println(res.mkString("\n")) + + val transformation = OperatorTransformation + .bootstrapWith(keyedState) + .keyBy( + (value: WordCountState) => value.key, + TypeInformation.of(classOf[Int]) + ) + .transform(new KeyedStateBootstrapFunction[Int, WordCountState] { + private var countState: MapState[Int, WordCountState] = _ + + override def open(parameters: Configuration): Unit = { + val descriptor = new MapStateDescriptor( + "wordCounter", + TypeInformation.of(classOf[Int]), + TypeInformation.of(classOf[WordCountState]) + ) // this target state descriptor, which can be used to use different serializers / type info + countState = getRuntimeContext.getMapState(descriptor) + } + + override def processElement( + value: WordCountState, + ctx: KeyedStateBootstrapFunction[Int, WordCountState]#Context + ): Unit = + countState.put(value.key, value) + }) + + SavepointWriter + .fromExistingSavepoint(oldSavepointPath) + .removeOperator("word-count") + .withOperator("word-count", transformation) + .write(oldSavepointPath.replaceAll("savepoint-", "new-savepoint-")) + + env.execute() }