Skip to content

Commit

Permalink
TLS for Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
Senthil Nathan committed Nov 23, 2020
1 parent 3548100 commit a966c3d
Show file tree
Hide file tree
Showing 49 changed files with 7,212 additions and 560 deletions.
28 changes: 18 additions & 10 deletions com.ibm.streamsx.dps/impl/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ AERO_OBJ =
AERO_LIB =
COUCH_OBJ = CouchbaseDBLayer.o
REDIS_CLUSTER_OBJ = RedisClusterDBLayer.o
REDIS_CLUSTER_PLUS_PLUS_OBJ = RedisClusterPlusPlusDBLayer.o

REDIS_LIB = $(LIB)/libDPSRedis.so
REDIS_CLUSTER_LIB = $(LIB)/libDPSRedisCluster.so
REDIS_CLUSTER_PLUS_PLUS_LIB = $(LIB)/libDPSRedisClusterPlusPlus.so
COUCH_LIB = $(LIB)/libDPSCouchbase.so
HBASE_LIB = $(LIB)/libDPSHBase.so
CASSANDRA_LIB = $(LIB)/libDPSCassandra.so
Expand All @@ -34,6 +35,7 @@ MONGO_LIB = $(LIB)/libDPSMongo.so
MEMCACHED_LIB = $(LIB)/libDPSMemcached.so

HELPER_LIBS = $(REDIS_CLUSTER_LIB)
HELPER_LIBS += $(REDIS_CLUSTER_PLUS_PLUS_LIB)
HELPER_LIBS += $(COUCH_LIB)
HELPER_LIBS += $(REDIS_LIB)
HELPER_LIBS += $(HBASE_LIB)
Expand All @@ -58,7 +60,7 @@ SPL_COMPILE_OPTIONS = $(shell $(SPL_PKGCFG) --cflags $(SPL_PKG))
SPL_LINK_OPTIONS = $(shell $(SPL_PKGCFG) --libs $(SPL_PKG))

CPPFLAGS =
CPPFLAGS += -O3 -Wall -c -fmessage-length=0 -fPIC -D_REENTRANT
CPPFLAGS += -O3 -Wall -c -fmessage-length=0 -fPIC -D_REENTRANT
CPPFLAGS += -I include
CPPFLAGS += -I nl/include
CPPFLAGS += -isystem ext/include
Expand Down Expand Up @@ -91,7 +93,7 @@ all: check-streams $(DISTRIBUTED_PROCESS_STORE_LIB_STATIC) $(DISTRIBUTED_PROCESS
check-streams:
@echo "Checking to see if STREAMS_INSTALL is set..."
test -d $(STREAMS_INSTALL) || false

$(DISTRIBUTED_PROCESS_STORE_LIB): $(DISTRIBUTED_PROCESS_STORE_OBJS)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^
Expand All @@ -108,40 +110,46 @@ $(DISTRIBUTED_PROCESS_STORE_LIB_STATIC): $(DISTRIBUTED_PROCESS_STORE_OBJS)
$(COUCH_LIB): $(COUCH_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(REDIS_LIB): $(REDIS_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(REDIS_CLUSTER_LIB): $(REDIS_CLUSTER_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^


$(REDIS_CLUSTER_PLUS_PLUS_LIB): CPPFLAGS+=-std=c++11
$(REDIS_CLUSTER_PLUS_PLUS_LIB): $(REDIS_CLUSTER_PLUS_PLUS_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(HBASE_LIB): $(HBASE_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(CASSANDRA_LIB): $(CASSANDRA_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(CLOUDANT_LIB): $(CLOUDANT_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(MONGO_LIB): $(MONGO_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

$(MEMCACHED_LIB): $(MEMCACHED_OBJ)
mkdir -p $(LIB)
$(CXX) $(LDFLAGS) -o $@ $^

clean:
rm -f $(DISTRIBUTED_PROCESS_STORE_OBJS)
rm -f $(DISTRIBUTED_PROCESS_STORE_LIB)
rm -f $(REDIS_LIB) $(REDIS_OBJ)
rm -f $(REDIS_CLUSTER_LIB) $(REDIS_CLUSTER_OBJ)
rm -f $(REDIS_CLUSTER_PLUS_PLUS_LIB) $(REDIS_CLUSTER_PLUS_PLUS_OBJ)
rm -f $(MEMCACHED_LIB) $(MEMCACHED_OBJ)
rm -f $(CLOUDANT_LIB) $(CLOUDANT_OBJ)
rm -f $(CASSANDRA_LIB) $(CASSANDRA_OBJ)
Expand Down
7 changes: 6 additions & 1 deletion com.ibm.streamsx.dps/impl/include/DpsConstants.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2016
# Copyright IBM Corp. 2011, 2020
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -93,6 +93,7 @@ interface with many different back-end in-memory stores.
#define COUCHBASE_NO_SQL_DB_NAME "couchbase"
#define AEROSPIKE_NO_SQL_DB_NAME "aerospike"
#define REDIS_CLUSTER_NO_SQL_DB_NAME "redis-cluster"
#define REDIS_CLUSTER_PLUS_PLUS_NO_SQL_DB_NAME "redis-cluster-plus-plus"
#define HTTP_GET "GET"
#define HTTP_PUT "PUT"
#define HTTP_POST "POST"
Expand Down Expand Up @@ -228,5 +229,9 @@ interface with many different back-end in-memory stores.
#define DL_LOCK_NOT_FOUND_ERROR 514
#define DL_LOCK_REMOVAL_ERROR 515
#define AEROSPIKE_GET_STORE_ID_ERROR 516
#define REDIS_PLUS_PLUS_NO_ERROR 0
#define REDIS_PLUS_PLUS_CONNECTION_ERROR 1
#define REDIS_PLUS_PLUS_REPLY_ERROR 2
#define REDIS_PLUS_PLUS_OTHER_ERROR 3

#endif /* DPS_CONSTANTS_H_ */
170 changes: 170 additions & 0 deletions com.ibm.streamsx.dps/impl/include/RedisClusterPlusPlusDBLayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2020
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
*/
#ifndef REDIS_CLUSTER_PLUS_PLUS_DB_LAYER_H_
#define REDIS_CLUSTER_PLUS_PLUS_DB_LAYER_H_
/*
=====================================================================
Here is the copyright statement for our use of the hiredis APIs:
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
Pieter Noordhuis (pcnoordhuis at gmail) and is released under the
BSD license.
Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
hiredis-cluster-plus-plus include files provide wrappers on top of the hiredis library.
This wrapper allows us to use the familiar hiredis APIs in the context of
a Redis cluster (Version 6 and higher) to provide HA facility for automatic
fail-over when a Redis instance or the entire machine crashes.
In addition, it provides the TLS/SSL support for the redis-cluster.
Please note that this hiredis-cluster-plus-plus wrapper supercedes the older
hiredis-cluster wrapper that we have in the DPS toolkit. If the Redis server
version is v5 and lower, one may continue to use the older hiredis-cluster in DPS.
If the Redis server version is v6 and higher, it is recommented to use the
hiredis-cluster-plus-plus in order to work with both non-TLS and TLS.
The redis-plus-plus wrapper carries the Apache 2.0 copyright as shown in the following line.
A permissive license whose main conditions require preservation of copyright and license notices.
Contributors provide an express grant of patent rights. Licensed works, modifications, and larger
works may be distributed under different terms and without source code.
=====================================================================
*/
#include "DBLayer.h"

#include <tr1/memory>
#include <set>
#include <vector>
#include <sw/redis++/redis_cluster.h>

using namespace sw::redis;

namespace com {
namespace ibm {
namespace streamsx {
namespace store {
namespace distributed
{
class RedisClusterPlusPlusDBLayer;

/// Class that implements the Iterator for Redis cluster
class RedisClusterPlusPlusDBLayerIterator : public DBLayer::Iterator
{
public:
uint64_t store;
std::string storeName;
std::vector<std::string> dataItemKeys;
uint32_t sizeOfDataItemKeysVector;
uint32_t currentIndex;
bool hasData;
RedisClusterPlusPlusDBLayer *redisClusterPlusPlusDBLayerPtr;

RedisClusterPlusPlusDBLayerIterator();
~RedisClusterPlusPlusDBLayerIterator();
bool getNext(uint64_t store, unsigned char * & keyData, uint32_t & keySize,
unsigned char * & valueData, uint32_t & valueSize, PersistenceError & dbError);
};

/// Class that implements the DBLayer for Redis Cluster Plus Plus
class RedisClusterPlusPlusDBLayer : public DBLayer
{
private:
bool readStoreInformation(std::string const & storeIdString, PersistenceError & dbError,
uint32_t & dataItemCnt, std::string & storeName,
std::string & keySplTypeName, std::string & valueSplTypeName);
bool acquireStoreLock(std::string const & storeIdString);
void releaseStoreLock(std::string const & storeIdString);
bool readLockInformation(std::string const & storeIdString, PersistenceError & dbError, uint32_t & lockUsageCnt,
int32_t & lockExpirationTime, pid_t & lockOwningPid, std::string & lockName);
bool updateLockInformation(std::string const & lockIdString, PersistenceError & lkError,
uint32_t const & lockUsageCnt, int32_t const & lockExpirationTime, pid_t const & lockOwningPid);
bool lockIdExistsOrNot(std::string lockIdString, PersistenceError & lkError);
bool acquireGeneralPurposeLock(std::string const & entityName);
void releaseGeneralPurposeLock(std::string const & entityName);
int32_t getRedisServerPartitionIndex(std::string const & key);

public:
RedisCluster *redis_cluster = NULL;

/// Constructor
RedisClusterPlusPlusDBLayer();

/// Destructor
~RedisClusterPlusPlusDBLayer();

// These are inherited from DBLayer, see DBLayer for descriptions
void connectToDatabase(std::set<std::string> const & dbServers, PersistenceError & dbError);

uint64_t createStore(std::string const & name,
std::string const & keySplTypeName,
std::string const & valueSplTypeName,
PersistenceError & dbError);
uint64_t createOrGetStore(std::string const & name,
std::string const & keySplTypeName,
std::string const & valueSplTypeName,
PersistenceError & dbError);
uint64_t findStore(std::string const & name,
PersistenceError & dbError);
bool removeStore(uint64_t store, PersistenceError & dbError);

bool put(uint64_t store, char const * keyData, uint32_t keySize,
unsigned char const * valueData, uint32_t valueSize, PersistenceError & dbError);
bool putSafe(uint64_t store, char const * keyData, uint32_t keySize,
unsigned char const * valueData, uint32_t valueSize, PersistenceError & dbError);
bool putTTL(char const * keyData, uint32_t keySize,
unsigned char const * valueData, uint32_t valueSize, uint32_t ttl, PersistenceError & dbError, bool encodeKey=true, bool encodeValue=true);
bool get(uint64_t store, char const * keyData, uint32_t keySize,
unsigned char * & valueData, uint32_t & valueSize,
PersistenceError & dbError);
bool getSafe(uint64_t store, char const * keyData, uint32_t keySize,
unsigned char * & valueData, uint32_t & valueSize,
PersistenceError & dbError);
bool getTTL(char const * keyData, uint32_t keySize,
unsigned char * & valueData, uint32_t & valueSize,
PersistenceError & dbError, bool encodeKey=true);
bool remove(uint64_t store, char const * keyData, uint32_t keySize, PersistenceError & dbError);
bool removeTTL(char const * keyData, uint32_t keySize, PersistenceError & dbError, bool encodeKey=true);
bool has(uint64_t store, char const * keyData, uint32_t keySize, PersistenceError & dbError);
bool hasTTL(char const * keyData, uint32_t keySize, PersistenceError & dbError, bool encodeKey=true);
void clear(uint64_t store, PersistenceError & dbError);
uint64_t size(uint64_t store, PersistenceError & dbError);
void base64_encode(std::string const & str, std::string & base64);
void base64_decode(std::string & base64, std::string & result);
bool isConnected();
bool reconnect(std::set<std::string> & dbServers, PersistenceError & dbError);

RedisClusterPlusPlusDBLayerIterator * newIterator(uint64_t store, PersistenceError & dbError);
void deleteIterator(uint64_t store, Iterator * iter, PersistenceError & dbError);
bool storeIdExistsOrNot(std::string storeIdString, PersistenceError & dbError);
bool getDataItemFromStore(std::string const & storeIdString,
std::string const & keyDataString, bool const & checkOnlyForDataItemExistence,
bool const & skipDataItemExistenceCheck, unsigned char * & valueData,
uint32_t & valueSize, PersistenceError & dbError);
std::string getStoreName(uint64_t store, PersistenceError & dbError);
std::string getSplTypeNameForKey(uint64_t store, PersistenceError & dbError);
std::string getSplTypeNameForValue(uint64_t store, PersistenceError & dbError);
std::string getNoSqlDbProductName(void);
void getDetailsAboutThisMachine(std::string & machineName, std::string & osVersion, std::string & cpuArchitecture);
bool runDataStoreCommand(std::string const & cmd, PersistenceError & dbError);
bool runDataStoreCommand(uint32_t const & cmdType, std::string const & httpVerb,
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
void releaseLock(uint64_t lock, PersistenceError & lkError);
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void persist(PersistenceError & dbError);

};
} } } } }
#endif /* REDIS_CLUSTER_PLUS_PLUS_DB_LAYER_H_ */
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/RedisDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2020
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -49,6 +49,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "DBLayer.h"

#include "hiredis/hiredis.h"
#include "hiredis/hiredis_ssl.h"
#include <tr1/memory>
#include <set>
#include <vector>
Expand Down
41 changes: 39 additions & 2 deletions com.ibm.streamsx.dps/impl/src/DistributedProcessStore.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2020
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -267,6 +267,7 @@ namespace distributed
void* handle1 = NULL;
void* handle2 = NULL;
void* handle3 = NULL;
void* handle4 = NULL;
bool libraryLoadingError = false;
std::string kvLibName = "";
std::string toolkitDir = ProcessingElement::pe().getToolkitDirectory("com.ibm.streamsx.dps") + "/impl/ext/lib" ;
Expand All @@ -278,9 +279,15 @@ namespace distributed
} else if (noSqlKvStoreProductName.compare("redis") == 0) {
handle1 = load_dependent_lib(toolkitDir, "libuv.so");
handle2 = load_dependent_lib(toolkitDir, "libhiredis.so");
// Oct/12/2020 to support "TLS for redis". If the user configures the DPS toolkit to
// use "TLS for redis", then this additional .so file will take care of that need.
// On the IBM Streams application Linux machines, it is a must to install opensssl and
// and openssl-devel RPM packages. The following libhiredis_ssl.so file has a dependency on the
// /lib64/libssl.so and /lib64/libcrypto.so libraries that are part of openssl.
handle3 = load_dependent_lib(toolkitDir, "libhiredis_ssl.so");
kvLibName= "libDPSRedis.so";

if (handle1 == NULL || handle2 == NULL) {
if (handle1 == NULL || handle2 == NULL || handle3 == NULL) {
libraryLoadingError = true;
}
} else if (noSqlKvStoreProductName.compare("cassandra") == 0) {
Expand Down Expand Up @@ -344,6 +351,21 @@ namespace distributed
if (handle1 == NULL || handle2 == NULL) {
libraryLoadingError = true;
}
} else if (noSqlKvStoreProductName.compare("redis-cluster-plus-plus") == 0) {
handle1 = load_dependent_lib(toolkitDir, "libuv.so");
handle2 = load_dependent_lib(toolkitDir,"libhiredis.so");
// Oct/12/2020 to support "TLS for redis". If the user configures the DPS toolkit to
// use "TLS for redis", then this additional .so file will take care of that need.
// On the IBM Streams application Linux machines, it is a must to install opensssl and
// and openssl-devel RPM packages. The following libhiredis_ssl.so file has a dependency on the
// /lib64/libssl.so and /lib64/libcrypto.so libraries that are part of openssl.
handle3 = load_dependent_lib(toolkitDir, "libhiredis_ssl.so");
handle4 = load_dependent_lib(toolkitDir, "libredis++.so");
kvLibName= "libDPSRedisClusterPlusPlus.so";

if (handle1 == NULL || handle2 == NULL || handle3 == NULL || handle4 == NULL) {
libraryLoadingError = true;
}
} else {
// Invalid no-sql store product name configured. Abort now.
// SPLAPPLOG is causing it to get stuck in RHEL6/CentOS6 (RHEL7/CentOS7 is fine) when the @catch annotation is used in the calling SPL code.
Expand Down Expand Up @@ -381,6 +403,11 @@ namespace distributed
handle3 = NULL;
}

if (handle4 != NULL) {
dlclose(handle4);
handle4 = NULL;
}

if (handle != NULL) {
dlclose(handle);
handle = NULL;
Expand Down Expand Up @@ -416,6 +443,11 @@ namespace distributed
handle3 = NULL;
}

if (handle4 != NULL) {
dlclose(handle4);
handle4 = NULL;
}

if (handle != NULL) {
dlclose(handle);
handle = NULL;
Expand Down Expand Up @@ -449,6 +481,11 @@ namespace distributed
handle3 = NULL;
}

if (handle4 != NULL) {
dlclose(handle4);
handle4 = NULL;
}

if (handle != NULL) {
dlclose(handle);
handle = NULL;
Expand Down
Loading

0 comments on commit a966c3d

Please sign in to comment.