Skip to content
This repository has been archived by the owner on Jul 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #4 from Roverr/feature/maintainability
Browse files Browse the repository at this point in the history
Feature/maintainability
Roverr authored Dec 14, 2018
2 parents bad4167 + b18fce7 commit f0e6130
Showing 2 changed files with 76 additions and 67 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# <img src="./rtsp-stream.png"/>

[![Go Report Card](https://goreportcard.com/badge/github.com/Roverr/rtsp-stream)](https://goreportcard.com/report/github.com/Roverr/rtsp-stream)
[![Maintainability](https://api.codeclimate.com/v1/badges/202152e83296250ab527/maintainability)](https://codeclimate.com/github/Roverr/rtsp-stream/maintainability)


rtsp-stream is an easy to use out of box solution that can be integrated into existing systems resolving the problem of not being able to play rtsp stream natively in browsers.

141 changes: 74 additions & 67 deletions core/routing.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
@@ -63,7 +64,7 @@ func restartStream(spec *config.Specification, path string) error {
}

// getListStreamHandler returns the handler for the list endpoint
func getListStreamHandler() func(http.ResponseWriter, *http.Request, httprouter.Params) {
func getListStreamHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
dto := []*summariseDto{}
for key, stream := range streams {
@@ -79,92 +80,98 @@ func getListStreamHandler() func(http.ResponseWriter, *http.Request, httprouter.
}
}

// getStartStreamHandler returns an HTTP handler for the /start endpoint
func getStartStreamHandler(spec *config.Specification) func(http.ResponseWriter, *http.Request, httprouter.Params) {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
// Parse request
uri, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Invalid body", 400)
return
}
var dto streamDto
if err = json.Unmarshal(uri, &dto); err != nil {
http.Error(w, "Invalid body", 400)
// validateURI is for validiting that the URI is in a valid format
func validateURI(dto *streamDto, body io.Reader) error {
// Parse request
uri, err := ioutil.ReadAll(body)
if err != nil {
return err
}
if err = json.Unmarshal(uri, dto); err != nil {
return err
}

if _, err := url.Parse(dto.URI); err != nil {
return errors.New("Invalid URI")
}
return nil
}

func startStream(uri, dir string, spec *config.Specification, streamRunning chan<- bool, errorIssued chan<- bool) {
logrus.Infof("%s started processing", dir)
cmd, path, physicalPath := streaming.NewProcess(uri, spec)
streams[dir] = streaming.Stream{
CMD: cmd,
Mux: &sync.Mutex{},
Path: fmt.Sprintf("/%s/index.m3u8", path),
Streak: hotstreak.New(hotstreak.Config{
Limit: 10,
HotWait: time.Minute * 2,
ActiveWait: time.Minute * 4,
}).Activate(),
OriginalURI: uri,
}
go func() {
for {
_, err := os.Stat(physicalPath)
if err != nil {
<-time.After(25 * time.Millisecond)
continue
}
streamRunning <- true
return
}
if dto.URI == "" {
http.Error(w, "Empty URI", 400)
}()
if err := cmd.Run(); err != nil {
logrus.Error(err)
errorIssued <- true
}
}

func handleAlreadyRunningStream(w http.ResponseWriter, s streaming.Stream, spec *config.Specification, dir string) {
// If transcoding is not running, spin it back up
if !s.Streak.IsActive() {
err := restartStream(spec, dir)
if err != nil {
logrus.Error(err)
http.Error(w, "Unexpected error", 500)
return
}
}
// If the stream is already running return its path
b, err := json.Marshal(streamDto{URI: s.Path})
if err != nil {
http.Error(w, "Unexpected error", 500)
return
}
w.Header().Add("Content-Type", "application/json")
w.Write(b)
}

if _, err := url.Parse(dto.URI); err != nil {
http.Error(w, "Invalid URI", 400)
// getStartStreamHandler returns an HTTP handler for the /start endpoint
func getStartStreamHandler(spec *config.Specification) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var dto streamDto
if err := validateURI(&dto, r.Body); err != nil {
http.Error(w, err.Error(), 400)
return
}

// Calculate directory from URI
dir, err := streaming.GetURIDirectory(dto.URI)
if err != nil {
logrus.Error(err)
http.Error(w, "Could not create directory for URI", 500)
return
}
if s, ok := streams[dir]; ok {
// If transcoding is not running, spin it back up
if !s.Streak.IsActive() {
err := restartStream(spec, dir)
if err != nil {
logrus.Error(err)
http.Error(w, "Unexpected error", 500)
return
}
}
// If the stream is already running return its path
b, err := json.Marshal(streamDto{URI: s.Path})
if err != nil {
http.Error(w, "Unexpected error", 500)
return
}
w.Header().Add("Content-Type", "application/json")
w.Write(b)
if stream, ok := streams[dir]; ok {
handleAlreadyRunningStream(w, stream, spec, dir)
return
}
streamRunning := make(chan bool)
defer close(streamRunning)
errorIssued := make(chan bool)
defer close(errorIssued)
go func() {
logrus.Infof("%s started processing", dir)
cmd, path, physicalPath := streaming.NewProcess(dto.URI, spec)
streams[dir] = streaming.Stream{
CMD: cmd,
Mux: &sync.Mutex{},
Path: fmt.Sprintf("/%s/index.m3u8", path),
Streak: hotstreak.New(hotstreak.Config{
Limit: 10,
HotWait: time.Minute * 2,
ActiveWait: time.Minute * 4,
}).Activate(),
OriginalURI: dto.URI,
}
go func() {
for {
_, err := os.Stat(physicalPath)
if err != nil {
<-time.After(25 * time.Millisecond)
continue
}
streamRunning <- true
return
}
}()
if err := cmd.Run(); err != nil {
logrus.Error(err)
errorIssued <- true
}
}()

startStream(dto.URI, dir, spec, streamRunning, errorIssued)
select {
case <-time.After(time.Second * 10):
http.Error(w, "Timeout error", 408)

0 comments on commit f0e6130

Please sign in to comment.