Skip to content

[fix](group commit) fix group commit use prepared statement and connect to observer #46206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
Expand Down Expand Up @@ -452,6 +453,15 @@ public StmtType stmtType() {
return StmtType.INSERT;
}

@Override
public RedirectStatus toRedirectStatus() {
if (ConnectContext.get().isGroupCommit()) {
return RedirectStatus.NO_FORWARD;
} else {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}

/**
* this factory is used to delay create the AbstractInsertExecutor until the DistributePlan is generated
* by NereidsPlanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
this.groupCommitBackend = backend;
}

/**
* check if the sql can run in group commit mode
* @param logicalPlan plan of sql
*/
public static void analyzeGroupCommit(LogicalPlan logicalPlan) {
ConnectContext ctx = ConnectContext.get();
if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) {
LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery,
Optional.empty());
}
}

protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery,
Optional<InsertCommandContext> insertCtx) {
// The flag is set to false before execute sql, if it is true, this is a http stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
Expand Down Expand Up @@ -724,6 +725,7 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
}
if (logicalPlan instanceof Command) {
if (logicalPlan instanceof Redirect) {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan);
redirectStatus = ((Redirect) logicalPlan).toRedirectStatus();
if (isForwardToMaster()) {
// before forward to master, we also need to set profileType in this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,12 @@ class Suite implements GroovyInterceptable {
}

String getServerPrepareJdbcUrl(String jdbcUrl, String database) {
return getServerPrepareJdbcUrl(jdbcUrl, database, true)
}

String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean useMasterIp) {
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
def sql_ip = getMasterIp()
def sql_ip = useMasterIp ? getMasterIp() : urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g: jdbc:mysql://locahost:8080/?a=b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") {

// prepare insert
def db = context.config.defaultDb + "_insert_p0"
String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false)

try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) {
Statement statement = connection.createStatement();
statement.execute("use ${db}");
statement.execute("set group_commit = eventual_consistency;");
statement.execute("set group_commit = sync_mode");
statement.execute("set enable_server_side_prepared_statement = true")
// without column
try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") {
return serverStatementIds
}

def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb)
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false)
logger.info("url: " + url)

def result1 = connect(user, password, url + "&sessionVariables=group_commit=async_mode") {
Expand Down
Loading