Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 292 additions & 0 deletions internal/engineproto/engine.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
syntax = "proto3";

package centrifugal.centrifugo.engine;

option go_package = "./;engineproto";

// Controller service defines methods for inter-node communication.
service Controller {
// StreamControlEvents delivers a unidirectional stream of messages from other nodes in
// the cluster.
rpc StreamControlEvents(StreamControlEventsRequest) returns (stream ControlEvent);
// PublishControl sends a control message to other nodes. See PublishControlRequest
// for more semantics description.
rpc PublishControl(PublishControlRequest) returns (PublishControlResponse);
}

// Broker service defines methods for PUB/SUB implementation. It can optionally provide
// history management in channels.
service Broker {
// StreamBrokerEvents delivers a unidirectional stream of broker events to the node.
rpc StreamBrokerEvents(StreamBrokerEventsRequest) returns (stream BrokerEvent);
// Subscribe to a channel.
rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
// Unsubscribe from a channel.
rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse);
// Publish sends data into a channel. See PublishRequest for more semantics description.
rpc Publish(PublishRequest) returns (PublishResponse);
// PublishJoin sends a Join message into a channel.
rpc PublishJoin(PublishJoinRequest) returns (PublishJoinResponse);
// PublishLeave sends a Leave message into a channel.
rpc PublishLeave(PublishLeaveRequest) returns (PublishLeaveResponse);
// History extracts publications from a channel’s history stream.
rpc History(HistoryRequest) returns (HistoryResponse);
// RemoveHistory removes the history from a channel.
rpc RemoveHistory(RemoveHistoryRequest) returns (RemoveHistoryResponse);

// Below, we also have Controller methods. Generally, these methods are not
// required to be implemented if Controller explicitly configured in Centrifugo
// (in-memory, Redis-based or Nats-based), or it's a Broker defined for a
// channel namespace.

// StreamControlEvents is the same as Controller.StreamControlEvents.
rpc StreamControlEvents(StreamControlEventsRequest) returns (stream ControlEvent);
// PublishControl is the same as Controller.PublishControl.
rpc PublishControl(PublishControlRequest) returns (PublishControlResponse);
}

// PresenceManager service is responsible for channel presence management.
service PresenceManager {
// Presence returns full actual presence data for a channel.
rpc Presence(PresenceRequest) returns (PresenceResponse);
// PresenceStats returns short statistics of current presence data.
rpc PresenceStats(PresenceStatsRequest) returns (PresenceStats);
// AddPresence sets or updates presence information for a client. This is
// called on connection start and periodically for channels with presence on.
rpc AddPresence(AddPresenceRequest) returns (AddPresenceResponse);
// RemovePresence removes presence information for a client. This is called when
// client leaves channel. If this method returns error then request won't
// be retried – PresenceManager implementation should detect and eventually
// expire stale (not updated) client entries.
rpc RemovePresence(RemovePresenceRequest) returns (RemovePresenceResponse);
}

// ControlEvent represents a control event carrying arbitrary control data.
message ControlEvent {
bytes data = 1;
}

message StreamControlEventsRequest {
// The identifier of current Centrifugo node.
string node_id = 1;
}

// Publication is a data sent to a channel.
message Publication {
// Offset is an incremental position number inside a history stream.
uint64 offset = 1;
// Data published to a channel.
bytes data = 2;
// Info is optional information about client connection published with this data.
ClientInfo info = 3;
// Tags contains a map with custom key-values attached to a Publication.
map<string, string> tags = 4;
// Time of publication as Unix timestamp in milliseconds.
int64 time = 5;
// Channel is only set when subscription channel does not match channel in Publication.
string channel = 6;
}

// ClientInfo contains information about a client connection.
message ClientInfo {
// ClientID is a client unique id.
string client_id = 1;
// UserID is an ID of an authenticated user. Empty means anonymous.
string user_id = 2;
// ConnInfo is additional information about the connection.
bytes conn_info = 3;
// ChanInfo is additional information in the context of a channel subscription.
bytes chan_info = 4;
}

// StreamPosition describes a position in a publication history stream.
message StreamPosition {
// Offset defines the publication incremental offset.
uint64 offset = 1;
// Epoch allows handling situations when the history stream is reset.
string epoch = 2;
}

// HistoryFilter allows filtering history according to the set fields.
message HistoryFilter {
// Since extracts publications from stream starting at the given position.
StreamPosition since = 1;
// Limit number of publications to return (-1 for no limit, 0 for none).
int32 limit = 2;
// Reverse direction flag.
bool reverse = 3;
}

// HistoryOptions define fields to alter History method behaviour.
message HistoryOptions {
// Filter for history publications.
HistoryFilter filter = 1;
// MetaTTL overrides the default history meta information expiration time.
int64 meta_ttl = 2;
}

// PublishOptions define fields to alter the behaviour of the Publish operation.
message PublishOptions {
// history_ttl sets history expiration for inactive streams.
int64 history_ttl = 1;
// history_size sets a limit on history size.
int32 history_size = 2;
// history_meta_ttl sets a history meta information expiration time upon publish.
int64 history_meta_ttl = 3;
// client_info to include into the Publication.
ClientInfo client_info = 4;
// tags to attach to the Publication.
map<string, string> tags = 5;
// idempotency_key for idempotent publish.
string idempotency_key = 6;
// idempotent_result_ttl sets expiration for idempotency results in channel.
int64 idempotent_result_ttl = 7;
// use_delta is set to true when delta encoding should be used for this publication.
bool use_delta = 8;
}

// PublicationEvent represents an event carrying publication data.
message PublicationEvent {
string channel = 1;
Publication publication = 2;
StreamPosition stream_position = 3;
bool use_delta = 4;
Publication previous_publication = 5; // Optional previous publication (if use_delta was true).
}

// JoinEvent represents an event when a client joins a channel.
message JoinEvent {
string channel = 1;
ClientInfo info = 2;
}

// LeaveEvent represents an event when a client leaves a channel.
message LeaveEvent {
string channel = 1;
ClientInfo info = 2;
}

// BrokerEvent is a wrapper that can carry one of the event types.
// Should contain ONE OF the defined events. We are not using oneof
// feature of Protobuf for better interoperability with different
// systems
message BrokerEvent {
PublicationEvent publication_event = 1;
JoinEvent join_event = 2;
LeaveEvent leave_event = 3;
}

message StreamBrokerEventsRequest {}

message SubscribeRequest {
string channel = 1;
}

message SubscribeResponse {
}

message UnsubscribeRequest {
string channel = 1;
}

message UnsubscribeResponse {
}

message PublishRequest {
// channel to publish to.
string channel = 1;
// data to publish.
bytes data = 2;
// various options for publish operation.
PublishOptions opts = 3;
}

message PublishResponse {
// StreamPosition describes the stream epoch and offset assigned to the publication.
StreamPosition stream_position = 1;
// Suppressed indicates if the Publish operation was idempotently suppressed.
bool suppressed = 2;
}

message PublishJoinRequest {
string channel = 1;
ClientInfo info = 2;
}

message PublishJoinResponse {}

message PublishLeaveRequest {
string channel = 1;
ClientInfo info = 2;
}

message PublishLeaveResponse {}

message PublishControlRequest {
// data is a payload set by Centrifuge, it should be delivered untouched to other nodes.
bytes data = 1;
// If node_id is empty then the message must be delivered to all nodes. If set – only to node with that identifier.
string node_id = 2;
}

message PublishControlResponse {}

message HistoryRequest {
string channel = 1;
HistoryOptions opts = 2;
}

message HistoryResponse {
repeated Publication publications = 1;
// StreamPosition reflects the current history stream top offset and epoch.
StreamPosition stream_position = 2;
}

message RemoveHistoryRequest {
string channel = 1;
}

message RemoveHistoryResponse {}

// ------------------------
// Presence Manager Messages
// ------------------------

// PresenceStats represents short presence information for a channel.
message PresenceStats {
int32 num_clients = 1; // Number of client connections.
int32 num_users = 2; // Number of unique users.
}

// PresenceRequest is used to query full presence data.
message PresenceRequest {
string channel = 1;
}

// PresenceResponse returns full presence data as a map.
message PresenceResponse {
map<string, ClientInfo> clients = 1;
}

// PresenceStatsRequest is used to request presence statistics.
message PresenceStatsRequest {
string channel = 1;
}

// AddPresenceRequest updates or sets presence information.
message AddPresenceRequest {
string channel = 1;
string client_id = 2;
ClientInfo info = 3;
}

message AddPresenceResponse {}

// RemovePresenceRequest removes presence information.
message RemovePresenceRequest {
string channel = 1;
string client_id = 2;
string user_id = 3;
}

message RemovePresenceResponse {}
Loading