Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ addCommandAlias(
"testJVM",
"typeidJVM/test; chunkJVM/test; combinatorsJVM/test; ringbufferJVM/test; schemaJVM/test; streamsJVM/test; schema-toonJVM/test; schema-messagepackJVM/test; schema-avro/test; " +
"schema-thrift/test; schema-bson/test; schema-xmlJVM/test; schema-yamlJVM/test; schema-csvJVM/test; contextJVM/test; scopeJVM/test; mediatypeJVM/test; http-modelJVM/test; " +
"http-model-schemaJVM/test; openapiJVM/test; smithy/test; zioGolemModelJVM/test; zioGolemCoreJVM/test; zioGolemMacros/test; zioGolemTools/test"
"http-model-schemaJVM/test; openapiJVM/test; smithy/test; zioGolemModelJVM/test; zioGolemCoreJVM/test; zioGolemMacros/test; zioGolemTools/test; otelJVM/test"
)

addCommandAlias(
Expand Down Expand Up @@ -133,6 +133,8 @@ lazy val root = project
`scope-examples`,
schema.jvm,
schema.js,
otel.jvm,
otel.js,
`schema-avro`,
`schema-messagepack`.jvm,
`schema-messagepack`.js,
Expand Down Expand Up @@ -375,6 +377,38 @@ lazy val schema = crossProject(JSPlatform, JVMPlatform)
})
)

lazy val otel = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Full)
.dependsOn(context, chunk)
.settings(stdSettings("zio-blocks-otel"))
.settings(crossProjectSettings)
.settings(buildInfoSettings("zio.blocks.otel"))
.enablePlugins(BuildInfoPlugin)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %%% "zio-test" % "2.1.24" % Test,
"dev.zio" %%% "zio-test-sbt" % "2.1.24" % Test
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, _)) =>
Seq("org.scala-lang" % "scala-reflect" % scalaVersion.value)
case _ =>
Seq()
}),
coverageMinimumStmtTotal := 80,
coverageMinimumBranchTotal := 70,
coverageExcludedFiles := Seq(
".*PlatformExecutor.*",
".*BuildInfo.*"
).mkString(";"),
Compile / scalacOptions ++= {
if (scalaVersion.value.startsWith("2."))
Seq("-Wconf:cat=unchecked:s")
else Nil
}
)
.jvmSettings(mimaSettings(failOnProblem = false))
.jsSettings(jsSettings)

lazy val streams = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.settings(stdSettings("zio-blocks-streams"))
Expand Down
46 changes: 46 additions & 0 deletions otel/js/src/main/scala/zio/blocks/otel/ContextStorage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024-2026 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.blocks.otel

sealed trait ContextStorage[A] {
def get(): A
def set(value: A): Unit
def scoped[B](value: A)(f: => B): B
}

object ContextStorage {

val hasLoom: Boolean = false
val implementationName: String = "JSGlobal"

def create[A](initial: A): ContextStorage[A] = new JsStorage[A](initial)

private final class JsStorage[A](initial: A) extends ContextStorage[A] {
private var current: A = initial

def get(): A = current

def set(value: A): Unit = current = value

def scoped[B](value: A)(f: => B): B = {
val prev = current
current = value
try f
finally { current = prev }
}
}
}
30 changes: 30 additions & 0 deletions otel/js/src/main/scala/zio/blocks/otel/LogAnnotations.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024-2026 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.blocks.otel

private[otel] object LogAnnotations {
private var current: Map[String, String] = Map.empty

def get(): Map[String, String] = current

def scoped[A](annotations: Map[String, String])(f: => A): A = {
val prev = current
current = prev ++ annotations
try f
finally { current = prev }
}
}
120 changes: 120 additions & 0 deletions otel/jvm/src/main/scala/zio/blocks/otel/BatchProcessor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2024-2026 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.blocks.otel

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

sealed trait ExportResult

object ExportResult {
case object Success extends ExportResult
final case class Failure(retryable: Boolean, message: String) extends ExportResult
}

final class BatchProcessor[A](
exportFn: Seq[A] => ExportResult,
executor: ScheduledExecutorService,
maxQueueSize: Int = 2048,
maxBatchSize: Int = 512,
flushIntervalMillis: Long = 5000,
maxRetries: Int = 5,
retryBaseMillis: Long = 1000L
) {
private val queue: ConcurrentLinkedQueue[A] = new ConcurrentLinkedQueue[A]()
private val queueSize: AtomicInteger = new AtomicInteger(0)
private val isShutdown: AtomicBoolean = new AtomicBoolean(false)

private val flushTask: Runnable = new Runnable {
def run(): Unit = doFlush()
}

private val scheduledFuture: ScheduledFuture[_] =
executor.scheduleAtFixedRate(flushTask, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS)

def enqueue(item: A): Unit =
if (!isShutdown.get()) {
queue.add(item)
val size = queueSize.incrementAndGet()
if (size > maxQueueSize) {
val removed = queue.poll()
if (removed != null) {
queueSize.decrementAndGet()
System.err.println(
"[zio-blocks-otel] BatchProcessor queue full (" + maxQueueSize + "). Dropping oldest item."
)
}
}
}

def forceFlush(): Unit = doFlush()

def shutdown(): Unit =
if (isShutdown.compareAndSet(false, true)) {
scheduledFuture.cancel(false)
doFlush()
}

private def doFlush(): Unit = {
var hasMore = true
while (hasMore) {
val batch = drain(maxBatchSize)
hasMore = batch.nonEmpty
if (hasMore) exportWithRetry(batch, 0)
}
}

private def drain(max: Int): Seq[A] = {
val builder = Seq.newBuilder[A]
var count = 0
while (count < max) {
val item = queue.poll()
if (item == null) {
count = max // exit loop
} else {
builder += item
queueSize.decrementAndGet()
count += 1
}
}
builder.result()
}

private def exportWithRetry(batch: Seq[A], attempt: Int): Unit =
exportFn(batch) match {
case ExportResult.Success => ()
case ExportResult.Failure(retryable, message) =>
if (!retryable) {
System.err.println(
"[zio-blocks-otel] BatchProcessor export failed (non-retryable): " + message + ". Dropping " + batch.size + " items."
)
} else if (attempt >= maxRetries) {
System.err.println(
"[zio-blocks-otel] BatchProcessor export failed after " + (attempt + 1) + " attempts: " + message + ". Dropping " + batch.size + " items."
)
} else {
val delayMs = math.min(retryBaseMillis * (1L << attempt), 30000L)
try Thread.sleep(delayMs)
catch { case _: InterruptedException => Thread.currentThread().interrupt() }
Comment on lines +112 to +116
exportWithRetry(batch, attempt + 1)
}
}
}
Loading