4040import org .apache .hadoop .hive .metastore .txn .TxnStore ;
4141import org .apache .hadoop .hive .metastore .txn .TxnUtils ;
4242import org .apache .hadoop .hive .metastore .txn .entities .TxnWriteDetails ;
43+ import org .apache .hadoop .hive .metastore .txn .jdbc .InClauseBatchCommand ;
4344import org .apache .hadoop .hive .metastore .txn .jdbc .commands .DeleteReplTxnMapEntryCommand ;
4445import org .apache .hadoop .hive .metastore .txn .jdbc .commands .InsertCompletedTxnComponentsCommand ;
4546import org .apache .hadoop .hive .metastore .txn .jdbc .commands .RemoveTxnsFromMinHistoryLevelCommand ;
7071import java .util .Collections ;
7172import java .util .List ;
7273import java .util .Set ;
74+ import java .util .function .Function ;
7375import java .util .stream .Collectors ;
7476import java .util .stream .IntStream ;
7577
@@ -126,7 +128,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
126128 throw new RollbackException (null );
127129 }
128130 assert targetTxnIds .size () == 1 ;
129- txnid = targetTxnIds .get ( 0 );
131+ txnid = targetTxnIds .getFirst ( );
130132 }
131133
132134 /**
@@ -155,8 +157,8 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
155157 }
156158
157159 String conflictSQLSuffix = String .format ("""
158- FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = %d AND "TC_OPERATION_TYPE" IN (%s, %s)
159- """ , txnid , OperationType .UPDATE , OperationType .DELETE );
160+ FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND "TC_OPERATION_TYPE" IN (%s, %s)
161+ """ , OperationType .UPDATE , OperationType .DELETE );
160162 long tempCommitId = TxnUtils .generateTemporaryId ();
161163
162164 if (txnType == TxnType .SOFT_DELETE || txnType == TxnType .COMPACTION ) {
@@ -176,7 +178,10 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
176178 """ ;
177179
178180 boolean isUpdateOrDelete = Boolean .TRUE .equals (jdbcResource .getJdbcTemplate ().query (
179- jdbcResource .getSqlGenerator ().addLimitClause (1 , "\" TC_OPERATION_TYPE\" " + conflictSQLSuffix ),
181+ jdbcResource .getSqlGenerator ()
182+ .addLimitClause (1 , "\" TC_OPERATION_TYPE\" " + conflictSQLSuffix ),
183+ new MapSqlParameterSource ()
184+ .addValue ("txnId" , txnid ),
180185 ResultSet ::next ));
181186
182187 if (isUpdateOrDelete ) {
@@ -197,12 +202,12 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
197202 Object undoWriteSetForCurrentTxn = context .createSavepoint ();
198203 jdbcResource .getJdbcTemplate ().update (
199204 writeSetInsertSql + (ConfVars .useMinHistoryLevel () ? conflictSQLSuffix :
200- "FROM \" TXN_COMPONENTS\" WHERE \" TC_TXNID\" = :txnId" + (
201- (txnType != TxnType .REBALANCE_COMPACTION ) ? "" : " AND \" TC_OPERATION_TYPE\" <> :type" )),
205+ "FROM \" TXN_COMPONENTS\" WHERE \" TC_TXNID\" = :txnId" + (
206+ (txnType != TxnType .REBALANCE_COMPACTION ) ? "" : " AND \" TC_OPERATION_TYPE\" <> :type" )),
202207 new MapSqlParameterSource ()
203208 .addValue ("txnId" , txnid )
204- .addValue ("commitId " , tempCommitId )
205- .addValue ("type " , OperationType . COMPACT . getSqlConst () ));
209+ .addValue ("type " , OperationType . COMPACT . getSqlConst () )
210+ .addValue ("commitId " , tempCommitId ));
206211
207212 /**
208213 * This S4U will mutex with other commitTxn() and openTxns().
@@ -249,7 +254,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
249254 jdbcResource .getJdbcTemplate ().update (writeSetInsertSql + "FROM \" TXN_COMPONENTS\" WHERE \" TC_TXNID\" = :txnId" ,
250255 new MapSqlParameterSource ()
251256 .addValue ("txnId" , txnid )
252- .addValue ("commitId" , txnid ));
257+ .addValue ("commitId" , jdbcResource . execute ( new GetHighWaterMarkHandler ()) ));
253258 }
254259 } else {
255260 /*
@@ -274,8 +279,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
274279 jdbcResource .execute (new DeleteReplTxnMapEntryCommand (sourceTxnId , rqst .getReplPolicy ()));
275280 }
276281 updateWSCommitIdAndCleanUpMetadata (jdbcResource , txnid , txnType , commitId , tempCommitId );
277- jdbcResource .execute (new RemoveTxnsFromMinHistoryLevelCommand (ImmutableList .of (txnid )));
278- jdbcResource .execute (new RemoveWriteIdsFromMinHistoryCommand (ImmutableList .of (txnid )));
282+
279283 if (rqst .isSetKeyValue ()) {
280284 updateKeyValueAssociatedWithTxn (jdbcResource , rqst );
281285 }
@@ -562,11 +566,8 @@ private void updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes
562566 }
563567 }
564568
565- /**
566- * See overridden method in CompactionTxnHandler also.
567- */
568- private void updateWSCommitIdAndCleanUpMetadata (MultiDataSourceJdbcResource jdbcResource , long txnid , TxnType txnType ,
569- Long commitId , long tempId ) throws MetaException {
569+ private void updateWSCommitIdAndCleanUpMetadata (MultiDataSourceJdbcResource jdbcResource ,
570+ long txnid , TxnType txnType , Long commitId , long tempId ) throws MetaException {
570571 List <String > queryBatch = new ArrayList <>(6 );
571572 // update write_set with real commitId
572573 if (commitId != null ) {
@@ -587,9 +588,17 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
587588 queryBatch .add ("UPDATE \" COMPACTION_QUEUE\" SET \" CQ_NEXT_TXN_ID\" = " + commitId + ", \" CQ_COMMIT_TIME\" = " +
588589 getEpochFn (jdbcResource .getDatabaseProduct ()) + " WHERE \" CQ_TXN_ID\" = " + txnid );
589590 }
590-
591+
591592 // execute all in one batch
592593 jdbcResource .getJdbcTemplate ().getJdbcTemplate ().batchUpdate (queryBatch .toArray (new String [0 ]));
594+
595+ List <Function <List <Long >, InClauseBatchCommand <Long >>> commands = List .of (
596+ RemoveTxnsFromMinHistoryLevelCommand ::new ,
597+ RemoveWriteIdsFromMinHistoryCommand ::new
598+ );
599+ for (var cmd : commands ) {
600+ jdbcResource .execute (cmd .apply (ImmutableList .of (txnid )));
601+ }
593602 }
594603
595604 /**
0 commit comments