From 0d23f087d8236689c2de0ca370745430775dee74 Mon Sep 17 00:00:00 2001 From: Imre 'Rover' Racz Date: Mon, 18 Nov 2019 14:37:19 +0100 Subject: [PATCH] fix(core): Directories now ensured to exist and streams are not saved when failing to start --- Gopkg.lock | 10 +++++----- core/controller.go | 27 ++++++++------------------- core/streaming/processor.go | 1 + 3 files changed, 14 insertions(+), 24 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 2fa35a9..db740f6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -110,19 +110,19 @@ [[projects]] branch = "master" - digest = "1:999c8e6b10ddb08bc4ea3a56d4e838194c4884eac6ecf90a9aa98ed8475fdf37" + digest = "1:3b81f20605df873608343956a1f707bc63fe76bbc9ab3adb644ce95b4502a525" name = "golang.org/x/sys" packages = ["unix"] pruneopts = "UT" - revision = "195ce5e7f934b923d8fc432cf5a2561f2b334da9" + revision = "b5d5184f72d2900cd61b7676edc45ad9300e26a9" [[projects]] - digest = "1:59f10c1537d2199d9115d946927fe31165959a95190849c82ff11e05803528b0" + digest = "1:f26a5d382387e03a40d1471dddfba85dfff9bf05352d7e42d37612677c4d3c5c" name = "gopkg.in/yaml.v2" packages = ["."] pruneopts = "UT" - revision = "f221b8435cfb71e54062f6c6e99e9ade30b124d5" - version = "v2.2.4" + revision = "f90ceb4f409096b60e2e9076b38b304b8246e5fa" + version = "v2.2.5" [solve-meta] analyzer-name = "dep" diff --git a/core/controller.go b/core/controller.go index bb80050..b59332a 100644 --- a/core/controller.go +++ b/core/controller.go @@ -125,6 +125,7 @@ func (c *Controller) StartStreamHandler(w http.ResponseWriter, r *http.Request, c.SendError(w, err, http.StatusBadRequest) return } + // Process streams that are already known in the system if index, ok := c.index[dto.URI]; ok { stream, ok := c.streams[index] if !ok { @@ -135,7 +136,10 @@ func (c *Controller) StartStreamHandler(w http.ResponseWriter, r *http.Request, c.handleAlreadyKnownStream(w, stream, c.spec, stream.StorePath) return } - streamResolved := c.startStream(dto.URI, c.spec) + // Process new streams + logrus.Infof("%s started processing", dto.URI) + stream, physicalPath, id := c.processor.NewStream(dto.URI) + streamResolved := c.manager.Start(stream.CMD, physicalPath) select { case <-time.After(c.timeout): c.SendError(w, ErrTimeout, http.StatusRequestTimeout) @@ -144,14 +148,9 @@ func (c *Controller) StartStreamHandler(w http.ResponseWriter, r *http.Request, c.SendError(w, ErrUnexpected, http.StatusInternalServerError) return } - index, ok := c.index[dto.URI] - if !ok { - logrus.Error("Did not find any index for ", dto.URI) - c.SendError(w, ErrUnexpected, http.StatusInternalServerError) - return - } - s := c.streams[index] - b, _ := json.Marshal(StreamDto{URI: s.Path}) + c.streams[id] = stream + c.index[dto.URI] = id + b, _ := json.Marshal(StreamDto{URI: stream.Path}) w.Header().Add("Content-Type", "application/json") w.Write(b) } @@ -256,16 +255,6 @@ func (c *Controller) FileHandler(w http.ResponseWriter, req *http.Request, ps ht s.Streak.Activate().Hit() } -// startStream creates a new stream then starts processing it with a manager -func (c *Controller) startStream(uri string, spec *config.Specification) chan bool { - logrus.Infof("%s started processing", uri) - stream, physicalPath, id := c.processor.NewStream(uri) - c.streams[id] = stream - c.index[uri] = id - ch := c.manager.Start(stream.CMD, physicalPath) - return ch -} - // marshalValidateURI is for validiting that the URI is in a valid format // and marshaling it into the dto pointer func (c *Controller) marshalValidatedURI(dto *StreamDto, body io.Reader) error { diff --git a/core/streaming/processor.go b/core/streaming/processor.go index 15fdede..f973400 100644 --- a/core/streaming/processor.go +++ b/core/streaming/processor.go @@ -59,6 +59,7 @@ func (p Processor) getHLSFlags() string { // NewProcess creates only the process for the stream func (p Processor) NewProcess(path, URI string) *exec.Cmd { + os.MkdirAll(path, os.ModePerm) cmd := exec.Command( "ffmpeg", "-y",