Skip to content

Commit

Permalink
feat(runner/actions/mkvod): implement creating vod from live recording
Browse files Browse the repository at this point in the history
  • Loading branch information
joschahenningsen committed Feb 12, 2025
1 parent fb1836a commit adaa64a
Show file tree
Hide file tree
Showing 15 changed files with 655 additions and 199 deletions.
93 changes: 93 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/runner_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (m *Manager) Notify(ctx context.Context, notification *protobuf.Notificatio
log.Info("received stream end from runner")
// passing for now, not implemented.
return &protobuf.NotificationResponse{}, nil
case *protobuf.Notification_VodReady:
log.Info("vodReady", "payload", notification.GetVodReady())
return &protobuf.NotificationResponse{}, nil
default:
return nil, status.Error(codes.Unimplemented, "unsupported notification type")
}
Expand Down
10 changes: 10 additions & 0 deletions runner/commons.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
edition = "2023";
package protobuf;
option go_package = "runner/protobuf";

enum StreamVersion {
STREAM_VERSION_UNSPECIFIED = 0;
STREAM_VERSION_COMBINED = 1;
STREAM_VERSION_CAMERA = 2;
STREAM_VERSION_PRESENTATION = 3;
}
2 changes: 1 addition & 1 deletion runner/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var Config struct {
GocastServer string `env:"GOCAST_SERVER" envDefault:"localhost:50056"`
Hostname string `env:"REALHOST" envDefault:"localhost"`
Version string `env:"VERSION" envDefault:"dev"`
EdgeServer string `env:"EDGE_SERVER" envDefault:"localhost:50057"`
EdgeServer string `env:"EDGE_SERVER" envDefault:"http://localhost:50057"`
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions runner/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest)
a := []actions.Action{
actions.Stream,
actions.StreamEnd,
actions.MkVOD,
}

jID := r.RunAction(a, data)
Expand Down
9 changes: 9 additions & 0 deletions runner/notifications.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ edition = "2023";
package protobuf;
option go_package = "runner/protobuf";

import "commons.proto";

message Notification {
oneof data {
StreamStartNotification stream_start = 1;
StreamEndNotification stream_end = 2;
HeartbeatNotification heartbeat = 3;
VODReadyNotification vod_ready = 4;
}
}

Expand All @@ -29,6 +32,12 @@ message HeartbeatNotification {
uint64 job_count = 3;
}

message VODReadyNotification {
StreamInfo stream = 1;
StreamVersion stream_version = 2;
string url = 3;
}

message NotificationResponse {

}
142 changes: 142 additions & 0 deletions runner/pkg/actions/mkvod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package actions

import (
"bufio"
"context"
"fmt"
"io"
"log/slog"
"net/url"
"os"
"path"
"strings"

log "github.com/sirupsen/logrus"

"github.com/tum-dev/gocast/runner/config"
"github.com/tum-dev/gocast/runner/pkg/metrics"
"github.com/tum-dev/gocast/runner/pkg/ptr"
"github.com/tum-dev/gocast/runner/protobuf"
)

// MkVOD takes a stream that was streamed and moves the hls stream to long term storage.
// Additionally, the playlist type is transformed from event to VOD.
func MkVOD(_ context.Context, _ *slog.Logger, notify chan *protobuf.Notification, d map[string]any, metrics *metrics.Broker) error {
streamID, ok := d["streamID"].(uint64)
if !ok {
return AbortingError(fmt.Errorf("no stream id in context"))
}
streamVersion, ok := d["streamVersion"].(string)
if !ok {
return AbortingError(fmt.Errorf("no stream end in context"))
}
recordingDir, ok := d["recordingDir"].(string)
if !ok {
return AbortingError(fmt.Errorf("no recordingDir in context"))
}

vodDir := path.Join(config.Config.StoragePath, fmt.Sprintf("%d", streamID), streamVersion)
err := os.MkdirAll(vodDir, os.ModePerm)
if err != nil {
return AbortingError(fmt.Errorf("create VOD directory: %w", err))
}

recordingContent, err := os.ReadDir(recordingDir)
if err != nil {
return AbortingError(fmt.Errorf("read recordingDir: %w", err))
}

for _, entry := range recordingContent {
if entry.IsDir() {
log.Warn("found dir in recordingDir, skipping", "name", entry.Name())
continue
}
if entry.Name() == "playlist.m3u8" {
srcPlst, err := os.Open(path.Join(recordingDir, entry.Name()))
if err != nil {
return AbortingError(fmt.Errorf("open recording playlist: %w", err))
}
srcPlstB, err := io.ReadAll(srcPlst)
if err != nil {
return AbortingError(fmt.Errorf("read recording playlist: %w", err))
}
dstPlstS := vodFromEventPlst(string(srcPlstB))
dstPlst, err := os.Create(path.Join(vodDir, entry.Name()))
if err != nil {
return AbortingError(fmt.Errorf("create vod playlist: %w", err))
}
_, err = io.WriteString(dstPlst, dstPlstS)
if err != nil {
return fmt.Errorf("write vod to playlist: %w", err)
}
_ = dstPlst.Close()
continue
}
err = copyFile(path.Join(recordingDir, entry.Name()), path.Join(vodDir, entry.Name()))
}

vodUrl, err := url.JoinPath(config.Config.EdgeServer, fmt.Sprintf("%d", streamID), streamVersion, "playlist.m3u8")
if err != nil {
return fmt.Errorf("join vod url: %w", err)
}
notify <- &protobuf.Notification{
Data: &protobuf.Notification_VodReady{
VodReady: &protobuf.VODReadyNotification{
Stream: &protobuf.StreamInfo{Id: ptr.Take(streamID)},
StreamVersion: ptr.Take(protobuf.StreamVersion(protobuf.StreamVersion_value[streamVersion])),
Url: ptr.Take(vodUrl),
},
},
}
return nil
}

func copyFile(sourcePath, destPath string) error {
inputFile, err := os.Open(sourcePath)
if err != nil {
return fmt.Errorf("open source file: %w", err)
}
defer inputFile.Close()

outputFile, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("open dest file: %w", err)
}
defer outputFile.Close()

_, err = io.Copy(outputFile, inputFile)
if err != nil {
return fmt.Errorf("copy to dest from source: %w", err)
}
return nil
}

// vodFromEventPlst modifies an HLS playlist from EVENT to VOD
func vodFromEventPlst(playlist string) string {
var lines []string
hasEndlist := false

scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()

// Replace #EXT-X-PLAYLIST-TYPE:EVENT with #EXT-X-PLAYLIST-TYPE:VOD
if strings.Contains(line, "#EXT-X-PLAYLIST-TYPE:EVENT") {
line = "#EXT-X-PLAYLIST-TYPE:VOD"
}

// Check if #EXT-X-ENDLIST is already present
if line == "#EXT-X-ENDLIST" {
hasEndlist = true
}

lines = append(lines, line)
}

// Append #EXT-X-ENDLIST if missing
if !hasEndlist {
lines = append(lines, "#EXT-X-ENDLIST")
}

return strings.Join(lines, "\n")
}
26 changes: 26 additions & 0 deletions runner/pkg/actions/mkvod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package actions

import "testing"

func TestConvertHLSPlaylist(t *testing.T) {
input := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:EVENT
#EXTINF:2.000000,
00000.ts`
expected := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:VOD
#EXTINF:2.000000,
00000.ts
#EXT-X-ENDLIST`

output := vodFromEventPlst(input)
if output != expected {
t.Errorf("Expected:\n%s\nGot:\n%s", expected, output)
}
}
4 changes: 2 additions & 2 deletions runner/pkg/actions/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Stream(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notific
if err != nil {
return AbortingError(err)
}
d["recording"] = liveRecDir
d["recordingDir"] = liveRecDir

notify <- &protobuf.Notification{
Data: &protobuf.Notification_StreamStart{
Expand Down Expand Up @@ -113,5 +113,5 @@ func logCmdPipe(log *slog.Logger, pipe io.ReadCloser, fields []any) {
for scanner.Scan() {
log.Info("ffmpeg log", append([]any{"msg", scanner.Text()}, fields...)...)
}
log.Info("ffmpeg logstream ended", fields)
log.Info("ffmpeg logstream ended", fields...)
}
4 changes: 2 additions & 2 deletions runner/pkg/metrics/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ func NewBroker(options ...Option) *Broker {
Subsystem: "stream",
Name: "n_streams",
Help: "Number of active streams",
}, []string{"stream_id", "input"}),
}, []string{"stream_id", "source"}),

StreamErrors: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "runner",
Subsystem: "stream",
Name: "n_errors",
Help: "Number of stream ffmpeg errors",
}, []string{"stream_id", "input"}),
}, []string{"stream_id", "source"}),
}
for _, option := range options {
option(b)
Expand Down
Loading

0 comments on commit adaa64a

Please sign in to comment.