diff --git a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java index 45e7db5f..6676f372 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,7 +13,7 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * A generic identifier. @@ -23,5 +23,5 @@ public interface Identifier { * * @return The state key; */ - DamlKvutils.DamlStateKey toKey(); + DamlStateKey toKey(); } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java index 03a9e53d..e4182246 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,24 +13,24 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbAddress implements LedgerAddress { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbAddress(final DamlKvutils.DamlStateKey theData) { + public QldbAddress(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java index b3c16420..9d54197a 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,24 +15,24 @@ import java.util.Objects; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbIdentifier implements Identifier { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbIdentifier(final DamlKvutils.DamlStateKey theData) { + public QldbIdentifier(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java index c93234ae..681731ba 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java index 719cbb39..5c3dc22d 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,9 +28,9 @@ import com.blockchaintp.utility.Functions; import com.blockchaintp.utility.UuidConverter; import com.daml.api.util.TimeProvider; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.ledger.validator.SubmissionValidator$; import com.daml.ledger.validator.ValidatingCommitter; import com.daml.lf.engine.Engine; @@ -78,8 +78,8 @@ public static InProcLedgerSubmit * @param id * @return A daml log entry id parsed from a daml log entry id. */ - private DamlKvutils.DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { - var parsed = Functions.uncheckFn(() -> DamlKvutils.DamlLogEntryId.parseFrom(id.bytes())).apply(); + private DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { + var parsed = Functions.uncheckFn(() -> DamlLogEntryId.parseFrom(id.bytes())).apply(); LOG.info("parse log id {}", () -> parsed.getEntryId().toString()); @@ -107,7 +107,7 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics, new StateAccess(CoercingStore.from(Bijection.of(Raw.StateKey::bytes, Raw.StateKey$.MODULE$::apply), Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), theStateStore), writer), () -> logEntryIdToDamlLogEntryId(UuidConverter.uuidtoLogEntry(UUID.randomUUID())), false, - new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics, false), + new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics), r -> { /// New head should be end of sequence, i.e. one past the actual head. This should really have a /// nicer type, we ensure it is monotonic here @@ -125,19 +125,20 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics, } @Override - public CompletableFuture submitPayload(final CommitPayload cp) { + public CompletableFuture submitPayload( + final CommitPayload cp) { return FutureConverters .toJava( this.comitter.commit(cp.getCorrelationId(), cp.getSubmission(), cp.getSubmittingParticipantId(), context)) .thenApply(x -> { if (x == SubmissionResult.Acknowledged$.MODULE$) { - return SubmissionStatus.SUBMITTED; + return com.blockchaintp.daml.participant.SubmissionResult.submitted(); } - if (x == SubmissionResult.NotSupported$.MODULE$) { - return SubmissionStatus.REJECTED; + if (x instanceof SubmissionResult.SynchronousError) { + return com.blockchaintp.daml.participant.SubmissionResult.rejected((SubmissionResult.SynchronousError) x); } LOG.info("Overloaded {} {} ", cp.getCorrelationId(), x); - return SubmissionStatus.OVERLOADED; + return com.blockchaintp.daml.participant.SubmissionResult.overloaded(); }).toCompletableFuture(); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java index d61dfe53..a2e9aa86 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -35,7 +35,7 @@ public interface LedgerSubmitter * the payload to submit. * @return a reference to the submission. */ - CompletableFuture submitPayload(CommitPayload cp); + CompletableFuture submitPayload(CommitPayload cp); /** * For convenience we can translate one the payload to the expected output payload. diff --git a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java index 0b742caf..fc663c25 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -80,7 +80,7 @@ public LedgerSubmitterBulkhead(final LedgerSubmitter theInner, final int m } @Override - public CompletableFuture submitPayload(final CommitPayload cp) { + public CompletableFuture submitPayload(final CommitPayload cp) { var executor = ContextAwareScheduledThreadPoolExecutor.newScheduledThreadPool().build(); var retry = Retry.of("Ledger submitter retry", @@ -99,7 +99,7 @@ public CompletableFuture submitPayload(final CommitPayload .withBulkhead(bulkhead) .withFallback( Arrays.asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class), - throwable -> SubmissionStatus.OVERLOADED) + throwable -> SubmissionResult.overloaded()) .withRetry(retry, executor).get().toCompletableFuture(); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 1b421166..0e2588a2 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,6 +13,7 @@ */ package com.blockchaintp.daml.participant; +import java.util.ArrayList; import java.util.Comparator; import java.util.Optional; import java.util.concurrent.BlockingDeque; @@ -26,14 +27,14 @@ import com.blockchaintp.daml.address.LedgerAddress; import com.blockchaintp.daml.stores.service.TransactionLogReader; import com.daml.ledger.api.health.HealthStatus; -import com.daml.ledger.participant.state.kvutils.OffsetBuilder; +import com.daml.ledger.offset.Offset; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; import com.daml.ledger.participant.state.kvutils.api.LedgerReader; import com.daml.ledger.participant.state.kvutils.api.LedgerRecord; import com.daml.ledger.participant.state.kvutils.api.LedgerWriter; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.platform.akkastreams.dispatcher.Dispatcher; import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource; import com.daml.telemetry.TelemetryContext; @@ -62,9 +63,11 @@ public final class Participant im private final String ledgerId; private final String participantId; private final Dispatcher dispatcher; - private final BlockingDeque> submissions = new LinkedBlockingDeque<>(); + private final BlockingDeque> submissions; + private final BlockingDeque> responses; private final ExecutionContextExecutor context; private final ScheduledExecutorService pollExecutor; + private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0); /** * Convenience method for creating a builder. @@ -103,6 +106,8 @@ public Participant(final TransactionLogReader(); + submissions = new LinkedBlockingDeque<>(); } private void work() { @@ -112,7 +117,9 @@ private void work() { LOG.debug("Commit correlation id {}", next.getCorrelationId()); try { - var result = submitter.submitPayload(next).get(); + com.blockchaintp.daml.participant.SubmissionResult result = submitter.submitPayload(next).get(); + + responses.push(Tuple2.apply(next.getCorrelationId(), result)); LOG.info("Submission result for {} {}", next.getCorrelationId(), result); @@ -131,16 +138,16 @@ public HealthStatus currentHealth() { public akka.stream.scaladsl.Source events(final Option startExclusive) { LOG.info("Get from {}", () -> startExclusive); - var start = OffsetBuilder.fromLong(0L, 0, 0); + var start = offsetBuilder.of(0L, 0, 0); Ordering scalaLongOrdering = Ordering.comparatorToOrdering(Comparator.comparingLong(scala.Long::unbox)); var rangeSource = new RangeSource<>((s, e) -> Source .fromJavaStream(() -> API.unchecked(() -> txLog.from(s - 1, Optional.of(e - 1))).apply() - .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(OffsetBuilder.fromLong(r._1, 0, 0), r._2, r._3)))) + .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(offsetBuilder.of(r._1, 0, 0), r._2, r._3)))) .mapMaterializedValue(m -> NotUsed.notUsed()).asScala(), scalaLongOrdering); - var offset = OffsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); + var offset = offsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); return dispatcher.startingAt(offset, rangeSource, Option.empty()).asJava().map(x -> { LOG.debug("Yield log {} {}", x._1, x._2); return x; @@ -161,7 +168,33 @@ public String participantId() { public Future commit(final String correlationId, final Raw.Envelope envelope, final CommitMetadata metadata, final TelemetryContext telemetryContext) { - commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submissions::add); + final var waitFor = new ArrayList(); + + commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submission -> { + waitFor.add(submission.getCorrelationId()); + submissions.add(submission); + }); + + SubmissionResult lastError = null; + + while (!waitFor.isEmpty()) { + var head = responses.peek(); + + if (head != null) { + if (waitFor.contains(head._1)) { + waitFor.remove(head._1); + + if (head._2.getStatus() == SubmissionStatus.REJECTED) { + lastError = head._2.getError().get(); + } + } + } + + } + + if (lastError != null) { + return Future.successful(lastError); + } return Future.successful(SubmissionResult.Acknowledged$.MODULE$); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java index ad9aef7b..5ecd5f86 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,6 +28,7 @@ import com.daml.ledger.validator.BatchingLedgerStateOperations; import com.daml.ledger.validator.LedgerStateAccess; import com.daml.ledger.validator.LedgerStateOperations; +import com.daml.logging.LoggingContext; import kr.pe.kwonnam.slf4jlambda.LambdaLogger; import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; @@ -65,7 +66,7 @@ private class Operations extends BatchingLedgerStateOperations { @Override public Future>> readState(final Iterable keys, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { var sparseInputs = StreamConverters$.MODULE$.asJavaParStream(keys) @@ -87,7 +88,7 @@ public Future>> readState(final Iterable @Override public Future writeState(final Iterable> keyValuePairs, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { LOG.debug("Write state {}", keyValuePairs); var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { stateStore.put(new ArrayList<>(StreamConverters.asJavaParStream(keyValuePairs) @@ -108,7 +109,7 @@ public Future writeState(final Iterable appendToLog(final Raw.LogEntryId key, final Raw.Envelope value, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var work = sequenceAllocation.serialisedBegin(key) .thenCompose(seq -> CompletableFuture.supplyAsync(() -> Functions.uncheckFn(() -> { @@ -124,7 +125,7 @@ public Future appendToLog(final Raw.LogEntryId key, final Raw.Envelope val @Override public Future inTransaction(final Function1, Future> body, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { return Future.delegate(() -> body.apply(new Operations()), executionContext); } } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java new file mode 100644 index 00000000..dc8b52ed --- /dev/null +++ b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.participant; + +import java.util.Optional; + +/** + * The type Submission result. + */ +public class SubmissionResult { + private final SubmissionStatus status; + + /** + * Gets status. + * + * @return the status + */ + public SubmissionStatus getStatus() { + return status; + } + + /** + * Gets error. + * + * @return the synchronous error + */ + public Optional getError() { + return error; + } + + private final Optional error; + + /** + * Instantiates a new Submission result. + * + * @param theStatus + * the the status + * @param theError + * the the error + */ + public SubmissionResult(final SubmissionStatus theStatus, + final Optional theError) { + status = theStatus; + error = theError; + } + + /** + * Submitted submission result. + * + * @return the submission result + */ + public static SubmissionResult submitted() { + return new SubmissionResult(SubmissionStatus.SUBMITTED, Optional.empty()); + } + + /** + * The participant cannot currently process this submission. + * + * @return the submission result + */ + public static SubmissionResult overloaded() { + return new SubmissionResult(SubmissionStatus.OVERLOADED, Optional.empty()); + } + + /** + * The participant has not yet submitted a proposal. + * + * @return the submission result + */ + public static SubmissionResult enqueued() { + return new SubmissionResult(SubmissionStatus.ENQUEUED, Optional.empty()); + } + + /** + * The participant has submitted a proposal, but it has not yet been fully accepted. This is + * normally only applicable to multipart submissions + * + * @return the submission result + */ + public static SubmissionResult partiallySubmitted() { + return new SubmissionResult(SubmissionStatus.PARTIALLY_SUBMITTED, Optional.empty()); + } + + /** + * The participant has submitted a proposal, and it has been rejected. This is a terminal state and + * is normally quite immediate. + * + * @param error + * the error + * @return the submission result + */ + public static SubmissionResult rejected( + final com.daml.ledger.participant.state.v2.SubmissionResult.SynchronousError error) { + return new SubmissionResult(SubmissionStatus.REJECTED, Optional.of(error)); + } +} diff --git a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java index f1675535..8425c6d8 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java index c071838e..52dbe92a 100644 --- a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java +++ b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,8 +16,8 @@ import java.nio.ByteBuffer; import java.util.UUID; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; import com.google.protobuf.ByteString; import io.vavr.API; @@ -37,9 +37,7 @@ private UuidConverter() { * @return The UUID encoded as the entry id from the log id. */ public static UUID logEntryToUuid(final Raw.LogEntryId id) { - return API - .unchecked( - () -> UuidConverter.asUuid(DamlKvutils.DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) + return API.unchecked(() -> UuidConverter.asUuid(DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) .apply(); } @@ -49,8 +47,8 @@ public static UUID logEntryToUuid(final Raw.LogEntryId id) { * @return A log entry id with the encoded UUID as its entry id. */ public static Raw.LogEntryId uuidtoLogEntry(final UUID id) { - return Raw.LogEntryId$.MODULE$.apply( - DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); + return Raw.LogEntryId$.MODULE$ + .apply(DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); } /** diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala index cc7fdca4..21d87994 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,6 +17,9 @@ import akka.stream.Materializer import com.blockchaintp.daml.address.Identifier import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.ParticipantBuilder +import com.daml.ledger.api.v1.admin.config_management_service.TimeModel +import com.daml.ledger.configuration.Configuration +import com.daml.ledger.configuration.LedgerTimeModel import com.daml.ledger.participant.state.kvutils.api.KeyValueLedger import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState import com.daml.ledger.participant.state.kvutils.app.Config @@ -26,6 +29,9 @@ import com.daml.ledger.resources.ResourceOwner import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics +import com.daml.platform.configuration.InitialLedgerConfiguration + +import java.time.Duration abstract class BuilderLedgerFactory[ Id <: Identifier, @@ -49,10 +55,25 @@ abstract class BuilderLedgerFactory[ } yield new KeyValueParticipantState( readerWriter, readerWriter, - metrics + metrics, + false ) } + override def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration = + InitialLedgerConfiguration( + configuration = Configuration( + generation = 1, + timeModel = LedgerTimeModel( + avgTransactionLatency = Duration.ofSeconds(1L), + minSkew = Duration.ofSeconds(40L), + maxSkew = Duration.ofSeconds(80L) + ).get, + maxDeduplicationTime = Duration.ofDays(1) + ), + delayBeforeSubmitting = Duration.ofSeconds(5) + ) + def owner( config: Config[ExtraConfig], metrics: Metrics, @@ -64,7 +85,7 @@ abstract class BuilderLedgerFactory[ logCtx: LoggingContext ): ResourceOwner[KeyValueLedger] = { new ParticipantOwner( - ledgerConfig(config), + initialLedgerConfig(config), engine, metrics, logCtx, diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala index 21cee3dc..dfcf12f8 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,7 +17,6 @@ import com.blockchaintp.daml.address.Identifier import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.Participant import com.blockchaintp.daml.participant.ParticipantBuilder -import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.resources.Resource import com.daml.ledger.resources.ResourceContext @@ -26,14 +25,14 @@ import com.daml.lf.data.Ref.ParticipantId import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics -import com.daml.platform.configuration.LedgerConfiguration +import com.daml.platform.configuration.InitialLedgerConfiguration import com.daml.resources import java.util.concurrent.Executors import scala.{concurrent => sc} class ParticipantOwner[ExtraConfig, Id <: Identifier, Address <: LedgerAddress]( - val ledgerConfig: LedgerConfiguration, + val ledgerConfig: InitialLedgerConfiguration, val engine: Engine, val metrics: Metrics, val logCtx: LoggingContext, diff --git a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java index 3cd8274f..72800a5a 100644 --- a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java +++ b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,9 +19,12 @@ import com.blockchaintp.daml.stores.exception.StoreWriteException; import com.blockchaintp.daml.stores.service.Key; import com.blockchaintp.daml.stores.service.Value; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.Offset$; +import com.daml.ledger.offset.Offset; +import com.daml.ledger.offset.Offset$; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; +import com.daml.ledger.participant.state.kvutils.store.DamlStateValue; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import io.vavr.API; @@ -38,14 +41,13 @@ void store_coercion() throws StoreWriteException, StoreReadException { var stub = new StubStore(); var coerced = CoercingStore.from( - Bijection.of((DamlKvutils.DamlStateKey k) -> k.toByteString(), - API.unchecked((ByteString k) -> DamlKvutils.DamlStateKey.parseFrom(k))), - Bijection.of((DamlKvutils.DamlStateValue v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlStateValue.parseFrom(v))), + Bijection.of((DamlStateKey k) -> k.toByteString(), API.unchecked((ByteString k) -> DamlStateKey.parseFrom(k))), + Bijection.of((DamlStateValue v) -> v.toByteString(), + API.unchecked((ByteString v) -> DamlStateValue.parseFrom(v))), stub); - var k = DamlKvutils.DamlStateKey.newBuilder().setParty("bob").build(); - var v = DamlKvutils.DamlStateValue.newBuilder().build(); + var k = DamlStateKey.newBuilder().setParty("bob").build(); + var v = DamlStateValue.newBuilder().build(); coerced.put(Key.of(k), Value.of(v)); Assertions.assertArrayEquals(v.toByteArray(), coerced.get(Key.of(k)).get().toNative().toByteArray()); @@ -69,19 +71,16 @@ private static byte[] asBytes(final UUID uuid) { @Test void txlog_coercion() throws StoreWriteException, StoreReadException { var stub = new StubTransactionLog(); - var coerced = CoercingTxLog - .from( - Bijection.of((DamlKvutils.DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), - (UUID k) -> DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) - .build()), - Bijection.of((DamlKvutils.DamlLogEntry v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlLogEntry.parseFrom(v))), - Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), - (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), - stub); + var coerced = CoercingTxLog.from( + Bijection.of((DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), + (UUID k) -> DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))).build()), + Bijection.of((DamlLogEntry v) -> v.toByteString(), API.unchecked((ByteString v) -> DamlLogEntry.parseFrom(v))), + Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), + (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), + stub); var id = coerced.begin(Optional.empty()); - var data = DamlKvutils.DamlLogEntry.newBuilder().build(); + var data = DamlLogEntry.newBuilder().build(); coerced.sendEvent(id._1, data); coerced.commit(id._1); diff --git a/docker/ledger-api-testtool.docker b/docker/ledger-api-testtool.docker index e891c287..41687ba5 100644 --- a/docker/ledger-api-testtool.docker +++ b/docker/ledger-api-testtool.docker @@ -13,7 +13,7 @@ # limitations under the License. FROM ubuntu:bionic -ARG DAML_SDK_VERSION=1.13.1 +ARG DAML_SDK_VERSION=1.18.3 RUN apt-get update -y && \ apt-get upgrade -y && \ diff --git a/pom.xml b/pom.xml index e09010ff..4c558255 100644 --- a/pom.xml +++ b/pom.xml @@ -79,10 +79,10 @@ - 1.13.1-SNAPSHOT + 1.18.3-SNAPSHOT UTF-8 3.17.3 - 1.13.1 + 1.18.3 2.13 diff --git a/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala b/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala index ec1eaeb6..63834b81 100644 --- a/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala +++ b/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -24,6 +24,7 @@ import com.daml.jwt.RSA256Verifier import com.daml.ledger.api.auth.AuthService import com.daml.ledger.api.auth.AuthServiceJWT import com.daml.ledger.api.auth.AuthServiceWildcard +import com.daml.ledger.api.v1.ledger_configuration_service.LedgerConfiguration import com.daml.ledger.participant.state.kvutils.api.CommitMetadata import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.participant.state.kvutils.app.ParticipantConfig @@ -31,14 +32,12 @@ import com.daml.ledger.participant.state.kvutils.app.Runner import com.daml.ledger.resources.ResourceContext import com.daml.ledger.validator.DefaultStateKeySerializationStrategy import com.daml.platform.configuration.CommandConfiguration -import com.daml.platform.configuration.LedgerConfiguration import com.daml.resources.ProgramResource import scopt.OptionParser import scala.jdk.CollectionConverters._ import java.nio.file.Paths -import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration +import java.time.Duration import scala.jdk.FunctionConverters.enrichAsJavaFunction import scala.util.Try @@ -113,26 +112,8 @@ class LedgerFactory( } } - override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration = - LedgerConfiguration.defaultLocalLedger - override val defaultExtraConfig: ExtraConfig = ExtraConfig.default - override def commandConfig( - participantConfig: ParticipantConfig, - config: Config[ExtraConfig] - ): CommandConfiguration = { - val DefaultTrackerRetentionPeriod: FiniteDuration = 5.minutes - - CommandConfiguration( - inputBufferSize = 512, - maxParallelSubmissions = 1, - maxCommandsInFlight = 256, - limitMaxCommandsInFlight = true, - retentionPeriod = DefaultTrackerRetentionPeriod - ) - } - private def validatePath(path: String, message: String) = { val valid = Try(Paths.get(path).toFile.canRead).getOrElse(false) if (valid) Right(()) else Left(message) diff --git a/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala new file mode 100644 index 00000000..e19fe294 --- /dev/null +++ b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2022 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.qldb + +import com.daml.metrics.MetricsReporter + +import scala.concurrent.duration.Duration +import scala.concurrent.duration.NANOSECONDS +import scala.util.Try + +object Metrics { + type Setter[T, B] = (B => B, T) => T + private case class DurationFormat(unwrap: Duration) + + // We're trying to parse the java duration first for backwards compatibility as + // removing it and only supporting the scala duration variant would be a breaking change. + implicit private val scoptDurationFormat: scopt.Read[DurationFormat] = scopt.Read.reads { duration => + Try { + Duration.fromNanos( + java.time.Duration.parse(duration).toNanos + ) + }.orElse(Try { + Duration(duration) + }).flatMap(duration => + Try { + if (!duration.isFinite) + throw new IllegalArgumentException(s"Input duration $duration is not finite") + else DurationFormat(Duration(duration.toNanos, NANOSECONDS)) + } + ).get + } + + def metricsReporterParse[C](parser: scopt.OptionParser[C])( + metricsReporter: Setter[C, Option[MetricsReporter]], + metricsReportingInterval: Setter[C, Duration] + ): Unit = { + import parser.opt + + opt[MetricsReporter]("metrics-reporter") + .action((reporter, config) => metricsReporter(_ => Some(reporter), config)) + .optional() + .text(s"Start a metrics reporter. ${MetricsReporter.cliHint}") + + opt[DurationFormat]("metrics-reporting-interval") + .action((interval, config) => metricsReportingInterval(_ => interval.unwrap, config)) + .optional() + .text("Set metric reporting interval.") + + () + } +}