Skip to content

Commit

Permalink
Expose error policy and concurrency settings of batchable changes (#688)
Browse files Browse the repository at this point in the history
This allows users running recent versions of Neo4j to get the benefits of `CALL {} IN CONCURRENT TRANSACTIONS` and/or `ON ERROR ...` clauses for the changes that can currently run in batch, i.e.:

  - rename label
  - rename type
  - rename property
  - invert direction
  • Loading branch information
fbiville authored Feb 6, 2025
1 parent 9a42efb commit 4b5cac9
Show file tree
Hide file tree
Showing 42 changed files with 811 additions and 39 deletions.
9 changes: 9 additions & 0 deletions docs/includes/_batching.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ This results in the change being executed in batches.

The `batchSize` attribute controls how many transactions run.
If the attribute is not set, the batch size is defined on the Neo4j server side.

The [error policy of inner transactions](https://neo4j.com/docs/cypher-manual/current/subqueries/subqueries-in-transactions/#error-behavior),
introduced in Neo4j 5.7, is configurable with the `batchErrorPolicy` attribute, and accepts the following values:
- `CONTINUE`
- `BREAK`
- `FAIL`

Inner transactions can also be configured to [run in parallel](https://neo4j.com/docs/cypher-manual/current/subqueries/subqueries-in-transactions/#error-behavior) since Neo4j 5.21.
The boolean `concurrent` attribute controls that behavior. Concurrency is disabled by default.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package liquibase.ext.neo4j.change;

public enum BatchErrorPolicy {
CONTINUE, BREAK, FAIL
}
61 changes: 55 additions & 6 deletions src/main/java/liquibase/ext/neo4j/change/BatchableChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import liquibase.change.AbstractChange;
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.neo4j.database.KernelVersion;
import liquibase.ext.neo4j.database.Neo4jDatabase;
import liquibase.logging.Logger;
import liquibase.statement.SqlStatement;
Expand All @@ -14,6 +15,10 @@ abstract class BatchableChange extends AbstractChange {

protected Long batchSize;

private Boolean concurrent;

private BatchErrorPolicy batchErrorPolicy;

@Override
public ValidationErrors validate(Database database) {
ValidationErrors validation = new ValidationErrors(this);
Expand All @@ -23,12 +28,27 @@ public ValidationErrors validate(Database database) {
if (!enableBatchImport && batchSize != null) {
validation.addError("batch size must be set only if enableBatchImport is set to true");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
if (!enableBatchImport && concurrent != null) {
validation.addError("concurrent must be set only if enableBatchImport is set to true");
}
if (!enableBatchImport && batchErrorPolicy != null) {
validation.addError("batchErrorPolicy must be set only if enableBatchImport is set to true");
}
Neo4jDatabase neo4j = (Neo4jDatabase) database;
if (enableBatchImport && !neo4j.supportsCallInTransactions()) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
if (enableBatchImport) {
KernelVersion version = neo4j.getKernelVersion();
if (version.compareTo(KernelVersion.V4_4_0) < 0) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
}
if (batchErrorPolicy != null && version.compareTo(KernelVersion.V5_7_0) < 0) {
validation.addError("this version of Neo4j does not support the configuration of CALL {} IN TRANSACTIONS error behavior (ON ERROR), Neo4j 5.7 or later is required");
}
if (concurrent != null && concurrent && version.compareTo(KernelVersion.V5_21_0) < 0) {
validation.addError("this version of Neo4j does not support CALL {} IN CONCURRENT TRANSACTIONS, Neo4j 5.21 or later is required");
}
}
validation.addAll(super.validate(database));
return validation;
Expand All @@ -38,7 +58,7 @@ public ValidationErrors validate(Database database) {
public SqlStatement[] generateStatements(Database database) {
Logger log = Scope.getCurrentScope().getLog(getClass());
Neo4jDatabase neo4j = (Neo4jDatabase) database;
boolean supportsCallInTransactions = neo4j.supportsCallInTransactions();
boolean supportsCallInTransactions = neo4j.getKernelVersion().compareTo(KernelVersion.V4_4_0) >= 0;
if (supportsCallInTransactions && enableBatchImport) {
log.info("Running change in CALL {} IN TRANSACTIONS");
return generateBatchedStatements(neo4j);
Expand Down Expand Up @@ -72,7 +92,36 @@ public void setBatchSize(Long batchSize) {
this.batchSize = batchSize;
}

public Boolean getConcurrent() {
return concurrent;
}

public void setConcurrent(Boolean concurrent) {
this.concurrent = concurrent;
}

public BatchErrorPolicy getBatchErrorPolicy() {
return batchErrorPolicy;
}

public void setBatchErrorPolicy(BatchErrorPolicy batchErrorPolicy) {
this.batchErrorPolicy = batchErrorPolicy;
}

protected String cypherBatchSpec() {
return batchSize != null ? String.format(" OF %d ROWS", batchSize) : "";
StringBuilder builder = new StringBuilder();
builder.append(" IN");
if (concurrent != null && concurrent) {
builder.append(" CONCURRENT");
}
builder.append(" TRANSACTIONS");
if (batchSize != null) {
builder.append(String.format(" OF %d ROWS", batchSize));
}
if (batchErrorPolicy != null) {
builder.append(String.format(" ON ERROR %s", batchErrorPolicy));
}
return builder.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase database) {
" CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " +
" SET __newrel__ = properties(__rel__) " +
" DELETE __rel__ " +
"} IN TRANSACTIONS%s", queryStart(), type, cypherBatchSpec());
"}%s", queryStart(), type, cypherBatchSpec());
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public String getConfirmationMessage() {
@Override
protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) {
if (supportsDynamicLabels(neo4j)) {
String cypher = String.format("%s CALL {WITH __node__ SET __node__:$($1) REMOVE __node__:$($2)} IN TRANSACTIONS%s",
String cypher = String.format("%s CALL {WITH __node__ SET __node__:$($1) REMOVE __node__:$($2)}%s",
queryStart(neo4j), cypherBatchSpec());
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, to, from)};
}
String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`} IN TRANSACTIONS%s",
String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`}%s",
queryStart(neo4j), to, from, cypherBatchSpec());
return new SqlStatement[]{new RawSqlStatement(cypher)};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public String getConfirmationMessage() {
protected SqlStatement[] generateBatchedStatements(Neo4jDatabase database) {
String batchSpec = cypherBatchSpec();
if (supportsDynamicProperties(database)) {
String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n[$2] = n[$1] REMOVE n[$1] } IN TRANSACTIONS%s", batchSpec);
String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r[$2] = r[$1] REMOVE r[$1] } IN TRANSACTIONS%s", batchSpec);
String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n[$2] = n[$1] REMOVE n[$1] }%s", batchSpec);
String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r[$2] = r[$1] REMOVE r[$1] }%s", batchSpec);
return filterStatements(nodeRename, relRename);
}
String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n.`%2$s` = n[$1] REMOVE n.`%1$s` } IN TRANSACTIONS%3$s", from, to, batchSpec);
String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r.`%2$s` = r[$1] REMOVE r.`%1$s` } IN TRANSACTIONS%3$s", from, to, batchSpec);
String nodeRename = String.format("MATCH (n) WHERE n[$1] IS NOT NULL CALL { WITH n SET n.`%2$s` = n[$1] REMOVE n.`%1$s` }%3$s", from, to, batchSpec);
String relRename = String.format("MATCH ()-[r]->() WHERE r[$1] IS NOT NULL CALL { WITH r SET r.`%2$s` = r[$1] REMOVE r.`%1$s` }%3$s", from, to, batchSpec);
return filterStatements(nodeRename, relRename);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) {
"CREATE (__start__)-[__newrel__:$($2)]->(__end__) " +
"SET __newrel__ = properties(__rel__) " +
"DELETE __rel__ } " +
"IN TRANSACTIONS%s", queryStart(neo4j), cypherBatchSpec());
"%s", queryStart(neo4j), cypherBatchSpec());
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, from, to)};
}

Expand All @@ -82,7 +82,7 @@ protected SqlStatement[] generateBatchedStatements(Neo4jDatabase neo4j) {
"CREATE (__start__)-[__newrel__:`%s`]->(__end__) " +
"SET __newrel__ = properties(__rel__) " +
"DELETE __rel__ } " +
"IN TRANSACTIONS%s", queryStart(neo4j), to, cypherBatchSpec());
"%s", queryStart(neo4j), to, cypherBatchSpec());
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, from)};
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/liquibase/ext/neo4j/database/KernelVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ public class KernelVersion implements Comparable<KernelVersion> {
public static final KernelVersion V4_3_0 = new KernelVersion(4, 3, 0);
public static final KernelVersion V4_4_0 = new KernelVersion(4, 4, 0);
public static final KernelVersion V5_0_0 = new KernelVersion(5, 0, 0);
public static final KernelVersion V5_7_0 = new KernelVersion(5, 7, 0);
public static final KernelVersion V5_24_0 = new KernelVersion(5, 24, 0);
public static final KernelVersion V5_21_0 = new KernelVersion(5, 21, 0);
public static final KernelVersion V5_26_0 = new KernelVersion(5, 26, 0);

private final int major;
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/liquibase/ext/neo4j/database/Neo4jDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,6 @@ public void execute(SqlStatement statement) throws LiquibaseException {
return jdbcExecutor().queryForList(statement);
}

// FIXME: inline and remove this
public boolean supportsCallInTransactions() {
return kernelVersion.compareTo(KernelVersion.V4_4_0) >= 0;
}

public KernelVersion getKernelVersion() {
return kernelVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@
<xsd:attribute type="xsd:string" name="outputVariable" />
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
<xsd:attribute type="xsd:boolean" name="concurrent" />
<xsd:attribute type="xsd:string" name="batchErrorPolicy">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="CONTINUE"/>
<xsd:enumeration value="BREAK"/>
<xsd:enumeration value="FAIL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>

<xsd:element name="renameType" type="renameTypeType" />
Expand All @@ -105,6 +115,16 @@
<xsd:attribute type="xsd:string" name="outputVariable" />
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
<xsd:attribute type="xsd:boolean" name="concurrent" />
<xsd:attribute type="xsd:string" name="batchErrorPolicy">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="CONTINUE"/>
<xsd:enumeration value="BREAK"/>
<xsd:enumeration value="FAIL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>

<xsd:element name="invertDirection" type="invertDirectionType" />
Expand All @@ -114,6 +134,16 @@
<xsd:attribute type="xsd:string" name="outputVariable" />
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
<xsd:attribute type="xsd:boolean" name="concurrent" />
<xsd:attribute type="xsd:string" name="batchErrorPolicy">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="CONTINUE"/>
<xsd:enumeration value="BREAK"/>
<xsd:enumeration value="FAIL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>

<xsd:element name="renameProperty" type="renamePropertyType" />
Expand All @@ -123,5 +153,15 @@
<xsd:attribute type="xsd:string" name="entityType" />
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
<xsd:attribute type="xsd:boolean" name="concurrent" />
<xsd:attribute type="xsd:string" name="batchErrorPolicy">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="CONTINUE"/>
<xsd:enumeration value="BREAK"/>
<xsd:enumeration value="FAIL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
</xsd:schema>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package liquibase.ext.neo4j.change

import liquibase.changelog.ChangeSet
import liquibase.database.core.MySQLDatabase
import liquibase.ext.neo4j.database.KernelVersion
import liquibase.ext.neo4j.database.Neo4jDatabase
import spock.lang.Specification

Expand Down Expand Up @@ -30,7 +31,8 @@ class InvertDirectionChangeTest extends Specification {
changeSet.runInTransaction >> runInTx
renameLabelChange.setChangeSet(changeSet)
def database = Mock(Neo4jDatabase)
database.supportsCallInTransactions() >> withCIT
def version = (withCIT) ? KernelVersion.V4_4_0 : KernelVersion.V4_3_0
database.getKernelVersion() >> version

expect:
renameLabelChange.validate(database).getErrorMessages() == [error]
Expand Down
18 changes: 14 additions & 4 deletions src/test/groovy/liquibase/ext/neo4j/e2e/InvertDirectionIT.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import liquibase.command.core.helpers.DatabaseChangelogCommandStep
import liquibase.command.core.helpers.DbUrlConnectionCommandStep
import liquibase.ext.neo4j.Neo4jContainerSpec

import static liquibase.ext.neo4j.DockerNeo4j.neo4jVersion
import static liquibase.ext.neo4j.database.KernelVersion.V5_21_0
import static org.junit.jupiter.api.Assumptions.assumeTrue

class InvertDirectionIT extends Neo4jContainerSpec {

def "runs migrations inverting direction"() {
Expand Down Expand Up @@ -48,11 +52,14 @@ class InvertDirectionIT extends Neo4jContainerSpec {

def "runs batched migrations inverting direction"() {
given:
if (concurrent) {
assumeTrue(neo4jVersion() >= V5_21_0)
}
def command = new CommandScope(UpdateCommandStep.COMMAND_NAME)
.addArgumentValue(DbUrlConnectionCommandStep.URL_ARG, "jdbc:neo4j:${neo4jContainer.getBoltUrl()}".toString())
.addArgumentValue(DbUrlConnectionCommandStep.USERNAME_ARG, "neo4j")
.addArgumentValue(DbUrlConnectionCommandStep.PASSWORD_ARG, PASSWORD)
.addArgumentValue(DatabaseChangelogCommandStep.CHANGELOG_FILE_ARG, "/e2e/invert-direction/changeLog-simple-batched.${format}".toString())
.addArgumentValue(DatabaseChangelogCommandStep.CHANGELOG_FILE_ARG, "/e2e/invert-direction/changeLog-simple-batched${if (concurrent) "-concurrent" else ""}.${format}".toString())
.setOutput(System.out)
command.execute()

Expand Down Expand Up @@ -81,7 +88,7 @@ class InvertDirectionIT extends Neo4jContainerSpec {
]

where:
format << ["json", "xml", "yaml"]
[format, concurrent] << [["json", "xml", "yaml"], [false, true]].combinations()
}

def "runs migrations inverting direction of matching relationships"() {
Expand Down Expand Up @@ -123,11 +130,14 @@ class InvertDirectionIT extends Neo4jContainerSpec {

def "runs batched migrations inverting direction of matching relationships"() {
given:
if (concurrent) {
assumeTrue(neo4jVersion() >= V5_21_0)
}
def command = new CommandScope(UpdateCommandStep.COMMAND_NAME)
.addArgumentValue(DbUrlConnectionCommandStep.URL_ARG, "jdbc:neo4j:${neo4jContainer.getBoltUrl()}".toString())
.addArgumentValue(DbUrlConnectionCommandStep.USERNAME_ARG, "neo4j")
.addArgumentValue(DbUrlConnectionCommandStep.PASSWORD_ARG, PASSWORD)
.addArgumentValue(DatabaseChangelogCommandStep.CHANGELOG_FILE_ARG, "/e2e/invert-direction/changeLog-pattern-batched.${format}".toString())
.addArgumentValue(DatabaseChangelogCommandStep.CHANGELOG_FILE_ARG, "/e2e/invert-direction/changeLog-pattern-batched${if (concurrent) "-concurrent" else ""}.${format}".toString())
.setOutput(System.out)
command.execute()

Expand Down Expand Up @@ -155,6 +165,6 @@ class InvertDirectionIT extends Neo4jContainerSpec {
]

where:
format << ["json", "xml", "yaml"]
[format, concurrent] << [["json", "xml", "yaml"], [false, true]].combinations()
}
}
Loading

0 comments on commit 4b5cac9

Please sign in to comment.