Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-274: Optimize Timer creation #529

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft

IWF-274: Optimize Timer creation #529

wants to merge 3 commits into from

Conversation

bowersj27
Copy link
Contributor

Description

Checklist

  • Code compiles correctly
  • Tests for the changes have been added
  • All tests passing
  • This PR change is backwards-compatible
  • This PR CONTAINS a (planned) breaking change (it is not backwards compatible)

Related Issue

Closes #issue_number

Copy link
Contributor

@longquanzheng longquanzheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very great job!! this is quite complicated, but I believe it's worth it.


type sortedTimers struct {
status service.InternalTimerStatus
// Ordered slice of all timers being awaited on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add note for sorting on descending order

"github.com/indeedeng/iwf/service"
)

type sortedTimers struct {
Copy link
Contributor

@longquanzheng longquanzheng Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably can still call it "TimerScheduler" and move tp.createGreedyTimerScheduler(ctx, continueAsNewCounter) under it.

So that this can be in a spearate file as timerScheduler.go and this processor can call NewTimerScheduler(...) to initiate it.

type TimerScheduler struct{
    PendingSchedules TimerInfo
    ScheduledTimes []int64
}

func NewTimerScheduler(...) *TimerScheduler{
   createGreedyTimerSchedulerCoroutine(ctx, continueAsNewCounter)
}

// start some single thread that manages timers
tp.createGreedyTimerScheduler(ctx, continueAsNewCounter)

err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to return the TimerScheduler here (it will make it easier for debug, and potentially add some advanced test cases for this greedyTimerProcessor.

But we probably need to move GetCurrentTimerInfosQueryResponse to this package in order to do that.

}
insertIndex = i + 1
}
t.timers = append(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder it would be more memory-efficient to break it into two operations:

t.timers = append(t.timers[:insertIndex], toAdd)
t.timers = append(t.timers, t.timers[insertIndex:]...)

And also maybe a little more readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably need to add some check here:

func main() {
	arr := []int{1, 2}
	fmt.Println(arr)
	fmt.Println(arr[:1])
	fmt.Println(arr[:2])
	fmt.Println(arr[3:])
}

will panic because slice bounds out of range

https://go.dev/play/p/0yw2yFI6Cv7


func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) {
for i, timer := range t.timers {
if toRemove == timer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to check the value is equal, not the ref.

if *toRemove == *timer

Comment on lines +117 to +122
for i := len(createdTimers) - 1; i >= 0; i-- {
if createdTimers[i] > now {
createdTimers = createdTimers[:i+1]
break
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe also move to pruneScheduledTimes and rename the above to prunePendingSchedules

}
}
next := t.pendingTimers.pruneToNextTimer(now)
return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a few comments here for the cases:

  • "next != nil && (len(createdTimers) == 0" means we should schedule a next timer because there are none
  • "next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]))“ means we should schedule a next timer because the next pending one is earlier than the earliest in scheduledTimes
  • continueAsNewCounter.IsThresholdMet() continueAsNew is needed

next := t.pendingTimers.pruneToNextTimer(now)
//next := t.pendingTimers.getEarliestTimer()
// only create a new timer when a pending timer exists before the next existing timer fires
if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we don't have to do this check again, if it's not continueAsNew, it should be the case already?

if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) {
fireAt := next.FiringUnixTimestampSeconds
duration := time.Duration(fireAt-now) * time.Second
t.provider.NewTimer(ctx, duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This NewTimer is a non-blocking one.
At first glance I thought it should be t.provider.Sleep(...) then I remember that we had a similar discussion when implemeting it for Wayfinder LoL.

So we don't need to wait here because we rely on the fact that when the timer fired, workflow task will get triggered and evaluate the Await condition, right?

Maybe add more comments here too.

return service.TimerSkipped
}

_ = t.provider.Await(ctx, func() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add a comment here that the timers in the TimerScheduler will trigger a workflow task when the timer fire, hence it will drive the workflow task to evalaute timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix().

That was a tricky part too in our Wayfinder code :P

@longquanzheng longquanzheng changed the title Jira/iwf 274 IWF-274: Optimize Timer creation Dec 23, 2024
@longquanzheng
Copy link
Contributor

Would be great to add a new test case to it -- especially the idleTimeout pattern, which is that main use case that this optimization is for.

Most of the code can be from any_timer_signal_test
https://github.com/indeedeng/iwf/blob/main/integ/workflow/any_timer_signal/routers.go#L47
Currently it’s
S1-1->S1-2->S2-1

S1-1 will wait for timer, S1-2 will wait for timer+signal.

We can change it to
S1-1->S1-2->(maybe a few more iterations) -> completeWF

Both S1-1 and S1-2 will wait for signal+timer and only the second timer will fired. We will expect only one timer was started in Temporal.
Maybe we have to check history events like this: https://github.com/indeedeng/iwf/blob/main/integ/wait_until_search_attributes_optimization_test.go#L133
(and only doing it for Temporal is fine)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants