Skip to content

Commit

Permalink
CONJ-152 - rewriteBatchedStatements and multiple executeBatch error
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Jun 4, 2015
1 parent 59f3d68 commit fae71ea
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 201 deletions.
13 changes: 0 additions & 13 deletions src/main/java/org/mariadb/jdbc/MySQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,6 @@ public static MySQLConnection newConnection(MySQLProtocol protocol) throws SQLEx
connection.nullCatalogMeansCurrent = false;
}

Statement st = null;
try {
st = connection.createStatement();
if (sessionVariables != null) {
st.executeUpdate("set session " + sessionVariables);
}
ResultSet rs = st.executeQuery("show variables like 'max_allowed_packet'");
rs.next();
protocol.setMaxAllowedPacket(Integer.parseInt(rs.getString(2)));
} finally {
if (st != null)
st.close();
}
return connection;
}

Expand Down
67 changes: 12 additions & 55 deletions src/main/java/org/mariadb/jdbc/MySQLPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

import org.mariadb.jdbc.internal.SQLExceptionMapper;
import org.mariadb.jdbc.internal.common.Utils;
import org.mariadb.jdbc.internal.common.query.IllegalParameterException;
import org.mariadb.jdbc.internal.common.query.MySQLParameterizedQuery;
import org.mariadb.jdbc.internal.common.query.*;
import org.mariadb.jdbc.internal.common.query.parameters.*;

import java.io.IOException;
Expand All @@ -75,7 +74,6 @@ public class MySQLPreparedStatement extends MySQLStatement implements PreparedSt
private String sql;
private boolean useFractionalSeconds;
boolean parametersCleared;
List<MySQLPreparedStatement> batchPreparedStatements;


public MySQLPreparedStatement(MySQLConnection connection,
Expand All @@ -92,13 +90,6 @@ public MySQLPreparedStatement(MySQLConnection connection,
parametersCleared = true;
}

private MySQLPreparedStatement (MySQLConnection connection, String sql, MySQLParameterizedQuery dQuery, boolean useFractionalSeconds ) {
super(connection);
this.dQuery = dQuery.cloneQuery();
this.sql = sql;
this.useFractionalSeconds = useFractionalSeconds;
}

/**
* Executes the SQL query in this <code>PreparedStatement</code> object
* and returns the <code>ResultSet</code> object generated by the query.
Expand Down Expand Up @@ -192,64 +183,30 @@ public void setNull(final int parameterIndex, final int sqlType) throws SQLExcep
*/
public void addBatch() throws SQLException {
checkBatchFields();
batchPreparedStatements.add(new MySQLPreparedStatement(connection,sql, dQuery, useFractionalSeconds));

batchQueries.add(dQuery.cloneQuery());
isInsertRewriteable(dQuery.getQuery());
}
public void addBatch(final String sql) throws SQLException {
checkBatchFields();
batchPreparedStatements.add(new MySQLPreparedStatement(connection, sql));
isInsertRewriteable(sql);
batchQueries.add(new MySQLQuery(sql));
}

private void checkBatchFields() {
if (batchPreparedStatements == null) {
batchPreparedStatements = new ArrayList<MySQLPreparedStatement>();
if (batchQueries == null) {
batchQueries = new ArrayList<Query>();
}
}

public void clearBatch() {
if (batchPreparedStatements != null) {
batchPreparedStatements.clear();
}
}


@Override
public int[] executeBatch() throws SQLException {
if (batchPreparedStatements == null || batchPreparedStatements.isEmpty()) {
return new int[0];
if (batchQueries != null) {
batchQueries.clear();
}
int[] ret = new int[batchPreparedStatements.size()];
int i = 0;
MySQLResultSet rs = null;
try {
synchronized (this.getProtocol()) {
for (; i < batchPreparedStatements.size(); i++) {
PreparedStatement ps = batchPreparedStatements.get(i);
ps.execute();
int updateCount = ps.getUpdateCount();
if (updateCount == -1) {
ret[i] = SUCCESS_NO_INFO;
} else {
ret[i] = updateCount;
}
if (i == 0) {
rs = (MySQLResultSet)ps.getGeneratedKeys();
} else {
rs = rs.joinResultSets((MySQLResultSet)ps.getGeneratedKeys());
}
}
}
} catch (SQLException sqle) {
throw new BatchUpdateException(sqle.getMessage(), sqle.getSQLState(), sqle.getErrorCode(), Arrays.copyOf(ret, i), sqle);
} finally {
clearBatch();
}
batchResultSet = rs;
return ret;
firstRewrite = null;
isRewriteable = true;
}


/**
/**
* Sets the designated parameter to the given <code>Reader</code> object, which is the given number of characters
* long. When a very large UNICODE value is input to a <code>LONGVARCHAR</code> parameter, it may be more practical
* to send it via a <code>java.io.Reader</code> object. The data will be read from the stream as needed until
Expand Down
112 changes: 57 additions & 55 deletions src/main/java/org/mariadb/jdbc/MySQLStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public class MySQLStatement implements Statement {
boolean isTimedout;
volatile boolean executing;

List<String> batchQueries;
List<Query> batchQueries;
Queue<Object> cachedResultSets;
private boolean isRewriteable = true;
private String firstRewrite = null;
protected boolean isRewriteable = true;
protected String firstRewrite = null;
protected ResultSet batchResultSet = null;


Expand Down Expand Up @@ -299,6 +299,45 @@ protected boolean execute(Query query) throws SQLException {
}
}

/**
* Execute statements. if many queries, those queries will be rewritten
* if isRewritable = false, the query will be agreggated :
* INSERT INTO jdbc (`name`) VALUES ('Line 1: Lorem ipsum ...')
* INSERT INTO jdbc (`name`) VALUES ('Line 2: Lorem ipsum ...')
* will be agreggate as
* INSERT INTO jdbc (`name`) VALUES ('Line 1: Lorem ipsum ...');INSERT INTO jdbc (`name`) VALUES ('Line 2: Lorem ipsum ...')
* and if isRewritable, agreggated as
* INSERT INTO jdbc (`name`) VALUES ('Line 1: Lorem ipsum ...'),('Line 2: Lorem ipsum ...')
* @param queries list of queries
* @param isRewritable are the queries of the same type to be agreggated
* @param rewriteOffset offset of the parameter if query are similar
* @return true if there was a result set, false otherwise.
* @throws SQLException
*/
protected boolean execute(List<Query> queries, boolean isRewritable, int rewriteOffset) throws SQLException {
//System.out.println(query);
synchronized (protocol) {
if (protocol.activeResult != null) {
protocol.activeResult.close();
}
executing = true;
QueryException exception = null;
executeQueryProlog();
try {
batchResultSet = null;
queryResult = protocol.executeQuery(queries, isStreaming(), isRewritable, rewriteOffset);
cacheMoreResults();
return (queryResult.getResultSetType() == ResultSetType.SELECT);
} catch (QueryException e) {
exception = e;
return false;
} finally {
executeQueryEpilog(exception, queries.get(0));
executing = false;
}
}
}

/**
* executes a select query.
*
Expand Down Expand Up @@ -1095,17 +1134,17 @@ public int getResultSetType() throws SQLException {
*/
public void addBatch(final String sql) throws SQLException {
if (batchQueries == null) {
batchQueries = new ArrayList<String>();
batchQueries = new ArrayList<Query>();
}
batchQueries.add(sql);
isInsertRewriteable(sql);
batchQueries.add(new MySQLQuery(sql));
}

/**
* Parses the sql string to understand whether it is compatible with rewritten batches.
* @param sql the sql string
*/
private void isInsertRewriteable(String sql) {
protected void isInsertRewriteable(String sql) {
if (!isRewriteable) {
return;
}
Expand Down Expand Up @@ -1157,24 +1196,7 @@ protected int getInsertIncipit(String sql) {

return startBracket;
}

/**
* If the batch array contains only rewriteable sql strings, returns the rewritten statement.
* @return the rewritten statement
*/
private String rewrittenBatch() {
StringBuilder result = null;
if(isRewriteable) {
result = new StringBuilder("");
result.append(firstRewrite);
for (String query : batchQueries) {
result.append(query.substring(getInsertIncipit(query)));
result.append(",");
}
result.deleteCharAt(result.length() - 1);
}
return (result == null ? null : result.toString());
}



/**
Expand Down Expand Up @@ -1231,18 +1253,23 @@ public void clearBatch() throws SQLException {
* @since 1.3
*/
public int[] executeBatch() throws SQLException {
if (batchQueries == null)
return new int[0];
if (batchQueries == null || batchQueries.size() == 0) return new int[0];

int[] ret = new int[batchQueries.size()];
int i = 0;
MySQLResultSet rs = null;

boolean allowMultiQueries = "true".equals(getProtocol().getInfo().getProperty("allowMultiQueries"));
boolean rewriteBatchedStatements = "true".equals(getProtocol().getInfo().getProperty("rewriteBatchedStatements"));
if (rewriteBatchedStatements) allowMultiQueries=true;
try {
synchronized (this.protocol) {
if (getProtocol().getInfo().getProperty("rewriteBatchedStatements") != null
&& "true".equalsIgnoreCase(getProtocol().getInfo().getProperty("rewriteBatchedStatements"))) {
ret = executeBatchAsMultiQueries();
} else {
if (allowMultiQueries) {
int size = batchQueries.size();
MySQLStatement ps = (MySQLStatement) connection.createStatement();
ps.execute(batchQueries, isRewriteable && rewriteBatchedStatements, (isRewriteable && rewriteBatchedStatements)?firstRewrite.length():0);
return isRewriteable?getUpdateCountsForReWrittenBatch(ps, size):getUpdateCounts(ps, size);
} else {
for(; i < batchQueries.size(); i++) {
execute(batchQueries.get(i));
int updateCount = getUpdateCount();
Expand All @@ -1267,31 +1294,6 @@ public int[] executeBatch() throws SQLException {
batchResultSet = rs;
return ret;
}

/**
* Builds a new statement which contains the batched Statements and executes it.
* @return an array of update counts containing one element for each command in the batch.
* The elements of the array are ordered according to the order in which commands were added to the batch.
* @throws SQLException
*/
private int[] executeBatchAsMultiQueries() throws SQLException {
int i = 0;
StringBuilder stringBuilder = new StringBuilder();
String rewrite = rewrittenBatch();
boolean rewrittenBatch = rewrite != null;
if (rewrittenBatch) {
stringBuilder.append(rewrite);
i = batchQueries.size();
} else {
for (; i < batchQueries.size(); i++) {
stringBuilder.append(batchQueries.get(i) + ";");
}
}
Statement ps = connection.createStatement();
ps.execute(stringBuilder.toString());
return rewrittenBatch ? getUpdateCountsForReWrittenBatch(ps, i) : getUpdateCounts(ps, i);
}


/**
* Retrieves the update counts for the batched statements rewritten as
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.mariadb.jdbc.internal.common.packet;

import java.io.IOException;

/**
* Created by diego_000 on 03/06/2015.
*/
public class MaxAllowedPacketException extends IOException {
boolean mustReconnect;
public MaxAllowedPacketException(String message, boolean mustReconnect) {
super(message);
this.mustReconnect = mustReconnect;
}

public boolean isMustReconnect() {
return mustReconnect;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package org.mariadb.jdbc.internal.common.packet;
import org.mariadb.jdbc.internal.common.packet.commands.StreamedQueryPacket;
import org.mariadb.jdbc.internal.common.query.MySQLQuery;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;



public class PacketOutputStream extends OutputStream{
public class PacketOutputStream extends OutputStream {
private final static Logger log = Logger.getLogger("org.maria.jdbc");

private static final int MAX_PACKET_LENGTH = 0x00ffffff;
private static final int SEQNO_OFFSET = 3;
Expand Down Expand Up @@ -100,7 +105,6 @@ public void write(byte[] bytes, int off, int len) throws IOException{
System.arraycopy(byteBuffer, 0, tmp, 0, position);
byteBuffer = tmp;
}

System.arraycopy(bytes, off, byteBuffer, position, bytesToWrite);
position += bytesToWrite;
off += bytesToWrite;
Expand All @@ -123,12 +127,18 @@ private void internalFlush() throws IOException {
byteBuffer[1] = (byte)((dataLen >> 8) & 0xff);
byteBuffer[2] = (byte)((dataLen >> 16) & 0xff);
byteBuffer[SEQNO_OFFSET] = (byte)this.seqNo;
bytesWritten += dataLen;
bytesWritten += dataLen + HEADER_LENGTH;
if (maxAllowedPacket > 0 && bytesWritten > maxAllowedPacket && checkPacketLength) {
baseStream.close();
throw new IOException("max_allowed_packet exceeded. wrote " + bytesWritten + ", max_allowed_packet = " +maxAllowedPacket);
this.seqNo=-1;
throw new MaxAllowedPacketException("max_allowed_packet exceeded. wrote " + bytesWritten + ", max_allowed_packet = " +maxAllowedPacket, this.seqNo != 0);
}
baseStream.write(byteBuffer, 0, position);
if (log.isLoggable(Level.FINEST)) {
byte[] tmp = new byte[Math.min(1000, position)];
System.arraycopy(byteBuffer, 0, tmp, 0, Math.min(1000, position));
log.finest(new String(tmp));
}

position = HEADER_LENGTH;
this.seqNo++;
}
Expand Down
Loading

0 comments on commit fae71ea

Please sign in to comment.