Add waitForProcessComplete #126
Conversation
Force-pushed from f6016bf to 4cfa9ac
@@ -27,14 +27,14 @@ type immediateTaskConcurrentProcessor struct {
	rootCtx           context.Context
	cfg               config.Config
	taskToProcessChan chan data_models.ImmediateTask
	// for quickly checking if the shardId is being processed
	currentShards     map[int32]bool
delete currentShards
Force-pushed from 73e3c6a to 570a76e, then from 570a76e to e226b48
	visibilityStore persistence.VisibilityStore
	logger          log.Logger
	// shardId: WaitForProcessCompletionChannelsPerShard
	waitForProcessCompletionChannelsPerShardMap map[int32]WaitForProcessCompletionChannelsPerShard
nit: rename WaitForProcessCompletionChannelsPerShard -> WaitForProcessCompletionChannels. The type doesn't need to know it's being used "per shard"; the variable name is a better place for that.
	exists := w.currentShards[shardId]
	w.currentShards[shardId] = true
	w.taskToCommitChans[shardId] = tasksToCommitChan
	_, exists := w.taskToCommitChans[shardId]
Would there be any race condition here? Note that the map is not thread-safe.
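The concern above can be sketched as follows. Plain Go maps are not safe for concurrent writes, and the check-then-insert must be atomic, so the whole operation goes under one mutex. This is a minimal illustration with hypothetical names (`shardRegistry`, `AddQueue`, and a `chan<- string` standing in for `chan<- data_models.ImmediateTask`), not the PR's actual code:

```go
package main

import "sync"

// shardRegistry guards the shard -> commit-channel map with a mutex so
// that concurrent registrations cannot race on the map. Without the lock,
// `go test -race` would flag the concurrent map access.
type shardRegistry struct {
	mu                sync.Mutex
	taskToCommitChans map[int32]chan<- string // stand-in for chan<- data_models.ImmediateTask
}

func newShardRegistry() *shardRegistry {
	return &shardRegistry{taskToCommitChans: map[int32]chan<- string{}}
}

// AddQueue registers a shard's commit channel and reports whether the shard
// was already registered. Holding the lock across the check and the insert
// makes the check-then-insert atomic.
func (r *shardRegistry) AddQueue(shardId int32, ch chan<- string) (alreadyExisted bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, exists := r.taskToCommitChans[shardId]
	r.taskToCommitChans[shardId] = ch
	return exists
}
```

With the lock held for the whole method, two goroutines adding the same shard concurrently cannot both observe "not existing".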
	processor ImmediateTaskProcessor

	// processExecutionId : channel
	channelMap map[string]chan string
Similarly here: operating on a map under concurrency needs some locking to protect it.
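One way to apply the comment, sketched with hypothetical names (`completionChannels`, `Get`, `Add`): guard `channelMap` with an RWMutex so lookups can run concurrently while registrations take the write lock. This is an illustration of the locking pattern, not the PR's implementation:

```go
package main

import "sync"

// completionChannels maps processExecutionId -> notification channel,
// guarded by an RWMutex: many readers may hold RLock at once, while a
// writer registering a new channel takes the exclusive lock.
type completionChannels struct {
	mu         sync.RWMutex
	channelMap map[string]chan string
}

func newCompletionChannels() *completionChannels {
	return &completionChannels{channelMap: map[string]chan string{}}
}

// Get looks up the channel for a process execution under the read lock.
func (c *completionChannels) Get(processExecutionId string) (chan string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	ch, ok := c.channelMap[processExecutionId]
	return ch, ok
}

// Add registers (or returns the existing) channel under the write lock,
// so two concurrent Adds for the same id end up sharing one channel.
func (c *completionChannels) Add(processExecutionId string) chan string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if ch, ok := c.channelMap[processExecutionId]; ok {
		return ch
	}
	ch := make(chan string, 1)
	c.channelMap[processExecutionId] = ch
	return ch
}
```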
func (w *WaitForProcessCompletionChannelsPerShardImpl) terminationCheck(processExecutionId string) {
	// To check if the channel should be closed after a certain time period
	go func() {
		time.Sleep(time.Second * time.Duration(DEFAULT_WAIT_FOR_TIMEOUT_MAX+3))
Using a background goroutine + timer for this is a bit wasteful and hacky.
A cleaner, more formal way is to ensure every waiting request calls the cleanup itself at the end.
Currently, terminationCheck is only called after adding a request, so we need to wait for the timeout to perform the cleanup. Each request calls the method only once.
	exists := w.currentShards[shardId]
	w.currentShards[shardId] = true
	w.taskToCommitChans[shardId] = tasksToCommitChan
	lock := sync.RWMutex{}
The lock needs to be initialized in the constructor and kept as a member field. See this simple example: https://github.com/uber/cadence/blob/02c7efbed448c4a493b3a971e8e0e292e17c6d91/common/collection/concurrentPriorityQueue.go#L27
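The point of the comment: a `lock := sync.RWMutex{}` created inside the method is a fresh lock on every call, so two goroutines each lock their own copy and still race on the map. Only a lock shared as a struct member provides mutual exclusion. A minimal sketch with hypothetical names (`immediateTaskProcessor`, `AddShard`):

```go
package main

import "sync"

// immediateTaskProcessor embeds the mutex as a member, so every caller
// locks the same instance. The zero-value RWMutex is ready to use, which
// is why "init in the constructor" means nothing more than constructing
// the struct (only the map needs explicit initialization).
type immediateTaskProcessor struct {
	sync.RWMutex
	currentShards map[int32]bool
}

func newImmediateTaskProcessor() *immediateTaskProcessor {
	return &immediateTaskProcessor{currentShards: map[int32]bool{}}
}

// AddShard marks a shard as processed and reports whether it already was,
// with the check and the write under the shared lock.
func (p *immediateTaskProcessor) AddShard(shardId int32) (alreadyExisted bool) {
	p.Lock()
	defer p.Unlock()
	exists := p.currentShards[shardId]
	p.currentShards[shardId] = true
	return exists
}
```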
But what's the issue with the current implementation? It seems to me that creating a lock when needed is also doable. I am wondering if this is mainly about performance?
The lock needs to be a shared instance across threads in order to work as expected.

Thanks,
Quanzheng
Makes sense. Modified.
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

@@ Coverage Diff @@
##             main     #126      +/-   ##
==========================================
- Coverage   59.51%   57.44%    -2.07%
==========================================
  Files          92       93        +1
  Lines        7279     7609      +330
==========================================
+ Hits         4332     4371      +39
- Misses       2666     2953     +287
- Partials      281      285       +4

View full report in Codecov by Sentry.
Why make this pull request?

Implement WaitForProcessComplete: