From 9c16bb552fe1b0d219adab01351a859b6617a6b2 Mon Sep 17 00:00:00 2001 From: Rob A'Court Date: Mon, 1 Jul 2024 09:42:26 +0100 Subject: [PATCH] Chunk large satellite payloads (#1399) To prevent the 100MB payload limit: ``` [Symbol(kError)]: RangeError: Max payload size exceeded at Receiver.haveLength (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/ws@8.17.0/node_modules/ws/lib/receiver.js:419:28) at Receiver.getPayloadLength64 (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/ws@8.17.0/node_modules/ws/lib/receiver.js:406:10) at Receiver.startLoop (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/ws@8.17.0/node_modules/ws/lib/receiver.js:161:16) at Receiver._write (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/ws@8.17.0/node_modules/ws/lib/receiver.js:94:10) at writeOrBuffer (node:internal/streams/writable:564:12) at _write (node:internal/streams/writable:493:10) at Writable.write (node:internal/streams/writable:502:10) at Socket.socketOnData (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/ws@8.17.0/node_modules/ws/lib/websocket.js:1303:35) at Socket.emit (node:events:519:28) at addChunk (node:internal/streams/readable:559:12) { code: 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH', [Symbol(status-code)]: 1009 ``` --- .changeset/rare-dancers-explain.md | 5 +++++ .../lib/electric/satellite/serialization.ex | 13 ++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) create mode 100644 .changeset/rare-dancers-explain.md diff --git a/.changeset/rare-dancers-explain.md b/.changeset/rare-dancers-explain.md new file mode 100644 index 0000000000..32e84f697a --- /dev/null +++ b/.changeset/rare-dancers-explain.md @@ -0,0 +1,5 @@ +--- +"@core/electric": patch +--- + +Limit the number of changes in a websocket frame to 100 changes to reduce the chance of frame exceeding 100MB limit in the case where there are lots of changes diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index e7c9e34830..2897f9ba78 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -74,7 +74,7 @@ defmodule Electric.Satellite.Serialization do commit_op = %SatTransOp{op: {:commit, tx_commit}} { - [%SatOpLog{ops: [begin_op | Enum.reverse([commit_op | state.ops])]}], + messages_from_ops([begin_op | Enum.reverse([commit_op | state.ops])]), state.new_relations, state.known_relations } @@ -93,7 +93,7 @@ defmodule Electric.Satellite.Serialization do # The changes cannot be migration relations, so our "state" is limited state = Enum.reduce(changes, state, &serialize_change/2) - {[%SatOpLog{ops: [begin_op | state.ops]}], state.new_relations, state.known_relations} + {messages_from_ops([begin_op | state.ops]), state.new_relations, state.known_relations} end def serialize_shape_data_as_tx(changes, known_relations) do @@ -106,7 +106,14 @@ defmodule Electric.Satellite.Serialization do # The changes cannot be migration relations, so our "state" is limited state = Enum.reduce(changes, state, &serialize_change/2) - {[%SatOpLog{ops: state.ops}], state.new_relations, state.known_relations} + {messages_from_ops(state.ops), state.new_relations, state.known_relations} + end + + @max_ops_per_message 100 + defp messages_from_ops(ops) do + ops + |> Enum.chunk_every(@max_ops_per_message) + |> Enum.map(&%SatOpLog{ops: &1}) end defp serialize_change(record, state) when is_migration_relation(record.relation) do