Skip to content

Commit 9775528

Browse files
authored
Vminitd: Rework ManagedProcess.IO protocols (#194)
I think the way these types were structured made reasoning about them a bit difficult. I don't think this fully solves the problem, but this change aims to make things a bit simpler by hoisting a lot of the logic for the IO relays to a new IOPair type thats goal is to simply take in a reader and writer and handle their resource cleanup after a relay finishes. Bundled in with this is also the buffer we'll use to copy between them. This change: - Alters ManagedProcess.IO `start` to take in the process to alter instead of this just being a side effect of the constructors. - Gets rid of `close()` in favor of `CloseStdin`. The IO will get closed when the relays finish, which will naturally happen if the process exits or just closes its side of the pipes/pty. This makes it so that the one special case (a client wants to signal no more input is coming) is still sane. - Move all relay logic and resource cleanup to a new IOPair type that takes in protocols that are easily conformable by all of our various io types ( Socket, Terminal, FileHandle).
1 parent d3ca580 commit 9775528

File tree

7 files changed

+329
-316
lines changed

7 files changed

+329
-316
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2025 Apple Inc. and the Containerization project authors. All rights reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
import ContainerizationOS
18+
import Foundation
19+
20+
extension Socket: IOCloser {}
21+
22+
extension Terminal: IOCloser {
23+
var fileDescriptor: Int32 {
24+
self.handle.fileDescriptor
25+
}
26+
}
27+
28+
extension FileHandle: IOCloser {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2025 Apple Inc. and the Containerization project authors. All rights reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
protocol IOCloser: Sendable {
18+
var fileDescriptor: Int32 { get }
19+
20+
func close() throws
21+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2025 Apple Inc. and the Containerization project authors. All rights reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
import ContainerizationError
18+
import ContainerizationOS
19+
import Foundation
20+
import Logging
21+
import Synchronization
22+
23+
final class IOPair: Sendable {
24+
let readFrom: IOCloser
25+
let writeTo: IOCloser
26+
nonisolated(unsafe) let buffer: UnsafeMutableBufferPointer<UInt8>
27+
private let logger: Logger?
28+
29+
private let done: Atomic<Bool>
30+
31+
init(readFrom: IOCloser, writeTo: IOCloser, logger: Logger? = nil) {
32+
self.readFrom = readFrom
33+
self.writeTo = writeTo
34+
self.done = Atomic(false)
35+
self.buffer = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
36+
self.logger = logger
37+
}
38+
39+
func relay() throws {
40+
let readFromFd = self.readFrom.fileDescriptor
41+
let writeToFd = self.writeTo.fileDescriptor
42+
43+
let readFrom = OSFile(fd: readFromFd)
44+
let writeTo = OSFile(fd: writeToFd)
45+
46+
try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
47+
if mask.isHangup && !mask.readyToRead {
48+
self.close()
49+
return
50+
}
51+
// Loop so that in the case that someone wrote > buf.count down the pipe
52+
// we properly will drain it fully.
53+
while true {
54+
let r = readFrom.read(self.buffer)
55+
if r.read > 0 {
56+
let view = UnsafeMutableBufferPointer(
57+
start: self.buffer.baseAddress,
58+
count: r.read
59+
)
60+
61+
let w = writeTo.write(view)
62+
if w.wrote != r.read {
63+
self.logger?.error("stopping relay: short write for stdio")
64+
self.close()
65+
return
66+
}
67+
}
68+
69+
switch r.action {
70+
case .error(let errno):
71+
self.logger?.error("failed with errno \(errno) while reading for fd \(readFromFd)")
72+
fallthrough
73+
case .eof:
74+
self.close()
75+
self.logger?.debug("closing relay for \(readFromFd)")
76+
return
77+
case .again:
78+
// We read all we could, exit.
79+
if mask.isHangup {
80+
self.close()
81+
}
82+
return
83+
default:
84+
break
85+
}
86+
}
87+
}
88+
}
89+
90+
func close() {
91+
guard
92+
self.done.compareExchange(
93+
expected: false,
94+
desired: true,
95+
successOrdering: .acquiringAndReleasing,
96+
failureOrdering: .acquiring
97+
).exchanged
98+
else {
99+
return
100+
}
101+
102+
self.buffer.deallocate()
103+
104+
let readFromFd = self.readFrom.fileDescriptor
105+
// Remove the fd from our global epoll instance first.
106+
do {
107+
try ProcessSupervisor.default.poller.delete(readFromFd)
108+
} catch {
109+
self.logger?.error("failed to delete fd from epoll \(readFromFd): \(error)")
110+
}
111+
112+
do {
113+
try self.readFrom.close()
114+
} catch {
115+
self.logger?.error("failed to close reader fd for IOPair: \(error)")
116+
}
117+
118+
do {
119+
try self.writeTo.close()
120+
} catch {
121+
self.logger?.error("failed to close writer fd for IOPair: \(error)")
122+
}
123+
}
124+
}

vminitd/Sources/vminitd/ManagedContainer.swift

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,6 @@ extension ManagedContainer {
111111
try proc.resize(size: size)
112112
}
113113

114-
func close(execID: String) throws {
115-
let proc = try self.getExecOrInit(execID: execID)
116-
try proc.close()
117-
}
118-
119114
func deleteExec(id: String) throws {
120115
try ensureExecExists(id)
121116
do {

vminitd/Sources/vminitd/ManagedProcess.swift

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ final class ManagedProcess: Sendable {
4040
let io: IO
4141
var waiters: [CheckedContinuation<Int32, Never>] = []
4242
var exitStatus: Int32? = nil
43-
var closed: Bool = false
4443
var pid: Int32 = 0
4544
}
4645

@@ -52,10 +51,10 @@ final class ManagedProcess: Sendable {
5251

5352
// swiftlint: disable type_name
5453
protocol IO {
55-
func start() throws
54+
func start(process: inout Command) throws
5655
func closeAfterExec() throws
5756
func resize(size: Terminal.Size) throws
58-
func close() throws
57+
func closeStdin() throws
5958
}
6059
// swiftlint: enable type_name
6160

@@ -105,14 +104,12 @@ final class ManagedProcess: Sendable {
105104
let attrs = Command.Attrs(setsid: false, setctty: false)
106105
process.attrs = attrs
107106
io = try TerminalIO(
108-
process: &process,
109107
stdio: stdio,
110108
log: log
111109
)
112110
} else {
113111
process.attrs = .init(setsid: false)
114112
io = StandardIO(
115-
process: &process,
116113
stdio: stdio,
117114
log: log
118115
)
@@ -121,7 +118,7 @@ final class ManagedProcess: Sendable {
121118
log.info("starting io")
122119

123120
// Setup IO early. We expect the host to be listening already.
124-
try io.start()
121+
try io.start(process: &process)
125122

126123
self.process = process
127124
self.lock = Mutex(State(io: io))
@@ -213,20 +210,10 @@ extension ManagedProcess {
213210

214211
func resize(size: Terminal.Size) throws {
215212
try self.lock.withLock {
216-
if $0.closed {
213+
guard $0.exitStatus == nil else {
217214
return
218215
}
219216
try $0.io.resize(size: size)
220217
}
221218
}
222-
223-
func close() throws {
224-
try self.lock.withLock {
225-
if $0.closed {
226-
return
227-
}
228-
try $0.io.close()
229-
$0.closed = true
230-
}
231-
}
232219
}

0 commit comments

Comments
 (0)