Skip to content
This repository was archived by the owner on Mar 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #213 from zolrag13/add_error_handler
Browse files Browse the repository at this point in the history
add custom error handler
  • Loading branch information
DanielaSfregola authored Mar 16, 2018
2 parents 6e393db + f768891 commit 16bcf73
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.danielasfregola.twitter4s.http.clients.streaming

object ErrorHandler {
def ignore: PartialFunction[Throwable, Unit] = {
case scala.util.control.NonFatal(e) => ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,26 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
implicit val materializer = ActorMaterializer()
implicit val ec = materializer.executionContext

def processStream[T <: StreamingMessage: Manifest](f: PartialFunction[T, Unit]): Future[TwitterStream] =
def processStream[T <: StreamingMessage: Manifest](
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
): Future[TwitterStream] =
for {
requestWithAuth <- withOAuthHeader(None)(materializer)(request)
killSwitch <- processStreamRequest(requestWithAuth)(f)
killSwitch <- processStreamRequest(requestWithAuth)(f, errorHandler)
} yield TwitterStream(consumerToken, accessToken)(killSwitch, requestWithAuth, system)
}

protected def processStreamRequest[T <: StreamingMessage: Manifest](request: HttpRequest)(
f: PartialFunction[T, Unit])(implicit system: ActorSystem,
materializer: Materializer): Future[SharedKillSwitch] = {
protected def processStreamRequest[T <: StreamingMessage: Manifest](
request: HttpRequest
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
)(
implicit
system: ActorSystem,
materializer: Materializer
): Future[SharedKillSwitch] = {
implicit val ec = materializer.executionContext
implicit val rqt = request

Expand All @@ -58,7 +68,7 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
.flatMapConcat {
case response if response.status.isSuccess =>
successResponse = true
processBody(response, killSwitch)(f)
processBody(response, killSwitch)(f, errorHandler)
Source.empty
case failureResponse =>
val statusCode = failureResponse.status
Expand All @@ -78,13 +88,27 @@ private[twitter4s] class StreamingClient(val consumerToken: ConsumerToken, val a
Future.firstCompletedOf(Seq(processing, switch))
}

def processBody[T: Manifest](response: HttpResponse, killSwitch: SharedKillSwitch)(
f: PartialFunction[T, Unit])(implicit request: HttpRequest, materializer: Materializer): Unit =
def processBody[T: Manifest, A](
response: HttpResponse,
killSwitch: SharedKillSwitch
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
)(
implicit
request: HttpRequest,
materializer: Materializer
): Unit =
response.entity.withoutSizeLimit.dataBytes
.via(Framing.delimiter(ByteString("\r\n"), Int.MaxValue).async)
.filter(_.nonEmpty)
.via(killSwitch.flow)
.map(data => unmarshalStream(data, f))
.recoverWithRetries(attempts = 3, {
case e =>
errorHandler(e)
Source.empty
})
.runWith(Sink.foreach(_ => (): Unit))

private def unmarshalStream[T <: StreamingMessage: Manifest](data: ByteString, f: PartialFunction[T, Unit])(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.danielasfregola.twitter4s.entities.enums.WithFilter
import com.danielasfregola.twitter4s.entities.enums.WithFilter.WithFilter
import com.danielasfregola.twitter4s.entities.streaming.SiteStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.sites.parameters.SiteParameters
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -52,11 +52,15 @@ trait TwitterSiteClient {
replies: Option[Boolean] = None,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false)(f: PartialFunction[SiteStreamingMessage, Unit]): Future[TwitterStream] = {
stall_warnings: Boolean = false
)(
f: PartialFunction[SiteStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
): Future[TwitterStream] = {
import streamingClient._
val repliesAll = replies.flatMap(x => if (x) Some("all") else None)
val parameters = SiteParameters(follow, `with`, repliesAll, stringify_friend_ids, languages, stall_warnings)
preProcessing()
Get(s"$siteUrl/site.json", parameters).processStream(f)
Get(s"$siteUrl/site.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.danielasfregola.twitter4s.entities.enums.FilterLevel.FilterLevel
import com.danielasfregola.twitter4s.entities.enums.Language.Language
import com.danielasfregola.twitter4s.entities.streaming.CommonStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.statuses.parameters._
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -46,19 +46,23 @@ trait TwitterStatusClient {
* Set the minimum value of the filter_level Tweet attribute required to be included in the stream.
* @param f : Function that defines how to process the received messages.
*/
def filterStatuses(follow: Seq[Long] = Seq.empty,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
def filterStatuses(
follow: Seq[Long] = Seq.empty,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
): Future[TwitterStream] = {
import streamingClient._
require(follow.nonEmpty || tracks.nonEmpty || locations.nonEmpty,
"At least one of 'follow', 'tracks' or 'locations' needs to be non empty")
val filters = StatusFilters(follow, tracks, locations, languages, stall_warnings, filter_level)
preProcessing()
Post(s"$statusUrl/filter.json", filters).processStream(f)
Post(s"$statusUrl/filter.json", filters).processStream(f, errorHandler)
}

/** Starts a streaming connection from Twitter's public API, which is a a small random sample of all public statuses.
Expand All @@ -83,15 +87,19 @@ trait TwitterStatusClient {
* Set the minimum value of the filter_level Tweet attribute required to be included in the stream.
* @param f : Function that defines how to process the received messages.
*/
def sampleStatuses(languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
tracks: Seq[String] = Seq.empty,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
def sampleStatuses(
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
tracks: Seq[String] = Seq.empty,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
): Future[TwitterStream] = {
import streamingClient._
val parameters = StatusSampleParameters(languages, stall_warnings, tracks, filter_level)
preProcessing()
Get(s"$statusUrl/sample.json", parameters).processStream(f)
Get(s"$statusUrl/sample.json", parameters).processStream(f, errorHandler)
}

/** Starts a streaming connection from Twitter's firehose API of all public statuses.
Expand All @@ -116,12 +124,16 @@ trait TwitterStatusClient {
def firehoseStatuses(
count: Option[Int] = None,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false)(f: PartialFunction[CommonStreamingMessage, Unit]): Future[TwitterStream] = {
stall_warnings: Boolean = false
)(
f: PartialFunction[CommonStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
): Future[TwitterStream] = {
import streamingClient._
val maxCount = 150000
require(Math.abs(count.getOrElse(0)) <= maxCount, s"count must be between -$maxCount and +$maxCount")
val parameters = StatusFirehoseParameters(languages, count, stall_warnings)
preProcessing()
Get(s"$statusUrl/firehose.json", parameters).processStream(f)
Get(s"$statusUrl/firehose.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.danielasfregola.twitter4s.entities.enums.WithFilter.WithFilter
import com.danielasfregola.twitter4s.entities.enums.{FilterLevel, WithFilter}
import com.danielasfregola.twitter4s.entities.streaming.UserStreamingMessage
import com.danielasfregola.twitter4s.http.clients.streaming.users.parameters._
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream}
import com.danielasfregola.twitter4s.http.clients.streaming.{StreamingClient, TwitterStream, ErrorHandler}
import com.danielasfregola.twitter4s.util.Configurations._

import scala.concurrent.Future
Expand Down Expand Up @@ -52,15 +52,19 @@ trait TwitterUserClient {
* @param stall_warnings : Default to false. Specifies whether stall warnings (`WarningMessage`) should be delivered as part of the updates.
* @param f : Function that defines how to process the received messages.
*/
def userEvents(`with`: WithFilter = WithFilter.Followings,
replies: Option[Boolean] = None,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None)(
f: PartialFunction[UserStreamingMessage, Unit]): Future[TwitterStream] = {
def userEvents(
`with`: WithFilter = WithFilter.Followings,
replies: Option[Boolean] = None,
tracks: Seq[String] = Seq.empty,
locations: Seq[Double] = Seq.empty,
stringify_friend_ids: Boolean = false,
languages: Seq[Language] = Seq.empty,
stall_warnings: Boolean = false,
filter_level: FilterLevel = FilterLevel.None
)(
f: PartialFunction[UserStreamingMessage, Unit],
errorHandler: PartialFunction[Throwable, Unit] = ErrorHandler.ignore
): Future[TwitterStream] = {
import streamingClient._
val repliesAll = replies.flatMap(x => if (x) Some("all") else None)
val parameters = UserParameters(`with`,
Expand All @@ -72,6 +76,6 @@ trait TwitterUserClient {
stall_warnings,
filter_level)
preProcessing()
Get(s"$userUrl/user.json", parameters).processStream(f)
Get(s"$userUrl/user.json", parameters).processStream(f, errorHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ trait ClientSpec extends Spec {

protected val streamingClient = new StreamingClient(consumerToken, accessToken) {

override def processStreamRequest[T <: StreamingMessage: Manifest](request: HttpRequest)(
f: PartialFunction[T, Unit])(implicit system: ActorSystem,
materializer: Materializer): Future[SharedKillSwitch] = {
override def processStreamRequest[T <: StreamingMessage: Manifest](
request: HttpRequest
)(
f: PartialFunction[T, Unit],
errorHandler: PartialFunction[Throwable, Unit]
)(
implicit
system: ActorSystem,
materializer: Materializer
): Future[SharedKillSwitch] = {
implicit val ec = materializer.executionContext
implicit val timeout: Timeout = DurationInt(20) seconds

Expand Down

0 comments on commit 16bcf73

Please sign in to comment.