Skip to content

Commit

Permalink
added RedisService
Browse files Browse the repository at this point in the history
Signed-off-by: munishchouhan <[email protected]>
  • Loading branch information
munishchouhan committed Nov 5, 2024
1 parent 481298b commit 3294a57
Show file tree
Hide file tree
Showing 3 changed files with 381 additions and 1 deletion.
50 changes: 49 additions & 1 deletion src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import jakarta.inject.Singleton
import redis.clients.jedis.DefaultJedisClientConfig
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.exceptions.InvalidURIException
import redis.clients.jedis.util.JedisURIHelper

/**
* Redis connection pool factory
*
Expand All @@ -38,18 +45,59 @@ import redis.clients.jedis.JedisPoolConfig
class RedisFactory {

@Singleton
@Requires(property = "redis.mode", value = "standalone")
JedisPool createRedisPool(
@Value('${redis.uri}') String uri,
@Value('${redis.pool.minIdle:0}') int minIdle,
@Value('${redis.pool.maxIdle:10}') int maxIdle,
@Value('${redis.pool.maxTotal:50}') int maxTotal
) {
log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
log.info "Using redis $uri as storage and cache - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
final config = new JedisPoolConfig()
config.setMinIdle(minIdle)
config.setMaxIdle(maxIdle)
config.setMaxTotal(maxTotal)
return new JedisPool(config, URI.create(uri))
}

@Singleton
@Requires(property = "redis.mode", value = "cluster")
JedisCluster createRedisCluster(
@Value('${redis.uris}') List<String> uris,
@Value('${redis.client.timeout:5000}') int timeout,
@Nullable @Value('${redis.user}') String user,
@Nullable @Value('${redis.password}') String password,
@Nullable @Value('${redis.db}') String db,
@Nullable @Value('${redis.ssl}') boolean ssl
) {
log.info "Using redis cluster $uris as storage and cache - timeout: ${timeout}ms"

final jedisClusterNodes = new HashSet<HostAndPort>()
for (def uri : uris){
if (!JedisURIHelper.isValid(URI.create(uri))) {
throw new InvalidURIException(String.format(
"Cannot open Redis connection due invalid URI. %s", uri.toString()))
}
jedisClusterNodes.add(HostAndPort.from(uri))
}

def clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout)
.socketTimeoutMillis(timeout)
.blockingSocketTimeoutMillis(timeout)
if ( user ){
clientConfig.user(user)
}
if ( password ){
clientConfig.password(password)
}
if ( db ){
clientConfig.database(db as int)
}
if ( ssl ){
clientConfig.ssl(true)
}

return new JedisCluster(jedisClusterNodes, clientConfig.build())
}

}
72 changes: 72 additions & 0 deletions src/main/groovy/io/seqera/wave/redis/RedisService.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.redis

import java.util.concurrent.TimeoutException

import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.Transaction
import redis.clients.jedis.params.SetParams
import redis.clients.jedis.params.XAutoClaimParams
import redis.clients.jedis.params.XReadGroupParams
import redis.clients.jedis.resps.ScanResult
import redis.clients.jedis.resps.StreamEntry
import redis.clients.jedis.resps.Tuple
/**
* Implements RedisService
*
* @author Munish Chouhan <[email protected]>
*/
interface RedisService {

long hincrBy(final String key, final String field, final long value)

Long hget(final String key, final String field)

ScanResult<Map. Entry<String, String>> hscan(final String key, final String pattern)

String set(final String key, final String value, final SetParams params)

Transaction multi() throws TimeoutException

long lpush(final String target, final String message)

String rpop(final String target)

String brpop( final double timeout, final String target)

String xgroupCreate(final String key, final String groupName, final StreamEntryID id, final boolean makeStream)

StreamEntryID xadd(final String key, final StreamEntryID id, final Map<String, String> hash)

Map.Entry<StreamEntryID, List<StreamEntry>> xautoclaim(String key, String group, String consumerName, long minIdleTime, StreamEntryID start, XAutoClaimParams params)

List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer, final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams)

long zadd(final String key, final double score, final String member)

Object eval(final String script, final int keyCount, final String... params)

List<Tuple> zrangeByScoreWithScores(final String key, final double min, final double max, final int offset, final int count)

long del(final String key)

String flushAll()

}
Loading

0 comments on commit 3294a57

Please sign in to comment.