Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl PublishWithContext and make subscriber and publisher accessible #2882

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl

import akka.actor.Cancellable
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.stream.{Attributes, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink, Source}
import akka.{Done, NotUsed}
import com.google.pubsub.v1.pubsub._

import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.concurrent.{ExecutionContext, Future, Promise}

/**
* Google Pub/Sub Akka Stream operator factory.
Expand All @@ -26,12 +27,24 @@ object GooglePubSub {
* @param parallelism controls how many messages can be in-flight at any given time
*/
def publish(parallelism: Int): Flow[PublishRequest, PublishResponse, NotUsed] =
Flow
.fromMaterializer { (mat, attr) =>
Flow[PublishRequest]
.mapAsyncUnordered(parallelism)(publisher(mat, attr).client.publish)
flowWithPublisherClient { publisherClient =>
Flow[PublishRequest].mapAsyncUnordered(parallelism)(publisherClient.publish)
}

/**
* Create a flow with context to publish messages to Google Cloud Pub/Sub. The flow emits responses that contain published
* message ids and provided context.
*
* @param parallelism controls how many messages can be in-flight at any given time
*/
def publishWithContext[Ctx](parallelism: Int): FlowWithContext[PublishRequest, Ctx, PublishResponse, Ctx, NotUsed] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no corresponding in javadsl?

FlowWithContext.fromTuples(
flowWithPublisherClient { publisherClient =>
Flow[(PublishRequest, Ctx)].mapAsyncUnordered(parallelism) {
case (pr, promises) => publisherClient.publish(pr).map(_ -> promises)(ExecutionContext.parasitic)
}
}
.mapMaterializedValue(_ => NotUsed)
)

/**
* Create a source that emits messages for a given subscription using a StreamingPullRequest.
Expand Down Expand Up @@ -127,15 +140,28 @@ object GooglePubSub {
.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic))
}

private def publisher(mat: Materializer, attr: Attributes) =
@ApiMayChange
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not supposed to be public user facing API? Then they should be marked with @InternalApi instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they should be left private, or if I'm missing a use case, please describe it a bit.

private[scaladsl] def publisher(mat: Materializer, attr: Attributes) =
attr
.get[PubSubAttributes.Publisher]
.map(_.publisher)
.getOrElse(GrpcPublisherExt()(mat.system).publisher)

private def subscriber(mat: Materializer, attr: Attributes) =
@ApiMayChange
private[scaladsl] def subscriber(mat: Materializer, attr: Attributes) =
attr
.get[PubSubAttributes.Subscriber]
.map(_.subscriber)
.getOrElse(GrpcSubscriberExt()(mat.system).subscriber)

@ApiMayChange
private[scaladsl] def flowWithPublisherClient[In, Out](
flowFactory: PublisherClient => Flow[In, Out, NotUsed]
): Flow[In, Out, NotUsed] =
Flow
.fromMaterializer { (mat, attr) =>
val publisherClient = publisher(mat, attr).client
flowFactory(publisherClient)
}
.mapMaterializedValue(_ => NotUsed)
}