Skip to content

Commit

Permalink
Changes done for v4.1.5.
Browse files Browse the repository at this point in the history
  • Loading branch information
Senthil Nathan committed Apr 18, 2022
1 parent 6053e90 commit 93cbc06
Show file tree
Hide file tree
Showing 26 changed files with 925 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,16 @@ it does safety checks and is therefore slower.</description>
<prototype>public stateful void dpsEndIteration(uint64 store, uint64 iterator, mutable uint64 err)</prototype>
</function>
<function>
<description>This function can be called to get all the keys present in a given store.
@param store the handle of the store.
@param keys a user provided mutable list variable. This list must be suitable for storing the data type of the keys.
@param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
<description>This function can be called to get multiple keys present in a given store.
@param store The handle of the store.
@param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
@param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
@param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
@param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. Due to very high logic complexity, this feature is not implemented at this time.
@param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. Due to very high logic complexity, this feature is not implemented at this time.
@param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
</description>
<prototype>&lt;any T1> public stateful void dpsGetAllKeys(uint64 store, mutable list&lt;T1&gt; keys, mutable uint64 err)</prototype>
<prototype>&lt;any T1> public stateful void dpsGetKeys(uint64 store, mutable list&lt;T1&gt; keys, int32 keyStartPosition, int32 numberOfKeysNeeded, rstring keyExpression, rstring valueExpression, mutable uint64 err)</prototype>
</function>
<function>
<description> This function serializes all the key-value pairs in a given store id into a blob. The blob can be used to recreate all the key-value pairs into another store. This is a useful technique for copying an entire store into a different store. See `dpsDeserialize()` for a detailed example on how to use the serialization functions to copy a store.
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
6 changes: 3 additions & 3 deletions com.ibm.streamsx.dps/impl/include/DBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ namespace store {
/// @return true if connection is active or false if connection is inactive.
virtual bool reconnect(std::set<std::string> & dbServers, PersistenceError & dbError) = 0;

/// Get all the keys present in a given store.
/// @return a list containing all the keys of a given data type.
virtual void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) = 0;
/// Get multiple keys present in a given store.
/// @return a list containing multiple keys of a given data type.
virtual void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError) = 0;

/// A store iterator
class Iterator
Expand Down
43 changes: 27 additions & 16 deletions com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,17 @@ namespace distributed
/// @param err PersistentStore error code
void endIteration(SPL::uint64 store, SPL::uint64 iterator, SPL::uint64 & err);

/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
template<class T1>
void getAllKeysHelper(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err);
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
/// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
///
template<class T1>
void getKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err);

/// Serialize the items from the serialized store
/// @param store store handle
Expand Down Expand Up @@ -865,17 +870,24 @@ namespace distributed
return res;
}

/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
/// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
///
template<class T1>
void DistributedProcessStore::getAllKeysHelper(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err) {
void DistributedProcessStore::getKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err) {
dbError_->reset();
std::vector<unsigned char *> keysBuffer;
std::vector<uint32_t> keysSize;
// Call the underlying store implementation function to get all the keys in a given store.
db_->getAllKeys(store, keysBuffer, keysSize, *dbError_);
// Clear the user provided list now.
keys.clear();
// Call the underlying store implementation function to get multiple keys in a given store.
db_->getKeys(store, keysBuffer, keysSize, keyStartPosition, numberOfKeysNeeded, *dbError_);
err = dbError_->getErrorCode();

if(err != 0) {
Expand All @@ -892,11 +904,10 @@ namespace distributed
return;
}

// We got all the keys.
// We got multiple keys.
// Let us convert it to the proper key type and store them in
// the user provided list (vector).
keys.clear();

//
// Populate the user provided list (vector) with the store keys.
for (unsigned int i = 0; i < keysBuffer.size(); i++) {
SPL::NativeByteBuffer nbf_key(keysBuffer.at(i), keysSize.at(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,19 @@ namespace distributed
return DistributedProcessStore::getGlobalStore().endIteration(store, iterator, err);
}

/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
template<class T1>
void dpsGetAllKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err)
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
/// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
///
template<class T1>
void dpsGetKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err)
{
return DistributedProcessStore::getGlobalStore().getAllKeysHelper(store, keys, err);
return DistributedProcessStore::getGlobalStore().getKeys(store, keys, keyStartPosition, numberOfKeysNeeded, keyExpression, valueExpression, err);
}

/// Serialize the items from the serialized store
Expand Down
12 changes: 9 additions & 3 deletions com.ibm.streamsx.dps/impl/include/DpsConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ interface with many different back-end in-memory stores.
#define DPS_STORE_ID_TRACKER_SET "dps_store_id_tracker" // Used in Aerospike
#define DPS_STORE_NAME_TYPE "0"
#define DPS_STORE_INFO_TYPE "1" // Used in memcached
#define DPS_STORE_CONTENTS_HASH_TYPE "1" // Used in Redis
#define DPS_STORE_CATALOG_TYPE "2" // Used only in memcached
#define DPS_STORE_DATA_ITEM_TYPE "3"
#define DPS_STORE_CONTENTS_HASH_TYPE "1" // Used in Redis
#define DPS_STORE_CATALOG_TYPE "2" // Used only in memcached
#define DPS_STORE_DATA_ITEM_TYPE "3"
#define DPS_STORE_ORDERED_KEYS_SET_TYPE "101" // Used in Redis
#define DPS_STORE_LOCK_TYPE "4"
#define DL_LOCK_NAME_TYPE "5"
#define DL_LOCK_INFO_TYPE "6"
Expand Down Expand Up @@ -77,6 +78,9 @@ interface with many different back-end in-memory stores.
#define REDIS_HLEN_CMD "hlen "
#define REDIS_HKEYS_CMD "hkeys "
#define REDIS_AUTH_CMD "auth "
#define REDIS_ZADD_CMD "zadd "
#define REDIS_ZREM_CMD "zrem "
#define REDIS_ZRANGE_CMD "zrange "
#define CASSANDRA_DPS_KEYSPACE "com_ibm_streamsx_dps"
#define CASSANDRA_DPS_MAIN_TABLE "t1"
#define HBASE_DPS_MAIN_TABLE "dps_t1"
Expand Down Expand Up @@ -212,6 +216,8 @@ interface with many different back-end in-memory stores.
#define DPS_STORE_FATAL_ERROR 156
#define DPS_STORE_UNKNOWN_STATE_ERROR 157
#define DPS_AUTHENTICATION_ERROR 158
#define DPS_NEGATIVE_KEY_START_POS_ERROR 159
#define DPS_INVALID_NUM_KEYS_NEEDED_ERROR 160

#define DL_CONNECTION_ERROR 501
#define DL_GET_LOCK_ID_ERROR 502
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/MongoDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

};
} } } } }
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
Expand Down
4 changes: 2 additions & 2 deletions com.ibm.streamsx.dps/impl/include/RedisDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ namespace distributed

public:
int32_t redisPartitionCnt;

/// Constructor
RedisDBLayer();

Expand Down Expand Up @@ -175,7 +175,7 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
Expand Down
Loading

0 comments on commit 93cbc06

Please sign in to comment.