diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..1494815 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,8 @@ +[build] +rustflags = ["-C", "target-cpu=native"] + +[target.wasm32-unknown-unknown] +rustflags = ["-C", "target-feature=+avx2"] + +[target.wasm32-wasi] +rustflags = ["-C", "target-feature=+avx2"] diff --git a/.gitignore b/.gitignore index ea8c4bf..89c0fff 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target +.idea +*.iml diff --git a/Cargo.lock b/Cargo.lock index 41edd88..bac039e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1375,6 +1375,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1583,6 +1592,16 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "halfbrown" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e2a3c70a9c00cc1ee87b54e89f9505f73bb17d63f1b25c9a462ba8ef885444f" +dependencies = [ + "hashbrown 0.13.2", + "serde", +] + [[package]] name = "hash_hasher" version = "2.0.3" @@ -1606,6 +1625,15 @@ dependencies = [ "ahash 0.8.3", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.3", +] + [[package]] name = "headers" version = "0.3.8" @@ -2043,6 +2071,70 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.140" @@ -2877,6 +2969,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", + "bytes", "futures", "mz-expr", "mz-pgcopy", @@ -2884,6 +2977,9 @@ dependencies = [ "mz-repr", "once_cell", "postgres-protocol", + "serde", + "serde_json", + "simd-json", "tokio", "tokio-postgres", "uuid", @@ -3787,6 +3883,20 @@ dependencies = [ "outref", ] +[[package]] +name = "simd-json" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3375b6c3d8c048ba09c8b4b6c3f1d3f35e06b71db07d231c323943a949e1b8" +dependencies = [ + "halfbrown", + "lexical-core", + "serde", + "serde_json", + "simdutf8", + "value-trait", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -4666,6 +4776,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-trait" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "995de1aa349a0dc50f4aa40870dce12961a30229027230bad09acd2843edbe9e" +dependencies = [ + "float-cmp", + "halfbrown", + "itoa", + "ryu", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 798caec..0e51bc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,7 @@ uuid = "*" async-stream = "*" futures = "*" once_cell = "*" +simd-json = "0.7.0" +bytes = "1.4.0" +serde = "*" +serde_json = "*" diff --git a/README.md b/README.md index 85260bc..f8f0f75 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ A postgres instance with a `testdb` dfatabase and a `testtbl` table with a `id` ```postgresql CREATE DATABASE testdb; \c testdb; -CREATE TABLE testtbl (id int, name text); +CREATE TABLE testtbl (id int PRIMARY KEY, name text); CREATE PUBLICATION testpub FOR TABLE testtbl; INSERT INTO testtbl VALUES (1, 'snot'); @@ -28,3 +28,10 @@ potentially the starting position can be provided by a 2nd parameter ```bash cargo run -- "host=127.0.0.1 port=5433 user=postgres password=password dbname=testdb" 0/17773B0 ``` + + +## Notes +the tables set for replication needs to have a primary key otherwise you get an error about Replication identity missing for the table updates +```sql +ALTER TABLE testtbl REPLICA IDENTITY DEFAULT; +``` \ No newline at end of file diff --git a/db.dockerfile b/db.dockerfile new file mode 100644 index 0000000..b833b41 --- /dev/null +++ b/db.dockerfile @@ -0,0 +1,4 @@ +FROM postgres:14.7 + +RUN apt-get update && apt-get install -y postgresql-14-wal2json + diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..0a83bf5 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,24 @@ +version: "3.4" + +services: + db: + image: postgres:14.5 + container_name: db + build: + context: . + dockerfile: db.dockerfile + restart: always + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + command: -c wal_level=logical + ports: + - 5432:5432 + + adminer: + image: adminer + restart: always + depends_on: + - db + ports: + - 8080:8080 diff --git a/src/main.rs b/src/main.rs index 69ba4ad..53f13e1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,11 +4,10 @@ use mz_expr::MirScalarExpr; use mz_postgres_util::{desc::PostgresTableDesc, Config}; use mz_repr::{Datum, DatumVec, Row}; use once_cell::sync::Lazy; -use postgres_protocol::message::backend::{ - LogicalReplicationMessage, ReplicationMessage, XLogDataBody, -}; +use postgres_protocol::message::backend::*; use std::{ collections::BTreeMap, + env, str::FromStr, sync::{ atomic::{AtomicU64, Ordering}, @@ -19,6 +18,8 @@ use std::{ use tokio_postgres::{ replication::LogicalReplicationStream, types::PgLsn, Client, SimpleQueryMessage, }; +mod serializer; +use serializer::SerializedXLogDataBody; //https://dev.materialize.com/api/rust/mz_postgres_util/struct.Config.html //https://github.com/MaterializeInc/materialize/blob/main/src/storage/src/source/postgres.rs#L507 @@ -53,20 +54,26 @@ fn parse_single_row( #[tokio::main] async fn main() -> Result<(), ReplicationError> { - let mut replication_lsn = PgLsn::from(23525992); + let args: Vec = env::args().collect(); + let mut replication_lsn = if args.len() > 2 { + PgLsn::from_str(&args[2]).unwrap() + } else { + PgLsn::from(0) + }; - let pg_config = tokio_postgres::Config::from_str( - "host=127.0.0.1 port=5433 user=postgres password=password dbname=testdb", - )?; + //"host=127.0.0.1 port=5433 user=postgres password=password dbname=testdb", + let pg_config = tokio_postgres::Config::from_str(&args[1])?; let tunnel_config = mz_postgres_util::TunnelConfig::Direct; let connection_config = Config::new(pg_config, tunnel_config)?; - let slot = "snot"; + let slot = "gamess"; - let publication = "testpub"; + let publication = "gamespub"; let source_id = "source_id"; if replication_lsn == PgLsn::from(0) { + println!("======== BEGIN SNAPSHOT =========="); + let publication_tables = mz_postgres_util::publication_info(&connection_config, publication, None).await?; @@ -157,7 +164,6 @@ async fn main() -> Result<(), ReplicationError> { dbg!(output, row, slot_lsn, 1); } - println!("======== END SNAPSHOT =========="); if let Some(temp_slot) = temp_slot { @@ -186,7 +192,7 @@ async fn main() -> Result<(), ReplicationError> { slot_lsn, Arc::new(0.into()), ) - .await; + .await; tokio::pin!(replication_stream); while let Some(event) = replication_stream.next().await { @@ -207,7 +213,7 @@ async fn main() -> Result<(), ReplicationError> { // Arc::clone(&resume_lsn), Arc::new(AtomicU64::new(0)), ) - .await; + .await; tokio::pin!(replication_stream); // TODO(petrosagg): The API does not guarantee that we won't see an error after we have already @@ -373,7 +379,7 @@ async fn produce_replication<'a>( // Scratch space to use while evaluating casts // let mut datum_vec = DatumVec::new(); - let last_commit_lsn = as_of; + let mut last_commit_lsn = as_of; // let mut observed_wal_end = as_of; // The outer loop alternates the client between streaming the replication slot and using // normal SQL queries with pg admin functions to fast-foward our cursor in the event of WAL @@ -414,8 +420,73 @@ async fn produce_replication<'a>( match stream.as_mut().next().await { Some(Ok(ReplicationMessage::XLogData(xlog_data))) => { last_data_message = Instant::now(); - yield xlog_data; + match xlog_data.data() { + LogicalReplicationMessage::Origin(origin) => { + // metrics.transactions.inc(); + last_commit_lsn = PgLsn::from(origin.commit_lsn()); + + println!("======== ORIGIN =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the ORIGIN MESSAGE JSON =========="); + + // for (output, row) in deletes.drain(..) { + // yield Event::Message(last_commit_lsn, (output, row, -1)); + // } + // for (output, row) in inserts.drain(..) { + // yield Event::Message(last_commit_lsn, (output, row, 1)); + // } + // yield Event::Progress([PgLsn::from(u64::from(last_commit_lsn) + 1)]); + // metrics.lsn.set(last_commit_lsn.into()); + } + + LogicalReplicationMessage::Commit(commit) => { + last_commit_lsn = PgLsn::from(commit.end_lsn()); + println!("======== COMMIT =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the COMMIT MESSAGE JSON =========="); + } + LogicalReplicationMessage::Begin(begin) => { + last_commit_lsn = PgLsn::from(begin.final_lsn()); + println!("======== BEGIN =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the BEGIN MESSAGE JSON =========="); + + } + + LogicalReplicationMessage::Insert(_insert) => { + println!("======== INSERT =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the INSERT MESSAGE JSON =========="); + } + + LogicalReplicationMessage::Update(_update) => { + println!("======== UPDATE =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the UPDATE MESSAGE JSON =========="); + } + + LogicalReplicationMessage::Delete(_delete) => { + println!("======== DELETE =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the DELETE MESSAGE JSON =========="); + } + + LogicalReplicationMessage::Relation(_relation) => { + println!("======== RELATION =========="); + let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap(); + println!("{}", serialized_xlog); + println!("======== END OF the RELATION MESSAGE JSON =========="); + } + _ => yield xlog_data, + } } + ////////////////////////////////////// // Some(Ok(XLogData(xlog_data))) => match xlog_data.data() { // Begin(_) => { // last_data_message = Instant::now(); @@ -453,14 +524,14 @@ async fn produce_replication<'a>( // .ok_or_else(err) // .err_definite()? // .tuple_data(); - + // // let mut old_datums = datum_vec.borrow(); // datums_from_tuple(rel_id, old_tuple, &mut *old_datums) // .err_definite()?; // let old_row = cast_row(&info.casts, &old_datums).err_definite()?; // deletes.push((info.output_index, old_row)); // drop(old_datums); - + // // // If the new tuple contains unchanged toast values, reuse the ones // // from the old tuple // let new_tuple = update @@ -504,7 +575,7 @@ async fn produce_replication<'a>( // last_data_message = Instant::now(); // metrics.transactions.inc(); // last_commit_lsn = PgLsn::from(commit.end_lsn()); - + // // for (output, row) in deletes.drain(..) { // yield Event::Message(last_commit_lsn, (output, row, -1)); // } @@ -541,7 +612,7 @@ async fn produce_replication<'a>( // ); // valid_schema_change = false; // } - + // // // Relation messages do not include nullability/primary_key data so we // // check the name, type_oid, and type_mod explicitly and error if any // // of them differ @@ -550,7 +621,7 @@ async fn produce_replication<'a>( // let rel_typoid = u32::try_from(rel.type_id()).unwrap(); // let same_typoid = src.type_oid == rel_typoid; // let same_typmod = src.type_mod == rel.type_modifier(); - + // // if !same_name || !same_typoid || !same_typmod { // warn!( // "alter table error: name {}, oid {}, old_schema {:?}, new_schema {:?}", @@ -559,11 +630,11 @@ async fn produce_replication<'a>( // info.desc.columns, // relation.columns() // ); - + // // valid_schema_change = false; // } // } - + // // if valid_schema_change { // // Because the replication stream doesn't // // include columns' attnums, we need to check @@ -580,7 +651,7 @@ async fn produce_replication<'a>( // ) // .await // .err_indefinite()?; - + // // let remote_schema_eq = // Some(&info.desc) == current_publication_info.get(0); // if !remote_schema_eq { @@ -591,11 +662,11 @@ async fn produce_replication<'a>( // info.desc.columns, // current_publication_info.get(0) // ); - + // // valid_schema_change = false; // } // } - + // // if !valid_schema_change { // return Err(Definite(anyhow!( // "source table {} with oid {} has been altered", @@ -638,6 +709,7 @@ async fn produce_replication<'a>( // )))?; // } // }, + /////////////////// Some(Ok(ReplicationMessage::PrimaryKeepAlive(keepalive))) => { needs_status_update = needs_status_update || keepalive.reply() == 1; // observed_wal_end = PgLsn::from(keepalive.wal_end()); @@ -650,6 +722,7 @@ async fn produce_replication<'a>( return Err(ReplicationError::from(err))?; } None => { + dbg!("eof"); break; } // The enum is marked non_exhaustive, better be conservative @@ -730,4 +803,4 @@ async fn produce_replication<'a>( // } } }) -} +} \ No newline at end of file diff --git a/src/serializer.rs b/src/serializer.rs new file mode 100644 index 0000000..627a89e --- /dev/null +++ b/src/serializer.rs @@ -0,0 +1,277 @@ +use postgres_protocol::message::backend::*; +use serde::{Serializer, Serialize}; +use serde::ser::{Error, SerializeStruct}; +use std::fmt; + +/// SerializedTuple: A wrapper struct around a Tuple object that provides a Serialize implementation +/// for it. This struct is used to serialize the tuple data of a PostgreSQL message into JSON format. + +pub(crate) struct SerializedTuple<'a>(pub &'a Tuple); + +impl<'a> Serialize for SerializedTuple<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let data = self.0.tuple_data().iter().map(|d| SerializedTupleData::from_tuple_data(d)).collect::>(); + let mut state = serializer.serialize_struct("Tuple", 1)?; + state.serialize_field("data", &data)?; + state.end() + } +} + +///SerializedOptionTuple: Similar to SerializedTuple, but for an optional Tuple object. +/// If the inner Option is Some, this struct serializes the tuple data. If the inner Option is None, +/// this struct serializes None. +pub(crate) struct SerializedOptionTuple<'a>(pub Option<&'a Tuple>); + +impl<'a> Serialize for SerializedOptionTuple<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.0 { + Some(tuple) => { + let data = tuple.tuple_data().iter().map(|d| SerializedTupleData::from_tuple_data(d)).collect::>(); + let mut state = serializer.serialize_struct("Tuple", 1)?; + state.serialize_field("data", &data)?; + state.end() + }, + None => serializer.serialize_none(), + } + } +} + +impl<'a, 'b> From<&'b Tuple> for SerializedTuple<'a> where 'b: 'a { + fn from(tuple: &'b Tuple) -> Self { + SerializedTuple(tuple) + } +} + +///SerializedTupleData: An enum that represents the different types of data that can be present in +/// a tuple. This struct is used by SerializedTuple and SerializedOptionTuple +/// to serialize tuple data into JSON format. +#[derive(Serialize)] +enum SerializedTupleData { + Null, + UnchangedToast, + Text(String), +} + +impl SerializedTupleData { + fn from_tuple_data(data: &TupleData) -> SerializedTupleData { + match data { + TupleData::Null => SerializedTupleData::Null, + TupleData::UnchangedToast => SerializedTupleData::UnchangedToast, + TupleData::Text(bytes) => SerializedTupleData::Text(String::from_utf8_lossy(bytes.as_ref()).to_string()), + } + } +} + +///SerializedReplicaIdentity: An enum that represents the different types of replica identity that +/// can be configured for a table in PostgreSQL. This struct is used to serialize replica identity +/// information into JSON format. +#[derive(Serialize)] +enum SerializedReplicaIdentity { + Default, + Nothing, + Full, + Index, +} +impl SerializedReplicaIdentity { + fn from_replica_identity(replica_identity: &ReplicaIdentity) -> SerializedReplicaIdentity { + match replica_identity { + ReplicaIdentity::Default => SerializedReplicaIdentity::Default, + ReplicaIdentity::Nothing => SerializedReplicaIdentity::Nothing, + ReplicaIdentity::Full => SerializedReplicaIdentity::Full, + ReplicaIdentity::Index => SerializedReplicaIdentity::Index, + } + } +} + +///SerializedColumn: A struct that represents a PostgreSQL column, with information about +/// the column's flags, name, type ID, and type modifier. +/// This struct is used to serialize column information into JSON format. +#[derive(Debug, Serialize)] +pub struct SerializedColumn<'a> { + flags: i8, + name: &'a str, + type_id: i32, + type_modifier: i32, +} + +impl<'a> From<&'a Column> for SerializedColumn<'a> { + fn from(column: &'a Column) -> Self { + SerializedColumn { + flags: column.flags(), + name: column.name().unwrap(), // this will panic if there's an error reading the name + type_id: column.type_id(), + type_modifier: column.type_modifier(), + } + } +} + +///SerializedColumns: A wrapper struct around an array of Column objects that provides a +/// From implementation for it. This struct is used to serialize +/// an array of columns into JSON format. +#[derive(Debug, Serialize)] +pub struct SerializedColumns<'a> { + columns: Vec>, +} + +impl<'a> From<&'a [Column]> for SerializedColumns<'a> { + fn from(columns: &'a [Column]) -> Self { + SerializedColumns { + columns: columns.iter().map(|column| SerializedColumn::from(column)).collect(), + } + } +} + +///CustomError: A struct that represents a custom error type. +/// This struct is used to wrap errors from the std::io module and +/// provide a custom error message when they occur. +#[derive(Debug)] +pub struct CustomError { + message: String, +} + +impl From for CustomError { + fn from(error: std::io::Error) -> Self { + CustomError { + message: error.to_string(), + } + } +} + +impl Serialize for CustomError { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.message) + } +} + +impl fmt::Display for CustomError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +///SerializedXLogDataBody: A wrapper struct around a XLogDataBody object that provides a +/// Serialize implementation for it. +/// This struct is used to serialize logical replication messages into JSON format. +#[derive(Debug)] +pub(crate) struct SerializedXLogDataBody(pub XLogDataBody); + +impl Serialize for SerializedXLogDataBody { + fn serialize(&self, serializer: S) -> Result { + let data = &SerializedLogicalReplicationMessage(&self.0.data()); + let mut state = serializer.serialize_struct("XLogDataBody", 3)?; + state.serialize_field("wal_start", &self.0.wal_start())?; + state.serialize_field("wal_end", &self.0.wal_end())?; + state.serialize_field("timestamp", &self.0.timestamp())?; + state.serialize_field("data", &data)?; + state.end() + } +} + +///SerializedLogicalReplicationMessage: A wrapper struct around a LogicalReplicationMessage object +/// that provides a Serialize implementation for it. +/// This struct is used by SerializedXLogDataBody to serialize +/// the data field of a logical replication message into JSON format. +pub(crate) struct SerializedLogicalReplicationMessage<'a>(pub &'a LogicalReplicationMessage); + +impl<'a> Serialize for SerializedLogicalReplicationMessage<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("LogicalReplicationMessage", 5)?; + match self.0 { + LogicalReplicationMessage::Begin(ref msg) => { + state.serialize_field("final_lsn", &msg.final_lsn())?; + state.serialize_field("timestamp", &msg.timestamp())?; + state.serialize_field("xid", &msg.xid())?; + } + LogicalReplicationMessage::Commit(ref msg) => { + state.serialize_field("flags", &msg.flags())?; + state.serialize_field("commit_lsn", &msg.commit_lsn())?; + state.serialize_field("end_lsn", &msg.end_lsn())?; + state.serialize_field("timestamp", &msg.timestamp())?; + } + LogicalReplicationMessage::Origin(ref msg) => { + state.serialize_field("commit_lsn", &msg.commit_lsn())?; + let name = msg + .name() + .map_err(CustomError::from) + .map_err(S::Error::custom)?; + state.serialize_field("name", &name)?; + } + LogicalReplicationMessage::Relation(ref msg) => { + state.serialize_field("rel_id", &msg.rel_id())?; + let namespace = msg + .namespace() + .map_err(CustomError::from) + .map_err(S::Error::custom)?; + let name = msg + .name() + .map_err(CustomError::from) + .map_err(S::Error::custom)?; + state.serialize_field("namespace", &namespace)?; + state.serialize_field("name", &name)?; + state.serialize_field("replica_identity", &SerializedReplicaIdentity::from_replica_identity(&msg.replica_identity()))?; + state.serialize_field("columns", &SerializedColumns::from(msg.columns()))?; + + } + LogicalReplicationMessage::Type(ref msg) => { + state.serialize_field("id", &msg.id())?; + let namespace = msg + .namespace() + .map_err(CustomError::from) + .map_err(S::Error::custom)?; + let name = msg + .name() + .map_err(CustomError::from) + .map_err(S::Error::custom)?; + state.serialize_field("namespace", &namespace)?; + state.serialize_field("name", &name)?; + } + LogicalReplicationMessage::Insert(ref msg) => { + state.serialize_field("rel_id", &msg.rel_id())?; + let serialized_tuple = SerializedTuple(msg.tuple()); + state.serialize_field("tuple", &serialized_tuple)?; + } + LogicalReplicationMessage::Update(ref msg) => { + state.serialize_field("rel_id", &msg.rel_id())?; + if let Some(_old_tuple) = &msg.old_tuple() { + let old_tuple = SerializedOptionTuple(*&msg.old_tuple()); + state.serialize_field("old_tuple", &old_tuple)?; + } + if let Some(_key_tuple) = &msg.key_tuple() { + let key_tuple = SerializedOptionTuple(*&msg.key_tuple()); + state.serialize_field("key_tuple", &key_tuple)?; + } + let new_tuple = SerializedTuple(&msg.new_tuple()); + state.serialize_field("new_tuple", &new_tuple)?; + } + LogicalReplicationMessage::Delete(ref msg) => { + state.serialize_field("rel_id", &msg.rel_id())?; + if let Some(_old_tuple) = &msg.old_tuple() { + let old_tuple = SerializedOptionTuple(*&msg.old_tuple()); + state.serialize_field("old_tuple", &old_tuple)?; + } + if let Some(_key_tuple) = &msg.key_tuple() { + let key_tuple = SerializedOptionTuple(*&msg.key_tuple()); + state.serialize_field("key_tuple", &key_tuple)?; + } + } + LogicalReplicationMessage::Truncate(ref msg) => { + state.serialize_field("options", &msg.options())?; + state.serialize_field("rel_ids", &msg.rel_ids())?; + } + _ => {} + } + state.end() + } +}