-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Orchestration ID Reuse Policies #45
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some initial feedback. One high level point is that I'd like it if we can try to organize this in a way that reduces duplicate code when we need to support multiple backends.
backend/backend.go
Outdated
@@ -16,6 +16,7 @@ var ( | |||
ErrNotInitialized = errors.New("backend not initialized") | |||
ErrWorkItemLockLost = errors.New("lock on work-item was lost") | |||
ErrBackendAlreadyStarted = errors.New("backend is already started") | |||
ErrSkipExistInstance = errors.New("Skip creating existing instance") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: golang style guides say that error messages should start with a lowercase letter. I see warnings in VS Code when I try to create error messages that start with capital letters.
backend/sqlite/sqlite.go
Outdated
@@ -403,6 +403,9 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac | |||
|
|||
var instanceID string | |||
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil { | |||
if err == backend.ErrSkipExistInstance { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Go, it's preferred to do errors.Is(err, backend.ErrSkipExistinstance)
instead of err == backend.ErrSkipExistInstance
. More info here: https://go.dev/blog/go1.13-errors.
if err == backend.ErrSkipExistInstance { | |
if errors.Is(err, backend.ErrSkipExistinstance) { |
backend/sqlite/sqlite.go
Outdated
@@ -403,6 +403,9 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac | |||
|
|||
var instanceID string | |||
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil { | |||
if err == backend.ErrSkipExistInstance { | |||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should log a warning message here. Something like: "An instance with ID '%s' already exists; dropping duplicate create request"
backend/sqlite/sqlite.go
Outdated
instanceExists = true | ||
} | ||
|
||
if !instanceExists || startEvent.InstanceExistOption == protos.InstanceExistOption_Throw_IF_EXIST { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, this won't actually throw if the instance doesn't exist. The query below is INSERT OR IGNORE...
which means that the query will just return 0 results if the instance already exists, and we'll return ErrDuplicateEvent
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yes, I think I got confused. My suggestion is that we keep the existing logic as-is except for the TERMINATE_IF_EXIST
case. Instead of handling THROW_IF_EXIST
or SKIP_IF_EXIST
here, we can simply update the caller to check for ErrDuplicateEvent
. If we get ErrDuplicateEvent
and the policy is THROW_IF_EXIST
, then we can return the error as we do today. Otherwise, it the policy is SKIP_IF_EXIST
, we can just log a warning and return a success. I think it's better to have that logic in the calling code so that we can reduce the amount of special handling the different backends need to implement.
backend/sqlite/sqlite.go
Outdated
) VALUES (?, ?, ?, ?, ?, ?, ?)`, | ||
startEvent.Name, | ||
startEvent.Version.GetValue(), | ||
`SELECT * FROM [Instances] WHERE [InstanceID] = ?`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this new SQL statement or is the existing logic sufficient?
backend/sqlite/sqlite.go
Outdated
*instanceID = startEvent.OrchestrationInstance.InstanceId | ||
*instanceID = startEvent.OrchestrationInstance.InstanceId | ||
} else if startEvent.InstanceExistOption == protos.InstanceExistOption_SKIP_IF_EXIST { | ||
be.logger.Debugf( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of logging here, I suggest we log in the calling code. That way, we have common logging behavior that works for all backend types. I want to minimize the amount of duplication we have to do across all backends (sqlite, actors, postgres, custom, etc.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One question about the Backend
interface, if I want to update it will it be a break change for Dapr workflow, I haven't really look into how Dapr is using durabletask-go, I assume it implements the Backend
interface but using actor storage. So is it ok to update this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think my suggestion requires a breaking change, but it's okay for us to make breaking changes still since durabletask-go
is not consider "stable". Yes, we'll need to update the Dapr implementation, but that's fine - we've done it many times before. It doesn't impact customers.
backend/sqlite/sqlite.go
Outdated
// same instance id exists and InstanceExistOption_TERMINATE_IF_EXIST set | ||
// to terminate old instance | ||
// 1. remove all entires in history table | ||
// 2. remove all entries in instance table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just need to remove the instance with the same ID, not "all" instances.
backend/sqlite/sqlite.go
Outdated
// 3. remove all entries in newEvents table | ||
// 4. remove all entries in new task table | ||
// 5. insert new instance to instance table. | ||
// do all above updates in one atomic operation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're already operating in the context of a transaction, so it's okay if you do this in multiple discrete steps.
0676d38
to
f5b9aac
Compare
Just add a more concrete implementation, but also got two questions
|
@@ -16,6 +16,7 @@ var ( | |||
ErrNotInitialized = errors.New("backend not initialized") | |||
ErrWorkItemLockLost = errors.New("lock on work-item was lost") | |||
ErrBackendAlreadyStarted = errors.New("backend is already started") | |||
ErrInstanceNotExists = errors.New("isntance does not exist") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrInstanceNotExists = errors.New("isntance does not exist") | |
ErrInstanceNotExists = errors.New("instance does not exist") |
Open a new PR #46, closing this one. |
This PR tries to update the logic to support reuse orchestration ID, more details can be found #42, dapr/dapr#7101
Corresponding protobuf updates can be found microsoft/durabletask-protobuf#19
It adds enum
InstanceExistOption
which contains three optionsTHROW_IF_EXIST
,TERMINATE_IF_EXIST
,SKIP_IF_EXIST
.THROW_IF_EXIST
will be the default behavior which is also the current behavior.