Skip to content

Commit

Permalink
Fixing checkpoint truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Sep 17, 2022
1 parent 48ca222 commit e43767c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build-linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
CC=x86_64-linux-musl-gcc \
CXX=x86_64-linux-musl-g++ \
GOARCH=amd64 GOOS=linux CGO_ENABLED=1 \
go build -ldflags "-linkmode external -extldflags -static" -o dist/marmot-linux-amd64 .
go build -ldflags "-linkmode external -extldflags -static" -o dist/linux/amd64/marmot

34 changes: 19 additions & 15 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,25 +289,29 @@ func (conn *SqliteStreamDB) RestoreFrom(bkFilePath string) error {
// else is modifying or interacting with DB
return sgSQL.WithTx(func(_ *goqu.TxDatabase) error {
return dgSQL.WithTx(func(_ *goqu.TxDatabase) error {
fi, err := os.OpenFile(bkFilePath, os.O_RDWR, 0)
if err != nil {
return err
}
defer fi.Close()

fo, err := os.OpenFile(conn.dbPath, os.O_WRONLY, 0)
if err != nil {
return err
}
defer fo.Close()

bytesWritten, err := io.Copy(fo, fi)
log.Debug().Int64("bytes", bytesWritten).Msg("Backup bytes copied...")
return err
return copyFile(bkFilePath, conn.dbPath)
})
})
}

func copyFile(fromPath, toPath string) error {
fi, err := os.OpenFile(fromPath, os.O_RDWR, 0)
if err != nil {
return err
}
defer fi.Close()

fo, err := os.OpenFile(toPath, os.O_WRONLY, 0)
if err != nil {
return err
}
defer fo.Close()

bytesWritten, err := io.Copy(fo, fi)
log.Debug().Int64("bytes", bytesWritten).Msg("Backup bytes copied...")
return err
}

func (conn *SqliteStreamDB) GetRawConnection() *sqlite3.SQLiteConn {
return conn.rawConnection
}
Expand Down
4 changes: 2 additions & 2 deletions lib/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func (r *RaftServer) config(clusterID uint64) config.Config {
ElectionRTT: 10,
HeartbeatRTT: 1,
CheckQuorum: true,
SnapshotEntries: 10_000,
CompactionOverhead: 1_000,
SnapshotEntries: 20,
CompactionOverhead: 0,
EntryCompressionType: config.Snappy,
SnapshotCompressionType: config.Snappy,
}
Expand Down
17 changes: 13 additions & 4 deletions lib/sqlite_log_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,16 @@ func (s *SQLiteLogDB) CompactEntriesTo(clusterID uint64, nodeID uint64, index ui

defer func() {
var rows *sql.Rows
var err error

for {
if rows != nil {
_ = rows.Close()
}

rows, err := s.db.Query("PRAGMA wal_checkpoint(TRUNCATE)")
rows, err = s.db.Query("PRAGMA wal_checkpoint(TRUNCATE)")
if err != nil {
log.Error().Err(err).Msg("Unable to compact checkpoint")
break
}

Expand All @@ -378,18 +381,24 @@ func (s *SQLiteLogDB) CompactEntriesTo(clusterID uint64, nodeID uint64, index ui
var busy, logi, checkpointed int64
err = rows.Scan(&busy, &logi, &checkpointed)
if err != nil {
log.Error().Err(err).Msg("Unable to read checkpoint data")
break
}

if busy == 0 {
log.Info().
Int64("log_index", logi).
Int64("checkpointed", checkpointed).
Msg("Checkpoint complete")
break
}

_ = rows.Close()
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
}

_ = rows.Close()
if rows != nil {
_ = rows.Close()
}
close(ch)
}()

Expand Down

0 comments on commit e43767c

Please sign in to comment.