Skip to content

Commit a06fdb3

Browse files
authored
Fix epoch calc for range (#296)
* Fix epoch calc for range * Move certain log messages to v 4 * Remove redundant wait * gsfa: check index size * Cleanup
1 parent 826735f commit a06fdb3

File tree

5 files changed

+119
-18
lines changed

5 files changed

+119
-18
lines changed

grpc-server.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,9 @@ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc
187187
return err
188188
}
189189
if slot == 0 {
190-
klog.V(5).Infof("car start to slot(0)::%s", blockCid)
190+
klog.V(4).Infof("car start to slot(0)::%s", blockCid)
191191
} else {
192-
klog.V(5).Infof(
192+
klog.V(4).Infof(
193193
"slot(%d)::%s to slot(%d)::%s",
194194
uint64(block.Meta.Parent_slot),
195195
parentBlockCid,
@@ -262,7 +262,7 @@ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc
262262
attribute.Int64("read_length", int64(length)),
263263
)
264264

265-
klog.V(5).Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
265+
klog.V(4).Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
266266

267267
// This is the actual disk read operation - likely significant seek time here
268268
readCtx, readSpan := telemetry.StartDiskIOSpan(prefetchCtx, "read_car_section", map[string]string{
@@ -548,7 +548,7 @@ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc
548548
}
549549
} else {
550550
if slot != 0 {
551-
klog.V(5).Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)")
551+
klog.V(4).Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)")
552552
}
553553
}
554554
parentSpan.End()
@@ -899,13 +899,18 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran
899899
endSlot = *params.EndSlot
900900
}
901901
gsfaReader, epochNums := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot)
902+
wantedEpochs := slottools.CalcEpochsForSlotRange(startSlot, endSlot)
903+
klog.V(4).Infof("Streaming transactions from slots %d to %d, epochs %v", startSlot, endSlot, wantedEpochs)
902904

903905
gsfaReadersLoaded := true
904906
if len(epochNums) == 0 {
905-
klog.V(5).Info("The requested slot range does not have any GSFA readers loaded, will use the default method")
907+
klog.V(4).Info("The requested slot range does not have any GSFA readers loaded, will use the default method")
906908
gsfaReadersLoaded = false
907909
} else {
908-
klog.V(5).Infof("Using GSFA readers for epochs: %v", epochNums)
910+
klog.V(4).Infof("Using GSFA readers for epochs: %v; wanted epochs: %v", epochNums, wantedEpochs)
911+
if len(epochNums) < len(wantedEpochs) {
912+
klog.V(4).Infof("Not all epochs in the requested slot range have GSFA readers loaded")
913+
}
909914
}
910915

911916
return multi.processSlotTransactions(ctx, ser, startSlot, endSlot, params.Filter, gsfaReader, gsfaReadersLoaded)
@@ -930,7 +935,7 @@ func (multi *MultiEpoch) processSlotTransactions(
930935
}
931936

932937
if filter == nil || len(filter.AccountInclude) == 0 || !gsfaReadersLoaded {
933-
klog.V(5).Infof("Using the old faithful method for streaming transactions from slots %d to %d", startSlot, endSlot)
938+
klog.V(4).Infof("Using the old faithful method for streaming transactions from slots %d to %d", startSlot, endSlot)
934939

935940
for slot := startSlot; slot <= endSlot; slot++ {
936941
select {
@@ -999,7 +1004,7 @@ func (multi *MultiEpoch) processSlotTransactions(
9991004
return nil
10001005
}
10011006

1002-
klog.V(5).Infof("Using GSFA reader for streaming transactions from slots %d to %d", startSlot, endSlot)
1007+
klog.V(4).Infof("Using GSFA reader for streaming transactions from slots %d to %d", startSlot, endSlot)
10031008

10041009
const batchSize = 100
10051010
buffer := newTxBuffer(uint64(startSlot), uint64(endSlot))
@@ -1053,7 +1058,7 @@ func (multi *MultiEpoch) processSlotTransactions(
10531058
return err
10541059
}
10551060
duration := time.Since(startTime)
1056-
klog.V(5).Infof("GSFA query completed for account %s, from slot %d to %d took %s", pKey.String(), startSlot, endSlot, duration)
1061+
klog.V(4).Infof("GSFA query completed for account %s, from slot %d to %d took %s", pKey.String(), startSlot, endSlot, duration)
10571062

10581063
for epochNumber, txns := range epochToTxns {
10591064
epochHandler, err := multi.GetEpoch(epochNumber)
@@ -1118,7 +1123,6 @@ func (multi *MultiEpoch) processSlotTransactions(
11181123
// Handle any errors
11191124
klog.V(5).Infof("Checking for errors from goroutines")
11201125
errCheckStartTime := time.Now()
1121-
wg.Wait()
11221126
select {
11231127
case err := <-wgWaitToChannel(wg):
11241128
if err != nil {
@@ -1139,11 +1143,11 @@ func (multi *MultiEpoch) processSlotTransactions(
11391143
if err := buffer.flush(ser); err != nil {
11401144
return err
11411145
}
1142-
klog.V(5).Infof("Buffer flush completed in %s", time.Since(flushStartTime))
1146+
klog.V(4).Infof("Buffer flush completed in %s", time.Since(flushStartTime))
11431147

11441148
// If we got here with no transactions (buffer is empty), send an empty response
11451149
if len(buffer.items) == 0 {
1146-
klog.V(5).Infof("No transactions found for the requested accounts, sending empty response")
1150+
klog.V(4).Infof("No transactions found for the requested accounts, sending empty response")
11471151
emptyResp := &old_faithful_grpc.TransactionResponse{
11481152
Slot: startSlot,
11491153
// Include other required fields as needed

gsfa/gsfa-read.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ func isDir(path string) (bool, error) {
3030
return info.IsDir(), nil
3131
}
3232

33+
func sizeOfFile(path string) (int64, error) {
34+
info, err := os.Stat(path)
35+
if err != nil {
36+
return 0, fmt.Errorf("error while getting file info for %s: %w", path, err)
37+
}
38+
if info.IsDir() {
39+
return 0, fmt.Errorf("expected a file, but got a directory: %s", path)
40+
}
41+
return info.Size(), nil
42+
}
43+
3344
// NewGsfaReader opens an existing index in READ-ONLY mode.
3445
func NewGsfaReader(indexRootDir string) (*GsfaReader, error) {
3546
if ok, err := isDir(indexRootDir); err != nil {
@@ -40,6 +51,19 @@ func NewGsfaReader(indexRootDir string) (*GsfaReader, error) {
4051
index := &GsfaReader{}
4152
{
4253
offsetsIndex := filepath.Join(indexRootDir, string(indexes.Kind_PubkeyToOffsetAndSize)+".index")
54+
// check size; if is 1733 bytes, then it's the buggy version of the index.
55+
size, err := sizeOfFile(offsetsIndex)
56+
if err != nil {
57+
return nil, fmt.Errorf("error while getting size of offsets index file %s: %w", offsetsIndex, err)
58+
}
59+
if size == 1733 {
60+
return nil, fmt.Errorf("the offsets index file %s is the buggy version (1733 bytes); "+
61+
"please delete the whole gsfa index and re-create it with the latest old-faithful-cli", offsetsIndex)
62+
} else if size < 1733 {
63+
return nil, fmt.Errorf("the offsets index file %s is too small (%d bytes); "+
64+
"please delete the whole gsfa index and re-create it with the latest old-faithful-cli. "+
65+
"Expected at least 1733 bytes, got %d bytes", offsetsIndex, size, size)
66+
}
4367
offsets, err := indexes.Open_PubkeyToOffsetAndSize(offsetsIndex)
4468
if err != nil {
4569
return nil, fmt.Errorf("error while opening offsets index: %w", err)

multiepoch-getSignaturesForAddress.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ func (ser *MultiEpoch) getGsfaReadersInEpochDescendingOrderForSlotRange(ctx cont
4848
ser.mu.RLock()
4949
defer ser.mu.RUnlock()
5050

51-
startEpoch := slottools.CalcEpochForSlot(startSlot)
52-
endEpoch := slottools.CalcEpochForSlot(endSlot)
53-
54-
epochs := make([]*Epoch, 0, endEpoch-startEpoch+1)
55-
for _, epoch := range ser.epochs {
56-
if epoch.Epoch() >= startEpoch && epoch.Epoch() <= endEpoch {
51+
wantedEpochs := slottools.CalcEpochsForSlotRange(startSlot, endSlot)
52+
epochs := make([]*Epoch, 0, len(wantedEpochs))
53+
for _, wantedEpoch := range wantedEpochs {
54+
if epoch, ok := ser.epochs[wantedEpoch]; ok {
5755
epochs = append(epochs, epoch)
56+
} else {
57+
klog.Warningf("epoch %d not found in multiepoch", wantedEpoch)
5858
}
5959
}
6060

slottools/edges.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,23 @@ func Uint64ToLEBytes(v uint64) []byte {
4444
func Uint64FromLEBytes(buf []byte) uint64 {
4545
return binary.LittleEndian.Uint64(buf)
4646
}
47+
48+
func CalcEpochsForSlotRange(startSlot, endSlot uint64) []uint64 {
49+
epochStart := CalcEpochForSlot(startSlot)
50+
epochEnd := CalcEpochForSlot(endSlot)
51+
return calcRangeInclusive(epochStart, epochEnd)
52+
}
53+
54+
func calcRangeInclusive(start, end uint64) []uint64 {
55+
if start == end {
56+
return []uint64{start} // if start and end are the same, return a slice with that single value
57+
}
58+
if start > end {
59+
end, start = start, end // ensure start is less than or equal to end
60+
}
61+
rangeSlice := make([]uint64, end-start+1)
62+
for i := range rangeSlice {
63+
rangeSlice[i] = start + uint64(i)
64+
}
65+
return rangeSlice
66+
}

slottools/edges_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package slottools
22

33
import (
4+
"fmt"
5+
"reflect"
46
"testing"
57

68
"github.com/stretchr/testify/require"
@@ -66,3 +68,54 @@ func TestUint64RangesHavePartialOverlapIncludingEdges(t *testing.T) {
6668
require.True(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
6769
}
6870
}
71+
72+
func TestRanges(t *testing.T) {
73+
got := calcRangeInclusive(0, 5)
74+
if !reflect.DeepEqual(got, []uint64{0, 1, 2, 3, 4, 5}) {
75+
panic(fmt.Sprintf("calcRangeInclusive(0, 5) = %v, want [0 1 2 3 4 5]", got))
76+
}
77+
got = calcRangeInclusive(5, 0)
78+
if !reflect.DeepEqual(got, []uint64{0, 1, 2, 3, 4, 5}) {
79+
panic(fmt.Sprintf("calcRangeInclusive(5, 0) = %v, want [0 1 2 3 4 5]", got))
80+
}
81+
got = calcRangeInclusive(3, 3)
82+
if !reflect.DeepEqual(got, []uint64{3}) {
83+
panic(fmt.Sprintf("calcRangeInclusive(3, 3) = %v, want [3]", got))
84+
}
85+
got = calcRangeInclusive(0, 0)
86+
if !reflect.DeepEqual(got, []uint64{0}) {
87+
panic(fmt.Sprintf("calcRangeInclusive(0, 0) = %v, want [0]", got))
88+
}
89+
got = calcRangeInclusive(10, 15)
90+
if !reflect.DeepEqual(got, []uint64{10, 11, 12, 13, 14, 15}) {
91+
panic(fmt.Sprintf("calcRangeInclusive(10, 15) = %v, want [10 11 12 13 14 15]", got))
92+
}
93+
got = calcRangeInclusive(15, 10)
94+
if !reflect.DeepEqual(got, []uint64{10, 11, 12, 13, 14, 15}) {
95+
panic(fmt.Sprintf("calcRangeInclusive(15, 10) = %v, want [10 11 12 13 14 15]", got))
96+
}
97+
}
98+
99+
func TestEpochRange(t *testing.T) {
100+
{
101+
gotEpochs := CalcEpochsForSlotRange(0, 431999)
102+
wantEpochs := []uint64{0}
103+
if !reflect.DeepEqual(gotEpochs, wantEpochs) {
104+
t.Errorf("CalcEpochsForSlotRange(0, 431999) = %v, want %v", gotEpochs, wantEpochs)
105+
}
106+
}
107+
{
108+
gotEpochs := CalcEpochsForSlotRange(432000, 863999)
109+
wantEpochs := []uint64{1}
110+
if !reflect.DeepEqual(gotEpochs, wantEpochs) {
111+
t.Errorf("CalcEpochsForSlotRange(432000, 863999) = %v, want %v", gotEpochs, wantEpochs)
112+
}
113+
}
114+
{
115+
gotEpochs := CalcEpochsForSlotRange(343169187, 345302213)
116+
wantEpochs := []uint64{
117+
794, 795, 796, 797, 798, 799,
118+
}
119+
require.Equal(t, wantEpochs, gotEpochs, "CalcEpochsForSlotRange(343169187, 345302213) should return the expected epochs")
120+
}
121+
}

0 commit comments

Comments
 (0)