From 09d97ceda4b3387a538cef063f1d8c7d2f22ec6a Mon Sep 17 00:00:00 2001 From: Florent Biville Date: Wed, 22 Jan 2025 22:04:34 +0100 Subject: [PATCH] Expose error policy and concurrency settings of batchable changes --- docs/includes/_batching.md | 9 +++ .../ext/neo4j/change/BatchErrorPolicy.java | 5 ++ .../ext/neo4j/change/BatchableChange.java | 61 +++++++++++++++++-- .../neo4j/change/InvertDirectionChange.java | 2 +- .../ext/neo4j/change/RenameLabelChange.java | 4 +- .../neo4j/change/RenamePropertyChange.java | 8 +-- .../ext/neo4j/change/RenameTypeChange.java | 4 +- .../ext/neo4j/database/KernelVersion.java | 2 + .../ext/neo4j/database/Neo4jDatabase.java | 5 -- .../xml/ns/neo4j/liquibase-neo4j-latest.xsd | 40 ++++++++++++ .../change/InvertDirectionChangeTest.groovy | 4 +- 11 files changed, 123 insertions(+), 21 deletions(-) create mode 100644 src/main/java/liquibase/ext/neo4j/change/BatchErrorPolicy.java diff --git a/docs/includes/_batching.md b/docs/includes/_batching.md index d8fb2c6c..31d16a4c 100644 --- a/docs/includes/_batching.md +++ b/docs/includes/_batching.md @@ -13,3 +13,12 @@ This results in the change being executed in batches. The `batchSize` attribute controls how many transactions run. If the attribute is not set, the batch size is defined on the Neo4j server side. + +The [error policy of inner transactions](https://neo4j.com/docs/cypher-manual/current/subqueries/subqueries-in-transactions/#error-behavior), +introduced in Neo4j 5.7, is configurable with the `batchErrorPolicy` attribute, and accepts the following values: + - `CONTINUE` + - `BREAK` + - `FAIL` + +Inner transactions can also be configured to [run in parallel](https://neo4j.com/docs/cypher-manual/current/subqueries/subqueries-in-transactions/#error-behavior) since Neo4j 5.21. +The boolean `concurrent` attribute controls that behavior. Concurrency is disabled by default. diff --git a/src/main/java/liquibase/ext/neo4j/change/BatchErrorPolicy.java b/src/main/java/liquibase/ext/neo4j/change/BatchErrorPolicy.java new file mode 100644 index 00000000..6d96cc22 --- /dev/null +++ b/src/main/java/liquibase/ext/neo4j/change/BatchErrorPolicy.java @@ -0,0 +1,5 @@ +package liquibase.ext.neo4j.change; + +public enum BatchErrorPolicy { + CONTINUE, BREAK, FAIL +} diff --git a/src/main/java/liquibase/ext/neo4j/change/BatchableChange.java b/src/main/java/liquibase/ext/neo4j/change/BatchableChange.java index 644d4f6c..1066c42c 100644 --- a/src/main/java/liquibase/ext/neo4j/change/BatchableChange.java +++ b/src/main/java/liquibase/ext/neo4j/change/BatchableChange.java @@ -4,6 +4,7 @@ import liquibase.change.AbstractChange; import liquibase.database.Database; import liquibase.exception.ValidationErrors; +import liquibase.ext.neo4j.database.KernelVersion; import liquibase.ext.neo4j.database.Neo4jDatabase; import liquibase.logging.Logger; import liquibase.statement.SqlStatement; @@ -14,6 +15,10 @@ abstract class BatchableChange extends AbstractChange { protected Long batchSize; + private Boolean concurrent; + + private BatchErrorPolicy batchErrorPolicy; + @Override public ValidationErrors validate(Database database) { ValidationErrors validation = new ValidationErrors(this); @@ -23,12 +28,27 @@ public ValidationErrors validate(Database database) { if (!enableBatchImport && batchSize != null) { validation.addError("batch size must be set only if enableBatchImport is set to true"); } - if (batchSize != null && batchSize <= 0) { - validation.addError("batch size, if set, must be strictly positive"); + if (!enableBatchImport && concurrent != null) { + validation.addError("concurrent must be set only if enableBatchImport is set to true"); + } + if (!enableBatchImport && batchErrorPolicy != null) { + validation.addError("batchErrorPolicy must be set only if enableBatchImport is set to true"); } Neo4jDatabase neo4j = (Neo4jDatabase) database; - if (enableBatchImport && !neo4j.supportsCallInTransactions()) { - validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored"); + if (enableBatchImport) { + KernelVersion version = neo4j.getKernelVersion(); + if (version.compareTo(KernelVersion.V4_4_0) < 0) { + validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored"); + } + if (batchSize != null && batchSize <= 0) { + validation.addError("batch size, if set, must be strictly positive"); + } + if (batchErrorPolicy != null && version.compareTo(KernelVersion.V5_7_0) < 0) { + validation.addError("this version of Neo4j does not support the configuration of CALL {} IN TRANSACTIONS error behavior (ON ERROR), Neo4j 5.7 or later is required"); + } + if (concurrent != null && concurrent && version.compareTo(KernelVersion.V5_21_0) < 0) { + validation.addError("this version of Neo4j does not support CALL {} IN CONCURRENT TRANSACTIONS, Neo4j 5.21 or later is required"); + } } validation.addAll(super.validate(database)); return validation; @@ -38,7 +58,7 @@ public ValidationErrors validate(Database database) { public SqlStatement[] generateStatements(Database database) { Logger log = Scope.getCurrentScope().getLog(getClass()); Neo4jDatabase neo4j = (Neo4jDatabase) database; - boolean supportsCallInTransactions = neo4j.supportsCallInTransactions(); + boolean supportsCallInTransactions = neo4j.getKernelVersion().compareTo(KernelVersion.V4_4_0) >= 0; if (supportsCallInTransactions && enableBatchImport) { log.info("Running change in CALL {} IN TRANSACTIONS"); return generateBatchedStatements(neo4j); @@ -72,7 +92,36 @@ public void setBatchSize(Long batchSize) { this.batchSize = batchSize; } + public Boolean getConcurrent() { + return concurrent; + } + + public void setConcurrent(Boolean concurrent) { + this.concurrent = concurrent; + } + + public BatchErrorPolicy getBatchErrorPolicy() { + return batchErrorPolicy; + } + + public void setBatchErrorPolicy(BatchErrorPolicy batchErrorPolicy) { + this.batchErrorPolicy = batchErrorPolicy; + } + protected String cypherBatchSpec() { - return batchSize != null ? String.format(" OF %d ROWS", batchSize) : ""; + StringBuilder builder = new StringBuilder(); + builder.append(" IN"); + if (concurrent != null && concurrent) { + builder.append(" CONCURRENT"); + } + builder.append(" TRANSACTIONS"); + if (batchSize != null) { + builder.append(String.format(" OF %d ROWS", batchSize)); + } + if (batchErrorPolicy != null) { + builder.append(String.format(" ON ERROR %s", batchErrorPolicy)); + } + return builder.toString(); } + } diff --git a/src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java b/src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java index cddbc154..899d8d5d 100644 --- a/src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java +++ b/src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java @@ -66,7 +66,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase database) { " CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " + " SET __newrel__ = properties(__rel__) " + " DELETE __rel__ " + - "} IN TRANSACTIONS%s", queryStart(), type, cypherBatchSpec()); + "}%s", queryStart(), type, cypherBatchSpec()); return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)}; } diff --git a/src/main/java/liquibase/ext/neo4j/change/RenameLabelChange.java b/src/main/java/liquibase/ext/neo4j/change/RenameLabelChange.java index d8efc921..b8a1c24a 100644 --- a/src/main/java/liquibase/ext/neo4j/change/RenameLabelChange.java +++ b/src/main/java/liquibase/ext/neo4j/change/RenameLabelChange.java @@ -67,11 +67,11 @@ public String getConfirmationMessage() { @Override protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) { if (supportsDynamicLabels(neo4j)) { - String cypher = String.format("%s CALL {WITH __node__ SET __node__:$($1) REMOVE __node__:$($2)} IN TRANSACTIONS%s", + String cypher = String.format("%s CALL {WITH __node__ SET __node__:$($1) REMOVE __node__:$($2)}%s", queryStart(neo4j), cypherBatchSpec()); return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, to, from)}; } - String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`} IN TRANSACTIONS%s", + String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`}%s", queryStart(neo4j), to, from, cypherBatchSpec()); return new SqlStatement[]{new RawSqlStatement(cypher)}; } diff --git a/src/main/java/liquibase/ext/neo4j/change/RenamePropertyChange.java b/src/main/java/liquibase/ext/neo4j/change/RenamePropertyChange.java index 4a4bde7f..2cb00733 100644 --- a/src/main/java/liquibase/ext/neo4j/change/RenamePropertyChange.java +++ b/src/main/java/liquibase/ext/neo4j/change/RenamePropertyChange.java @@ -63,12 +63,12 @@ public String getConfirmationMessage() { protected SqlStatement[] generateBatchedStatements(Neo4jDatabase database) { String batchSpec = cypherBatchSpec(); if (supportsDynamicProperties(database)) { - String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n[$2] = n[$1] REMOVE n[$1] } IN TRANSACTIONS%s", batchSpec); - String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r[$2] = r[$1] REMOVE r[$1] } IN TRANSACTIONS%s", batchSpec); + String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n[$2] = n[$1] REMOVE n[$1] }%s", batchSpec); + String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r[$2] = r[$1] REMOVE r[$1] }%s", batchSpec); return filterStatements(nodeRename, relRename); } - String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n.`%2$s` = n[$1] REMOVE n.`%1$s` } IN TRANSACTIONS%3$s", from, to, batchSpec); - String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r.`%2$s` = r[$1] REMOVE r.`%1$s` } IN TRANSACTIONS%3$s", from, to, batchSpec); + String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n.`%2$s` = n[$1] REMOVE n.`%1$s` }%3$s", from, to, batchSpec); + String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r.`%2$s` = r[$1] REMOVE r.`%1$s` }%3$s", from, to, batchSpec); return filterStatements(nodeRename, relRename); } diff --git a/src/main/java/liquibase/ext/neo4j/change/RenameTypeChange.java b/src/main/java/liquibase/ext/neo4j/change/RenameTypeChange.java index 98cad8a4..51409e5d 100644 --- a/src/main/java/liquibase/ext/neo4j/change/RenameTypeChange.java +++ b/src/main/java/liquibase/ext/neo4j/change/RenameTypeChange.java @@ -72,7 +72,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) { "CREATE (__start__)-[__newrel__:$($2)]->(__end__) " + "SET __newrel__ = properties(__rel__) " + "DELETE __rel__ } " + - "IN TRANSACTIONS%s", queryStart(neo4j), cypherBatchSpec()); + "%s", queryStart(neo4j), cypherBatchSpec()); return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, from, to)}; } @@ -82,7 +82,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) { "CREATE (__start__)-[__newrel__:`%s`]->(__end__) " + "SET __newrel__ = properties(__rel__) " + "DELETE __rel__ } " + - "IN TRANSACTIONS%s", queryStart(neo4j), to, cypherBatchSpec()); + "%s", queryStart(neo4j), to, cypherBatchSpec()); return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, from)}; } diff --git a/src/main/java/liquibase/ext/neo4j/database/KernelVersion.java b/src/main/java/liquibase/ext/neo4j/database/KernelVersion.java index 9fe59e1a..2cf3f20f 100644 --- a/src/main/java/liquibase/ext/neo4j/database/KernelVersion.java +++ b/src/main/java/liquibase/ext/neo4j/database/KernelVersion.java @@ -9,7 +9,9 @@ public class KernelVersion implements Comparable { public static final KernelVersion V4_3_0 = new KernelVersion(4, 3, 0); public static final KernelVersion V4_4_0 = new KernelVersion(4, 4, 0); public static final KernelVersion V5_0_0 = new KernelVersion(5, 0, 0); + public static final KernelVersion V5_7_0 = new KernelVersion(5, 7, 0); public static final KernelVersion V5_24_0 = new KernelVersion(5, 24, 0); + public static final KernelVersion V5_21_0 = new KernelVersion(5, 21, 0); public static final KernelVersion V5_26_0 = new KernelVersion(5, 26, 0); private final int major; diff --git a/src/main/java/liquibase/ext/neo4j/database/Neo4jDatabase.java b/src/main/java/liquibase/ext/neo4j/database/Neo4jDatabase.java index b23ae1b9..d6efe948 100644 --- a/src/main/java/liquibase/ext/neo4j/database/Neo4jDatabase.java +++ b/src/main/java/liquibase/ext/neo4j/database/Neo4jDatabase.java @@ -224,11 +224,6 @@ public void execute(SqlStatement statement) throws LiquibaseException { return jdbcExecutor().queryForList(statement); } - // FIXME: inline and remove this - public boolean supportsCallInTransactions() { - return kernelVersion.compareTo(KernelVersion.V4_4_0) >= 0; - } - public KernelVersion getKernelVersion() { return kernelVersion; } diff --git a/src/main/resources/www.liquibase.org/xml/ns/neo4j/liquibase-neo4j-latest.xsd b/src/main/resources/www.liquibase.org/xml/ns/neo4j/liquibase-neo4j-latest.xsd index ba4f5d85..36feb60f 100644 --- a/src/main/resources/www.liquibase.org/xml/ns/neo4j/liquibase-neo4j-latest.xsd +++ b/src/main/resources/www.liquibase.org/xml/ns/neo4j/liquibase-neo4j-latest.xsd @@ -95,6 +95,16 @@ + + + + + + + + + + @@ -105,6 +115,16 @@ + + + + + + + + + + @@ -114,6 +134,16 @@ + + + + + + + + + + @@ -123,5 +153,15 @@ + + + + + + + + + + diff --git a/src/test/groovy/liquibase/ext/neo4j/change/InvertDirectionChangeTest.groovy b/src/test/groovy/liquibase/ext/neo4j/change/InvertDirectionChangeTest.groovy index fcb4c80d..d2578f0b 100644 --- a/src/test/groovy/liquibase/ext/neo4j/change/InvertDirectionChangeTest.groovy +++ b/src/test/groovy/liquibase/ext/neo4j/change/InvertDirectionChangeTest.groovy @@ -2,6 +2,7 @@ package liquibase.ext.neo4j.change import liquibase.changelog.ChangeSet import liquibase.database.core.MySQLDatabase +import liquibase.ext.neo4j.database.KernelVersion import liquibase.ext.neo4j.database.Neo4jDatabase import spock.lang.Specification @@ -30,7 +31,8 @@ class InvertDirectionChangeTest extends Specification { changeSet.runInTransaction >> runInTx renameLabelChange.setChangeSet(changeSet) def database = Mock(Neo4jDatabase) - database.supportsCallInTransactions() >> withCIT + def version = (withCIT) ? KernelVersion.V4_4_0 : KernelVersion.V4_3_0 + database.getKernelVersion() >> version expect: renameLabelChange.validate(database).getErrorMessages() == [error]