Skip to content

Commit eb5e29c

Browse files
committed
Refactor cmd
1 parent 2a02474 commit eb5e29c

File tree

2 files changed

+154
-89
lines changed

2 files changed

+154
-89
lines changed

origin/blobserver/server.go

Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,7 @@ func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Reque
987987
if err := s.writeBack(namespace, d, 0); err != nil {
988988
return err
989989
}
990-
err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
990+
err = s.applyToReplicas(r.Context(), d, func(i int, client blobclient.Client) error {
991991
delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
992992
f, err := s.cas.GetCacheFileReader(d.Hex())
993993
if err != nil {
@@ -1130,65 +1130,4 @@ func (s *Server) getRequestID(r *http.Request) string {
11301130
return "unknown"
11311131
}
11321132

1133-
// setOctetStreamContentType sets the content type to application/octet-stream
1134-
func setOctetStreamContentType(w http.ResponseWriter) {
1135-
w.Header().Set("Content-Type", "application/octet-stream")
1136-
}
1137-
1138-
// setContentLength sets the content length header
1139-
func setContentLength(w http.ResponseWriter, length int) {
1140-
w.Header().Set("Content-Length", strconv.Itoa(length))
1141-
}
1142-
1143-
// setUploadLocation sets the upload location header
1144-
func setUploadLocation(w http.ResponseWriter, uid string) {
1145-
w.Header().Set("Location", uid)
1146-
}
1147-
1148-
// blobExists checks if a blob exists in the cache
1149-
func blobExists(cas *store.CAStore, d core.Digest) (bool, error) {
1150-
_, err := cas.GetCacheFileStat(d.Hex())
1151-
if err == nil {
1152-
return true, nil
1153-
}
1154-
if os.IsNotExist(err) {
1155-
return false, nil
1156-
}
1157-
return false, err
1158-
}
11591133

1160-
// parseContentRange parses the content range header
1161-
func parseContentRange(headers http.Header) (start, end int64, err error) {
1162-
rangeHeader := headers.Get("Content-Range")
1163-
if rangeHeader == "" {
1164-
return 0, 0, fmt.Errorf("missing Content-Range header")
1165-
}
1166-
1167-
// Parse "bytes start-end/total" format
1168-
parts := strings.Split(rangeHeader, " ")
1169-
if len(parts) != 2 || parts[0] != "bytes" {
1170-
return 0, 0, fmt.Errorf("invalid Content-Range format")
1171-
}
1172-
1173-
rangeParts := strings.Split(parts[1], "/")
1174-
if len(rangeParts) != 2 {
1175-
return 0, 0, fmt.Errorf("invalid Content-Range format")
1176-
}
1177-
1178-
startEndParts := strings.Split(rangeParts[0], "-")
1179-
if len(startEndParts) != 2 {
1180-
return 0, 0, fmt.Errorf("invalid Content-Range format")
1181-
}
1182-
1183-
start, err = strconv.ParseInt(startEndParts[0], 10, 64)
1184-
if err != nil {
1185-
return 0, 0, fmt.Errorf("invalid start range: %s", err)
1186-
}
1187-
1188-
end, err = strconv.ParseInt(startEndParts[1], 10, 64)
1189-
if err != nil {
1190-
return 0, 0, fmt.Errorf("invalid end range: %s", err)
1191-
}
1192-
1193-
return start, end, nil
1194-
}

origin/cmd/cmd.go

Lines changed: 153 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package cmd
1515

1616
import (
17+
"crypto/tls"
1718
"encoding/json"
1819
"flag"
1920
"fmt"
@@ -44,6 +45,7 @@ import (
4445

4546
"github.com/andres-erbsen/clock"
4647
"github.com/go-chi/chi"
48+
"github.com/jmoiron/sqlx"
4749
"github.com/uber-go/tally"
4850
"go.uber.org/zap"
4951
)
@@ -110,18 +112,43 @@ func WithLogger(l *zap.Logger) Option {
110112

111113
// Run runs the origin.
112114
func Run(flags *Flags, opts ...Option) {
115+
validateFlags(flags)
116+
117+
var overrides options
118+
for _, o := range opts {
119+
o(&overrides)
120+
}
121+
122+
config := setupConfiguration(flags, &overrides)
123+
logger := setupLogging(config, &overrides)
124+
defer func() {
125+
if logger != nil {
126+
logger.Sync()
127+
}
128+
}()
129+
130+
stats, statsCloser := setupMetrics(config, flags, &overrides)
131+
defer statsCloser()
132+
133+
hostname := setupHostname(flags)
134+
peerIP := setupPeerIP(flags)
135+
136+
components := setupCoreComponents(config, flags, hostname, peerIP, stats)
137+
server := setupBlobServer(config, flags, hostname, components, stats)
138+
139+
startServices(config, flags, server, components.scheduler)
140+
}
141+
142+
func validateFlags(flags *Flags) {
113143
if flags.PeerPort == 0 {
114144
panic("must specify non-zero peer port")
115145
}
116146
if flags.BlobServerPort == 0 {
117147
panic("must specify non-zero blob server port")
118148
}
149+
}
119150

120-
var overrides options
121-
for _, o := range opts {
122-
o(&overrides)
123-
}
124-
151+
func setupConfiguration(flags *Flags, overrides *options) Config {
125152
var config Config
126153
if overrides.config != nil {
127154
config = *overrides.config
@@ -135,26 +162,34 @@ func Run(flags *Flags, opts ...Option) {
135162
}
136163
}
137164
}
165+
return config
166+
}
138167

168+
func setupLogging(config Config, overrides *options) *zap.Logger {
139169
if overrides.logger != nil {
140170
log.SetGlobalLogger(overrides.logger.Sugar())
171+
return overrides.logger
141172
} else {
142173
zlog := log.ConfigureLogger(config.ZapLogging)
143-
defer zlog.Sync()
174+
return zlog.Desugar()
144175
}
176+
}
145177

146-
stats := overrides.metrics
147-
if stats == nil {
148-
s, closer, err := metrics.New(config.Metrics, flags.KrakenCluster)
149-
if err != nil {
150-
log.Fatalf("Failed to init metrics: %s", err)
151-
}
152-
stats = s
153-
defer closer.Close()
178+
func setupMetrics(config Config, flags *Flags, overrides *options) (tally.Scope, func()) {
179+
if overrides.metrics != nil {
180+
return overrides.metrics, func() {}
154181
}
155182

156-
go metrics.EmitVersion(stats)
183+
s, closer, err := metrics.New(config.Metrics, flags.KrakenCluster)
184+
if err != nil {
185+
log.Fatalf("Failed to init metrics: %s", err)
186+
}
157187

188+
go metrics.EmitVersion(s)
189+
return s, func() { closer.Close() }
190+
}
191+
192+
func setupHostname(flags *Flags) string {
158193
var hostname string
159194
if flags.BlobServerHostName == "" {
160195
var err error
@@ -166,36 +201,96 @@ func Run(flags *Flags, opts ...Option) {
166201
hostname = flags.BlobServerHostName
167202
}
168203
log.Infof("Configuring origin with hostname '%s'", hostname)
204+
return hostname
205+
}
169206

207+
func setupPeerIP(flags *Flags) string {
170208
if flags.PeerIP == "" {
171209
localIP, err := netutil.GetLocalIP()
172210
if err != nil {
173211
log.Fatalf("Error getting local ip: %s", err)
174212
}
175-
flags.PeerIP = localIP
213+
return localIP
176214
}
215+
return flags.PeerIP
216+
}
177217

218+
type coreComponents struct {
219+
cas *store.CAStore
220+
pctx core.PeerContext
221+
backendManager *backend.Manager
222+
writeBackManager persistedretry.Manager
223+
metaInfoGen *metainfogen.Generator
224+
blobRefresher *blobrefresh.Refresher
225+
scheduler scheduler.ReloadableScheduler
226+
hashRing hashring.Ring
227+
tls *tls.Config
228+
}
229+
230+
func setupCoreComponents(config Config, flags *Flags, hostname, peerIP string, stats tally.Scope) *coreComponents {
231+
cas := setupCAStore(config, stats)
232+
pctx := setupPeerContext(config, flags, peerIP)
233+
backendManager := setupBackendManager(config, stats)
234+
235+
localDB := setupLocalDB(config)
236+
writeBackManager := setupWriteBackManager(config, stats, cas, backendManager, localDB)
237+
metaInfoGen := setupMetaInfoGenerator(config, cas)
238+
blobRefresher := setupBlobRefresher(config, stats, cas, backendManager, metaInfoGen)
239+
240+
netevents := setupNetworkEvents(config)
241+
schedulerInstance := setupScheduler(config, stats, pctx, cas, netevents, blobRefresher)
242+
243+
cluster := setupCluster(config)
244+
tlsConfig := setupTLS(config)
245+
hashRing := setupHashRing(config, flags, hostname, cluster, tlsConfig, backendManager)
246+
247+
return &coreComponents{
248+
cas: cas,
249+
pctx: pctx,
250+
backendManager: backendManager,
251+
writeBackManager: writeBackManager,
252+
metaInfoGen: metaInfoGen,
253+
blobRefresher: blobRefresher,
254+
scheduler: schedulerInstance,
255+
hashRing: hashRing,
256+
tls: tlsConfig,
257+
}
258+
}
259+
260+
func setupCAStore(config Config, stats tally.Scope) *store.CAStore {
178261
cas, err := store.NewCAStore(config.CAStore, stats)
179262
if err != nil {
180263
log.Fatalf("Failed to create castore: %s", err)
181264
}
265+
return cas
266+
}
182267

268+
func setupPeerContext(config Config, flags *Flags, peerIP string) core.PeerContext {
183269
pctx, err := core.NewPeerContext(
184-
config.PeerIDFactory, flags.Zone, flags.KrakenCluster, flags.PeerIP, flags.PeerPort, true)
270+
config.PeerIDFactory, flags.Zone, flags.KrakenCluster, peerIP, flags.PeerPort, true)
185271
if err != nil {
186272
log.Fatalf("Failed to create peer context: %s", err)
187273
}
274+
return pctx
275+
}
188276

277+
func setupBackendManager(config Config, stats tally.Scope) *backend.Manager {
189278
backendManager, err := backend.NewManager(config.BackendManager, config.Backends, config.Auth, stats)
190279
if err != nil {
191280
log.Fatalf("Error creating backend manager: %s", err)
192281
}
282+
return backendManager
283+
}
193284

285+
func setupLocalDB(config Config) *sqlx.DB {
194286
localDB, err := localdb.New(config.LocalDB)
195287
if err != nil {
196288
log.Fatalf("Error creating local db: %s", err)
197289
}
290+
return localDB
291+
}
198292

293+
func setupWriteBackManager(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, localDB *sqlx.DB) persistedretry.Manager {
199294
writeBackManager, err := persistedretry.NewManager(
200295
config.WriteBack,
201296
stats,
@@ -204,35 +299,55 @@ func Run(flags *Flags, opts ...Option) {
204299
if err != nil {
205300
log.Fatalf("Error creating write-back manager: %s", err)
206301
}
302+
return writeBackManager
303+
}
207304

305+
func setupMetaInfoGenerator(config Config, cas *store.CAStore) *metainfogen.Generator {
208306
metaInfoGenerator, err := metainfogen.New(config.MetaInfoGen, cas)
209307
if err != nil {
210308
log.Fatalf("Error creating metainfo generator: %s", err)
211309
}
310+
return metaInfoGenerator
311+
}
212312

213-
blobRefresher := blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGenerator)
313+
func setupBlobRefresher(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, metaInfoGen *metainfogen.Generator) *blobrefresh.Refresher {
314+
return blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGen)
315+
}
214316

317+
func setupNetworkEvents(config Config) networkevent.Producer {
215318
netevents, err := networkevent.NewProducer(config.NetworkEvent)
216319
if err != nil {
217320
log.Fatalf("Error creating network event producer: %s", err)
218321
}
322+
return netevents
323+
}
219324

325+
func setupScheduler(config Config, stats tally.Scope, pctx core.PeerContext, cas *store.CAStore, netevents networkevent.Producer, blobRefresher *blobrefresh.Refresher) scheduler.ReloadableScheduler {
220326
sched, err := scheduler.NewOriginScheduler(
221327
config.Scheduler, stats, pctx, cas, netevents, blobRefresher)
222328
if err != nil {
223329
log.Fatalf("Error creating scheduler: %s", err)
224330
}
331+
return sched
332+
}
225333

334+
func setupCluster(config Config) hostlist.List {
226335
cluster, err := hostlist.New(config.Cluster)
227336
if err != nil {
228337
log.Fatalf("Error creating cluster host list: %s", err)
229338
}
339+
return cluster
340+
}
230341

342+
func setupTLS(config Config) *tls.Config {
231343
tls, err := config.TLS.BuildClient()
232344
if err != nil {
233345
log.Fatalf("Error building client tls config: %s", err)
234346
}
347+
return tls
348+
}
235349

350+
func setupHashRing(config Config, flags *Flags, hostname string, cluster hostlist.List, tls *tls.Config, backendManager *backend.Manager) hashring.Ring {
236351
healthCheckFilter := healthcheck.NewFilter(config.HealthCheck, healthcheck.Default(tls))
237352

238353
hashRing := hashring.New(
@@ -242,6 +357,7 @@ func Run(flags *Flags, opts ...Option) {
242357
hashring.WithWatcher(backend.NewBandwidthWatcher(backendManager)))
243358
go hashRing.Monitor(nil)
244359

360+
// Validate that this origin is in the hash ring
245361
addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort)
246362
if !hashRing.Contains(addr) {
247363
// When DNS is used for hash ring membership, the members will be IP
@@ -258,24 +374,34 @@ func Run(flags *Flags, opts ...Option) {
258374
}
259375
}
260376

377+
return hashRing
378+
}
379+
380+
func setupBlobServer(config Config, flags *Flags, hostname string, components *coreComponents, stats tally.Scope) *blobserver.Server {
381+
addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort)
382+
261383
server, err := blobserver.New(
262384
config.BlobServer,
263385
stats,
264386
clock.New(),
265387
addr,
266-
hashRing,
267-
cas,
268-
blobclient.NewProvider(blobclient.WithTLS(tls)),
269-
blobclient.NewClusterProvider(blobclient.WithTLS(tls)),
270-
pctx,
271-
backendManager,
272-
blobRefresher,
273-
metaInfoGenerator,
274-
writeBackManager)
388+
components.hashRing,
389+
components.cas,
390+
blobclient.NewProvider(blobclient.WithTLS(components.tls)),
391+
blobclient.NewClusterProvider(blobclient.WithTLS(components.tls)),
392+
components.pctx,
393+
components.backendManager,
394+
components.blobRefresher,
395+
components.metaInfoGen,
396+
components.writeBackManager)
275397
if err != nil {
276398
log.Fatalf("Error initializing blob server: %s", err)
277399
}
278400

401+
return server
402+
}
403+
404+
func startServices(config Config, flags *Flags, server *blobserver.Server, sched scheduler.ReloadableScheduler) {
279405
h := addTorrentDebugEndpoints(server.Handler(), sched)
280406

281407
go func() { log.Fatal(server.ListenAndServe(h)) }()

0 commit comments

Comments
 (0)