From bd60abdbf3aaeeac12f6388c13929908c64cbf3d Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 18 Sep 2018 19:22:18 +0200 Subject: [PATCH] Improve stream logging, handle io.ErrUnexpectedEOF (#663) --- clients/horizon/client.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/clients/horizon/client.go b/clients/horizon/client.go index b2df990c9e..a3f0cf713d 100644 --- a/clients/horizon/client.go +++ b/clients/horizon/client.go @@ -364,14 +364,14 @@ func (c *Client) stream( for { req, err := http.NewRequest("GET", fmt.Sprintf("%s?%s", baseURL, query.Encode()), nil) if err != nil { - return err + return errors.Wrap(err, "Error creating HTTP request") } req.Header.Set("Accept", "text/event-stream") // Make sure we don't use c.HTTP that can have Timeout set. resp, err := client.Do(req) if err != nil { - return err + return errors.Wrap(err, "Error sending HTTP request") } defer resp.Body.Close() @@ -399,9 +399,16 @@ func (c *Client) stream( line, err := reader.ReadString('\n') if err != nil { - if err == io.EOF { - // Currently Horizon appends a new line after the last event so this is not really - // needed. We have this code here in case this behaviour is changed in a future. + if err == io.EOF || err == io.ErrUnexpectedEOF { + // We catch EOF errors to handle two possible situations: + // - The last line before closing the stream was not empty. This should never + // happen in Horizon as it always sends an empty line after each event. + // - The stream was closed by the server/proxy because the connection was idle. + // + // In the former case, that (again) should never happen in Horizon, we need to + // check if there are any events we need to decode. We do this in the `if` + // statement below just in case if Horizon behaviour changes in a future. + // // From spec: // > Once the end of the file is reached, the user agent must dispatch the // > event one final time, as defined below. @@ -409,7 +416,7 @@ func (c *Client) stream( break Events } } else { - return err + return errors.Wrap(err, "Error reading line") } } @@ -424,7 +431,7 @@ func (c *Client) stream( events, err := sse.Decode(strings.NewReader(buffer.String())) if err != nil { - return err + return errors.Wrap(err, "Error decoding event") } // Right now len(events) should always be 1. This loop will be helpful after writing @@ -442,8 +449,10 @@ func (c *Client) stream( switch data := event.Data.(type) { case string: err = handler([]byte(data)) + err = errors.Wrap(err, "Handler error") case []byte: err = handler(data) + err = errors.Wrap(err, "Handler error") default: err = errors.New("Invalid event.Data type") }