diff --git a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
index 3a61392d..927f1c5b 100644
--- a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
+++ b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
@@ -157,6 +157,22 @@ public String getProperty(String name) {
return ret;
}
+ /** Convenience variant that allows optional properties.
+ *
+ * @param name the property name to look up
+ * @param defaultValue the value to return if the property is not set
+ * @return the trimmed property value, or the trimmed default if the property is missing
+ */
+ public String getProperty(String name, String defaultValue) {
+ completeProperties();
+ String ret = properties.getProperty(name);
+ if (ret == null) {
+ ret = defaultValue;
+ }
+ if (ret != null) {
+ ret = ret.trim();
+ }
+ return ret;
+ }
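+ // Illustrative usage only (the property name below is hypothetical, not a real Atomikos setting):
+ //   String value = configProperties.getProperty("com.atomikos.icatch.some_optional_property", "0");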
+
public void setProperty(String name, String value) {
properties.setProperty(name, value);
}
diff --git a/public/transactions-jdbc/pom.xml b/public/transactions-jdbc/pom.xml
index e916fed0..4203de9f 100644
--- a/public/transactions-jdbc/pom.xml
+++ b/public/transactions-jdbc/pom.xml
@@ -24,5 +24,17 @@
1.0
provided
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>2.2.224</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>2.17.1</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/public/transactions-jdbc/src/test/java/com/atomikos/recovery/fs/DiskForceBundlingTest.java b/public/transactions-jdbc/src/test/java/com/atomikos/recovery/fs/DiskForceBundlingTest.java
new file mode 100644
index 00000000..48b1ef0e
--- /dev/null
+++ b/public/transactions-jdbc/src/test/java/com/atomikos/recovery/fs/DiskForceBundlingTest.java
@@ -0,0 +1,259 @@
+/**
+ * Copyright (C) 2000-2024 Atomikos
+ *
+ * LICENSE CONDITIONS
+ *
+ * See http://www.atomikos.com/Main/WhichLicenseApplies for details.
+ */
+package com.atomikos.recovery.fs;
+
+import com.atomikos.icatch.config.Configuration;
+import static com.atomikos.icatch.config.Configuration.getConfigProperties;
+import com.atomikos.jdbc.AtomikosDataSourceBean;
+import com.atomikos.logging.Logger;
+import com.atomikos.logging.LoggerFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.sql.DataSource;
+import javax.transaction.UserTransaction;
+import static junit.framework.TestCase.assertTrue;
+import org.h2.jdbcx.JdbcDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** This class is intended to check that the disk force bundling optimization does not cause
+ * unexpected issues and, by running with the optimization enabled vs disabled, to compare performance.
+ * It also triggers semi-random 'functional' errors in order to exercise rollbacks.
+ *
+ * The tests run against an H2 in-memory DB.
+ *
+ * The default settings make the test run quite quickly; increase the number of threads, the number of
+ * transactions per thread and the other parameters to get more varied results.
+ *
+ */
+public class DiskForceBundlingTest {
+
+ private static final Logger LOGGER = LoggerFactory.createLogger(DiskForceBundlingTest.class);
+
+ private static final int NB_THREADS = 10;
+ private static final int NB_TRANSACTIONS_PER_THREAD = 100;
+ private static final String DISK_FORCE_BUNDLING_ENABLED = "true";
+ // roughly this fraction of inserts will 'fail' (leading to rollbacks)
+ private static final float FAILURE_RATIO = 0.1f;
+
+ private UserTransaction userTransaction;
+ private DataSource ds;
+
+ @Before
+ public void setUp() throws Exception {
+ getConfigProperties().setProperty("com.atomikos.icatch.max_actives", String.valueOf(Integer.MAX_VALUE));
+ getConfigProperties().setProperty("com.atomikos.icatch.checkpoint_interval", "20000");
+ getConfigProperties().setProperty("com.atomikos.icatch.registered", "true");
+ getConfigProperties().setProperty("com.atomikos.icatch.default_jta_timeout", "30000");
+ //getConfigProperties().setProperty("com.atomikos.icatch.max_timeout", "30000");
+ //getConfigProperties().setProperty("com.atomikos.icatch.default_jta_timeout","30000");
+ getConfigProperties().setProperty(FileSystemRepository.DISK_FORCE_BUNDLING_ENABLED_PROPERTY, DISK_FORCE_BUNDLING_ENABLED);
+// getConfigProperties().setProperty(FileSystemRepository.DISK_FORCE_BUNDLING_MAX_BUNDLE_SIZE_PROPERTY,"40");
+// getConfigProperties().setProperty(FileSystemRepository.DISK_FORCE_BUNDLING_MAX_BUNDLING_WAIT_TIME_MS_PROPERTY,"5");
+// getConfigProperties().setProperty(CachedRepository.DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS,"0");
+
+ userTransaction = new com.atomikos.icatch.jta.UserTransactionImp();
+ AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
+ ds.setUniqueResourceName("H2-DUMMY-DB");
+
+ JdbcDataSource ds2 = new JdbcDataSource();
+ ds2.setURL("jdbc:h2:mem:mydatabase;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE"); //"jdbc:h2:~/test");
+ ds2.setUser("sa");
+ ds2.setPassword("sa");
+
+ ds.setXaDataSource(ds2);
+ ds.setConcurrentConnectionValidation(true);
+ ds.setBorrowConnectionTimeout(5);
+
+ ds.setPoolSize(NB_THREADS);
+ ds.setTestQuery("SELECT 1 from dual");
+ ds.init();
+
+ prepareDB(ds);
+
+ this.ds = ds;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Configuration.shutdown(true);
+ ((AtomikosDataSourceBean) ds).close();
+ }
+
+ @Test
+ public void testFunctionality() throws Exception {
+ PerftestRunnable[] rs = new PerftestRunnable[NB_THREADS];
+
+ Thread[] threads = new Thread[NB_THREADS];
+ for (int i = 0; i < threads.length; i++) {
+ PerftestRunnable r = new PerftestRunnable(ds, userTransaction, i);
+ threads[i] = new Thread(r, "Perfthread-" + i);
+ threads[i].setDaemon(true);
+ rs[i] = r;
+ }
+
+ long start = System.currentTimeMillis();
+ for (int j = 0; j < threads.length; j++) {
+ threads[j].start();
+ LOGGER.logInfo("Started thread " + j);
+ }
+ int totalSuccessful = 0;
+ int totalFailed = 0;
+ int totalRolledback = 0;
+ for (int j = 0; j < threads.length; j++) {
+ threads[j].join();
+ int successful = rs[j].successfulTransactions.get();
+ int failed = rs[j].failedBackTransactions.get();
+ int rolledback = rs[j].rolledBackTransactions.get();
+ totalSuccessful = totalSuccessful + successful;
+ totalFailed = totalFailed + failed;
+ totalRolledback = totalRolledback + rolledback;
+ LOGGER.logInfo("Successful: " + threads[j] + " - " + successful);
+ LOGGER.logInfo("Failed: " + threads[j] + " - " + failed);
+ LOGGER.logInfo("Rolledback: " + threads[j] + " - " + rolledback);
+ }
+ LOGGER.logInfo("tx / sec: " + (NB_THREADS * NB_TRANSACTIONS_PER_THREAD) * 1000 / ((System.currentTimeMillis() - start)));
+ int inserted = insertedCount();
+ LOGGER.logInfo("total inserted records count: " + inserted);
+ LOGGER.logInfo("total successful commits count: " + totalSuccessful);
+ LOGGER.logInfo("total failed operations count: " + totalFailed);
+ LOGGER.logInfo("total successful rolledback count: " + totalRolledback);
+
+ // the total number of records found in the DB should equal the total number of successful commits
+ assertTrue("Inserted count does not equal successfully XA-committed count!", inserted == totalSuccessful);
+
+ // every failure should have a corresponding successful rollback under normal circumstances (possibly not if the
+ // resource or system misbehaves, e.g. temporarily cannot write to disk)
+ assertTrue("Failed count does not equal successfully XA-rolled-back count!", totalFailed == totalRolledback);
+ }
+
+ protected void prepareDB(DataSource ds) throws SQLException {
+ try (
+ Connection conn = ds.getConnection(); Statement s = conn.createStatement();) {
+ s.executeUpdate("drop table if exists accounts");
+ System.err.println("Creating Accounts table...");
+ s.executeUpdate("create table accounts ( "
+ + " account INTEGER, owner VARCHAR(300), balance BIGINT )");
+ }
+ }
+
+ protected int insertedCount() throws SQLException {
+ Connection c = null;
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ c = ds.getConnection();
+ s = c.prepareStatement("select count(*), account from Accounts group by account");
+ rs = s.executeQuery();
+ int totalChanged = 0;
+ while (rs.next()) {
+ int count = rs.getInt(1);
+ totalChanged = totalChanged + count;
+ LOGGER.logDebug(rs.getInt(2) + " - " + count);
+ }
+ return totalChanged;
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ if (s != null) {
+ s.close();
+ }
+ if (c != null) {
+ c.close();
+ }
+ }
+ }
+
+ private static class PerftestRunnable implements Runnable {
+
+ private AtomicInteger successfulTransactions = new AtomicInteger(0);
+ private AtomicInteger failedBackTransactions = new AtomicInteger(0);
+ private AtomicInteger rolledBackTransactions = new AtomicInteger(0);
+ private UserTransaction userTransaction;
+ private DataSource ds;
+ private int id;
+
+ private PerftestRunnable(DataSource ds, UserTransaction userTransaction, int id) {
+ this.ds = ds;
+ this.userTransaction = userTransaction;
+ this.id = id;
+ }
+
+ public void run() {
+ for (int count = 0; count < NB_TRANSACTIONS_PER_THREAD; count++) {
+ try {
+ userTransaction.begin();
+ performSQL(id, count);
+ //Thread.currentThread().sleep(100);
+ userTransaction.commit();
+ successfulTransactions.incrementAndGet();
+ } catch (Exception e) {
+ failedBackTransactions.incrementAndGet();
+ if (!(e instanceof CustomException)) {
+ LOGGER.logError("Error", e);
+ }
+ try {
+ userTransaction.rollback();
+ rolledBackTransactions.incrementAndGet();
+ } catch (Exception e1) {
+ LOGGER.logError("Could not rollback transaction!", e1);
+ }
+ }
+ if (count % 200 == 0) {
+ LOGGER.logDebug("Count: " + count + ", success: " + successfulTransactions.get()
+ + ", failed: " + failedBackTransactions.get() + ", rolledback: " + rolledBackTransactions.get());
+ }
+ }
+ LOGGER.logInfo("Total: " + NB_TRANSACTIONS_PER_THREAD + ", success: " + successfulTransactions.get()
+ + ", failed: " + failedBackTransactions.get() + ", rolledback: " + rolledBackTransactions.get());
+ }
+
+ protected void performSQL(int id, int iteration) throws SQLException {
+ Random rand = new Random();
+
+ Connection c = null;
+ PreparedStatement s = null;
+ try {
+ c = ds.getConnection();
+ s = c.prepareStatement("insert into Accounts values ( ?, ?, ? )");
+ s.setInt(1, id);
+ s.setString(2, "owner" + id);
+ int x = rand.nextInt(NB_TRANSACTIONS_PER_THREAD);
+ s.setInt(3, x);
+ s.executeUpdate();
+ // make roughly FAILURE_RATIO of the transactions fail
+ if (x < (NB_TRANSACTIONS_PER_THREAD * FAILURE_RATIO)) {
+ throw new CustomException();
+ }
+ } catch (SQLException e) {
+ if (!(e instanceof CustomException)) {
+ LOGGER.logError("SQL could not be executed", e);
+ }
+ throw e;
+ } finally {
+ if (s != null) {
+ s.close();
+ }
+ if (c != null) {
+ c.close();
+ }
+ }
+ }
+ }
+
+ // just a marker exception
+ private static class CustomException extends SQLException {
+ }
+}
diff --git a/public/transactions-jdbc/src/test/resources/log4j2.xml b/public/transactions-jdbc/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..46ae7516
--- /dev/null
+++ b/public/transactions-jdbc/src/test/resources/log4j2.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
index f6d43b01..ba0c7f18 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
@@ -11,6 +11,8 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import com.atomikos.icatch.config.Configuration;
import com.atomikos.icatch.provider.ConfigProperties;
@@ -33,6 +35,11 @@ public class CachedRepository implements Repository {
private volatile long numberOfPutsSinceLastCheckpoint = 0;
private long checkpointInterval;
private long forgetOrphanedLogEntriesDelay;
+
+ public static final String DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS = "com.atomikos.icatch.disk_force_bunding_max_coordination_wait_time_ms";
+ // zero means wait as long as it takes
+ private long diskForceBundlingMaxCoordinationWaitTimeMs = 0;
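+ // Illustrative example only (the value is made up, not a recommendation): a bounded coordination wait
+ // could be configured in jta.properties as
+ //   com.atomikos.icatch.disk_force_bunding_max_coordination_wait_time_ms=500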
+
public CachedRepository(
InMemoryRepository inMemoryCoordinatorLogEntryRepository,
Repository backupCoordinatorLogEntryRepository) {
@@ -47,6 +54,9 @@ public void init() {
ConfigProperties configProperties = Configuration.getConfigProperties();
checkpointInterval = configProperties.getCheckpointInterval();
forgetOrphanedLogEntriesDelay = configProperties.getForgetOrphanedLogEntriesDelay();
+
+ diskForceBundlingMaxCoordinationWaitTimeMs = Long.parseLong(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS, "0"));
+ LOGGER.logDebug("diskForceBundlingMaxCoordinationWaitTimeMs " + diskForceBundlingMaxCoordinationWaitTimeMs);
try {
Collection<PendingTransactionRecord> coordinatorLogEntries = backupCoordinatorLogEntryRepository.getAllCoordinatorLogEntries();
@@ -63,19 +73,39 @@ public void init() {
}
@Override
- public synchronized void put(String id, PendingTransactionRecord coordinatorLogEntry)
+ public CountDownLatch put(String id, PendingTransactionRecord coordinatorLogEntry)
throws IllegalArgumentException, LogWriteException {
try {
+ CountDownLatch cdl;
+ synchronized (this) {
if(needsCheckpoint()){
performCheckpoint();
}
- backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
+ cdl = backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
inMemoryCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
numberOfPutsSinceLastCheckpoint++;
+ }
+ // If a latch is returned, we are running in disk-force-bundling mode, so wait for the disk-force-bundling thread
+ // to signal that the disk force has occurred. The waiting is done outside of the synchronized block, otherwise no
+ // bundling would be possible in the first place.
+ if (cdl != null) {
+ if (diskForceBundlingMaxCoordinationWaitTimeMs > 0) {
+ boolean completed = cdl.await(diskForceBundlingMaxCoordinationWaitTimeMs, TimeUnit.MILLISECONDS);
+ if (!completed) {
+ LOGGER.logWarning("Disk force coordination wait time expired without completion for " + id + " - another attempt will be made via the checkpoint mechanism.");
+ throw new IllegalStateException("Disk force coordination wait time expired without completion for " + id);
+ }
+ }
+ else {
+ cdl.await();
+ }
+ }
} catch (Exception e) {
- performCheckpoint();
+ LOGGER.logDebug("Unexpected error during put, attempting a checkpoint.", e);
+ performCheckpoint();
}
+ return null;
}
private synchronized void performCheckpoint() throws LogWriteException {
diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
index ea265d19..2242cf84 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
@@ -18,10 +18,18 @@
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.atomikos.icatch.config.Configuration;
import com.atomikos.icatch.provider.ConfigProperties;
@@ -38,8 +46,36 @@ public class FileSystemRepository implements Repository {
private static final Logger LOGGER = LoggerFactory.createLogger(FileSystemRepository.class);
private VersionedFile file;
- private FileChannel rwChannel = null;
+
+ // make volatile so that, when disk-force-bundling is enabled, the null-check in initChannelIfNecessary
+ // works properly without the need to acquire a lock first
+ private volatile FileChannel rwChannel = null;
private LogFileLock lock_;
+
+ // Enable/disable the disk-force-bundling optimization.
+ // Under heavy concurrent load this can significantly improve transaction throughput by bundling the disk forces
+ // of several transactions. Depending on the settings of the tuning options below this might in turn increase
+ // individual transaction latency - test and tune, your mileage may vary.
+ public static final String DISK_FORCE_BUNDLING_ENABLED_PROPERTY = "com.atomikos.icatch.disk_force_bunding_enabled";
+ private boolean diskForceBundlingEnabled = false;
+ // The maximum number of items to collect in a single batch.
+ // Note that it makes no sense to set this larger than the maximum number of concurrent threads; if you
+ // do that and also set the max wait time above 0, you will actually decrease commit throughput and increase latency.
+ public static final String DISK_FORCE_BUNDLING_MAX_BUNDLE_SIZE_PROPERTY = "com.atomikos.icatch.disk_force_bunding_max_bundle_size";
+ private int diskForceBundlingMaxBundleSize = 20;
+ // set this to non-zero to reduce load on the disk and tune your load profile
+ public static final String DISK_FORCE_BUNDLING_MAX_BUNDLING_WAIT_TIME_MS_PROPERTY = "com.atomikos.icatch.disk_force_bunding_max_bundling_wait_time_ms";
+ private long diskForceBundlingMaxBundlingWaitTimeMs = 0;
+ // size limit of the disk-force-bundling queue
+ public static final String DISK_FORCE_BUNDLING_QUEUE_MAX_SIZE_PROPERTY = "com.atomikos.icatch.disk_force_bunding_max_queue_size";
+ private int diskForceBundlingMaxQueueSize = 1000;
+
+ // the queue to put ByteBuffer write requests into
+ private final BlockingQueue<BufferHolder> diskForceBundlingQueue = new ArrayBlockingQueue<>(diskForceBundlingMaxQueueSize);
+
+ // This read/write lock is used a bit atypically: the read lock is used for the write & force operations,
+ // and the write lock for the renewal/checkpointing operations of the rwChannel.
+ private final ReadWriteLock diskForceBundlingRwChannelLock = new ReentrantReadWriteLock();
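+
+ // Illustrative example only (values are made up, tune for your own load): enabling and tuning the
+ // optimization via jta.properties might look like:
+ //   com.atomikos.icatch.disk_force_bunding_enabled=true
+ //   com.atomikos.icatch.disk_force_bunding_max_bundle_size=40
+ //   com.atomikos.icatch.disk_force_bunding_max_bundling_wait_time_ms=5
+ //   com.atomikos.icatch.disk_force_bunding_max_queue_size=1000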
@Override
public void init() throws LogException {
@@ -48,45 +84,148 @@ public void init() throws LogException {
String baseName = configProperties.getLogBaseName();
LOGGER.logDebug("baseDir " + baseDir);
LOGGER.logDebug("baseName " + baseName);
+
+ diskForceBundlingEnabled = Boolean.parseBoolean(configProperties.getProperty(DISK_FORCE_BUNDLING_ENABLED_PROPERTY, "false"));
+ diskForceBundlingMaxBundleSize = Integer.parseInt(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_BUNDLE_SIZE_PROPERTY, "20"));
+ diskForceBundlingMaxBundlingWaitTimeMs = Long.parseLong(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_BUNDLING_WAIT_TIME_MS_PROPERTY, "0"));
+ diskForceBundlingMaxQueueSize = Integer.parseInt(configProperties.getProperty(DISK_FORCE_BUNDLING_QUEUE_MAX_SIZE_PROPERTY, "1000"));
+
+ LOGGER.logDebug("diskForceBundlingEnabled " + diskForceBundlingEnabled);
+ LOGGER.logDebug("diskForceBundlingMaxBundleSize " + diskForceBundlingMaxBundleSize);
+ LOGGER.logDebug("diskForceBundlingMaxBundlingWaitTimeMs " + diskForceBundlingMaxBundlingWaitTimeMs);
+ LOGGER.logDebug("diskForceBundlingMaxQueueSize " + diskForceBundlingMaxQueueSize);
+
lock_ = new LogFileLock(baseDir, baseName);
LOGGER.logDebug("LogFileLock " + lock_);
lock_.acquireLock();
file = new VersionedFile(baseDir, baseName, ".log");
-
+
+ // if disk-force-bundling is enabled, start the bundling thread
+ if (diskForceBundlingEnabled) {
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ int totalCount = 0;
+ while (true) {
+ try {
+ // start with one because of the initial take() below
+ int count = 1;
+ List<BufferHolder> holdersProcessed = new ArrayList<>(diskForceBundlingMaxBundleSize);
+ BufferHolder bh = diskForceBundlingQueue.take(); //poll(1, TimeUnit.MILLISECONDS);
+ while (count < diskForceBundlingMaxBundleSize && bh != null) {
+ count++;
+ writeToChannel(bh.buff);
+ holdersProcessed.add(bh);
+ if (diskForceBundlingMaxBundlingWaitTimeMs <= 0) {
+ // performance tests have shown this to be faster than poll(0, TimeUnit.MILLISECONDS),
+ // at least on Windows (say, +10-15%), under heavy load
+ bh = diskForceBundlingQueue.poll();
+ }
+ else {
+ bh = diskForceBundlingQueue.poll(diskForceBundlingMaxBundlingWaitTimeMs, TimeUnit.MILLISECONDS);
+ }
+ }
+ // the last item taken may be non-null even though the maximum bundle size was already reached - process it too
+ if (bh != null) {
+ writeToChannel(bh.buff);
+ holdersProcessed.add(bh);
+ }
+ writeForceChannel(false);
+ for (BufferHolder bhp : holdersProcessed) {
+ bhp.latch.countDown();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ totalCount = totalCount + holdersProcessed.size();
+ LOGGER.logTrace("TotalCount: " + totalCount + ", last bundle size: " + holdersProcessed.size());
+ }
+ }
+ catch (InterruptedException e) {
+ LOGGER.logError("InterruptedException in disk-force-bundling thread! Trying to continue.", e);
+ // restore the interrupted flag
+ Thread.currentThread().interrupt();
+ }
+ catch (IOException e) {
+ LOGGER.logError("IOException in disk-force-bundling thread! Trying to continue.", e);
+ }
+ }
+ }
+ }, "Disk-Force-Bundle-Thread");
+ t.setPriority(Thread.MAX_PRIORITY);
+ t.setDaemon(true);
+ t.start();
+ LOGGER.logInfo("Started Disk-Force-Bundle Thread");
+ }
+ else {
+ LOGGER.logDebug("Running in classic (Non-Disk-Force-Bundle) mode");
+ }
}
-
+
@Override
- public void put(String id, PendingTransactionRecord pendingTransactionRecord)
+ public CountDownLatch put(String id, PendingTransactionRecord pendingTransactionRecord)
throws IllegalArgumentException, LogWriteException {
try {
initChannelIfNecessary();
- write(pendingTransactionRecord, true);
+ return write(pendingTransactionRecord, true);
} catch (IOException e) {
throw new LogWriteException(e);
}
}
- private synchronized void initChannelIfNecessary()
+ private void initChannelIfNecessary()
throws FileNotFoundException {
- if (rwChannel == null) {
+ if (diskForceBundlingEnabled) {
+ if (rwChannel == null) {
+ diskForceBundlingRwChannelLock.writeLock().lock();
+ try {
+ // re-check under the write lock so that only one thread opens a new version
+ if (rwChannel == null) {
+ rwChannel = file.openNewVersionForNioWriting();
+ }
+ }
+ finally {
+ diskForceBundlingRwChannelLock.writeLock().unlock();
+ }
+ }
+ }
+ else {
+ synchronized (this) {
+ if (rwChannel == null) {
rwChannel = file.openNewVersionForNioWriting();
- }
+ }
+ }
+ }
}
- private void write(PendingTransactionRecord pendingTransactionRecord,
+ private CountDownLatch write(PendingTransactionRecord pendingTransactionRecord,
boolean flushImmediately) throws IOException {
String str = pendingTransactionRecord.toRecord();
byte[] buffer = str.getBytes();
ByteBuffer buff = ByteBuffer.wrap(buffer);
- writeToFile(buff, flushImmediately);
+ return writeToFile(buff, flushImmediately);
}
- private synchronized void writeToFile(ByteBuffer buff, boolean force)
+ private CountDownLatch writeToFile(ByteBuffer buff, boolean force)
throws IOException {
- rwChannel.write(buff);
- if (force) {
- rwChannel.force(false);
- }
+
+ if (diskForceBundlingEnabled) {
+ if (force) {
+ BufferHolder bh = new BufferHolder();
+ bh.buff = buff;
+ // directly offer without a timeout; it is unlikely that the queue becomes full (other mechanisms would become stuck first,
+ // i.e. threads hanging on latch.await and timing out there) - but fail fast rather than hang if it ever does
+ if (!diskForceBundlingQueue.offer(bh)) {
+ throw new IOException("Disk-force-bundling queue is full - cannot enqueue disk force request");
+ }
+ return bh.latch;
+ }
+ else {
+ writeToChannel(buff);
+ return null;
+ }
+ }
+ else {
+ synchronized (this) {
+ rwChannel.write(buff);
+ if (force) {
+ rwChannel.force(false);
+ }
+ return null;
+ }
+ }
}
@Override
@@ -187,14 +326,31 @@ private static void closeSilently(BufferedReader fis) {
public void writeCheckpoint(Collection<PendingTransactionRecord> checkpointContent) throws LogWriteException {
try {
- closeOutput();
-
- rwChannel = file.openNewVersionForNioWriting();
- for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
- write(coordinatorLogEntry, false);
- }
- rwChannel.force(false);
- file.discardBackupVersion();
+ if (diskForceBundlingEnabled) {
+ diskForceBundlingRwChannelLock.writeLock().lock();
+ try {
+ closeOutput();
+ rwChannel = file.openNewVersionForNioWriting();
+ for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
+ write(coordinatorLogEntry, false);
+ }
+ rwChannel.force(false);
+ file.discardBackupVersion();
+ }
+ finally {
+ diskForceBundlingRwChannelLock.writeLock().unlock();
+ }
+ }
+ else {
+ closeOutput();
+
+ rwChannel = file.openNewVersionForNioWriting();
+ for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
+ write(coordinatorLogEntry, false);
+ }
+ rwChannel.force(false);
+ file.discardBackupVersion();
+ }
} catch (FileNotFoundException firstStart) {
// the file could not be opened for reading;
// merely return the default empty vector
@@ -216,14 +372,65 @@ protected void closeOutput() throws IllegalStateException {
@Override
public void close() {
+ if (diskForceBundlingEnabled) {
try {
+ diskForceBundlingRwChannelLock.writeLock().lock();
+ closeOutput();
+ } catch (Exception e) {
+ LOGGER.logWarning("Error closing file - ignoring", e);
+ } finally {
+ diskForceBundlingRwChannelLock.writeLock().unlock();
+ lock_.releaseLock();
+ }
+ }
+ else {
+ try {
closeOutput();
} catch (Exception e) {
LOGGER.logWarning("Error closing file - ignoring", e);
} finally {
lock_.releaseLock();
}
-
+ }
}
-
+
+ /** Helper method to write to the channel with proper locking for disk-force-bundling.
+ *
+ * @param buff the buffer to write
+ * @throws IOException
+ */
+ private void writeToChannel(ByteBuffer buff) throws IOException {
+ diskForceBundlingRwChannelLock.readLock().lock();
+ try {
+ rwChannel.write(buff);
+ }
+ finally {
+ diskForceBundlingRwChannelLock.readLock().unlock();
+ }
+ }
+
+ /** Helper method to disk-force the channel with proper locking for disk-force-bundling.
+ *
+ * @param forceMeta whether to also force file metadata
+ * @throws IOException
+ */
+ private void writeForceChannel(boolean forceMeta) throws IOException {
+ diskForceBundlingRwChannelLock.readLock().lock();
+ try {
+ rwChannel.force(forceMeta);
+ }
+ finally {
+ diskForceBundlingRwChannelLock.readLock().unlock();
+ }
+ }
+
+ /** Simple helper class to transfer a ByteBuffer and a coordination latch
+ * from the original/transaction thread to the disk-force thread.
+ */
+ private static class BufferHolder {
+ private ByteBuffer buff;
+ // coordination latch: once it is counted down, the original/transaction thread
+ // knows the buffer has been disk-forced
+ private final CountDownLatch latch = new CountDownLatch(1);
+ }
}
diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
index 8bfb6c01..2fb91c03 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
@@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import com.atomikos.recovery.PendingTransactionRecord;
import com.atomikos.recovery.TxState;
@@ -30,7 +31,7 @@ public void init() {
@Override
- public synchronized void put(String id, PendingTransactionRecord coordinatorLogEntry)
+ public synchronized CountDownLatch put(String id, PendingTransactionRecord coordinatorLogEntry)
throws IllegalArgumentException {
PendingTransactionRecord existing = storage.get(id);
if (existing != null && existing == coordinatorLogEntry) {
@@ -41,6 +42,7 @@ public synchronized void put(String id, PendingTransactionRecord coordinatorLogE
} else {
storage.put(id, coordinatorLogEntry);
}
+ return null;
}
@Override
diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
index 252cb244..64107770 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
@@ -9,6 +9,7 @@
package com.atomikos.recovery.fs;
import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
import com.atomikos.recovery.LogException;
import com.atomikos.recovery.LogReadException;
@@ -19,7 +20,10 @@ public interface Repository {
void init() throws LogException;
- void put(String id,PendingTransactionRecord pendingTransactionRecord) throws LogWriteException;
+ // Returns the disk-force-bundling coordination latch when disk-force-bundling is enabled (null otherwise).
+ // The latch is needed upstream because the CachedRepository wraps the FileSystemRepository and does
+ // its own synchronization, so the waiting must happen there, outside that synchronization.
+ CountDownLatch put(String id,PendingTransactionRecord pendingTransactionRecord) throws LogWriteException;
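+ // Illustrative caller sketch (this mirrors how CachedRepository uses it): wait on the returned latch
+ // outside of any synchronization, e.g.
+ //   CountDownLatch latch = repository.put(id, record);
+ //   if (latch != null) {
+ //       latch.await(); // returns once the bundling thread has disk-forced the record
+ //   }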
PendingTransactionRecord get(String coordinatorId) throws LogReadException;