diff --git a/internal/engineproto/engine.proto b/internal/engineproto/engine.proto new file mode 100644 index 0000000000..8bb32ea5cd --- /dev/null +++ b/internal/engineproto/engine.proto @@ -0,0 +1,292 @@ +syntax = "proto3"; + +package centrifugal.centrifugo.engine; + +option go_package = "./;engineproto"; + +// Controller service defines methods for inter-node communication. +service Controller { + // StreamControlEvents delivers a unidirectional stream of messages from other nodes in + // the cluster. + rpc StreamControlEvents(StreamControlEventsRequest) returns (stream ControlEvent); + // PublishControl sends a control message to other nodes. See PublishControlRequest + // for more semantics description. + rpc PublishControl(PublishControlRequest) returns (PublishControlResponse); +} + +// Broker service defines methods for PUB/SUB implementation. It can optionally provide +// history management in channels. +service Broker { + // StreamBrokerEvents delivers a unidirectional stream of broker events to the node. + rpc StreamBrokerEvents(StreamBrokerEventsRequest) returns (stream BrokerEvent); + // Subscribe to a channel. + rpc Subscribe(SubscribeRequest) returns (SubscribeResponse); + // Unsubscribe from a channel. + rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse); + // Publish sends data into a channel. See PublishRequest for more semantics description. + rpc Publish(PublishRequest) returns (PublishResponse); + // PublishJoin sends a Join message into a channel. + rpc PublishJoin(PublishJoinRequest) returns (PublishJoinResponse); + // PublishLeave sends a Leave message into a channel. + rpc PublishLeave(PublishLeaveRequest) returns (PublishLeaveResponse); + // History extracts publications from a channel’s history stream. + rpc History(HistoryRequest) returns (HistoryResponse); + // RemoveHistory removes the history from a channel. + rpc RemoveHistory(RemoveHistoryRequest) returns (RemoveHistoryResponse); + + // Below, we also have Controller methods. Generally, these methods are not + // required to be implemented if Controller explicitly configured in Centrifugo + // (in-memory, Redis-based or Nats-based), or it's a Broker defined for a + // channel namespace. + + // StreamControlEvents is the same as Controller.StreamControlEvents. + rpc StreamControlEvents(StreamControlEventsRequest) returns (stream ControlEvent); + // PublishControl is the same as Controller.PublishControl. + rpc PublishControl(PublishControlRequest) returns (PublishControlResponse); +} + +// PresenceManager service is responsible for channel presence management. +service PresenceManager { + // Presence returns full actual presence data for a channel. + rpc Presence(PresenceRequest) returns (PresenceResponse); + // PresenceStats returns short statistics of current presence data. + rpc PresenceStats(PresenceStatsRequest) returns (PresenceStats); + // AddPresence sets or updates presence information for a client. This is + // called on connection start and periodically for channels with presence on. + rpc AddPresence(AddPresenceRequest) returns (AddPresenceResponse); + // RemovePresence removes presence information for a client. This is called when + // client leaves channel. If this method returns error then request won't + // be retried – PresenceManager implementation should detect and eventually + // expire stale (not updated) client entries. + rpc RemovePresence(RemovePresenceRequest) returns (RemovePresenceResponse); +} + +// ControlEvent represents a control event carrying arbitrary control data. +message ControlEvent { + bytes data = 1; +} + +message StreamControlEventsRequest { + // The identifier of current Centrifugo node. + string node_id = 1; +} + +// Publication is a data sent to a channel. +message Publication { + // Offset is an incremental position number inside a history stream. + uint64 offset = 1; + // Data published to a channel. + bytes data = 2; + // Info is optional information about client connection published with this data. + ClientInfo info = 3; + // Tags contains a map with custom key-values attached to a Publication. + map tags = 4; + // Time of publication as Unix timestamp in milliseconds. + int64 time = 5; + // Channel is only set when subscription channel does not match channel in Publication. + string channel = 6; +} + +// ClientInfo contains information about a client connection. +message ClientInfo { + // ClientID is a client unique id. + string client_id = 1; + // UserID is an ID of an authenticated user. Empty means anonymous. + string user_id = 2; + // ConnInfo is additional information about the connection. + bytes conn_info = 3; + // ChanInfo is additional information in the context of a channel subscription. + bytes chan_info = 4; +} + +// StreamPosition describes a position in a publication history stream. +message StreamPosition { + // Offset defines the publication incremental offset. + uint64 offset = 1; + // Epoch allows handling situations when the history stream is reset. + string epoch = 2; +} + +// HistoryFilter allows filtering history according to the set fields. +message HistoryFilter { + // Since extracts publications from stream starting at the given position. + StreamPosition since = 1; + // Limit number of publications to return (-1 for no limit, 0 for none). + int32 limit = 2; + // Reverse direction flag. + bool reverse = 3; +} + +// HistoryOptions define fields to alter History method behaviour. +message HistoryOptions { + // Filter for history publications. + HistoryFilter filter = 1; + // MetaTTL overrides the default history meta information expiration time. + int64 meta_ttl = 2; +} + +// PublishOptions define fields to alter the behaviour of the Publish operation. +message PublishOptions { + // history_ttl sets history expiration for inactive streams. + int64 history_ttl = 1; + // history_size sets a limit on history size. + int32 history_size = 2; + // history_meta_ttl sets a history meta information expiration time upon publish. + int64 history_meta_ttl = 3; + // client_info to include into the Publication. + ClientInfo client_info = 4; + // tags to attach to the Publication. + map tags = 5; + // idempotency_key for idempotent publish. + string idempotency_key = 6; + // idempotent_result_ttl sets expiration for idempotency results in channel. + int64 idempotent_result_ttl = 7; + // use_delta is set to true when delta encoding should be used for this publication. + bool use_delta = 8; +} + +// PublicationEvent represents an event carrying publication data. +message PublicationEvent { + string channel = 1; + Publication publication = 2; + StreamPosition stream_position = 3; + bool use_delta = 4; + Publication previous_publication = 5; // Optional previous publication (if use_delta was true). +} + +// JoinEvent represents an event when a client joins a channel. +message JoinEvent { + string channel = 1; + ClientInfo info = 2; +} + +// LeaveEvent represents an event when a client leaves a channel. +message LeaveEvent { + string channel = 1; + ClientInfo info = 2; +} + +// BrokerEvent is a wrapper that can carry one of the event types. +// Should contain ONE OF the defined events. We are not using oneof +// feature of Protobuf for better interoperability with different +// systems +message BrokerEvent { + PublicationEvent publication_event = 1; + JoinEvent join_event = 2; + LeaveEvent leave_event = 3; +} + +message StreamBrokerEventsRequest {} + +message SubscribeRequest { + string channel = 1; +} + +message SubscribeResponse { +} + +message UnsubscribeRequest { + string channel = 1; +} + +message UnsubscribeResponse { +} + +message PublishRequest { + // channel to publish to. + string channel = 1; + // data to publish. + bytes data = 2; + // various options for publish operation. + PublishOptions opts = 3; +} + +message PublishResponse { + // StreamPosition describes the stream epoch and offset assigned to the publication. + StreamPosition stream_position = 1; + // Suppressed indicates if the Publish operation was idempotently suppressed. + bool suppressed = 2; +} + +message PublishJoinRequest { + string channel = 1; + ClientInfo info = 2; +} + +message PublishJoinResponse {} + +message PublishLeaveRequest { + string channel = 1; + ClientInfo info = 2; +} + +message PublishLeaveResponse {} + +message PublishControlRequest { + // data is a payload set by Centrifuge, it should be delivered untouched to other nodes. + bytes data = 1; + // If node_id is empty then the message must be delivered to all nodes. If set – only to node with that identifier. + string node_id = 2; +} + +message PublishControlResponse {} + +message HistoryRequest { + string channel = 1; + HistoryOptions opts = 2; +} + +message HistoryResponse { + repeated Publication publications = 1; + // StreamPosition reflects the current history stream top offset and epoch. + StreamPosition stream_position = 2; +} + +message RemoveHistoryRequest { + string channel = 1; +} + +message RemoveHistoryResponse {} + +// ------------------------ +// Presence Manager Messages +// ------------------------ + +// PresenceStats represents short presence information for a channel. +message PresenceStats { + int32 num_clients = 1; // Number of client connections. + int32 num_users = 2; // Number of unique users. +} + +// PresenceRequest is used to query full presence data. +message PresenceRequest { + string channel = 1; +} + +// PresenceResponse returns full presence data as a map. +message PresenceResponse { + map clients = 1; +} + +// PresenceStatsRequest is used to request presence statistics. +message PresenceStatsRequest { + string channel = 1; +} + +// AddPresenceRequest updates or sets presence information. +message AddPresenceRequest { + string channel = 1; + string client_id = 2; + ClientInfo info = 3; +} + +message AddPresenceResponse {} + +// RemovePresenceRequest removes presence information. +message RemovePresenceRequest { + string channel = 1; + string client_id = 2; + string user_id = 3; +} + +message RemovePresenceResponse {}