diff --git a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java index 45e7db5f..6676f372 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,7 +13,7 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * A generic identifier. @@ -23,5 +23,5 @@ public interface Identifier { * * @return The state key; */ - DamlKvutils.DamlStateKey toKey(); + DamlStateKey toKey(); } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java index 03a9e53d..e4182246 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,24 +13,24 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbAddress implements LedgerAddress { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbAddress(final DamlKvutils.DamlStateKey theData) { + public QldbAddress(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java index b3c16420..9d54197a 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,24 +15,24 @@ import java.util.Objects; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbIdentifier implements Identifier { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbIdentifier(final DamlKvutils.DamlStateKey theData) { + public QldbIdentifier(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java index c93234ae..5fc20c0c 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,6 +19,7 @@ import java.util.stream.Stream; import com.blockchaintp.daml.address.Identifier; +import com.blockchaintp.daml.address.LedgerAddress; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; diff --git a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java index 719cbb39..b2a5a3cd 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,9 +28,9 @@ import com.blockchaintp.utility.Functions; import com.blockchaintp.utility.UuidConverter; import com.daml.api.util.TimeProvider; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.ledger.validator.SubmissionValidator$; import com.daml.ledger.validator.ValidatingCommitter; import com.daml.lf.engine.Engine; @@ -78,8 +78,8 @@ public static InProcLedgerSubmit * @param id * @return A daml log entry id parsed from a daml log entry id. */ - private DamlKvutils.DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { - var parsed = Functions.uncheckFn(() -> DamlKvutils.DamlLogEntryId.parseFrom(id.bytes())).apply(); + private DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { + var parsed = Functions.uncheckFn(() -> DamlLogEntryId.parseFrom(id.bytes())).apply(); LOG.info("parse log id {}", () -> parsed.getEntryId().toString()); @@ -107,7 +107,7 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics, new StateAccess(CoercingStore.from(Bijection.of(Raw.StateKey::bytes, Raw.StateKey$.MODULE$::apply), Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), theStateStore), writer), () -> logEntryIdToDamlLogEntryId(UuidConverter.uuidtoLogEntry(UUID.randomUUID())), false, - new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics, false), + new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics), r -> { /// New head should be end of sequence, i.e. one past the actual head. This should really have a /// nicer type, we ensure it is monotonic here @@ -133,7 +133,7 @@ public CompletableFuture submitPayload(final CommitPayload if (x == SubmissionResult.Acknowledged$.MODULE$) { return SubmissionStatus.SUBMITTED; } - if (x == SubmissionResult.NotSupported$.MODULE$) { + if (x instanceof SubmissionResult.SynchronousError) { return SubmissionStatus.REJECTED; } LOG.info("Overloaded {} {} ", cp.getCorrelationId(), x); diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 1b421166..4069859b 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,15 +25,17 @@ import com.blockchaintp.daml.address.Identifier; import com.blockchaintp.daml.address.LedgerAddress; import com.blockchaintp.daml.stores.service.TransactionLogReader; + +import com.daml.ledger.offset.Offset; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder; import com.daml.ledger.api.health.HealthStatus; -import com.daml.ledger.participant.state.kvutils.OffsetBuilder; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder$; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; import com.daml.ledger.participant.state.kvutils.api.LedgerReader; import com.daml.ledger.participant.state.kvutils.api.LedgerRecord; import com.daml.ledger.participant.state.kvutils.api.LedgerWriter; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.platform.akkastreams.dispatcher.Dispatcher; import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource; import com.daml.telemetry.TelemetryContext; @@ -65,6 +67,7 @@ public final class Participant im private final BlockingDeque> submissions = new LinkedBlockingDeque<>(); private final ExecutionContextExecutor context; private final ScheduledExecutorService pollExecutor; + private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0); /** * Convenience method for creating a builder. @@ -131,16 +134,16 @@ public HealthStatus currentHealth() { public akka.stream.scaladsl.Source events(final Option startExclusive) { LOG.info("Get from {}", () -> startExclusive); - var start = OffsetBuilder.fromLong(0L, 0, 0); + var start = offsetBuilder.of(0L, 0, 0); Ordering scalaLongOrdering = Ordering.comparatorToOrdering(Comparator.comparingLong(scala.Long::unbox)); var rangeSource = new RangeSource<>((s, e) -> Source .fromJavaStream(() -> API.unchecked(() -> txLog.from(s - 1, Optional.of(e - 1))).apply() - .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(OffsetBuilder.fromLong(r._1, 0, 0), r._2, r._3)))) + .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(offsetBuilder.of(r._1, 0, 0), r._2, r._3)))) .mapMaterializedValue(m -> NotUsed.notUsed()).asScala(), scalaLongOrdering); - var offset = OffsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); + var offset = offsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); return dispatcher.startingAt(offset, rangeSource, Option.empty()).asJava().map(x -> { LOG.debug("Yield log {} {}", x._1, x._2); return x; diff --git a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java index ad9aef7b..5ecd5f86 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,6 +28,7 @@ import com.daml.ledger.validator.BatchingLedgerStateOperations; import com.daml.ledger.validator.LedgerStateAccess; import com.daml.ledger.validator.LedgerStateOperations; +import com.daml.logging.LoggingContext; import kr.pe.kwonnam.slf4jlambda.LambdaLogger; import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; @@ -65,7 +66,7 @@ private class Operations extends BatchingLedgerStateOperations { @Override public Future>> readState(final Iterable keys, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { var sparseInputs = StreamConverters$.MODULE$.asJavaParStream(keys) @@ -87,7 +88,7 @@ public Future>> readState(final Iterable @Override public Future writeState(final Iterable> keyValuePairs, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { LOG.debug("Write state {}", keyValuePairs); var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { stateStore.put(new ArrayList<>(StreamConverters.asJavaParStream(keyValuePairs) @@ -108,7 +109,7 @@ public Future writeState(final Iterable appendToLog(final Raw.LogEntryId key, final Raw.Envelope value, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var work = sequenceAllocation.serialisedBegin(key) .thenCompose(seq -> CompletableFuture.supplyAsync(() -> Functions.uncheckFn(() -> { @@ -124,7 +125,7 @@ public Future appendToLog(final Raw.LogEntryId key, final Raw.Envelope val @Override public Future inTransaction(final Function1, Future> body, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { return Future.delegate(() -> body.apply(new Operations()), executionContext); } } diff --git a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java index c071838e..52dbe92a 100644 --- a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java +++ b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,8 +16,8 @@ import java.nio.ByteBuffer; import java.util.UUID; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; import com.google.protobuf.ByteString; import io.vavr.API; @@ -37,9 +37,7 @@ private UuidConverter() { * @return The UUID encoded as the entry id from the log id. */ public static UUID logEntryToUuid(final Raw.LogEntryId id) { - return API - .unchecked( - () -> UuidConverter.asUuid(DamlKvutils.DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) + return API.unchecked(() -> UuidConverter.asUuid(DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) .apply(); } @@ -49,8 +47,8 @@ public static UUID logEntryToUuid(final Raw.LogEntryId id) { * @return A log entry id with the encoded UUID as its entry id. */ public static Raw.LogEntryId uuidtoLogEntry(final UUID id) { - return Raw.LogEntryId$.MODULE$.apply( - DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); + return Raw.LogEntryId$.MODULE$ + .apply(DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); } /** diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala index cc7fdca4..f40abcac 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -49,7 +49,8 @@ abstract class BuilderLedgerFactory[ } yield new KeyValueParticipantState( readerWriter, readerWriter, - metrics + metrics, + false ) } diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala index 21cee3dc..c042b735 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,6 +18,7 @@ import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.Participant import com.blockchaintp.daml.participant.ParticipantBuilder import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.participant.state.index.v2.LedgerConfiguration import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.resources.Resource import com.daml.ledger.resources.ResourceContext @@ -26,7 +27,6 @@ import com.daml.lf.data.Ref.ParticipantId import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics -import com.daml.platform.configuration.LedgerConfiguration import com.daml.resources import java.util.concurrent.Executors diff --git a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java index 3cd8274f..82bab1e7 100644 --- a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java +++ b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java @@ -19,9 +19,12 @@ import com.blockchaintp.daml.stores.exception.StoreWriteException; import com.blockchaintp.daml.stores.service.Key; import com.blockchaintp.daml.stores.service.Value; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.Offset$; +import com.daml.ledger.offset.Offset; +import com.daml.ledger.offset.Offset$; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; +import com.daml.ledger.participant.state.kvutils.store.DamlStateValue; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import io.vavr.API; @@ -38,14 +41,14 @@ void store_coercion() throws StoreWriteException, StoreReadException { var stub = new StubStore(); var coerced = CoercingStore.from( - Bijection.of((DamlKvutils.DamlStateKey k) -> k.toByteString(), - API.unchecked((ByteString k) -> DamlKvutils.DamlStateKey.parseFrom(k))), - Bijection.of((DamlKvutils.DamlStateValue v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlStateValue.parseFrom(v))), + Bijection.of((DamlStateKey k) -> k.toByteString(), + API.unchecked((ByteString k) -> DamlStateKey.parseFrom(k))), + Bijection.of((DamlStateValue v) -> v.toByteString(), + API.unchecked((ByteString v) -> DamlStateValue.parseFrom(v))), stub); - var k = DamlKvutils.DamlStateKey.newBuilder().setParty("bob").build(); - var v = DamlKvutils.DamlStateValue.newBuilder().build(); + var k = DamlStateKey.newBuilder().setParty("bob").build(); + var v = DamlStateValue.newBuilder().build(); coerced.put(Key.of(k), Value.of(v)); Assertions.assertArrayEquals(v.toByteArray(), coerced.get(Key.of(k)).get().toNative().toByteArray()); @@ -71,17 +74,17 @@ void txlog_coercion() throws StoreWriteException, StoreReadException { var stub = new StubTransactionLog(); var coerced = CoercingTxLog .from( - Bijection.of((DamlKvutils.DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), - (UUID k) -> DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) + Bijection.of((DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), + (UUID k) -> DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) .build()), - Bijection.of((DamlKvutils.DamlLogEntry v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlLogEntry.parseFrom(v))), + Bijection.of((DamlLogEntry v) -> v.toByteString(), + API.unchecked((ByteString v) -> DamlLogEntry.parseFrom(v))), Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), stub); var id = coerced.begin(Optional.empty()); - var data = DamlKvutils.DamlLogEntry.newBuilder().build(); + var data = DamlLogEntry.newBuilder().build(); coerced.sendEvent(id._1, data); coerced.commit(id._1); diff --git a/pom.xml b/pom.xml index e09010ff..4c558255 100644 --- a/pom.xml +++ b/pom.xml @@ -79,10 +79,10 @@ - 1.13.1-SNAPSHOT + 1.18.3-SNAPSHOT UTF-8 3.17.3 - 1.13.1 + 1.18.3 2.13 diff --git a/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala new file mode 100644 index 00000000..e19fe294 --- /dev/null +++ b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2022 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.qldb + +import com.daml.metrics.MetricsReporter + +import scala.concurrent.duration.Duration +import scala.concurrent.duration.NANOSECONDS +import scala.util.Try + +object Metrics { + type Setter[T, B] = (B => B, T) => T + private case class DurationFormat(unwrap: Duration) + + // We're trying to parse the java duration first for backwards compatibility as + // removing it and only supporting the scala duration variant would be a breaking change. + implicit private val scoptDurationFormat: scopt.Read[DurationFormat] = scopt.Read.reads { duration => + Try { + Duration.fromNanos( + java.time.Duration.parse(duration).toNanos + ) + }.orElse(Try { + Duration(duration) + }).flatMap(duration => + Try { + if (!duration.isFinite) + throw new IllegalArgumentException(s"Input duration $duration is not finite") + else DurationFormat(Duration(duration.toNanos, NANOSECONDS)) + } + ).get + } + + def metricsReporterParse[C](parser: scopt.OptionParser[C])( + metricsReporter: Setter[C, Option[MetricsReporter]], + metricsReportingInterval: Setter[C, Duration] + ): Unit = { + import parser.opt + + opt[MetricsReporter]("metrics-reporter") + .action((reporter, config) => metricsReporter(_ => Some(reporter), config)) + .optional() + .text(s"Start a metrics reporter. ${MetricsReporter.cliHint}") + + opt[DurationFormat]("metrics-reporting-interval") + .action((interval, config) => metricsReportingInterval(_ => interval.unwrap, config)) + .optional() + .text("Set metric reporting interval.") + + () + } +}