This repository has been archived by the owner on Mar 11, 2023. It is now read-only.

Streaming API modification proposal #242

Closed
lmyslinski opened this issue Jun 10, 2019 · 9 comments

@lmyslinski
Contributor

lmyslinski commented Jun 10, 2019

Hi,
first of all, thanks for building this library. It's awesome. With that said, I have a number of problems with the streaming API. I'm trying to build a service that is supposed to consume tweets 24/7, and the design of the API makes it really hard to handle any issues.

Twitter describes a pretty large collection of errors and reasons for shutting down a stream. The user of the API has no reasonable way to react to them. Let's say we use the method filterStatuses. It returns a Future[TwitterStream] that completes with Success as soon as the connection is established. All events are processed within a partial function. Let's say the stream receives a DisconnectMessage - the application can only respond to that by using side effects. What is more, the type of error received determines the retry operation that should be performed. Some errors require an immediate retry, others should be handled with a backoff strategy. I found out about all of this after seeing that my stream had been running for days but had not been receiving any new messages.

Therefore I would like to propose one of two solutions:

  • Changing the result of streaming functions to something like (Future[TwitterStreamHandle], Future[StreamResult]). This would allow the end user to manually close the stream, as is possible right now, as well as react to errors in a functional manner. I would also supply an example with proper retry strategies that would keep the stream working.
  • Better yet, I believe that the retry behaviour should be integrated into the library itself. The backoff/instant retry based on error messages will require a bit of work to get right. I don't see a need for end users to implement this on their own. In that case we could just add a simple flag parameter autoHandleStreamErrors that would enable the retry mechanism in case of any issues.

Please let me know what you think about this, I'm happy to implement either of these solutions.
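
The distinction between immediate retry and backoff could be captured in a small pure function. The sketch below is only an illustration: ReconnectPolicy, StreamFailure and delayFor are hypothetical names, not part of twitter4s, and the delay values follow one reading of Twitter's reconnection guidance for streaming endpoints (reconnect at once after a dropped connection, linear backoff for network errors, exponential backoff for HTTP errors, a longer exponential backoff for HTTP 420), so they should be checked against the current documentation.

```scala
import scala.concurrent.duration._

object ReconnectPolicy {
  // Hypothetical classification of why a stream stopped (not a twitter4s type).
  sealed trait StreamFailure
  case object ConnectionDropped extends StreamFailure            // established connection was lost
  case object NetworkError extends StreamFailure                 // TCP/IP level failure while connecting
  final case class HttpError(status: Int) extends StreamFailure  // non-success HTTP response

  // 2^n, with n capped so the multiplications below cannot overflow.
  private def pow2(n: Int): Long = 1L << math.min(n, 10)

  /** Delay to wait before reconnect attempt number `attempt` (1-based). */
  def delayFor(failure: StreamFailure, attempt: Int): FiniteDuration = {
    require(attempt >= 1, "attempt is 1-based")
    val n = attempt - 1
    failure match {
      case ConnectionDropped => Duration.Zero                                    // retry immediately
      case NetworkError      => (250.millis * (n + 1).toLong).min(16.seconds)    // linear, capped
      case HttpError(420)    => 1.minute * pow2(n)                               // rate limited: start at 1 minute
      case HttpError(_)      => (5.seconds * pow2(n)).min(320.seconds)           // exponential, capped
    }
  }
}
```

A retry loop inside the library (or in user code) would call delayFor with the failure it observed and the number of consecutive failed attempts, then reset the counter once a connection succeeds.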

@DanielaSfregola
Owner

Hi @Humblehound! Thank you for the nice words!

Yes I totally agree with you: the Streaming API is far from perfect and we need to dedicate some proper love to it (I have other tickets related to this, I will try to link them here later on).

I'll try to think about this and come back to you by tomorrow so that we can finalize an API. Ideally we need to come up with an API that looks simple but is flexible enough to be configurable.

Maybe having some code snippets showing how this could look would help in making a decision?

Let's brainstorm! :D

@lmyslinski
Contributor Author

I'd be happy to! So I've tried implementing the first approach, I don't really like it since a tuple of two futures is not a monad so you can't map over it... I think it would be best to see how this could be implemented and work up towards what the API could look like. To start - I think this fragment is crucial:

    val killSwitch = KillSwitches.shared(s"twitter4s-${UUID.randomUUID}")

    val processing = Source
      .single(request)
      .via(connection)
      .flatMapConcat {
        case response if response.status.isSuccess =>
          successResponse = true
          processBody(response, killSwitch)(f, errorHandler)
          if (!successResponse) {
            Source.single(StreamResult())
          } else {
            Source.empty
          }
        case failureResponse =>
          val statusCode = failureResponse.status
          val msg = "Stream could not be opened"
          parseFailedResponse(failureResponse).map(ex => logger.error(s"$msg: $ex"))
          Source.failed(TwitterException(statusCode, s"$msg. Check the logs for more details"))
      }
      .runWith(Sink.head)
//      .map(x => (x, killSwitch))

    val switch = Future {
      val pullingTimeMillis = 250
      while (!successResponse) { Thread.sleep(pullingTimeMillis) }
      killSwitch
    }
    (switch, processing)
//    Future.firstCompletedOf(Seq(processing, switch))
}
...
case class StreamResult()

As you can see I've modified it slightly already. I've used akka-streams a couple of times, but to date I still think it's the most complex library I've used. The most needed part is extracting the failure information from the stream. Here I've used the existing flag as a placeholder for event parsing information. The complete flow could look like this:

  • On each event, before or after applying the partial function f, we search for error messages. If one is found, a StreamResult is emitted from the stream. If no error is found, nothing is emitted (as it is currently implemented).
  • Now that we know that our stream has stopped working, we can:
    • emit a new request down the stream
    • close the stream and start a new one (possibly the first option would be better)
    • kill it and leave it up to the API user to create a new stream
    • ??

The main problem for me seems to be exposing the killswitch to the user alongside the processing result. I'm wondering if we could lift the killswitch up in the hierarchy - there is no need to keep it that low. We definitely need to make sure that one killswitch is always bound to one stream. We could modify the control flow - return to the user an object that contains the killswitch without the spawned stream. Then this object could have a method for running the actual stream, while the killswitch would already be available to the user. Example:

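import java.util.UUID
import akka.stream.KillSwitches
import scala.concurrent.Future

def filterStatuses(params: String): TwitterStream = new TwitterStream(params)

class TwitterStream(params: String) {

  // created together with the handle, so it is available before the stream runs
  val killSwitch = KillSwitches.shared(s"twitter4s-${UUID.randomUUID}")

  def run(): Future[StreamResult] = {
    // run the actual stream here, pass the killswitch down
    Future.successful(StreamResult())
  }
}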
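
To make the control flow of this idea concrete without pulling in akka-streams, here is a minimal stand-alone sketch. It assumes a plain Iterator as a stand-in for the HTTP stream and an AtomicBoolean as a stand-in for the kill switch; StreamHandle, shutdown and the fields of StreamResult are hypothetical names chosen for illustration, not the library's API.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

object StreamHandleSketch {
  // Hypothetical result type: how many events were handled and why the run ended.
  final case class StreamResult(processed: Int, stoppedByUser: Boolean)

  // The stop switch exists as soon as the handle is created;
  // nothing is consumed until run() is called.
  final class StreamHandle[A](events: Iterator[A])(onEvent: A => Unit) {
    private val stopped = new AtomicBoolean(false)

    /** Ask the stream to stop; safe to call before or during run(). */
    def shutdown(): Unit = stopped.set(true)

    /** Consume events until the source ends or shutdown() has been called. */
    def run()(implicit ec: ExecutionContext): Future[StreamResult] = Future {
      var processed = 0
      while (!stopped.get() && events.hasNext) {
        onEvent(events.next())
        processed += 1
      }
      StreamResult(processed, stoppedByUser = stopped.get())
    }
  }
}
```

Because the switch is owned by the handle, one switch is always bound to exactly one stream, and the caller gets a single Future[StreamResult] to map over instead of a tuple of two futures.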

@DanielaSfregola
Owner

A few things people have requested that could be relevant to our discussion:

@DanielaSfregola
Owner

I need to think more about this, but I think we have two essential problems in the current API:

  • the current object we return (currently named TwitterStream) is not powerful enough. First of all - it is not a stream, as you cannot access it: the only thing you can do with it is close it! We should probably think of something more powerful that actually is a stream that people can access and work with (this could address Source for use in stream flow #168, and maybe we could figure out how to inject a different streaming library if needed).
  • the API to start the stream is not powerful enough: it doesn't allow people to attach listeners to the status of the stream, or specify an efficient error strategy that does not necessarily involve side effects. What we could do is provide defaults for users who are happy with the current behavior, while offering ways to override these defaults for particular uses.

...let me see if tomorrow I can come up with a possible look for the new API

@lmyslinski
Contributor Author

I'll start working on incorporating the first three - abstracting over the stream implementation seems like a much bigger task

@DanielaSfregola
Owner

Agreed, we are not going to solve it now as it is a much bigger task - but it is good to keep it in mind so that the new API stays independent from any specific streaming library we might use :)

@lmyslinski
Contributor Author

I'm not exactly sure how we could achieve that. As far as I'm aware, there is no common "Streaming API". akka-streams, fs2, monix - all of them are built from scratch. They also use different mechanisms and models - e.g. only akka-streams allows you to create a graph blueprint. However, most libraries offer conversions between different stream implementations - fs2 can work with an akka-streams stream etc. If we provide a stream to the end user, they can do whatever they want with it.

Speaking of streams - By exposing a stream I mean Source[TwitterStreamingEvent, NotUsed]. Exposing a graph makes no sense, because the source is always the http request which is always going to be executed from within the library. Let's say we go with this approach - this is where lots of factors come in:

  • The eventHandler and errorHandler functions are no longer needed. The user can just handle all of this directly from the stream.
  • Preprocessing. I strongly believe that we should parse each incoming event and act upon the error ones. So if we receive a disconnect message, we restart the stream. Akka provides a neat mechanism for this. This should probably be configurable (on-off) with a flag.
  • [Optional] The user could register for the message types they are interested in. Before starting a stream, we pass in a list of messages we want to receive. The stream only emits values of the registered types. This gives the user better control over the stream data, so they only receive what they are interested in.
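
The preprocessing and registration steps above could be expressed as one decision function applied to every incoming event. This is a rough sketch under stated assumptions: the event case classes are simplified stand-ins rather than the library's real message types, and Action, preprocess and autoRestart are hypothetical names. In an akka-streams pipeline the same decision would sit in a stage in front of the user-facing Source.

```scala
object EventFilterSketch {
  // Simplified stand-ins for streaming messages (not the twitter4s definitions).
  sealed trait StreamingEvent
  final case class Tweet(text: String) extends StreamingEvent
  final case class LimitNotice(track: Long) extends StreamingEvent
  final case class Disconnect(code: Int, reason: String) extends StreamingEvent

  // What the library should do with a single incoming event.
  sealed trait Action
  case object Restart extends Action                         // reconnect the stream
  final case class Emit(event: StreamingEvent) extends Action // pass the event to the user
  case object Drop extends Action                            // silently discard the event

  /** Decide how to handle `event`, given the registered types and the restart flag. */
  def preprocess(event: StreamingEvent, wanted: Set[Class[_]], autoRestart: Boolean): Action =
    event match {
      case _: Disconnect if autoRestart     => Restart  // handled by the library itself
      case e if wanted.contains(e.getClass) => Emit(e)  // the user registered for this type
      case _                                => Drop     // not registered: filtered out
    }
}
```

With autoRestart switched off, a disconnect is treated like any other message, so a user who registers for it can still implement a custom strategy.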

@DanielaSfregola
Owner

About using a different streaming API, you would have to specify separate modules that allow using a stream library other than akka-stream (twitter4s-fs2, twitter4s-monix or something)! But this is a problem for another day :)

Yes, I agree with you about streams: we should allow users who are comfortable with streams to just use the stream. I also think we should make things easier for those who are not interested in having full control of the stream. So maybe this suggests having two APIs?

[Still brainstorming here, sorry if my answers bring more and more confusion! :D ]

@lmyslinski
Contributor Author

lmyslinski commented Jun 12, 2019

#243 I've been messing with the code a bit. I've created a duplicate implementation side by side which exposes the stream to the end user. I've also had to simplify lots of the implicit stuff, just so that I don't have to figure out which parameters are required where at this point.

This implementation still misses any of the added-value stuff like retries and message filtering, but it is working. Lots of stuff is patch-worked just to make it compile at this point, but the core flow is there.

In short, the top-level functions would return Future[Source[StreamingMessage, NotUsed]]. The Future success or failure is based on establishing a connection. The end user can map upon success to start processing incoming events. All of the event mapping and wrapping in appropriate types still needs to be done.

@lmyslinski lmyslinski closed this as not planned Won't fix, can't repro, duplicate, stale Feb 3, 2023