neil's ci branch. ignore me. #8717

Draft · wants to merge 7 commits into main
784 changes: 403 additions & 381 deletions go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go

Large diffs are not rendered by default.
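The regenerated protobuf bindings are collapsed here, but judging from the grpc.go hunk below, the substantive change is that the RangeChunk message gains dictionary range fields. A rough sketch of the generated struct, with field widths and the usual protobuf plumbing assumed rather than taken from this diff:

// Sketch only: the shape of the regenerated RangeChunk implied by the call
// sites in grpc.go below. Integer widths are assumptions.
type RangeChunk struct {
	Hash             []byte
	Offset           uint64
	Length           uint32
	DictionaryOffset uint64 // new: offset of the compression dictionary within the storage file
	DictionaryLength uint32 // new: length of the compression dictionary
}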

7 changes: 6 additions & 1 deletion go/libraries/doltcore/remotesrv/grpc.go
@@ -295,7 +295,12 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
var ranges []*remotesapi.RangeChunk
for h, r := range hashToRange {
hCpy := h
ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length})
ranges = append(ranges, &remotesapi.RangeChunk{
Hash: hCpy[:],
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictOffset,
DictionaryLength: r.DictLength})
}

url := rs.getDownloadUrl(md, prefix+"/"+loc)
10 changes: 8 additions & 2 deletions go/libraries/doltcore/remotesrv/http.go
@@ -94,12 +94,18 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusBadRequest)
return
}
_, ok := hash.MaybeParse(path[i+1:])

fileName := path[i+1:]
if strings.HasSuffix(fileName, ".darc") {
fileName = fileName[:len(fileName)-5]
}
_, ok := hash.MaybeParse(fileName)
if !ok {
logger.WithField("last_path_component", path[i+1:]).Warn("bad request with unparseable last path component")
logger.WithField("last_path_component", fileName).Warn("bad request with unparseable last path component")
respWr.WriteHeader(http.StatusBadRequest)
return
}

abs, err := fh.fs.Abs(path)
if err != nil {
logger.WithError(err).Error("could not get absolute path")
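The effect of this hunk: a trailing ".darc" extension on the last path component is stripped before that component is validated as a hash, so requests for archive files pass the same check as table files. A minimal, hypothetical test sketching the behavior (the helper and test names are illustrative, not part of the PR; strings.TrimSuffix is equivalent to the manual fileName[:len(fileName)-5] slice guarded by HasSuffix):

package remotesrv

import (
	"strings"
	"testing"
)

// stripDarcSuffix mirrors the suffix handling added in ServeHTTP above.
func stripDarcSuffix(fileName string) string {
	return strings.TrimSuffix(fileName, ".darc")
}

func TestStripDarcSuffix(t *testing.T) {
	const h = "0123456789abcdefghij0123456789abcdefghij" // a plausible address-style file name
	if got := stripDarcSuffix(h + ".darc"); got != h {
		t.Fatalf("suffix not stripped: %s", got)
	}
	if got := stripDarcSuffix(h); got != h {
		t.Fatalf("plain name changed: %s", got)
	}
}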
10 changes: 0 additions & 10 deletions go/libraries/doltcore/remotesrv/server.go
@@ -17,7 +17,6 @@ package remotesrv
import (
"context"
"crypto/tls"
"errors"
"net"
"net/http"
"strings"
@@ -29,7 +28,6 @@ import (
"google.golang.org/grpc"

remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)

@@ -80,14 +78,6 @@ func NewServer(args ServerArgs) (*Server, error) {
args.Logger = logrus.NewEntry(logrus.StandardLogger())
}

storageMetadata, err := env.GetMultiEnvStorageMetadata(args.FS)
if err != nil {
return nil, err
}
if storageMetadata.ArchiveFilesPresent() {
return nil, errors.New("archive files present. Please run `dolt archive --revert` before running the server.")
}

s := new(Server)
s.stopChan = make(chan struct{})

8 changes: 4 additions & 4 deletions go/libraries/doltcore/remotestorage/chunk_cache.go
@@ -22,20 +22,20 @@ import (
// ChunkCache is an interface used for caching chunks
type ChunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []nbs.CompressedChunk) bool
Put(c []nbs.ToChunker) bool

// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk
Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker

// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)

// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully
// and false is returned if that chunk is already in the cache.
PutChunk(chunk nbs.CompressedChunk) bool
PutChunk(chunk nbs.ToChunker) bool

// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
}
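For context, this interface now traffics in nbs.ToChunker rather than the concrete nbs.CompressedChunk, which presumably lets archive-backed chunk representations share the same cache. A hedged sketch of the interface, reconstructed only from the methods this diff actually calls (the real definition lives in go/store/nbs and likely has more methods; return types are assumptions):

type ToChunker interface {
	Hash() hash.Hash                // used for cache keys and missing-chunk signalling
	IsEmpty() bool                  // replaces the len(cc.CompressedData) > 0 style checks
	FullCompressedChunkLen() uint32 // used for capacity accounting in mapChunkCache
}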
21 changes: 15 additions & 6 deletions go/libraries/doltcore/remotestorage/chunk_fetcher.go
@@ -48,8 +48,14 @@ type ChunkFetcher struct {
eg *errgroup.Group
egCtx context.Context

// toGetCh is the channel used to request chunks. This will initially be given a root,
// and as refs are found, they will be added to the channel for workers to batch and request. NM4.
toGetCh chan hash.HashSet
resCh chan nbs.CompressedChunk

// resCh is the results channel for the fetcher. It is used both to return
// chunks themselves and to indicate which chunks were requested but are missing,
// by having a Hash but being empty. NM4.
resCh chan nbs.ToChunker

abortCh chan struct{}
stats StatsRecorder
@@ -69,7 +75,7 @@ func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
egCtx: ctx,

toGetCh: make(chan hash.HashSet),
resCh: make(chan nbs.CompressedChunk),
resCh: make(chan nbs.ToChunker),

abortCh: make(chan struct{}),
stats: StatsFactory(),
@@ -123,7 +129,7 @@ func (f *ChunkFetcher) CloseSend() error {
// by |Get|. Returns |io.EOF| after |CloseSend| is called and all requested
// chunks have been successfully received. Returns an error if this
// |ChunkFetcher| is terminally failed or if the supplied |ctx| is |Done|.
func (f *ChunkFetcher) Recv(ctx context.Context) (nbs.CompressedChunk, error) {
func (f *ChunkFetcher) Recv(ctx context.Context) (nbs.ToChunker, error) {
select {
case <-ctx.Done():
return nbs.CompressedChunk{}, context.Cause(ctx)
@@ -219,7 +225,7 @@ func fetcherHashSetToGetDlLocsReqsThread(ctx context.Context, reqCh chan hash.Ha
// delivered in |reqCh|, and they will be delivered in order.
//
// This function handles backoff and retries for the underlying streaming RPC.
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.CompressedChunk, host string) error {
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.ToChunker, host string) error {
stream, err := reliable.MakeCall[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse](
ctx,
reliable.CallOptions[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse]{
@@ -255,6 +261,7 @@ func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.Ge
})
eg.Go(func() error {
for {
// NM4 - Where the responses come back - resp is an RPC struct. NM4.
resp, err := stream.Recv()
if err == io.EOF {
close(resCh)
@@ -300,6 +307,7 @@ func getMissingChunks(req *remotesapi.GetDownloadLocsRequest, resp *remotesapi.G
numRequested := len(req.ChunkHashes)
numResponded := 0
for _, loc := range resp.Locs {
// NM4 - Looky here.
hgr := loc.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange
numResponded += len(hgr.Ranges)
}
@@ -362,6 +370,7 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
// NM4 - this is where the offset is read!! do something here or nearby.
d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
}
}
@@ -527,7 +536,7 @@ func (cc *ConcurrencyControl) Run(ctx context.Context, done <-chan struct{}, ss
}
}

func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
eg, ctx := errgroup.WithContext(ctx)
cc := &ConcurrencyControl{
MaxConcurrency: params.MaximumConcurrentDownloads,
@@ -559,7 +568,7 @@ func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, do
return nil
}

func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
respCh := make(chan fetchResp)
cancelCh := make(chan struct{})
for {
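Putting the pieces above together, a hedged usage sketch of the fetcher: request hashes with Get, call CloseSend, then Recv until io.EOF. Only CloseSend and Recv appear in this hunk, so the Get signature and the helper below are assumptions, not code from the PR.

package fetcherexample

import (
	"context"
	"io"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"github.com/dolthub/dolt/go/store/hash"
)

// drainFetcher requests |hs| and consumes results until the fetcher reports EOF.
func drainFetcher(ctx context.Context, f *remotestorage.ChunkFetcher, hs hash.HashSet) error {
	if err := f.Get(ctx, hs); err != nil { // assumed signature
		return err
	}
	if err := f.CloseSend(); err != nil {
		return err
	}
	for {
		cc, err := f.Recv(ctx)
		if err == io.EOF {
			return nil // CloseSend was called and all requested chunks arrived
		}
		if err != nil {
			return err
		}
		if cc.IsEmpty() {
			continue // requested but missing: carries a Hash, no data (per the resCh comment)
		}
		// process cc (an nbs.ToChunker) here
	}
}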
16 changes: 8 additions & 8 deletions go/libraries/doltcore/remotestorage/chunk_store.go
@@ -317,7 +317,7 @@ func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
ae := atomicerr.New()
decompressedSize := uint64(0)
err := dcs.GetManyCompressed(ctx, hashes, func(ctx context.Context, cc nbs.CompressedChunk) {
err := dcs.GetManyCompressed(ctx, hashes, func(ctx context.Context, cc nbs.ToChunker) {
if ae.IsSet() {
return
}
@@ -340,7 +340,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou

// GetManyCompressed gets the chunks with |hashes| from the store. On return, |found| will have been called for all
// chunks which were found. Any non-present chunks will silently be ignored.
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.CompressedChunk)) error {
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.ToChunker)) error {
ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed")
defer span.End()

@@ -353,7 +353,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
for h := range hashes {
c := hashToChunk[h]

if c.IsEmpty() {
if c == nil || c.IsEmpty() {
notCached = append(notCached, h)
} else {
found(ctx, c)
@@ -432,7 +432,7 @@ func sortRangesBySize(ranges []*GetRange) {

type resourcePathToUrlFunc func(ctx context.Context, lastError error, resourcePath string) (url string, err error)

func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, chunkChan chan nbs.CompressedChunk, pathToUrl resourcePathToUrlFunc) func() error {
func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, chunkChan chan nbs.ToChunker, pathToUrl resourcePathToUrlFunc) func() error {
if len(gr.Ranges) == 0 {
return func() error { return nil }
}
@@ -574,7 +574,7 @@ type RepoRequest interface {
SetRepoPath(string)
}

func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash.Hash, found func(context.Context, nbs.CompressedChunk)) (err error) {
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash.Hash, found func(context.Context, nbs.ToChunker)) (err error) {
toSend := hash.NewHashSet(hashes...)

fetcher := dcs.ChunkFetcher(ctx)
@@ -603,7 +603,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
return err
}
// Don't forward on empty/not found chunks.
if len(cc.CompressedData) > 0 {
if !cc.IsEmpty() {
if dcs.cache.PutChunk(cc) {
return ErrCacheCapacityExceeded
}
@@ -644,7 +644,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
hashSl, byteSl := HashSetToSlices(notCached)

absent := make(hash.HashSet)
var found []nbs.CompressedChunk
var found []nbs.ToChunker
var err error

batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
@@ -738,7 +738,7 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
}

cc := nbs.ChunkToCompressedChunk(c)
if dcs.cache.Put([]nbs.CompressedChunk{cc}) {
if dcs.cache.Put([]nbs.ToChunker{cc}) {
return ErrCacheCapacityExceeded
}
return nil
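One subtle point in the GetManyCompressed hunk above: the cache's Get now returns map[hash.Hash]nbs.ToChunker, and a missing key yields a nil interface value rather than a zero-value CompressedChunk, so the nil check has to run before IsEmpty. An illustrative snippet, reusing the names from the diff:

var hashToChunk map[hash.Hash]nbs.ToChunker // as returned by dcs.cache.Get(hashes)
var h hash.Hash                             // one of the requested addresses
c := hashToChunk[h]                         // missing key -> nil interface value
if c == nil || c.IsEmpty() {                // nil must be checked first; IsEmpty() on nil would panic
	// not cached: fall through to the remote fetch path
}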
28 changes: 14 additions & 14 deletions go/libraries/doltcore/remotestorage/map_chunk_cache.go
@@ -24,16 +24,16 @@ import (
// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
type mapChunkCache struct {
mu *sync.Mutex
hashToChunk map[hash.Hash]nbs.CompressedChunk
toFlush map[hash.Hash]nbs.CompressedChunk
hashToChunk map[hash.Hash]nbs.ToChunker
toFlush map[hash.Hash]nbs.ToChunker
cm CapacityMonitor
}

func newMapChunkCache() *mapChunkCache {
return &mapChunkCache{
&sync.Mutex{},
make(map[hash.Hash]nbs.CompressedChunk),
make(map[hash.Hash]nbs.CompressedChunk),
make(map[hash.Hash]nbs.ToChunker),
make(map[hash.Hash]nbs.ToChunker),
NewUncappedCapacityMonitor(),
}
}
@@ -42,14 +42,14 @@ func newMapChunkCache() *mapChunkCache {
func NewMapChunkCacheWithMaxCapacity(maxCapacity int64) *mapChunkCache {
return &mapChunkCache{
&sync.Mutex{},
make(map[hash.Hash]nbs.CompressedChunk),
make(map[hash.Hash]nbs.CompressedChunk),
make(map[hash.Hash]nbs.ToChunker),
make(map[hash.Hash]nbs.ToChunker),
NewFixedCapacityMonitor(maxCapacity),
}
}

// Put puts a slice of chunks into the cache.
func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) bool {
func (mcc *mapChunkCache) Put(chnks []nbs.ToChunker) bool {
mcc.mu.Lock()
defer mcc.mu.Unlock()

@@ -63,7 +63,7 @@ func (mcc *mapChunkCache) Put(chnks []nbs.ToChunker) bool {
}
}

if mcc.cm.CapacityExceeded(len(c.FullCompressedChunk)) {
if mcc.cm.CapacityExceeded(int(c.FullCompressedChunkLen())) {
return true
}

@@ -79,8 +79,8 @@

// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
hashToChunk := make(map[hash.Hash]nbs.CompressedChunk)
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
hashToChunk := make(map[hash.Hash]nbs.ToChunker)

mcc.mu.Lock()
defer mcc.mu.Unlock()
@@ -112,13 +112,13 @@ func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
return absent
}

func (mcc *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool {
func (mcc *mapChunkCache) PutChunk(ch nbs.ToChunker) bool {
mcc.mu.Lock()
defer mcc.mu.Unlock()

h := ch.Hash()
if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() {
if mcc.cm.CapacityExceeded(len(ch.FullCompressedChunk)) {
if mcc.cm.CapacityExceeded(int(ch.FullCompressedChunkLen())) {
return true
}
mcc.hashToChunk[h] = ch
@@ -130,8 +130,8 @@ func (mcc *mapChunkCache) PutChunk(ch nbs.ToChunker) bool {

// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
newToFlush := make(map[hash.Hash]nbs.CompressedChunk)
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
newToFlush := make(map[hash.Hash]nbs.ToChunker)

mcc.mu.Lock()
defer mcc.mu.Unlock()
6 changes: 3 additions & 3 deletions go/libraries/doltcore/remotestorage/map_chunk_cache_test.go
@@ -27,8 +27,8 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)

func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.CompressedChunk) {
chks := make([]nbs.CompressedChunk, n)
func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.ToChunker) {
chks := make([]nbs.ToChunker, n)
hashes := make(hash.HashSet)
for i := 0; i < n; i++ {
size := int(rng.Int31n(99) + 1)
@@ -88,7 +88,7 @@ func TestMapChunkCache(t *testing.T) {

toFlush = mapChunkCache.GetAndClearChunksToFlush()

expected := map[hash.Hash]nbs.CompressedChunk{moreChks[0].Hash(): moreChks[0]}
expected := map[hash.Hash]nbs.ToChunker{moreChks[0].Hash(): moreChks[0]}
eq := reflect.DeepEqual(toFlush, expected)
assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed)
}
10 changes: 5 additions & 5 deletions go/libraries/doltcore/remotestorage/noop_chunk_cache.go
@@ -29,22 +29,22 @@ var noopChunkCache = &noopChunkCacheImpl{}
type noopChunkCacheImpl struct {
}

func (*noopChunkCacheImpl) Put(chnks []nbs.CompressedChunk) bool {
func (*noopChunkCacheImpl) Put(chnks []nbs.ToChunker) bool {
return false
}

func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
return make(map[hash.Hash]nbs.CompressedChunk)
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
return make(map[hash.Hash]nbs.ToChunker)
}

func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
return hashes
}

func (*noopChunkCacheImpl) PutChunk(ch nbs.CompressedChunk) bool {
func (*noopChunkCacheImpl) PutChunk(ch nbs.ToChunker) bool {
return false
}

func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
panic("noopChunkCache does not support GetAndClearChunksToFlush().")
}
4 changes: 2 additions & 2 deletions go/store/cmd/noms/noms_manifest.go
@@ -103,7 +103,7 @@ func runManifest(ctx context.Context, args []string) int {
nbsFiles := make([]NbsFile, numSpecs)
for i := 0; i < numSpecs; i++ {
tableSpecInfo := manifest.GetTableSpecInfo(i)
path := filepath.Join(spec.DatabaseName, tableSpecInfo.GetName())
path := filepath.Join(spec.DatabaseName, tableSpecInfo.GetFileName())
fileInfo, err := os.Stat(path)
nbsFiles[i] = NbsFile{tableSpecInfo, fileInfo, err}
}
@@ -130,7 +130,7 @@ func runManifest(ctx context.Context, args []string) int {
fmt.Println(" referenced nbs files:")

for _, nbsFile := range nbsFiles {
name := nbsFile.manifestSpec.GetName()
name := nbsFile.manifestSpec.GetFileName()
chunkCnt := nbsFile.manifestSpec.GetChunkCount()
sizeStr := nbsFile.sizeStr()
existsStr := nbsFile.fileInfoErr == nil
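The switch from GetName to GetFileName in the manifest listing suggests that a table spec's on-disk file name can now differ from its logical name, presumably when the file is an archive (the .darc handling in http.go above points the same way). An illustrative sketch of the assumed distinction, not taken from this diff:

name := tableSpecInfo.GetName()         // logical spec name (the address)
fileName := tableSpecInfo.GetFileName() // on-disk name, possibly carrying an archive extension
path := filepath.Join(spec.DatabaseName, fileName)
_, _ = name, path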