Skip to content

Commit

Permalink
Merge pull request #280 from shelefg/persistence_exporter
Browse files Browse the repository at this point in the history
Introduce persistence exporter decorators for MetricExporter and Span
This submission builds upon #279 , and introduces a persistence decorator for signal exporters.

Most of the code in the submission was imported and adjusted from the DataDogExporter library. It contains:

A persistence layer (see /Storage).
The export worker (see /Export).
The decorators & persistence and export configuration.
For example, decorating metric and span exporters with persistence functionality can be done as follows:

let metricExporter = ... // create some MetricExporter
let persistenceMetricExporter = try PersistenceMetricExporterDecorator(
                                            metricExporter: metricExporter,
                                            storageURL: metricsSubdirectoryURL,
                                            writerQueue: DispatchQueue(label: "metricWriterQueue"),
                                            readerQueue: DispatchQueue(label: "metricReaderQueue"),
                                            exportQueue: DispatchQueue(label: "metricExportQueue"),
                                            exportCondition: { return true })

let spanExporter = ... // create some SpanExporter
let persistenceTraceExporter = try PersistenceSpanExporterDecorator(
                                            spanExporter: spanExporter,
                                            storageURL: tracesSubdirectoryURL,
                                            writerQueue: DispatchQueue(label: "spanWriterQueue"),
                                            readerQueue: DispatchQueue(label: "spanWriterQueue"),
                                            exportQueue: DispatchQueue(label: "spanWriterQueue"),
                                            exportCondition: { return true })

The PersistenceMetricExporterDecorator and PersistenceSpanExporterDecorator will asynchronously:
Encode the exported Metric's and SpanData's objects to JSON.
Write the data to files in the folder specified by storageURL
Read back these objects from the disk and forward them to be exported by the corresponding decorated exporters.
  • Loading branch information
Ignacio Bonafonte authored Nov 25, 2021
2 parents 5e02eda + 0c16427 commit d7eb282
Show file tree
Hide file tree
Showing 32 changed files with 3,058 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let package = Package(
.library(name: "StdoutExporter", type: .static, targets: ["StdoutExporter"]),
.library(name: "PrometheusExporter", type: .static, targets: ["PrometheusExporter"]),
.library(name: "OpenTelemetryProtocolExporter", type: .static, targets: ["OpenTelemetryProtocolExporter"]),
.library(name: "PersistenceExporter", type: .static, targets: ["PersistenceExporter"]),
.library(name: "InMemoryExporter", type: .static, targets: ["InMemoryExporter"]),
.library(name: "DatadogExporter", type: .static, targets: ["DatadogExporter"]),
.library(name: "NetworkStatus", type: .static, targets: ["NetworkStatus"]),
Expand Down Expand Up @@ -96,6 +97,9 @@ let package = Package(
dependencies: ["OpenTelemetrySdk"],
path: "Sources/Exporters/DatadogExporter",
exclude: ["NOTICE", "README.md"]),
.target(name: "PersistenceExporter",
dependencies: ["OpenTelemetrySdk"],
path: "Sources/Exporters/Persistence"),
.testTarget(name: "NetworkStatusTests",
dependencies: ["NetworkStatus"],
path: "Tests/InstrumentationTests/NetworkStatusTests"),
Expand Down Expand Up @@ -142,6 +146,9 @@ let package = Package(
.product(name: "NIO", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio")],
path: "Tests/ExportersTests/DatadogExporter"),
.testTarget(name: "PersistenceExporterTests",
dependencies: ["PersistenceExporter"],
path: "Tests/ExportersTests/PersistenceExporter"),
.target(name: "LoggingTracer",
dependencies: ["OpenTelemetryApi"],
path: "Examples/Logging Tracer"),
Expand Down
40 changes: 40 additions & 0 deletions Sources/Exporters/Persistence/Export/DataExportDelay.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import Foundation

internal protocol Delay {
var current: TimeInterval { get }
mutating func decrease()
mutating func increase()
}

/// Mutable interval used for periodic data exports.
internal struct DataExportDelay: Delay {
private let defaultDelay: TimeInterval
private let minDelay: TimeInterval
private let maxDelay: TimeInterval
private let changeRate: Double

private var delay: TimeInterval

init(performance: ExportPerformancePreset) {
self.defaultDelay = performance.defaultExportDelay
self.minDelay = performance.minExportDelay
self.maxDelay = performance.maxExportDelay
self.changeRate = performance.exportDelayChangeRate
self.delay = performance.initialExportDelay
}

var current: TimeInterval { delay }

mutating func decrease() {
delay = max(minDelay, delay * (1.0 - changeRate))
}

mutating func increase() {
delay = min(delay * (1.0 + changeRate), maxDelay)
}
}
15 changes: 15 additions & 0 deletions Sources/Exporters/Persistence/Export/DataExportStatus.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import Foundation

/// The status of a single export attempt.
internal struct DataExportStatus {
/// If export needs to be retried (`true`) because its associated data was not delivered but it may succeed
/// in the next attempt (i.e. it failed due to device leaving signal range or a temporary server unavailability occured).
/// If set to `false` then data associated with the upload should be deleted as it does not need any more export
/// attempts (i.e. the upload succeeded or failed due to unrecoverable client error).
let needsRetry: Bool
}
112 changes: 112 additions & 0 deletions Sources/Exporters/Persistence/Export/DataExportWorker.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import Foundation

// a protocol for an exporter of `Data` to which a `DataExportWorker` can delegate persisted
// data export
internal protocol DataExporter {
func export(data: Data) -> DataExportStatus
}

// a protocol needed for mocking `DataExportWorker`
internal protocol DataExportWorkerProtocol {
func flush() -> Bool
}

internal class DataExportWorker: DataExportWorkerProtocol {
/// Queue to execute exports.
private let queue: DispatchQueue
/// File reader providing data to export.
private let fileReader: FileReader
/// Data exporter sending data to server.
private let dataExporter: DataExporter
/// Variable system conditions determining if export should be performed.
private let exportCondition: () -> Bool

/// Delay used to schedule consecutive exports.
private var delay: Delay

/// Export work scheduled by this worker.
private var exportWork: DispatchWorkItem?

init(
queue: DispatchQueue,
fileReader: FileReader,
dataExporter: DataExporter,
exportCondition: @escaping () -> Bool,
delay: Delay
) {
self.queue = queue
self.fileReader = fileReader
self.exportCondition = exportCondition
self.dataExporter = dataExporter
self.delay = delay

let exportWork = DispatchWorkItem { [weak self] in
guard let self = self else {
return
}

let isSystemReady = self.exportCondition()
let nextBatch = isSystemReady ? self.fileReader.readNextBatch() : nil
if let batch = nextBatch {
// Export batch
let exportStatus = self.dataExporter.export(data: batch.data)

// Delete or keep batch depending on the export status
if exportStatus.needsRetry {
self.delay.increase()
} else {
self.fileReader.markBatchAsRead(batch)
self.delay.decrease()
}
} else {
self.delay.increase()
}

self.scheduleNextExport(after: self.delay.current)
}

self.exportWork = exportWork

scheduleNextExport(after: self.delay.current)
}

private func scheduleNextExport(after delay: TimeInterval) {
guard let work = exportWork else {
return
}

queue.asyncAfter(deadline: .now() + delay, execute: work)
}

/// This method gets remaining files at once, and exports them
/// It assures that periodic exporter cannot read or export the files while the flush is being processed
internal func flush() -> Bool {
let success = queue.sync {
self.fileReader.onRemainingBatches {
let exportStatus = self.dataExporter.export(data: $0.data)
if !exportStatus.needsRetry {
self.fileReader.markBatchAsRead($0)
}
}
}
return success
}

/// Cancels scheduled exports and stops scheduling next ones.
/// - It does not affect the export that has already begun.
/// - It blocks the caller thread if called in the middle of export execution.
internal func cancelSynchronously() {
queue.sync(flags: .barrier) {
// This cancellation must be performed on the `queue` to ensure that it is not called
// in the middle of a `DispatchWorkItem` execution - otherwise, as the pending block would be
// fully executed, it will schedule another export by calling `nextScheduledWork(after:)` at the end.
self.exportWork?.cancel()
self.exportWork = nil
}
}
}
131 changes: 131 additions & 0 deletions Sources/Exporters/Persistence/PersistenceExporterDecorator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import Foundation
import OpenTelemetrySdk

// protocol for exporters that can be decorated with `PersistenceExporterDecorator`
protocol DecoratedExporter {
associatedtype SignalType

func export(values: [SignalType]) -> DataExportStatus
}

// a generic decorator of `DecoratedExporter` adding filesystem persistence of batches of `[T.SignalType]`.
// `T.SignalType` must conform to `Codable`.
internal class PersistenceExporterDecorator<T> where T: DecoratedExporter, T.SignalType: Codable {

// a wrapper of `DecoratedExporter` (T) to add conformance to `DataExporter` that can be
// used with `DataExportWorker`.
private class DecoratedDataExporter: DataExporter {

private let decoratedExporter: T

init(decoratedExporter: T) {
self.decoratedExporter = decoratedExporter
}

func export(data: Data) -> DataExportStatus {

// decode batches of `[T.SignalType]` from the raw data.
// the data is made of batches of comma-suffixed JSON arrays, so in order to utilize
// `JSONDecoder`, add a "[" prefix and "null]" suffix making the data a valid
// JSON array of `[T.SignalType]`.
var arrayData: Data = JSONDataConstants.arrayPrefix
arrayData.append(data)
arrayData.append(JSONDataConstants.arraySuffix)

do {
let decoder = JSONDecoder()
let exportables = try decoder.decode(
[[T.SignalType]?].self,
from: arrayData).compactMap { $0 }.flatMap { $0 }

return decoratedExporter.export(values: exportables)
} catch {
return DataExportStatus(needsRetry: false)
}
}
}

private let performancePreset: PersistencePerformancePreset

private let fileWriter: FileWriter

private let worker: DataExportWorkerProtocol

public convenience init(decoratedExporter: T,
storageURL: URL,
writerQueue: DispatchQueue,
readerQueue: DispatchQueue,
exportQueue: DispatchQueue,
exportCondition: @escaping () -> Bool,
performancePreset: PersistencePerformancePreset = .default) {

// orchestrate writes and reads over the folder given by `storageURL`
let filesOrchestrator = FilesOrchestrator(
directory: Directory(url: storageURL),
performance: performancePreset,
dateProvider: SystemDateProvider()
)

let fileWriter = OrchestratedFileWriter(
orchestrator: filesOrchestrator,
queue: writerQueue
)

let fileReader = OrchestratedFileReader(
orchestrator: filesOrchestrator,
queue: readerQueue
)

self.init(decoratedExporter: decoratedExporter,
fileWriter: fileWriter,
workerFactory: {
return DataExportWorker(
queue: exportQueue,
fileReader: fileReader,
dataExporter: $0,
exportCondition: exportCondition,
delay: DataExportDelay(performance: performancePreset))
},
performancePreset: performancePreset)
}

// internal initializer for testing that accepts a worker factory that allows mocking the worker
internal init(decoratedExporter: T,
fileWriter: FileWriter,
workerFactory createWorker: (DataExporter) -> DataExportWorkerProtocol,
performancePreset: PersistencePerformancePreset) {
self.performancePreset = performancePreset

self.fileWriter = fileWriter

self.worker = createWorker(DecoratedDataExporter(decoratedExporter: decoratedExporter))
}

public func export(values: [T.SignalType]) throws {
let encoder = JSONEncoder()
var data = try encoder.encode(values)
data.append(JSONDataConstants.arraySeparator)

if (performancePreset.synchronousWrite) {
fileWriter.writeSync(data: data)
} else {
fileWriter.write(data: data)
}
}

public func flush() {
fileWriter.flush()
_ = worker.flush()
}
}

fileprivate struct JSONDataConstants {
static let arrayPrefix = "[".data(using: .utf8)!
static let arraySuffix = "null]".data(using: .utf8)!
static let arraySeparator = ",".data(using: .utf8)!
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import Foundation
import OpenTelemetrySdk

// a persistence exporter decorator for `Metric`.
// specialization of `PersistenceExporterDecorator` for `MetricExporter`.
public class PersistenceMetricExporterDecorator: MetricExporter {

struct MetricDecoratedExporter: DecoratedExporter {
typealias SignalType = Metric

private let metricExporter: MetricExporter

init(metricExporter: MetricExporter) {
self.metricExporter = metricExporter
}

func export(values: [Metric]) -> DataExportStatus {
let result = metricExporter.export(metrics: values, shouldCancel: nil)
return DataExportStatus(needsRetry: result == .failureRetryable)
}
}

private let persistenceExporter: PersistenceExporterDecorator<MetricDecoratedExporter>

public init(metricExporter: MetricExporter,
storageURL: URL,
writerQueue: DispatchQueue,
readerQueue: DispatchQueue,
exportQueue: DispatchQueue,
exportCondition: @escaping () -> Bool,
performancePreset: PersistencePerformancePreset = .default) throws {

self.persistenceExporter =
PersistenceExporterDecorator<MetricDecoratedExporter>(
decoratedExporter: MetricDecoratedExporter(metricExporter: metricExporter),
storageURL: storageURL,
writerQueue: writerQueue,
readerQueue: readerQueue,
exportQueue: exportQueue,
exportCondition: exportCondition,
performancePreset: performancePreset)
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
do {
try persistenceExporter.export(values: metrics)

return .success
} catch {
return .failureNotRetryable
}
}
}
Loading

0 comments on commit d7eb282

Please sign in to comment.