To see how the guidelines listed below are implemented in practice, take a look at the reference connector. Feel free to use the reference connector as a starting point for new connectors.
Depending on the technology you integrate with Pekko Streams and Pekko Connectors, you'll create Sources, Flows and Sinks. Regardless of how they are implemented, make sure the relevant Source, Sink and Flow APIs are simple and easy to use.
Reference connector model classes
When designing Flows, consider adding an extra field to the in- and out-messages which is passed through unchanged. A common use case is committing a Kafka offset after passing data to another system.
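The pass-through idea can be sketched without any stream machinery. The names `WriteMessage`, `WriteResult` and `PassThroughExample.write` below are hypothetical and only illustrate the shape of the messages; in a real connector the `write` function would be the per-element logic of a Flow.

```scala
// Hypothetical in-message: the payload plus a pass-through value (e.g. a Kafka offset).
final class WriteMessage[T, PT](val data: T, val passThrough: PT)

// Hypothetical out-message: the outcome plus the untouched pass-through value.
final class WriteResult[PT](val success: Boolean, val passThrough: PT)

object PassThroughExample {
  // Stand-in for the per-element logic of a Flow from WriteMessage to WriteResult.
  // `store` is an assumed callback that writes the payload to the external system.
  def write[T, PT](message: WriteMessage[T, PT])(store: T => Boolean): WriteResult[PT] =
    new WriteResult(store(message.data), message.passThrough)
}
```

Downstream stages can then read `passThrough` from the result, for example to commit the offset only after the write succeeded.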
Reference connector Java API factory methods
Apache Pekko Connectors, like Apache Pekko, aims to keep 100% feature parity between the various language DSLs. Implementing even the Java API in Scala has proven the most viable way to do it, as long as you keep the following in mind:
- Keep entry points separated in `javadsl` and `scaladsl`
- Provide factory methods for Sources, Flows and Sinks in the `javadsl` package wrapping all the methods in the Scala API. The Pekko Stream Scala instances have a `.asJava` method to convert to the `org.apache.pekko.stream.javadsl` counterparts.
- When using Scala `object` instances, offer a `getInstance()` method.
- When the Scala API contains an `apply` method, use `create` for Java users.
- Do not nest Scala `object`s more than two levels (as access from Java becomes weird)
- Be careful to convert values within data structures (eg. for `scala.Long` vs. `java.lang.Long`, use `scala.Long.box(value)`)
- Complement any methods with Scala collections with a Java collection version
- Use the `org.apache.pekko.japi.Pair` class to return tuples
- If the underlying Scala code requires an `ExecutionContext`, make the Java API take an `Executor` and use `ExecutionContext.fromExecutor(executor)` for conversion.
- Make use of `org.apache.pekko.util` conversions (eg. `org.apache.pekko.util.FutureConverters` to translate Futures to `CompletionStage`s).
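Several of these rules can be shown together in a small sketch. All names here (`NoOpListener`, `WordLengths`, `JavaWordLengths`) are hypothetical, and the sketch uses the Scala 2.13+ standard-library `scala.jdk` converters instead of the `org.apache.pekko.util` ones so that it runs without Pekko. In a real connector the two API objects would live in the `scaladsl` and `javadsl` packages.

```scala
import java.util.concurrent.{CompletionStage, Executor}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

// A Scala singleton that Java callers need to reference.
object NoOpListener {
  /** Java API: returns the singleton instance. */
  def getInstance(): NoOpListener.type = this
}

// Hypothetical Scala API: `apply` factory, Scala collections, implicit ExecutionContext.
object WordLengths {
  def apply(words: Seq[String])(implicit ec: ExecutionContext): Future[Map[String, Long]] =
    Future(words.map(word => word -> word.length.toLong).toMap)
}

// Hypothetical Java API: `create` instead of `apply`, Java collections, an Executor.
object JavaWordLengths {
  def create(
      words: java.util.List[String],
      executor: Executor
  ): CompletionStage[java.util.Map[String, java.lang.Long]] = {
    // Convert the Java Executor into the ExecutionContext the Scala API needs.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    WordLengths(words.asScala.toList)
      // Box the values inside the data structure before exposing it to Java.
      .map(lengths => lengths.map { case (word, length) => word -> Long.box(length) }.asJava)
      .asJava
  }
}
```

The Java wrapper contains no logic of its own; it only converts types and delegates to the Scala API, which helps keep the two DSLs at feature parity.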
Scala | Java |
---|---|
`scala.Option[T]` | `java.util.Optional<T>` (`OptionalDouble`, ...) |
`scala.collection.immutable.Seq[T]` | `java.util.List<T>` |
`scala.concurrent.Future[T]` | `java.util.concurrent.CompletionStage<T>` |
`scala.concurrent.Promise[T]` | `java.util.concurrent.CompletableFuture<T>` |
`scala.concurrent.duration.FiniteDuration` | `java.time.Duration` |
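As a rough illustration of these mappings, the sketch below converts each Scala type to its Java counterpart. It assumes Scala 2.13 or later and uses the standard-library `scala.jdk` converters as a dependency-free stand-in; the `JavaTypeMappings` object and its method names are made up for this example.

```scala
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._

// Hypothetical helper showing one conversion per row of the table.
object JavaTypeMappings {
  // scala.Option[T] -> java.util.Optional<T>
  def optional(value: Option[String]): Optional[String] = value.toJava

  // scala.collection.immutable.Seq[T] -> java.util.List<T>
  def list(values: Seq[String]): java.util.List[String] = values.asJava

  // scala.concurrent.Future[T] -> java.util.concurrent.CompletionStage<T>
  def completionStage(value: Future[String]): CompletionStage[String] = value.asJava

  // scala.concurrent.duration.FiniteDuration -> java.time.Duration
  def duration(value: FiniteDuration): java.time.Duration = value.toJava
}
```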
Functions, if you want to allow the function to throw checked exceptions (like Scala functions - the infrastructure should recover from those gracefully) or leverage variance in the parameter or return types:
Scala | Java |
---|---|
`T => Unit` | `org.apache.pekko.japi.function.Procedure<T>` |
`() => R` (`scala.Function0[R]`) | `org.apache.pekko.japi.function.Creator<R>` |
`T => R` (`scala.Function1[T, R]`) | `org.apache.pekko.japi.function.Function<T, R>` |
Functions, if you don't want to allow the function to throw checked exceptions (unlike Scala functions):
Scala | Java |
---|---|
`T => Unit` | `java.util.function.Consumer<T>` |
`() => R` (`scala.Function0[R]`) | `java.util.function.Supplier<R>` |
`T => R` (`scala.Function1[T, R]`) | `java.util.function.Function<T, R>` |
Reference connector settings classes
Most technologies will have a couple of configuration settings that will be needed for several Sources, Flows, or Sinks. Create dedicated classes collecting these settings instead of passing them in every method.
Create a default instance in the companion object with good defaults which can be updated via `withXxxx` methods.
Add `withXxxx` methods to specify certain fields in the settings instance.
In case you see the need to support reading the settings from `Config`, offer a method taking the `Config` instance so that the user can apply a proper namespace.
Refrain from using `pekko.stream.connectors` as Config prefix, prefer `pekko` as root namespace.
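A minimal sketch of such a settings class follows. `MyConnectorSettings` and its fields are hypothetical, and reading from `Config` is left out to keep the example free of dependencies. The class is a plain `final class` with a private constructor and a private `copy`, so no default arguments appear in the public API.

```scala
import scala.concurrent.duration._

// Hypothetical settings class; field names are illustrative only.
final class MyConnectorSettings private (
    val host: String,
    val port: Int,
    val requestTimeout: FiniteDuration
) {
  def withHost(value: String): MyConnectorSettings = copy(host = value)
  def withPort(value: Int): MyConnectorSettings = copy(port = value)

  /** Scala API */
  def withRequestTimeout(value: FiniteDuration): MyConnectorSettings =
    copy(requestTimeout = value)

  /** Java API */
  def withRequestTimeout(value: java.time.Duration): MyConnectorSettings =
    copy(requestTimeout = value.toNanos.nanos)

  // Private, so its default arguments never become part of the public binary API.
  private def copy(
      host: String = host,
      port: Int = port,
      requestTimeout: FiniteDuration = requestTimeout
  ): MyConnectorSettings = new MyConnectorSettings(host, port, requestTimeout)

  override def toString: String =
    s"MyConnectorSettings(host=$host, port=$port, requestTimeout=$requestTimeout)"
}

object MyConnectorSettings {
  // Default instance with (assumed) sensible defaults.
  val Defaults: MyConnectorSettings = new MyConnectorSettings("localhost", 8080, 5.seconds)

  /** Scala API */
  def apply(): MyConnectorSettings = Defaults

  /** Java API */
  def create(): MyConnectorSettings = Defaults
}
```

Usage then reads as `MyConnectorSettings().withHost("example.org").withPort(9090)` from Scala and `MyConnectorSettings.create().withPort(9090)` from Java.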
All Apache Pekko APIs aim to evolve in a binary compatible way within minor versions.
- Do not use any default arguments
- Do not use case classes (as the public copy method relies on default arguments)
- To generate a case class replacement, consider using Kaze Class
See Binary Compatibility Rules in the Apache Pekko documentation.
See Binary Compatibility for library authors
All the external runtime dependencies for the project, including transitive dependencies, must have an open source license that is equal to, or compatible with, Apache 2.
The licenses that are compatible with Apache 2 are defined in this doc: the licenses listed under `Category A` are automatically compatible with Apache 2, while the ones listed under `Category B` need additional action:
Each license in this category requires some degree of reciprocity; therefore, additional action must be taken in order to minimize the chance that a user of an Apache product will create a derivative work of a reciprocally-licensed portion of an Apache product without being aware of the applicable requirements.
Dependency licenses will be checked automatically by FOSSA.
Use `private`, `private[connector]` and `final` extensively to limit the API surface.
Package | Purpose |
---|---|
`org.apache.pekko.stream.connectors.connector.javadsl` | Java-only part of the API, normally factories for Sources, Flows and Sinks |
`org.apache.pekko.stream.connectors.connector.scaladsl` | Scala-only part of the API, normally factories for Sources, Flows and Sinks |
`org.apache.pekko.stream.connectors.connector` | Shared API, eg. settings classes |
`org.apache.pekko.stream.connectors.connector.impl` | Internal implementation in separate package |
Reference connector operator implementations
- Keep mutable state within the `GraphStageLogic` only
- Open connections in `preStart`
- Release resources in `postStop`
- Fail early on configuration errors
- Make sure the code is thread-safe; if in doubt, please ask!
- No Blocking At Any Time -- in other words, avoid blocking whenever possible and replace it with asynchronous programming (async callbacks, stage actors)
Many technologies come with client libraries that only support blocking calls. Pekko Stream stages that use blocking APIs should preferably be run on Pekko's `IODispatcher`. (In rare cases you might want to allow the users to configure another dispatcher to run the blocking operations on.)
To select Pekko's `IODispatcher` for a stage use
override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher)
When the `IODispatcher` is selected, you do NOT need to wrap the blocking calls in `Future`s or `blocking`.
(Issue akka/akka#25540 requests better support for selecting the correct execution context.)
Avoid duplication of code between different Sources, Sinks and Flows. Extract the common logic to a common abstract base `GraphStageLogic` that is inherited by the `GraphStage`s.
Sometimes it may be useful to provide a Sink or Source for a connector, even if the main concept is implemented as a Flow. This can be easily done by reusing the Flow implementation:
- Source: `Source.maybe.viaMat(MyFlow)(Keep.right)`
- Sink: `MyFlow.toMat(Sink.ignore)(Keep.right)`
You do not need to expose every configuration a Flow offers this way -- focus on the most important ones.
Write tests for all public methods and possible settings.
Use Docker containers for any testing needed by your application. The container's configuration can be added to the `docker-compose.yml` file.
Please ensure that you limit the amount of resources used by the containers.
Reference connector paradox documentation
Using Paradox syntax (which is very close to markdown), create or complement the documentation in the `docs` module.
Prepare code snippets to be integrated by Paradox in the tests. Such examples should be part of real tests and not in unused methods.
Use ScalaDoc if you see the need to describe the API usage better than the naming does.
The `@apidoc` Paradox directive automatically creates links to the corresponding Scaladoc page for both `scaladsl` and `javadsl`. Be sure to add a `$` at the end of the name if you point to an `object`.
`@apidoc[AmqpSink$]` will link to `org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpSink$.html` when viewing "Scala" and `org/apache/pekko/stream/connectors/amqp/javadsl/AmqpSink$.html` for "Java".
Run `sbt docs/previewSite` to generate reference and API docs, start an embedded web-server, and open a tab to the generated documentation while developing.