Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LAPITT tests for pruning offset in transaction streams #19470

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ final case class Features(
userAndPartyLocalMetadataExtensions: Boolean = false,
acsActiveAtOffsetFeature: Boolean = false,
templateFilters: Boolean = false,
prunedOffsets: Boolean = false,
)

object Features {
Expand Down Expand Up @@ -56,6 +57,7 @@ object Features {
experimental.getUserAndPartyLocalMetadataExtensions.supported,
acsActiveAtOffsetFeature = experimental.getAcsActiveAtOffset.supported,
templateFilters = experimental.getTemplateFilters.supported,
prunedOffsets = experimental.getPrunedOffsets.supported,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.daml.ledger.api.testtool.infrastructure.participant

import com.daml.error.ErrorCode

import java.time.Instant

import com.daml.ledger.api.testtool.infrastructure.Endpoint
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext.{
CompletionResponse,
Expand Down Expand Up @@ -68,6 +68,7 @@ import com.daml.ledger.api.v1.event_query_service.{
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionByEventIdRequest,
GetTransactionByIdRequest,
GetTransactionTreesResponse,
GetTransactionsRequest,
GetTransactionsResponse,
}
Expand Down Expand Up @@ -226,6 +227,13 @@ trait ParticipantTestContext extends UserManagementTestContext {
parties: Party*
): Future[Vector[Transaction]]

/** Non-managed version of [[rawFlatTransactions]], use this only if you need to tweak the request (i.e. to test low-level details)
*/
def rawFlatTransactions(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionsResponse]]

/** Non-managed version of [[flatTransactions]], use this only if you need to tweak the request (i.e. to test low-level details)
*/
def flatTransactions(request: GetTransactionsRequest): Future[Vector[Transaction]]
Expand All @@ -246,6 +254,13 @@ trait ParticipantTestContext extends UserManagementTestContext {
parties: Party*
): Future[Vector[TransactionTree]]

/** Non-managed version of [[rawTransactionTrees]], use this only if you need to tweak the request (i.e. to test low-level details)
*/
def rawTransactionTrees(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionTreesResponse]]

/** Non-managed version of [[transactionTrees]], use this only if you need to tweak the request (i.e. to test low-level details)
*/
def transactionTrees(request: GetTransactionsRequest): Future[Vector[TransactionTree]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.daml.ledger.api.testtool.infrastructure.participant

import com.daml.error.ErrorCode

import java.time.{Clock, Instant}

import com.daml.grpc.test.StreamConsumer
import com.daml.ledger.api.testtool.infrastructure.Eventually.eventually
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
Expand Down Expand Up @@ -104,6 +104,7 @@ import com.daml.ledger.api.v1.transaction_service.{
GetLedgerEndRequest,
GetTransactionByEventIdRequest,
GetTransactionByIdRequest,
GetTransactionTreesResponse,
GetTransactionsRequest,
GetTransactionsResponse,
}
Expand All @@ -125,8 +126,8 @@ import io.grpc.StatusRuntimeException
import io.grpc.health.v1.health.{HealthCheckRequest, HealthCheckResponse}
import io.grpc.protobuf.StatusProto
import io.grpc.stub.StreamObserver

import java.util.{List => JList}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -463,6 +464,12 @@ final class SingleParticipantTestContext private[participant] (
): Future[Vector[Transaction]] =
flatTransactions(getTransactionsRequest(transactionFilter(parties, Seq(templateId))))

override def rawFlatTransactions(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionsResponse]] =
transactions(take, request, services.transaction.getTransactions)

override def flatTransactions(request: GetTransactionsRequest): Future[Vector[Transaction]] =
transactions(request, services.transaction.getTransactions)
.map(_.flatMap(_.transactions))
Expand Down Expand Up @@ -499,7 +506,11 @@ final class SingleParticipantTestContext private[participant] (
): Future[Vector[TransactionTree]] =
transactions(take, request, services.transaction.getTransactionTrees)
.map(_.flatMap(_.transactions))

override def rawTransactionTrees(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionTreesResponse]] =
transactions(take, request, services.transaction.getTransactionTrees)
override def transactionTrees(take: Int, parties: Party*): Future[Vector[TransactionTree]] =
transactionTrees(take, getTransactionsRequest(transactionFilter(parties)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package com.daml.ledger.api.testtool.infrastructure.participant

import com.daml.error.ErrorCode

import java.time.Instant
import java.util.concurrent.TimeoutException

import com.daml.ledger.api.testtool.infrastructure.Endpoint
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext.IncludeInterfaceView
import com.daml.ledger.api.testtool.infrastructure.time.{DelayMechanism, Durations}
Expand Down Expand Up @@ -66,6 +66,7 @@ import com.daml.ledger.api.v1.event_query_service.{
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionByEventIdRequest,
GetTransactionByIdRequest,
GetTransactionTreesResponse,
GetTransactionsRequest,
GetTransactionsResponse,
}
Expand All @@ -76,8 +77,8 @@ import com.daml.timer.Delayed
import com.google.protobuf.ByteString
import io.grpc.health.v1.health.HealthCheckResponse
import io.grpc.stub.StreamObserver

import java.util.{List => JList}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -263,6 +264,15 @@ class TimeoutParticipantTestContext(timeoutScaleFactor: Double, delegate: Partic
s"Flat transaction by template id $templateId for parties $parties",
delegate.flatTransactionsByTemplateId(templateId, parties: _*),
)

override def rawFlatTransactions(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionsResponse]] =
withTimeout(
s"Flat transactions for request $request",
delegate.rawFlatTransactions(take, request),
)
override def flatTransactions(request: GetTransactionsRequest): Future[Vector[Transaction]] =
withTimeout(s"Flat transactions for request $request", delegate.flatTransactions(request))
override def flatTransactions(parties: Party*): Future[Vector[Transaction]] =
Expand All @@ -286,6 +296,14 @@ class TimeoutParticipantTestContext(timeoutScaleFactor: Double, delegate: Partic
s"Transaction trees by template id $templateId for parties $parties",
delegate.transactionTreesByTemplateId(templateId, parties: _*),
)
override def rawTransactionTrees(
take: Int,
request: GetTransactionsRequest,
): Future[Vector[GetTransactionTreesResponse]] =
withTimeout(
s"Transaction trees for request $request",
delegate.rawTransactionTrees(take, request),
)
override def transactionTrees(request: GetTransactionsRequest): Future[Vector[TransactionTree]] =
withTimeout(s"Transaction trees for request $request", delegate.transactionTrees(request))
override def transactionTrees(parties: Party*): Future[Vector[TransactionTree]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import com.daml.ledger.test.java.semantic.divulgencetests
import com.daml.logging.LoggingContext

import java.util.{List => JList}

import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse,
}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -908,6 +914,142 @@ class ParticipantPruningIT extends LedgerTestSuite {
}
})

test(
"PRTPFailToRequestPruningOffsetsWithEnd",
"Pruning offsets should not be served on streams with end specified",
allocate(SingleParty),
enabled = _.prunedOffsets,
disabledReason = "Ledger does not support pruned offset streaming",
)(implicit ec => {
case Participants(Participant(ledger, party)) => {
val req = ledger
.getTransactionsRequest(ledger.transactionFilter(parties = Seq(party)))
.copy(sendPrunedOffsets = true)

for {
flatFailure <- ledger
.rawFlatTransactions(1, req)
.mustFail("subscribing past the ledger end")
treeFailure <- ledger
.rawTransactionTrees(1, req)
.mustFail("subscribing past the ledger end")
} yield {
assertGrpcError(
flatFailure,
LedgerApiErrors.RequestValidation.InvalidArgument,
Some("Pruning offsets requested on a stream with an explicit end"),
)
assertGrpcError(
treeFailure,
LedgerApiErrors.RequestValidation.InvalidArgument,
Some("Pruning offsets requested on a stream with an explicit end"),
)
}
}
})

test(
"PRTServePruningOffsets",
"Pruning offsets should be served when requested",
allocate(SingleParty),
enabled = _.prunedOffsets,
disabledReason = "Ledger does not support pruned offset streaming",
runConcurrently = false,
)(implicit ec => {
case Participants(Participant(ledger, party)) => {
val expectedMsgs = 11

def validate[Response](
transactions: Vector[Response],
extractOffset: Response => String,
offsetToPruneUpTo: LedgerOffset,
): Unit = {
assert(
transactions.size == expectedMsgs,
s"$expectedMsgs should have been submitted but ${transactions.size} were instead",
)
val expectedOffset = offsetToPruneUpTo.value.absolute.fold("")(identity)
assert(
transactions.exists(extractOffset(_) == expectedOffset),
s"pruning offset of $expectedOffset should have been received",
)
}

val req = ledger
.getTransactionsRequest(ledger.transactionFilter(parties = Seq(party)))
.copy(sendPrunedOffsets = true)
.copy(end = None)
val flatsF = ledger.rawFlatTransactions(expectedMsgs, req)
val treesF = ledger.rawTransactionTrees(expectedMsgs, req)
for {
offsetToPruneUpTo <- ledger.currentEnd()
_ <- Future.sequence(
Vector.fill((expectedMsgs - 1) / 2)(ledger.create(party, new Dummy(party)))
)
_ <- ledger.prune(offsetToPruneUpTo)
_ <- Future.sequence(
Vector.fill((expectedMsgs - 1) / 2)(ledger.create(party, new Dummy(party)))
)
flats <- flatsF
trees <- treesF

} yield {
validate[GetTransactionsResponse](flats, _.prunedOffset, offsetToPruneUpTo)
validate[GetTransactionTreesResponse](trees, _.prunedOffset, offsetToPruneUpTo)
}
}
})

test(
"PRTDontServePruningOffsets",
"Pruning offsets should not be served when not requested",
allocate(SingleParty),
enabled = _.prunedOffsets,
disabledReason = "Ledger does not support pruned offset streaming",
runConcurrently = false,
)(implicit ec => {
case Participants(Participant(ledger, party)) => {
val expectedMsgs = 10

def validate[Response](
transactions: Vector[Response],
validateNoOffset: Response => Boolean,
): Unit = {
assert(
transactions.size == expectedMsgs,
s"$expectedMsgs should have been submitted but ${transactions.size} were instead",
)
assert(
transactions.forall(validateNoOffset),
s"pruning offset was received",
)
}

val req = ledger
.getTransactionsRequest(ledger.transactionFilter(parties = Seq(party)))
.copy(sendPrunedOffsets = false)
.copy(end = None)
val flatsF = ledger.rawFlatTransactions(expectedMsgs, req)
val treesF = ledger.rawTransactionTrees(expectedMsgs, req)
for {
offsetToPruneUpTo <- ledger.currentEnd()
_ <- Future.sequence(
Vector.fill(expectedMsgs / 2)(ledger.create(party, new Dummy(party)))
)
_ <- ledger.prune(offsetToPruneUpTo)
_ <- Future.sequence(
Vector.fill(expectedMsgs / 2)(ledger.create(party, new Dummy(party)))
)
flats <- flatsF
trees <- treesF

} yield {
validate[GetTransactionsResponse](flats, _.prunedOffset.isEmpty)
validate[GetTransactionTreesResponse](trees, _.prunedOffset.isEmpty)
}
}
})

private def createDivulgence(
alice: Party,
bob: Party,
Expand Down