Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to 1.18.3, local tests passing #48

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,7 +13,7 @@
*/
package com.blockchaintp.daml.address;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.store.DamlStateKey;

/**
* A generic identifier.
Expand All @@ -23,5 +23,5 @@ public interface Identifier {
*
* @return The state key;
*/
DamlKvutils.DamlStateKey toKey();
DamlStateKey toKey();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,24 +13,24 @@
*/
package com.blockchaintp.daml.address;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.store.DamlStateKey;

/**
*
*/
public final class QldbAddress implements LedgerAddress {
private final DamlKvutils.DamlStateKey data;
private final DamlStateKey data;

/**
*
* @param theData
*/
public QldbAddress(final DamlKvutils.DamlStateKey theData) {
public QldbAddress(final DamlStateKey theData) {
data = theData;
}

@Override
public DamlKvutils.DamlStateKey toKey() {
public DamlStateKey toKey() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -15,24 +15,24 @@

import java.util.Objects;

import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.store.DamlStateKey;

/**
*
*/
public final class QldbIdentifier implements Identifier {
private final DamlKvutils.DamlStateKey data;
private final DamlStateKey data;

/**
*
* @param theData
*/
public QldbIdentifier(final DamlKvutils.DamlStateKey theData) {
public QldbIdentifier(final DamlStateKey theData) {
data = theData;
}

@Override
public DamlKvutils.DamlStateKey toKey() {
public DamlStateKey toKey() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -28,9 +28,9 @@
import com.blockchaintp.utility.Functions;
import com.blockchaintp.utility.UuidConverter;
import com.daml.api.util.TimeProvider;
import com.daml.ledger.participant.state.kvutils.DamlKvutils;
import com.daml.ledger.participant.state.kvutils.Raw;
import com.daml.ledger.participant.state.v1.SubmissionResult;
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntryId;
import com.daml.ledger.participant.state.v2.SubmissionResult;
import com.daml.ledger.validator.SubmissionValidator$;
import com.daml.ledger.validator.ValidatingCommitter;
import com.daml.lf.engine.Engine;
Expand Down Expand Up @@ -78,8 +78,8 @@ public static <I extends Identifier, A extends LedgerAddress> InProcLedgerSubmit
* @param id
* @return A daml log entry id parsed from a daml log entry id.
*/
private DamlKvutils.DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) {
var parsed = Functions.uncheckFn(() -> DamlKvutils.DamlLogEntryId.parseFrom(id.bytes())).apply();
private DamlLogEntryId logEntryIdToDamlLogEntryId(final Raw.LogEntryId id) {
var parsed = Functions.uncheckFn(() -> DamlLogEntryId.parseFrom(id.bytes())).apply();

LOG.info("parse log id {}", () -> parsed.getEntryId().toString());

Expand Down Expand Up @@ -107,7 +107,7 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
new StateAccess(CoercingStore.from(Bijection.of(Raw.StateKey::bytes, Raw.StateKey$.MODULE$::apply),
Bijection.of(Raw.Envelope::bytes, Raw.Envelope$.MODULE$::apply), theStateStore), writer),
() -> logEntryIdToDamlLogEntryId(UuidConverter.uuidtoLogEntry(UUID.randomUUID())), false,
new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics, false),
new StateCache<>(new LRUCache<>(STATE_CACHE_SIZE)), theEngine, theMetrics),
r -> {
/// New head should be end of sequence, i.e. one past the actual head. This should really have a
/// nicer type, we ensure it is monotonic here
Expand All @@ -125,19 +125,20 @@ public InProcLedgerSubmitter(final Engine theEngine, final Metrics theMetrics,
}

@Override
public CompletableFuture<SubmissionStatus> submitPayload(final CommitPayload<A> cp) {
public CompletableFuture<com.blockchaintp.daml.participant.SubmissionResult> submitPayload(
final CommitPayload<A> cp) {
return FutureConverters
.toJava(
this.comitter.commit(cp.getCorrelationId(), cp.getSubmission(), cp.getSubmittingParticipantId(), context))
.thenApply(x -> {
if (x == SubmissionResult.Acknowledged$.MODULE$) {
return SubmissionStatus.SUBMITTED;
return com.blockchaintp.daml.participant.SubmissionResult.submitted();
}
if (x == SubmissionResult.NotSupported$.MODULE$) {
return SubmissionStatus.REJECTED;
if (x instanceof SubmissionResult.SynchronousError) {
return com.blockchaintp.daml.participant.SubmissionResult.rejected((SubmissionResult.SynchronousError) x);
}
LOG.info("Overloaded {} {} ", cp.getCorrelationId(), x);
return SubmissionStatus.OVERLOADED;
return com.blockchaintp.daml.participant.SubmissionResult.overloaded();
}).toCompletableFuture();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -35,7 +35,7 @@ public interface LedgerSubmitter<A extends Identifier, B extends LedgerAddress>
* the payload to submit.
* @return a reference to the submission.
*/
CompletableFuture<SubmissionStatus> submitPayload(CommitPayload<A> cp);
CompletableFuture<SubmissionResult> submitPayload(CommitPayload<A> cp);

/**
* For convenience we can translate one the payload to the expected output payload.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -80,7 +80,7 @@ public LedgerSubmitterBulkhead(final LedgerSubmitter<A, B> theInner, final int m
}

@Override
public CompletableFuture<SubmissionStatus> submitPayload(final CommitPayload<A> cp) {
public CompletableFuture<SubmissionResult> submitPayload(final CommitPayload<A> cp) {
var executor = ContextAwareScheduledThreadPoolExecutor.newScheduledThreadPool().build();

var retry = Retry.of("Ledger submitter retry",
Expand All @@ -99,7 +99,7 @@ public CompletableFuture<SubmissionStatus> submitPayload(final CommitPayload<A>
.withBulkhead(bulkhead)
.withFallback(
Arrays.asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class),
throwable -> SubmissionStatus.OVERLOADED)
throwable -> SubmissionResult.overloaded())
.withRetry(retry, executor).get().toCompletableFuture();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,6 +13,7 @@
*/
package com.blockchaintp.daml.participant;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
Expand All @@ -26,14 +27,14 @@
import com.blockchaintp.daml.address.LedgerAddress;
import com.blockchaintp.daml.stores.service.TransactionLogReader;
import com.daml.ledger.api.health.HealthStatus;
import com.daml.ledger.participant.state.kvutils.OffsetBuilder;
import com.daml.ledger.offset.Offset;
import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder;
import com.daml.ledger.participant.state.kvutils.Raw;
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata;
import com.daml.ledger.participant.state.kvutils.api.LedgerReader;
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord;
import com.daml.ledger.participant.state.kvutils.api.LedgerWriter;
import com.daml.ledger.participant.state.v1.Offset;
import com.daml.ledger.participant.state.v1.SubmissionResult;
import com.daml.ledger.participant.state.v2.SubmissionResult;
import com.daml.platform.akkastreams.dispatcher.Dispatcher;
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource;
import com.daml.telemetry.TelemetryContext;
Expand Down Expand Up @@ -62,9 +63,11 @@ public final class Participant<I extends Identifier, A extends LedgerAddress> im
private final String ledgerId;
private final String participantId;
private final Dispatcher<Long> dispatcher;
private final BlockingDeque<CommitPayload<I>> submissions = new LinkedBlockingDeque<>();
private final BlockingDeque<CommitPayload<I>> submissions;
private final BlockingDeque<Tuple2<String, com.blockchaintp.daml.participant.SubmissionResult>> responses;
private final ExecutionContextExecutor context;
private final ScheduledExecutorService pollExecutor;
private final KVOffsetBuilder offsetBuilder = new KVOffsetBuilder((byte) 0);

/**
* Convenience method for creating a builder.
Expand Down Expand Up @@ -103,6 +106,8 @@ public Participant(final TransactionLogReader<Long, Raw.LogEntryId, Raw.Envelope
context = theContext;
pollExecutor = Executors.newSingleThreadScheduledExecutor();
pollExecutor.scheduleAtFixedRate(this::work, 0, 1, TimeUnit.MILLISECONDS);
responses = new LinkedBlockingDeque<>();
submissions = new LinkedBlockingDeque<>();
}

private void work() {
Expand All @@ -112,7 +117,9 @@ private void work() {
LOG.debug("Commit correlation id {}", next.getCorrelationId());

try {
var result = submitter.submitPayload(next).get();
com.blockchaintp.daml.participant.SubmissionResult result = submitter.submitPayload(next).get();

responses.push(Tuple2.apply(next.getCorrelationId(), result));

LOG.info("Submission result for {} {}", next.getCorrelationId(), result);

Expand All @@ -131,16 +138,16 @@ public HealthStatus currentHealth() {
public akka.stream.scaladsl.Source<LedgerRecord, NotUsed> events(final Option<Offset> startExclusive) {
LOG.info("Get from {}", () -> startExclusive);

var start = OffsetBuilder.fromLong(0L, 0, 0);
var start = offsetBuilder.of(0L, 0, 0);

Ordering<Long> scalaLongOrdering = Ordering.comparatorToOrdering(Comparator.comparingLong(scala.Long::unbox));

var rangeSource = new RangeSource<>((s, e) -> Source
.fromJavaStream(() -> API.unchecked(() -> txLog.from(s - 1, Optional.of(e - 1))).apply()
.map(r -> Tuple2.apply(r._1, LedgerRecord.apply(OffsetBuilder.fromLong(r._1, 0, 0), r._2, r._3))))
.map(r -> Tuple2.apply(r._1, LedgerRecord.apply(offsetBuilder.of(r._1, 0, 0), r._2, r._3))))
.mapMaterializedValue(m -> NotUsed.notUsed()).asScala(), scalaLongOrdering);

var offset = OffsetBuilder.highestIndex(startExclusive.getOrElse(() -> start));
var offset = offsetBuilder.highestIndex(startExclusive.getOrElse(() -> start));
return dispatcher.startingAt(offset, rangeSource, Option.empty()).asJava().map(x -> {
LOG.debug("Yield log {} {}", x._1, x._2);
return x;
Expand All @@ -161,7 +168,33 @@ public String participantId() {
public Future<SubmissionResult> commit(final String correlationId, final Raw.Envelope envelope,
final CommitMetadata metadata, final TelemetryContext telemetryContext) {

commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submissions::add);
final var waitFor = new ArrayList<String>();

commitPayloadBuilder.build(envelope, metadata, correlationId).stream().forEach(submission -> {
waitFor.add(submission.getCorrelationId());
submissions.add(submission);
});

SubmissionResult lastError = null;

while (!waitFor.isEmpty()) {
var head = responses.peek();

if (head != null) {
if (waitFor.contains(head._1)) {
waitFor.remove(head._1);

if (head._2.getStatus() == SubmissionStatus.REJECTED) {
lastError = head._2.getError().get();
}
}
}

}

if (lastError != null) {
return Future.successful(lastError);
}

return Future.successful(SubmissionResult.Acknowledged$.MODULE$);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Blockchain Technology Partners
* Copyright 2021-2022 Blockchain Technology Partners
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -28,6 +28,7 @@
import com.daml.ledger.validator.BatchingLedgerStateOperations;
import com.daml.ledger.validator.LedgerStateAccess;
import com.daml.ledger.validator.LedgerStateOperations;
import com.daml.logging.LoggingContext;

import kr.pe.kwonnam.slf4jlambda.LambdaLogger;
import kr.pe.kwonnam.slf4jlambda.LambdaLoggerFactory;
Expand Down Expand Up @@ -65,7 +66,7 @@ private class Operations extends BatchingLedgerStateOperations<Long> {

@Override
public Future<Seq<Option<Raw.Envelope>>> readState(final Iterable<Raw.StateKey> keys,
final ExecutionContext executionContext) {
final ExecutionContext executionContext, final LoggingContext loggingContext) {

var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> {
var sparseInputs = StreamConverters$.MODULE$.asJavaParStream(keys)
Expand All @@ -87,7 +88,7 @@ public Future<Seq<Option<Raw.Envelope>>> readState(final Iterable<Raw.StateKey>

@Override
public Future<BoxedUnit> writeState(final Iterable<scala.Tuple2<Raw.StateKey, Raw.Envelope>> keyValuePairs,
final ExecutionContext executionContext) {
final ExecutionContext executionContext, final LoggingContext loggingContext) {
LOG.debug("Write state {}", keyValuePairs);
var syncFuture = Future.fromTry(Try.apply(Functions.uncheckFn(() -> {
stateStore.put(new ArrayList<>(StreamConverters.asJavaParStream(keyValuePairs)
Expand All @@ -108,7 +109,7 @@ public Future<BoxedUnit> writeState(final Iterable<scala.Tuple2<Raw.StateKey, Ra
*/
@Override
public Future<Long> appendToLog(final Raw.LogEntryId key, final Raw.Envelope value,
final ExecutionContext executionContext) {
final ExecutionContext executionContext, final LoggingContext loggingContext) {

var work = sequenceAllocation.serialisedBegin(key)
.thenCompose(seq -> CompletableFuture.supplyAsync(() -> Functions.uncheckFn(() -> {
Expand All @@ -124,7 +125,7 @@ public Future<Long> appendToLog(final Raw.LogEntryId key, final Raw.Envelope val

@Override
public <T> Future<T> inTransaction(final Function1<LedgerStateOperations<Long>, Future<T>> body,
final ExecutionContext executionContext) {
final ExecutionContext executionContext, final LoggingContext loggingContext) {
return Future.delegate(() -> body.apply(new Operations()), executionContext);
}
}
Loading