Skip to content

Commit

Permalink
to #246 fix: add mutex in subscription.go
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 committed Dec 5, 2024
1 parent 483fca9 commit 02f92b4
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions pgserver/logrepl/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,48 @@ type Subscription struct {
}

var subscriptionMap = sync.Map{}
var createMutex sync.Mutex

func GetAllSubscriptions(ctx *sql.Context) ([]*Subscription, error) {
if err := loadAllSubscriptions(ctx); err != nil {
return nil, err
}

var replicators []*Subscription
var subscriptions []*Subscription
subscriptionMap.Range(func(key, value interface{}) bool {
replicators = append(replicators, value.(*Subscription))
if sub, ok := value.(*Subscription); ok {
subscriptions = append(subscriptions, sub)
}
return true
})

return replicators, nil
return subscriptions, nil
}

func GetSubscription(ctx *sql.Context, name string) (*Subscription, error) {
value, ok := subscriptionMap.Load(name)
if !ok {
err := loadAllSubscriptions(ctx)
if err != nil {
return nil, err
if value, ok := subscriptionMap.Load(name); ok {
if sub, ok := value.(*Subscription); ok {
return sub, nil
}
value, ok = subscriptionMap.Load(name)
if !ok {
return nil, err
}

// Attempt to reload all subscriptions if not found
if err := loadAllSubscriptions(ctx); err != nil {
return nil, err
}

if value, ok := subscriptionMap.Load(name); ok {
if sub, ok := value.(*Subscription); ok {
return sub, nil
}
}

return value.(*Subscription), nil
return nil, nil
}

func CreateSubscription(ctx *sql.Context, name string, conn string, publication string) (*Subscription, error) {
createMutex.Lock()
defer createMutex.Unlock()

subscription, err := GetSubscription(ctx, name)
if err != nil {
Expand Down Expand Up @@ -82,27 +92,30 @@ func loadAllSubscriptions(ctx *sql.Context) error {
}
defer rows.Close()

if rows != nil {
for rows.Next() {
var name, conn, pub string
err = rows.Scan(&name, &conn, &pub)
_, ok := subscriptionMap.Load(name)
if !ok {
replicator, err := NewLogicalReplicator(conn)
if err != nil {
return err
for rows.Next() {
var name, conn, pub string
if err := rows.Scan(&name, &conn, &pub); err != nil {
return err
}
if _, loaded := subscriptionMap.LoadOrStore(name, &Subscription{
Name: name,
Conn: conn,
Publication: pub,
Replicator: nil,
}); !loaded {
replicator, err := NewLogicalReplicator(conn)
if err != nil {
return err
}
if sub, ok := subscriptionMap.Load(name); ok {
if subscription, ok := sub.(*Subscription); ok {
subscription.Replicator = replicator
}
subscriptionMap.Store(name, &Subscription{
Name: name,
Conn: conn,
Publication: pub,
Replicator: replicator,
})
}
}
}

return err
return nil
}

func WriteSubscription(ctx *sql.Context, name, conn, pub string) error {
Expand Down

0 comments on commit 02f92b4

Please sign in to comment.