forked from livepeer/catalyst-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
274 lines (238 loc) · 12.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package main
import (
"context"
"crypto/rsa"
"database/sql"
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"github.com/golang/glog"
_ "github.com/lib/pq"
"github.com/livepeer/catalyst-api/api"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/crypto"
"github.com/livepeer/catalyst-api/handlers/misttriggers"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/livepeer/catalyst-api/middleware"
"github.com/livepeer/catalyst-api/pipeline"
"github.com/livepeer/livepeer-data/pkg/mistconnector"
"github.com/peterbourgon/ff/v3"
"golang.org/x/sync/errgroup"
)
// main wires up and runs the catalyst-api process: it parses flags from the
// command line, environment (CATALYST_API_ prefix), and an optional config
// file, then starts the VOD pipeline coordinator, the Mist API connector,
// the load balancer, the Serf cluster, and the external/internal HTTP
// servers, blocking until any component fails or a shutdown signal arrives.
func main() {
	// glog writes to files by default; force stderr so logs are visible in
	// containerized deployments.
	err := flag.Set("logtostderr", "true")
	if err != nil {
		glog.Fatal(err)
	}
	vFlag := flag.Lookup("v")
	fs := flag.NewFlagSet("catalyst-api", flag.ExitOnError)

	cli := config.Cli{}

	version := fs.Bool("version", false, "print application version")

	// listen addresses
	config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling")
	config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands")
	config.AddrFlag(fs, &cli.ClusterAddress, "cluster-addr", "0.0.0.0:9935", "Address to bind Serf network listeners to. To use an IPv6 address, specify [::1] or [::1]:7946.")
	fs.StringVar(&cli.ClusterAdvertiseAddress, "cluster-advertise-addr", "", "Address to advertise to the other cluster members")

	// catalyst-api parameters
	fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
	fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
	config.URLVarFlag(fs, &cli.PrivateBucketURL, "private-bucket", "", "URL for the private media bucket")
	fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
	fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
	fs.StringVar(&cli.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events")
	fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
	config.URLSliceVarFlag(fs, &cli.ImportIPFSGatewayURLs, "import-ipfs-gateway-urls", "https://vod-import-gtw.mypinata.cloud/ipfs/?pinataGatewayToken={{secrets.LP_PINATA_GATEWAY_TOKEN}},https://w3s.link/ipfs/,https://ipfs.io/ipfs/,https://cloudflare-ipfs.com/ipfs/", "Comma delimited ordered list of IPFS gateways (includes /ipfs/ suffix) to import assets from")
	config.URLSliceVarFlag(fs, &cli.ImportArweaveGatewayURLs, "import-arweave-gateway-urls", "https://arweave.net/", "Comma delimited ordered list of arweave gateways")
	fs.BoolVar(&cli.MistCleanup, "run-mist-cleanup", true, "Run mist cleanup script")

	// mist-api-connector parameters
	fs.IntVar(&cli.MistPort, "mist-port", 4242, "Port to connect to Mist")
	fs.StringVar(&cli.MistHost, "mist-host", "127.0.0.1", "Hostname of the Mist server")
	fs.StringVar(&cli.MistUser, "mist-user", "", "username of MistServer")
	fs.StringVar(&cli.MistPassword, "mist-password", "", "password of MistServer")
	fs.DurationVar(&cli.MistConnectTimeout, "mist-connect-timeout", 5*time.Minute, "Max time to wait attempting to connect to Mist server")
	fs.StringVar(&cli.MistStreamSource, "mist-stream-source", "push://", "Stream source we should use for created Mist stream")
	fs.StringVar(&cli.MistHardcodedBroadcasters, "mist-hardcoded-broadcasters", "", "Hardcoded broadcasters for use by MistProcLivepeer")
	config.InvertedBoolFlag(fs, &cli.MistScrapeMetrics, "mist-scrape-metrics", true, "Scrape statistics from MistServer and publish to RabbitMQ")
	fs.StringVar(&cli.MistSendAudio, "send-audio", "record", "when should we send audio? {always|never|record}")
	fs.StringVar(&cli.MistBaseStreamName, "mist-base-stream-name", "", "Base stream name to be used in wildcard-based routing scheme")
	fs.StringVar(&cli.APIServer, "api-server", "", "Livepeer API server to use")
	fs.StringVar(&cli.AMQPURL, "amqp-url", "", "RabbitMQ url")
	fs.StringVar(&cli.OwnRegion, "own-region", "", "Identifier of the region where the service is running, used for mapping external data back to current region")
	fs.StringVar(&cli.StreamHealthHookURL, "stream-health-hook-url", "http://localhost:3004/api/stream/hook/health", "Address to POST stream health payloads to (response is ignored)")

	// catalyst-node parameters
	hostname, _ := os.Hostname()
	fs.StringVar(&cli.NodeName, "node", hostname, "Name of this node within the cluster")
	config.SpaceSliceFlag(fs, &cli.BalancerArgs, "balancer-args", []string{}, "arguments passed to MistUtilLoad")
	fs.StringVar(&cli.NodeHost, "node-host", "", "Hostname this node should handle requests for. Requests on any other domain will trigger a redirect. Useful as a 404 handler to send users to another node.")
	fs.Float64Var(&cli.NodeLatitude, "node-latitude", 0, "Latitude of this Catalyst node. Used for load balancing.")
	fs.Float64Var(&cli.NodeLongitude, "node-longitude", 0, "Longitude of this Catalyst node. Used for load balancing.")
	config.CommaSliceFlag(fs, &cli.RedirectPrefixes, "redirect-prefixes", []string{}, "Set of valid prefixes of playback id which are handled by mistserver")
	config.CommaMapFlag(fs, &cli.Tags, "tags", map[string]string{"node": "media"}, "Serf tags for Catalyst nodes")
	fs.IntVar(&cli.MistLoadBalancerPort, "mist-load-balancer-port", rand.Intn(10000)+40000, "MistUtilLoad port (default random)")
	fs.StringVar(&cli.MistLoadBalancerTemplate, "mist-load-balancer-template", "http://%s:4242", "template for specifying the host that should be queried for Prometheus stat output for this node")
	config.CommaSliceFlag(fs, &cli.RetryJoin, "retry-join", []string{}, "An agent to join with. This flag be specified multiple times. Does not exit on failure like -join, used to retry until success.")
	fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.")
	fs.StringVar(&cli.VodDecryptPublicKey, "catalyst-public-key", "", "Public key of the catalyst node for encryption")
	fs.StringVar(&cli.VodDecryptPrivateKey, "catalyst-private-key", "", "Private key of the catalyst node for encryption")
	fs.StringVar(&cli.GateURL, "gate-url", "http://localhost:3004/api/access-control/gate", "Address to contact playback gating API for access control verification")

	// special parameters
	mistJson := fs.Bool("j", false, "Print application info as JSON. Used by Mist to present flags in its UI.")
	verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
	_ = fs.String("config", "", "config file (optional)")

	err = ff.Parse(fs, os.Args[1:],
		ff.WithConfigFileFlag("config"),
		ff.WithConfigFileParser(ff.PlainParser),
		ff.WithEnvVarPrefix("CATALYST_API"),
	)
	if err != nil {
		glog.Fatalf("error parsing cli: %s", err)
	}
	cli.ParseLegacyEnv()

	if len(fs.Args()) > 0 {
		glog.Fatalf("unexpected extra arguments on command line: %v", fs.Args())
	}

	// glog registers its flags on flag.CommandLine; parse it (with no args)
	// so glog does not complain about being used before flag.Parse.
	err = flag.CommandLine.Parse(nil)
	if err != nil {
		glog.Fatal(err)
	}

	if *version {
		fmt.Printf("catalyst-api version: %s\n", config.Version)
		return
	}

	if *verbosity != "" {
		// Forward our -v flag to glog's verbosity flag.
		err = vFlag.Value.Set(*verbosity)
		if err != nil {
			glog.Fatal(err)
		}
	}

	if *mistJson {
		mistconnector.PrintMistConfigJson("catalyst-api", "HTTP API server for translating Catalyst API requests into Mist calls", "Catalyst API", config.Version, fs)
		return
	}

	// TODO: I don't love the global variables for these
	config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs
	config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs
	config.RecordingCallback = cli.RecordingCallback
	config.PrivateBucketURL = cli.PrivateBucketURL
	config.HTTPInternalAddress = cli.HTTPInternalAddress

	var metricsDB *sql.DB

	// Kick off the callback client, to send job update messages on a regular interval
	headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)}
	statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start()

	// Emit high-cardinality metrics to a Postgres database if configured
	if cli.MetricsDBConnectionString != "" {
		metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString)
		if err != nil {
			glog.Fatalf("Error creating postgres metrics connection: %v", err)
		}
	} else {
		glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.")
	}

	// Both halves of the key pair must be supplied for VOD decryption; a
	// mismatched pair is a fatal misconfiguration.
	var vodDecryptPrivateKey *rsa.PrivateKey
	if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" {
		vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey)
		if err != nil {
			glog.Fatalf("Error loading vod decrypt private key: %v", err)
		}
		isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey)
		if !isValidKeyPair || err != nil {
			glog.Fatalf("Invalid vod decrypt key pair")
		}
	}

	// Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline
	// or an external one
	vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey)
	if err != nil {
		glog.Fatalf("Error creating VOD pipeline coordinator: %v", err)
	}

	if cli.ShouldMistCleanup() {
		// Start cron style apps to run periodically
		app := "mist-cleanup.sh"
		// schedule mist-cleanup every 2hrs with a timeout of 15min
		mistCleanup, err := middleware.NewShell(2*time.Hour, 15*time.Minute, app)
		if err != nil {
			glog.Info("Failed to shell out:", app, err)
		} else {
			// BUGFIX: only start the background job when NewShell succeeded;
			// previously RunBg was called unconditionally, panicking on a nil
			// *Shell whenever the script could not be set up.
			mistCleanupTick := mistCleanup.RunBg()
			defer mistCleanupTick.Stop()
		}
	}

	broker := misttriggers.NewTriggerBroker()

	var mapic mistapiconnector.IMac
	if cli.ShouldMapic() {
		mapic = mistapiconnector.NewMapic(&cli, broker)
	}

	// Start balancer
	bal := balancer.NewBalancer(&balancer.Config{
		Args:                     cli.BalancerArgs,
		MistUtilLoadPort:         uint32(cli.MistLoadBalancerPort),
		MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate,
		MistHost:                 cli.MistHost,
		MistPort:                 cli.MistPort,
		NodeName:                 cli.NodeName,
	})

	c := cluster.NewCluster(&cli)

	// Initialize root context; cancelling this prompts all components to shut down cleanly
	group, ctx := errgroup.WithContext(context.Background())

	group.Go(func() error {
		return handleSignals(ctx)
	})
	group.Go(func() error {
		return api.ListenAndServe(ctx, cli, vodEngine, bal, c)
	})
	group.Go(func() error {
		return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker)
	})
	if cli.ShouldMapic() {
		group.Go(func() error {
			return mapic.Start(ctx)
		})
	}
	group.Go(func() error {
		return bal.Start(ctx)
	})
	group.Go(func() error {
		return c.Start(ctx)
	})
	group.Go(func() error {
		return reconcileBalancer(ctx, bal, c)
	})

	// Block until the first component returns an error (or a signal arrives);
	// errgroup then cancels ctx so every other component shuts down too.
	err = group.Wait()
	glog.Infof("Shutdown complete. Reason for shutdown: %s", err)
}
// reconcileBalancer feeds cluster membership updates into the load balancer
// until ctx is cancelled. Eventually this will be the main loop of the state
// machine, but we just have one variable right now.
func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error {
	members := c.MemberChan()
	for {
		select {
		case <-ctx.Done():
			// Clean shutdown requested; not an error.
			return nil
		case list := <-members:
			if err := bal.UpdateMembers(ctx, list); err != nil {
				return fmt.Errorf("failed to update load balancer from member list: %w", err)
			}
		}
	}
}
// handleSignals blocks until either a termination signal (SIGQUIT, SIGTERM,
// SIGINT) is delivered or ctx is cancelled. A caught signal is returned as an
// error so the enclosing errgroup cancels its context and triggers a clean
// shutdown of all other components; context cancellation returns nil.
func handleSignals(ctx context.Context) error {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	// Both branches return, so a single select suffices — no loop needed.
	select {
	case s := <-sigs:
		glog.Errorf("caught signal=%v, attempting clean shutdown", s)
		return fmt.Errorf("caught signal=%v", s)
	case <-ctx.Done():
		return nil
	}
}