9
9
#include " path.hh"
10
10
#include " legacy-ssh-store.hh"
11
11
#include " serve-protocol.hh"
12
+ #include " serve-protocol-impl.hh"
12
13
#include " state.hh"
13
14
#include " current-process.hh"
14
15
#include " processes.hh"
15
16
#include " util.hh"
17
+ #include " serve-protocol.hh"
18
+ #include " serve-protocol-impl.hh"
16
19
#include " ssh.hh"
17
20
#include " finally.hh"
18
21
#include " url.hh"
@@ -36,6 +39,38 @@ bool ::Machine::isLocalhost() const
36
39
37
40
namespace nix ::build_remote {
38
41
42
+ static std::unique_ptr<SSHMaster::Connection> openConnection (
43
+ ::Machine::ptr machine, SSHMaster & master)
44
+ {
45
+ Strings command = {" nix-store" , " --serve" , " --write" };
46
+ if (machine->isLocalhost ()) {
47
+ command.push_back (" --builders" );
48
+ command.push_back (" " );
49
+ } else {
50
+ auto remoteStore = machine->storeUri .params .find (" remote-store" );
51
+ if (remoteStore != machine->storeUri .params .end ()) {
52
+ command.push_back (" --store" );
53
+ command.push_back (shellEscape (remoteStore->second ));
54
+ }
55
+ }
56
+
57
+ auto ret = master.startCommand (std::move (command), {
58
+ " -a" , " -oBatchMode=yes" , " -oConnectTimeout=60" , " -oTCPKeepAlive=yes"
59
+ });
60
+
61
+ // XXX: determine the actual max value we can use from /proc.
62
+
63
+ // FIXME: Should this be upstreamed into `startCommand` in Nix?
64
+
65
+ int pipesize = 1024 * 1024 ;
66
+
67
+ fcntl (ret->in .get (), F_SETPIPE_SZ, &pipesize);
68
+ fcntl (ret->out .get (), F_SETPIPE_SZ, &pipesize);
69
+
70
+ return ret;
71
+ }
72
+
73
+
39
74
static void copyClosureTo (
40
75
::Machine::Connection & conn,
41
76
Store & destStore,
@@ -52,8 +87,8 @@ static void copyClosureTo(
52
87
// FIXME: substitute output pollutes our build log
53
88
/* Get back the set of paths that are already valid on the remote
54
89
host. */
55
- auto present = conn.store -> queryValidPaths (
56
- closure , true , useSubstitutes);
90
+ auto present = conn.queryValidPaths (
91
+ destStore , true , closure , useSubstitutes);
57
92
58
93
if (present.size () == closure.size ()) return ;
59
94
@@ -68,7 +103,12 @@ static void copyClosureTo(
68
103
std::unique_lock<std::timed_mutex> sendLock (conn.machine ->state ->sendLock ,
69
104
std::chrono::seconds (600 ));
70
105
71
- conn.store ->addMultipleToStoreLegacy (destStore, missing);
106
+ conn.to << ServeProto::Command::ImportPaths;
107
+ destStore.exportPaths (missing, conn.to );
108
+ conn.to .flush ();
109
+
110
+ if (readInt (conn.from ) != 1 )
111
+ throw Error (" remote machine failed to import closure" );
72
112
}
73
113
74
114
@@ -188,7 +228,7 @@ static BuildResult performBuild(
188
228
counter & nrStepsBuilding
189
229
)
190
230
{
191
- auto kont = conn.store -> buildDerivationAsync ( drvPath, drv, options);
231
+ conn.putBuildDerivationRequest (localStore, drvPath, drv, options);
192
232
193
233
BuildResult result;
194
234
@@ -197,10 +237,7 @@ static BuildResult performBuild(
197
237
startTime = time (0 );
198
238
{
199
239
MaintainCount<counter> mc (nrStepsBuilding);
200
- result = kont ();
201
- // Without proper call-once functions, we need to manually
202
- // delete after calling.
203
- kont = {};
240
+ result = ServeProto::Serialise<BuildResult>::read (localStore, conn);
204
241
}
205
242
stopTime = time (0 );
206
243
@@ -216,7 +253,7 @@ static BuildResult performBuild(
216
253
217
254
// If the protocol was too old to give us `builtOutputs`, initialize
218
255
// it manually by introspecting the derivation.
219
- if (GET_PROTOCOL_MINOR (conn.store -> getProtocol () ) < 6 )
256
+ if (GET_PROTOCOL_MINOR (conn.remoteVersion ) < 6 )
220
257
{
221
258
// If the remote is too old to handle CA derivations, we can’t get this
222
259
// far anyways
@@ -249,25 +286,26 @@ static void copyPathFromRemote(
249
286
const ValidPathInfo & info
250
287
)
251
288
{
252
- /* Receive the NAR from the remote and add it to the
253
- destination store. Meanwhile, extract all the info from the
254
- NAR that getBuildOutput() needs. */
255
- auto source2 = sinkToSource ([&](Sink & sink)
256
- {
257
- /* Note: we should only send the command to dump the store
258
- path to the remote if the NAR is actually going to get read
259
- by the destination store, which won't happen if this path
260
- is already valid on the destination store. Since this
261
- lambda function only gets executed if someone tries to read
262
- from source2, we will send the command from here rather
263
- than outside the lambda. */
264
- conn.store ->narFromPath (info.path , [&](Source & source) {
265
- TeeSource tee{source, sink};
266
- extractNarData (tee, conn.store ->printStorePath (info.path ), narMembers);
267
- });
268
- });
269
-
270
- destStore.addToStore (info, *source2, NoRepair, NoCheckSigs);
289
+ /* Receive the NAR from the remote and add it to the
290
+ destination store. Meanwhile, extract all the info from the
291
+ NAR that getBuildOutput() needs. */
292
+ auto source2 = sinkToSource ([&](Sink & sink)
293
+ {
294
+ /* Note: we should only send the command to dump the store
295
+ path to the remote if the NAR is actually going to get read
296
+ by the destination store, which won't happen if this path
297
+ is already valid on the destination store. Since this
298
+ lambda function only gets executed if someone tries to read
299
+ from source2, we will send the command from here rather
300
+ than outside the lambda. */
301
+ conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath (info.path );
302
+ conn.to .flush ();
303
+
304
+ TeeSource tee (conn.from , sink);
305
+ extractNarData (tee, localStore.printStorePath (info.path ), narMembers);
306
+ });
307
+
308
+ destStore.addToStore (info, *source2, NoRepair, NoCheckSigs);
271
309
}
272
310
273
311
static void copyPathsFromRemote (
@@ -366,39 +404,30 @@ void State::buildRemote(ref<Store> destStore,
366
404
367
405
updateStep (ssConnecting);
368
406
369
- // FIXME: rewrite to use Store.
370
- ::Machine::Connection conn {
371
- .machine = machine,
372
- .store = [&]{
373
- auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri .variant );
374
- if (!pSpecified || pSpecified->scheme != " ssh" ) {
375
- throw Error (" Currently, only (legacy-)ssh stores are supported!" );
376
- }
377
-
378
- auto remoteStore = machine->openStore ().dynamic_pointer_cast <LegacySSHStore>();
379
- assert (remoteStore);
380
-
381
- remoteStore->connPipeSize = 1024 * 1024 ;
382
-
383
- if (machine->isLocalhost ()) {
384
- auto rp_new = remoteStore->remoteProgram .get ();
385
- rp_new.push_back (" --builders" );
386
- rp_new.push_back (" " );
387
- const_cast <nix::Setting<Strings> &>(remoteStore->remoteProgram ).assign (rp_new);
388
- }
389
- remoteStore->extraSshArgs = {
390
- " -a" , " -oBatchMode=yes" , " -oConnectTimeout=60" , " -oTCPKeepAlive=yes"
391
- };
392
- const_cast <nix::Setting<int > &>(remoteStore->logFD ).assign (logFD.get ());
393
-
394
- return nix::ref{remoteStore};
395
- }(),
407
+ auto storeRef = machine->completeStoreReference ();
408
+
409
+ auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant );
410
+ if (!pSpecified || pSpecified->scheme != " ssh" ) {
411
+ throw Error (" Currently, only (legacy-)ssh stores are supported!" );
412
+ }
413
+
414
+ LegacySSHStoreConfig storeConfig {
415
+ pSpecified->scheme ,
416
+ pSpecified->authority ,
417
+ storeRef.params
396
418
};
397
419
420
+ auto master = storeConfig.createSSHMaster (
421
+ false , // no SSH master yet
422
+ logFD.get ());
423
+
424
+ // FIXME: rewrite to use Store.
425
+ auto child = build_remote::openConnection (machine, master);
426
+
398
427
{
399
428
auto activeStepState (activeStep->state_ .lock ());
400
429
if (activeStepState->cancelled ) throw Error (" step cancelled" );
401
- activeStepState->pid = conn. store -> getConnectionPid () ;
430
+ activeStepState->pid = child-> sshPid ;
402
431
}
403
432
404
433
Finally clearPid ([&]() {
@@ -413,12 +442,35 @@ void State::buildRemote(ref<Store> destStore,
413
442
process. Meh. */
414
443
});
415
444
445
+ ::Machine::Connection conn {
446
+ {
447
+ .to = child->in .get (),
448
+ .from = child->out .get (),
449
+ /* Handshake. */
450
+ .remoteVersion = 0xdadbeef , // FIXME avoid dummy initialize
451
+ },
452
+ /* .machine =*/ machine,
453
+ };
454
+
416
455
Finally updateStats ([&]() {
417
- auto stats = conn.store ->getConnectionStats ();
418
- bytesReceived += stats.bytesReceived ;
419
- bytesSent += stats.bytesSent ;
456
+ bytesReceived += conn.from .read ;
457
+ bytesSent += conn.to .written ;
420
458
});
421
459
460
+ constexpr ServeProto::Version our_version = 0x206 ;
461
+
462
+ try {
463
+ conn.remoteVersion = decltype (conn)::handshake (
464
+ conn.to ,
465
+ conn.from ,
466
+ our_version,
467
+ machine->storeUri .render ());
468
+ } catch (EndOfFile & e) {
469
+ child->sshPid .wait ();
470
+ std::string s = chomp (readFile (result.logFile ));
471
+ throw Error (" cannot connect to ‘%1%’: %2%" , machine->storeUri .render (), s);
472
+ }
473
+
422
474
{
423
475
auto info (machine->state ->connectInfo .lock ());
424
476
info->consecutiveFailures = 0 ;
@@ -487,7 +539,7 @@ void State::buildRemote(ref<Store> destStore,
487
539
488
540
auto now1 = std::chrono::steady_clock::now ();
489
541
490
- auto infos = conn.store -> queryPathInfosUncached ( outputs);
542
+ auto infos = conn.queryPathInfos (*localStore, outputs);
491
543
492
544
size_t totalNarSize = 0 ;
493
545
for (auto & [_, info] : infos) totalNarSize += info.narSize ;
@@ -522,11 +574,9 @@ void State::buildRemote(ref<Store> destStore,
522
574
}
523
575
}
524
576
525
- /* Shut down the connection done by RAII.
526
-
527
- Only difference is kill() instead of wait() (i.e. send signal
528
- then wait())
529
- */
577
+ /* Shut down the connection. */
578
+ child->in = -1 ;
579
+ child->sshPid .wait ();
530
580
531
581
} catch (Error & e) {
532
582
/* Disable this machine until a certain period of time has
0 commit comments