Skip to content

Commit 2909e9e

Browse files
fix private host discovery (#18)
* fix host * fix host lookup * remove log * fix check * add another log * more logs * remove rtt check * improve duplicate host cleanup * remove logs
1 parent a300eab commit 2909e9e

File tree

4 files changed

+49
-10
lines changed

4 files changed

+49
-10
lines changed

pkg/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,12 @@ func (c *BlobCacheClient) getGRPCClient(ctx context.Context, request *ClientRequ
349349

350350
if resp.Ok {
351351
Logger.Infof("Content repopulated from source: %s\n", entry.SourcePath)
352+
c.mu.Lock()
353+
c.localHostCache[request.hash] = &localClientCache{
354+
host: host,
355+
timestamp: time.Now(),
356+
}
357+
c.mu.Unlock()
352358
return closestClient, host, nil
353359
}
354360

pkg/discovery.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func (d *DiscoveryClient) discoverHostsViaTailscale(ctx context.Context) ([]*Blo
129129
}
130130

131131
func (d *DiscoveryClient) discoverHostsViaMetadata(ctx context.Context) ([]*BlobCacheHost, error) {
132-
hosts, err := d.metadata.GetAvailableHosts(ctx)
132+
hosts, err := d.metadata.GetAvailableHosts(ctx, d.hostMap.Remove)
133133
if err != nil {
134134
return nil, err
135135
}
@@ -140,18 +140,16 @@ func (d *DiscoveryClient) discoverHostsViaMetadata(ctx context.Context) ([]*Blob
140140

141141
for _, host := range hosts {
142142
if host.PrivateAddr != "" {
143-
addr := fmt.Sprintf("%s:%d", host.PrivateAddr, d.cfg.Port)
144-
145143
// Don't try to get the state on peers we're already aware of
146-
if d.hostMap.Get(addr) != nil {
144+
if d.hostMap.Get(host.Addr) != nil {
147145
continue
148146
}
149147

150148
wg.Add(1)
151149
go func(addr string) {
152150
defer wg.Done()
153151

154-
hostState, err := d.GetHostState(ctx, addr)
152+
hostState, err := d.GetHostStateViaMetadata(ctx, addr, host.PrivateAddr)
155153
if err != nil {
156154
return
157155
}
@@ -161,7 +159,7 @@ func (d *DiscoveryClient) discoverHostsViaMetadata(ctx context.Context) ([]*Blob
161159
mu.Unlock()
162160

163161
Logger.Debugf("Added host with private address to map: %s", hostState.PrivateAddr)
164-
}(addr)
162+
}(host.Addr)
165163
}
166164
}
167165

@@ -243,3 +241,37 @@ func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobC
243241

244242
return &host, nil
245243
}
244+
245+
func (d *DiscoveryClient) GetHostStateViaMetadata(ctx context.Context, addr, privateAddr string) (*BlobCacheHost, error) {
246+
host := BlobCacheHost{
247+
Addr: addr,
248+
RTT: 0,
249+
PrivateAddr: privateAddr,
250+
CapacityUsagePct: 0.0,
251+
}
252+
253+
var dialOpts = []grpc.DialOption{
254+
grpc.WithTransportCredentials(insecure.NewCredentials()),
255+
}
256+
257+
conn, err := grpc.Dial(privateAddr, dialOpts...)
258+
if err != nil {
259+
return nil, err
260+
}
261+
defer conn.Close()
262+
263+
c := proto.NewBlobCacheClient(conn)
264+
resp, err := c.GetState(ctx, &proto.GetStateRequest{})
265+
if err != nil {
266+
return nil, err
267+
}
268+
269+
host.RTT = 0
270+
host.CapacityUsagePct = float64(resp.GetCapacityUsagePct())
271+
272+
if resp.GetVersion() != BlobCacheVersion {
273+
return nil, fmt.Errorf("version mismatch: %s != %s", resp.GetVersion(), BlobCacheVersion)
274+
}
275+
276+
return &host, nil
277+
}

pkg/metadata.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([
287287
return entries, nil
288288
}
289289

290-
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context) ([]*BlobCacheHost, error) {
290+
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error) {
291291
hostAddrs, err := m.rdb.SMembers(ctx, MetadataKeys.MetadataHostIndex()).Result()
292292
if err != nil {
293293
return nil, err
@@ -301,6 +301,7 @@ func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context) ([]*BlobCache
301301
// If the keepalive key doesn't exist, remove the host index key
302302
if err == redis.Nil {
303303
m.RemoveHostFromIndex(ctx, &BlobCacheHost{Addr: addr})
304+
removeHostCallback(&BlobCacheHost{Addr: addr})
304305
}
305306

306307
continue

pkg/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e
6060
if privateIpAddr != "" {
6161
Logger.Infof("Discovered private ip address: %s", privateIpAddr)
6262
}
63-
currentHost.PrivateAddr = privateIpAddr
63+
currentHost.PrivateAddr = fmt.Sprintf("%s:%d", privateIpAddr, cfg.Port)
6464
currentHost.CapacityUsagePct = 0
6565

6666
cas, err := NewContentAddressableStorage(ctx, currentHost, metadata, cfg)
@@ -101,7 +101,7 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e
101101
func (cs *CacheService) HostKeepAlive() {
102102
err := cs.metadata.AddHostToIndex(cs.ctx, cs.cas.currentHost)
103103
if err != nil {
104-
Logger.Fatalf("Failed to add host to index: %v", err)
104+
Logger.Fatalf("Failed to add host to index? %v", err)
105105
}
106106

107107
ticker := time.NewTicker(time.Duration(defaultHostKeepAliveIntervalS) * time.Second)
@@ -112,7 +112,7 @@ func (cs *CacheService) HostKeepAlive() {
112112
case <-cs.ctx.Done():
113113
return
114114
case <-ticker.C:
115-
cs.cas.currentHost.PrivateAddr = cs.privateIpAddr
115+
cs.cas.currentHost.PrivateAddr = fmt.Sprintf("%s:%d", cs.privateIpAddr, cs.cfg.Port)
116116
cs.cas.currentHost.CapacityUsagePct = cs.usagePct()
117117
cs.metadata.SetHostKeepAlive(cs.ctx, cs.cas.currentHost)
118118
}

0 commit comments

Comments
 (0)