Skip to content

Commit

Permalink
Improve API slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed Apr 24, 2023
1 parent d7aec02 commit 54d7b91
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 56 deletions.
50 changes: 19 additions & 31 deletions src/main/kotlin/io/github/nomisrev/pubsub/GcpPubSub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package io.github.nomisrev.pubsub

import com.google.api.core.ApiFutures
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.AckReplyConsumer
import com.google.cloud.pubsub.v1.Publisher
import com.google.cloud.pubsub.v1.Subscriber
Expand All @@ -25,11 +23,9 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.trySendBlocking

object GcpPubSub {
Expand Down Expand Up @@ -96,21 +92,15 @@ object GcpPubSub {
fun subscribe(
projectId: String,
subscriptionId: String,
credentialsProvider: CredentialsProvider? = null,
channelProvider: TransportChannelProvider? = null,
flowControlSettings: FlowControlSettings? = null
configure: Subscriber.Builder.() -> Unit = {},
): Flow<PubsubRecord> =
channelFlow {
val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
// Create Subscriber for projectId & subscriptionId
val subscriber = Subscriber.newBuilder(subscriptionName) { message: PubsubMessage, consumer: AckReplyConsumer ->
val subscriber = Subscriber.newBuilder(subscriptionName, ) { message: PubsubMessage, consumer: AckReplyConsumer ->
// Block the upstream when Channel cannot keep up with messages
trySendBlocking(PubsubRecord(message, consumer))
}.apply {
channelProvider?.let(this::setChannelProvider)
credentialsProvider?.let(this::setCredentialsProvider)
flowControlSettings?.let(this::setFlowControlSettings)
}.build()
}.apply(configure).build()

subscriber.startAsync()
awaitClose { subscriber.stopAsync() }
Expand All @@ -127,9 +117,9 @@ object GcpPubSub {
projectId: String,
topicId: String,
encode: suspend (A) -> ByteString,
publisher: Publisher? = null
configure: Publisher.Builder.() -> Unit = {},
): Unit =
publish(messages.map(encode), projectId, topicId, publisher)
publish(messages.map(encode), projectId, topicId, configure = configure)

/**
* Publishes a stream of message [String] to a certain [projectId] and [topicId].
Expand All @@ -142,16 +132,15 @@ object GcpPubSub {
messages: Flow<ByteString>,
projectId: String,
topicId: String,
publisher: Publisher? = null,
context: CoroutineContext = Dispatchers.IO
context: CoroutineContext = Dispatchers.IO,
configure: Publisher.Builder.() -> Unit = {},
): Unit {
// Create publisher for projectId & topicId
val topicName = ProjectTopicName.of(projectId, topicId)

@Suppress("NAME_SHADOWING")
val publisher = publisher ?: withContext(context) {
Publisher.newBuilder(topicName).build()
val publisher = withContext(context) {
Publisher.newBuilder(ProjectTopicName.of(projectId, topicId))
.apply(configure)
.build()
}

// Create messages and publish them
val futures = messages
.map {
Expand All @@ -175,16 +164,15 @@ object GcpPubSub {
messages: Flow<ByteString>,
projectId: String,
topicId: String,
publisher: Publisher? = null,
context: CoroutineContext = Dispatchers.IO
context: CoroutineContext = Dispatchers.IO,
configure: Publisher.Builder.() -> Unit = {},
): Unit {
// Create publisher for projectId & topicId
val topicName = ProjectTopicName.of(projectId, topicId)

@Suppress("NAME_SHADOWING")
val publisher = publisher ?: withContext(context) {
Publisher.newBuilder(topicName).build()
val publisher = withContext(context) {
Publisher.newBuilder(ProjectTopicName.of(projectId, topicId))
.apply(configure)
.build()
}

try {
// Create messages and publish them
messages.collect { bytes ->
Expand Down
44 changes: 19 additions & 25 deletions src/test/kotlin/io/github/nomisrev/pubsub/PubSubTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.Publisher
import com.google.cloud.pubsub.v1.TopicAdminSettings
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.protobuf.ByteString
import com.google.pubsub.v1.TopicName
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.take
Expand Down Expand Up @@ -56,16 +54,6 @@ class PubSubTest : StringSpec({
.setCredentialsProvider(credentials)
.build()

fun publisher(
projectId: String,
topicId: String,
channel: TransportChannelProvider,
credentials: CredentialsProvider
): Publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
.setChannelProvider(channel)
.setCredentialsProvider(credentials)
.build()

"publish multiple messages" {
val messages = setOf("first-message", "second-message", "third-message")
managedChannel().use { channel ->
Expand All @@ -78,16 +66,19 @@ class PubSubTest : StringSpec({
GcpPubSub.publish(
messages.map(ByteString::copyFromUtf8).asFlow(),
projectId,
topicId,
publisher(projectId, topicId, channel, credentials)
)
topicId
) {
setChannelProvider(channel)
setCredentialsProvider(credentials)
}

GcpPubSub.subscribe(
projectId,
subscriptionId,
credentials,
channel
).map { msg ->
) {
setChannelProvider(channel)
setCredentialsProvider(credentials)
}.map { msg ->
msg.data.toStringUtf8()
.also { msg.ack() }
}.take(3).toSet() shouldBe messages
Expand All @@ -106,16 +97,19 @@ class PubSubTest : StringSpec({
GcpPubSub.publish(
messages.map(ByteString::copyFromUtf8).asFlow(),
projectId,
topicId,
publisher(projectId, topicId, channel, credentials)
)
topicId
) {
setChannelProvider(channel)
setCredentialsProvider(credentials)
}

GcpPubSub.subscribe(
projectId,
subscriptionId,
credentials,
channel
).map { msg ->
subscriptionId
) {
setChannelProvider(channel)
setCredentialsProvider(credentials)
}.map { msg ->
msg.data.toStringUtf8()
.also { msg.ack() }
} // Receiving messages in order is not guaranteed unless you send message to the same region
Expand Down

0 comments on commit 54d7b91

Please sign in to comment.