Skip to content

Commit

Permalink
refactor: refactor source code and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 14, 2023
1 parent 36ff7b5 commit e13bae4
Show file tree
Hide file tree
Showing 18 changed files with 1,162 additions and 157 deletions.
141 changes: 109 additions & 32 deletions egopb/ego.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions eventstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ type EventsStore interface {
// PersistenceIDs returns the distinct list of all the persistence ids in the journal store
PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error)
// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
GetShardEvents(ctx context.Context, shardNumber uint64, offset uint64, max uint64) (events []*egopb.Event, err error)
// NumShards returns the total number of shards in the events store
NumShards(ctx context.Context) (int, error)
GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error)
// ShardNumbers returns the distinct list of all the shards in the journal store
ShardNumbers(ctx context.Context) ([]uint64, error)
}
139 changes: 127 additions & 12 deletions eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"sort"

goset "github.com/deckarep/golang-set/v2"
"github.com/google/uuid"
"github.com/hashicorp/go-memdb"
"github.com/pkg/errors"
Expand Down Expand Up @@ -443,23 +444,137 @@ func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string)
}

// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
// nolint
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset uint64, max uint64) (events []*egopb.Event, err error) {
panic("implement me")
}
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.GetShardEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, 0, errors.New("journal store is not connected")
}

// spawn a db transaction for read-only
txn := s.db.Txn(false)
// fetch all the records for the given shard
it, err := txn.Get(journalTableName, persistenceIDIndex)
// handle the error
if err != nil {
// abort the transaction
txn.Abort()
return nil, 0, errors.Wrapf(err, "failed to get events of shard=(%d)", shardNumber)
}

// loop over the records and delete them
var journals []*journal
for row := it.Next(); row != nil; row = it.Next() {
// cast the elt into the journal
if journal, ok := row.(*journal); ok {
// filter out the journal of the given shard number
if journal.ShardNumber == shardNumber {
journals = append(journals, journal)
}
}
}
// let us abort the transaction after fetching the matching records
txn.Abort()

// short circuit the operation when there are no records
if len(journals) == 0 {
return nil, 0, nil
}

var events []*egopb.Event
for _, journal := range journals {
// only fetch record which timestamp is greater than the offset
if journal.Timestamp > offset {
// unmarshal the event and the state
evt, err := toProto(journal.EventManifest, journal.EventPayload)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to unmarshal the journal event")
}
state, err := toProto(journal.StateManifest, journal.StatePayload)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to unmarshal the journal state")
}

// NumShards returns the total number of shards in the events store
// nolint
func (s *EventsStore) NumShards(ctx context.Context) (int, error) {
//TODO implement me
panic("implement me")
if uint64(len(events)) <= max {
// create the event and add it to the list of events
events = append(events, &egopb.Event{
PersistenceId: journal.PersistenceID,
SequenceNumber: journal.SequenceNumber,
IsDeleted: journal.IsDeleted,
Event: evt,
ResultingState: state,
Timestamp: journal.Timestamp,
Shard: journal.ShardNumber,
})
}
}
}

// short circuit the operation when there are no records
if len(events) == 0 {
return nil, 0, nil
}

// sort the subset by timestamp
sort.SliceStable(events, func(i, j int) bool {
return events[i].GetTimestamp() <= events[j].GetTimestamp()
})

// grab the next offset
nextOffset := events[len(events)-1].GetTimestamp()

return events, nextOffset, nil
}

// ShardNumbers returns the distinct list of all the shards in the journal store
// nolint
func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
//TODO implement me
panic("implement me")
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.NumShards")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
}

// spawn a db transaction for read-only
txn := s.db.Txn(false)
// fetch all the records
it, err := txn.Get(journalTableName, persistenceIDIndex)
// handle the error
if err != nil {
// abort the transaction
txn.Abort()
return nil, errors.Wrap(err, "failed to fetch the list of shard number")
}

// loop over the records and delete them
var journals []*journal
for row := it.Next(); row != nil; row = it.Next() {
if journal, ok := row.(*journal); ok {
journals = append(journals, journal)
}
}
// let us abort the transaction after fetching the matching records
txn.Abort()

// short circuit the operation when there are no records
if len(journals) == 0 {
return nil, nil
}

// create a set to hold the unique list of shard numbers
shards := goset.NewSet[uint64]()
// iterate the list of journals and extract the shard numbers
for _, journal := range journals {
shards.Add(journal.ShardNumber)
}

// return the list
return shards.ToSlice(), nil
}

// toProto converts a byte array given its manifest into a valid proto message
Expand Down
Loading

0 comments on commit e13bae4

Please sign in to comment.