
feat: Support for SQLite backend #130

Open · wants to merge 45 commits into main

Conversation

pranavmodx commented:

Fixes #129

✅ Feature parity with existing backends
✅ Roughly the same amount of test coverage as existing backends

All of the major features work with this new SQLite backend. Almost all of the tests from the other backends have been extended to cover this one; more can be added as required to capture missed or specific cases. Currently the tests create test.db within backends/sqlite. I'm not sure whether that is ideal and am open to hearing your thoughts.

For listening to jobs, Go channels are used, since SQLite has nothing comparable to pg_notify (a rough sketch of the idea follows below).
I also added a new "in progress" job status, which was useful for our use case, but it's up to you to decide whether to keep it.
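
To illustrate the idea (a minimal sketch, not the PR's actual code; the map mirrors the queueListenerChan field on the backend struct shown later in this review):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// Each queue gets a buffered channel; Enqueue publishes the new job's ID
	// on it after the INSERT, and a per-queue listener goroutine picks it up.
	// The buffer size stands in for QueueListenerChanBufferSize.
	queueListenerChan := map[string]chan string{
		"email": make(chan string, 1000),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // the per-queue listener, standing in for pg_notify/LISTEN
		defer wg.Done()
		for jobID := range queueListenerChan["email"] {
			fmt.Println("processing job", jobID)
		}
	}()

	queueListenerChan["email"] <- "42" // what Enqueue does once the row is inserted
	close(queueListenerChan["email"])
	wg.Wait()
}
```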

acaloiaro (Owner) left a comment:

This is looking solid! Thanks for the contribution.

I've commented directly on code lines where I have questions and change requests.

Here are a few more thoughts and questions that don't directly relate to the code.

  • Can you help me understand how mutual exclusion of jobs is achieved? For Postgres it is achieved with FOR UPDATE SKIP LOCKED at the query level.
  • Is mutual exclusion affected by -DSQLITE_THREADSAFE=0?
  • Is mutual exclusion affected by WAL mode?: https://www.sqlite.org/wal.html
  • Would it make sense to get test coverage over sqlite in these alternative modes? Or is that being too paranoid?

Tests and lints

There are some tests (data races) and lints failing. Feel free to annotate some of the lints to be ignored if you don't think they make sense or they seem too onerous.

You can lint and test locally:

  • make lint
  • go test backends/sqlite/sqlite_backend_test.go -tags testing -race

```sql
);

CREATE TABLE neoq_dead_jobs (
	id integer primary key not null,
```
acaloiaro (Owner):

Looks like your editor/autoformatter adds an extraneous indent at the start of these CREATE TABLE statements (same for the neoq_jobs table).

```go
handlers          map[string]handler.Handler // a map of queue names to queue handlers
queueListenerChan map[string]chan string     // each queue has a listener channel to process enqueued jobs
logger            logging.Logger             // backend-wide logger
dbMutex           *sync.RWMutex              // protects concurrent access to sqlite db on SqliteBackend
```
acaloiaro (Owner) commented on Jun 2, 2024:

I think this may be how we're achieving job mutual exclusion. Though I do wonder if we should consider using SQLite's internal mutual exclusion instead and enforce THREADSAFE=1... is that possible?

pranavmodx (Author):

I was not aware of this mode. I can try experimenting with this and see if it works well.
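
For what it's worth, one way to check how the linked SQLite was compiled is PRAGMA compile_options, whose output includes the THREADSAFE setting (a minimal sketch, assuming the go-sqlite3 driver):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// prints the build's compile-time options, e.g. THREADSAFE=1
	rows, err := db.Query("PRAGMA compile_options")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var opt string
		if err := rows.Scan(&opt); err != nil {
			log.Fatal(err)
		}
		fmt.Println(opt)
	}
}
```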

pranavmodx (Author):

Hi @acaloiaro, I tried a couple of things and here's my understanding:

  1. We might not need custom job mutual exclusion (with sync.RWMutex). SQLite works in serialized mode by default, aka -DSQLITE_THREADSAFE=1, meaning it handles this internally. It could handle around 3k concurrent writes (neoq.Enqueue) before returning a "database is locked" error; I think this is why I went with the custom handling earlier.
     But alternatively (and perhaps a better solution), the go-sqlite3 driver FAQ recommends using ?cache=shared mode with db.SetMaxOpenConns(1) (a related thread); a sketch follows at the end of this comment. It seems to handle as many as 10k concurrent writes, and probably more.
     I couldn't observe any downsides to this as long as transactions are kept very small, which brings me to a question: do we need to wrap the entire handleJob code in a transaction (as is done in postgres_backend.go), or can we do one small transaction before handler.Exec (for the deadline check) and one after (marking the job processed)? Is there a reason the former was chosen? Otherwise the latter makes more sense to me: it both follows the general rule of thumb of keeping transactions short and doesn't slow down writes. Beyond a point, enqueues otherwise get stuck, because one handleJob call holds exclusive access to the db. Let me know your thoughts.

  2. Mutual exclusion would most likely be affected by -DSQLITE_THREADSAFE=0. I didn't try this, since it requires manually building SQLite from source and linking it to the go-sqlite3 driver. Is this something we need to consider?

  3. WAL mode works well too, without the need for custom handling. According to the documentation, it helps achieve higher concurrency, but it introduces additional -shm and -wal files, and the latter grew to around 3 times the size of the db.

Note: QueueListenerChanBufferSize and handler.Concurrency need to be configured correctly by the user, based on their specific needs, for things to work well.
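
Putting the pieces above together (a minimal sketch, assuming the go-sqlite3 driver; the file name is illustrative):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	// shared-cache mode plus a single open connection, per the go-sqlite3 FAQ;
	// _journal_mode=WAL additionally turns on write-ahead logging.
	db, err := sql.Open("sqlite3", "file:neoq.db?cache=shared&_journal_mode=WAL")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// serializes writers in the driver rather than with a hand-rolled
	// sync.RWMutex, avoiding "database is locked" under concurrent writes
	db.SetMaxOpenConns(1)
}
```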

```go
}

dbURI := strings.Split(s.config.ConnectionString, "/")
dbPath := strings.Join(dbURI[1:], "/")
```
acaloiaro (Owner):

Let's make sure len(dbURI) > 1, and return an error saying the connection string is malformed, before indexing into the slice.

pranavmodx (Author) commented on Jun 8, 2024:

Alternatively, I thought we could have:

```go
dbPath := strings.TrimPrefix(s.config.ConnectionString, "sqlite3://")
```

When testing on Windows, I found that the connection string must be of the form sqlite3://file:///<db_path> for the db to open successfully. If we split on "/", we get an extra "/" attached to the start of the resulting db path. What do you think?
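
Roughly like this (a sketch combining the prefix trim with the malformed-string check requested above; dbPathFromConnString is a hypothetical helper, not code from the PR):

```go
package sqlite

import (
	"errors"
	"strings"
)

const scheme = "sqlite3://" // e.g. sqlite3://file:///<db_path> on Windows

// dbPathFromConnString validates the scheme before trimming it, so a
// malformed connection string fails loudly instead of being mis-split.
func dbPathFromConnString(connString string) (string, error) {
	if !strings.HasPrefix(connString, scheme) {
		return "", errors.New("malformed sqlite connection string: " + connString)
	}
	return strings.TrimPrefix(connString, scheme), nil
}
```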

acaloiaro (Owner):

Sounds better than splitting.

```go
}

// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
```
acaloiaro (Owner):

I remember making this comment about Postgres transactions. Is it true of SQLite too?

pranavmodx (Author) commented on Jun 8, 2024:

The pgx lib clearly documents that tx.Rollback is safe to call multiple times, even after commit, but the database/sql docs don't say the same. Theoretically, rollback shouldn't have any effect after commit, right? But if it's a concern, maybe we can do something like:

```go
defer func() {
	if err != nil {
		_ = tx.Rollback() // database/sql's Tx.Rollback takes no context argument
		s.logger.Error("Transaction rolled back due to error", slog.Any("error", err))
	}
}()
```

acaloiaro (Owner):

This confirms that it’s a no-op for the sql.DB interface more generally.

pranavmodx (Author):

Is that based on the fact that, in their example code, they do a defer tx.Rollback() like we currently do?

acaloiaro (Owner):

I was referring to the following from the example:

> Defer the transaction's rollback. If the transaction succeeds, it will be committed before the function exits, making the deferred rollback call a no-op. If the transaction fails it won't be committed, meaning that the rollback will be called as the function exits.
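
For reference, the pattern the quote describes looks like this (a minimal sketch against database/sql; the query and package name are illustrative):

```go
package example

import (
	"context"
	"database/sql"
)

func doWork(ctx context.Context, db *sql.DB) (err error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// a no-op once Commit succeeds; rolls back on any early return
	defer tx.Rollback()

	if _, err = tx.ExecContext(ctx,
		"UPDATE neoq_jobs SET status = $1 WHERE id = $2", "processed", 1); err != nil {
		return err
	}
	return tx.Commit()
}
```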

```go
	return
}

func (s *SqliteBackend) updateJobToInProgress(ctx context.Context, h handler.Handler, job *jobs.Job) (err error) {
```
acaloiaro (Owner):

I think we should consider having this function receive a *sql.Tx rather than creating one itself. That way, job handling can use a single transaction for all db operations.

In the PG backend, we do this by adding a tx to the ctx, e.g. from handleJob:

```go
ctx = context.WithValue(ctx, txCtxVarKey, tx)
```
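
A sketch of the other half of that round trip, pulling the tx back out (the key type and getter here are illustrative, not the PG backend's exact code):

```go
package example

import (
	"context"
	"database/sql"
	"errors"
)

type ctxKey string

const txCtxVarKey ctxKey = "tx"

// txFromContext retrieves the transaction that handleJob stored with
// context.WithValue, so helpers like updateJobToInProgress can reuse it.
func txFromContext(ctx context.Context) (*sql.Tx, error) {
	tx, ok := ctx.Value(txCtxVarKey).(*sql.Tx)
	if !ok {
		return nil, errors.New("no transaction in context")
	}
	return tx, nil
}
```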

pranavmodx (Author):

In any case, we'll need at least two transactions, right? One for in progress/failed and one for processed/failed, since in progress needs to be committed.

acaloiaro (Owner):

How important is in progress to you? Is there a specific use case you have in mind?

I intentionally avoided it for Postgres because it's an additional database round trip, and the best use case I had for it was showing in-progress jobs in a UI (which doesn't exist). As you can see, leaving "in progress" out also reduced complexity, because the "state machine" for determining its correct value would also need to be considered. I'd be inclined to remove the "in progress" requirement unless it's serving a specific goal of yours.

pranavmodx (Author):

I had the very same use case: showing in-progress jobs in a UI. But maybe you're right that it adds complexity. We can remove the requirement for now; that would also be consistent with the other backends.

```go
// In progress job
if jobErr == nil && len(status) != 0 {
	qstr := "UPDATE neoq_jobs SET status = $1, error = $2 WHERE id = $3"
	_, err = tx.ExecContext(ctx, qstr, status, errMsg, job.ID)
```
acaloiaro (Owner):

Let's check for this error and return an error when it's non-nil. Otherwise we could have in-progress jobs without an in-progress status, which may cause confusion.

pranavmodx (Author):

It is captured in lines 440 to 447, right? That err variable gets returned from the function.

acaloiaro (Owner):

I see, my mistake.

```go
	return
})
// Make sure that each neoq worker only works on one thing at a time.
h.Concurrency = 1
```
acaloiaro (Owner):

This possibly places the test under unrealistic conditions. I would expect real world users to use default concurrency. Will this test work with concurrency > 1?

pranavmodx (Author):

These are test cases borrowed from the postgres backend; I hadn't put much thought into this specific case. I can try your suggestion.

```go
const WaitForJobTime = 1100 * time.Millisecond

// allow time for listener to start and for at least one job to process
time.Sleep(WaitForJobTime)
```
acaloiaro (Owner):

Can we add a done channel here instead? 1.1s is a pretty long fixed wait time. Some of my early tests did this, but I've tried to get away from fixed wait times in favor of a done channel and a timeout instead.
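
Something along these lines (a minimal sketch of the pattern, not tied to the neoq handler API; the enqueue step is elided):

```go
package example

import (
	"testing"
	"time"
)

func TestJobProcessed(t *testing.T) {
	done := make(chan struct{})

	// the job handler signals completion instead of the test sleeping 1.1s
	go func() {
		// ... enqueue and handle the job ...
		close(done)
	}()

	select {
	case <-done:
		// job processed; proceed with assertions
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for job to be processed")
	}
}
```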
