/build/source/nativelink-worker/src/persistent_worker/live_worker.rs
Line | Count | Source |
1 | | // Copyright 2024 Trace Machina, Inc. All rights reserved. |
2 | | // |
3 | | // Licensed under the Business Source License, Version 1.1 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may requested a copy of the License by emailing contact@nativelink.com. |
6 | | // |
7 | | // Use of this module requires an enterprise license agreement, which can be |
8 | | // attained by emailing contact@nativelink.com or signing up for Nativelink |
9 | | // Cloud at app.nativelink.com. |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, software |
12 | | // distributed under the License is distributed on an "AS IS" BASIS, |
13 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | // See the License for the specific language governing permissions and |
15 | | // limitations under the License. |
16 | | |
17 | | //! `LiveWorker`: one long-lived persistent-worker child process. |
18 | | //! |
19 | | //! A `LiveWorker` wraps a child process started with the action's tool + |
20 | | //! startup-flag prefix + `--persistent_worker`. It owns the child's stdin/stdout |
21 | | //! and exposes `dispatch(request) -> response` for the pool to invoke. |
22 | | //! |
23 | | //! Lifecycle: |
24 | | //! - `spawn` → ready, never-used |
25 | | //! - `dispatch` writes the request, reads the response, then updates |
26 | | //! `last_used` and `request_count` |
27 | | //! - `shutdown` closes stdin, waits a grace period, then SIGKILLs; a bare `drop` kills the child right away via `kill_on_drop` |
28 | | //! |
29 | | //! v1 is single-request-per-worker (no multiplex). The pool serializes |
30 | | //! concurrent acquires of the same worker via its data structure. |
31 | | |
32 | | use core::time::Duration; |
33 | | use std::path::{Path, PathBuf}; |
34 | | use std::process::Stdio; |
35 | | use std::time::Instant; |
36 | | |
37 | | use bytes::BytesMut; |
38 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
39 | | use prost::Message as ProstMessage; |
40 | | use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; |
41 | | use tokio::process::{Child, ChildStdin, ChildStdout, Command}; |
42 | | use tracing::{debug, warn}; |
43 | | |
44 | | use super::protocol::{WireFormat, WorkRequest, WorkResponse}; |
45 | | |
/// Default deadline `LiveWorker::dispatch` applies while waiting on the
/// worker: ten minutes. When it elapses the child is killed and the caller
/// receives a `DeadlineExceeded` error. Use
/// `LiveWorker::dispatch_with_timeout` to choose a different deadline.
const DEFAULT_DISPATCH_TIMEOUT: Duration = Duration::from_secs(10 * 60);
50 | | |
/// One persistent-worker child process, together with the pipes used to talk
/// to it and the bookkeeping counters exposed through the accessor methods.
#[derive(Debug)]
pub struct LiveWorker {
    /// Spawned child process. Kept on the struct so its Drop kills the process
    /// when the worker is dropped without `shutdown` being called (the child is
    /// spawned with `kill_on_drop(true)`).
    child: Child,
    /// Write end of the child's stdin; framed `WorkRequest`s are written here.
    stdin: ChildStdin,
    /// Buffered read end of the child's stdout; framed `WorkResponse`s are
    /// read from here.
    stdout: BufReader<ChildStdout>,
    /// Wire format we negotiated at spawn (immutable for this worker's lifetime).
    wire_format: WireFormat,
    /// Number of successful `dispatch` calls completed.
    request_count: u64,
    /// When the last successful `dispatch` finished, or the spawn time if the
    /// worker has not served a request yet. Used by the pool's idle eviction.
    last_used: Instant,
    /// Working directory the child runs in. Stored so the pool can reuse it
    /// across requests for the same `WorkerKey` until per-request sandboxing
    /// arrives in v2.
    working_dir: PathBuf,
}
70 | | |
71 | | impl LiveWorker { |
72 | | /// Spawn a new persistent worker. |
73 | | /// |
74 | | /// `executable` is the resolved tool path (absolute, or relative to |
75 | | /// `working_dir`). `startup_args` are the flags that the worker is launched |
76 | | /// with once — these become part of the `WorkerKey`. The Bazel-conventional |
77 | | /// `--persistent_worker` flag is appended automatically. |
78 | 8 | pub async fn spawn( |
79 | 8 | executable: &Path, |
80 | 8 | startup_args: &[String], |
81 | 8 | wire_format: WireFormat, |
82 | 8 | working_dir: &Path, |
83 | 8 | ) -> Result<Self, Error> { |
84 | 8 | let mut cmd = Command::new(executable); |
85 | 8 | cmd.args(startup_args) |
86 | 8 | .arg("--persistent_worker") |
87 | 8 | .current_dir(working_dir) |
88 | 8 | .stdin(Stdio::piped()) |
89 | 8 | .stdout(Stdio::piped()) |
90 | 8 | // WorkResponse.output carries per-action diagnostics. The child |
91 | 8 | // stderr stream is process-lifetime data and cannot be attributed |
92 | 8 | // safely to a single request. |
93 | 8 | .stderr(Stdio::null()) |
94 | 8 | .kill_on_drop(true); |
95 | | |
96 | 8 | debug!( |
97 | | ?executable, |
98 | | ?startup_args, |
99 | | ?wire_format, |
100 | | "Spawning persistent worker" |
101 | | ); |
102 | | |
103 | 8 | let mut child = cmd.spawn().err_tip(|| {0 |
104 | 0 | format!( |
105 | | "Spawning persistent worker {} with args {startup_args:?}", |
106 | 0 | executable.display() |
107 | | ) |
108 | 0 | })?; |
109 | 8 | let stdin = child |
110 | 8 | .stdin |
111 | 8 | .take() |
112 | 8 | .ok_or_else(|| make_err!0 (Code::Internal0 , "Persistent worker child has no stdin"))?0 ; |
113 | 8 | let stdout = child |
114 | 8 | .stdout |
115 | 8 | .take() |
116 | 8 | .ok_or_else(|| make_err!0 (Code::Internal0 , "Persistent worker child has no stdout"))?0 ; |
117 | | |
118 | 8 | Ok(Self { |
119 | 8 | child, |
120 | 8 | stdin, |
121 | 8 | stdout: BufReader::new(stdout), |
122 | 8 | wire_format, |
123 | 8 | request_count: 0, |
124 | 8 | last_used: Instant::now(), |
125 | 8 | working_dir: working_dir.to_path_buf(), |
126 | 8 | }) |
127 | 8 | } |
128 | | |
129 | 0 | pub const fn wire_format(&self) -> WireFormat { |
130 | 0 | self.wire_format |
131 | 0 | } |
132 | | |
133 | 7 | pub const fn request_count(&self) -> u64 { |
134 | 7 | self.request_count |
135 | 7 | } |
136 | | |
137 | 0 | pub const fn last_used(&self) -> Instant { |
138 | 0 | self.last_used |
139 | 0 | } |
140 | | |
141 | 0 | pub fn working_dir(&self) -> &Path { |
142 | 0 | &self.working_dir |
143 | 0 | } |
144 | | |
145 | | /// Returns true if the child process has already exited. |
146 | 2 | pub fn is_dead(&mut self) -> bool { |
147 | 2 | matches!1 (self.child.try_wait(), Ok(Some(_)) | Err(_)) |
148 | 2 | } |
149 | | |
150 | | /// Send a single `WorkRequest` and read the matching `WorkResponse`. On |
151 | | /// any I/O error or response framing error, the worker is considered |
152 | | /// dead — the caller (pool) must not return it to the idle set. |
153 | 2 | pub async fn dispatch(&mut self, request: &WorkRequest) -> Result<WorkResponse, Error> { |
154 | 2 | self.dispatch_with_timeout(request, DEFAULT_DISPATCH_TIMEOUT) |
155 | 2 | .await |
156 | 2 | } |
157 | | |
158 | 3 | pub async fn dispatch_with_timeout( |
159 | 3 | &mut self, |
160 | 3 | request: &WorkRequest, |
161 | 3 | timeout: Duration, |
162 | 5 | ) -> Result<WorkResponse, Error> { |
163 | 5 | if request.request_id != 0 { |
164 | 1 | return Err(make_err!( |
165 | 1 | Code::InvalidArgument, |
166 | 1 | "v1 persistent workers do not support multiplex; request_id must be 0, got {}", |
167 | 1 | request.request_id |
168 | 1 | )); |
169 | 4 | } |
170 | | |
171 | 4 | let bytes = request.encode_framed(self.wire_format)?0 ; |
172 | 4 | self.stdin |
173 | 4 | .write_all(&bytes) |
174 | 4 | .await |
175 | 4 | .err_tip(|| "Writing WorkRequest to persistent worker stdin")?0 ; |
176 | 4 | self.stdin |
177 | 4 | .flush() |
178 | 4 | .await |
179 | 4 | .err_tip(|| "Flushing persistent worker stdin")?0 ; |
180 | | |
181 | 4 | let read_fut = read_response(&mut self.stdout, self.wire_format); |
182 | 4 | let response3 = if let Ok(result3 ) = tokio::time::timeout(timeout, read_fut).await { |
183 | 3 | result?0 |
184 | | } else { |
185 | 1 | warn!( |
186 | | ?timeout, |
187 | | "Persistent worker did not respond before deadline; killing child" |
188 | | ); |
189 | 1 | drop(self.child.kill().await); |
190 | 1 | return Err(make_err!( |
191 | 1 | Code::DeadlineExceeded, |
192 | 1 | "Persistent worker did not respond within {timeout:?}" |
193 | 1 | )); |
194 | | }; |
195 | | |
196 | 3 | if response.request_id != 0 { |
197 | | // v1 contract: workers MUST echo request_id 0. If a tool ever |
198 | | // produces a multiplex-style id we don't know what to do with the |
199 | | // response — fail conservatively rather than misroute output. |
200 | 0 | return Err(make_err!( |
201 | 0 | Code::Internal, |
202 | 0 | "Persistent worker returned non-zero request_id={}; v1 does not support multiplex", |
203 | 0 | response.request_id |
204 | 0 | )); |
205 | 3 | } |
206 | | |
207 | 3 | self.request_count += 1; |
208 | 3 | self.last_used = Instant::now(); |
209 | 3 | Ok(response) |
210 | 5 | } |
211 | | |
212 | | /// Gracefully drain: close stdin so the worker observes EOF and exits on |
213 | | /// its own, then wait up to `grace`, then SIGKILL if still alive. Always |
214 | | /// consumes the worker. |
215 | 6 | pub async fn shutdown(mut self, grace: Duration) { |
216 | | // Dropping stdin closes the pipe, which most well-behaved workers |
217 | | // interpret as "no more work, please exit". |
218 | 6 | drop(self.stdin); |
219 | | |
220 | 6 | match tokio::time::timeout(grace, self.child.wait()).await { |
221 | 2 | Ok(Ok(status)) => debug!(?status, "Persistent worker exited cleanly"), |
222 | 0 | Ok(Err(err)) => warn!(?err, "Error waiting for persistent worker exit"), |
223 | | Err(_) => { |
224 | 3 | warn!( |
225 | | ?grace, |
226 | | "Persistent worker did not exit within grace; SIGKILLing" |
227 | | ); |
228 | 3 | drop(self.child.kill().await); |
229 | | } |
230 | | } |
231 | 5 | } |
232 | | } |
233 | | |
234 | | /// Read one framed `WorkResponse` from the worker's stdout. |
235 | 4 | async fn read_response( |
236 | 4 | stdout: &mut BufReader<ChildStdout>, |
237 | 4 | format: WireFormat, |
238 | 4 | ) -> Result<WorkResponse, Error> { |
239 | 4 | match format { |
240 | | WireFormat::Proto => { |
241 | | // Length-delimited varint frame. Decode the varint manually because |
242 | | // we want to read exactly the right number of bytes without buffering |
243 | | // arbitrary data from the worker's pipe. |
244 | 0 | let len = read_varint(stdout).await?; |
245 | 0 | if len > 64 * 1024 * 1024 { |
246 | 0 | return Err(make_err!( |
247 | 0 | Code::OutOfRange, |
248 | 0 | "Persistent worker WorkResponse length {len} exceeds 64 MiB cap" |
249 | 0 | )); |
250 | 0 | } |
251 | 0 | let mut buf = BytesMut::with_capacity(len); |
252 | 0 | buf.resize(len, 0); |
253 | 0 | stdout |
254 | 0 | .read_exact(&mut buf) |
255 | 0 | .await |
256 | 0 | .err_tip(|| "Reading WorkResponse proto body from persistent worker")?; |
257 | 0 | <WorkResponse as ProstMessage>::decode(&buf[..]) |
258 | 0 | .map_err(|e| make_err!(Code::Internal, "Decoding WorkResponse proto body: {e}")) |
259 | | } |
260 | | WireFormat::Json => { |
261 | | // Newline-delimited JSON. |
262 | 4 | let mut line = Vec::with_capacity(256); |
263 | | loop { |
264 | 98 | let mut byte = [0u8; 1]; |
265 | 98 | let n97 = stdout |
266 | 98 | .read(&mut byte) |
267 | 98 | .await |
268 | 97 | .err_tip(|| "Reading WorkResponse JSON byte from persistent worker")?0 ; |
269 | 97 | if n == 0 { |
270 | 0 | return Err(make_err!( |
271 | 0 | Code::Aborted, |
272 | 0 | "Persistent worker closed stdout before sending a full JSON response" |
273 | 0 | )); |
274 | 97 | } |
275 | 97 | if byte[0] == b'\n' { |
276 | 3 | if line.is_empty() { |
277 | 0 | continue; // tolerate blank lines between responses |
278 | 3 | } |
279 | 3 | break; |
280 | 94 | } |
281 | 94 | line.push(byte[0]); |
282 | 94 | if line.len() > 64 * 1024 * 1024 { |
283 | 0 | return Err(make_err!( |
284 | 0 | Code::OutOfRange, |
285 | 0 | "Persistent worker WorkResponse JSON line exceeds 64 MiB cap" |
286 | 0 | )); |
287 | 94 | } |
288 | | } |
289 | 3 | WorkResponse::decode_framed(&line, WireFormat::Json) |
290 | | } |
291 | | } |
292 | 3 | } |
293 | | |
294 | | /// Read a protobuf varint from the reader. Reused for the length prefix of a |
295 | | /// length-delimited frame. |
296 | 0 | async fn read_varint(r: &mut BufReader<ChildStdout>) -> Result<usize, Error> { |
297 | 0 | let mut result: u64 = 0; |
298 | 0 | for shift in (0..64).step_by(7) { |
299 | 0 | let mut byte = [0u8; 1]; |
300 | 0 | let n = r |
301 | 0 | .read(&mut byte) |
302 | 0 | .await |
303 | 0 | .err_tip(|| "Reading varint byte from persistent worker stdout")?; |
304 | 0 | if n == 0 { |
305 | 0 | return Err(make_err!( |
306 | 0 | Code::Aborted, |
307 | 0 | "Persistent worker closed stdout while reading varint" |
308 | 0 | )); |
309 | 0 | } |
310 | 0 | result |= u64::from(byte[0] & 0x7f) << shift; |
311 | 0 | if byte[0] & 0x80 == 0 { |
312 | 0 | return usize::try_from(result).map_err(|_| { |
313 | 0 | make_err!( |
314 | 0 | Code::OutOfRange, |
315 | | "Varint length {result} does not fit in usize" |
316 | | ) |
317 | 0 | }); |
318 | 0 | } |
319 | | } |
320 | 0 | Err(make_err!( |
321 | 0 | Code::OutOfRange, |
322 | 0 | "Varint did not terminate within 10 bytes" |
323 | 0 | )) |
324 | 0 | } |
325 | | |
#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use nativelink_macro::nativelink_test;

    use super::*;

    /// Interpreter and arguments that run a scripted stand-in worker.
    struct TestWorkerProgram {
        executable: PathBuf,
        startup_args: Vec<String>,
    }

    impl TestWorkerProgram {
        fn startup_args(&self) -> &[String] {
            &self.startup_args
        }
    }

    /// Writes `body` to `path` and syncs it to disk before returning.
    fn write_script(path: &Path, body: &str) {
        let mut file = std::fs::File::create(path).unwrap();
        file.write_all(body.as_bytes()).unwrap();
        file.sync_all().unwrap();
    }

    /// Materialises the shell flavour of the script and runs it via `/bin/sh`.
    #[cfg(unix)]
    fn echo_script(working_dir: &Path, unix_body: &str, _windows_body: &str) -> TestWorkerProgram {
        let script_path = working_dir.join("worker.sh");
        write_script(&script_path, unix_body);

        TestWorkerProgram {
            executable: PathBuf::from("/bin/sh"),
            startup_args: vec![script_path.display().to_string()],
        }
    }

    /// Materialises the PowerShell flavour of the script and runs it via
    /// `powershell.exe`.
    #[cfg(windows)]
    fn echo_script(working_dir: &Path, _unix_body: &str, windows_body: &str) -> TestWorkerProgram {
        let script_path = working_dir.join("worker.ps1");
        write_script(&script_path, windows_body);

        TestWorkerProgram {
            executable: PathBuf::from("powershell.exe"),
            startup_args: vec![
                "-NoProfile".to_owned(),
                "-ExecutionPolicy".to_owned(),
                "Bypass".to_owned(),
                "-File".to_owned(),
                script_path.display().to_string(),
            ],
        }
    }

    /// Writes the platform-appropriate script into `dir` and spawns it as a
    /// JSON-speaking worker whose working directory is `dir`.
    async fn spawn_json_worker(dir: &Path, unix_body: &str, windows_body: &str) -> LiveWorker {
        let program = echo_script(dir, unix_body, windows_body);
        LiveWorker::spawn(
            &program.executable,
            program.startup_args(),
            WireFormat::Json,
            dir,
        )
        .await
        .unwrap()
    }

    /// A minimal single-argument request with `request_id` left at zero.
    fn compile_request() -> WorkRequest {
        WorkRequest {
            arguments: vec!["compile".into()],
            ..WorkRequest::default()
        }
    }

    #[nativelink_test]
    async fn shutdown_kills_unresponsive_worker() {
        // A worker that never reads/writes — shutdown grace expires, we SIGKILL.
        let dir = tempfile::tempdir().unwrap();
        let worker =
            spawn_json_worker(dir.path(), "exec sleep 30\n", "Start-Sleep -Seconds 30\n").await;

        let started = Instant::now();
        worker.shutdown(Duration::from_millis(100)).await;
        // SIGKILL should arrive well within a second.
        assert!(started.elapsed() < Duration::from_secs(2));
    }

    #[nativelink_test]
    async fn dispatch_json_round_trip() {
        // The scripted worker reads one line, ignores it, and emits a fixed,
        // newline-terminated response.
        let dir = tempfile::tempdir().unwrap();
        let mut worker = spawn_json_worker(
            dir.path(),
            "read line\necho '{\"exitCode\":0,\"output\":\"ok\"}'\n",
            "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0,\"output\":\"ok\"}')\n",
        )
        .await;

        let resp = worker.dispatch(&compile_request()).await.unwrap();
        assert_eq!(resp.exit_code, 0);
        assert_eq!(resp.output, "ok");
        assert_eq!(worker.request_count(), 1);

        worker.shutdown(Duration::from_secs(1)).await;
    }

    #[nativelink_test]
    async fn dispatch_timeout_kills_worker() {
        // Worker reads, then sleeps forever instead of responding.
        let dir = tempfile::tempdir().unwrap();
        let mut worker = spawn_json_worker(
            dir.path(),
            "read line\nexec sleep 60\n",
            "$line = [Console]::In.ReadLine()\nStart-Sleep -Seconds 60\n",
        )
        .await;

        let err = worker
            .dispatch_with_timeout(&compile_request(), Duration::from_millis(100))
            .await
            .unwrap_err();
        assert_eq!(err.code, Code::DeadlineExceeded);
        assert!(worker.is_dead());
    }

    #[nativelink_test]
    async fn rejects_multiplex_request_id() {
        let dir = tempfile::tempdir().unwrap();
        let mut worker = spawn_json_worker(
            dir.path(),
            "read line\necho '{\"exitCode\":0}'\n",
            "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0}')\n",
        )
        .await;

        let multiplexed = WorkRequest {
            request_id: 7,
            ..WorkRequest::default()
        };
        let err = worker.dispatch(&multiplexed).await.unwrap_err();
        assert_eq!(err.code, Code::InvalidArgument);

        worker.shutdown(Duration::from_secs(1)).await;
    }
}