Skip to content

Commit

Permalink
Improve stream logging, handle io.ErrUnexpectedEOF (#663)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn authored Sep 18, 2018
1 parent eddcbf0 commit bd60abd
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions clients/horizon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,14 @@ func (c *Client) stream(
for {
req, err := http.NewRequest("GET", fmt.Sprintf("%s?%s", baseURL, query.Encode()), nil)
if err != nil {
return err
return errors.Wrap(err, "Error creating HTTP request")
}
req.Header.Set("Accept", "text/event-stream")

// Make sure we don't use c.HTTP that can have Timeout set.
resp, err := client.Do(req)
if err != nil {
return err
return errors.Wrap(err, "Error sending HTTP request")
}
defer resp.Body.Close()

Expand Down Expand Up @@ -399,17 +399,24 @@ func (c *Client) stream(

line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
// Currently Horizon appends a new line after the last event so this is not really
// needed. We have this code here in case this behaviour is changed in a future.
if err == io.EOF || err == io.ErrUnexpectedEOF {
// We catch EOF errors to handle two possible situations:
// - The last line before closing the stream was not empty. This should never
// happen in Horizon as it always sends an empty line after each event.
// - The stream was closed by the server/proxy because the connection was idle.
//
// In the former case, that (again) should never happen in Horizon, we need to
// check if there are any events we need to decode. We do this in the `if`
// statement below just in case if Horizon behaviour changes in a future.
//
// From spec:
// > Once the end of the file is reached, the user agent must dispatch the
// > event one final time, as defined below.
if nonEmptylinesRead == 0 {
break Events
}
} else {
return err
return errors.Wrap(err, "Error reading line")
}
}

Expand All @@ -424,7 +431,7 @@ func (c *Client) stream(

events, err := sse.Decode(strings.NewReader(buffer.String()))
if err != nil {
return err
return errors.Wrap(err, "Error decoding event")
}

// Right now len(events) should always be 1. This loop will be helpful after writing
Expand All @@ -442,8 +449,10 @@ func (c *Client) stream(
switch data := event.Data.(type) {
case string:
err = handler([]byte(data))
err = errors.Wrap(err, "Handler error")
case []byte:
err = handler(data)
err = errors.Wrap(err, "Handler error")
default:
err = errors.New("Invalid event.Data type")
}
Expand Down

0 comments on commit bd60abd

Please sign in to comment.