From 46799d14d693587a6341441f5f5e626565d9b6af Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 16 Nov 2022 22:30:00 +0800 Subject: [PATCH 1/2] feat(stream): implement framework of now executor --- dashboard/proto/gen/stream_plan.ts | 40 +++++++++++++- proto/stream_plan.proto | 6 ++ src/stream/src/executor/mod.rs | 22 ++++++++ src/stream/src/executor/now.rs | 88 ++++++++++++++++++++++++++++++ src/stream/src/from_proto/mod.rs | 3 + src/stream/src/from_proto/now.rs | 52 ++++++++++++++++++ 6 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 src/stream/src/executor/now.rs create mode 100644 src/stream/src/from_proto/now.rs diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index 22d8287d95968..e988c9702ea70 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -701,6 +701,11 @@ export interface RowIdGenNode { rowIdIndex: number; } +export interface NowNode { + /** Persists emitted 'now'. */ + stateTable: Table | undefined; +} + export interface StreamNode { nodeBody?: | { $case: "source"; source: SourceNode } @@ -731,7 +736,8 @@ export interface StreamNode { | { $case: "sort"; sort: SortNode } | { $case: "watermarkFilter"; watermarkFilter: WatermarkFilterNode } | { $case: "dml"; dml: DmlNode } - | { $case: "rowIdGen"; rowIdGen: RowIdGenNode }; + | { $case: "rowIdGen"; rowIdGen: RowIdGenNode } + | { $case: "now"; now: NowNode }; /** * The id for the operator. This is local per mview. * TODO: should better be a uint32. @@ -3085,6 +3091,31 @@ export const RowIdGenNode = { }, }; +function createBaseNowNode(): NowNode { + return { stateTable: undefined }; +} + +export const NowNode = { + fromJSON(object: any): NowNode { + return { stateTable: isSet(object.stateTable) ? Table.fromJSON(object.stateTable) : undefined }; + }, + + toJSON(message: NowNode): unknown { + const obj: any = {}; + message.stateTable !== undefined && + (obj.stateTable = message.stateTable ? Table.toJSON(message.stateTable) : undefined); + return obj; + }, + + fromPartial, I>>(object: I): NowNode { + const message = createBaseNowNode(); + message.stateTable = (object.stateTable !== undefined && object.stateTable !== null) + ? Table.fromPartial(object.stateTable) + : undefined; + return message; + }, +}; + function createBaseStreamNode(): StreamNode { return { nodeBody: undefined, operatorId: 0, input: [], streamKey: [], appendOnly: false, identity: "", fields: [] }; } @@ -3150,6 +3181,8 @@ export const StreamNode = { ? { $case: "dml", dml: DmlNode.fromJSON(object.dml) } : isSet(object.rowIdGen) ? { $case: "rowIdGen", rowIdGen: RowIdGenNode.fromJSON(object.rowIdGen) } + : isSet(object.now) + ? { $case: "now", now: NowNode.fromJSON(object.now) } : undefined, operatorId: isSet(object.operatorId) ? Number(object.operatorId) : 0, input: Array.isArray(object?.input) @@ -3230,6 +3263,8 @@ export const StreamNode = { (obj.dml = message.nodeBody?.dml ? DmlNode.toJSON(message.nodeBody?.dml) : undefined); message.nodeBody?.$case === "rowIdGen" && (obj.rowIdGen = message.nodeBody?.rowIdGen ? RowIdGenNode.toJSON(message.nodeBody?.rowIdGen) : undefined); + message.nodeBody?.$case === "now" && + (obj.now = message.nodeBody?.now ? NowNode.toJSON(message.nodeBody?.now) : undefined); message.operatorId !== undefined && (obj.operatorId = Math.round(message.operatorId)); if (message.input) { obj.input = message.input.map((e) => @@ -3448,6 +3483,9 @@ export const StreamNode = { ) { message.nodeBody = { $case: "rowIdGen", rowIdGen: RowIdGenNode.fromPartial(object.nodeBody.rowIdGen) }; } + if (object.nodeBody?.$case === "now" && object.nodeBody?.now !== undefined && object.nodeBody?.now !== null) { + message.nodeBody = { $case: "now", now: NowNode.fromPartial(object.nodeBody.now) }; + } message.operatorId = object.operatorId ?? 0; message.input = object.input?.map((e) => StreamNode.fromPartial(e)) || []; message.streamKey = object.streamKey?.map((e) => e) || []; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e9103359d7c83..518afc9115dd8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -440,6 +440,11 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowNode { + // Persists emitted 'now'. + catalog.Table state_table = 1; +} + message StreamNode { oneof node_body { SourceNode source = 100; @@ -471,6 +476,7 @@ message StreamNode { WatermarkFilterNode watermark_filter = 126; DmlNode dml = 127; RowIdGenNode row_id_gen = 128; + NowNode now = 129; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 91fe35895609c..aabdd63c06956 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -69,6 +69,7 @@ mod lookup_union; mod managed_state; mod merge; mod mview; +mod now; mod project; mod project_set; mod rearranged_chain; @@ -109,6 +110,7 @@ pub use lookup::*; pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; +pub use now::NowExecutor; pub use project::ProjectExecutor; pub use project_set::*; pub use rearranged_chain::RearrangedChainExecutor; @@ -235,6 +237,15 @@ impl Barrier { } } + pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self { + Self { + epoch: EpochPair::new(epoch, prev_epoch), + checkpoint: true, + mutation: Default::default(), + passed_actors: Default::default(), + } + } + #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { @@ -279,11 +290,22 @@ impl Barrier { ) } + /// Whether this barrier is for pause. + pub fn is_pause(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Pause)) + } + /// Whether this barrier is for configuration change. Used for source executor initialization. pub fn is_update(&self) -> bool { matches!(self.mutation.as_deref(), Some(Mutation::Update { .. })) } + /// Whether this barrier is for resume. Used for now executor to determine whether to yield a + /// chunk and a watermark before this barrier. + pub fn is_resume(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Resume)) + } + /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor /// with `actor_id`. pub fn as_update_merge( diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs new file mode 100644 index 0000000000000..8f7c653ad5f78 --- /dev/null +++ b/src/stream/src/executor/now.rs @@ -0,0 +1,88 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::DataType; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::UnboundedReceiver; + +use super::{ + Barrier, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, StreamExecutorError, +}; +use crate::common::table::state_table::StateTable; + +pub struct NowExecutor { + /// Receiver of barrier channel. + barrier_receiver: UnboundedReceiver, + + pk_indices: PkIndices, + identity: String, + schema: Schema, + state_table: StateTable, +} + +impl NowExecutor { + pub fn new( + barrier_receiver: UnboundedReceiver, + executor_id: u64, + state_table: StateTable, + ) -> Self { + let schema = Schema::new(vec![Field { + data_type: DataType::Timestamp, + name: String::from("now"), + sub_fields: vec![], + type_name: String::default(), + }]); + Self { + barrier_receiver, + pk_indices: vec![], + identity: format!("NowExecutor {:X}", executor_id), + schema, + state_table, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(self) { + #[allow(unused_variables)] + let Self { + barrier_receiver, + state_table, + schema, + .. + } = self; + + todo!("https://github.com/risingwavelabs/risingwave/pull/6408"); + } +} + +impl Executor for NowExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index f5107b7db17e0..6ca89e32551d9 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -31,6 +31,7 @@ mod lookup; mod lookup_union; mod merge; mod mview; +mod now; mod project; mod project_set; mod row_id_gen; @@ -64,6 +65,7 @@ use self::lookup::*; use self::lookup_union::*; use self::merge::*; use self::mview::*; +use self::now::NowExecutorBuilder; use self::project::*; use self::project_set::*; use self::row_id_gen::RowIdGenExecutorBuilder; @@ -143,5 +145,6 @@ pub async fn create_executor( NodeBody::WatermarkFilter => WatermarkFilterBuilder, NodeBody::Dml => DmlExecutorBuilder, NodeBody::RowIdGen => RowIdGenExecutorBuilder, + NodeBody::Now => NowExecutorBuilder, } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs new file mode 100644 index 0000000000000..b52d5fed2bae4 --- /dev/null +++ b/src/stream/src/from_proto/now.rs @@ -0,0 +1,52 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::NowNode; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::unbounded_channel; + +use super::ExecutorBuilder; +use crate::common::table::state_table::StateTable; +use crate::error::StreamResult; +use crate::executor::{BoxedExecutor, NowExecutor}; +use crate::task::{ExecutorParams, LocalStreamManagerCore}; + +pub struct NowExecutorBuilder; + +#[async_trait::async_trait] +impl ExecutorBuilder for NowExecutorBuilder { + type Node = NowNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &NowNode, + store: impl StateStore, + stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let (sender, barrier_receiver) = unbounded_channel(); + stream + .context + .lock_barrier_manager() + .register_sender(params.actor_context.id, sender); + + let state_table = + StateTable::from_table_catalog(node.get_state_table()?, store, None).await; + + Ok(Box::new(NowExecutor::new( + barrier_receiver, + params.executor_id, + state_table, + ))) + } +} From 498a7198d0f61046b2f6f08fdbb56a8bcfa82204 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 30 Nov 2022 22:47:00 +0800 Subject: [PATCH 2/2] remove barrier interface --- src/stream/src/executor/mod.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index aabdd63c06956..61cb9aae0f0c0 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -237,15 +237,6 @@ impl Barrier { } } - pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self { - Self { - epoch: EpochPair::new(epoch, prev_epoch), - checkpoint: true, - mutation: Default::default(), - passed_actors: Default::default(), - } - } - #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { @@ -290,22 +281,11 @@ impl Barrier { ) } - /// Whether this barrier is for pause. - pub fn is_pause(&self) -> bool { - matches!(self.mutation.as_deref(), Some(Mutation::Pause)) - } - /// Whether this barrier is for configuration change. Used for source executor initialization. pub fn is_update(&self) -> bool { matches!(self.mutation.as_deref(), Some(Mutation::Update { .. })) } - /// Whether this barrier is for resume. Used for now executor to determine whether to yield a - /// chunk and a watermark before this barrier. - pub fn is_resume(&self) -> bool { - matches!(self.mutation.as_deref(), Some(Mutation::Resume)) - } - /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor /// with `actor_id`. pub fn as_update_merge(