Skip to content

Commit

Permalink
add debug logging and pipe a description of each request into the log…
Browse files Browse the repository at this point in the history
… message if a retry fails
  • Loading branch information
cosmicexplorer committed Mar 22, 2019
1 parent af89154 commit 0f2ec4d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

187 changes: 108 additions & 79 deletions src/rust/engine/fs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ mod remote {
use futures::{self, future, Future, IntoFuture, Sink, Stream};
use grpcio;
use hashing::{Digest, Fingerprint};
use serverset::{Retry, Serverset};
use serverset::{Retry, RetryParameters, Serverset};
use sha2::Sha256;
use std::cmp::min;
use std::collections::HashSet;
Expand All @@ -1748,6 +1748,16 @@ mod remote {
authorization_header: Option<String>,
}

struct ContentRequestMetadata {
description: String,
}

impl ContentRequestMetadata {
fn new(description: String) -> Self {
ContentRequestMetadata { description }
}
}

impl ByteStore {
pub fn new(
cas_addresses: &[String],
Expand Down Expand Up @@ -1803,14 +1813,19 @@ mod remote {
>(
&self,
f: F,
metadata: ContentRequestMetadata,
) -> impl Future<Item = Value, Error = String> {
let ContentRequestMetadata { description } = metadata;
Retry(self.serverset.clone()).all_errors_immediately(
move |channel| {
f(bazel_protos::bytestream_grpc::ByteStreamClient::new(
channel,
))
},
self.rpc_attempts,
RetryParameters::new(
self.rpc_attempts,
format!("byte stream client request for {}", description),
),
)
}

Expand All @@ -1826,12 +1841,17 @@ mod remote {
>(
&self,
f: F,
metadata: ContentRequestMetadata,
) -> impl Future<Item = Value, Error = String> {
let ContentRequestMetadata { description } = metadata;
Retry(self.serverset.clone()).all_errors_immediately(
move |channel| {
f(bazel_protos::remote_execution_grpc::ContentAddressableStorageClient::new(channel))
},
self.rpc_attempts,
RetryParameters::new(
self.rpc_attempts,
format!("cas client request for {}", description),
),
)
}

Expand Down Expand Up @@ -1862,8 +1882,8 @@ mod remote {
);
let store = self.clone();
self
.with_byte_stream_client(move |client| {
match client
.with_byte_stream_client(
move |client| match client
.write_opt(store.call_option().timeout(store.upload_timeout))
.map(|v| (v, client))
{
Expand Down Expand Up @@ -1924,8 +1944,9 @@ mod remote {
})
.to_boxed()
}
}
})
},
ContentRequestMetadata::new(format!("storing bytes for digest {:?}", digest)),
)
.to_boxed()
}

Expand All @@ -1937,62 +1958,65 @@ mod remote {
) -> BoxFuture<Option<T>, String> {
let store = self.clone();
self
.with_byte_stream_client(move |client| {
match client
.read_opt(
&{
let mut req = bazel_protos::bytestream::ReadRequest::new();
req.set_resource_name(format!(
"{}/blobs/{}/{}",
store.instance_name.clone().unwrap_or_default(),
digest.0,
digest.1
));
req.set_read_offset(0);
// 0 means no limit.
req.set_read_limit(0);
req
},
store.call_option(),
)
.map(|stream| (stream, client))
{
Ok((stream, client)) => {
let f = f.clone();
// We shouldn't have to pass around the client here, it's a workaround for
// https://github.com/pingcap/grpc-rs/issues/123
future::ok(client)
.join(
stream.fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
bytes.extend_from_slice(&r.data);
future::ok::<_, grpcio::Error>(bytes)
}),
)
.map(|(_client, bytes)| Some(bytes.freeze()))
.or_else(|e| match e {
grpcio::Error::RpcFailure(grpcio::RpcStatus {
status: grpcio::RpcStatusCode::NotFound,
..
}) => Ok(None),
// TODO: This may be a race condition that deserves respect, but right now it's
// not clear it's an error. See #6344 for a case where we override this behavior
// by forking gprcio.
grpcio::Error::RpcFinished(None) => Ok(None),
_ => Err(format!(
"Error from server in response to CAS read request: {:?}",
e
)),
})
.map(move |maybe_bytes| maybe_bytes.map(f))
.to_boxed()
.with_byte_stream_client(
move |client| {
match client
.read_opt(
&{
let mut req = bazel_protos::bytestream::ReadRequest::new();
req.set_resource_name(format!(
"{}/blobs/{}/{}",
store.instance_name.clone().unwrap_or_default(),
digest.0,
digest.1
));
req.set_read_offset(0);
// 0 means no limit.
req.set_read_limit(0);
req
},
store.call_option(),
)
.map(|stream| (stream, client))
{
Ok((stream, client)) => {
let f = f.clone();
// We shouldn't have to pass around the client here, it's a workaround for
// https://github.com/pingcap/grpc-rs/issues/123
future::ok(client)
.join(
stream.fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
bytes.extend_from_slice(&r.data);
future::ok::<_, grpcio::Error>(bytes)
}),
)
.map(|(_client, bytes)| Some(bytes.freeze()))
.or_else(|e| match e {
grpcio::Error::RpcFailure(grpcio::RpcStatus {
status: grpcio::RpcStatusCode::NotFound,
..
}) => Ok(None),
// TODO: This may be a race condition that deserves respect, but right now it's
// not clear it's an error. See #6344 for a case where we override this behavior
// by forking gprcio.
grpcio::Error::RpcFinished(None) => Ok(None),
_ => Err(format!(
"Error from server in response to CAS read request: {:?}",
e
)),
})
.map(move |maybe_bytes| maybe_bytes.map(f))
.to_boxed()
}
Err(err) => future::err(format!(
"Error making CAS read request for {:?}: {:?}",
digest, err
))
.to_boxed(),
}
Err(err) => future::err(format!(
"Error making CAS read request for {:?}: {:?}",
digest, err
))
.to_boxed(),
}
})
},
ContentRequestMetadata::new(format!("loading bytes from CAS for digest {:?}", digest)),
)
.to_boxed()
}

Expand All @@ -2005,23 +2029,28 @@ mod remote {
request: bazel_protos::remote_execution::FindMissingBlobsRequest,
) -> impl Future<Item = HashSet<Digest>, Error = String> {
let store = self.clone();
self.with_cas_client(move |client| {
client
.find_missing_blobs_opt(&request, store.call_option())
.map_err(|err| {
format!(
"Error from server in response to find_missing_blobs_request: {:?}",
err
)
})
.and_then(|response| {
response
.get_missing_blob_digests()
.iter()
.map(|digest| digest.into())
.collect()
})
})
let metadata =
ContentRequestMetadata::new(format!("finding missing blobs for {:?}", request.clone()));
self.with_cas_client(
move |client| {
client
.find_missing_blobs_opt(&request, store.call_option())
.map_err(|err| {
format!(
"Error from server in response to find_missing_blobs_request: {:?}",
err
)
})
.and_then(|response| {
response
.get_missing_blob_digests()
.iter()
.map(|digest| digest.into())
.collect()
})
},
metadata,
)
}

pub(super) fn find_missing_blobs_request<'a, Digests: Iterator<Item = &'a Digest>>(
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/serverset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ boxfuture = { path = "../boxfuture" }
futures = "^0.1.16"
# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge
futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" }
log = "0.4"
parking_lot = "0.6"
2 changes: 1 addition & 1 deletion src/rust/engine/serverset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

mod retry;
pub use crate::retry::Retry;
pub use crate::retry::{Retry, RetryParameters};

///
/// A collection of resources which are observed to be healthy or unhealthy.
Expand Down
45 changes: 38 additions & 7 deletions src/rust/engine/serverset/src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
use crate::{Health, Serverset};
use futures::{self, Future, IntoFuture};
use log::debug;

pub struct Retry<T>(pub Serverset<T>);

pub struct RetryParameters {
times: usize,
description: String,
}

impl RetryParameters {
pub fn new(times: usize, description: String) -> Self {
RetryParameters { times, description }
}
}

impl<T: Clone + Send + Sync + 'static> Retry<T> {
///
/// Runs `f` up to `times` times, taking servers from the underlying serverset.
Expand All @@ -18,19 +30,30 @@ impl<T: Clone + Send + Sync + 'static> Retry<T> {
>(
&self,
f: F,
times: usize,
retry_params: RetryParameters,
) -> impl Future<Item = Value, Error = String> {
let serverset = self.0.clone();
let RetryParameters { times, description } = retry_params;
futures::future::loop_fn(0_usize, move |i| {
let serverset = serverset.clone();
let description = description.clone();
let description2 = description.clone();
let f = f.clone();
serverset
.next()
.and_then(move |(server, token)| {
futures::future::ok(server).and_then(f).then(move |result| {
let health = match &result {
&Ok(_) => Health::Healthy,
&Err(_) => Health::Unhealthy,
Err(err) => {
debug!(
"Attempt {} for {} failed with error {:?}",
i,
description.clone(),
err
);
Health::Unhealthy
}
};
serverset.report_health(token, health);
result
Expand All @@ -39,7 +62,10 @@ impl<T: Clone + Send + Sync + 'static> Retry<T> {
.map(futures::future::Loop::Break)
.or_else(move |err| {
if i >= times {
Err(format!("Failed after {} retries; last failure: {}", i, err))
Err(format!(
"Failed after {} retries for {}; last failure: {}",
i, description2, err
))
} else {
Ok(futures::future::Loop::Continue(i + 1))
}
Expand All @@ -50,7 +76,7 @@ impl<T: Clone + Send + Sync + 'static> Retry<T> {

#[cfg(test)]
mod tests {
use crate::{BackoffConfig, Retry, Serverset};
use crate::{BackoffConfig, Retry, RetryParameters, Serverset};
use futures::Future;
use futures_timer::TimerHandle;
use std::time::Duration;
Expand All @@ -67,7 +93,7 @@ mod tests {
for _ in 0..3 {
v.push(
Retry(s.clone())
.all_errors_immediately(|v| v, 1)
.all_errors_immediately(|v| v, RetryParameters::new(1, "identity".to_string()))
.wait()
.unwrap(),
);
Expand All @@ -84,9 +110,14 @@ mod tests {
)
.unwrap();
assert_eq!(
Err(format!("Failed after 5 retries; last failure: bad")),
Err(format!(
"Failed after 5 retries for identity; last failure: bad"
)),
Retry(s)
.all_errors_immediately(|v: Result<u8, _>| v, 5)
.all_errors_immediately(
|v: Result<u8, _>| v,
RetryParameters::new(5, "identity".to_string())
)
.wait()
);
}
Expand Down

0 comments on commit 0f2ec4d

Please sign in to comment.