/** Creates a `NowNode` with every field at its default (no state table). */
function createBaseNowNode(): NowNode {
  return { stateTable: undefined };
}

/** JSON and partial-object converters for `NowNode`. */
export const NowNode = {
  /** Builds a `NowNode` from a plain JSON object; an unset `stateTable` stays `undefined`. */
  fromJSON(object: any): NowNode {
    return { stateTable: isSet(object.stateTable) ? Table.fromJSON(object.stateTable) : undefined };
  },

  /** Serializes `message` to a plain JSON object, omitting `stateTable` when it is `undefined`. */
  toJSON(message: NowNode): unknown {
    const obj: any = {};
    message.stateTable !== undefined &&
      (obj.stateTable = message.stateTable ? Table.toJSON(message.stateTable) : undefined);
    return obj;
  },

  /**
   * Builds a `NowNode` from a partial object; a missing or `null` `stateTable` becomes `undefined`.
   *
   * NOTE(review): the generic parameter list was truncated in the reviewed text and is restored
   * here assuming the file's `Exact` / `DeepPartial` helper types — confirm against the generator
   * output.
   */
  fromPartial<I extends Exact<DeepPartial<NowNode>, I>>(object: I): NowNode {
    const message = createBaseNowNode();
    message.stateTable = (object.stateTable !== undefined && object.stateTable !== null)
      ? Table.fromPartial(object.stateTable)
      : undefined;
    return message;
  },
};
message NowNode {
  // State table that persists the most recently emitted `now` timestamp, so that the
  // previously emitted value can be retracted on the next emission.
  catalog.Table state_table = 1;
}
// TODO: should better be a uint32. diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index bece2d7ce8a53..d9f9bb25858af 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -18,10 +18,12 @@ use std::time::{Duration, SystemTime}; use parse_display::Display; +static UNIX_SINGULARITY_DATE_SEC: u64 = 1_617_235_200; + /// `UNIX_SINGULARITY_DATE_EPOCH` represents the singularity date of the UNIX epoch: /// 2021-04-01T00:00:00Z. pub static UNIX_SINGULARITY_DATE_EPOCH: LazyLock = - LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(1_617_235_200)); + LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_SINGULARITY_DATE_SEC)); #[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Epoch(pub u64); @@ -72,6 +74,10 @@ impl Epoch { .as_millis() as u64 } + pub fn as_unix_millis(&self) -> u64 { + UNIX_SINGULARITY_DATE_SEC * 1000 + self.physical_time() + } + /// Returns the epoch in real system time. 
pub fn as_system_time(&self) -> SystemTime { *UNIX_SINGULARITY_DATE_EPOCH + Duration::from_millis(self.physical_time()) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 91fe35895609c..aabdd63c06956 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -69,6 +69,7 @@ mod lookup_union; mod managed_state; mod merge; mod mview; +mod now; mod project; mod project_set; mod rearranged_chain; @@ -109,6 +110,7 @@ pub use lookup::*; pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; +pub use now::NowExecutor; pub use project::ProjectExecutor; pub use project_set::*; pub use rearranged_chain::RearrangedChainExecutor; @@ -235,6 +237,15 @@ impl Barrier { } } + pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self { + Self { + epoch: EpochPair::new(epoch, prev_epoch), + checkpoint: true, + mutation: Default::default(), + passed_actors: Default::default(), + } + } + #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { @@ -279,11 +290,22 @@ impl Barrier { ) } + /// Whether this barrier is for pause. + pub fn is_pause(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Pause)) + } + /// Whether this barrier is for configuration change. Used for source executor initialization. pub fn is_update(&self) -> bool { matches!(self.mutation.as_deref(), Some(Mutation::Update { .. })) } + /// Whether this barrier is for resume. Used for now executor to determine whether to yield a + /// chunk and a watermark before this barrier. + pub fn is_resume(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Resume)) + } + /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor /// with `actor_id`. 
pub fn as_update_merge( diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs new file mode 100644 index 0000000000000..612589946b3c5 --- /dev/null +++ b/src/stream/src/executor/now.rs @@ -0,0 +1,302 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_stack_trace::StackTrace; +use chrono::NaiveDateTime; +use futures::{pin_mut, StreamExt}; +use futures_async_stream::try_stream; +use risingwave_common::array::{DataChunk, Op, StreamChunk}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::Row; +use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl}; +use risingwave_common::util::epoch::Epoch; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::UnboundedReceiver; + +use super::{ + Barrier, BoxedMessageStream, Executor, Message, Mutation, PkIndices, PkIndicesRef, + StreamExecutorError, Watermark, +}; +use crate::common::table::state_table::StateTable; + +pub struct NowExecutor { + /// Receiver of barrier channel. 
+ barrier_receiver: UnboundedReceiver, + + pk_indices: PkIndices, + identity: String, + schema: Schema, + state_table: StateTable, +} + +impl NowExecutor { + pub fn new( + barrier_receiver: UnboundedReceiver, + executor_id: u64, + state_table: StateTable, + ) -> Self { + let schema = Schema::new(vec![Field { + data_type: DataType::Timestamp, + name: String::from("now"), + sub_fields: vec![], + type_name: String::default(), + }]); + Self { + barrier_receiver, + pk_indices: vec![], + identity: format!("NowExecutor {:X}", executor_id), + schema, + state_table, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(self) { + let Self { + mut barrier_receiver, + mut state_table, + schema, + .. + } = self; + + // Consume the first barrier message and initialize state table. + let barrier = barrier_receiver + .recv() + .stack_trace("now_executor_recv_first_barrier") + .await + .unwrap(); + let mut is_pausing = barrier.is_pause() || barrier.is_update(); + + state_table.init_epoch(barrier.epoch); + + // The first barrier message should be propagated. + yield Message::Barrier(barrier); + + let state_row = { + let data_iter = state_table.iter().await?; + pin_mut!(data_iter); + if let Some(state_row) = data_iter.next().await { + Some(state_row?) 
+ } else { + None + } + }; + + let mut last_timestamp = state_row.and_then(|row| row[0].clone()); + + while let Some(barrier) = barrier_receiver.recv().await { + if !is_pausing { + let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); + let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_timestamp_opt( + (time_millis / 1000) as i64, + (time_millis % 1000 * 1_000_000) as u32, + ) + .unwrap(), + ))); + + let data_chunk = DataChunk::from_rows( + &if last_timestamp.is_some() { + vec![ + Row::new(vec![last_timestamp.clone()]), + Row::new(vec![timestamp.clone()]), + ] + } else { + vec![Row::new(vec![timestamp.clone()])] + }, + &schema.data_types(), + ); + let mut ops = if last_timestamp.is_some() { + vec![Op::Delete] + } else { + vec![] + }; + ops.push(Op::Insert); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + yield Message::Chunk(stream_chunk); + + yield Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + timestamp.as_ref().unwrap().clone(), + )); + + if last_timestamp.is_some() { + state_table.delete(Row::new(vec![last_timestamp])); + } + state_table.insert(Row::new(vec![timestamp.clone()])); + last_timestamp = timestamp; + + state_table.commit(barrier.epoch).await?; + } else { + state_table.commit_no_data_expected(barrier.epoch); + } + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause | Mutation::Update { .. 
} => is_pausing = true, + Mutation::Resume => is_pausing = false, + _ => {} + } + } + + yield Message::Barrier(barrier); + } + } +} + +impl Executor for NowExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use chrono::NaiveDateTime; + use futures::StreamExt; + use risingwave_common::array::StreamChunk; + use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; + use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl}; + use risingwave_common::util::sort_util::OrderType; + use risingwave_storage::memory::MemoryStateStore; + use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; + + use super::NowExecutor; + use crate::common::table::state_table::StateTable; + use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, PkIndices, Watermark}; + + #[tokio::test] + async fn test_now() { + let state_table = create_state_table().await; + let (tx, mut now_executor) = create_executor(state_table); + + // Init barrier + tx.send(Barrier::new_test_barrier(1)).unwrap(); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + + tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) + .unwrap(); + + // Consume the data chunk + let chunk_msg = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " TS + + 2021-04-01T00:00:00.001" + ) + ); + + // Consume the watermark + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + 
NaiveDateTime::from_str("2021-04-01T00:00:00.001").unwrap() + )) + )) + ); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + + tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) + .unwrap(); + + // Consume the data chunk + let chunk_msg = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " TS + - 2021-04-01T00:00:00.001 + + 2021-04-01T00:00:00.002" + ) + ); + + // Consume the watermark + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_str("2021-04-01T00:00:00.002").unwrap() + )) + )) + ); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + } + + #[inline] + fn create_pk_indices() -> PkIndices { + vec![] + } + + #[inline] + fn create_order_types() -> Vec { + vec![] + } + + async fn create_state_table() -> StateTable { + let memory_state_store = MemoryStateStore::new(); + let table_id = TableId::new(1); + let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp)]; + let order_types = create_order_types(); + let pk_indices = create_pk_indices(); + StateTable::new_without_distribution( + memory_state_store, + table_id, + column_descs, + order_types, + pk_indices, + ) + .await + } + + fn create_executor( + state_table: StateTable, + ) -> (UnboundedSender, BoxedMessageStream) { + let (sender, barrier_receiver) = unbounded_channel(); + let now_executor = NowExecutor::new(barrier_receiver, 1, state_table); + (sender, Box::new(now_executor).execute()) + } +} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index f5107b7db17e0..6ca89e32551d9 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -31,6 +31,7 @@ mod lookup; mod lookup_union; mod merge; mod mview; 
+mod now; mod project; mod project_set; mod row_id_gen; @@ -64,6 +65,7 @@ use self::lookup::*; use self::lookup_union::*; use self::merge::*; use self::mview::*; +use self::now::NowExecutorBuilder; use self::project::*; use self::project_set::*; use self::row_id_gen::RowIdGenExecutorBuilder; @@ -143,5 +145,6 @@ pub async fn create_executor( NodeBody::WatermarkFilter => WatermarkFilterBuilder, NodeBody::Dml => DmlExecutorBuilder, NodeBody::RowIdGen => RowIdGenExecutorBuilder, + NodeBody::Now => NowExecutorBuilder, } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs new file mode 100644 index 0000000000000..b52d5fed2bae4 --- /dev/null +++ b/src/stream/src/from_proto/now.rs @@ -0,0 +1,52 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_pb::stream_plan::NowNode; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::unbounded_channel; + +use super::ExecutorBuilder; +use crate::common::table::state_table::StateTable; +use crate::error::StreamResult; +use crate::executor::{BoxedExecutor, NowExecutor}; +use crate::task::{ExecutorParams, LocalStreamManagerCore}; + +pub struct NowExecutorBuilder; + +#[async_trait::async_trait] +impl ExecutorBuilder for NowExecutorBuilder { + type Node = NowNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &NowNode, + store: impl StateStore, + stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let (sender, barrier_receiver) = unbounded_channel(); + stream + .context + .lock_barrier_manager() + .register_sender(params.actor_context.id, sender); + + let state_table = + StateTable::from_table_catalog(node.get_state_table()?, store, None).await; + + Ok(Box::new(NowExecutor::new( + barrier_receiver, + params.executor_id, + state_table, + ))) + } +}