-
-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store can backfill from a CAS #5148
Conversation
This uses the ContentAddressableStorageService protocol specified in https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto There are a few of problems with this implementation: 1. It has one unnecessary synchronisation point inside load_bytes_remote_with - I'm going to work this out before merging. 2. It doesn't currently pass around the sizes of Digests. I have an ongoing discussion with both the Bazel API specifiers, and the scoot team, about what we should do here. Passing around a size, or a size hint, is easy to do, but I don't want to commit to how it's going to be done until we know whether it needs to be. 3. It is hard-coded to run with a one-thread threadpool. I'm not sure whether we should be configuring this statically (like we do for the IO pool), or through from a pants config. If the latter, I haven't worked out how to do that yet. 4. Context doesn't currently accept a CAS address (specified by pants config). I will wire this up in a subsequent PR.
8368dc6
to
7b13494
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(more feedback to follow)
grpc_env: Option<GrpcEnvironment>, | ||
} | ||
|
||
#[derive(Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this implementation of Clone
used: should remove if possible. InnerStore
is not Clone
, so avoiding sending the signal that this needs to be clone would be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clone happens here: a01221f#diff-5f54ce2d41ce4be118aab4eb95da8fe1R186
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could that closure just reference store.grpc_env
? You've sent the store
clone into the closure already...
src/rust/engine/fs/src/store.rs
Outdated
let grpc_env = self.inner.grpc_env.clone(); | ||
let store = self.clone(); | ||
|
||
let f = Arc::new(Mutex::new(f)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that if you add an F: Sync
* bound (which should be easy enough: the contents of the closure are generally file paths, etc), you wouldn't need the Mutex
here, and could just use an Arc
.
But also, I think that my factoring of load_bytes_with
might have been a bit premature: it exists for two reasons: 1) to avoid a copy, 2) to allow the file-io of dumping into a file to run directly on the pool. If it simplifies things to have an extra copy from gRPC to the store (ie load_bytes
), and then only use the *_with
for the copy out of the store, that's probably totally fine too.
* maybe also +Send
? (I haven't fully internalized when you need each of them)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already +Send, and indeed making it +Sync works :)
src/rust/engine/fs/src/store.rs
Outdated
|
||
// TODO: Work out why not wrapping this in a future::done(... .wait()) errors in gRPC channel | ||
// creation. | ||
future::done(stream.map(|r| r.data).concat2().wait()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. That is certainly suspicious.
You're saying that this API already exposes Futures, but that if you don't call wait
, you see an error? That would definitely imply a bug in the library, because wait
should never actually be necessary except at the root of a Future hierarchy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed tikv/grpc-rs#123 and added a workaround to prevent client from dropping early.
src/rust/engine/fs/src/store.rs
Outdated
} | ||
} | ||
}) | ||
.to_boxed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This first to_boxed
call should not be necessary, as flatten
probably doesn't have a Sized
bound.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/rust/engine/fs/src/store.rs
Outdated
} | ||
}) | ||
.to_boxed() | ||
.flatten() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than flatten
, you can change your map
to an and_then
(which flattens).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/rust/engine/fs/src/store.rs
Outdated
@@ -175,6 +234,83 @@ impl Store { | |||
}) | |||
} | |||
|
|||
fn load_bytes_remote_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + 'static>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel funny about both local and remote logic existing in the same impl
.
I'm not sure whether having a Store
trait with "local", "remote", "cached" implementations would be maximally efficient, but I do not know that it results in a very clean factoring. Will send you a link to an internal thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll pull them out into separate impls :)
src/rust/engine/fs/src/store.rs
Outdated
.and_then(move |maybe_bytes: Option<Vec<u8>>| match maybe_bytes { | ||
Some(bytes) => { | ||
if db == directory_db { | ||
match Store::validate_directory(&bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be:
if db == directory_db {
Store::validate_directory(&bytes).map_err(..)?;
}
... if the return
statement is valid, then the ?
should be as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alas no; because the return type of this closure is a BoxFuture (chained off of the store_copy.store_bytes), it isn't considered to return a Result, so can't be ?'d.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @illicitonion : this looks great.
The only blocker would be that wait
I think... if we leave it in, we should try to execute it directly at the root of a CpuPool, as calling wait
in a Future context has been deadlock prone in the other Future impls I've used, and I expect that it's the same here.
src/rust/engine/fs/src/store.rs
Outdated
} | ||
}).map(move |()| { | ||
let f = f.lock().unwrap(); | ||
Some(f(&bytes)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that if you ran the user function before the store_bytes
, you wouldn't need to clone the bytes in order to store them?
I think that that altered ordering would be harmless, as we never guaranteed that the load_bytes_into
was atomic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels a little weird, but I guess since I made it a Fn rather than a FnOnce... Done.
src/rust/engine/fs/src/store.rs
Outdated
Some(f(&bytes)) | ||
}).to_boxed() | ||
} | ||
None => future::ok(None).to_boxed() as BoxFuture<_, _>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are the as BoxFuture
ascriptions actually necessary...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly so. Otherwise I'm told:
= note: expected type `std::boxed::Box<futures::Map<futures::AndThen<futures_cpupool::CpuFuture<hash::Fingerprint, std::string::String>, std::result::Result<(), std::string::String>, [closure@src/store.rs:280:29: 291:20 fingerprint:_]>, [closure@src/store.rs:291:26: 291:42 result:_]>>`
found type `std::boxed::Box<futures::FutureResult<std::option::Option<_>, _>>`
note: match arm with an incompatible type
This is rust-lang/rust#46061
This uses the ContentAddressableStorageService protocol specified in
https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto
There are a few of problems with this implementation:
load_bytes_remote_with - I'm going to work this out before merging.
ongoing discussion with both the Bazel API specifiers, and the scoot
team, about what we should do here. Passing around a size, or a size
hint, is easy to do, but I don't want to commit to how it's going to
be done until we know whether it needs to be.
whether we should be configuring this statically (like we do for the
IO pool), or through from a pants config. If the latter, I haven't
worked out how to do that yet.
config). I will wire this up in a subsequent PR.