Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/persistent_worker/live_worker.rs
Line
Count
Source
1
// Copyright 2024 Trace Machina, Inc. All rights reserved.
2
//
3
// Licensed under the Business Source License, Version 1.1 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may requested a copy of the License by emailing contact@nativelink.com.
6
//
7
// Use of this module requires an enterprise license agreement, which can be
8
// attained by emailing contact@nativelink.com or signing up for Nativelink
9
// Cloud at app.nativelink.com.
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
17
//! `LiveWorker`: one long-lived persistent-worker child process.
18
//!
19
//! A `LiveWorker` wraps a child process started with the action's tool +
20
//! startup-flag prefix + `--persistent_worker`. It owns the child's stdin/stdout
21
//! and exposes `dispatch(request) -> response` for the pool to invoke.
22
//!
23
//! Lifecycle:
//! - `spawn` → ready, never-used
//! - `dispatch` writes the request, reads the response, and on success
//!   updates `last_used` and `request_count`
//! - `shutdown` closes the child's stdin, waits up to a grace period for it to
//!   exit, then kills it; dropping a `LiveWorker` without calling `shutdown`
//!   kills the child via `kill_on_drop`
28
//!
29
//! v1 is single-request-per-worker (no multiplex). The pool serializes
30
//! concurrent acquires of the same worker via its data structure.
31
32
use core::time::Duration;
33
use std::path::{Path, PathBuf};
34
use std::process::Stdio;
35
use std::time::Instant;
36
37
use bytes::BytesMut;
38
use nativelink_error::{Code, Error, ResultExt, make_err};
39
use prost::Message as ProstMessage;
40
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
42
use tracing::{debug, warn};
43
44
use super::protocol::{WireFormat, WorkRequest, WorkResponse};
45
46
/// Deadline (ten minutes) that `LiveWorker::dispatch` passes to
/// `LiveWorker::dispatch_with_timeout`. When it expires the child is killed and
/// the call fails with `DeadlineExceeded`. Callers wanting a different bound
/// call `LiveWorker::dispatch_with_timeout` directly.
const DEFAULT_DISPATCH_TIMEOUT: Duration = Duration::from_secs(600);
50
51
/// One persistent-worker child process.
///
/// Owns both ends of the child's stdio pipes that we talk over: framed
/// `WorkRequest`s are written to `stdin`, framed `WorkResponse`s are read
/// from `stdout`, both using `wire_format`.
#[derive(Debug)]
pub struct LiveWorker {
    /// Spawned child process. Kept on the struct so its Drop kills the process
    /// when the worker is dropped without `shutdown` being called.
    child: Child,
    /// Pipe to the child's stdin; requests are written here. Dropping it
    /// closes the pipe, which `shutdown` uses as the "please exit" signal.
    stdin: ChildStdin,
    /// Buffered pipe from the child's stdout; responses are read from here.
    /// The same reader is reused across requests, so any bytes it buffered
    /// ahead stay available for the next read.
    stdout: BufReader<ChildStdout>,
    /// Wire format chosen at spawn (immutable for this worker's lifetime).
    wire_format: WireFormat,
    /// Number of successful `dispatch` calls completed.
    request_count: u64,
    /// Spawn time, then the time the most recent successful `dispatch`
    /// finished. Presumably consumed by the pool's idle eviction — confirm
    /// against the pool.
    last_used: Instant,
    /// Working directory the child runs in. Stored so the pool can reuse it
    /// across requests for the same `WorkerKey` until per-request sandboxing
    /// arrives in v2.
    working_dir: PathBuf,
}
70
71
impl LiveWorker {
72
    /// Spawn a new persistent worker.
73
    ///
74
    /// `executable` is the resolved tool path (absolute, or relative to
75
    /// `working_dir`). `startup_args` are the flags that the worker is launched
76
    /// with once — these become part of the `WorkerKey`. The Bazel-conventional
77
    /// `--persistent_worker` flag is appended automatically.
78
8
    pub async fn spawn(
79
8
        executable: &Path,
80
8
        startup_args: &[String],
81
8
        wire_format: WireFormat,
82
8
        working_dir: &Path,
83
8
    ) -> Result<Self, Error> {
84
8
        let mut cmd = Command::new(executable);
85
8
        cmd.args(startup_args)
86
8
            .arg("--persistent_worker")
87
8
            .current_dir(working_dir)
88
8
            .stdin(Stdio::piped())
89
8
            .stdout(Stdio::piped())
90
8
            // WorkResponse.output carries per-action diagnostics. The child
91
8
            // stderr stream is process-lifetime data and cannot be attributed
92
8
            // safely to a single request.
93
8
            .stderr(Stdio::null())
94
8
            .kill_on_drop(true);
95
96
8
        debug!(
97
            ?executable,
98
            ?startup_args,
99
            ?wire_format,
100
            "Spawning persistent worker"
101
        );
102
103
8
        let mut child = cmd.spawn().err_tip(|| 
{0
104
0
            format!(
105
                "Spawning persistent worker {} with args {startup_args:?}",
106
0
                executable.display()
107
            )
108
0
        })?;
109
8
        let stdin = child
110
8
            .stdin
111
8
            .take()
112
8
            .ok_or_else(|| 
make_err!0
(
Code::Internal0
, "Persistent worker child has no stdin"))
?0
;
113
8
        let stdout = child
114
8
            .stdout
115
8
            .take()
116
8
            .ok_or_else(|| 
make_err!0
(
Code::Internal0
, "Persistent worker child has no stdout"))
?0
;
117
118
8
        Ok(Self {
119
8
            child,
120
8
            stdin,
121
8
            stdout: BufReader::new(stdout),
122
8
            wire_format,
123
8
            request_count: 0,
124
8
            last_used: Instant::now(),
125
8
            working_dir: working_dir.to_path_buf(),
126
8
        })
127
8
    }
128
129
0
    pub const fn wire_format(&self) -> WireFormat {
130
0
        self.wire_format
131
0
    }
132
133
7
    pub const fn request_count(&self) -> u64 {
134
7
        self.request_count
135
7
    }
136
137
0
    pub const fn last_used(&self) -> Instant {
138
0
        self.last_used
139
0
    }
140
141
0
    pub fn working_dir(&self) -> &Path {
142
0
        &self.working_dir
143
0
    }
144
145
    /// Returns true if the child process has already exited.
146
2
    pub fn is_dead(&mut self) -> bool {
147
2
        
matches!1
(self.child.try_wait(), Ok(Some(_)) | Err(_))
148
2
    }
149
150
    /// Send a single `WorkRequest` and read the matching `WorkResponse`. On
151
    /// any I/O error or response framing error, the worker is considered
152
    /// dead — the caller (pool) must not return it to the idle set.
153
2
    pub async fn dispatch(&mut self, request: &WorkRequest) -> Result<WorkResponse, Error> {
154
2
        self.dispatch_with_timeout(request, DEFAULT_DISPATCH_TIMEOUT)
155
2
            .await
156
2
    }
157
158
3
    pub async fn dispatch_with_timeout(
159
3
        &mut self,
160
3
        request: &WorkRequest,
161
3
        timeout: Duration,
162
5
    ) -> Result<WorkResponse, Error> {
163
5
        if request.request_id != 0 {
164
1
            return Err(make_err!(
165
1
                Code::InvalidArgument,
166
1
                "v1 persistent workers do not support multiplex; request_id must be 0, got {}",
167
1
                request.request_id
168
1
            ));
169
4
        }
170
171
4
        let bytes = request.encode_framed(self.wire_format)
?0
;
172
4
        self.stdin
173
4
            .write_all(&bytes)
174
4
            .await
175
4
            .err_tip(|| "Writing WorkRequest to persistent worker stdin")
?0
;
176
4
        self.stdin
177
4
            .flush()
178
4
            .await
179
4
            .err_tip(|| "Flushing persistent worker stdin")
?0
;
180
181
4
        let read_fut = read_response(&mut self.stdout, self.wire_format);
182
4
        let 
response3
= if let Ok(
result3
) = tokio::time::timeout(timeout, read_fut).await {
183
3
            result
?0
184
        } else {
185
1
            warn!(
186
                ?timeout,
187
                "Persistent worker did not respond before deadline; killing child"
188
            );
189
1
            drop(self.child.kill().await);
190
1
            return Err(make_err!(
191
1
                Code::DeadlineExceeded,
192
1
                "Persistent worker did not respond within {timeout:?}"
193
1
            ));
194
        };
195
196
3
        if response.request_id != 0 {
197
            // v1 contract: workers MUST echo request_id 0. If a tool ever
198
            // produces a multiplex-style id we don't know what to do with the
199
            // response — fail conservatively rather than misroute output.
200
0
            return Err(make_err!(
201
0
                Code::Internal,
202
0
                "Persistent worker returned non-zero request_id={}; v1 does not support multiplex",
203
0
                response.request_id
204
0
            ));
205
3
        }
206
207
3
        self.request_count += 1;
208
3
        self.last_used = Instant::now();
209
3
        Ok(response)
210
5
    }
211
212
    /// Gracefully drain: close stdin so the worker observes EOF and exits on
213
    /// its own, then wait up to `grace`, then SIGKILL if still alive. Always
214
    /// consumes the worker.
215
6
    pub async fn shutdown(mut self, grace: Duration) {
216
        // Dropping stdin closes the pipe, which most well-behaved workers
217
        // interpret as "no more work, please exit".
218
6
        drop(self.stdin);
219
220
6
        match tokio::time::timeout(grace, self.child.wait()).await {
221
2
            Ok(Ok(status)) => debug!(?status, "Persistent worker exited cleanly"),
222
0
            Ok(Err(err)) => warn!(?err, "Error waiting for persistent worker exit"),
223
            Err(_) => {
224
3
                warn!(
225
                    ?grace,
226
                    "Persistent worker did not exit within grace; SIGKILLing"
227
                );
228
3
                drop(self.child.kill().await);
229
            }
230
        }
231
5
    }
232
}
233
234
/// Read one framed `WorkResponse` from the worker's stdout.
235
4
async fn read_response(
236
4
    stdout: &mut BufReader<ChildStdout>,
237
4
    format: WireFormat,
238
4
) -> Result<WorkResponse, Error> {
239
4
    match format {
240
        WireFormat::Proto => {
241
            // Length-delimited varint frame. Decode the varint manually because
242
            // we want to read exactly the right number of bytes without buffering
243
            // arbitrary data from the worker's pipe.
244
0
            let len = read_varint(stdout).await?;
245
0
            if len > 64 * 1024 * 1024 {
246
0
                return Err(make_err!(
247
0
                    Code::OutOfRange,
248
0
                    "Persistent worker WorkResponse length {len} exceeds 64 MiB cap"
249
0
                ));
250
0
            }
251
0
            let mut buf = BytesMut::with_capacity(len);
252
0
            buf.resize(len, 0);
253
0
            stdout
254
0
                .read_exact(&mut buf)
255
0
                .await
256
0
                .err_tip(|| "Reading WorkResponse proto body from persistent worker")?;
257
0
            <WorkResponse as ProstMessage>::decode(&buf[..])
258
0
                .map_err(|e| make_err!(Code::Internal, "Decoding WorkResponse proto body: {e}"))
259
        }
260
        WireFormat::Json => {
261
            // Newline-delimited JSON.
262
4
            let mut line = Vec::with_capacity(256);
263
            loop {
264
98
                let mut byte = [0u8; 1];
265
98
                let 
n97
= stdout
266
98
                    .read(&mut byte)
267
98
                    .await
268
97
                    .err_tip(|| "Reading WorkResponse JSON byte from persistent worker")
?0
;
269
97
                if n == 0 {
270
0
                    return Err(make_err!(
271
0
                        Code::Aborted,
272
0
                        "Persistent worker closed stdout before sending a full JSON response"
273
0
                    ));
274
97
                }
275
97
                if byte[0] == b'\n' {
276
3
                    if line.is_empty() {
277
0
                        continue; // tolerate blank lines between responses
278
3
                    }
279
3
                    break;
280
94
                }
281
94
                line.push(byte[0]);
282
94
                if line.len() > 64 * 1024 * 1024 {
283
0
                    return Err(make_err!(
284
0
                        Code::OutOfRange,
285
0
                        "Persistent worker WorkResponse JSON line exceeds 64 MiB cap"
286
0
                    ));
287
94
                }
288
            }
289
3
            WorkResponse::decode_framed(&line, WireFormat::Json)
290
        }
291
    }
292
3
}
293
294
/// Read a protobuf varint from the reader. Reused for the length prefix of a
295
/// length-delimited frame.
296
0
async fn read_varint(r: &mut BufReader<ChildStdout>) -> Result<usize, Error> {
297
0
    let mut result: u64 = 0;
298
0
    for shift in (0..64).step_by(7) {
299
0
        let mut byte = [0u8; 1];
300
0
        let n = r
301
0
            .read(&mut byte)
302
0
            .await
303
0
            .err_tip(|| "Reading varint byte from persistent worker stdout")?;
304
0
        if n == 0 {
305
0
            return Err(make_err!(
306
0
                Code::Aborted,
307
0
                "Persistent worker closed stdout while reading varint"
308
0
            ));
309
0
        }
310
0
        result |= u64::from(byte[0] & 0x7f) << shift;
311
0
        if byte[0] & 0x80 == 0 {
312
0
            return usize::try_from(result).map_err(|_| {
313
0
                make_err!(
314
0
                    Code::OutOfRange,
315
                    "Varint length {result} does not fit in usize"
316
                )
317
0
            });
318
0
        }
319
    }
320
0
    Err(make_err!(
321
0
        Code::OutOfRange,
322
0
        "Varint did not terminate within 10 bytes"
323
0
    ))
324
0
}
325
326
#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use nativelink_macro::nativelink_test;

    use super::*;

    /// How to launch a scripted fake worker: an interpreter plus the
    /// arguments that point it at the generated script file.
    struct TestWorkerProgram {
        executable: PathBuf,
        startup_args: Vec<String>,
    }

    impl TestWorkerProgram {
        fn startup_args(&self) -> &[String] {
            &self.startup_args
        }
    }

    /// Write `unix_body` to `worker.sh` in `working_dir` and run it via `/bin/sh`.
    #[cfg(unix)]
    fn echo_script(working_dir: &Path, unix_body: &str, _windows_body: &str) -> TestWorkerProgram {
        let path = working_dir.join("worker.sh");
        let mut file = std::fs::File::create(&path).unwrap();
        file.write_all(unix_body.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        TestWorkerProgram {
            executable: PathBuf::from("/bin/sh"),
            startup_args: vec![path.display().to_string()],
        }
    }

    /// Write `windows_body` to `worker.ps1` in `working_dir` and run it via PowerShell.
    #[cfg(windows)]
    fn echo_script(working_dir: &Path, _unix_body: &str, windows_body: &str) -> TestWorkerProgram {
        let path = working_dir.join("worker.ps1");
        let mut file = std::fs::File::create(&path).unwrap();
        file.write_all(windows_body.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        TestWorkerProgram {
            executable: PathBuf::from("powershell.exe"),
            startup_args: vec![
                "-NoProfile".to_owned(),
                "-ExecutionPolicy".to_owned(),
                "Bypass".to_owned(),
                "-File".to_owned(),
                path.display().to_string(),
            ],
        }
    }

    #[nativelink_test]
    async fn shutdown_kills_unresponsive_worker() {
        // A worker that never reads/writes — shutdown grace expires, we SIGKILL.
        let dir = tempfile::tempdir().unwrap();
        let script = echo_script(dir.path(), "exec sleep 30\n", "Start-Sleep -Seconds 30\n");
        let worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();
        let start = Instant::now();
        worker.shutdown(Duration::from_millis(100)).await;
        // SIGKILL should arrive well within a second.
        assert!(start.elapsed() < Duration::from_secs(2));
    }

    #[nativelink_test]
    async fn dispatch_json_round_trip() {
        // A worker scripted to read one JSON line and echo a canned response.
        let dir = tempfile::tempdir().unwrap();
        let script = echo_script(
            dir.path(),
            // Read one line, ignore it, emit a fixed response. Newline-terminated.
            "read line\necho '{\"exitCode\":0,\"output\":\"ok\"}'\n",
            "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0,\"output\":\"ok\"}')\n",
        );
        let mut worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();

        let req = WorkRequest {
            arguments: vec!["compile".into()],
            ..WorkRequest::default()
        };
        let resp = worker.dispatch(&req).await.unwrap();
        assert_eq!(resp.exit_code, 0);
        assert_eq!(resp.output, "ok");
        assert_eq!(worker.request_count(), 1);

        worker.shutdown(Duration::from_secs(1)).await;
    }

    #[nativelink_test]
    async fn dispatch_timeout_kills_worker() {
        let dir = tempfile::tempdir().unwrap();
        // Worker reads, then sleeps forever instead of responding.
        let script = echo_script(
            dir.path(),
            "read line\nexec sleep 60\n",
            "$line = [Console]::In.ReadLine()\nStart-Sleep -Seconds 60\n",
        );
        let mut worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();

        let req = WorkRequest {
            arguments: vec!["compile".into()],
            ..WorkRequest::default()
        };
        let result = worker
            .dispatch_with_timeout(&req, Duration::from_millis(100))
            .await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, Code::DeadlineExceeded);
        assert!(worker.is_dead());
    }

    #[nativelink_test]
    async fn rejects_multiplex_request_id() {
        let dir = tempfile::tempdir().unwrap();
        let script = echo_script(
            dir.path(),
            "read line\necho '{\"exitCode\":0}'\n",
            "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0}')\n",
        );
        let mut worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();
        let req = WorkRequest {
            request_id: 7,
            ..WorkRequest::default()
        };
        let err = worker.dispatch(&req).await.unwrap_err();
        assert_eq!(err.code, Code::InvalidArgument);
        worker.shutdown(Duration::from_secs(1)).await;
    }

    // Unix-only: PowerShell's WriteLine emits CRLF, so an "empty" line there
    // is "\r" rather than empty and would not exercise the blank-line skip.
    #[cfg(unix)]
    #[nativelink_test]
    async fn dispatch_skips_blank_lines_before_json_response() {
        let dir = tempfile::tempdir().unwrap();
        let script = echo_script(
            dir.path(),
            "read line\necho\necho '{\"exitCode\":3}'\n",
            "",
        );
        let mut worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();

        let req = WorkRequest {
            arguments: vec!["compile".into()],
            ..WorkRequest::default()
        };
        let resp = worker.dispatch(&req).await.unwrap();
        assert_eq!(resp.exit_code, 3);
        assert_eq!(worker.request_count(), 1);

        worker.shutdown(Duration::from_secs(1)).await;
    }

    #[nativelink_test]
    async fn dispatch_fails_when_worker_exits_without_responding() {
        // Worker consumes the request and exits, closing stdout with no reply.
        let dir = tempfile::tempdir().unwrap();
        let script = echo_script(
            dir.path(),
            "read line\n",
            "$line = [Console]::In.ReadLine()\n",
        );
        let mut worker = LiveWorker::spawn(
            &script.executable,
            script.startup_args(),
            WireFormat::Json,
            dir.path(),
        )
        .await
        .unwrap();

        let req = WorkRequest {
            arguments: vec!["compile".into()],
            ..WorkRequest::default()
        };
        let err = worker.dispatch(&req).await.unwrap_err();
        assert_eq!(err.code, Code::Aborted);
        assert_eq!(worker.request_count(), 0);

        worker.shutdown(Duration::from_secs(1)).await;
    }
}