diff --git a/com.ibm.streamsx.dps/impl/include/DpsConstants.h b/com.ibm.streamsx.dps/impl/include/DpsConstants.h index 16f4ad8..e4e8fb7 100644 --- a/com.ibm.streamsx.dps/impl/include/DpsConstants.h +++ b/com.ibm.streamsx.dps/impl/include/DpsConstants.h @@ -1,6 +1,6 @@ /* # Licensed Materials - Property of IBM -# Copyright IBM Corp. 2011, 2022 +# Copyright IBM Corp. 2011, 2023 # US Government Users Restricted Rights - Use, duplication or # disclosure restricted by GSA ADP Schedule Contract with # IBM Corp. @@ -84,6 +84,9 @@ interface with many different back-end in-memory stores. #define REDIS_HMSET_CMD "hmset " #define REDIS_HMGET_CMD "hmget " #define REDIS_ZMSCORE_CMD "zmscore " +#define REDIS_NX_OPTION "NX" +#define REDIS_EX_OPTION "EX" +#define REDIS_PX_OPTION "PX" #define CASSANDRA_DPS_KEYSPACE "com_ibm_streamsx_dps" #define CASSANDRA_DPS_MAIN_TABLE "t1" #define HBASE_DPS_MAIN_TABLE "dps_t1" diff --git a/com.ibm.streamsx.dps/impl/src/RedisClusterDBLayer.cpp b/com.ibm.streamsx.dps/impl/src/RedisClusterDBLayer.cpp index 5b6c654..1701a01 100644 --- a/com.ibm.streamsx.dps/impl/src/RedisClusterDBLayer.cpp +++ b/com.ibm.streamsx.dps/impl/src/RedisClusterDBLayer.cpp @@ -1,6 +1,6 @@ /* # Licensed Materials - Property of IBM -# Copyright IBM Corp. 2011, 2022 +# Copyright IBM Corp. 2011, 2023 # US Government Users Restricted Rights - Use, duplication or # disclosure restricted by GSA ADP Schedule Contract with # IBM Corp. @@ -326,11 +326,14 @@ namespace distributed if (redis_cluster_reply->integer == (int)0) { // It could be that our global store id is not there now. // Let us create one with an initial value of 0. - // Redis setnx is an atomic operation. It will succeed only for the very first operator that + // Redis set nx is an atomic operation. It will succeed only for the very first operator that // attempts to do this setting after a redis-cluster server is started fresh. If some other operator // already raced us ahead and created this guid_key, then our attempt below will be safely rejected. + // Senthil made NX related single API call code change in this method on Nov/25/2023. freeReplyObject(redis_cluster_reply); - cmd = string(REDIS_SETNX_CMD) + keyString + string(" ") + string("0"); + cmd = string(REDIS_SET_CMD) + keyString + string(" ") + string("0") + + string(" ") + string(REDIS_NX_OPTION); + try { redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, keyString, cmd.c_str())); } catch (const RedisCluster::ClusterException &ex) { @@ -338,8 +341,8 @@ namespace distributed } if (redis_cluster_reply == NULL) { - dbError.set("Unable to connect to the redis-cluster server(s). Got a NULL redisReply for REDIS_SETNX_CMD.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "Inside connectToDatabase, it failed with an error with a NULL redisReply for REDIS_SETNX_CMD." << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); + dbError.set("Unable to connect to the redis-cluster server(s). Got a NULL redisReply for REDIS_SET_NX_CMD.", DPS_CONNECTION_ERROR); + SPLAPPTRC(L_ERROR, "Inside connectToDatabase, it failed with an error with a NULL redisReply for REDIS_SET_NX_CMD." 
<< DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); return; } } @@ -1862,16 +1865,23 @@ namespace distributed bool RedisClusterDBLayer::acquireStoreLock(string const & storeIdString) { int32_t retryCnt = 0; string cmd = ""; + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, starting one or more attempts to acquire a lock for store " << storeIdString << ".", "RedisClusterDBLayer"); //Try to get a lock for this store. while (1) { // '4' + 'store id' + 'dps_lock' => 1 std::string storeLockKey = string(DPS_STORE_LOCK_TYPE) + storeIdString + DPS_LOCK_TOKEN; - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. - cmd = string(REDIS_SETNX_CMD) + storeLockKey + " " + "1"; + // Senthil made NX related single API call code change in this method on Nov/27/2023. + std::ostringstream expiry_time_stream; + expiry_time_stream << string("") << DPS_AND_DL_GET_LOCK_TTL; + cmd = string(REDIS_SET_CMD) + storeLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_EX_OPTION) + + string(" ") + expiry_time_stream.str(); + try { redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, storeLockKey, cmd.c_str())); } catch (const RedisCluster::ClusterException &ex) { @@ -1879,70 +1889,23 @@ namespace distributed } if (redis_cluster_reply == NULL) { - SPLAPPTRC(L_ERROR, "a) Inside acquireStoreLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); - return(false); - } - - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the store lock. - freeReplyObject(redis_cluster_reply); - return(false); + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "a) Inside acquireStoreLock, got a null redis_reply error when acquiring a lock for a store " << storeIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock. Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); + return(false); } - if (redis_cluster_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. - freeReplyObject(redis_cluster_reply); - std::ostringstream cmd_stream; - cmd_stream << string(REDIS_EXPIRE_CMD) << storeLockKey << " " << DPS_AND_DL_GET_LOCK_TTL; - cmd = cmd_stream.str(); - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, storeLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "b) Inside acquireStoreLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); - } - - if (redis_cluster_reply == NULL) { - // We already got a NULL reply. There is not much use in the code block below. - // In any case, we will give it a try. - // Delete the erroneous lock data item we created. 
- cmd = string(REDIS_DEL_CMD) + " " + storeLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, storeLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "c) Inside acquireStoreLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } - - return(false); - } + string redis_cluster_reply_string = ""; + int redis_cluster_reply_type = redis_cluster_reply->type; - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the store lock. - freeReplyObject(redis_cluster_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + storeLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, storeLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "d) Inside acquireStoreLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } - - return(false); - } + if (redis_cluster_reply_type == REDIS_REPLY_STATUS) { + redis_cluster_reply_string = string(redis_cluster_reply->str, redis_cluster_reply->len); - freeReplyObject(redis_cluster_reply); - return(true); + if(redis_cluster_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + freeReplyObject(redis_cluster_reply); + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, acquired a lock for store " << storeIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterDBLayer"); + return(true); + } } freeReplyObject(redis_cluster_reply); @@ -1950,14 +1913,15 @@ namespace distributed retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, it is not able to acquire a lock for store " << storeIdString << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_cluster_reply_type << ". Redis server reply string=" << redis_cluster_reply_string, "RedisClusterDBLayer"); - return(false); + return(false); } // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } + } // End of while loop. return(false); } @@ -2378,7 +2342,7 @@ namespace distributed if (redis_cluster_reply == NULL) { dbError.set("Redis_Cluster_Reply_Null error. Unable to connect to the redis server(s). Application code may call the DPS reconnect API and then retry the failed operation.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. Inside runDataStoreCommand using Redis cmdList, it failed for executing the user given Redis command list. Application code may call the DPS reconnect API and then retry the failed operation." << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. Inside runDataStoreCommand using Redis cmdList, it failed for executing the user given Redis command list. 
Application code may call the DPS reconnect API and then retry the failed operation." << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); return(false); } @@ -2386,14 +2350,14 @@ namespace distributed // Error in executing the user given Redis command. resultValue = std::string(redis_cluster_reply->str); dbError.set("Redis_Cluster_Reply_Error while executing the user given Redis command. Error=" + resultValue, DPS_RUN_DATA_STORE_COMMAND_ERROR); - SPLAPPTRC(L_DEBUG, "Redis_Cluster_Reply_Error. Inside runDataStoreCommand using Redis cmdList, it failed to execute the user given Redis command list. Error=" << std::string(redis_cluster_reply->str) << ". " << DPS_RUN_DATA_STORE_COMMAND_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_DEBUG, "Redis_Cluster_Reply_Error. Inside runDataStoreCommand using Redis cmdList, it failed to execute the user given Redis command list. Error=" << std::string(redis_cluster_reply->str) << ". " << DPS_RUN_DATA_STORE_COMMAND_ERROR, "RedisClusterDBLayer"); freeReplyObject(redis_cluster_reply); return(false); } else if (redis_cluster_reply->type == REDIS_REPLY_NIL) { // Redis returned NIL response. resultValue = "nil"; dbError.set("Redis_Cluster_Reply_Nil error while executing user given Redis command list. Possibly missing or invalid tokens in the Redis command.", DPS_RUN_DATA_STORE_COMMAND_ERROR); - SPLAPPTRC(L_DEBUG, "Redis_Cluster_Reply_Nil error. Inside runDataStoreCommand using Redis cmdList, it failed to execute the user given Redis command list. " << DPS_RUN_DATA_STORE_COMMAND_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_DEBUG, "Redis_Cluster_Reply_Nil error. Inside runDataStoreCommand using Redis cmdList, it failed to execute the user given Redis command list. " << DPS_RUN_DATA_STORE_COMMAND_ERROR, "RedisClusterDBLayer"); freeReplyObject(redis_cluster_reply); return(false); } else if (redis_cluster_reply->type == REDIS_REPLY_STRING) { @@ -2445,7 +2409,7 @@ namespace distributed if (redis_cluster_reply == NULL) { dbError.set("a) getDataItemFromStore: Unable to connect to the redis-cluster server(s). Got a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "a) Redis_Cluster_Reply_Null error. getDataItemFromStore: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_ERROR, "a) Redis_Cluster_Reply_Null error. getDataItemFromStore: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); return(false); } @@ -2491,7 +2455,7 @@ namespace distributed if (redis_cluster_reply == NULL) { dbError.set("b) getDataItemFromStore: Unable to connect to the redis-cluster server(s). Got a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "b) Redis_Cluster_Reply_Null error. getDataItemFromStore: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_ERROR, "b) Redis_Cluster_Reply_Null error. getDataItemFromStore: Application code may call the DPS reconnect API and then retry the failed operation. 
" << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); return(false); } @@ -2617,16 +2581,23 @@ namespace distributed bool RedisClusterDBLayer::acquireGeneralPurposeLock(string const & entityName) { int32_t retryCnt = 0; string cmd = ""; + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, starting one or more attempts to acquire a lock for a generic id " << entityName << ".", "RedisClusterDBLayer"); //Try to get a lock for this generic entity. while (1) { // '501' + 'entity name' + 'generic_lock' => 1 std::string genericLockKey = GENERAL_PURPOSE_LOCK_TYPE + entityName + GENERIC_LOCK_TOKEN; - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. - cmd = string(REDIS_SETNX_CMD) + genericLockKey + " " + "1"; + // Senthil made NX related single API call code change in this method on Nov/27/2023. + std::ostringstream expiry_time_stream; + expiry_time_stream << string("") << DPS_AND_DL_GET_LOCK_TTL; + cmd = string(REDIS_SET_CMD) + genericLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_EX_OPTION) + + string(" ") + expiry_time_stream.str(); + try { redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, genericLockKey, cmd.c_str())); } catch (const RedisCluster::ClusterException &ex) { @@ -2634,85 +2605,39 @@ namespace distributed } if (redis_cluster_reply == NULL) { - SPLAPPTRC(L_ERROR, "a) Redis_Cluster_Reply_Null error. acquireGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); - return(false); + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "a) Inside acquireGeneralPurposeLock, got a null redis_reply error when acquiring a lock for a generic id " << entityName << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock.", "RedisClusterDBLayer"); + return(false); } - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_cluster_reply); - return(false); - } - - if (redis_cluster_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. - freeReplyObject(redis_cluster_reply); - std::ostringstream cmd_stream; - cmd_stream << string(REDIS_EXPIRE_CMD) << genericLockKey << " " << DPS_AND_DL_GET_LOCK_TTL; - cmd = cmd_stream.str(); - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, genericLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "b) Redis_Cluster_Reply_Null error. acquireGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); - } - - if (redis_cluster_reply == NULL) { - // We already got a NULL reply. There is not much use in the code block below. - // In any case, we will give it a try. - // Delete the erroneous lock data item we created. 
- cmd = string(REDIS_DEL_CMD) + " " + genericLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, genericLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "c) Redis_Cluster_Reply_Null error. acquireGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } - - return(false); - } - - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_cluster_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + genericLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, genericLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "d) Redis_Cluster_Reply_Null error. acquireGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } + string redis_cluster_reply_string = ""; + int redis_cluster_reply_type = redis_cluster_reply->type; - return(false); - } + if (redis_cluster_reply_type == REDIS_REPLY_STATUS) { + redis_cluster_reply_string = string(redis_cluster_reply->str, redis_cluster_reply->len); - freeReplyObject(redis_cluster_reply); - return(true); + if(redis_cluster_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + freeReplyObject(redis_cluster_reply); + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, acquired a lock for a generic id " << entityName << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterDBLayer"); + return(true); + } } freeReplyObject(redis_cluster_reply); - // Someone else is holding on to the lock of this entity. Wait for a while before trying again. + // Someone else is holding on to the lock of this generic id. Wait for a while before trying again. retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, it is not able to acquire a lock for a generic id " << entityName << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_cluster_reply_type << ". Redis server reply string=" << redis_cluster_reply_string, "RedisClusterDBLayer"); - return(false); + return(false); } // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } + } // End of while loop. return(false); } @@ -2725,7 +2650,7 @@ namespace distributed redis_cluster_reply = static_cast(HiredisCommand::Command(redis_cluster, password_for_redis_cluster, genericLockKey, cmd.c_str())); } catch (const RedisCluster::ClusterException &ex) { redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. releaseGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. 
releaseGeneralPurposeLock: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); } if (redis_cluster_reply != NULL) { @@ -2844,7 +2769,7 @@ namespace distributed if (this->redis_cluster_reply == NULL) { dbError.set("getNext: Unable to connect to the redis-cluster server(s). Got a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. getNext: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_ERROR, "Redis_Cluster_Reply_Null error. getNext: Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterDBLayer"); this->hasData = false; return(false); } @@ -3288,145 +3213,109 @@ namespace distributed time_t startTime, timeNow; // Get the start time for our lock acquisition attempts. time(&startTime); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, starting one or more attempts to acquire a lock id " << lockIdString << ".", "RedisClusterDBLayer"); //Try to get a distributed lock. while(1) { - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and PX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the lease time ends. // We will add the lease time to the current timestamp i.e. seconds elapsed since the epoch. + // Senthil made NX related single API call code change in this method on Nov/27/2023. time_t new_lock_expiry_time = time(0) + (time_t)leaseTime; - cmd = string(REDIS_SETNX_CMD) + distributedLockKey + " " + "1"; + ostringstream expiryTimeInMillis; + expiryTimeInMillis << string("") << (leaseTime*1000.00); + cmd = string(REDIS_SET_CMD) + distributedLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_PX_OPTION) + + string(" ") + expiryTimeInMillis.str(); + try { redis_cluster_reply = static_cast(HiredisCommand::Command(this->redis_cluster, this->password_for_redis_cluster, distributedLockKey, cmd.c_str())); } catch (const RedisCluster::ClusterException &ex) { redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "a) Inside acquireLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DL_CONNECTION_ERROR, "RedisClusterDBLayer"); } - if (redis_cluster_reply == NULL) { - return(false); - } - - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the distributed lock. - freeReplyObject(redis_cluster_reply); - return(false); - } - - if (redis_cluster_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. 
- freeReplyObject(redis_cluster_reply); - ostringstream expiryTimeInMillis; - expiryTimeInMillis << (leaseTime*1000.00); - cmd = string(REDIS_PSETEX_CMD) + distributedLockKey + " " + expiryTimeInMillis.str() + " " + "2"; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(this->redis_cluster, this->password_for_redis_cluster, distributedLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "b) Inside acquireLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DL_CONNECTION_ERROR, "RedisClusterDBLayer"); - - } - - if (redis_cluster_reply == NULL) { - // We already got a NULL reply which is not good. - // Most likely, it is a connection error with the redis cluster node. - // In any case, let us try the following code block which may also fail. - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + distributedLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(this->redis_cluster, this->password_for_redis_cluster, distributedLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "c) Inside acquireLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DL_CONNECTION_ERROR, "RedisClusterDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } - - return(false); - } - - if (redis_cluster_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_cluster_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + distributedLockKey; - try { - redis_cluster_reply = static_cast(HiredisCommand::Command(this->redis_cluster, this->password_for_redis_cluster, distributedLockKey, cmd.c_str())); - } catch (const RedisCluster::ClusterException &ex) { - redis_cluster_reply = NULL; - SPLAPPTRC(L_ERROR, "d) Inside acquireLock, it failed with a NULL redisReply. Application code may call the DPS reconnect API and then retry the failed operation. " << DL_CONNECTION_ERROR, "RedisClusterDBLayer"); - } - - if (redis_cluster_reply != NULL) { - freeReplyObject(redis_cluster_reply); - } + if (redis_cluster_reply == NULL) { + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "Inside acquireLock, got a null redis_reply error when acquiring a lock id " << lockIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock.", "RedisClusterDBLayer"); + return(false); + } - return(false); - } + string redis_cluster_reply_string = ""; + int redis_cluster_reply_type = redis_cluster_reply->type; + bool redisClusterReplyObjectFreed = false; + + if (redis_cluster_reply_type == REDIS_REPLY_STATUS) { + redis_cluster_reply_string = string(redis_cluster_reply->str, redis_cluster_reply->len); + freeReplyObject(redis_cluster_reply); + redisClusterReplyObjectFreed = true; + + if(redis_cluster_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + // Let us update the lock information now. 
+ if(updateLockInformation(lockIdString, lkError, 1, new_lock_expiry_time, getpid()) == true) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterDBLayer"); + return(true); + } else { + // Some error occurred while updating the lock information. + // It will be in an inconsistent state. Let us release the lock. + SPLAPPTRC(L_DEBUG, "Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s). However, update lock information failed. We will continue the retry to get this lock.", "RedisClusterDBLayer"); + releaseLock(lock, lkError); + } + } + } - freeReplyObject(redis_cluster_reply); + if(redisClusterReplyObjectFreed == false) { + // It was not freed in the previous if block. Let us free it now. + freeReplyObject(redis_cluster_reply); + } - // We got the lock. - // Let us update the lock information now. - if(updateLockInformation(lockIdString, lkError, 1, new_lock_expiry_time, getpid()) == true) { - return(true); - } else { - // Some error occurred while updating the lock information. - // It will be in an inconsistent state. Let us release the lock. - releaseLock(lock, lkError); - } - } else { - // We didn't get the lock. - // Let us check if the previous owner of this lock simply forgot to release it. - // In that case, we will release this expired lock. - // Read the time at which this lock is expected to expire. - freeReplyObject(redis_cluster_reply); - uint32_t _lockUsageCnt = 0; - int32_t _lockExpirationTime = 0; - std::string _lockName = ""; - pid_t _lockOwningPid = 0; + // We didn't get the lock. + // Let us check if the previous owner of this lock simply forgot to release it. + // In that case, we will release this expired lock. + // Read the time at which this lock is expected to expire. + uint32_t _lockUsageCnt = 0; + int32_t _lockExpirationTime = 0; + std::string _lockName = ""; + pid_t _lockOwningPid = 0; + + if (readLockInformation(lockIdString, lkError, _lockUsageCnt, _lockExpirationTime, _lockOwningPid, _lockName) == false) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << " while reading lock information. We will continue the retry to get this lock. Attempt number=" << (retryCnt+1) << ". Lock Error=" << lkError.getErrorCode(), "RedisClusterDBLayer"); + } else { + // Is current time greater than the lock expiration time? + if ((_lockExpirationTime > 0) && (time(0) > (time_t)_lockExpirationTime)) { + // Time has passed beyond the lease of this lock. + // Lease expired for this lock. Original owner forgot to release the lock and simply left it hanging there without a valid lease. + releaseLock(lock, lkError); + } + } - if (readLockInformation(lockIdString, lkError, _lockUsageCnt, _lockExpirationTime, _lockOwningPid, _lockName) == false) { - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << ". " << lkError.getErrorCode(), "RedisClusterDBLayer"); - } else { - // Is current time greater than the lock expiration time? - if ((_lockExpirationTime > 0) && (time(0) > (time_t)_lockExpirationTime)) { - // Time has passed beyond the lease of this lock. - // Lease expired for this lock. Original owner forgot to release the lock and simply left it hanging there without a valid lease. - releaseLock(lock, lkError); - } - } - } + // Someone else is holding on to the lock we are trying to acquire. Wait for a while before trying again. 
+ retryCnt++; - // Someone else is holding on to this distributed lock. Wait for a while before trying again. - retryCnt++; + if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it is not able to acquire a lock id " << lockIdString << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_cluster_reply_type << ". Redis server reply string=" << redis_cluster_reply_string << ". Caller will see a lock error code of " << DL_GET_LOCK_ERROR << ".", "RedisClusterDBLayer"); + lkError.set("Unable to acquire the lock named " + lockIdString + ".", DL_GET_LOCK_ERROR); + // Our caller can check the lock error code and try to acquire the lock again. + return(false); + } - if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { - lkError.set("Unable to acquire the lock named " + lockIdString + ".", DL_GET_LOCK_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for a lock named " << lockIdString << ". " << DL_GET_LOCK_ERROR, "RedisClusterDBLayer"); - // Our caller can check the error code and try to acquire the lock again. - return(false); - } + // Check if we have gone past the maximum wait time the caller was willing to wait in order to acquire this lock. + time(&timeNow); + if (difftime(startTime, timeNow) > maxWaitTimeToAcquireLock) { + lkError.set("Unable to acquire the lock named " + lockIdString + " within the caller specified wait time.", DL_GET_LOCK_TIMEOUT_ERROR); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to acquire the lock named " << lockIdString << " within the caller specified wait time. Attempt number=" << retryCnt << ". Caller will see a lock error code of " << DL_GET_LOCK_TIMEOUT_ERROR << ".", "RedisClusterDBLayer"); + // Our caller can check the lock error code and try to acquire the lock again. + return(false); + } - // Check if we have gone past the maximum wait time the caller was willing to wait in order to acquire this lock. - time(&timeNow); - if (difftime(startTime, timeNow) > maxWaitTimeToAcquireLock) { - lkError.set("Unable to acquire the lock named " + lockIdString + " within the caller specified wait time.", DL_GET_LOCK_TIMEOUT_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to acquire the lock named " << lockIdString << - " within the caller specified wait time." << DL_GET_LOCK_TIMEOUT_ERROR, "RedisClusterDBLayer"); - // Our caller can check the error code and try to acquire the lock again. - return(false); - } + // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. + usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * + (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); + } // End of while loop. - // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. - usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * - (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } // End of while(1) + return(false); } void RedisClusterDBLayer::releaseLock(uint64_t lock, PersistenceError & lkError) { diff --git a/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp b/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp index a79015e..7fdb7fc 100644 --- a/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp +++ b/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp @@ -1,6 +1,6 @@ /* # Licensed Materials - Property of IBM -# Copyright IBM Corp. 2011, 2022 +# Copyright IBM Corp. 
2011, 2023 # US Government Users Restricted Rights - Use, duplication or # disclosure restricted by GSA ADP Schedule Contract with # IBM Corp. @@ -149,6 +149,8 @@ namespace distributed int targetServerPort = 0; int connectionTimeout = 0; int useTls = -1; + string redisClusterTlsCertFileName = ""; + string redisClusterTlsKeyFileName = ""; string redisClusterCACertFileName = ""; int connectionAttemptCnt = 0; @@ -227,8 +229,23 @@ namespace distributed } } else if (tokenCnt == 6) { // This must be our sixth token. + if (token != "") { + // This is a fully qualified Redis cluster TLS cert filename. + // This file name can be copied from the Redis cluster's tls-cert-file configuration. + redisClusterTlsCertFileName = token; + } + } else if (tokenCnt == 7) { + // This must be our seventh token. + if (token != "") { + // This is a fully qualified Redis cluster TLS key filename. + // This file name can be copied from the Redis cluster's tls-key-file configuration. + redisClusterTlsKeyFileName = token; + } + } else if (tokenCnt == 8) { + // This must be our eighth token. if (token != "") { // This is a fully qualified Redis cluster CA cert filename. + // This file name can be copied from the Redis cluster's tls-ca-cert-file configuration. redisClusterCACertFileName = token; } } // End of the long if-else block. @@ -270,7 +287,7 @@ namespace distributed clusterPasswordUsage = "no"; } - cout << connectionAttemptCnt << ") ThreadId=" << threadId << ". Attempting to connect to the Redis cluster node " << targetServerName << " on port " << targetServerPort << " with " << clusterPasswordUsage << " password. " << "connectionTimeout=" << connectionTimeout << ", use_tls=" << useTls << ", redisClusterCACertFileName=" << redisClusterCACertFileName << "." << endl; + cout << connectionAttemptCnt << ") ThreadId=" << threadId << ". Attempting to connect to the Redis cluster node " << targetServerName << " on port " << targetServerPort << " with " << clusterPasswordUsage << " password. " << "connectionTimeout=" << connectionTimeout << ", use_tls=" << useTls << ", redisClusterTlsCertFileName=" << redisClusterTlsCertFileName << ", redisClusterTlsKeyFileName=" << redisClusterTlsKeyFileName << ", redisClusterCACertFileName=" << redisClusterCACertFileName << "." << endl; // Current redis cluster version as of Oct/29/2020 is 6.0.9 // If the redis cluster node is inactive, the following statement may throw an exception. @@ -294,6 +311,10 @@ namespace distributed // Set the desired TLS/SSL option. if(useTls == 1) { connection_options.tls.enabled = true; + // If TLS for the Redis Cluster is enabled, following three files must be + // specified via the DPS redis-cluster-plus-plus configuration. + connection_options.tls.cert = redisClusterTlsCertFileName; + connection_options.tls.key = redisClusterTlsKeyFileName; connection_options.tls.cacert = redisClusterCACertFileName; } @@ -394,14 +415,18 @@ namespace distributed if (exists_result_value == 0) { // It could be that our global store id is not there now. // Let us create one with an initial value of 0. - // Redis setnx is an atomic operation. It will succeed only for the very first operator that - // attempts to do this setting after a redis-cluster server is started fresh. If some other operator + // Redis set nx is an atomic operation. It will succeed only for the very first operator that + // attempts to do this setting after a redis server is started fresh. 
If some other operator // already raced us ahead and created this guid_key, then our attempt below will be safely rejected. + // Senthil made NX related single API call code change in this method on Nov/27/2023. exceptionString = ""; exceptionType = REDIS_PLUS_PLUS_NO_ERROR; try { - redis_cluster->setnx(keyString, string("0")); + // We will use the Redis SET with NX option all in a single atomic API call. + // A TTL valus of 0 means that we we don't want this K/V pair to expire at all. + std::chrono::milliseconds ttl_value = std::chrono::milliseconds(0); + redis_cluster->set(keyString, string("0"), ttl_value, UpdateType::NOT_EXIST); } catch (const ReplyError &ex) { // WRONGTYPE Operation against a key holding the wrong kind of value exceptionString = ex.what(); @@ -431,8 +456,8 @@ namespace distributed if(exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { dbError.set(string("Unable to connect to the redis-cluster server(s). ") + - string("Error in REDIS_SETNX_CMD. Exception=") + exceptionString, DPS_CONNECTION_ERROR); - SPLAPPTRC(L_ERROR, "Inside connectToDatabase, it failed with an error for REDIS_SETNX_CMD. Exception=" << exceptionString << ". rc=" << DPS_CONNECTION_ERROR, "RedisClusterPlusPlusDBLayer"); + string("Error in REDIS_SET_NX_CMD. Exception=") + exceptionString, DPS_CONNECTION_ERROR); + SPLAPPTRC(L_ERROR, "Inside connectToDatabase, it failed with an error for REDIS_SET_NX_CMD. Exception=" << exceptionString << ". rc=" << DPS_CONNECTION_ERROR, "RedisClusterPlusPlusDBLayer"); return; } } // End of if (exists_result_value == 0) @@ -2801,21 +2826,26 @@ namespace distributed // This method will acquire a lock for a given store. bool RedisClusterPlusPlusDBLayer::acquireStoreLock(string const & storeIdString) { int32_t retryCnt = 0; + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, starting one or more attempts to acquire a lock for store " << storeIdString << ".", "RedisClusterPlusPlusDBLayer"); // Try to get a lock for this store. while (1) { // '4' + 'store id' + 'dps_lock' => 1 std::string storeLockKey = string(DPS_STORE_LOCK_TYPE) + storeIdString + DPS_LOCK_TOKEN; - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. + // Senthil made NX related single API call code change in this method on Nov/27/2023. string exceptionString = ""; int exceptionType = REDIS_PLUS_PLUS_NO_ERROR; bool setnx_result_value = false; try { - setnx_result_value = redis_cluster->setnx(storeLockKey, string("1")); + // We will use the Redis SET with NX option all in a single atomic API call. + // Set the TTL value for this K/V pair in milliseconds. + std::chrono::milliseconds ttl_value = std::chrono::milliseconds(DPS_AND_DL_GET_LOCK_TTL * 1000); + setnx_result_value = redis_cluster->set(storeLockKey, string("1"), ttl_value, UpdateType::NOT_EXIST); } catch (const ReplyError &ex) { // WRONGTYPE Operation against a key holding the wrong kind of value exceptionString = ex.what(); @@ -2845,89 +2875,14 @@ namespace distributed // Did we encounter an exception? if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { - // Problem in atomic creation of the store lock. + // Problem in atomic creation of the store lock. Let us return now. 
+ SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, got a redis command execution error when acquiring a lock for a store " << storeIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock. Exception type=" << exceptionType << ", ExceptionString=" << exceptionString, "RedisClusterPlusPlusDBLayer"); return(false); } if(setnx_result_value == true) { - // We got the lock. - // Set the expiration time for this lock key. - exceptionString = ""; - exceptionType = REDIS_PLUS_PLUS_NO_ERROR; - - try { - redis_cluster->expire(storeLockKey, 1); - } catch (const ReplyError &ex) { - // WRONGTYPE Operation against a key holding the wrong kind of value - exceptionString = ex.what(); - // Command execution error. - exceptionType = REDIS_PLUS_PLUS_REPLY_ERROR; - } catch (const TimeoutError &ex) { - // Reading or writing timeout - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const ClosedError &ex) { - // Connection has been closed. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const IoError &ex) { - // I/O error on the connection. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const Error &ex) { - // Other errors - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; - } - - // Did we encounter an exception? - if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { - // Problem in setting the lock expiry time. - SPLAPPTRC(L_ERROR, "b) Inside acquireStoreLock, it failed with an exception. Error=" << exceptionString, "RedisClusterPlusPlusDBLayer"); - // We already got an exception. There is not much use in the code block below. - // In any case, we will give it a try. - // Delete the erroneous lock data item we created. - exceptionString = ""; - exceptionType = REDIS_PLUS_PLUS_NO_ERROR; - - try { - redis_cluster->del(storeLockKey); - } catch (const ReplyError &ex) { - // WRONGTYPE Operation against a key holding the wrong kind of value - exceptionString = ex.what(); - // Command execution error. - exceptionType = REDIS_PLUS_PLUS_REPLY_ERROR; - } catch (const TimeoutError &ex) { - // Reading or writing timeout - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const ClosedError &ex) { - // Connection has been closed. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const IoError &ex) { - // I/O error on the connection. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const Error &ex) { - // Other errors - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; - } - - // We couldn't get a lock. - return(false); - } - - // We got the lock. + // We got an exclusive lock with an expiry time set for it. 
+ SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, acquired a lock for store " << storeIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterPlusPlusDBLayer"); return(true); } // End of if(setnx_result_value == true) @@ -2935,6 +2890,7 @@ namespace distributed retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, it is not able to acquire a lock for store " << storeIdString << " after " << retryCnt << " retry attempts.", "RedisClusterPlusPlusDBLayer"); return(false); } @@ -3792,21 +3748,26 @@ namespace distributed // provide thread safety. There are other lock acquisition/release methods once someone has a valid store id or lock id. bool RedisClusterPlusPlusDBLayer::acquireGeneralPurposeLock(string const & entityName) { int32_t retryCnt = 0; + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, starting one or more attempts to acquire a lock for a generic id " << entityName << ".", "RedisClusterPlusPlusDBLayer"); //Try to get a lock for this generic entity. while (1) { // '501' + 'entity name' + 'generic_lock' => 1 std::string genericLockKey = GENERAL_PURPOSE_LOCK_TYPE + entityName + GENERIC_LOCK_TOKEN; - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. + // Senthil made NX related single API call code change in this method on Nov/27/2023. string exceptionString = ""; int exceptionType = REDIS_PLUS_PLUS_NO_ERROR; bool setnx_result_value = false; try { - setnx_result_value = redis_cluster->setnx(genericLockKey, string("1")); + // We will use the Redis SET with NX option all in a single atomic API call. + // Set the TTL value for this K/V pair in milliseconds. + std::chrono::milliseconds ttl_value = std::chrono::milliseconds(DPS_AND_DL_GET_LOCK_TTL * 1000); + setnx_result_value = redis_cluster->set(genericLockKey, string("1"), ttl_value, UpdateType::NOT_EXIST); } catch (const ReplyError &ex) { // WRONGTYPE Operation against a key holding the wrong kind of value exceptionString = ex.what(); @@ -3834,87 +3795,30 @@ namespace distributed exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; } - // Did we encounter a redis-cluster server connection error? - if (exceptionType == REDIS_PLUS_PLUS_CONNECTION_ERROR) { - SPLAPPTRC(L_ERROR, "a) Inside acquireGeneralPurposeLock, it failed with a Redis connection error for REDIS_SETNX_CMD. Exception: " << exceptionString << ". Application code may call the DPS reconnect API and then retry the failed operation. " << DPS_CONNECTION_ERROR, "RedisClusterPlusPlusDBLayer"); - return(false); - } - - // Did we encounter a redis reply error? - if (exceptionType == REDIS_PLUS_PLUS_REPLY_ERROR || - exceptionType == REDIS_PLUS_PLUS_OTHER_ERROR) { - // Problem in atomic creation of the general purpose lock. + // Did we encounter an exception? + if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, got a redis command execution error when acquiring a lock for a generic id " << entityName << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock. 
Exception type=" << exceptionType << ", ExceptionString=" << exceptionString, "RedisClusterPlusPlusDBLayer"); return(false); } if(setnx_result_value == true) { - // We got the lock. - // Set the expiration time for this lock key. - exceptionString = ""; - exceptionType = REDIS_PLUS_PLUS_NO_ERROR; - - try { - redis_cluster->expire(genericLockKey, DPS_AND_DL_GET_LOCK_TTL); - } catch (const ReplyError &ex) { - // WRONGTYPE Operation against a key holding the wrong kind of value - exceptionString = ex.what(); - // Command execution error. - exceptionType = REDIS_PLUS_PLUS_REPLY_ERROR; - } catch (const TimeoutError &ex) { - // Reading or writing timeout - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const ClosedError &ex) { - // Connection has been closed. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const IoError &ex) { - // I/O error on the connection. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const Error &ex) { - // Other errors - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; - } - - // Did we encounter an exception? - if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { - // Problem in setting the lock expiry time. - SPLAPPTRC(L_ERROR, "b) Inside acquireGeneralPurposeLock, it failed with an exception for REDIS_EXPIRE_CMD. Exception: " << exceptionString << ".", "RedisClusterPlusPlusDBLayer"); - - // We already got an exception. There is not much use in the code block below. - // In any case, we will give it a try. - // Delete the erroneous lock data item we created. - try { - redis_cluster->del(genericLockKey); - } catch (const Error &ex) { - // It is not good if this one fails. We can't do much about that. - SPLAPPTRC(L_ERROR, "c) Inside acquireGeneralPurposeLock, it failed with an exception for REDIS_DEL_CMD. Exception: " << ex.what() << ".", "RedisClusterPlusPlusDBLayer"); - } - - return(false); - } - - // We got the lock with proper expiry time set. + // We got an exclusive lock with an expiry time set for it. + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, acquired a lock for a generic id " << entityName << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterPlusPlusDBLayer"); return(true); - } // End of if(setnx_result_value == true). + } // End of if(setnx_result_value == true) - // Someone else is holding on to the lock of this entity. Wait for a while before trying again. + // Someone else is holding on to the lock of this store. Wait for a while before trying again. retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, it is not able to acquire a general purpose lock id " << entityName << " after " << retryCnt << " retry attempts.", "RedisClusterPlusPlusDBLayer"); return(false); } // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } // End of while loop. + } // End of the while loop. return(false); } // End of acquireGeneralPurposeLock. @@ -4794,7 +4698,7 @@ namespace distributed if(totalKeysRemoved != total_zrem_cnt) { // Remove cnt mismatch between the store and the zset. 
dbError.set(std::string("Inside removeKeys #7, key, member removal cnt mismatch between HDEL and ZREM."), DPS_BULK_REMOVE_CNT_MISMATCH_ERROR); - SPLAPPTRC(L_DEBUG, "Inside removeKeys #7, it failed with a key, member removal cnt mismatch between HDEL and ZREM. " << DPS_BULK_REMOVE_CNT_MISMATCH_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_DEBUG, "Inside removeKeys #7, it failed with a key, member removal cnt mismatch between HDEL and ZREM. " << DPS_BULK_REMOVE_CNT_MISMATCH_ERROR, "RedisClusterPlusPlusDBLayer"); return; } } catch (const ReplyError &ex) { @@ -5557,21 +5461,25 @@ namespace distributed time_t startTime, timeNow; // Get the start time for our lock acquisition attempts. time(&startTime); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, starting one or more attempts to acquire a lock id " << lockIdString << ".", "RedisClusterPlusPlusDBLayer"); // Try to get a distributed lock. while(1) { - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and PX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the lease time ends. // We will add the lease time to the current timestamp i.e. seconds elapsed since the epoch. + // Senthil made NX related single API call code change in this method on Nov/27/2023. time_t new_lock_expiry_time = time(0) + (time_t)leaseTime; string exceptionString = ""; int exceptionType = REDIS_PLUS_PLUS_NO_ERROR; bool setnx_result_value = false; try { - setnx_result_value = redis_cluster->setnx(distributedLockKey, string("1")); + // Set the TTL value for this K/V pair in milliseconds. + std::chrono::milliseconds ttl_value = std::chrono::milliseconds((int64_t)leaseTime * 1000); + setnx_result_value = redis_cluster->set(distributedLockKey, string("1"), ttl_value, UpdateType::NOT_EXIST); } catch (const ReplyError &ex) { // WRONGTYPE Operation against a key holding the wrong kind of value exceptionString = ex.what(); @@ -5599,80 +5507,22 @@ namespace distributed exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; } - // Did we encounter a redis-cluster server connection error? - if (exceptionType == REDIS_PLUS_PLUS_CONNECTION_ERROR) { - SPLAPPTRC(L_ERROR, "a) Inside acquireLock, it failed with a Redis connection error for REDIS_SETNX_CMD. Exception: " << exceptionString << ". Application code may call the DPS reconnect API and then retry the failed operation. " << DL_CONNECTION_ERROR, "RedisClusterPlusPlusDBLayer"); - return(false); - } - - // Did we encounter a redis reply error? - if (exceptionType == REDIS_PLUS_PLUS_REPLY_ERROR || - exceptionType == REDIS_PLUS_PLUS_OTHER_ERROR) { - // Problem in atomic creation of the distributed lock. - SPLAPPTRC(L_ERROR, "b) Inside acquireLock, it failed with an error for REDIS_SETNX_CMD. Exception: " << exceptionString << ".", "RedisClusterPlusPlusDBLayer"); + // Did we encounter an exception? + if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { + SPLAPPTRC(L_DEBUG, "b) Inside acquireLock, got a redis command execution error when acquiring a lock id " << lockIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock. Exception type=" << exceptionType << ", ExceptionString=" << exceptionString, "RedisClusterPlusPlusDBLayer"); return(false); } if(setnx_result_value == true) { - // We got the lock. - // Set the expiration time for this lock key. 
- ostringstream expiryTimeInMillis; - expiryTimeInMillis << (leaseTime*1000.00); - long long ttlInMillis = streams_boost::lexical_cast(expiryTimeInMillis.str()); - exceptionString = ""; - exceptionType = REDIS_PLUS_PLUS_NO_ERROR; - - try { - redis_cluster->psetex(distributedLockKey, ttlInMillis, "2"); - } catch (const ReplyError &ex) { - // WRONGTYPE Operation against a key holding the wrong kind of value - exceptionString = ex.what(); - // Command execution error. - exceptionType = REDIS_PLUS_PLUS_REPLY_ERROR; - } catch (const TimeoutError &ex) { - // Reading or writing timeout - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const ClosedError &ex) { - // Connection has been closed. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const IoError &ex) { - // I/O error on the connection. - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_CONNECTION_ERROR; - } catch (const Error &ex) { - // Other errors - exceptionString = ex.what(); - // Connectivity related error. - exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR; - } - - // Is there an exception? - if(exceptionType != REDIS_PLUS_PLUS_NO_ERROR) { - SPLAPPTRC(L_ERROR, "c) Inside acquireLock, it failed with an exception for REDIS_PSETEX_CMD. Exception: " << exceptionString << ".", "RedisClusterPlusPlusDBLayer"); - // In any case, let us try the following code block which may also fail. - // Delete the erroneous lock data item we created. - try { - redis_cluster->del(distributedLockKey); - } catch (const Error &ex) { - // It is not good if this one fails. We can't do much about that. - } - - return(false); - } - - // We got the lock. + // We got an exclusive lock with an expiry time set for it. // Let us update the lock information now. if(updateLockInformation(lockIdString, lkError, 1, new_lock_expiry_time, getpid()) == true) { - return(true); + SPLAPPTRC(L_DEBUG, "a) Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisClusterPlusPlusDBLayer"); + return(true); } else { // Some error occurred while updating the lock information. // It will be in an inconsistent state. Let us release the lock. - // After than, we will continue in the while loop. + SPLAPPTRC(L_DEBUG, "c) Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s). However, update lock information failed. We will continue the retry to get this lock.", "RedisClusterPlusPlusDBLayer"); releaseLock(lock, lkError); } } else { @@ -5686,7 +5536,8 @@ namespace distributed pid_t _lockOwningPid = 0; if (readLockInformation(lockIdString, lkError, _lockUsageCnt, _lockExpirationTime, _lockOwningPid, _lockName) == false) { - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << ". " << lkError.getErrorCode(), "RedisClusterPlusPlusDBLayer"); + SPLAPPTRC(L_DEBUG, "d) Inside acquireLock, it failed for lock id " << lockIdString << " while reading lock information. We will continue the retry to get this lock. Attempt number=" << (retryCnt+1) << ". Lock Error=" << lkError.getErrorCode(), "RedisClusterPlusPlusDBLayer"); +; } else { // Is current time greater than the lock expiration time? 
if ((_lockExpirationTime > 0) && (time(0) > (time_t)_lockExpirationTime)) { @@ -5701,8 +5552,8 @@ namespace distributed retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "e) Inside acquireLock, it is not able to acquire a lock id " << lockIdString << " after " << retryCnt << " retry attempts. Caller will see a lock error code of " << DL_GET_LOCK_ERROR << ".", "RedisClusterPlusPlusDBLayer"); lkError.set("Unable to acquire the lock named " + lockIdString + ".", DL_GET_LOCK_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for a lock named " << lockIdString << ". " << DL_GET_LOCK_ERROR, "RedisClusterPlusPlusDBLayer"); // Our caller can check the error code and try to acquire the lock again. return(false); } @@ -5711,8 +5562,7 @@ namespace distributed time(&timeNow); if (difftime(startTime, timeNow) > maxWaitTimeToAcquireLock) { lkError.set("Unable to acquire the lock named " + lockIdString + " within the caller specified wait time.", DL_GET_LOCK_TIMEOUT_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to acquire the lock named " << lockIdString << - " within the caller specified wait time." << DL_GET_LOCK_TIMEOUT_ERROR, "RedisClusterPlusPlusDBLayer"); + SPLAPPTRC(L_DEBUG, "f) Inside acquireLock, it failed to acquire the lock named " << lockIdString << " within the caller specified wait time. Attempt number=" << retryCnt << ". Caller will see a lock error code of " << DL_GET_LOCK_TIMEOUT_ERROR << ".", "RedisClusterPlusPlusDBLayer"); // Our caller can check the error code and try to acquire the lock again. return(false); } @@ -5720,7 +5570,9 @@ namespace distributed // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } // End of while(1) + } // End of while loop. + + return(false); } // End of acquireLock. void RedisClusterPlusPlusDBLayer::releaseLock(uint64_t lock, PersistenceError & lkError) { diff --git a/com.ibm.streamsx.dps/impl/src/RedisDBLayer.cpp b/com.ibm.streamsx.dps/impl/src/RedisDBLayer.cpp index b5f9dd9..50234c3 100644 --- a/com.ibm.streamsx.dps/impl/src/RedisDBLayer.cpp +++ b/com.ibm.streamsx.dps/impl/src/RedisDBLayer.cpp @@ -1,6 +1,6 @@ /* # Licensed Materials - Property of IBM -# Copyright IBM Corp. 2011, 2022 +# Copyright IBM Corp. 2011, 2023 # US Government Users Restricted Rights - Use, duplication or # disclosure restricted by GSA ADP Schedule Contract with # IBM Corp. @@ -666,15 +666,20 @@ namespace distributed if (redis_reply->integer == (int)0) { // It could be that our global store id is not there now. // Let us create one with an initial value of 0. - // Redis setnx is an atomic operation. It will succeed only for the very first operator that + // Redis set nx is an atomic operation. It will succeed only for the very first operator that // attempts to do this setting after a redis server is started fresh. If some other operator // already raced us ahead and created this guid_key, then our attempt below will be safely rejected. + // Senthil made NX related single API call code change in this method on Nov/25/2023. 
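For reference, the single-call lock acquisition introduced in the RedisClusterPlusPlusDBLayer::acquireLock hunk above can be sketched as follows. This is a minimal illustration assuming a connected sw::redis::RedisCluster and a hypothetical lock key name; it is not the exact DPS code.

#include <sw/redis++/redis++.h>
#include <chrono>
#include <string>

// Minimal sketch: create the lock key only if it does not already exist and attach a TTL,
// all in one server-side SET ... NX PX operation via the redis-plus-plus set() overload.
bool tryAcquireLock(sw::redis::RedisCluster &cluster,
                    const std::string &lockKey,
                    int64_t leaseTimeInSeconds) {
    std::chrono::milliseconds ttl(leaseTimeInSeconds * 1000);
    // Returns true only when the key was absent and this caller created it.
    return cluster.set(lockKey, "1", ttl, sw::redis::UpdateType::NOT_EXIST);
}

Compared with the earlier setnx followed by a separate psetex, the combined call removes the window in which a failure between the two commands could leave a lock key behind with no expiry.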
freeReplyObject(redis_reply); - cmd = string(REDIS_SETNX_CMD) + keyString + string(" ") + string("0"); + cmd = string(REDIS_SET_CMD) + keyString + string(" ") + string("0") + + string(" ") + string(REDIS_NX_OPTION); redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); } - freeReplyObject(redis_reply); + if(redis_reply != NULL) { + freeReplyObject(redis_reply); + } + SPLAPPTRC(L_DEBUG, "Inside connectToDatabase done", "RedisDBLayer"); } @@ -2189,79 +2194,65 @@ namespace distributed bool RedisDBLayer::acquireStoreLock(string const & storeIdString) { int32_t retryCnt = 0; string cmd = ""; + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, starting one or more attempts to acquire a lock for store " << storeIdString << ".", "RedisDBLayer"); //Try to get a lock for this store. while (1) { - // '4' + 'store id' + 'dps_lock' => 1 - std::string storeLockKey = string(DPS_STORE_LOCK_TYPE) + storeIdString + DPS_LOCK_TOKEN; - int32_t partitionIdx = getRedisServerPartitionIndex(storeLockKey); - - // Return now if there is no valid connection to the Redis server. - if (redisPartitions[partitionIdx].rdsc == NULL) { - return(false); - } + // '4' + 'store id' + 'dps_lock' => 1 + std::string storeLockKey = string(DPS_STORE_LOCK_TYPE) + storeIdString + DPS_LOCK_TOKEN; + int32_t partitionIdx = getRedisServerPartitionIndex(storeLockKey); - // This is an atomic activity. - // If multiple threads attempt to do it at the same time, only one will succeed. - // Winner will hold the lock until they release it voluntarily or - // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. - cmd = string(REDIS_SETNX_CMD) + storeLockKey + " " + "1"; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - - if (redis_reply == NULL) { - return(false); - } + // Return now if there is no valid connection to the Redis server. + if (redisPartitions[partitionIdx].rdsc == NULL) { + return(false); + } - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the store lock. - freeReplyObject(redis_reply); - return(false); - } + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. + // If multiple threads attempt to do it at the same time, only one will succeed. + // Winner will hold the lock until they release it voluntarily or + // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. + // Senthil made NX related single API call code change in this method on Nov/27/2023. + std::ostringstream expiry_time_stream; + expiry_time_stream << string("") << DPS_AND_DL_GET_LOCK_TTL; + cmd = string(REDIS_SET_CMD) + storeLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_EX_OPTION) + + string(" ") + expiry_time_stream.str(); + redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); + + if (redis_reply == NULL) { + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, got a null redis_reply error when acquiring a lock for a store " << storeIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock.", "RedisDBLayer"); + return(false); + } - if (redis_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. 
- freeReplyObject(redis_reply); - std::ostringstream cmd_stream; - cmd_stream << string(REDIS_EXPIRE_CMD) << storeLockKey << " " << DPS_AND_DL_GET_LOCK_TTL; - cmd = cmd_stream.str(); - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); + string redis_reply_string = ""; + int redis_reply_type = redis_reply->type; - if (redis_reply == NULL) { - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + storeLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } + if (redis_reply_type == REDIS_REPLY_STATUS) { + redis_reply_string = string(redis_reply->str, redis_reply->len); - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the store lock. - freeReplyObject(redis_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + storeLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } + if(redis_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + freeReplyObject(redis_reply); + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, acquired a lock for store " << storeIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisDBLayer"); + return(true); + } + } - freeReplyObject(redis_reply); - return(true); - } + freeReplyObject(redis_reply); + // Someone else is holding on to the lock of this store. Wait for a while before trying again. + retryCnt++; - freeReplyObject(redis_reply); - // Someone else is holding on to the lock of this store. Wait for a while before trying again. - retryCnt++; + if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireStoreLock, it is not able to acquire a lock for store " << storeIdString << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_reply_type << ". Redis server reply string=" << redis_reply_string, "RedisDBLayer"); - if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + return(false); + } - return(false); - } - - // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. - usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * - (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } + // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. + usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * + (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); + } // End of while loop. return(false); } @@ -2890,12 +2881,13 @@ namespace distributed } } - // This method will acquire a lock for any given generic/arbitrary identifier passed as a string.. + // This method will acquire a lock for any given generic/arbitrary identifier passed as a string. // This is typically used inside the createStore, createOrGetStore, createOrGetLock methods to // provide thread safety. There are other lock acquisition/release methods once someone has a valid store id or lock id. bool RedisDBLayer::acquireGeneralPurposeLock(string const & entityName) { int32_t retryCnt = 0; string cmd = ""; + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, starting one or more attempts to acquire a lock for a generic id " << entityName << ".", "RedisDBLayer"); //Try to get a lock for this generic entity. 
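A rough equivalent of the raw hiredis command used in the RedisDBLayer::acquireStoreLock hunk above is sketched below. It assumes a connected redisContext and a hypothetical key name, and checks for the simple-string "OK" status reply that Redis returns when the NX condition is met; it is not the exact DPS code.

#include <hiredis/hiredis.h>
#include <cstring>
#include <string>

// Minimal sketch: SET <key> 1 NX EX <ttl> creates the lock key only if it is absent
// and attaches an expiry in the same atomic command.
bool tryAcquireStoreLock(redisContext *rdsc, const std::string &lockKey, int ttlInSeconds) {
    redisReply *reply = (redisReply *)redisCommand(rdsc, "SET %s 1 NX EX %d",
                                                   lockKey.c_str(), ttlInSeconds);
    if (reply == NULL) {
        // A NULL reply usually indicates a connection-level problem.
        return false;
    }
    // On success Redis answers with the status string "OK"; when the key already
    // exists (NX not satisfied) it answers with a nil reply instead.
    bool gotLock = (reply->type == REDIS_REPLY_STATUS &&
                    reply->len == 2 && strncmp(reply->str, "OK", 2) == 0);
    freeReplyObject(reply);
    return gotLock;
}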
while (1) { @@ -2908,68 +2900,52 @@ namespace distributed return(false); } - // This is an atomic activity. + // This is an atomic activity that does a combined set with NX and EX options all done in one API call. // If multiple threads attempt to do it at the same time, only one will succeed. // Winner will hold the lock until they release it voluntarily or // until the Redis back-end removes this lock entry after the DPS_AND_DL_GET_LOCK_TTL times out. - cmd = string(REDIS_SETNX_CMD) + genericLockKey + " " + "1"; - + // Senthil made NX related single API call code change in this method on Nov/27/2023. + std::ostringstream expiry_time_stream; + expiry_time_stream << string("") << DPS_AND_DL_GET_LOCK_TTL; + cmd = string(REDIS_SET_CMD) + genericLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_EX_OPTION) + + string(" ") + expiry_time_stream.str(); redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); if (redis_reply == NULL) { - return(false); + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, got a null redis_reply error when acquiring a lock for a generic id " << entityName << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock.", "RedisDBLayer"); + return(false); } - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_reply); - return(false); - } + string redis_reply_string = ""; + int redis_reply_type = redis_reply->type; - if (redis_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. - freeReplyObject(redis_reply); - std::ostringstream cmd_stream; - cmd_stream << string(REDIS_EXPIRE_CMD) << genericLockKey << " " << DPS_AND_DL_GET_LOCK_TTL; - cmd = cmd_stream.str(); - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); + if (redis_reply_type == REDIS_REPLY_STATUS) { + redis_reply_string = string(redis_reply->str, redis_reply->len); - if (redis_reply == NULL) { - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + genericLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } - - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + genericLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } - - freeReplyObject(redis_reply); - return(true); + if(redis_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + freeReplyObject(redis_reply); + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, acquired a lock for a generic id " << entityName << " in " << (retryCnt+1) << " attempt(s).", "RedisDBLayer"); + return(true); + } } freeReplyObject(redis_reply); - // Someone else is holding on to the lock of this entity. Wait for a while before trying again. + // Someone else is holding on to the lock of this generic id. Wait for a while before trying again. 
retryCnt++; if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireGeneralPurposeLock, it is not able to acquire a lock for a generic id " << entityName << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_reply_type << ". Redis server reply string=" << redis_reply_string, "RedisDBLayer"); - return(false); + return(false); } // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } + } // End of while loop. return(false); } @@ -4157,7 +4133,7 @@ namespace distributed SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to check for the existence of lock id " << lockIdString << ". " << lkError.getErrorCode(), "RedisDBLayer"); } else { lkError.set("No lock exists for the LockId " + lockIdString + ".", DL_INVALID_LOCK_ID_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << ". " << DL_INVALID_LOCK_ID_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, no lock exists for lock id " << lockIdString << ". " << DL_INVALID_LOCK_ID_ERROR, "RedisDBLayer"); } return(false); @@ -4171,120 +4147,111 @@ namespace distributed // Return now if there is no valid connection to the Redis server. if (redisPartitions[partitionIdx].rdsc == NULL) { lkError.set("There is no valid connection to the Redis server at this time.", DPS_CONNECTION_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for distributedLockKey " << distributedLockKey << ". There is no valid connection to the Redis server at this time. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << ". There is no valid connection to the Redis server at this time. " << DPS_CONNECTION_ERROR, "RedisDBLayer"); return(false); } time_t startTime, timeNow; // Get the start time for our lock acquisition attempts. time(&startTime); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, starting one or more attempts to acquire a lock id " << lockIdString << ".", "RedisDBLayer"); //Try to get a distributed lock. while(1) { - // This is an atomic activity. - // If multiple threads attempt to do it at the same time, only one will succeed. - // Winner will hold the lock until they release it voluntarily or - // until the Redis back-end removes this lock entry after the lease time ends. - // We will add the lease time to the current timestamp i.e. seconds elapsed since the epoch. - time_t new_lock_expiry_time = time(0) + (time_t)leaseTime; - cmd = string(REDIS_SETNX_CMD) + distributedLockKey + " " + "1"; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - - if (redis_reply == NULL) { - return(false); - } - - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the distributed lock. - freeReplyObject(redis_reply); - return(false); - } - - if (redis_reply->integer == (int)1) { - // We got the lock. - // Set the expiration time for this lock key. - freeReplyObject(redis_reply); - ostringstream expiryTimeInMillis; - expiryTimeInMillis << (leaseTime*1000.00); - cmd = string(REDIS_PSETEX_CMD) + distributedLockKey + " " + expiryTimeInMillis.str() + " " + "2"; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); + // This is an atomic activity that does a combined set with NX and PX options all done in one API call. 
+ // If multiple threads attempt to do it at the same time, only one will succeed. + // Winner will hold the lock until they release it voluntarily or + // until the Redis back-end removes this lock entry after the lease time ends. + // We will add the lease time to the current timestamp i.e. seconds elapsed since the epoch. + // Senthil made NX related single API call code change in this method on Nov/27/2023. + time_t new_lock_expiry_time = time(0) + (time_t)leaseTime; + ostringstream expiryTimeInMillis; + expiryTimeInMillis << string("") << (leaseTime*1000.00); + cmd = string(REDIS_SET_CMD) + distributedLockKey + string(" ") + string("1") + + string(" ") + string(REDIS_NX_OPTION) + string(" ") + string(REDIS_PX_OPTION) + + string(" ") + expiryTimeInMillis.str(); + redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - if (redis_reply == NULL) { - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + distributedLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } + if (redis_reply == NULL) { + // A NULL redis reply could be connection related error. Let us return now. + SPLAPPTRC(L_DEBUG, "Inside acquireLock, got a null redis_reply error when acquiring a lock id " << lockIdString << " during attempt number " << (retryCnt+1) << ". Returning false now without making any further attempts to acquire a lock.", "RedisDBLayer"); + return(false); + } - if (redis_reply->type == REDIS_REPLY_ERROR) { - // Problem in atomic creation of the general purpose lock. - freeReplyObject(redis_reply); - // Delete the erroneous lock data item we created. - cmd = string(REDIS_DEL_CMD) + " " + distributedLockKey; - redis_reply = (redisReply*)redisCommand(redisPartitions[partitionIdx].rdsc, cmd.c_str()); - freeReplyObject(redis_reply); - return(false); - } + string redis_reply_string = ""; + int redis_reply_type = redis_reply->type; + bool redisReplyObjectFreed = false; + + if (redis_reply_type == REDIS_REPLY_STATUS) { + redis_reply_string = string(redis_reply->str, redis_reply->len); + freeReplyObject(redis_reply); + redisReplyObjectFreed = true; + + if(redis_reply_string == string("OK")) { + // We got an exclusive lock with an expiry time set for it. + // Let us update the lock information now. + if(updateLockInformation(lockIdString, lkError, 1, new_lock_expiry_time, getpid()) == true) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s).", "RedisDBLayer"); + return(true); + } else { + // Some error occurred while updating the lock information. + // It will be in an inconsistent state. Let us release the lock. + SPLAPPTRC(L_DEBUG, "Inside acquireLock, acquired a lock id " << lockIdString << " in " << (retryCnt+1) << " attempt(s). However, update lock information failed. We will continue the retry to get this lock.", "RedisDBLayer"); + releaseLock(lock, lkError); + } + } + } - freeReplyObject(redis_reply); + if(redisReplyObjectFreed == false) { + // It was not freed in the previous if block. Let us free it now. + freeReplyObject(redis_reply); + } - // We got the lock. - // Let us update the lock information now. - if(updateLockInformation(lockIdString, lkError, 1, new_lock_expiry_time, getpid()) == true) { - return(true); - } else { - // Some error occurred while updating the lock information. - // It will be in an inconsistent state. Let us release the lock. 
- releaseLock(lock, lkError); - } - } else { - // We didn't get the lock. - // Let us check if the previous owner of this lock simply forgot to release it. - // In that case, we will release this expired lock. - // Read the time at which this lock is expected to expire. - freeReplyObject(redis_reply); - uint32_t _lockUsageCnt = 0; - int32_t _lockExpirationTime = 0; - std::string _lockName = ""; - pid_t _lockOwningPid = 0; + // We didn't get the lock. + // Let us check if the previous owner of this lock simply forgot to release it. + // In that case, we will release this expired lock. + // Read the time at which this lock is expected to expire. + uint32_t _lockUsageCnt = 0; + int32_t _lockExpirationTime = 0; + std::string _lockName = ""; + pid_t _lockOwningPid = 0; + + if (readLockInformation(lockIdString, lkError, _lockUsageCnt, _lockExpirationTime, _lockOwningPid, _lockName) == false) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << " while reading lock information. We will continue the retry to get this lock. Attempt number=" << (retryCnt+1) << ". Lock Error=" << lkError.getErrorCode(), "RedisDBLayer"); + } else { + // Is current time greater than the lock expiration time? + if ((_lockExpirationTime > 0) && (time(0) > (time_t)_lockExpirationTime)) { + // Time has passed beyond the lease of this lock. + // Lease expired for this lock. Original owner forgot to release the lock and simply left it hanging there without a valid lease. + releaseLock(lock, lkError); + } + } - if (readLockInformation(lockIdString, lkError, _lockUsageCnt, _lockExpirationTime, _lockOwningPid, _lockName) == false) { - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for lock id " << lockIdString << ". " << lkError.getErrorCode(), "RedisDBLayer"); - } else { - // Is current time greater than the lock expiration time? - if ((_lockExpirationTime > 0) && (time(0) > (time_t)_lockExpirationTime)) { - // Time has passed beyond the lease of this lock. - // Lease expired for this lock. Original owner forgot to release the lock and simply left it hanging there without a valid lease. - releaseLock(lock, lkError); - } - } - } + // Someone else is holding on to the lock we are trying to acquire. Wait for a while before trying again. + retryCnt++; - // Someone else is holding on to this distributed lock. Wait for a while before trying again. - retryCnt++; + if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it is not able to acquire a lock id " << lockIdString << " after " << retryCnt << " retry attempts. Redis server reply type=" << redis_reply_type << ". Redis server reply string=" << redis_reply_string << ". Caller will see a lock error code of " << DL_GET_LOCK_ERROR << ".", "RedisDBLayer"); + lkError.set("Unable to acquire the lock named " + lockIdString + ".", DL_GET_LOCK_ERROR); + // Our caller can check the lock error code and try to acquire the lock again. + return(false); + } - if (retryCnt >= DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT) { - lkError.set("Unable to acquire the lock named " + lockIdString + ".", DL_GET_LOCK_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed for a lock named " << lockIdString << ". " << DL_GET_LOCK_ERROR, "RedisDBLayer"); - // Our caller can check the error code and try to acquire the lock again. - return(false); - } + // Check if we have gone past the maximum wait time the caller was willing to wait in order to acquire this lock. 
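The retry handling that surrounds these single SET attempts (a bounded retry count, a caller-specified maximum wait time, and a sleep that varies with the retry count, continued just below) follows a common shape that can be sketched as follows. The constants and the tryOnce callback are hypothetical stand-ins, not the actual DPS definitions.

#include <unistd.h>
#include <ctime>
#include <cstdint>
#include <functional>

// Minimal sketch of the retry loop shape: try the atomic SET ... NX once per pass,
// give up after a fixed number of retries or after the caller's wait budget is spent,
// and delay between attempts based on the retry count.
bool acquireWithRetries(const std::function<bool()> &tryOnce, double maxWaitTimeInSeconds) {
    const int32_t maxRetryCnt = 40;           // hypothetical stand-in for DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT
    const useconds_t baseSleepMicros = 5000;  // hypothetical stand-in for DPS_AND_DL_GET_LOCK_SLEEP_TIME
    const int32_t backoffModFactor = 4;       // hypothetical stand-in for DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR
    int32_t retryCnt = 0;
    time_t startTime;
    time(&startTime);

    while (true) {
        if (tryOnce()) {
            return true;                      // lock acquired
        }
        retryCnt++;
        if (retryCnt >= maxRetryCnt) {
            return false;                     // the real code reports DL_GET_LOCK_ERROR here
        }
        time_t timeNow;
        time(&timeNow);
        if (difftime(timeNow, startTime) > maxWaitTimeInSeconds) {
            return false;                     // the real code reports DL_GET_LOCK_TIMEOUT_ERROR here
        }
        // Sleep a multiple of the base delay that grows with the retry count, modulo a factor.
        usleep(baseSleepMicros * (retryCnt % (maxRetryCnt / backoffModFactor)));
    }
}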
+ time(&timeNow); + if (difftime(startTime, timeNow) > maxWaitTimeToAcquireLock) { + lkError.set("Unable to acquire the lock named " + lockIdString + " within the caller specified wait time.", DL_GET_LOCK_TIMEOUT_ERROR); + SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to acquire the lock named " << lockIdString << " within the caller specified wait time. Attempt number=" << retryCnt << ". Caller will see a lock error code of " << DL_GET_LOCK_TIMEOUT_ERROR << ".", "RedisDBLayer"); + // Our caller can check the lock error code and try to acquire the lock again. + return(false); + } - // Check if we have gone past the maximum wait time the caller was willing to wait in order to acquire this lock. - time(&timeNow); - if (difftime(startTime, timeNow) > maxWaitTimeToAcquireLock) { - lkError.set("Unable to acquire the lock named " + lockIdString + " within the caller specified wait time.", DL_GET_LOCK_TIMEOUT_ERROR); - SPLAPPTRC(L_DEBUG, "Inside acquireLock, it failed to acquire the lock named " << lockIdString << - " within the caller specified wait time." << DL_GET_LOCK_TIMEOUT_ERROR, "RedisDBLayer"); - // Our caller can check the error code and try to acquire the lock again. - return(false); - } + // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. + usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * + (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); + } // End of while loop. - // Yield control to other threads. Wait here with patience by doing an exponential back-off delay. - usleep(DPS_AND_DL_GET_LOCK_SLEEP_TIME * - (retryCnt%(DPS_AND_DL_GET_LOCK_MAX_RETRY_CNT/DPS_AND_DL_GET_LOCK_BACKOFF_DELAY_MOD_FACTOR))); - } // End of while(1) + return(false); } void RedisDBLayer::releaseLock(uint64_t lock, PersistenceError & lkError) { diff --git a/com.ibm.streamsx.dps/info.xml b/com.ibm.streamsx.dps/info.xml index e219aa8..2e1ef78 100644 --- a/com.ibm.streamsx.dps/info.xml +++ b/com.ibm.streamsx.dps/info.xml @@ -235,7 +235,7 @@ To specifically learn how to call the DPS APIs from SPL native functions, C++ an # Reference information [../../javadoc/dps/index.html| DPS Java API Reference] - 4.1.7 + 4.1.8 4.2.0.0 diff --git a/dependencies/hiredis/Makefile b/dependencies/hiredis/Makefile index 3161708..330e546 100644 --- a/dependencies/hiredis/Makefile +++ b/dependencies/hiredis/Makefile @@ -1,7 +1,7 @@ include ../make.variable.include PKG_NAME := hiredis -VERSION := 1.0.0 +VERSION := 1.2.0 ARCHIVE := v$(VERSION).tar.gz ARCHIVE_INSTALL := $(PKG_NAME)-$(VERSION)-$(OS)-$(ARCH)-install-bin.tar.gz URL := https://github.com/redis/$(PKG_NAME)/archive/$(ARCHIVE) diff --git a/dependencies/hiredis/hiredis-1.0.0-el7-x86_64-install-bin.tar.gz b/dependencies/hiredis/hiredis-1.0.0-el7-x86_64-install-bin.tar.gz deleted file mode 100644 index c65293b..0000000 Binary files a/dependencies/hiredis/hiredis-1.0.0-el7-x86_64-install-bin.tar.gz and /dev/null differ diff --git a/dependencies/hiredis/hiredis-1.2.0-el7-x86_64-install-bin.tar.gz b/dependencies/hiredis/hiredis-1.2.0-el7-x86_64-install-bin.tar.gz new file mode 100644 index 0000000..d765d3e Binary files /dev/null and b/dependencies/hiredis/hiredis-1.2.0-el7-x86_64-install-bin.tar.gz differ diff --git a/dependencies/hiredis/patch/sds.h.patch b/dependencies/hiredis/patch/sds.h.patch index 9ecf8d9..cf89fb6 100644 --- a/dependencies/hiredis/patch/sds.h.patch +++ b/dependencies/hiredis/patch/sds.h.patch @@ -1,16 +1,17 @@ ---- hiredis-1.0.0/sds.h.orig 2020-08-03 14:18:07.000000000 -0400 -+++ 
hiredis-1.0.0/sds.h 2020-10-10 13:43:56.988507551 -0400 -@@ -44,6 +44,9 @@ +--- hiredis-1.2.0/sds.h.org 2023-07-12 03:31:17.000000000 -0400 ++++ hiredis-1.2.0/sds.h 2023-11-21 17:48:03.791196553 -0500 +@@ -46,6 +46,10 @@ #include #include +#ifdef __cplusplus +extern "C" { +#endif ++ typedef char *sds; /* Note: sdshdr5 is never used, we just access the flags byte directly. -@@ -275,4 +278,8 @@ +@@ -277,4 +281,8 @@ int sdsTest(int argc, char *argv[]); #endif diff --git a/dependencies/hiredis/v1.0.0.tar.gz b/dependencies/hiredis/v1.0.0.tar.gz deleted file mode 100644 index c41632b..0000000 Binary files a/dependencies/hiredis/v1.0.0.tar.gz and /dev/null differ diff --git a/dependencies/hiredis/v1.2.0.tar.gz b/dependencies/hiredis/v1.2.0.tar.gz new file mode 100644 index 0000000..247edb8 Binary files /dev/null and b/dependencies/hiredis/v1.2.0.tar.gz differ diff --git a/dependencies/redis-plus-plus/Makefile b/dependencies/redis-plus-plus/Makefile index c70fec8..4185c3d 100644 --- a/dependencies/redis-plus-plus/Makefile +++ b/dependencies/redis-plus-plus/Makefile @@ -1,13 +1,13 @@ include ../make.variable.include PKG_NAME := redis-plus-plus -VERSION := 1.2.1 +VERSION := 1.3.10 ARCHIVE := $(PKG_NAME)-$(VERSION).tar.gz ARCHIVE_INSTALL := $(PKG_NAME)-$(VERSION)-$(OS)-$(ARCH)-install-bin.tar.gz PKG_DIR = $(PKG_NAME)-$(VERSION) GEN_DIR := $(shell pwd)/gen HIREDIS_PKG := hiredis -HIREDIS_VERSION := 1.0.0 +HIREDIS_VERSION := 1.2.0 HIREDIS_PKG_DIR := $(HIREDIS_PKG)-$(HIREDIS_VERSION) HIREDIS_SRC_DIR := $(HIREDIS_PKG)-$(HIREDIS_VERSION)-src @@ -17,18 +17,20 @@ all: @test -n "$(TARGETDIR)" || false mkdir -p $(TARGETDIR) tar -C $(TARGETDIR) -xzvf $(ARCHIVE_INSTALL) + mv $(TARGETDIR)/lib64/* $(TARGETDIR)/lib + rm -rf $(TARGETDIR)/lib64 rm -f $(ARCHIVE_INSTALL) $(ARCHIVE_INSTALL): $(PKG_DIR)/Makefile $(MAKE) -C $(HIREDIS_SRC_DIR) PREFIX=../$(HIREDIS_PKG_DIR) USE_SSL=1 $(MAKE) -C $(HIREDIS_SRC_DIR) PREFIX=../$(HIREDIS_PKG_DIR) USE_SSL=1 install - cd $(PKG_DIR) && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_PREFIX_PATH=../$(HIREDIS_PKG_DIR) -DCMAKE_INSTALL_PREFIX=../$(PKG_NAME) -DREDIS_PLUS_PLUS_BUILD_TEST=OFF -DREDIS_PLUS_PLUS_BUILD_STATIC=OFF -DREDIS_PLUS_PLUS_USE_TLS=ON + cd $(PKG_DIR) && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_PREFIX_PATH=../$(HIREDIS_PKG_DIR) -DCMAKE_INSTALL_PREFIX=../$(PKG_NAME) -DREDIS_PLUS_PLUS_BUILD_TEST=OFF -DREDIS_PLUS_PLUS_BUILD_STATIC=OFF -DREDIS_PLUS_PLUS_USE_TLS=ON -DREDIS_PLUS_PLUS_CXX_STANDARD=11 cd $(PKG_DIR) && $(MAKE) cd $(PKG_DIR) && $(MAKE) install rm -rf $(HIREDIS_SRC_DIR) rm -rf $(HIREDIS_PKG_DIR) rm -rf $(PKG_DIR) - tar -C $(PKG_NAME) -czvf $(ARCHIVE_INSTALL) ./lib ./include + tar -C $(PKG_NAME) -czvf $(ARCHIVE_INSTALL) ./lib64 ./include rm -rf $(PKG_NAME) $(PKG_DIR)/Makefile: $(ARCHIVE) @@ -39,7 +41,6 @@ $(PKG_DIR)/Makefile: $(ARCHIVE) clean: rm -rf $(PKG_DIR) rm -rf $(PKG_NAME) - rm -f $(ARCHIVE_INSTALL) rm -rf $(HIREDIS_SRC_DIR) rm -rf $(HIREDIS_PKG_DIR) rm -rf $(TARGETDIR)/lib/libredis++* diff --git a/dependencies/redis-plus-plus/hiredis-1.0.0.tar.gz b/dependencies/redis-plus-plus/hiredis-1.0.0.tar.gz deleted file mode 100644 index c41632b..0000000 Binary files a/dependencies/redis-plus-plus/hiredis-1.0.0.tar.gz and /dev/null differ diff --git a/dependencies/redis-plus-plus/hiredis-1.2.0.tar.gz b/dependencies/redis-plus-plus/hiredis-1.2.0.tar.gz new file mode 100644 index 0000000..247edb8 Binary files /dev/null and b/dependencies/redis-plus-plus/hiredis-1.2.0.tar.gz differ diff --git a/dependencies/redis-plus-plus/redis-plus-plus-1.2.1.tar.gz 
b/dependencies/redis-plus-plus/redis-plus-plus-1.2.1.tar.gz deleted file mode 100644 index d5b6ca5..0000000 Binary files a/dependencies/redis-plus-plus/redis-plus-plus-1.2.1.tar.gz and /dev/null differ diff --git a/dependencies/redis-plus-plus/redis-plus-plus-1.3.10.tar.gz b/dependencies/redis-plus-plus/redis-plus-plus-1.3.10.tar.gz new file mode 100644 index 0000000..4c5be1c Binary files /dev/null and b/dependencies/redis-plus-plus/redis-plus-plus-1.3.10.tar.gz differ diff --git a/dps-technical-positioning.pdf b/dps-technical-positioning.pdf index 0573309..9359167 100644 Binary files a/dps-technical-positioning.pdf and b/dps-technical-positioning.pdf differ diff --git a/samples/DPSUsageFromCpp/etc/no-sql-kv-store-servers.cfg b/samples/DPSUsageFromCpp/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/DPSUsageFromCpp/etc/no-sql-kv-store-servers.cfg +++ b/samples/DPSUsageFromCpp/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. # -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/DPSUsageFromJava/etc/no-sql-kv-store-servers.cfg b/samples/DPSUsageFromJava/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/DPSUsageFromJava/etc/no-sql-kv-store-servers.cfg +++ b/samples/DPSUsageFromJava/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
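The clustered-Redis configuration lines above are colon-delimited, with the three TLS-related file names appended when use_tls is 1. As a rough illustration of how one such line breaks down into fields, here is a small parsing sketch; the field order mirrors the comment above, and the splitting helper is hypothetical rather than the parser the toolkit actually uses.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: split a configuration line such as
// Machine1:7001:MyRedisPassword:7:1:/path/my-redis.crt:/path/my-redis.key:/path/my-redis-ca.crt
// into its colon-delimited fields.
std::vector<std::string> splitConfigLine(const std::string &line) {
    std::vector<std::string> fields;
    std::stringstream ss(line);
    std::string field;
    while (std::getline(ss, field, ':')) {
        fields.push_back(field);
    }
    return fields;
}

int main() {
    std::string line = "Machine1:7001:MyRedisPassword:7:1:"
                       "/home/streamsadmin/my-redis.crt:"
                       "/home/streamsadmin/my-redis.key:"
                       "/home/streamsadmin/my-redis-ca.crt";
    std::vector<std::string> f = splitConfigLine(line);
    // Expected order per the comment above: server, port, password, connection timeout,
    // use_tls flag, TLS certificate file, TLS key file, TLS CA certificate file.
    for (size_t i = 0; i < f.size(); ++i) {
        std::cout << i << ": " << f[i] << std::endl;
    }
    return 0;
}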
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/DPSUsageFromSPL/etc/no-sql-kv-store-servers.cfg b/samples/DPSUsageFromSPL/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/DPSUsageFromSPL/etc/no-sql-kv-store-servers.cfg +++ b/samples/DPSUsageFromSPL/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/DpsTTLCompositesSample/etc/no-sql-kv-store-servers.cfg b/samples/DpsTTLCompositesSample/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/DpsTTLCompositesSample/etc/no-sql-kv-store-servers.cfg +++ b/samples/DpsTTLCompositesSample/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/advanced/01_using_no_sql_db_in_spl_custom_and_cpp_primitive_operators/etc/no-sql-kv-store-servers.cfg b/samples/advanced/01_using_no_sql_db_in_spl_custom_and_cpp_primitive_operators/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/advanced/01_using_no_sql_db_in_spl_custom_and_cpp_primitive_operators/etc/no-sql-kv-store-servers.cfg +++ b/samples/advanced/01_using_no_sql_db_in_spl_custom_and_cpp_primitive_operators/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/advanced/02_using_no_sql_db_in_spl_custom_operators_and_a_cpp_native_function/etc/no-sql-kv-store-servers.cfg b/samples/advanced/02_using_no_sql_db_in_spl_custom_operators_and_a_cpp_native_function/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/advanced/02_using_no_sql_db_in_spl_custom_operators_and_a_cpp_native_function/etc/no-sql-kv-store-servers.cfg +++ b/samples/advanced/02_using_no_sql_db_in_spl_custom_operators_and_a_cpp_native_function/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/advanced/03_using_no_sql_db_in_spl_custom_and_java_primitive_operators/etc/no-sql-kv-store-servers.cfg b/samples/advanced/03_using_no_sql_db_in_spl_custom_and_java_primitive_operators/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/advanced/03_using_no_sql_db_in_spl_custom_and_java_primitive_operators/etc/no-sql-kv-store-servers.cfg +++ b/samples/advanced/03_using_no_sql_db_in_spl_custom_and_java_primitive_operators/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. 
# -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis diff --git a/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm b/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm index 472d2b9..58ac81d 100644 --- a/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm +++ b/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm @@ -1,6 +1,6 @@ /* * Licensed Materials - Property of IBM - * Copyright IBM Corp. 2011, 2022 + * Copyright IBM Corp. 2011, 2023 * US Government Users Restricted Rights - Use, duplication or * disclosure restricted by GSA ADP Schedule Contract with IBM Corp. */ @@ -67,8 +67,7 @@ composite Main { // suitable one for such data stores based on eventual consistency put/get as well disk based put/get (HBase, Mongo, Couchbase). // Before running this test for those slow data stores, please refer to the commentary at the top of // this composite to reduce the test count in order to obtain reasonable results. - // You can uncomment the line below if you want to do that particular test. - ///// () as Sink2 = LockTest() {} + () as Sink2 = LockTest() {} // Do 100K writes and 100K reads and time it. // Please be aware this high volume test will finish in a decent time for memcached, Redis, and Aerospike. @@ -76,8 +75,7 @@ composite Main { // hence it may be a long wait before this test completes when you use those data stores. // Before running this test for those slow data stores, please refer to the commentary at the top of // this composite to reduce the test count in order to obtain reasonable results. - // You can uncomment the line below if you want to do that particular test. - ///// () as Sink3 = ReadWritePerformanceTest() {} + () as Sink3 = ReadWritePerformanceTest() {} // Users can directly execute native back-end data store commands that are of // fire and forget nature (one way calls and not request/response calls). @@ -2837,12 +2835,11 @@ composite StateUpdater() { } // End of while(++cnt <= 5) if (err != 0ul) { - printStringLn("Unable to do dpsGet. Stopping the lock Test"); - return; + printStringLn("Unable to do dpsGet. 
Yet, continuing the lock Test"); } - - assert(err==0ul); - + + // assert(err == 0ul); + printStringLn("val=" + (rstring)val + " as read from the store during lock test #" + (rstring)lockTestCnt); cnt = 0; if(lockTestCnt == 1) { @@ -2860,7 +2857,7 @@ composite StateUpdater() { dpsPut(s, "myKey", val+1, err); if (err != 0ul) { - printStringLn("Error in dpsPut. rc = " + + printStringLn("Error in dpsPut with a value of " + (rstring)(val+1) + ". rc = " + (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString() + ", attemptCnt = " + (rstring)cnt); checkAndReconnect(12, result); @@ -2881,19 +2878,14 @@ composite StateUpdater() { break; } } else { - printStringLn("dpsPut successful."); + printStringLn("dpsPut with a value of " + (rstring)(val+1) + " successful."); break; } } // End of while(++cnt <= 5) if (err != 0ul) { - printStringLn("Unable to do dpsPut. Stopping the lock Test"); - return; + printStringLn("Unable to do dpsPut. Yet, continuing the lock Test"); } - - assert(err==0ul); - - printStringLn("val=" + (rstring)val + " as read from the store during lock test #" + (rstring)lockTestCnt); // Only one of the two PEs currently in the race to get from and // put into that same store will win in obtaining a chance to diff --git a/samples/advanced/04_all_dps_apis_at_work_in_spl/etc/no-sql-kv-store-servers.cfg b/samples/advanced/04_all_dps_apis_at_work_in_spl/etc/no-sql-kv-store-servers.cfg index de4e93f..dd148b6 100644 --- a/samples/advanced/04_all_dps_apis_at_work_in_spl/etc/no-sql-kv-store-servers.cfg +++ b/samples/advanced/04_all_dps_apis_at_work_in_spl/etc/no-sql-kv-store-servers.cfg @@ -177,13 +177,15 @@ # replica nodes (server name or IP address and a port number). Please refer to the commentary above # for the non-clustered Redis to understand the connection timeout and TLS usage. Both are supported in # the clister-mode Redis as well when configuring your key value store product as redis-cluster-plus-plus. +# If TLS for the Redis Cluster is enabled, three fully qualified file names for TLS certificate, TLS key and +# TLS CA certificate must be specified in the order shown below. # -# RedisServerNameOrIPAddress:port:RedisPassword:ConnectionTimeoutValue:use_tls:RedisClusterCACertificateFile +# RedisServerNameOrIPAddress:port:RedisClusterPassword:ConnectionTimeoutValue:use_tls:RedisClusterTlsCertificateFileName:RedisClusterTlsKeyileName:RedisClusterCACertificateFileName # (e-g:) -# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt -# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis-ca.crt +# Machine1:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine2:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine3:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt +# Machine4:7001:MyRedisPassword:7:1:/home/streamsadmin/my-redis.crt:/home/streamsadmin/my-redis.key:/home/streamsadmin/my-redis-ca.crt # # =============================================================================== redis