33
33
34
34
class AsyncRealtimeChannel :
35
35
"""
36
- ` Channel` is an abstraction for a topic listener for an existing socket connection.
37
- Each Channel has its own topic and a list of event-callbacks that responds to messages.
38
- Should only be instantiated through `connection.RealtimeClient() .channel(topic)`.
36
+ Channel is an abstraction for a topic subscription on an existing socket connection.
37
+ Each Channel has its own topic and a list of event-callbacks that respond to messages.
38
+ Should only be instantiated through `AsyncRealtimeClient .channel(topic)`.
39
39
"""
40
40
41
41
def __init__ (
@@ -52,13 +52,14 @@ def __init__(
52
52
:param params: Optional parameters for connection.
53
53
"""
54
54
self .socket = socket
55
- self .params = params or RealtimeChannelOptions (
56
- config = {
55
+ self .params = params or {}
56
+ if self .params .get ("config" ) is None :
57
+ self .params ["config" ] = {
57
58
"broadcast" : {"ack" : False , "self" : False },
58
59
"presence" : {"key" : "" },
59
60
"private" : False ,
60
61
}
61
- )
62
+
62
63
self .topic = topic
63
64
self ._joined_once = False
64
65
self .bindings : Dict [str , List [Binding ]] = {}
@@ -97,7 +98,7 @@ def on_close(*args):
97
98
logger .info (f"channel { self .topic } closed" )
98
99
self .rejoin_timer .reset ()
99
100
self .state = ChannelStates .CLOSED
100
- self .socket .remove_channel (self )
101
+ self .socket ._remove_channel (self )
101
102
102
103
def on_error (payload , * args ):
103
104
if self .is_leaving or self .is_closed :
@@ -148,12 +149,16 @@ async def subscribe(
148
149
] = None ,
149
150
) -> AsyncRealtimeChannel :
150
151
"""
151
- Subscribe to the channel.
152
+ Subscribe to the channel. Can only be called once per channel instance.
152
153
153
- :return: The Channel instance for method chaining.
154
+ :param callback: Optional callback function that receives subscription state updates
155
+ and any errors that occur during subscription
156
+ :return: The Channel instance for method chaining
157
+ :raises: Exception if called multiple times on the same channel instance
154
158
"""
155
159
if not self .socket .is_connected :
156
160
await self .socket .connect ()
161
+
157
162
if self ._joined_once :
158
163
raise Exception (
159
164
"Tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance"
@@ -249,6 +254,10 @@ def on_join_push_timeout(*args):
249
254
return self
250
255
251
256
async def unsubscribe (self ):
257
+ """
258
+ Unsubscribe from the channel and leave the topic.
259
+ Sets channel state to LEAVING and cleans up timers and pushes.
260
+ """
252
261
self .state = ChannelStates .LEAVING
253
262
254
263
self .rejoin_timer .reset ()
@@ -269,6 +278,15 @@ def _close(*args):
269
278
async def push (
270
279
self , event : str , payload : Dict [str , Any ], timeout : Optional [int ] = None
271
280
) -> AsyncPush :
281
+ """
282
+ Push a message to the channel.
283
+
284
+ :param event: The event name to push
285
+ :param payload: The payload to send
286
+ :param timeout: Optional timeout in milliseconds
287
+ :return: AsyncPush instance representing the push operation
288
+ :raises: Exception if called before subscribing to the channel
289
+ """
272
290
if not self ._joined_once :
273
291
raise Exception (
274
292
f"tried to push '{ event } ' to '{ self .topic } ' before joining. Use channel.subscribe() before pushing events"
@@ -350,9 +368,9 @@ def on_broadcast(
350
368
"""
351
369
Set up a listener for a specific broadcast event.
352
370
353
- :param event: The name of the broadcast event to listen for.
354
- :param callback: The callback function to execute when the event is received.
355
- :return: The Channel instance for method chaining.
371
+ :param event: The name of the broadcast event to listen for
372
+ :param callback: Function called with the payload when a matching broadcast is received
373
+ :return: The Channel instance for method chaining
356
374
"""
357
375
return self ._on (
358
376
"broadcast" ,
@@ -369,13 +387,14 @@ def on_postgres_changes(
369
387
filter : Optional [str ] = None ,
370
388
) -> AsyncRealtimeChannel :
371
389
"""
372
- Set up a listener for a specific Postgres changes event.
373
-
374
- :param event: The name of the Postgres changes event to listen for.
375
- :param table: The table name for which changes should be monitored.
376
- :param callback: The callback function to execute when the event is received.
377
- :param schema: The database schema where the table exists. Default is 'public'.
378
- :return: The Channel instance for method chaining.
390
+ Set up a listener for Postgres database changes.
391
+
392
+ :param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *)
393
+ :param callback: Function called with the payload when a matching change is detected
394
+ :param table: The table name to monitor. Defaults to "*" for all tables
395
+ :param schema: The database schema to monitor. Defaults to "public"
396
+ :param filter: Optional filter string to apply
397
+ :return: The Channel instance for method chaining
379
398
"""
380
399
381
400
binding_filter = {"event" : event , "schema" : schema , "table" : table }
@@ -402,22 +421,24 @@ def on_system(
402
421
# Presence methods
403
422
async def track (self , user_status : Dict [str , Any ]) -> None :
404
423
"""
405
- Track a user's presence .
424
+ Track presence status for the current user .
406
425
407
- :param user_status: User's presence status.
408
- :return: None
426
+ :param user_status: Dictionary containing the user's presence information
409
427
"""
410
428
await self .send_presence ("track" , user_status )
411
429
412
430
async def untrack (self ) -> None :
413
431
"""
414
- Untrack a user's presence.
415
-
416
- :return: None
432
+ Stop tracking presence for the current user.
417
433
"""
418
434
await self .send_presence ("untrack" , {})
419
435
420
436
def presence_state (self ) -> RealtimePresenceState :
437
+ """
438
+ Get the current state of presence on this channel.
439
+
440
+ :return: Dictionary mapping presence keys to lists of presence payloads
441
+ """
421
442
return self .presence .state
422
443
423
444
def on_presence_sync (self , callback : Callable [[], None ]) -> AsyncRealtimeChannel :
@@ -457,11 +478,10 @@ def on_presence_leave(
457
478
# Broadcast methods
458
479
async def send_broadcast (self , event : str , data : Any ) -> None :
459
480
"""
460
- Sends a broadcast message to the current channel.
481
+ Send a broadcast message through this channel.
461
482
462
- :param event: The name of the broadcast event.
463
- :param data: The data to be sent with the message.
464
- :return: An asyncio.Future object representing the send operation.
483
+ :param event: The name of the broadcast event
484
+ :param data: The payload to broadcast
465
485
"""
466
486
await self .push (
467
487
ChannelEvents .broadcast ,
0 commit comments