Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(electric,client): Create new protocol op to represent a compensation #639

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/itchy-carrots-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@core/electric": minor
"electric-sql": minor
---

[VAX-1335] Create new protocol op to represent a compensation
99 changes: 99 additions & 0 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export interface SatTransOp {
insert?: SatOpInsert | undefined;
delete?: SatOpDelete | undefined;
migrate?: SatOpMigrate | undefined;
compensation?: SatOpCompensation | undefined;
}

/**
Expand Down Expand Up @@ -299,6 +300,16 @@ export interface SatOpDelete {
tags: string[];
}

export interface SatOpCompensation {
$type: "Electric.Satellite.SatOpCompensation";
relationId: number;
pkData:
| SatOpRow
| undefined;
/** dependency information */
tags: string[];
}

/** Message that corresponds to the single row. */
export interface SatOpRow {
$type: "Electric.Satellite.SatOpRow";
Expand Down Expand Up @@ -1508,6 +1519,7 @@ function createBaseSatTransOp(): SatTransOp {
insert: undefined,
delete: undefined,
migrate: undefined,
compensation: undefined,
};
}

Expand All @@ -1533,6 +1545,9 @@ export const SatTransOp = {
if (message.migrate !== undefined) {
SatOpMigrate.encode(message.migrate, writer.uint32(50).fork()).ldelim();
}
if (message.compensation !== undefined) {
SatOpCompensation.encode(message.compensation, writer.uint32(58).fork()).ldelim();
}
return writer;
},

Expand Down Expand Up @@ -1585,6 +1600,13 @@ export const SatTransOp = {

message.migrate = SatOpMigrate.decode(reader, reader.uint32());
continue;
case 7:
if (tag !== 58) {
break;
}

message.compensation = SatOpCompensation.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -1618,6 +1640,9 @@ export const SatTransOp = {
message.migrate = (object.migrate !== undefined && object.migrate !== null)
? SatOpMigrate.fromPartial(object.migrate)
: undefined;
message.compensation = (object.compensation !== undefined && object.compensation !== null)
? SatOpCompensation.fromPartial(object.compensation)
: undefined;
return message;
},
};
Expand Down Expand Up @@ -2042,6 +2067,80 @@ export const SatOpDelete = {

messageTypeRegistry.set(SatOpDelete.$type, SatOpDelete);

function createBaseSatOpCompensation(): SatOpCompensation {
return { $type: "Electric.Satellite.SatOpCompensation", relationId: 0, pkData: undefined, tags: [] };
}

export const SatOpCompensation = {
$type: "Electric.Satellite.SatOpCompensation" as const,

encode(message: SatOpCompensation, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.relationId !== 0) {
writer.uint32(8).uint32(message.relationId);
}
if (message.pkData !== undefined) {
SatOpRow.encode(message.pkData, writer.uint32(18).fork()).ldelim();
}
for (const v of message.tags) {
writer.uint32(34).string(v!);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatOpCompensation {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatOpCompensation();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 8) {
break;
}

message.relationId = reader.uint32();
continue;
case 2:
if (tag !== 18) {
break;
}

message.pkData = SatOpRow.decode(reader, reader.uint32());
continue;
case 4:
if (tag !== 34) {
break;
}

message.tags.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatOpCompensation>, I>>(base?: I): SatOpCompensation {
return SatOpCompensation.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatOpCompensation>, I>>(object: I): SatOpCompensation {
const message = createBaseSatOpCompensation();
message.relationId = object.relationId ?? 0;
message.pkData = (object.pkData !== undefined && object.pkData !== null)
? SatOpRow.fromPartial(object.pkData)
: undefined;
message.tags = object.tags?.map((e) => e) || [];
return message;
},
};

messageTypeRegistry.set(SatOpCompensation.$type, SatOpCompensation);

function createBaseSatOpRow(): SatOpRow {
return { $type: "Electric.Satellite.SatOpRow", nullsBitmask: new Uint8Array(), values: [] };
}
Expand Down
11 changes: 5 additions & 6 deletions clients/typescript/src/migrators/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ export function generateOplogTriggers(
/**
* Generates triggers for compensations for all foreign keys in the provided table.
*
* Compensation is recorded as a specially-formatted update. It acts as a no-op, with
* previous value set to NULL, and it's on the server to figure out that this is a no-op
* compensation operation (usually `UPDATE` would have previous row state known). The entire
* reason for it existing is to maybe revive the row if it has been deleted, so we need correct tags.
* Compensation is recorded as a SatOpCompensation messaage. The entire reason
* for it existing is to maybe revive the row if it has been deleted, so we need
* correct tags.
*
* The compensation update contains _just_ the primary keys, no other columns are present.
*
Expand Down Expand Up @@ -155,7 +154,7 @@ function generateCompensationTriggers(table: Table): Statement[] {
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
Expand All @@ -167,7 +166,7 @@ function generateCompensationTriggers(table: Table): Statement[] {
1 == (SELECT value from _electric_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to handle client updates (i.e. reapply these triggers)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we would, good point. off the top of your head, is there a mechanism in place to trigger (😉 ) the re-creation of the triggers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@icehaunter looking at the client it seems that adding re-application of these triggers on boot is a fairly major change. are we comfortable with postponing that feature for the moment considering that this change is backwards compatible with existing trigger implementations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}";
END;
`,
Expand Down
9 changes: 9 additions & 0 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,15 @@ export class SatelliteClient implements Client {
},
})
break
case DataChangeType.COMPENSATION:
changeOp = SatTransOp.fromPartial({
compensation: {
pkData: record,
relationId: relation.id,
tags: tags,
},
})
break
}
ops.push(changeOp)
})
Expand Down
1 change: 1 addition & 0 deletions clients/typescript/src/util/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export enum DataChangeType {
INSERT = 'INSERT',
UPDATE = 'UPDATE',
DELETE = 'DELETE',
COMPENSATION = 'COMPENSATION',
}

export type Change = DataChange | SchemaChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ defmodule Electric.Postgres.ShadowTableTransformation do

defp build_bitmask(%Changes.NewRecord{}, columns), do: Enum.map(columns, fn _ -> "t" end)

defp build_bitmask(%Changes.UpdatedRecord{old_record: nil}, columns),
do: Enum.map(columns, fn _ -> "f" end)
defp build_bitmask(%Changes.Compensation{}, columns), do: Enum.map(columns, fn _ -> "f" end)

defp build_bitmask(%Changes.UpdatedRecord{old_record: old, record: new}, columns),
do: Enum.map(columns, fn col -> if old[col] != new[col], do: "t", else: "f" end)
Expand Down
17 changes: 13 additions & 4 deletions components/electric/lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,18 @@ defmodule Electric.Replication.Changes do
]

def count_operations(%__MODULE__{changes: changes}) do
base = %{operations: 0, inserts: 0, updates: 0, deletes: 0}
base = %{operations: 0, inserts: 0, updates: 0, deletes: 0, compensations: 0}

Enum.reduce(changes, base, fn %module{}, acc ->
key =
case module do
Changes.NewRecord -> :inserts
Changes.UpdatedRecord -> :updates
Changes.DeletedRecord -> :deletes
Changes.Compensation -> :compensations
end

%{^key => value, :operations => total} = acc

%{acc | key => value + 1, :operations => total + 1}
Map.update!(%{acc | operations: acc.operations + 1}, key, &(&1 + 1))
end)
end
end
Expand Down Expand Up @@ -105,6 +104,16 @@ defmodule Electric.Replication.Changes do
}
end

defmodule Compensation do
defstruct [:relation, :record, tags: []]

@type t() :: %__MODULE__{
relation: Changes.relation(),
record: Changes.record(),
tags: [Changes.tag()]
}
end

defmodule TruncatedRelation do
defstruct [:relation]
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
}
end

defp changes_to_wal(
%Changes.UpdatedRecord{relation: table, old_record: nil, record: new},
relations
) do
defp changes_to_wal(%Changes.Compensation{relation: table, record: new}, relations) do
%ReplicationMessages.Update{
relation_id: relations[table].oid,
tuple_data: record_to_tuple(new, relations[table].columns)
Expand Down
10 changes: 1 addition & 9 deletions components/electric/lib/electric/satellite/protobuf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ defmodule Electric.Satellite.Protobuf do
SatOpInsert,
SatOpUpdate,
SatOpMigrate,
SatOpCompensation,
SatTransOp,
SatRelation,
SatRelationColumn,
Expand All @@ -125,15 +126,6 @@ defmodule Electric.Satellite.Protobuf do
end
end

defmodule Version do
defstruct major: nil, minor: nil

@type t() :: %__MODULE__{
major: integer,
minor: integer
}
end

@spec decode(byte(), binary()) :: {:ok, sq_pb_msg()} | {:error, any()}
for {module, tag} <- @mapping do
def decode(unquote(tag), binary) do
Expand Down
Loading
Loading