Skip to content

Commit 4a4a0f9

Browse files
committed
Use LegacySSHStore
In NixOS/nix#10748 it is extended with everything we need.
1 parent 881462b commit 4a4a0f9

File tree

2 files changed

+69
-119
lines changed

2 files changed

+69
-119
lines changed

src/hydra-queue-runner/build-remote.cc

+65-115
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,10 @@
99
#include "path.hh"
1010
#include "legacy-ssh-store.hh"
1111
#include "serve-protocol.hh"
12-
#include "serve-protocol-impl.hh"
1312
#include "state.hh"
1413
#include "current-process.hh"
1514
#include "processes.hh"
1615
#include "util.hh"
17-
#include "serve-protocol.hh"
18-
#include "serve-protocol-impl.hh"
1916
#include "ssh.hh"
2017
#include "finally.hh"
2118
#include "url.hh"
@@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const
3936

4037
namespace nix::build_remote {
4138

42-
static std::unique_ptr<SSHMaster::Connection> openConnection(
43-
::Machine::ptr machine, SSHMaster & master)
44-
{
45-
Strings command = {"nix-store", "--serve", "--write"};
46-
if (machine->isLocalhost()) {
47-
command.push_back("--builders");
48-
command.push_back("");
49-
} else {
50-
auto remoteStore = machine->storeUri.params.find("remote-store");
51-
if (remoteStore != machine->storeUri.params.end()) {
52-
command.push_back("--store");
53-
command.push_back(shellEscape(remoteStore->second));
54-
}
55-
}
56-
57-
auto ret = master.startCommand(std::move(command), {
58-
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
59-
});
60-
61-
// XXX: determine the actual max value we can use from /proc.
62-
63-
// FIXME: Should this be upstreamed into `startCommand` in Nix?
64-
65-
int pipesize = 1024 * 1024;
66-
67-
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
68-
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
69-
70-
return ret;
71-
}
72-
73-
7439
static void copyClosureTo(
7540
::Machine::Connection & conn,
7641
Store & destStore,
@@ -87,8 +52,8 @@ static void copyClosureTo(
8752
// FIXME: substitute output pollutes our build log
8853
/* Get back the set of paths that are already valid on the remote
8954
host. */
90-
auto present = conn.queryValidPaths(
91-
destStore, true, closure, useSubstitutes);
55+
auto present = conn.store->queryValidPaths(
56+
closure, true, useSubstitutes);
9257

9358
if (present.size() == closure.size()) return;
9459

@@ -103,12 +68,7 @@ static void copyClosureTo(
10368
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
10469
std::chrono::seconds(600));
10570

106-
conn.to << ServeProto::Command::ImportPaths;
107-
destStore.exportPaths(missing, conn.to);
108-
conn.to.flush();
109-
110-
if (readInt(conn.from) != 1)
111-
throw Error("remote machine failed to import closure");
71+
conn.store->addMultipleToStoreLegacy(destStore, missing);
11272
}
11373

11474

@@ -228,7 +188,7 @@ static BuildResult performBuild(
228188
counter & nrStepsBuilding
229189
)
230190
{
231-
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
191+
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);
232192

233193
BuildResult result;
234194

@@ -237,7 +197,10 @@ static BuildResult performBuild(
237197
startTime = time(0);
238198
{
239199
MaintainCount<counter> mc(nrStepsBuilding);
240-
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
200+
result = kont();
201+
// Without proper call-once functions, we need to manually
202+
// delete after calling.
203+
kont = {};
241204
}
242205
stopTime = time(0);
243206

@@ -253,7 +216,7 @@ static BuildResult performBuild(
253216

254217
// If the protocol was too old to give us `builtOutputs`, initialize
255218
// it manually by introspecting the derivation.
256-
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
219+
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
257220
{
258221
// If the remote is too old to handle CA derivations, we can’t get this
259222
// far anyways
@@ -286,26 +249,25 @@ static void copyPathFromRemote(
286249
const ValidPathInfo & info
287250
)
288251
{
289-
/* Receive the NAR from the remote and add it to the
290-
destination store. Meanwhile, extract all the info from the
291-
NAR that getBuildOutput() needs. */
292-
auto source2 = sinkToSource([&](Sink & sink)
293-
{
294-
/* Note: we should only send the command to dump the store
295-
path to the remote if the NAR is actually going to get read
296-
by the destination store, which won't happen if this path
297-
is already valid on the destination store. Since this
298-
lambda function only gets executed if someone tries to read
299-
from source2, we will send the command from here rather
300-
than outside the lambda. */
301-
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
302-
conn.to.flush();
303-
304-
TeeSource tee(conn.from, sink);
305-
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
306-
});
307-
308-
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
252+
/* Receive the NAR from the remote and add it to the
253+
destination store. Meanwhile, extract all the info from the
254+
NAR that getBuildOutput() needs. */
255+
auto source2 = sinkToSource([&](Sink & sink)
256+
{
257+
/* Note: we should only send the command to dump the store
258+
path to the remote if the NAR is actually going to get read
259+
by the destination store, which won't happen if this path
260+
is already valid on the destination store. Since this
261+
lambda function only gets executed if someone tries to read
262+
from source2, we will send the command from here rather
263+
than outside the lambda. */
264+
conn.store->narFromPath(info.path, [&](Source & source) {
265+
TeeSource tee{source, sink};
266+
extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
267+
});
268+
});
269+
270+
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
309271
}
310272

311273
static void copyPathsFromRemote(
@@ -404,30 +366,39 @@ void State::buildRemote(ref<Store> destStore,
404366

405367
updateStep(ssConnecting);
406368

407-
auto storeRef = machine->completeStoreReference();
408-
409-
auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
410-
if (!pSpecified || pSpecified->scheme != "ssh") {
411-
throw Error("Currently, only (legacy-)ssh stores are supported!");
412-
}
413-
414-
LegacySSHStoreConfig storeConfig {
415-
pSpecified->scheme,
416-
pSpecified->authority,
417-
storeRef.params
418-
};
419-
420-
auto master = storeConfig.createSSHMaster(
421-
false, // no SSH master yet
422-
logFD.get());
423-
424369
// FIXME: rewrite to use Store.
425-
auto child = build_remote::openConnection(machine, master);
370+
::Machine::Connection conn {
371+
.machine = machine,
372+
.store = [&]{
373+
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
374+
if (!pSpecified || pSpecified->scheme != "ssh") {
375+
throw Error("Currently, only (legacy-)ssh stores are supported!");
376+
}
377+
378+
auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
379+
assert(remoteStore);
380+
381+
remoteStore->connPipeSize = 1024 * 1024;
382+
383+
if (machine->isLocalhost()) {
384+
auto rp_new = remoteStore->remoteProgram.get();
385+
rp_new.push_back("--builders");
386+
rp_new.push_back("");
387+
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
388+
}
389+
remoteStore->extraSshArgs = {
390+
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
391+
};
392+
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
393+
394+
return nix::ref{remoteStore};
395+
}(),
396+
};
426397

427398
{
428399
auto activeStepState(activeStep->state_.lock());
429400
if (activeStepState->cancelled) throw Error("step cancelled");
430-
activeStepState->pid = child->sshPid;
401+
activeStepState->pid = conn.store->getConnectionPid();
431402
}
432403

433404
Finally clearPid([&]() {
@@ -442,35 +413,12 @@ void State::buildRemote(ref<Store> destStore,
442413
process. Meh. */
443414
});
444415

445-
::Machine::Connection conn {
446-
{
447-
.to = child->in.get(),
448-
.from = child->out.get(),
449-
/* Handshake. */
450-
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
451-
},
452-
/*.machine =*/ machine,
453-
};
454-
455416
Finally updateStats([&]() {
456-
bytesReceived += conn.from.read;
457-
bytesSent += conn.to.written;
417+
auto stats = conn.store->getConnectionStats();
418+
bytesReceived += stats.bytesReceived;
419+
bytesSent += stats.bytesSent;
458420
});
459421

460-
constexpr ServeProto::Version our_version = 0x206;
461-
462-
try {
463-
conn.remoteVersion = decltype(conn)::handshake(
464-
conn.to,
465-
conn.from,
466-
our_version,
467-
machine->storeUri.render());
468-
} catch (EndOfFile & e) {
469-
child->sshPid.wait();
470-
std::string s = chomp(readFile(result.logFile));
471-
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
472-
}
473-
474422
{
475423
auto info(machine->state->connectInfo.lock());
476424
info->consecutiveFailures = 0;
@@ -539,7 +487,7 @@ void State::buildRemote(ref<Store> destStore,
539487

540488
auto now1 = std::chrono::steady_clock::now();
541489

542-
auto infos = conn.queryPathInfos(*localStore, outputs);
490+
auto infos = conn.store->queryPathInfosUncached(outputs);
543491

544492
size_t totalNarSize = 0;
545493
for (auto & [_, info] : infos) totalNarSize += info.narSize;
@@ -574,9 +522,11 @@ void State::buildRemote(ref<Store> destStore,
574522
}
575523
}
576524

577-
/* Shut down the connection. */
578-
child->in = -1;
579-
child->sshPid.wait();
525+
/* Shut down the connection done by RAII.
526+
527+
Only difference is kill() instead of wait() (i.e. send signal
528+
then wait())
529+
*/
580530

581531
} catch (Error & e) {
582532
/* Disable this machine until a certain period of time has

src/hydra-queue-runner/state.hh

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
#include "store-api.hh"
2121
#include "sync.hh"
2222
#include "nar-extractor.hh"
23-
#include "serve-protocol.hh"
24-
#include "serve-protocol-impl.hh"
25-
#include "serve-protocol-connection.hh"
23+
#include "legacy-ssh-store.hh"
2624
#include "machines.hh"
2725

2826

@@ -292,9 +290,11 @@ struct Machine : nix::Machine
292290
bool isLocalhost() const;
293291

294292
// A connection to a machine
295-
struct Connection : nix::ServeProto::BasicClientConnection {
293+
struct Connection {
296294
// Backpointer to the machine
297295
ptr machine;
296+
// Opened store
297+
nix::ref<nix::LegacySSHStore> store;
298298
};
299299
};
300300

0 commit comments

Comments
 (0)