Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions caster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,18 @@ type Caster struct {

// NewCaster constructs a Caster, setting up the Handler and timeouts - run using ListenAndServe()
// TODO: Consider not constructing the http.Server, and leaving Caster as a http.Handler
// Then the caller can create other routes on the server, such as (for example) a /health endpoint,
// or a /stats endpoint - Though those could instead be run on separate http.Server's
// Also, middleware can be added to a Caster by doing `c.Handler = someMiddleware(c.Handler)`
// Then the caller can create other routes on the server, such as (for example) a /health endpoint,
// or a /stats endpoint - Though those could instead be run on separate http.Server's
// Also, middleware can be added to a Caster by doing `c.Handler = someMiddleware(c.Handler)`
func NewCaster(addr string, svc SourceService, logger logrus.FieldLogger) *Caster {
return &Caster{
http.Server{
Addr: addr,
Handler: getHandler(svc, logger),
IdleTimeout: 10 * time.Second,
// Read timeout kills publishing connections because they don't necessarily read from
// the response body
//ReadTimeout: 10 * time.Second,
// Write timeout kills subscriber connections because they don't write to the request
// body
//WriteTimeout: 10 * time.Second,
Addr: addr,
Handler: getHandler(svc, logger),
IdleTimeout: 10 * time.Second,
ReadHeaderTimeout: 10 * time.Second,
// Read and Write timeouts are controlled by the Handler functions due to NTRIP requests
// being long-lived
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion caster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// TODO: Test failure cases with httptest.Server

// Test running Caster with mock service using httptest.Server, which is close to actually calling
// caster.ListenAndServe(), write data with v2 server and read with v2 and v1 clients
// caster.ListenAndServe(), write data with v2 server and read with v2 and v1 clients
func TestCasterServerClient(t *testing.T) {
caster := ntrip.NewCaster("N/A", mock.NewMockSourceService(), logrus.StandardLogger())
ts := httptest.NewServer(caster.Handler)
Expand Down
46 changes: 26 additions & 20 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"

"github.com/sirupsen/logrus"
)

// handler is used by Caster, and is an instance of a request being handled with methods
// for handing v1 and v2 requests
// TODO: Better name - the http.Handler constructs this and uses it's methods for handling
// requests (so the word "handle" is a bit overloaded)
// requests (so the word "handle" is a bit overloaded)
// TODO: Separate package (in internal)?
type handler struct {
svc SourceService
Expand Down Expand Up @@ -65,13 +67,13 @@ func (h *handler) handleRequestV1(w http.ResponseWriter, r *http.Request) {

switch r.Method {
case http.MethodGet:
h.handleGetMountV1(rw, r)
h.handleGetMountV1(rw, r, conn)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}

func (h *handler) handleGetSourcetableV1(w *bufio.ReadWriter, r *http.Request) {
func (h *handler) handleGetSourcetableV1(w *bufio.ReadWriter, _ *http.Request) {
st := h.svc.GetSourcetable()
_, err := fmt.Fprintf(w, "SOURCETABLE 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n%s", len(st.String()), st)
if err != nil {
Expand All @@ -87,7 +89,7 @@ func (h *handler) handleGetSourcetableV1(w *bufio.ReadWriter, r *http.Request) {
h.logger.Info("sourcetable written to client")
}

func (h *handler) handleGetMountV1(w *bufio.ReadWriter, r *http.Request) {
func (h *handler) handleGetMountV1(w *bufio.ReadWriter, r *http.Request, conn net.Conn) {
username, password, _ := r.BasicAuth()
sub, err := h.svc.Subscriber(r.Context(), r.URL.Path[1:], username, password)
if err != nil {
Expand Down Expand Up @@ -115,7 +117,7 @@ func (h *handler) handleGetMountV1(w *bufio.ReadWriter, r *http.Request) {
}
h.logger.Infof("accepted request")

err = write(r.Context(), sub, w, w.Flush)
err = write(r.Context(), sub, w, w.Flush, conn)
h.logger.Infof("connection closed with reason: %s", err)
}

Expand Down Expand Up @@ -154,7 +156,7 @@ func (h *handler) handleRequestV2(w http.ResponseWriter, r *http.Request) {
}
}

func (h *handler) handleGetSourcetableV2(w http.ResponseWriter, r *http.Request) {
func (h *handler) handleGetSourcetableV2(w http.ResponseWriter, _ *http.Request) {
// TODO: Implement sourcetable filtering support
st := h.svc.GetSourcetable().String()
w.Header().Add("Content-Length", fmt.Sprint(len(st)))
Expand Down Expand Up @@ -202,41 +204,45 @@ func (h *handler) handleGetMountV2(w http.ResponseWriter, r *http.Request) error

w.Header().Add("Content-Type", "gnss/data")
// Flush response headers before sending data to client, default status code is 200
// TODO: Don't necessarily need to do this, since the first data written to client will flush
w.(http.Flusher).Flush()
h.logger.Infof("accepted request")

// bufio.ReadWriter's Flush method (used by v1 handler) returns error so does not satisfy the
// http.Flusher interface
flush := func() error {
// TODO: Check if cast succeeds and return error if not
w.(http.Flusher).Flush()
return nil
}
rc := http.NewResponseController(w)

err = write(r.Context(), sub, w, flush)
err = write(r.Context(), sub, w, rc.Flush, rc)
// Duplicating connection closed message here to avoid superfluous calls to WriteHeader
h.logger.Infof("connection closed with reason: %s", err)
return nil
}

// interface for the different methods of setting a write deadline
type writeDeadliner interface {
SetWriteDeadline(time.Time) error
}

// Used by the GET handlers to read data from Subscriber channel and write to client writer
// TODO: Better name
func write(ctx context.Context, c chan []byte, w io.Writer, flush func() error) error {
func write(ctx context.Context, c chan []byte, w io.Writer, flush func() error, d writeDeadliner) error {
for {
select {
case data, ok := <-c:
if !ok {
return fmt.Errorf("subscriber channel closed")
}

if err := d.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
return fmt.Errorf("error setting write deadline: %w", err)
}

if _, err := w.Write(data); err != nil {
return err
return fmt.Errorf("error writing: %w", err)
}

if err := flush(); err != nil {
return err
return fmt.Errorf("error flushing: %w", err)
}

case <-ctx.Done():
return fmt.Errorf("client disconnect")
return fmt.Errorf("client disconnect: %w", ctx.Err())
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ntrip_test

import (
"bufio"
"crypto/rand"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -29,7 +29,8 @@ func init() {
// interface which is needed to test NTRIP v1 requests
// TODO: Move to another package?
// TODO: This doesn't prevent the server from writing to the original response Body, which
// http.Server would do for a real request - this case is tested by caster_test.go
//
// http.Server would do for a real request - this case is tested by caster_test.go
type HijackableResponseRecorder struct {
*httptest.ResponseRecorder
}
Expand All @@ -40,6 +41,10 @@ func (h *HijackableResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, erro
return conn, rw, nil
}

func (h *HijackableResponseRecorder) SetWriteDeadline(time.Time) error {
return nil
}

func TestCasterHandlers(t *testing.T) {
v2Sourcetable := mock.NewMockSourceService().Sourcetable.String()
v1Sourcetable := fmt.Sprintf("SOURCETABLE 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n%s", len(v2Sourcetable), v2Sourcetable)
Expand Down