From a8bf84d0cab1d6eca0c40b9964c2bde8a1076079 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sun, 4 Nov 2018 11:18:02 +0300 Subject: [PATCH] update centrifuge lib: sharding channel resubscribe fix --- Gopkg.lock | 4 +-- .../centrifugal/centrifuge/engine_redis.go | 36 ++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 2e74ccc486..c8ddb5aeb5 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -43,7 +43,7 @@ [[projects]] branch = "master" - digest = "1:1c6c8077b2c23538b24150118c5e8a9382cba5d2d4ada4326c871008b1aec5da" + digest = "1:a8330e7a6fe67ab7d257487dbf633ca9e1cffc55ca18837691ca6f27b080a9d1" name = "github.com/centrifugal/centrifuge" packages = [ ".", @@ -54,7 +54,7 @@ "internal/uuid", ] pruneopts = "UT" - revision = "3e13dcb2c8bfbba9d75f69608df683f59b6b2824" + revision = "3624b26218753c901df07667436d7ca673151e97" [[projects]] digest = "1:a2c1d0e43bd3baaa071d1b9ed72c27d78169b2b269f71c105ac4ba34b1be4a39" diff --git a/vendor/github.com/centrifugal/centrifuge/engine_redis.go b/vendor/github.com/centrifugal/centrifuge/engine_redis.go index 21390c3a3f..4ca2aa53ea 100644 --- a/vendor/github.com/centrifugal/centrifuge/engine_redis.go +++ b/vendor/github.com/centrifugal/centrifuge/engine_redis.go @@ -76,6 +76,7 @@ type RedisEngine struct { // shard has everything to connect to Redis instance. type shard struct { node *Node + engine *RedisEngine eventHandler EngineEventHandler config RedisShardConfig pool *redis.Pool @@ -483,11 +484,11 @@ func (e *shard) gethistoryEpochKey(ch string) channelID { return channelID(e.config.Prefix + ".history.epoch." + ch) } -func (e *RedisEngine) shardIndex(channel string) int { +func (e *RedisEngine) getShard(channel string) *shard { if !e.sharding { - return 0 + return e.shards[0] } - return consistentIndex(channel, len(e.shards)) + return e.shards[consistentIndex(channel, len(e.shards))] } // Name returns name of engine. @@ -498,6 +499,7 @@ func (e *RedisEngine) name() string { // Run runs engine after node initialized. func (e *RedisEngine) run(h EngineEventHandler) error { for _, shard := range e.shards { + shard.engine = e err := shard.Run(h) if err != nil { return err @@ -512,17 +514,17 @@ func (e *RedisEngine) shutdown(ctx context.Context) error { // Publish - see engine interface description. func (e *RedisEngine) publish(ch string, pub *Publication, opts *ChannelOptions) <-chan error { - return e.shards[e.shardIndex(ch)].Publish(ch, pub, opts) + return e.getShard(ch).Publish(ch, pub, opts) } // PublishJoin - see engine interface description. func (e *RedisEngine) publishJoin(ch string, join *Join, opts *ChannelOptions) <-chan error { - return e.shards[e.shardIndex(ch)].PublishJoin(ch, join, opts) + return e.getShard(ch).PublishJoin(ch, join, opts) } // PublishLeave - see engine interface description. func (e *RedisEngine) publishLeave(ch string, leave *Leave, opts *ChannelOptions) <-chan error { - return e.shards[e.shardIndex(ch)].PublishLeave(ch, leave, opts) + return e.getShard(ch).PublishLeave(ch, leave, opts) } // PublishControl - see engine interface description. @@ -544,48 +546,48 @@ func (e *RedisEngine) publishControl(data []byte) <-chan error { // Subscribe - see engine interface description. func (e *RedisEngine) subscribe(ch string) error { - return e.shards[e.shardIndex(ch)].Subscribe(ch) + return e.getShard(ch).Subscribe(ch) } // Unsubscribe - see engine interface description. func (e *RedisEngine) unsubscribe(ch string) error { - return e.shards[e.shardIndex(ch)].Unsubscribe(ch) + return e.getShard(ch).Unsubscribe(ch) } // AddPresence - see engine interface description. func (e *RedisEngine) addPresence(ch string, uid string, info *ClientInfo, exp time.Duration) error { expire := int(exp.Seconds()) - return e.shards[e.shardIndex(ch)].AddPresence(ch, uid, info, expire) + return e.getShard(ch).AddPresence(ch, uid, info, expire) } // RemovePresence - see engine interface description. func (e *RedisEngine) removePresence(ch string, uid string) error { - return e.shards[e.shardIndex(ch)].RemovePresence(ch, uid) + return e.getShard(ch).RemovePresence(ch, uid) } // Presence - see engine interface description. func (e *RedisEngine) presence(ch string) (map[string]*ClientInfo, error) { - return e.shards[e.shardIndex(ch)].Presence(ch) + return e.getShard(ch).Presence(ch) } // PresenceStats - see engine interface description. func (e *RedisEngine) presenceStats(ch string) (PresenceStats, error) { - return e.shards[e.shardIndex(ch)].PresenceStats(ch) + return e.getShard(ch).PresenceStats(ch) } // History - see engine interface description. func (e *RedisEngine) history(ch string, limit int) ([]*Publication, error) { - return e.shards[e.shardIndex(ch)].History(ch, limit) + return e.getShard(ch).History(ch, limit) } // RecoverHistory - see engine interface description. func (e *RedisEngine) recoverHistory(ch string, since *recovery) ([]*Publication, bool, recovery, error) { - return e.shards[e.shardIndex(ch)].RecoverHistory(ch, since) + return e.getShard(ch).RecoverHistory(ch, since) } // RemoveHistory - see engine interface description. func (e *RedisEngine) removeHistory(ch string) error { - return e.shards[e.shardIndex(ch)].RemoveHistory(ch) + return e.getShard(ch).RemoveHistory(ch) } // Channels - see engine interface description. @@ -797,7 +799,9 @@ func (e *shard) runPubSub() { chIDs[1] = pingChannel for _, ch := range e.node.hub.Channels() { - chIDs = append(chIDs, e.messageChannelID(ch)) + if e.engine.getShard(ch) == e { + chIDs = append(chIDs, e.messageChannelID(ch)) + } } batch := make([]channelID, 0)