Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add better logging to client proto requests/responses #336

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 6 additions & 52 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
startReplicationErrorToSatelliteError,
shapeRequestToSatShapeReq,
subsErrorToSatelliteError,
msgToString,
} from '../util/proto'
import { Socket, SocketFactory } from '../sockets/index'
import _m0 from 'protobufjs/minimal.js'
Expand Down Expand Up @@ -83,6 +84,7 @@ import {
UnsubscribeResponse,
} from './shapes/types'
import { SubscriptionsDataCache } from './shapes/cache'
import { setMaskBit, getMaskBit } from '../util/bitmaskHelpers'

type IncomingHandler = {
handle: (
Expand Down Expand Up @@ -842,7 +844,8 @@ export class SatelliteClient extends EventEmitter implements Client {
// TODO: properly handle socket errors; update connectivity state
private handleIncoming(data: Buffer) {
const messageOrError = toMessage(data)
Log.info(`Received message ${JSON.stringify(messageOrError)}`)
if (Log.getLevel() <= 1 && !(messageOrError instanceof SatelliteError))
Log.debug(`[proto] recv: ${msgToString(messageOrError)}`)
if (messageOrError instanceof Error) {
this.emit('error', messageOrError)
} else {
Expand Down Expand Up @@ -974,7 +977,7 @@ export class SatelliteClient extends EventEmitter implements Client {
}

private sendMessage<T extends SatPbMsg>(request: T) {
Log.debug(`Sending message ${JSON.stringify(request)}`)
if (Log.getLevel() <= 1) Log.debug(`[proto] send: ${msgToString(request)}`)
if (!this.socket) {
throw new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
Expand Down Expand Up @@ -1094,55 +1097,6 @@ export function deserializeRow(
)
}

/**
* Sets a bit in the mask. Modifies the mask in place.
*
* Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask.
* This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized:
*
* @example
* setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 0)
* // => new Uint8Array([0b10000000, 0b00000000])
*
* @example
* setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 8)
* // => new Uint8Array([0b00000000, 0b10000000])
*
* @param array Uint8Array mask
* @param indexFromStart bit index in the mask
*/
function setMaskBit(array: Uint8Array, indexFromStart: number): void {
const byteIndex = Math.floor(indexFromStart / 8)
const bitIndex = 7 - (indexFromStart % 8)

const mask = 0x01 << bitIndex
array[byteIndex] = array[byteIndex] | mask
}

/**
* Reads a bit in the mask
*
* Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask.
* This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized:
*
* @example
* getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 0)
* // => 1
*
* @example
* getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 8)
* // => 0
*
* @param array Uint8Array mask
* @param indexFromStart bit index in the mask
*/
function getMaskBit(array: Uint8Array, indexFromStart: number): number {
const byteIndex = Math.floor(indexFromStart / 8)
const bitIndex = 7 - (indexFromStart % 8)

return (array[byteIndex] >>> bitIndex) & 0x01
}

function calculateNumBytes(column_num: number): number {
const rem = column_num % 8
if (rem == 0) {
Expand Down Expand Up @@ -1187,7 +1141,7 @@ function serializeNullData(): Uint8Array {
return typeEncoder.text('')
}

export function toMessage(data: Uint8Array): SatPbMsg | Error {
export function toMessage(data: Uint8Array): SatPbMsg | SatelliteError {
const code = data[0]
const type = getTypeFromCode(code)
const obj = getObjFromString(type)
Expand Down
49 changes: 49 additions & 0 deletions clients/typescript/src/util/bitmaskHelpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Sets a bit in the mask. Modifies the mask in place.
*
* Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask.
* This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized:
*
* @example
* setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 0)
* // => new Uint8Array([0b10000000, 0b00000000])
*
* @example
* setMaskBit(new Uint8Array([0b00000000, 0b00000000]), 8)
* // => new Uint8Array([0b00000000, 0b10000000])
*
* @param array Uint8Array mask
* @param indexFromStart bit index in the mask
*/

export function setMaskBit(array: Uint8Array, indexFromStart: number): void {
const byteIndex = Math.floor(indexFromStart / 8)
const bitIndex = 7 - (indexFromStart % 8)

const mask = 1 << bitIndex
array[byteIndex] = array[byteIndex] | mask
}
/**
* Reads a bit in the mask
*
* Mask is represented as a Uint8Array, which will be serialized element-by-element as a mask.
* This means that `indexFromStart` enumerates all bits in the mask in the order they will be serialized:
*
* @example
* getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 0)
* // => 1
*
* @example
* getMaskBit(new Uint8Array([0b10000000, 0b00000000]), 8)
* // => 0
*
* @param array Uint8Array mask
* @param indexFromStart bit index in the mask
*/

export function getMaskBit(array: Uint8Array, indexFromStart: number): 1 | 0 {
const byteIndex = Math.floor(indexFromStart / 8)
const bitIndex = 7 - (indexFromStart % 8)

return ((array[byteIndex] >>> bitIndex) & 1) as 1 | 0
}
131 changes: 131 additions & 0 deletions clients/typescript/src/util/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import * as Pb from '../_generated/protocol/satellite'
import * as _m0 from 'protobufjs/minimal'
import { SatelliteError, SatelliteErrorCode } from './types'
import { ShapeRequest } from '../satellite/shapes/types'
import { base64, typeDecoder } from './common'
import { getMaskBit } from './bitmaskHelpers'

type GetName<T extends { $type: string }> =
T['$type'] extends `Electric.Satellite.v1_4.${infer K}` ? K : never
Expand Down Expand Up @@ -269,3 +271,132 @@ export function shapeRequestToSatShapeReq(
}
return shapeReqs
}

export function msgToString(message: SatPbMsg): string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like this to have a more structured representation, using a map or something, that makes it more readable than going through the switch structure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't, to be honest. This is essentially a debug function. I explicitly don't want to make it generic, since we have a very much finite set of messages on the protocol, and debug representation can be just that. I really don't want to encourage using this function for anything more than debug logging. If we need a better intermediate representation of messages - sure, at some point, maybe. I'd rather not over-complicate this function for no reason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine

switch (message.$type) {
case 'Electric.Satellite.v1_4.SatAuthReq':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going to happen when we bump the protocol version? Seems like this function will start returning undefined and someone will have to find-and-replace all of these by hand. Can we make it protocol-version-agnostic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if hard to make proto agnostic, add a test that forces us to update this function on a protocol bump

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case will break types on protocol version update. TS is smart enough to check for case exhaustiveness, and any new messages or changes to exisiting ones will be immediately visible, so I don't think that's a problem

return `#SatAuthReq{id: ${message.id}, token: ${message.token}}`
case 'Electric.Satellite.v1_4.SatAuthResp':
return `#SatAuthResp{id: ${message.id}}`
case 'Electric.Satellite.v1_4.SatErrorResp':
return `#SatErrorResp{type: ${
Pb.SatErrorResp_ErrorCode[message.errorType]
}}`
case 'Electric.Satellite.v1_4.SatInStartReplicationReq': {
const schemaVersion = message.schemaVersion
? ` schema: ${message.schemaVersion},`
: ''
return `#SatInStartReplicationReq{lsn: ${base64.fromBytes(
message.lsn
)},${schemaVersion} subscriptions: [${message.subscriptionIds}]}`
}
case 'Electric.Satellite.v1_4.SatInStartReplicationResp':
return `#SatInStartReplicationResp{}`
case 'Electric.Satellite.v1_4.SatInStopReplicationReq':
return `#SatInStopReplicationReq{}`
case 'Electric.Satellite.v1_4.SatInStopReplicationResp':
return `#SatInStopReplicationResp{}`
case 'Electric.Satellite.v1_4.SatMigrationNotification':
return `#SatMigrationNotification{to: ${message.newSchemaVersion}, from: ${message.newSchemaVersion}}`
case 'Electric.Satellite.v1_4.SatPingReq':
return `#SatPingReq{}`
case 'Electric.Satellite.v1_4.SatPingResp':
return `#SatPingResp{lsn: ${
message.lsn ? base64.fromBytes(message.lsn) : 'NULL'
}}`
case 'Electric.Satellite.v1_4.SatRelation': {
const cols = message.columns
.map((x) => `${x.name}: ${x.type}${x.primaryKey ? ' PK' : ''}`)
.join(', ')
return `#SatRelation{for: ${message.schemaName}.${message.tableName}, as: ${message.relationId}, cols: [${cols}]}`
}
case 'Electric.Satellite.v1_4.SatSubsDataBegin':
return `#SatSubsDataBegin{id: ${
message.subscriptionId
}, lsn: ${base64.fromBytes(message.lsn)}}`
case 'Electric.Satellite.v1_4.SatSubsDataEnd':
return `#SatSubsDataEnd{}`
case 'Electric.Satellite.v1_4.SatShapeDataBegin':
return `#SatShapeDataBegin{id: ${message.requestId}}`
case 'Electric.Satellite.v1_4.SatShapeDataEnd':
return `#SatShapeDataEnd{}`
case 'Electric.Satellite.v1_4.SatSubsDataError': {
const shapeErrors = message.shapeRequestError.map(
(x) =>
`${x.requestId}: ${Pb.SatSubsDataError_ShapeReqError_Code[x.code]} (${
x.message
})`
)
const code = Pb.SatSubsDataError_Code[message.code]
return `#SatSubsDataError{id: ${message.subscriptionId}, code: ${code}, msg: "${message.message}", errors: [${shapeErrors}]}`
}
case 'Electric.Satellite.v1_4.SatSubsReq':
return `#SatSubsReq{id: ${message.subscriptionId}, shapes: ${message.shapeRequests}}`
case 'Electric.Satellite.v1_4.SatSubsResp': {
if (message.err) {
const shapeErrors = message.err.shapeRequestError.map(
(x) =>
`${x.requestId}: ${
Pb.SatSubsResp_SatSubsError_ShapeReqError_Code[x.code]
} (${x.message})`
)
return `#SatSubsReq{id: ${message.subscriptionId}, err: ${
Pb.SatSubsResp_SatSubsError_Code[message.err.code]
} (${message.err.message}), shapes: [${shapeErrors}]}`
} else {
return `#SatSubsReq{id: ${message.subscriptionId}}`
}
}
case 'Electric.Satellite.v1_4.SatUnsubsReq':
return `#SatUnsubsReq{ids: ${message.subscriptionIds}}`
case 'Electric.Satellite.v1_4.SatUnsubsResp':
return `#SatUnsubsResp{}`
case 'Electric.Satellite.v1_4.SatOpLog':
return `#SatOpLog{ops: [${message.ops.map(opToString).join(', ')}]}`
}
}

function opToString(op: Pb.SatTransOp): string {
if (op.begin)
return `#Begin{lsn: ${base64.fromBytes(
op.begin.lsn
)}, ts: ${op.begin.commitTimestamp.toString()}, isMigration: ${
op.begin.isMigration
}}`
if (op.commit) return `#Commit{lsn: ${base64.fromBytes(op.commit.lsn)}}`
if (op.insert)
return `#Insert{for: ${op.insert.relationId}, tags: [${
op.insert.tags
}], new: [${op.insert.rowData ? rowToString(op.insert.rowData) : ''}]}`
if (op.update)
return `#Update{for: ${op.update.relationId}, tags: [${
op.update.tags
}], new: [${
op.update.rowData ? rowToString(op.update.rowData) : ''
}], old: data: [${
op.update.oldRowData ? rowToString(op.update.oldRowData) : ''
}]}`
if (op.delete)
return `#Delete{for: ${op.delete.relationId}, tags: [${
op.delete.tags
}], old: [${
op.delete.oldRowData ? rowToString(op.delete.oldRowData) : ''
}]}`
if (op.migrate)
return `#Migrate{vsn: ${op.migrate.version}, for: ${
op.migrate.table?.name
}, stmts: [${op.migrate.stmts
.map((x) => x.sql.replaceAll('\n', '\\n'))
.join('; ')}]}`
return ''
}

function rowToString(row: Pb.SatOpRow): string {
return row.values
.map((x, i) =>
getMaskBit(row.nullsBitmask, i) == 0
? JSON.stringify(typeDecoder.text(x))
: '∅'
)
.join(', ')
}
4 changes: 2 additions & 2 deletions e2e/tests/03.03_node_satellite_sends_and_recieves_data.lux
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
[invoke migrate_items_table 20230504114018]

[shell satellite_1]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
[invoke node_sync_table "items"]
[shell satellite_2]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
[invoke node_sync_table "items"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
!\d items

[shell satellite_1]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
[invoke node_sync_table "items"]
[shell satellite_2]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
[invoke node_sync_table "items"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@
[invoke setup_client 1 "electric_1" 5133]

[shell satellite_1]
?Sending message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatInStartReplicationReq","lsn":\{\}
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatInStartReplicationResp"\}
??[proto] send: #SatInStartReplicationReq
??[proto] recv: #SatInStartReplicationResp

# Verifying the initial sync. The client receives migrations corresponding to all electrified tables on the server.
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatRelation","schemaName":"public",.*"tableName":"entries"
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatOpLog","ops":\[\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","begin":\{.*\}\},\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","migrate":\{.*,"version":"[0-9_]+",.*,"table":\{.*,"name":"entries"

?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatRelation","schemaName":"public",.*"tableName":"items"
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatOpLog","ops":\[\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","begin":\{.*\}\},\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","migrate":\{.*,"version":"[0-9_]+",.*,"table":\{.*,"name":"items"
??[proto] recv: #SatRelation{for: public.entries
?\[proto\] recv: #SatOpLog\{.*#Migrate\{vsn: [0-9_]+, for: entries

??[proto] recv: #SatRelation{for: public.items
?\[proto\] recv: #SatOpLog\{.*#Migrate\{vsn: [0-9_]+, for: items

[cleanup]
[invoke teardown]
2 changes: 1 addition & 1 deletion e2e/tests/03.06_node_satellite_does_sync_on_subscribe.lux
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[invoke migrate_items_table 20230504114018]

[shell satellite_1]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
# We shouldn't see this "hello from pg" until we actually do sync
-(hello from pg|$fail_pattern)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[invoke migrate_items_table 20230504114018]

[shell satellite_1]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
# We shouldn't see this "hello from pg" until we actually do sync
-(hello from pg|$fail_pattern)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[invoke migrate_items_table 20230504114018]

[shell satellite_1]
?Received message \{"\$$type":"Electric.Satellite.v\d+_\d+.SatInStartReplicationResp"\}
??[proto] recv: #SatInStartReplicationResp
[invoke node_await_table "items"]
# We shouldn't see this "hello from pg" until we actually do sync
-(hello from pg|$fail_pattern)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@
# The client starts replication from scratch.
?no previous LSN, start replication from scratch
# It passes its schema version to the server.
?Sending message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatInStartReplicationReq","lsn":\{\},.*?"schemaVersion":"$migration1_vsn"
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatInStartReplicationResp"\}
??[proto] send: #SatInStartReplicationReq{lsn: , schema: $migration1_vsn
??[proto] recv: #SatInStartReplicationResp{}

# The client receives only the second migration.
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatOpLog","ops":\[\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","begin":\{.*\}\},\
\{"\$$type":"Electric.Satellite.v[0-9_]+.SatTransOp","migrate":\{.*,"version":"$migration2_vsn",.*,"table":\{.*,"name":"bar"
?\[proto\] recv: #SatOpLog\{.*#Migrate\{vsn: $migration2_vsn, for: bar

# Wait for the ping message to make sure we don't receive anything mentioning $migration1_vsn from the server.
?Received message \{"\$$type":"Electric.Satellite.v[0-9_]+.SatPingReq"\}
??[proto] recv: #SatPingReq{}

[cleanup]
[invoke teardown]
Loading
Loading