Skip to content
This repository was archived by the owner on Mar 11, 2023. It is now read-only.

add custom error handler #213

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.danielasfregola.twitter4s.http.clients.streaming

object ErrorHandler {
def default: PartialFunction[Throwable, Unit] = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, could you change it to ignore?

case scala.util.control.NonFatal(e) => ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,26 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
implicit val materializer = ActorMaterializer()
implicit val ec = materializer.executionContext

def processStream[T <: StreamingMessage: Manifest](f: PartialFunction[T, Unit]): Future[TwitterStream] =
def processStream[T <: StreamingMessage: Manifest](
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
): Future[TwitterStream] =
for {
requestWithAuth <- withOAuthHeader(None)(materializer)(request)
killSwitch <- processStreamRequest(requestWithAuth)(f)
killSwitch <- processStreamRequest(requestWithAuth)(f, errorHandler)
} yield TwitterStream(consumerToken, accessToken)(killSwitch, requestWithAuth, system)
}

protected def processStreamRequest[T <: StreamingMessage: Manifest](request: HttpRequest)(
f: PartialFunction[T, Unit])(implicit system: ActorSystem,
materializer: Materializer): Future[SharedKillSwitch] = {
protected def processStreamRequest[T <: StreamingMessage: Manifest](
request: HttpRequest
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
)(
implicit
system: ActorSystem,
materializer: Materializer
): Future[SharedKillSwitch] = {
implicit val ec = materializer.executionContext
implicit val rqt = request

Expand All @@ -58,7 +68,7 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
.flatMapConcat {
case response if response.status.isSuccess =>
successResponse = true
processBody(response, killSwitch)(f)
processBody(response, killSwitch)(f, errorHandler)
Source.empty
case failureResponse =>
val statusCode = failureResponse.status
Expand All @@ -78,13 +88,27 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
Future.firstCompletedOf(Seq(processing, switch))
}

def processBody[T: Manifest](response: HttpResponse, killSwitch: SharedKillSwitch)(
f: PartialFunction[T, Unit])(implicit request: HttpRequest, materializer: Materializer): Unit =
def processBody[T: Manifest, A](
response: HttpResponse,
killSwitch: SharedKillSwitch
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
)(
implicit
request: HttpRequest,
materializer: Materializer
): Unit =
response.entity.withoutSizeLimit.dataBytes
.via(Framing.delimiter(ByteString("\r\n"), Int.MaxValue).async)
.filter(_.nonEmpty)
.via(killSwitch.flow)
.map(data => unmarshalStream(data, f))
.recoverWithRetries(attempts = 3, {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, the number of attempts should be configurable.

case e =>
errorHandler(e)
Source.empty
})
.runWith(Sink.foreach(_ => (): Unit))

private def unmarshalStream[T <: StreamingMessage: Manifest](data: ByteString, f: PartialFunction[T, Unit])(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.danielasfregola.twitter4s.entities.enums.WithFilter
import com.danielasfregola.twitter4s.entities.enums.WithFilter.WithFilter
import com.danielasfregola.twitter4s.entities.streaming.SiteStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.sites.parameters.SiteParameters
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -52,11 +52,15 @@ trait TwitterSiteClient {
replies: Option[Boolean] = None,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false)(f: PartialFunction[SiteStreamingMessage, Unit]): Future[TwitterStream] = {
stall_warnings: Boolean = false
)(
f: PartialFunction[SiteStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
): Future[TwitterStream] = {
import streamingClient._
val repliesAll = replies.flatMap(x => if (x) Some("all") else None)
val parameters = SiteParameters(follow, `with`, repliesAll, stringify_friend_ids, languages, stall_warnings)
preProcessing()
Get(s"$siteUrl/site.json", parameters).processStream(f)
Get(s"$siteUrl/site.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.danielasfregola.twitter4s.entities.enums.FilterLevel.FilterLevel
import com.danielasfregola.twitter4s.entities.enums.Language.Language
import com.danielasfregola.twitter4s.entities.streaming.CommonStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.statuses.parameters._
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -46,19 +46,23 @@ trait TwitterStatusClient {
* Set the minimum value of the filter_level Tweet attribute required to be included in the stream.
* @param f : Function that defines how to process the received messages.
*/
def filterStatuses(follow: Seq[Long] = Seq.empty,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
def filterStatuses(
follow: Seq[Long] = Seq.empty,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
): Future[TwitterStream] = {
import streamingClient._
require(follow.nonEmpty || tracks.nonEmpty || locations.nonEmpty,
"At least one of 'follow', 'tracks' or 'locations' needs to be non empty")
val filters = StatusFilters(follow, tracks, locations, languages, stall_warnings, filter_level)
preProcessing()
Post(s"$statusUrl/filter.json", filters).processStream(f)
Post(s"$statusUrl/filter.json", filters).processStream(f, errorHandler)
}

/** Starts a streaming connection from Twitter's public API, which is a a small random sample of all public statuses.
Expand All @@ -83,15 +87,19 @@ trait TwitterStatusClient {
* Set the minimum value of the filter_level Tweet attribute required to be included in the stream.
* @param f : Function that defines how to process the received messages.
*/
def sampleStatuses(languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
tracks: Seq[String] = Seq.empty,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
def sampleStatuses(
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
tracks: Seq[String] = Seq.empty,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
): Future[TwitterStream] = {
import streamingClient._
val parameters = StatusSampleParameters(languages, stall_warnings, tracks, filter_level)
preProcessing()
Get(s"$statusUrl/sample.json", parameters).processStream(f)
Get(s"$statusUrl/sample.json", parameters).processStream(f, errorHandler)
}

/** Starts a streaming connection from Twitter's firehose API of all public statuses.
Expand All @@ -116,12 +124,16 @@ trait TwitterStatusClient {
def firehoseStatuses(
count: Option[Int] = None,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false)(f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
stall_warnings: Boolean = false
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
): Future[TwitterStream] = {
import streamingClient._
val maxCount = 150000
require(Math.abs(count.getOrElse(0)) <= maxCount, s"count must be between -$maxCount and +$maxCount")
val parameters = StatusFirehoseParameters(languages, count, stall_warnings)
preProcessing()
Get(s"$statusUrl/firehose.json", parameters).processStream(f)
Get(s"$statusUrl/firehose.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.danielasfregola.twitter4s.entities.enums.WithFilter.WithFilter
import com.danielasfregola.twitter4s.entities.enums.{FilterLevel, WithFilter}
import com.danielasfregola.twitter4s.entities.streaming.UserStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.users.parameters._
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -52,15 +52,19 @@ trait TwitterUserClient {
* @param stall_warnings : Default to false. Specifies whether stall warnings (`WarningMessage`) should be delivered as part of the updates.
* @param f : Function that defines how to process the received messages.
*/
def userEvents(`with`: WithFilter = WithFilter.Followings,
replies: Option[Boolean] = None,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[UserStreamingMessage, Unit]): Future[TwitterStream] = {
def userEvents(
`with`: WithFilter = WithFilter.Followings,
replies: Option[Boolean] = None,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[UserStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.default
): Future[TwitterStream] = {
import streamingClient._
val repliesAll = replies.flatMap(x => if (x) Some("all") else None)
val parameters = UserParameters(`with`,
Expand All @@ -72,6 +76,6 @@ trait TwitterUserClient {
stall_warnings,
filter_level)
preProcessing()
Get(s"$userUrl/user.json", parameters).processStream(f)
Get(s"$userUrl/user.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ trait ClientSpec extends Spec {

protected val streamingClient = new StreamingClient(consumerToken, accessToken) {

override def processStreamRequest[T <: StreamingMessage: Manifest](request: HttpRequest)(
f: PartialFunction[T, Unit])(implicit system: ActorSystem,
materializer: Materializer): Future[SharedKillSwitch] = {
override def processStreamRequest[T <: StreamingMessage: Manifest](
request: HttpRequest
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
)(
implicit
system: ActorSystem,
materializer: Materializer
): Future[SharedKillSwitch] = {
implicit val ec = materializer.executionContext
implicit val timeout: Timeout = DurationInt(20) seconds

Expand Down