4 changes: 2 additions & 2 deletions apollo-router/src/cache/metrics.rs
@@ -512,10 +512,10 @@ mod tests {
         storage
             .insert(test_key.clone(), test_value.clone(), None)
             .await;
-        let retrieved: Option<RedisValue<TestValue>> = storage.get(test_key.clone()).await;
+        let retrieved: Result<RedisValue<TestValue>, _> = storage.get(test_key.clone()).await;
 
         // Verify the mock actually worked
-        assert!(retrieved.is_some(), "Should have retrieved value from mock");
+        assert!(retrieved.is_ok(), "Should have retrieved value from mock");
         assert_eq!(retrieved.unwrap().0.data, "test_value");
 
         // Verify Redis connection metrics are emitted.
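The metrics.rs change above tracks the new `get` signature: Redis errors now reach the caller instead of collapsing into `None`, so a cache miss and a connection failure are distinguishable. A minimal caller-side sketch, reusing `storage`, `test_key`, and `TestValue` from the test above (the panic message is illustrative):

    // Before: Option<RedisValue<TestValue>> hid the failure reason.
    // After: the Err branch carries the actual RedisError.
    let retrieved: Result<RedisValue<TestValue>, RedisError> = storage.get(test_key.clone()).await;
    match retrieved {
        Ok(value) => assert_eq!(value.0.data, "test_value"),
        Err(e) => panic!("redis lookup failed: {e}"),
    }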
198 changes: 120 additions & 78 deletions apollo-router/src/cache/redis.rs
@@ -7,10 +7,11 @@ use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 
+use fred::clients::Client;
+use fred::clients::Pipeline;
 use fred::interfaces::EventInterface;
 #[cfg(test)]
 use fred::mocks::Mocks;
-use fred::prelude::Client as RedisClient;
 use fred::prelude::ClientLike;
 use fred::prelude::Error as RedisError;
 use fred::prelude::ErrorKind as RedisErrorKind;
@@ -28,7 +29,6 @@ use fred::types::config::TlsConfig;
 use fred::types::config::TlsHostMapping;
 use fred::types::config::UnresponsiveConfig;
 use fred::types::scan::ScanResult;
-use futures::FutureExt;
 use futures::Stream;
 use futures::future::join_all;
 use tokio::sync::broadcast::error::RecvError;
@@ -273,7 +273,7 @@ impl RedisCacheStorage {
 
         Self::create_client(
             client_config,
-            config.timeout.unwrap_or(Duration::from_millis(500)),
+            config.timeout,
             config.pool_size as usize,
             config.namespace,
             config.ttl,
@@ -321,6 +321,7 @@ impl RedisCacheStorage {
         let pooled_client = Builder::from_config(client_config)
             .with_connection_config(|config| {
                 config.internal_command_timeout = DEFAULT_INTERNAL_REDIS_TIMEOUT;
+                config.max_command_buffer_len = 10_000;
                 config.reconnect_on_auth_error = true;
                 config.tcp = TcpConfig {
                     #[cfg(target_os = "linux")]
@@ -543,6 +544,10 @@ impl RedisCacheStorage {
         self.ttl = ttl;
     }
 
+    fn pipeline(&self) -> Pipeline<Client> {
+        self.inner.next().pipeline()
+    }
+
     fn make_key<K: KeyType>(&self, key: RedisKey<K>) -> String {
         match &self.namespace {
             Some(namespace) => format!("{namespace}:{key}"),
@@ -553,61 +558,52 @@ impl RedisCacheStorage {
     pub(crate) async fn get<K: KeyType, V: ValueType>(
         &self,
         key: RedisKey<K>,
-    ) -> Option<RedisValue<V>> {
+    ) -> Result<RedisValue<V>, RedisError> {
+        let key = self.make_key(key);
         match self.ttl {
             Some(ttl) if self.reset_ttl => {
-                let pipeline: fred::clients::Pipeline<RedisClient> = self.inner.next().pipeline();
-                let key = self.make_key(key);
-                let res = pipeline
-                    .get::<fred::types::Value, _>(&key)
+                let pipeline = self.pipeline();
+                let _: () = pipeline
+                    .get(&key)
                     .await
-                    .inspect_err(|e| self.record_error(e))
-                    .ok()?;
-                if !res.is_queued() {
-                    tracing::error!("could not queue GET command");
-                    return None;
-                }
-                let res: fred::types::Value = pipeline
+                    .inspect_err(|e| self.record_error(e))?;
+                let _: () = pipeline
                     .expire(&key, ttl.as_secs() as i64, None)
                     .await
-                    .inspect_err(|e| self.record_error(e))
-                    .ok()?;
-                if !res.is_queued() {
-                    tracing::error!("could not queue EXPIRE command");
-                    return None;
-                }
+                    .inspect_err(|e| self.record_error(e))?;
 
-                let (first, _): (Option<RedisValue<V>>, bool) = pipeline
-                    .all()
-                    .await
-                    .inspect_err(|e| self.record_error(e))
-                    .ok()?;
-                first
+                let (value, _timeout_set): (RedisValue<V>, bool) =
+                    pipeline.all().await.inspect_err(|e| self.record_error(e))?;
+                Ok(value)
             }
             _ => self
                 .inner
-                .get::<RedisValue<V>, _>(self.make_key(key))
+                .get(key)
                 .await
-                .inspect_err(|e| self.record_error(e))
-                .ok(),
+                .inspect_err(|e| self.record_error(e)),
         }
     }
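The rewritten `reset_ttl` branch leans on fred's pipeline semantics: awaiting a queued command returns `Ok(())` immediately, and `all()` flushes the queue and returns every reply in one round trip, which is why the old `is_queued()` checks could be dropped. A standalone sketch of the same GET + EXPIRE pattern, assuming an already-connected fred `Client` (the function name is hypothetical; the `expire` and `all` calls mirror the ones in the diff):

    use fred::prelude::*;

    async fn get_and_refresh(
        client: &Client,
        key: &str,
        ttl_secs: i64,
    ) -> Result<Option<String>, Error> {
        let pipeline = client.pipeline();
        // Queueing a command only buffers it; both awaits resolve to ().
        let _: () = pipeline.get(key).await?;
        let _: () = pipeline.expire(key, ttl_secs, None).await?;
        // One round trip; replies come back in queue order.
        let (value, _ttl_was_set): (Option<String>, bool) = pipeline.all().await?;
        Ok(value)
    }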

     pub(crate) async fn get_multiple<K: KeyType, V: ValueType>(
         &self,
         mut keys: Vec<RedisKey<K>>,
-    ) -> Option<Vec<Option<RedisValue<V>>>> {
+    ) -> Vec<Option<RedisValue<V>>> {
+        // NB: MGET is different from GET in that it returns `Option`s rather than `Result`s.
+        // > For every key that does not hold a string value or does not exist, the special value
+        // > nil is returned. Because of this, the operation never fails.
+        // - https://redis.io/docs/latest/commands/mget/
 
         tracing::trace!("getting multiple values from redis: {:?}", keys);
 
         if keys.len() == 1 {
+            let key = self.make_key(keys.remove(0));
             let res = self
                 .inner
-                .get::<RedisValue<V>, _>(self.make_key(keys.remove(0)))
+                .get(key)
                 .await
                 .inspect_err(|e| self.record_error(e))
                 .ok();
 
-            Some(vec![res])
+            vec![res]
         } else if self.is_cluster {
             // when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which
             // node will store it. So first we have to group the keys by hash, because we cannot do a MGET
@@ -623,41 +619,43 @@ impl RedisCacheStorage {
             }
 
             // then we query all the key groups at the same time
-            let results = futures::future::join_all(h.into_iter().map(|(_, (indexes, keys))| {
-                self.inner
-                    .mget(keys)
-                    .map(|values: Result<Vec<Option<RedisValue<V>>>, RedisError>| (indexes, values))
-            }))
-            .await;
+            let mut tasks = Vec::new();
+            for (_shard, (indexes, keys)) in h {
+                let client = self.inner.next();
+                tasks.push(async move {
+                    let result: Result<Vec<Option<RedisValue<V>>>, _> = client.mget(keys).await;
+                    (indexes, result)
+                });
+            }
 
             // then we have to assemble the results, by making sure that the values are in the same order as
             // the keys argument's order
-            let mut res = Vec::with_capacity(len);
-            for (indexes, result) in results.into_iter() {
-                match result {
-                    Err(e) => {
-                        self.record_error(&e);
-                        return None;
-                    }
+            let mut result = vec![None; len];
+            for (indexes, result_value) in join_all(tasks).await.into_iter() {
+                match result_value {
                     Ok(values) => {
                         for (index, value) in indexes.into_iter().zip(values.into_iter()) {
-                            res.push((index, value));
+                            result[index] = value;
                         }
                     }
+                    Err(e) => {
+                        self.record_error(&e);
+                    }
                 }
             }
-            res.sort_by(|(i, _), (j, _)| i.cmp(j));
-            Some(res.into_iter().map(|(_, v)| v).collect())
+
+            result
         } else {
+            let len = keys.len();
+            let keys = keys
+                .into_iter()
+                .map(|k| self.make_key(k))
+                .collect::<Vec<_>>();
             self.inner
-                .mget(
-                    keys.into_iter()
-                        .map(|k| self.make_key(k))
-                        .collect::<Vec<_>>(),
-                )
+                .mget(keys)
                 .await
                 .inspect_err(|e| self.record_error(e))
-                .ok()
+                .unwrap_or_else(|_| vec![None; len])
         }
     }

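For context on the cluster branch of `get_multiple`: Redis Cluster maps every key to one of 16384 hash slots, and one MGET may only touch keys in a single slot, so the code buckets keys by slot before issuing the parallel fetches. A sketch of that grouping step, keeping the original indexes so replies can be written back in request order (the `ClusterRouting` import path is assumed; the hash call itself mirrors the one in the diff):

    use std::collections::HashMap;

    use fred::types::cluster::ClusterRouting;

    fn group_by_slot(keys: Vec<String>) -> HashMap<u16, Vec<(usize, String)>> {
        let mut groups: HashMap<u16, Vec<(usize, String)>> = HashMap::new();
        for (index, key) in keys.into_iter().enumerate() {
            // Same CRC16-based slot routing fred applies internally.
            let slot = ClusterRouting::hash_key(key.as_bytes());
            groups.entry(slot).or_default().push((index, key));
        }
        groups
    }

Each bucket becomes one MGET, and `result[index] = value` then restores the caller's ordering without a final sort.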
@@ -696,7 +694,7 @@ impl RedisCacheStorage {
 
         // NB: if we were using MSET here, we'd need to split the keys by hash slot. however, fred
         // seems to split the pipeline by hash slot in the background.
-        let pipeline = self.inner.next().pipeline();
+        let pipeline = self.pipeline();
         for (key, value) in data {
             let key = self.make_key(key.clone());
             let _ = pipeline
@@ -716,7 +714,10 @@ impl RedisCacheStorage {
 
     /// Delete keys *without* adding the `namespace` prefix because `keys` is from
     /// `scan_with_namespaced_results` and already includes it.
-    pub(crate) async fn delete_from_scan_result(&self, keys: Vec<fred::types::Key>) -> Option<u32> {
+    pub(crate) async fn delete_from_scan_result(
+        &self,
+        keys: Vec<fred::types::Key>,
+    ) -> Result<u32, RedisError> {
         let mut h: HashMap<u16, Vec<fred::types::Key>> = HashMap::new();
         for key in keys.into_iter() {
             let hash = ClusterRouting::hash_key(key.as_bytes());
@@ -726,19 +727,15 @@ impl RedisCacheStorage {
 
         // then we query all the key groups at the same time
         let results: Vec<Result<u32, RedisError>> =
-            futures::future::join_all(h.into_values().map(|keys| self.inner.del(keys))).await;
-        let mut total = 0u32;
-
-        for res in results {
-            match res {
-                Ok(res) => total += res,
-                Err(e) => {
-                    self.record_error(&e);
-                }
-            }
+            join_all(h.into_values().map(|keys| self.inner.del(keys))).await;
+
+        let mut total = 0;
+        for result in results {
+            let count = result.inspect_err(|e| self.record_error(e))?;
+            total += count;
         }
 
-        Some(total)
+        Ok(total)
     }
 
     /// The keys returned in `ScanResult` do include the prefix from `namespace` configuration.
@@ -891,7 +888,7 @@ mod test {
     async fn test_redis_storage_avoids_common_cross_slot_errors() -> Result<(), BoxError> {
         let config_json = json!({
             "urls": ["redis-cluster://localhost:7000"],
-            "namespace": "test_redis_cluster",
+            "namespace": "test_redis_storage_avoids_common_cross_slot_errors",
             "required_to_start": true,
             "ttl": "60s"
         });
@@ -934,23 +931,68 @@ mod test {
         // make a `get` call for each key and ensure that it has the expected value. this tests both
         // the `get` and `insert_multiple` functions
         for key in &keys {
-            let value: RedisValue<usize> = storage
-                .get(key.clone())
-                .await
-                .ok_or("unable to get value")?;
+            let value: RedisValue<usize> = storage.get(key.clone()).await?;
             assert_eq!(value.0, expected_value);
         }
 
         // test the `mget` functionality
-        let values = storage
-            .get_multiple(keys)
-            .await
-            .ok_or("unable to get_multiple")?;
+        let values = storage.get_multiple(keys).await;
         for value in values {
             let value: RedisValue<usize> = value.ok_or("missing value")?;
             assert_eq!(value.0, expected_value);
         }
 
         Ok(())
     }
 
+    /// Test that `get_multiple` returns items in the correct order.
+    #[cfg(all(
+        test,
+        any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux"))
+    ))]
+    #[tokio::test]
+    async fn test_get_multiple_is_ordered() -> Result<(), BoxError> {
+        let config_json = json!({
+            "urls": ["redis://localhost:6379"],
+            "namespace": "test_get_multiple_is_ordered",
+            "required_to_start": true,
+            "ttl": "60s"
+        });
+        let config = serde_json::from_value(config_json).unwrap();
+        let storage = super::RedisCacheStorage::new(config, "test_get_multiple_is_ordered").await?;
+
+        let data = [("a", "1"), ("b", "2"), ("c", "3")]
+            .map(|(k, v)| (RedisKey(k.to_string()), RedisValue(v.to_string())));
+        storage.insert_multiple(&data, None).await;
+
+        // check different orders of fetches to make sure everything is ordered correctly,
+        // including when some values are none
+        let test_cases = vec![
+            (vec!["a", "b", "c"], vec![Some("1"), Some("2"), Some("3")]),
+            (vec!["c", "b", "a"], vec![Some("3"), Some("2"), Some("1")]),
+            (vec!["d", "b", "c"], vec![None, Some("2"), Some("3")]),
+            (
+                vec!["d", "3", "s", "b", "s", "1", "c", "Y"],
+                vec![None, None, None, Some("2"), None, None, Some("3"), None],
+            ),
+        ];
+
+        for (keys, expected_values) in test_cases {
+            let keys: Vec<RedisKey<_>> = keys
+                .into_iter()
+                .map(|key| RedisKey(key.to_string()))
+                .collect();
+            let expected_values: Vec<Option<String>> = expected_values
+                .into_iter()
+                .map(|value| value.map(ToString::to_string))
+                .collect();
+
+            let values = storage.get_multiple(keys).await;
+            let parsed_values: Vec<Option<String>> =
+                values.into_iter().map(|v| v.map(|v| v.0)).collect();
+            assert_eq!(parsed_values, expected_values);
+        }
+
+        Ok(())
+    }
 }
20 changes: 9 additions & 11 deletions apollo-router/src/cache/storage.rs
@@ -191,17 +191,15 @@ where
         let instant_redis = Instant::now();
         if let Some(redis) = self.redis.as_ref() {
             let inner_key = RedisKey(key.clone());
-            let redis_value =
-                redis
-                    .get::<K, V>(inner_key)
-                    .await
-                    .and_then(|mut v| match init_from_redis(&mut v.0) {
-                        Ok(()) => Some(v),
-                        Err(e) => {
-                            tracing::error!("Invalid value from Redis cache: {e}");
-                            None
-                        }
-                    });
+            let redis_value = redis.get(inner_key).await.ok().and_then(|mut v| {
+                match init_from_redis(&mut v.0) {
+                    Ok(()) => Some(v),
+                    Err(e) => {
+                        tracing::error!("Invalid value from Redis cache: {e}");
+                        None
+                    }
+                }
+            });
             match redis_value {
                 Some(v) => {
                     self.insert_in_memory(key.clone(), v.0.clone()).await;