Skip to content

Commit

Permalink
chore(electric,client): Create new protocol op to represent a compens…
Browse files Browse the repository at this point in the history
…ation (#639)

For permissions etc we need to always have the `OLD` values available,
so for clarity don't send out SatOpUpdate with no old_data values
  • Loading branch information
magnetised authored Nov 23, 2023
1 parent 863f9f3 commit eb722c9
Show file tree
Hide file tree
Showing 14 changed files with 629 additions and 35 deletions.
6 changes: 6 additions & 0 deletions .changeset/itchy-carrots-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@core/electric": minor
"electric-sql": minor
---

[VAX-1335] Create new protocol op to represent a compensation
99 changes: 99 additions & 0 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export interface SatTransOp {
insert?: SatOpInsert | undefined;
delete?: SatOpDelete | undefined;
migrate?: SatOpMigrate | undefined;
compensation?: SatOpCompensation | undefined;
}

/**
Expand Down Expand Up @@ -299,6 +300,16 @@ export interface SatOpDelete {
tags: string[];
}

export interface SatOpCompensation {
$type: "Electric.Satellite.SatOpCompensation";
relationId: number;
pkData:
| SatOpRow
| undefined;
/** dependency information */
tags: string[];
}

/** Message that corresponds to the single row. */
export interface SatOpRow {
$type: "Electric.Satellite.SatOpRow";
Expand Down Expand Up @@ -1508,6 +1519,7 @@ function createBaseSatTransOp(): SatTransOp {
insert: undefined,
delete: undefined,
migrate: undefined,
compensation: undefined,
};
}

Expand All @@ -1533,6 +1545,9 @@ export const SatTransOp = {
if (message.migrate !== undefined) {
SatOpMigrate.encode(message.migrate, writer.uint32(50).fork()).ldelim();
}
if (message.compensation !== undefined) {
SatOpCompensation.encode(message.compensation, writer.uint32(58).fork()).ldelim();
}
return writer;
},

Expand Down Expand Up @@ -1585,6 +1600,13 @@ export const SatTransOp = {

message.migrate = SatOpMigrate.decode(reader, reader.uint32());
continue;
case 7:
if (tag !== 58) {
break;
}

message.compensation = SatOpCompensation.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -1618,6 +1640,9 @@ export const SatTransOp = {
message.migrate = (object.migrate !== undefined && object.migrate !== null)
? SatOpMigrate.fromPartial(object.migrate)
: undefined;
message.compensation = (object.compensation !== undefined && object.compensation !== null)
? SatOpCompensation.fromPartial(object.compensation)
: undefined;
return message;
},
};
Expand Down Expand Up @@ -2042,6 +2067,80 @@ export const SatOpDelete = {

messageTypeRegistry.set(SatOpDelete.$type, SatOpDelete);

function createBaseSatOpCompensation(): SatOpCompensation {
return { $type: "Electric.Satellite.SatOpCompensation", relationId: 0, pkData: undefined, tags: [] };
}

export const SatOpCompensation = {
$type: "Electric.Satellite.SatOpCompensation" as const,

encode(message: SatOpCompensation, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.relationId !== 0) {
writer.uint32(8).uint32(message.relationId);
}
if (message.pkData !== undefined) {
SatOpRow.encode(message.pkData, writer.uint32(18).fork()).ldelim();
}
for (const v of message.tags) {
writer.uint32(34).string(v!);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatOpCompensation {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatOpCompensation();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 8) {
break;
}

message.relationId = reader.uint32();
continue;
case 2:
if (tag !== 18) {
break;
}

message.pkData = SatOpRow.decode(reader, reader.uint32());
continue;
case 4:
if (tag !== 34) {
break;
}

message.tags.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatOpCompensation>, I>>(base?: I): SatOpCompensation {
return SatOpCompensation.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatOpCompensation>, I>>(object: I): SatOpCompensation {
const message = createBaseSatOpCompensation();
message.relationId = object.relationId ?? 0;
message.pkData = (object.pkData !== undefined && object.pkData !== null)
? SatOpRow.fromPartial(object.pkData)
: undefined;
message.tags = object.tags?.map((e) => e) || [];
return message;
},
};

messageTypeRegistry.set(SatOpCompensation.$type, SatOpCompensation);

function createBaseSatOpRow(): SatOpRow {
return { $type: "Electric.Satellite.SatOpRow", nullsBitmask: new Uint8Array(), values: [] };
}
Expand Down
11 changes: 5 additions & 6 deletions clients/typescript/src/migrators/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ export function generateOplogTriggers(
/**
* Generates triggers for compensations for all foreign keys in the provided table.
*
* Compensation is recorded as a specially-formatted update. It acts as a no-op, with
* previous value set to NULL, and it's on the server to figure out that this is a no-op
* compensation operation (usually `UPDATE` would have previous row state known). The entire
* reason for it existing is to maybe revive the row if it has been deleted, so we need correct tags.
* Compensation is recorded as a SatOpCompensation messaage. The entire reason
* for it existing is to maybe revive the row if it has been deleted, so we need
* correct tags.
*
* The compensation update contains _just_ the primary keys, no other columns are present.
*
Expand Down Expand Up @@ -155,7 +154,7 @@ function generateCompensationTriggers(table: Table): Statement[] {
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
Expand All @@ -167,7 +166,7 @@ function generateCompensationTriggers(table: Table): Statement[] {
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
Expand Down
9 changes: 9 additions & 0 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,15 @@ export class SatelliteClient implements Client {
},
})
break
case DataChangeType.COMPENSATION:
changeOp = SatTransOp.fromPartial({
compensation: {
pkData: record,
relationId: relation.id,
tags: tags,
},
})
break
}
ops.push(changeOp)
})
Expand Down
1 change: 1 addition & 0 deletions clients/typescript/src/util/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export enum DataChangeType {
INSERT = 'INSERT',
UPDATE = 'UPDATE',
DELETE = 'DELETE',
COMPENSATION = 'COMPENSATION',
}

export type Change = DataChange | SchemaChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ defmodule Electric.Postgres.ShadowTableTransformation do

defp build_bitmask(%Changes.NewRecord{}, columns), do: Enum.map(columns, fn _ -> "t" end)

defp build_bitmask(%Changes.UpdatedRecord{old_record: nil}, columns),
do: Enum.map(columns, fn _ -> "f" end)
defp build_bitmask(%Changes.Compensation{}, columns), do: Enum.map(columns, fn _ -> "f" end)

defp build_bitmask(%Changes.UpdatedRecord{old_record: old, record: new}, columns),
do: Enum.map(columns, fn col -> if old[col] != new[col], do: "t", else: "f" end)
Expand Down
17 changes: 13 additions & 4 deletions components/electric/lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,18 @@ defmodule Electric.Replication.Changes do
]

def count_operations(%__MODULE__{changes: changes}) do
base = %{operations: 0, inserts: 0, updates: 0, deletes: 0}
base = %{operations: 0, inserts: 0, updates: 0, deletes: 0, compensations: 0}

Enum.reduce(changes, base, fn %module{}, acc ->
key =
case module do
Changes.NewRecord -> :inserts
Changes.UpdatedRecord -> :updates
Changes.DeletedRecord -> :deletes
Changes.Compensation -> :compensations
end

%{^key => value, :operations => total} = acc

%{acc | key => value + 1, :operations => total + 1}
Map.update!(%{acc | operations: acc.operations + 1}, key, &(&1 + 1))
end)
end
end
Expand Down Expand Up @@ -105,6 +104,16 @@ defmodule Electric.Replication.Changes do
}
end

defmodule Compensation do
defstruct [:relation, :record, tags: []]

@type t() :: %__MODULE__{
relation: Changes.relation(),
record: Changes.record(),
tags: [Changes.tag()]
}
end

defmodule TruncatedRelation do
defstruct [:relation]
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
}
end

defp changes_to_wal(
%Changes.UpdatedRecord{relation: table, old_record: nil, record: new},
relations
) do
defp changes_to_wal(%Changes.Compensation{relation: table, record: new}, relations) do
%ReplicationMessages.Update{
relation_id: relations[table].oid,
tuple_data: record_to_tuple(new, relations[table].columns)
Expand Down
10 changes: 1 addition & 9 deletions components/electric/lib/electric/satellite/protobuf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ defmodule Electric.Satellite.Protobuf do
SatOpInsert,
SatOpUpdate,
SatOpMigrate,
SatOpCompensation,
SatTransOp,
SatRelation,
SatRelationColumn,
Expand All @@ -125,15 +126,6 @@ defmodule Electric.Satellite.Protobuf do
end
end

defmodule Version do
defstruct major: nil, minor: nil

@type t() :: %__MODULE__{
major: integer,
minor: integer
}
end

@spec decode(byte(), binary()) :: {:ok, sq_pb_msg()} | {:error, any()}
for {module, tag} <- @mapping do
def decode(unquote(tag), binary) do
Expand Down
Loading

0 comments on commit eb722c9

Please sign in to comment.