From fc91130d4908f5bfbd22cd1143cfa485ef064cd1 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 17:31:58 +0100 Subject: [PATCH 1/9] Update to 1.18.3, local tests passing Signed-off-by: Ryan --- .../blockchaintp/daml/address/Identifier.java | 6 +- .../daml/address/QldbAddress.java | 10 +-- .../daml/address/QldbIdentifier.java | 10 +-- .../daml/participant/CommitPayload.java | 3 +- .../participant/InProcLedgerSubmitter.java | 14 ++--- .../daml/participant/Participant.java | 17 ++--- .../daml/participant/StateAccess.java | 11 ++-- .../blockchaintp/utility/UuidConverter.java | 12 ++-- .../daml/runtime/BuilderLedgerFactory.scala | 5 +- .../daml/runtime/ParticipantOwner.scala | 4 +- .../daml/stores/layers/CoercingTest.java | 31 +++++----- pom.xml | 4 +- .../com/blockchaintp/daml/qldb/Metrics.scala | 62 +++++++++++++++++++ 13 files changed, 129 insertions(+), 60 deletions(-) create mode 100644 qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala diff --git a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java index 45e7db5f..6676f372 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/Identifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/Identifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,7 +13,7 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * A generic identifier. @@ -23,5 +23,5 @@ public interface Identifier { * * @return The state key; */ - DamlKvutils.DamlStateKey toKey(); + DamlStateKey toKey(); } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java index 03a9e53d..e4182246 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbAddress.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,24 +13,24 @@ */ package com.blockchaintp.daml.address; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbAddress implements LedgerAddress { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbAddress(final DamlKvutils.DamlStateKey theData) { + public QldbAddress(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } } diff --git a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java index b3c16420..9d54197a 100644 --- a/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java +++ b/core/src/main/java/com/blockchaintp/daml/address/QldbIdentifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,24 +15,24 @@ import java.util.Objects; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; /** * */ public final class QldbIdentifier implements Identifier { - private final DamlKvutils.DamlStateKey data; + private final DamlStateKey data; /** * * @param theData */ - public QldbIdentifier(final DamlKvutils.DamlStateKey theData) { + public QldbIdentifier(final DamlStateKey theData) { data = theData; } @Override - public DamlKvutils.DamlStateKey toKey() { + public DamlStateKey toKey() { return data; } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java index c93234ae..5fc20c0c 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,6 +19,7 @@ import java.util.stream.Stream; import com.blockchaintp.daml.address.Identifier; +import com.blockchaintp.daml.address.LedgerAddress; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; diff --git a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java index 719cbb39..b2a5a3cd 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,9 +28,9 @@ import com.blockchaintp.utility.Functions; import com.blockchaintp.utility.UuidConverter; import com.daml.api.util.TimeProvider; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.ledger.validator.SubmissionValidator$; import com.daml.ledger.validator.ValidatingCommitter; import com.daml.lf.engine.Engine; @@ -78,8 +78,8 @@ public static InProcLedgerSubmit * @param id * @return A daml log entry id parsed from a daml log entry id. */ - private DamlKvutils.DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { - var parsed = Functions.uncheckFn(() -> DamlKvutils.DamlLogEntryId.parseFrom(id.bytes())).apply(); + private DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) { + var parsed = Functions.uncheckFn(() -> DamlLogEntryId.parseFrom(id.bytes())).apply(); LOG.info("parse log id {}", () -> parsed.getEntryId().toString()); @@ -107,7 +107,7 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics, new StateAccess(CoercingStore.from(Bijection.of(Raw.StateKey::bytes, Raw.StateKey$.MODULE$::apply), Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), theStateStore), writer), () -> logEntryIdToDamlLogEntryId(UuidConverter.uuidtoLogEntry(UUID.randomUUID())), false, - new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics, false), + new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics), r -> { /// New head should be end of sequence, i.e. one past the actual head. This should really have a /// nicer type, we ensure it is monotonic here @@ -133,7 +133,7 @@ public CompletableFuture submitPayload(final CommitPayload if (x == SubmissionResult.Acknowledged$.MODULE$) { return SubmissionStatus.SUBMITTED; } - if (x == SubmissionResult.NotSupported$.MODULE$) { + if (x instanceof SubmissionResult.SynchronousError) { return SubmissionStatus.REJECTED; } LOG.info("Overloaded {} {} ", cp.getCorrelationId(), x); diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 1b421166..4069859b 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,15 +25,17 @@ import com.blockchaintp.daml.address.Identifier; import com.blockchaintp.daml.address.LedgerAddress; import com.blockchaintp.daml.stores.service.TransactionLogReader; + +import com.daml.ledger.offset.Offset; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder; import com.daml.ledger.api.health.HealthStatus; -import com.daml.ledger.participant.state.kvutils.OffsetBuilder; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder$; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; import com.daml.ledger.participant.state.kvutils.api.LedgerReader; import com.daml.ledger.participant.state.kvutils.api.LedgerRecord; import com.daml.ledger.participant.state.kvutils.api.LedgerWriter; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.SubmissionResult; +import com.daml.ledger.participant.state.v2.SubmissionResult; import com.daml.platform.akkastreams.dispatcher.Dispatcher; import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource; import com.daml.telemetry.TelemetryContext; @@ -65,6 +67,7 @@ public final class Participant im private final BlockingDeque> submissions = new LinkedBlockingDeque<>(); private final ExecutionContextExecutor context; private final ScheduledExecutorService pollExecutor; + private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0); /** * Convenience method for creating a builder. @@ -131,16 +134,16 @@ public HealthStatus currentHealth() { public akka.stream.scaladsl.Source events(final Option startExclusive) { LOG.info("Get from {}", () -> startExclusive); - var start = OffsetBuilder.fromLong(0L, 0, 0); + var start = offsetBuilder.of(0L, 0, 0); Ordering scalaLongOrdering = Ordering.comparatorToOrdering(Comparator.comparingLong(scala.Long::unbox)); var rangeSource = new RangeSource<>((s, e) -> Source .fromJavaStream(() -> API.unchecked(() -> txLog.from(s - 1, Optional.of(e - 1))).apply() - .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(OffsetBuilder.fromLong(r._1, 0, 0), r._2, r._3)))) + .map(r -> Tuple2.apply(r._1, LedgerRecord.apply(offsetBuilder.of(r._1, 0, 0), r._2, r._3)))) .mapMaterializedValue(m -> NotUsed.notUsed()).asScala(), scalaLongOrdering); - var offset = OffsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); + var offset = offsetBuilder.highestIndex(startExclusive.getOrElse(() -> start)); return dispatcher.startingAt(offset, rangeSource, Option.empty()).asJava().map(x -> { LOG.debug("Yield log {} {}", x._1, x._2); return x; diff --git a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java index ad9aef7b..5ecd5f86 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/StateAccess.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,6 +28,7 @@ import com.daml.ledger.validator.BatchingLedgerStateOperations; import com.daml.ledger.validator.LedgerStateAccess; import com.daml.ledger.validator.LedgerStateOperations; +import com.daml.logging.LoggingContext; import kr.pe.kwonnam.slf4jlambda.LambdaLogger; import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory; @@ -65,7 +66,7 @@ private class Operations extends BatchingLedgerStateOperations { @Override public Future>> readState(final Iterable keys, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { var sparseInputs = StreamConverters$.MODULE$.asJavaParStream(keys) @@ -87,7 +88,7 @@ public Future>> readState(final Iterable @Override public Future writeState(final Iterable> keyValuePairs, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { LOG.debug("Write state {}", keyValuePairs); var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> { stateStore.put(new ArrayList<>(StreamConverters.asJavaParStream(keyValuePairs) @@ -108,7 +109,7 @@ public Future writeState(final Iterable appendToLog(final Raw.LogEntryId key, final Raw.Envelope value, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { var work = sequenceAllocation.serialisedBegin(key) .thenCompose(seq -> CompletableFuture.supplyAsync(() -> Functions.uncheckFn(() -> { @@ -124,7 +125,7 @@ public Future appendToLog(final Raw.LogEntryId key, final Raw.Envelope val @Override public Future inTransaction(final Function1, Future> body, - final ExecutionContext executionContext) { + final ExecutionContext executionContext, final LoggingContext loggingContext) { return Future.delegate(() -> body.apply(new Operations()), executionContext); } } diff --git a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java index c071838e..52dbe92a 100644 --- a/core/src/main/java/com/blockchaintp/utility/UuidConverter.java +++ b/core/src/main/java/com/blockchaintp/utility/UuidConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,8 +16,8 @@ import java.nio.ByteBuffer; import java.util.UUID; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; import com.daml.ledger.participant.state.kvutils.Raw; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; import com.google.protobuf.ByteString; import io.vavr.API; @@ -37,9 +37,7 @@ private UuidConverter() { * @return The UUID encoded as the entry id from the log id. */ public static UUID logEntryToUuid(final Raw.LogEntryId id) { - return API - .unchecked( - () -> UuidConverter.asUuid(DamlKvutils.DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) + return API.unchecked(() -> UuidConverter.asUuid(DamlLogEntryId.parseFrom(id.bytes()).getEntryId().toByteArray())) .apply(); } @@ -49,8 +47,8 @@ public static UUID logEntryToUuid(final Raw.LogEntryId id) { * @return A log entry id with the encoded UUID as its entry id. */ public static Raw.LogEntryId uuidtoLogEntry(final UUID id) { - return Raw.LogEntryId$.MODULE$.apply( - DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); + return Raw.LogEntryId$.MODULE$ + .apply(DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(UuidConverter.asBytes(id))).build()); } /** diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala index cc7fdca4..f40abcac 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -49,7 +49,8 @@ abstract class BuilderLedgerFactory[ } yield new KeyValueParticipantState( readerWriter, readerWriter, - metrics + metrics, + false ) } diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala index 21cee3dc..c042b735 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,6 +18,7 @@ import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.Participant import com.blockchaintp.daml.participant.ParticipantBuilder import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.participant.state.index.v2.LedgerConfiguration import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.resources.Resource import com.daml.ledger.resources.ResourceContext @@ -26,7 +27,6 @@ import com.daml.lf.data.Ref.ParticipantId import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics -import com.daml.platform.configuration.LedgerConfiguration import com.daml.resources import java.util.concurrent.Executors diff --git a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java index 3cd8274f..82bab1e7 100644 --- a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java +++ b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java @@ -19,9 +19,12 @@ import com.blockchaintp.daml.stores.exception.StoreWriteException; import com.blockchaintp.daml.stores.service.Key; import com.blockchaintp.daml.stores.service.Value; -import com.daml.ledger.participant.state.kvutils.DamlKvutils; -import com.daml.ledger.participant.state.v1.Offset; -import com.daml.ledger.participant.state.v1.Offset$; +import com.daml.ledger.offset.Offset; +import com.daml.ledger.offset.Offset$; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry; +import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId; +import com.daml.ledger.participant.state.kvutils.store.DamlStateKey; +import com.daml.ledger.participant.state.kvutils.store.DamlStateValue; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import io.vavr.API; @@ -38,14 +41,14 @@ void store_coercion() throws StoreWriteException, StoreReadException { var stub = new StubStore(); var coerced = CoercingStore.from( - Bijection.of((DamlKvutils.DamlStateKey k) -> k.toByteString(), - API.unchecked((ByteString k) -> DamlKvutils.DamlStateKey.parseFrom(k))), - Bijection.of((DamlKvutils.DamlStateValue v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlStateValue.parseFrom(v))), + Bijection.of((DamlStateKey k) -> k.toByteString(), + API.unchecked((ByteString k) -> DamlStateKey.parseFrom(k))), + Bijection.of((DamlStateValue v) -> v.toByteString(), + API.unchecked((ByteString v) -> DamlStateValue.parseFrom(v))), stub); - var k = DamlKvutils.DamlStateKey.newBuilder().setParty("bob").build(); - var v = DamlKvutils.DamlStateValue.newBuilder().build(); + var k = DamlStateKey.newBuilder().setParty("bob").build(); + var v = DamlStateValue.newBuilder().build(); coerced.put(Key.of(k), Value.of(v)); Assertions.assertArrayEquals(v.toByteArray(), coerced.get(Key.of(k)).get().toNative().toByteArray()); @@ -71,17 +74,17 @@ void txlog_coercion() throws StoreWriteException, StoreReadException { var stub = new StubTransactionLog(); var coerced = CoercingTxLog .from( - Bijection.of((DamlKvutils.DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), - (UUID k) -> DamlKvutils.DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) + Bijection.of((DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), + (UUID k) -> DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) .build()), - Bijection.of((DamlKvutils.DamlLogEntry v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlKvutils.DamlLogEntry.parseFrom(v))), + Bijection.of((DamlLogEntry v) -> v.toByteString(), + API.unchecked((ByteString v) -> DamlLogEntry.parseFrom(v))), Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), stub); var id = coerced.begin(Optional.empty()); - var data = DamlKvutils.DamlLogEntry.newBuilder().build(); + var data = DamlLogEntry.newBuilder().build(); coerced.sendEvent(id._1, data); coerced.commit(id._1); diff --git a/pom.xml b/pom.xml index e09010ff..4c558255 100644 --- a/pom.xml +++ b/pom.xml @@ -79,10 +79,10 @@ - 1.13.1-SNAPSHOT + 1.18.3-SNAPSHOT UTF-8 3.17.3 - 1.13.1 + 1.18.3 2.13 diff --git a/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala new file mode 100644 index 00000000..e19fe294 --- /dev/null +++ b/qldb/src/main/scala/com/blockchaintp/daml/qldb/Metrics.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2022 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.qldb + +import com.daml.metrics.MetricsReporter + +import scala.concurrent.duration.Duration +import scala.concurrent.duration.NANOSECONDS +import scala.util.Try + +object Metrics { + type Setter[T, B] = (B => B, T) => T + private case class DurationFormat(unwrap: Duration) + + // We're trying to parse the java duration first for backwards compatibility as + // removing it and only supporting the scala duration variant would be a breaking change. + implicit private val scoptDurationFormat: scopt.Read[DurationFormat] = scopt.Read.reads { duration => + Try { + Duration.fromNanos( + java.time.Duration.parse(duration).toNanos + ) + }.orElse(Try { + Duration(duration) + }).flatMap(duration => + Try { + if (!duration.isFinite) + throw new IllegalArgumentException(s"Input duration $duration is not finite") + else DurationFormat(Duration(duration.toNanos, NANOSECONDS)) + } + ).get + } + + def metricsReporterParse[C](parser: scopt.OptionParser[C])( + metricsReporter: Setter[C, Option[MetricsReporter]], + metricsReportingInterval: Setter[C, Duration] + ): Unit = { + import parser.opt + + opt[MetricsReporter]("metrics-reporter") + .action((reporter, config) => metricsReporter(_ => Some(reporter), config)) + .optional() + .text(s"Start a metrics reporter. ${MetricsReporter.cliHint}") + + opt[DurationFormat]("metrics-reporting-interval") + .action((interval, config) => metricsReportingInterval(_ => interval.unwrap, config)) + .optional() + .text("Set metric reporting interval.") + + () + } +} From ab555a99577ecafaf6cd35fa7bd36babfda81225 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 17:44:36 +0100 Subject: [PATCH 2/9] fix: Spotless on test Signed-off-by: Ryan --- .../daml/stores/layers/CoercingTest.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java index 82bab1e7..72800a5a 100644 --- a/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java +++ b/core/src/test/java/com/blockchaintp/daml/stores/layers/CoercingTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -41,8 +41,7 @@ void store_coercion() throws StoreWriteException, StoreReadException { var stub = new StubStore(); var coerced = CoercingStore.from( - Bijection.of((DamlStateKey k) -> k.toByteString(), - API.unchecked((ByteString k) -> DamlStateKey.parseFrom(k))), + Bijection.of((DamlStateKey k) -> k.toByteString(), API.unchecked((ByteString k) -> DamlStateKey.parseFrom(k))), Bijection.of((DamlStateValue v) -> v.toByteString(), API.unchecked((ByteString v) -> DamlStateValue.parseFrom(v))), stub); @@ -72,16 +71,13 @@ private static byte[] asBytes(final UUID uuid) { @Test void txlog_coercion() throws StoreWriteException, StoreReadException { var stub = new StubTransactionLog(); - var coerced = CoercingTxLog - .from( - Bijection.of((DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), - (UUID k) -> DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))) - .build()), - Bijection.of((DamlLogEntry v) -> v.toByteString(), - API.unchecked((ByteString v) -> DamlLogEntry.parseFrom(v))), - Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), - (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), - stub); + var coerced = CoercingTxLog.from( + Bijection.of((DamlLogEntryId k) -> asUuid(k.getEntryId().toByteArray()), + (UUID k) -> DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFrom(asBytes(k))).build()), + Bijection.of((DamlLogEntry v) -> v.toByteString(), API.unchecked((ByteString v) -> DamlLogEntry.parseFrom(v))), + Bijection.of((Offset i) -> Longs.fromByteArray(i.toByteArray()), + (Long i) -> Offset$.MODULE$.fromByteArray(Longs.toByteArray(i))), + stub); var id = coerced.begin(Optional.empty()); var data = DamlLogEntry.newBuilder().build(); From 107fafce1e943b9af3d808d2683c8a61f2a1e589 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 18:16:55 +0100 Subject: [PATCH 3/9] fix: IntialConfig Signed-off-by: Ryan --- .../daml/runtime/BuilderLedgerFactory.scala | 39 ++++++++++++++++++- .../daml/runtime/ParticipantOwner.scala | 5 +-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala index f40abcac..56c405da 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala @@ -17,6 +17,8 @@ import akka.stream.Materializer import com.blockchaintp.daml.address.Identifier import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.ParticipantBuilder +import com.daml.ledger.api.v1.admin.config_management_service.TimeModel +import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.kvutils.api.KeyValueLedger import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState import com.daml.ledger.participant.state.kvutils.app.Config @@ -26,6 +28,9 @@ import com.daml.ledger.resources.ResourceOwner import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics +import com.daml.platform.configuration.InitialLedgerConfiguration + +import java.time.Duration abstract class BuilderLedgerFactory[ Id <: Identifier, @@ -54,6 +59,38 @@ abstract class BuilderLedgerFactory[ ) } + override def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration = + InitialLedgerConfiguration( + configuration = Configuration( + generation = 1, + timeModel = TimeModel( + LedgerTimeModel( + avgTransactionLatency = Duration.ofSeconds(1L), + minSkew = Duration.ofSeconds(40L), + maxSkew = Duration.ofSeconds(80L) + ).get, + maxDeduplicationTime = Duration.ofDays(1) + ), + initialConfigurationSubmitDelay = Duration.ofSeconds(5), + configurationLoadTimeout = Duration.ofSeconds(10) + delayBeforeSubmitting = Duration.ofSeconds(5) + ) + ) + + def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration = + InitialLedgerConfiguration( + configuration = Configuration( + generation = 1, + timeModel = LedgerTimeModel( + avgTransactionLatency = Duration.ofSeconds(1L), + minSkew = Duration.ofSeconds(40L), + maxSkew = Duration.ofSeconds(80L) + ).get, + maxDeduplicationTime = Duration.ofDays(1) + ), + delayBeforeSubmitting = Duration.ofSeconds(5) + ) + def owner( config: Config[ExtraConfig], metrics: Metrics, @@ -65,7 +102,7 @@ abstract class BuilderLedgerFactory[ logCtx: LoggingContext ): ResourceOwner[KeyValueLedger] = { new ParticipantOwner( - ledgerConfig(config), + initialLedgerConfig(config), engine, metrics, logCtx, diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala index c042b735..dfcf12f8 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/ParticipantOwner.scala @@ -17,8 +17,6 @@ import com.blockchaintp.daml.address.Identifier import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.Participant import com.blockchaintp.daml.participant.ParticipantBuilder -import com.daml.ledger.api.domain.LedgerId -import com.daml.ledger.participant.state.index.v2.LedgerConfiguration import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.resources.Resource import com.daml.ledger.resources.ResourceContext @@ -27,13 +25,14 @@ import com.daml.lf.data.Ref.ParticipantId import com.daml.lf.engine.Engine import com.daml.logging.LoggingContext import com.daml.metrics.Metrics +import com.daml.platform.configuration.InitialLedgerConfiguration import com.daml.resources import java.util.concurrent.Executors import scala.{concurrent => sc} class ParticipantOwner[ExtraConfig, Id <: Identifier, Address <: LedgerAddress]( - val ledgerConfig: LedgerConfiguration, + val ledgerConfig: InitialLedgerConfiguration, val engine: Engine, val metrics: Metrics, val logCtx: LoggingContext, From c1948fd6654bc7fae365e9c7d8d32fd99ff35eba Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 18:20:16 +0100 Subject: [PATCH 4/9] fix: Formatting Signed-off-by: Ryan --- .../daml/runtime/BuilderLedgerFactory.scala | 21 ++-------------- .../com/blockchaintp/daml/postgres/Main.scala | 25 +++---------------- 2 files changed, 5 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala index 56c405da..21d87994 100644 --- a/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala +++ b/core/src/main/scala/com/blockchaintp/daml/runtime/BuilderLedgerFactory.scala @@ -18,7 +18,8 @@ import com.blockchaintp.daml.address.Identifier import com.blockchaintp.daml.address.LedgerAddress import com.blockchaintp.daml.participant.ParticipantBuilder import com.daml.ledger.api.v1.admin.config_management_service.TimeModel -import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} +import com.daml.ledger.configuration.Configuration +import com.daml.ledger.configuration.LedgerTimeModel import com.daml.ledger.participant.state.kvutils.api.KeyValueLedger import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState import com.daml.ledger.participant.state.kvutils.app.Config @@ -60,24 +61,6 @@ abstract class BuilderLedgerFactory[ } override def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration = - InitialLedgerConfiguration( - configuration = Configuration( - generation = 1, - timeModel = TimeModel( - LedgerTimeModel( - avgTransactionLatency = Duration.ofSeconds(1L), - minSkew = Duration.ofSeconds(40L), - maxSkew = Duration.ofSeconds(80L) - ).get, - maxDeduplicationTime = Duration.ofDays(1) - ), - initialConfigurationSubmitDelay = Duration.ofSeconds(5), - configurationLoadTimeout = Duration.ofSeconds(10) - delayBeforeSubmitting = Duration.ofSeconds(5) - ) - ) - - def initialLedgerConfig(config: Config[ExtraConfig]): InitialLedgerConfiguration = InitialLedgerConfiguration( configuration = Configuration( generation = 1, diff --git a/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala b/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala index ec1eaeb6..63834b81 100644 --- a/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala +++ b/postgres/src/main/scala/com/blockchaintp/daml/postgres/Main.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -24,6 +24,7 @@ import com.daml.jwt.RSA256Verifier import com.daml.ledger.api.auth.AuthService import com.daml.ledger.api.auth.AuthServiceJWT import com.daml.ledger.api.auth.AuthServiceWildcard +import com.daml.ledger.api.v1.ledger_configuration_service.LedgerConfiguration import com.daml.ledger.participant.state.kvutils.api.CommitMetadata import com.daml.ledger.participant.state.kvutils.app.Config import com.daml.ledger.participant.state.kvutils.app.ParticipantConfig @@ -31,14 +32,12 @@ import com.daml.ledger.participant.state.kvutils.app.Runner import com.daml.ledger.resources.ResourceContext import com.daml.ledger.validator.DefaultStateKeySerializationStrategy import com.daml.platform.configuration.CommandConfiguration -import com.daml.platform.configuration.LedgerConfiguration import com.daml.resources.ProgramResource import scopt.OptionParser import scala.jdk.CollectionConverters._ import java.nio.file.Paths -import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration +import java.time.Duration import scala.jdk.FunctionConverters.enrichAsJavaFunction import scala.util.Try @@ -113,26 +112,8 @@ class LedgerFactory( } } - override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration = - LedgerConfiguration.defaultLocalLedger - override val defaultExtraConfig: ExtraConfig = ExtraConfig.default - override def commandConfig( - participantConfig: ParticipantConfig, - config: Config[ExtraConfig] - ): CommandConfiguration = { - val DefaultTrackerRetentionPeriod: FiniteDuration = 5.minutes - - CommandConfiguration( - inputBufferSize = 512, - maxParallelSubmissions = 1, - maxCommandsInFlight = 256, - limitMaxCommandsInFlight = true, - retentionPeriod = DefaultTrackerRetentionPeriod - ) - } - private def validatePath(path: String, message: String) = { val valid = Try(Paths.get(path).toFile.canRead).getOrElse(false) if (valid) Right(()) else Left(message) From 557e4aa510bd092b80dfac11addb5ef07ac1eb03 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 19:49:21 +0100 Subject: [PATCH 5/9] fix: Update test tool Signed-off-by: Ryan --- docker/ledger-api-testtool.docker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/ledger-api-testtool.docker b/docker/ledger-api-testtool.docker index e891c287..41687ba5 100644 --- a/docker/ledger-api-testtool.docker +++ b/docker/ledger-api-testtool.docker @@ -13,7 +13,7 @@ # limitations under the License. FROM ubuntu:bionic -ARG DAML_SDK_VERSION=1.13.1 +ARG DAML_SDK_VERSION=1.18.3 RUN apt-get update -y && \ apt-get upgrade -y && \ From 17177654e437a2be924549126ae0527fa7718cb2 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 19:56:05 +0100 Subject: [PATCH 6/9] fix: Import orders Signed-off-by: Ryan --- .../com/blockchaintp/daml/participant/CommitPayload.java | 1 - .../java/com/blockchaintp/daml/participant/Participant.java | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java index 5fc20c0c..681731ba 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/CommitPayload.java @@ -19,7 +19,6 @@ import java.util.stream.Stream; import com.blockchaintp.daml.address.Identifier; -import com.blockchaintp.daml.address.LedgerAddress; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 4069859b..2a97dcb7 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -13,6 +13,7 @@ */ package com.blockchaintp.daml.participant; + import java.util.Comparator; import java.util.Optional; import java.util.concurrent.BlockingDeque; @@ -25,11 +26,8 @@ import com.blockchaintp.daml.address.Identifier; import com.blockchaintp.daml.address.LedgerAddress; import com.blockchaintp.daml.stores.service.TransactionLogReader; - -import com.daml.ledger.offset.Offset; -import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder; import com.daml.ledger.api.health.HealthStatus; -import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder$; +import com.daml.ledger.offset.Offset; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; import com.daml.ledger.participant.state.kvutils.api.LedgerReader; From 771213fbe8f46ea01519cbf18e1eb987110b16cc Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 25 Jul 2022 20:10:31 +0100 Subject: [PATCH 7/9] Fix: more imports Signed-off-by: Ryan --- .../java/com/blockchaintp/daml/participant/Participant.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 2a97dcb7..ef4c0b90 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -13,7 +13,6 @@ */ package com.blockchaintp.daml.participant; - import java.util.Comparator; import java.util.Optional; import java.util.concurrent.BlockingDeque; @@ -28,6 +27,7 @@ import com.blockchaintp.daml.stores.service.TransactionLogReader; import com.daml.ledger.api.health.HealthStatus; import com.daml.ledger.offset.Offset; +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder; import com.daml.ledger.participant.state.kvutils.Raw; import com.daml.ledger.participant.state.kvutils.api.CommitMetadata; import com.daml.ledger.participant.state.kvutils.api.LedgerReader; From 539144ae8a926c36f097eaf2c94def076ab064e9 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 26 Jul 2022 21:28:16 +0100 Subject: [PATCH 8/9] Add a reply queue for particpant, returning the last error if any Signed-off-by: Ryan --- .../participant/InProcLedgerSubmitter.java | 9 +- .../daml/participant/LedgerSubmitter.java | 4 +- .../participant/LedgerSubmitterBulkhead.java | 6 +- .../daml/participant/Participant.java | 35 +++++- .../daml/participant/SubmissionResult.java | 107 ++++++++++++++++++ .../daml/participant/SubmissionStatus.java | 2 +- 6 files changed, 151 insertions(+), 12 deletions(-) create mode 100644 core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java diff --git a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java index b2a5a3cd..5c3dc22d 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/InProcLedgerSubmitter.java @@ -125,19 +125,20 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics, } @Override - public CompletableFuture submitPayload(final CommitPayload cp) { + public CompletableFuture submitPayload( + final CommitPayload cp) { return FutureConverters .toJava( this.comitter.commit(cp.getCorrelationId(), cp.getSubmission(), cp.getSubmittingParticipantId(), context)) .thenApply(x -> { if (x == SubmissionResult.Acknowledged$.MODULE$) { - return SubmissionStatus.SUBMITTED; + return com.blockchaintp.daml.participant.SubmissionResult.submitted(); } if (x instanceof SubmissionResult.SynchronousError) { - return SubmissionStatus.REJECTED; + return com.blockchaintp.daml.participant.SubmissionResult.rejected((SubmissionResult.SynchronousError) x); } LOG.info("Overloaded {} {} ", cp.getCorrelationId(), x); - return SubmissionStatus.OVERLOADED; + return com.blockchaintp.daml.participant.SubmissionResult.overloaded(); }).toCompletableFuture(); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java index d61dfe53..a2e9aa86 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -35,7 +35,7 @@ public interface LedgerSubmitter * the payload to submit. * @return a reference to the submission. */ - CompletableFuture submitPayload(CommitPayload cp); + CompletableFuture submitPayload(CommitPayload cp); /** * For convenience we can translate one the payload to the expected output payload. diff --git a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java index 0b742caf..fc663c25 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/LedgerSubmitterBulkhead.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -80,7 +80,7 @@ public LedgerSubmitterBulkhead(final LedgerSubmitter theInner, final int m } @Override - public CompletableFuture submitPayload(final CommitPayload cp) { + public CompletableFuture submitPayload(final CommitPayload cp) { var executor = ContextAwareScheduledThreadPoolExecutor.newScheduledThreadPool().build(); var retry = Retry.of("Ledger submitter retry", @@ -99,7 +99,7 @@ public CompletableFuture submitPayload(final CommitPayload .withBulkhead(bulkhead) .withFallback( Arrays.asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class), - throwable -> SubmissionStatus.OVERLOADED) + throwable -> SubmissionResult.overloaded()) .withRetry(retry, executor).get().toCompletableFuture(); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index ef4c0b90..853d1b10 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -13,6 +13,7 @@ */ package com.blockchaintp.daml.participant; +import java.util.ArrayList; import java.util.Comparator; import java.util.Optional; import java.util.concurrent.BlockingDeque; @@ -63,6 +64,8 @@ public final class Participant im private final String participantId; private final Dispatcher dispatcher; private final BlockingDeque> submissions = new LinkedBlockingDeque<>(); + private final BlockingDeque> responses + = new LinkedBlockingDeque<>(); private final ExecutionContextExecutor context; private final ScheduledExecutorService pollExecutor; private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0); @@ -113,7 +116,9 @@ private void work() { LOG.debug("Commit correlation id {}", next.getCorrelationId()); try { - var result = submitter.submitPayload(next).get(); + com.blockchaintp.daml.participant.SubmissionResult result = submitter.submitPayload(next).get(); + + responses.push(Tuple2.apply(next.getCorrelationId(), result)); LOG.info("Submission result for {} {}", next.getCorrelationId(), result); @@ -162,7 +167,33 @@ public String participantId() { public Future commit(final String correlationId, final Raw.Envelope envelope, final CommitMetadata metadata, final TelemetryContext telemetryContext) { - commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submissions::add); + final var waitFor = new ArrayList(); + + commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submission -> { + waitFor.add(submission.getCorrelationId()); + submissions.add(submission); + }); + + SubmissionResult lastError = null; + + while (!waitFor.isEmpty()) { + var head = responses.peek(); + + if (head != null) { + if (waitFor.contains(head._1)) { + waitFor.remove(head._1); + + if (head._2.getStatus() == SubmissionStatus.REJECTED) { + lastError = head._2.getError().get(); + } + } + } + + } + + if (lastError != null) { + return Future.successful(lastError); + } return Future.successful(SubmissionResult.Acknowledged$.MODULE$); } diff --git a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java new file mode 100644 index 00000000..dc8b52ed --- /dev/null +++ b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionResult.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Blockchain Technology Partners + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.blockchaintp.daml.participant; + +import java.util.Optional; + +/** + * The type Submission result. + */ +public class SubmissionResult { + private final SubmissionStatus status; + + /** + * Gets status. + * + * @return the status + */ + public SubmissionStatus getStatus() { + return status; + } + + /** + * Gets error. + * + * @return the synchronous error + */ + public Optional getError() { + return error; + } + + private final Optional error; + + /** + * Instantiates a new Submission result. + * + * @param theStatus + * the the status + * @param theError + * the the error + */ + public SubmissionResult(final SubmissionStatus theStatus, + final Optional theError) { + status = theStatus; + error = theError; + } + + /** + * Submitted submission result. + * + * @return the submission result + */ + public static SubmissionResult submitted() { + return new SubmissionResult(SubmissionStatus.SUBMITTED, Optional.empty()); + } + + /** + * The participant cannot currently process this submission. + * + * @return the submission result + */ + public static SubmissionResult overloaded() { + return new SubmissionResult(SubmissionStatus.OVERLOADED, Optional.empty()); + } + + /** + * The participant has not yet submitted a proposal. + * + * @return the submission result + */ + public static SubmissionResult enqueued() { + return new SubmissionResult(SubmissionStatus.ENQUEUED, Optional.empty()); + } + + /** + * The participant has submitted a proposal, but it has not yet been fully accepted. This is + * normally only applicable to multipart submissions + * + * @return the submission result + */ + public static SubmissionResult partiallySubmitted() { + return new SubmissionResult(SubmissionStatus.PARTIALLY_SUBMITTED, Optional.empty()); + } + + /** + * The participant has submitted a proposal, and it has been rejected. This is a terminal state and + * is normally quite immediate. + * + * @param error + * the error + * @return the submission result + */ + public static SubmissionResult rejected( + final com.daml.ledger.participant.state.v2.SubmissionResult.SynchronousError error) { + return new SubmissionResult(SubmissionStatus.REJECTED, Optional.of(error)); + } +} diff --git a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java index f1675535..8425c6d8 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/SubmissionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Blockchain Technology Partners + * Copyright 2021-2022 Blockchain Technology Partners * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at From bce9184134b90fe87448315a1c10737c8c9c9e35 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 26 Jul 2022 21:47:56 +0100 Subject: [PATCH 9/9] fix: Spotless Signed-off-by: Ryan --- .../com/blockchaintp/daml/participant/Participant.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java index 853d1b10..0e2588a2 100644 --- a/core/src/main/java/com/blockchaintp/daml/participant/Participant.java +++ b/core/src/main/java/com/blockchaintp/daml/participant/Participant.java @@ -63,9 +63,8 @@ public final class Participant im private final String ledgerId; private final String participantId; private final Dispatcher dispatcher; - private final BlockingDeque> submissions = new LinkedBlockingDeque<>(); - private final BlockingDeque> responses - = new LinkedBlockingDeque<>(); + private final BlockingDeque> submissions; + private final BlockingDeque> responses; private final ExecutionContextExecutor context; private final ScheduledExecutorService pollExecutor; private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0); @@ -107,6 +106,8 @@ public Participant(final TransactionLogReader(); + submissions = new LinkedBlockingDeque<>(); } private void work() {