Skip to content

Commit

Permalink
Moving to connection pools and independent connections on backup/restore
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Oct 11, 2022
1 parent 279404e commit d74c280
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 109 deletions.
81 changes: 61 additions & 20 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,16 @@ func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error {
}

func (conn *SqliteStreamDB) CleanupChangeLogs(beforeTime time.Time) (int64, error) {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return 0, err
}
defer sqlConn.Return()

total := int64(0)
for name := range conn.watchTablesSchema {
metaTableName := conn.metaTable(name, changeLogName)
rs, err := conn.Delete(metaTableName).
rs, err := sqlConn.DB().Delete(metaTableName).
Where(
goqu.C("state").Eq(Published),
goqu.C("created_at").Lte(beforeTime.UnixMilli()),
Expand Down Expand Up @@ -133,7 +139,13 @@ func (conn *SqliteStreamDB) consumeReplicationEvent(event *ChangeLogEvent) error
conn.dbLock.Lock()
defer conn.dbLock.Unlock()

return conn.WithTx(func(tnx *goqu.TxDatabase) error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return err
}
defer sqlConn.Return()

return sqlConn.DB().WithTx(func(tnx *goqu.TxDatabase) error {
primaryKeyMap := conn.getPrimaryKeyMap(event)
if primaryKeyMap == nil {
return ErrNoTableMapping
Expand Down Expand Up @@ -170,6 +182,12 @@ func (conn *SqliteStreamDB) getPrimaryKeyMap(event *ChangeLogEvent) map[string]a
}

func (conn *SqliteStreamDB) initTriggers(tableName string) error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return err
}
defer sqlConn.Return()

name := strings.TrimSpace(tableName)
if strings.HasPrefix(name, "sqlite_") || strings.HasPrefix(name, conn.prefix) {
return fmt.Errorf("invalid table to watch %s", tableName)
Expand All @@ -182,27 +200,28 @@ func (conn *SqliteStreamDB) initTriggers(tableName string) error {
}

log.Info().Msg(fmt.Sprintf("Creating trigger for %v", name))
_, err = conn.Exec(script)
_, err = sqlConn.DB().Exec(script)
if err != nil {
return err
}

return nil
}

func (conn *SqliteStreamDB) watchChanges(path string) {
func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) {
shmPath := path + "-shm"
walPath := path + "-wal"
watcher := conn.watcher

errDB := watcher.Add(path)
errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
changeLogTicker := time.NewTicker(time.Millisecond * 500)

changeLogTicker := time.NewTicker(time.Millisecond * 250)
updateTime := time.Now()

for {
select {
case ev, ok := <-conn.watcher.Events:
case ev, ok := <-watcher.Events:
if !ok {
return
}
Expand All @@ -214,23 +233,33 @@ func (conn *SqliteStreamDB) watchChanges(path string) {
if t.After(updateTime) {
conn.publishChangeLog()
}
}

if errShm != nil {
errShm = watcher.Add(shmPath)
}
if errDB != nil {
errDB = watcher.Add(path)
}

if errWal != nil {
errWal = watcher.Add(walPath)
}
if errShm != nil {
errShm = watcher.Add(shmPath)
}

if errWal != nil {
errWal = watcher.Add(walPath)
}

updateTime = time.Now()
}
}

func (conn *SqliteStreamDB) getGlobalChanges(limit uint) ([]globalChangeLogEntry, error) {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return nil, err
}
defer sqlConn.Return()

var entries []globalChangeLogEntry
err := conn.
err = sqlConn.DB().
From(conn.globalMetaTable()).
Limit(limit).
ScanStructs(&entries)
Expand Down Expand Up @@ -261,10 +290,16 @@ func (conn *SqliteStreamDB) publishChangeLog() {
return
}

sqlConn, err := conn.pool.Borrow()
if err != nil {
log.Error().Err(err).Msg("Unable to acquire connection from pool")
}
defer sqlConn.Return()

for _, change := range changes {
logEntries := changeLogEntry{}

found, err := conn.Select("id", "type", "state").
found, err := sqlConn.DB().Select("id", "type", "state").
From(conn.metaTable(change.TableName, changeLogName)).
Where(
goqu.C("state").Eq(Pending),
Expand Down Expand Up @@ -295,8 +330,8 @@ func (conn *SqliteStreamDB) publishChangeLog() {
log.Error().Err(err).Msg("Unable to consume changes")
}

err = conn.WithTx(func(txDatabase *goqu.TxDatabase) error {
_, err = conn.Update(conn.metaTable(change.TableName, changeLogName)).
err = sqlConn.DB().WithTx(func(tx *goqu.TxDatabase) error {
_, err = tx.Update(conn.metaTable(change.TableName, changeLogName)).
Set(goqu.Record{"state": Published}).
Where(goqu.Ex{"id": change.ChangeTableId}).
Prepared(true).
Expand All @@ -307,7 +342,7 @@ func (conn *SqliteStreamDB) publishChangeLog() {
return err
}

_, err = conn.Delete(conn.globalMetaTable()).
_, err = tx.Delete(conn.globalMetaTable()).
Where(goqu.C("id").Eq(change.Id)).
Prepared(true).
Executor().
Expand Down Expand Up @@ -391,14 +426,20 @@ func (conn *SqliteStreamDB) fetchChangeRows(
idColumnName string,
rowIds []int64,
) (*sql.Rows, error) {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return nil, err
}
defer sqlConn.Return()

columnNames := make([]any, 0)
tableCols := conn.watchTablesSchema[tableName]
columnNames = append(columnNames, goqu.C("id").As(idColumnName))
for _, col := range tableCols {
columnNames = append(columnNames, goqu.C("val_"+col.Name).As(col.Name))
}

query, params, err := conn.From(conn.metaTable(tableName, changeLogName)).
query, params, err := sqlConn.DB().From(conn.metaTable(tableName, changeLogName)).
Select(columnNames...).
Where(goqu.C("id").In(rowIds)).
Prepared(true).
Expand All @@ -407,7 +448,7 @@ func (conn *SqliteStreamDB) fetchChangeRows(
return nil, err
}

rawRows, err := conn.Query(query, params...)
rawRows, err := sqlConn.DB().Query(query, params...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d74c280

Please sign in to comment.