9
9
#include " path.hh"
10
10
#include " legacy-ssh-store.hh"
11
11
#include " serve-protocol.hh"
12
- #include " serve-protocol-impl.hh"
13
12
#include " state.hh"
14
13
#include " current-process.hh"
15
14
#include " processes.hh"
16
15
#include " util.hh"
17
- #include " serve-protocol.hh"
18
- #include " serve-protocol-impl.hh"
19
16
#include " ssh.hh"
20
17
#include " finally.hh"
21
18
#include " url.hh"
@@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const
39
36
40
37
namespace nix ::build_remote {
41
38
42
- static std::unique_ptr<SSHMaster::Connection> openConnection (
43
- ::Machine::ptr machine, SSHMaster & master)
44
- {
45
- Strings command = {" nix-store" , " --serve" , " --write" };
46
- if (machine->isLocalhost ()) {
47
- command.push_back (" --builders" );
48
- command.push_back (" " );
49
- } else {
50
- auto remoteStore = machine->storeUri .params .find (" remote-store" );
51
- if (remoteStore != machine->storeUri .params .end ()) {
52
- command.push_back (" --store" );
53
- command.push_back (shellEscape (remoteStore->second ));
54
- }
55
- }
56
-
57
- auto ret = master.startCommand (std::move (command), {
58
- " -a" , " -oBatchMode=yes" , " -oConnectTimeout=60" , " -oTCPKeepAlive=yes"
59
- });
60
-
61
- // XXX: determine the actual max value we can use from /proc.
62
-
63
- // FIXME: Should this be upstreamed into `startCommand` in Nix?
64
-
65
- int pipesize = 1024 * 1024 ;
66
-
67
- fcntl (ret->in .get (), F_SETPIPE_SZ, &pipesize);
68
- fcntl (ret->out .get (), F_SETPIPE_SZ, &pipesize);
69
-
70
- return ret;
71
- }
72
-
73
-
74
39
static void copyClosureTo (
75
40
::Machine::Connection & conn,
76
41
Store & destStore,
@@ -87,8 +52,8 @@ static void copyClosureTo(
87
52
// FIXME: substitute output pollutes our build log
88
53
/* Get back the set of paths that are already valid on the remote
89
54
host. */
90
- auto present = conn.queryValidPaths (
91
- destStore , true , closure , useSubstitutes);
55
+ auto present = conn.store -> queryValidPaths (
56
+ closure , true , useSubstitutes);
92
57
93
58
if (present.size () == closure.size ()) return ;
94
59
@@ -103,12 +68,7 @@ static void copyClosureTo(
103
68
std::unique_lock<std::timed_mutex> sendLock (conn.machine ->state ->sendLock ,
104
69
std::chrono::seconds (600 ));
105
70
106
- conn.to << ServeProto::Command::ImportPaths;
107
- destStore.exportPaths (missing, conn.to );
108
- conn.to .flush ();
109
-
110
- if (readInt (conn.from ) != 1 )
111
- throw Error (" remote machine failed to import closure" );
71
+ conn.store ->addMultipleToStoreLegacy (destStore, missing);
112
72
}
113
73
114
74
@@ -228,7 +188,7 @@ static BuildResult performBuild(
228
188
counter & nrStepsBuilding
229
189
)
230
190
{
231
- conn.putBuildDerivationRequest (localStore, drvPath, drv, options);
191
+ auto kont = conn.store -> buildDerivationAsync ( drvPath, drv, options);
232
192
233
193
BuildResult result;
234
194
@@ -237,7 +197,10 @@ static BuildResult performBuild(
237
197
startTime = time (0 );
238
198
{
239
199
MaintainCount<counter> mc (nrStepsBuilding);
240
- result = ServeProto::Serialise<BuildResult>::read (localStore, conn);
200
+ result = kont ();
201
+ // Without proper call-once functions, we need to manually
202
+ // delete after calling.
203
+ kont = {};
241
204
}
242
205
stopTime = time (0 );
243
206
@@ -253,7 +216,7 @@ static BuildResult performBuild(
253
216
254
217
// If the protocol was too old to give us `builtOutputs`, initialize
255
218
// it manually by introspecting the derivation.
256
- if (GET_PROTOCOL_MINOR (conn.remoteVersion ) < 6 )
219
+ if (GET_PROTOCOL_MINOR (conn.store -> getProtocol () ) < 6 )
257
220
{
258
221
// If the remote is too old to handle CA derivations, we can’t get this
259
222
// far anyways
@@ -286,26 +249,25 @@ static void copyPathFromRemote(
286
249
const ValidPathInfo & info
287
250
)
288
251
{
289
- /* Receive the NAR from the remote and add it to the
290
- destination store. Meanwhile, extract all the info from the
291
- NAR that getBuildOutput() needs. */
292
- auto source2 = sinkToSource ([&](Sink & sink)
293
- {
294
- /* Note: we should only send the command to dump the store
295
- path to the remote if the NAR is actually going to get read
296
- by the destination store, which won't happen if this path
297
- is already valid on the destination store. Since this
298
- lambda function only gets executed if someone tries to read
299
- from source2, we will send the command from here rather
300
- than outside the lambda. */
301
- conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath (info.path );
302
- conn.to .flush ();
303
-
304
- TeeSource tee (conn.from , sink);
305
- extractNarData (tee, localStore.printStorePath (info.path ), narMembers);
306
- });
307
-
308
- destStore.addToStore (info, *source2, NoRepair, NoCheckSigs);
252
+ /* Receive the NAR from the remote and add it to the
253
+ destination store. Meanwhile, extract all the info from the
254
+ NAR that getBuildOutput() needs. */
255
+ auto source2 = sinkToSource ([&](Sink & sink)
256
+ {
257
+ /* Note: we should only send the command to dump the store
258
+ path to the remote if the NAR is actually going to get read
259
+ by the destination store, which won't happen if this path
260
+ is already valid on the destination store. Since this
261
+ lambda function only gets executed if someone tries to read
262
+ from source2, we will send the command from here rather
263
+ than outside the lambda. */
264
+ conn.store ->narFromPath (info.path , [&](Source & source) {
265
+ TeeSource tee{source, sink};
266
+ extractNarData (tee, conn.store ->printStorePath (info.path ), narMembers);
267
+ });
268
+ });
269
+
270
+ destStore.addToStore (info, *source2, NoRepair, NoCheckSigs);
309
271
}
310
272
311
273
static void copyPathsFromRemote (
@@ -404,30 +366,37 @@ void State::buildRemote(ref<Store> destStore,
404
366
405
367
updateStep (ssConnecting);
406
368
407
- auto storeRef = machine->completeStoreReference ();
408
-
409
- auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant );
410
- if (!pSpecified || pSpecified->scheme != " ssh" ) {
411
- throw Error (" Currently, only (legacy-)ssh stores are supported!" );
412
- }
413
-
414
- LegacySSHStoreConfig storeConfig {
415
- pSpecified->scheme ,
416
- pSpecified->authority ,
417
- storeRef.params
418
- };
419
-
420
- auto master = storeConfig.createSSHMaster (
421
- false , // no SSH master yet
422
- logFD.get ());
423
-
424
369
// FIXME: rewrite to use Store.
425
- auto child = build_remote::openConnection (machine, master);
370
+ ::Machine::Connection conn {
371
+ .machine = machine,
372
+ .store = [&]{
373
+ auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri .variant );
374
+ if (!pSpecified || pSpecified->scheme != " ssh" ) {
375
+ throw Error (" Currently, only (legacy-)ssh stores are supported!" );
376
+ }
377
+
378
+ auto remoteStore = machine->openStore ().dynamic_pointer_cast <LegacySSHStore>();
379
+ assert (remoteStore);
380
+
381
+ if (machine->isLocalhost ()) {
382
+ auto rp_new = remoteStore->remoteProgram .get ();
383
+ rp_new.push_back (" --builders" );
384
+ rp_new.push_back (" " );
385
+ const_cast <nix::Setting<Strings> &>(remoteStore->remoteProgram ).assign (rp_new);
386
+ }
387
+ remoteStore->extraSshArgs = {
388
+ " -a" , " -oBatchMode=yes" , " -oConnectTimeout=60" , " -oTCPKeepAlive=yes"
389
+ };
390
+ const_cast <nix::Setting<int > &>(remoteStore->logFD ).assign (logFD.get ());
391
+
392
+ return nix::ref{remoteStore};
393
+ }(),
394
+ };
426
395
427
396
{
428
397
auto activeStepState (activeStep->state_ .lock ());
429
398
if (activeStepState->cancelled ) throw Error (" step cancelled" );
430
- activeStepState->pid = child-> sshPid ;
399
+ activeStepState->pid = conn. store -> getConnectionPid () ;
431
400
}
432
401
433
402
Finally clearPid ([&]() {
@@ -442,35 +411,12 @@ void State::buildRemote(ref<Store> destStore,
442
411
process. Meh. */
443
412
});
444
413
445
- ::Machine::Connection conn {
446
- {
447
- .to = child->in .get (),
448
- .from = child->out .get (),
449
- /* Handshake. */
450
- .remoteVersion = 0xdadbeef , // FIXME avoid dummy initialize
451
- },
452
- /* .machine =*/ machine,
453
- };
454
-
455
414
Finally updateStats ([&]() {
456
- bytesReceived += conn.from .read ;
457
- bytesSent += conn.to .written ;
415
+ auto stats = conn.store ->getConnectionStats ();
416
+ bytesReceived += stats.bytesReceived ;
417
+ bytesSent += stats.bytesSent ;
458
418
});
459
419
460
- constexpr ServeProto::Version our_version = 0x206 ;
461
-
462
- try {
463
- conn.remoteVersion = decltype (conn)::handshake (
464
- conn.to ,
465
- conn.from ,
466
- our_version,
467
- machine->storeUri .render ());
468
- } catch (EndOfFile & e) {
469
- child->sshPid .wait ();
470
- std::string s = chomp (readFile (result.logFile ));
471
- throw Error (" cannot connect to ‘%1%’: %2%" , machine->storeUri .render (), s);
472
- }
473
-
474
420
{
475
421
auto info (machine->state ->connectInfo .lock ());
476
422
info->consecutiveFailures = 0 ;
@@ -539,7 +485,7 @@ void State::buildRemote(ref<Store> destStore,
539
485
540
486
auto now1 = std::chrono::steady_clock::now ();
541
487
542
- auto infos = conn.queryPathInfos (*localStore, outputs);
488
+ auto infos = conn.store -> queryPathInfosUncached ( outputs);
543
489
544
490
size_t totalNarSize = 0 ;
545
491
for (auto & [_, info] : infos) totalNarSize += info.narSize ;
@@ -574,9 +520,11 @@ void State::buildRemote(ref<Store> destStore,
574
520
}
575
521
}
576
522
577
- /* Shut down the connection. */
578
- child->in = -1 ;
579
- child->sshPid .wait ();
523
+ /* Shut down the connection done by RAII.
524
+
525
+ Only difference is kill() instead of wait() (i.e. send signal
526
+ then wait())
527
+ */
580
528
581
529
} catch (Error & e) {
582
530
/* Disable this machine until a certain period of time has
0 commit comments