Skip to content

Commit

Permalink
Fix rate limits (#43)
Browse files Browse the repository at this point in the history
* fix rate limits when value is below 60

* tweaks to the rate limiter, creating a drive service does not contribute to the total calls count
  • Loading branch information
BenCookie95 authored Dec 12, 2024
1 parent f472376 commit dbe1a8d
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 25 deletions.
19 changes: 11 additions & 8 deletions server/plugin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,6 @@ func (p *Plugin) handleDriveWatchNotifications(c *Context, w http.ResponseWriter
return
}

driveService, err := p.GoogleClient.NewDriveService(c.Ctx, userID)
if err != nil {
p.API.LogError("Failed to create Google Drive service", "err", err, "userID", userID)
w.WriteHeader(http.StatusInternalServerError)
return
}

clusterService := pluginapi.NewClusterService(p.API)
// Mutex to prevent race conditions from multiple requests directed at the same user in a short period of time.
m, err := clusterService.NewMutex("drive_watch_notifications_" + userID)
Expand All @@ -527,6 +520,13 @@ func (p *Plugin) handleDriveWatchNotifications(c *Context, w http.ResponseWriter
}
defer m.Unlock()

driveService, err := p.GoogleClient.NewDriveService(c.Ctx, userID)
if err != nil {
p.API.LogError("Failed to create Google Drive service", "err", err, "userID", userID)
w.WriteHeader(http.StatusInternalServerError)
return
}

// Get the pageToken from the KV store, it has changed since we acquired the lock.
watchChannelData, err = p.KVStore.GetWatchChannelData(userID)
if err != nil {
Expand Down Expand Up @@ -689,21 +689,24 @@ func (p *Plugin) handleDriveWatchNotifications(c *Context, w http.ResponseWriter
// We don't want to spam the user with notifications if there are more than 5 activities.
if len(activities) > 5 {
p.handleMultipleActivitiesNotification(change.File, userID)
lastActivityTime = change.File.ModifiedTime
} else {
// Newest activity is at the end of the list so iterate through the list in reverse.
for i := len(activities) - 1; i >= 0; i-- {
activity := activities[i]
if activity.PrimaryActionDetail.Comment != nil {
lastActivityTime = activity.Timestamp
p.handleCommentNotifications(c.Ctx, driveService, change.File, userID, activity)
}

if activity.PrimaryActionDetail.PermissionChange != nil {
lastActivityTime = activity.Timestamp
p.handleFileSharedNotification(change.File, userID)
}
}
}

err = p.KVStore.StoreLastActivityForFile(userID, change.FileId, change.File.ModifiedTime)
err = p.KVStore.StoreLastActivityForFile(userID, change.FileId, lastActivityTime)
if err != nil {
p.API.LogError("Failed to store last activity for file", "err", err, "fileID", change.FileId, "userID", userID)
}
Expand Down
7 changes: 4 additions & 3 deletions server/plugin/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,11 @@ func TestNotificationWebhook(t *testing.T) {
mocks.MockKVStore.EXPECT().StoreWatchChannelData("userId1", *watchChannelData).Return(nil)
mocks.MockGoogleClient.EXPECT().NewDriveActivityService(context.Background(), "userId1").Return(mocks.MockDriveActivity, nil)
mocks.MockKVStore.EXPECT().GetLastActivityForFile("userId1", changeList.Changes[0].File.Id).Return(changeList.Changes[0].File.ModifiedTime, nil)
activityResponse := GetSampleDriveactivityPermissionResponse()
mocks.MockDriveActivity.EXPECT().Query(context.Background(), &driveactivity.QueryDriveActivityRequest{
ItemName: fmt.Sprintf("items/%s", changeList.Changes[0].File.Id),
Filter: "time > \"" + changeList.Changes[0].File.ModifiedTime + "\"",
}).Return(GetSampleDriveactivityPermissionResponse(), nil).MaxTimes(1)
}).Return(activityResponse, nil).MaxTimes(1)
te.mockAPI.On("GetConfig").Return(nil)
te.mockAPI.On("GetDirectChannel", "userId1", te.plugin.BotUserID).Return(&mattermostModel.Channel{Id: "channelId1"}, nil).Times(1)
post := &mattermostModel.Post{
Expand All @@ -267,7 +268,7 @@ func TestNotificationWebhook(t *testing.T) {
},
}
te.mockAPI.On("CreatePost", post).Return(nil, nil).Times(1)
mocks.MockKVStore.EXPECT().StoreLastActivityForFile("userId1", changeList.Changes[0].File.Id, changeList.Changes[0].File.ModifiedTime).Return(nil)
mocks.MockKVStore.EXPECT().StoreLastActivityForFile("userId1", changeList.Changes[0].File.Id, activityResponse.Activities[0].Timestamp).Return(nil)
},
},
"Send a notification for a comment on a file": {
Expand Down Expand Up @@ -330,7 +331,7 @@ func TestNotificationWebhook(t *testing.T) {
},
}
te.mockAPI.On("CreatePost", post).Return(nil, nil).Times(1)
mocks.MockKVStore.EXPECT().StoreLastActivityForFile("userId1", changeList.Changes[0].File.Id, changeList.Changes[0].File.ModifiedTime).Return(nil)
mocks.MockKVStore.EXPECT().StoreLastActivityForFile("userId1", changeList.Changes[0].File.Id, activityResponse.Activities[0].Timestamp).Return(nil)
},
},
} {
Expand Down
3 changes: 2 additions & 1 deletion server/plugin/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func (p *Plugin) handleCreate(c *plugin.Context, args *model.CommandArgs, parame
Optional: true,
})

ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
serviceV2, err := p.GoogleClient.NewDriveV2Service(ctx, args.UserId)
if err != nil {
p.API.LogError("Failed to create drive client", "err", err)
Expand Down
14 changes: 2 additions & 12 deletions server/plugin/google/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
)

func NewGoogleClient(oauthConfig oauth2.Config, config *config.Configuration, kvstore kvstore.KVStore, papi plugin.API) ClientInterface {
maximumQueriesPerSecond := config.QueriesPerMinute / 60
maximumQueriesPerSecond := float64(config.QueriesPerMinute) / 60
burstSize := config.BurstSize

return &Client{
Expand All @@ -72,11 +72,6 @@ func (g *Client) NewDriveService(ctx context.Context, userID string) (DriveInter
return nil, err
}

err = g.driveLimiter.WaitN(ctx, 1)
if err != nil {
return nil, err
}

srv, err := drive.NewService(ctx, option.WithTokenSource(g.oauthConfig.TokenSource(ctx, authToken)))
if err != nil {
return nil, err
Expand Down Expand Up @@ -105,11 +100,6 @@ func (g *Client) NewDriveV2Service(ctx context.Context, userID string) (DriveV2I
return nil, err
}

err = g.driveLimiter.WaitN(ctx, 1)
if err != nil {
return nil, err
}

srv, err := driveV2.NewService(ctx, option.WithTokenSource(g.oauthConfig.TokenSource(ctx, authToken)))
if err != nil {
return nil, err
Expand Down Expand Up @@ -345,6 +335,6 @@ func (ds googleServiceBase) checkRateLimits(ctx context.Context) error {

func (g *Client) ReloadConfigs(newQueriesPerMinute int, newBurstSize int, oauthConfig oauth2.Config) {
g.oauthConfig = oauthConfig
g.driveLimiter.SetLimit(rate.Limit(newQueriesPerMinute / 60))
g.driveLimiter.SetLimit(rate.Limit(float64(newQueriesPerMinute) / 60))
g.driveLimiter.SetBurst(newBurstSize)
}
4 changes: 3 additions & 1 deletion server/plugin/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func GetSampleDriveactivityCommentResponse() *driveactivity.QueryDriveActivityRe
},
},
},
Timestamp: "2021-01-02T00:00:00.000Z",
},
},
NextPageToken: "",
Expand All @@ -165,7 +166,8 @@ func GetSampleDriveactivityPermissionResponse() *driveactivity.QueryDriveActivit
},
},
},
Targets: []*driveactivity.Target{},
Targets: []*driveactivity.Target{},
Timestamp: "2021-01-02T00:00:00.000Z",
},
},
NextPageToken: "",
Expand Down

0 comments on commit dbe1a8d

Please sign in to comment.