Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/persistent_worker/pool.rs
Line
Count
Source
1
// Copyright 2024 Trace Machina, Inc. All rights reserved.
2
//
3
// Licensed under the Business Source License, Version 1.1 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may request a copy of the License by emailing contact@nativelink.com.
6
//
7
// Use of this module requires an enterprise license agreement, which can be
8
// attained by emailing contact@nativelink.com or signing up for Nativelink
9
// Cloud at app.nativelink.com.
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
17
//! `PersistentWorkerPool`: per-`LocalWorker` pool of `LiveWorker`s keyed by
18
//! `WorkerKey`. Mirrors Bazel's notion of `WorkerKey` — two actions whose
19
//! executable + startup-flag prefix + wire format are identical share a worker
20
//! process.
21
22
use core::mem;
23
use core::time::Duration;
24
use std::collections::HashMap;
25
use std::path::{Path, PathBuf};
26
use std::sync::Arc;
27
28
use nativelink_error::{Code, Error, ResultExt, make_err};
29
use nativelink_util::background_spawn;
30
use parking_lot::Mutex;
31
use tracing::{debug, info, warn};
32
33
use super::live_worker::LiveWorker;
34
use super::protocol::WireFormat;
35
36
/// Default per-key bound on live workers. Matches Bazel's `worker_max_instances`
37
/// default and keeps memory pressure bounded without operator intervention.
38
pub const DEFAULT_MAX_WORKERS_PER_KEY: usize = 4;
39
40
/// Default idle timeout. A worker that has not handled a request for this
41
/// duration is shut down by the background sweeper.
42
pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 5);
43
44
/// Default per-worker request cap. After this many `dispatch` calls a worker is
45
/// dropped on `release` (helps recycle accumulated JVM state).
46
pub const DEFAULT_MAX_REQUESTS_PER_WORKER: u64 = 200;
47
48
/// Identity by which two persistent-worker actions are considered compatible.
49
/// Same `WorkerKey` => same worker can serve both.
50
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
51
pub struct WorkerKey {
52
    /// Resolved tool executable. Mirrors `Action.arguments[0]`.
53
    pub executable: PathBuf,
54
    /// Leading arguments that become tool startup flags. Computed as the prefix
55
    /// of `Action.arguments[1..]` up to (excluding) the first arg starting
56
    /// with `@` (Bazel's response-file convention).
57
    pub startup_args: Vec<String>,
58
    /// Wire format from the action's `requires-worker-protocol`.
59
    pub wire_format: WireFormat,
60
}
61
62
impl WorkerKey {
63
    /// Derive a `WorkerKey` from a full action `argv` and the negotiated wire
64
    /// format.
65
    ///
66
    /// `argv[0]` is the executable. Subsequent arguments are scanned
67
    /// left-to-right and treated as startup flags until the first
68
    /// `@`-prefixed argument (Bazel's response file), at which point the rest
69
    /// are considered per-request inputs and excluded from the key.
70
12
    pub fn from_argv(argv: &[String], wire_format: WireFormat) -> Result<Self, Error> {
71
12
        let (
exe11
,
rest11
) = argv.split_first().ok_or_else(||
{1
72
1
            make_err!(
73
1
                Code::InvalidArgument,
74
                "Cannot derive WorkerKey from empty argument list"
75
            )
76
1
        })?;
77
11
        let startup_args: Vec<String> = rest
78
11
            .iter()
79
22
            .
take_while11
(|a| !a.starts_with('@'))
80
11
            .cloned()
81
11
            .collect();
82
11
        Ok(Self {
83
11
            executable: PathBuf::from(exe),
84
11
            startup_args,
85
11
            wire_format,
86
11
        })
87
12
    }
88
}
89
90
/// Configuration knobs for a `PersistentWorkerPool`. Defaults are sensible for
91
/// a JVM-heavy workload; operators can override in `nativelink-config`.
92
#[derive(Clone, Copy, Debug)]
93
pub struct PoolConfig {
94
    pub max_workers_per_key: usize,
95
    pub idle_timeout: Duration,
96
    pub max_requests_per_worker: u64,
97
    /// SIGKILL grace when shutting a worker down.
98
    pub shutdown_grace: Duration,
99
}
100
101
impl Default for PoolConfig {
102
34
    fn default() -> Self {
103
34
        Self {
104
34
            max_workers_per_key: DEFAULT_MAX_WORKERS_PER_KEY,
105
34
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
106
34
            max_requests_per_worker: DEFAULT_MAX_REQUESTS_PER_WORKER,
107
34
            shutdown_grace: Duration::from_secs(5),
108
34
        }
109
34
    }
110
}
111
112
/// Reservation handle returned by `acquire`. Holding it removes the worker
113
/// from the pool's idle set; `release` returns it (unless the request marked
114
/// it as dead). Drop-without-release is treated as "worker is dead": it is
115
/// shut down rather than re-pooled.
116
#[derive(Debug)]
117
pub struct Lease {
118
    inner: Option<LeaseInner>,
119
}
120
121
#[derive(Debug)]
122
struct LeaseInner {
123
    worker: LiveWorker,
124
    pool: Arc<PoolInner>,
125
    key: WorkerKey,
126
}
127
128
#[derive(Debug)]
129
struct CountSlotGuard {
130
    pool: Arc<PoolInner>,
131
    key: WorkerKey,
132
    active: bool,
133
}
134
135
impl CountSlotGuard {
136
7
    const fn new(pool: Arc<PoolInner>, key: WorkerKey) -> Self {
137
7
        Self {
138
7
            pool,
139
7
            key,
140
7
            active: true,
141
7
        }
142
7
    }
143
144
4
    fn disarm(mut self) {
145
4
        self.active = false;
146
4
    }
147
148
2
    fn decrement_now(mut self) {
149
2
        if self.active {
150
2
            self.pool.decrement_count(&self.key);
151
2
            self.active = false;
152
2
        
}0
153
2
    }
154
}
155
156
impl Drop for CountSlotGuard {
157
7
    fn drop(&mut self) {
158
7
        if self.active {
159
1
            self.pool.decrement_count(&self.key);
160
6
        }
161
7
    }
162
}
163
164
impl Lease {
165
2
    pub const fn worker(&mut self) -> &mut LiveWorker {
166
2
        &mut self
167
2
            .inner
168
2
            .as_mut()
169
2
            .expect("lease used after release")
170
2
            .worker
171
2
    }
172
173
    /// Return the worker to the pool. Caller signals whether the worker is
174
    /// still healthy. Unhealthy workers, and workers that have reached
175
    /// `max_requests_per_worker`, are shut down before this returns and not re-pooled.
176
5
    
pub async fn release(mut self, healthy: bool)3
{
177
5
        let Some(inner) = self.inner.take() else {
178
0
            return;
179
        };
180
5
        inner
181
5
            .pool
182
5
            .return_worker(inner.key, inner.worker, healthy)
183
5
            .await;
184
4
    }
185
}
186
187
impl Drop for Lease {
188
5
    fn drop(&mut self) {
189
        // If the caller forgot to call `release`, treat the worker as
190
        // unhealthy and shut it down on a detached task.
191
5
        if let Some(
inner0
) = self.inner.take() {
192
0
            let pool = inner.pool.clone();
193
0
            let key = inner.key.clone();
194
0
            let worker = inner.worker;
195
0
            let grace = pool.config.shutdown_grace;
196
0
            background_spawn!("persistent_worker_lease_drop", async move {
197
0
                let count_slot = CountSlotGuard::new(pool.clone(), key.clone());
198
0
                warn!(
199
                    ?key,
200
                    "Lease dropped without release; treating worker as unhealthy"
201
                );
202
0
                worker.shutdown(grace).await;
203
0
                count_slot.decrement_now();
204
0
            });
205
5
        }
206
5
    }
207
}
208
209
/// Per-key pool state. Wrapped in a `Mutex` because `acquire` and `release`
210
/// run on different tokio tasks.
211
#[derive(Debug)]
212
struct KeyState {
213
    /// Idle workers ready to serve. Always size <= `total_count`.
214
    idle: Vec<LiveWorker>,
215
    /// Total live workers for this key (idle + leased). Used to enforce
216
    /// `max_workers_per_key`.
217
    total_count: usize,
218
}
219
220
impl KeyState {
221
2
    const fn new() -> Self {
222
2
        Self {
223
2
            idle: Vec::new(),
224
2
            total_count: 0,
225
2
        }
226
2
    }
227
}
228
229
#[derive(Debug)]
230
struct PoolInner {
231
    config: PoolConfig,
232
    state: Mutex<HashMap<WorkerKey, KeyState>>,
233
}
234
235
impl PoolInner {
236
    /// Frees one reserved slot for `key` (saturating at zero); called through `CountSlotGuard`.
237
3
    fn decrement_count(&self, key: &WorkerKey) {
238
3
        let mut state = self.state.lock();
239
3
        if let Some(s) = state.get_mut(key) {
240
3
            s.total_count = s.total_count.saturating_sub(1);
241
3
        
}0
242
3
    }
243
244
5
    async fn return_worker(self: Arc<Self>, key: WorkerKey, worker: LiveWorker, healthy: bool) {
245
5
        let exceeded_cap = worker.request_count() >= self.config.max_requests_per_worker;
246
5
        if !healthy || 
exceeded_cap2
{
247
3
            let count_slot = CountSlotGuard::new(self.clone(), key.clone());
248
3
            debug!(
249
                ?key,
250
3
                request_count = worker.request_count(),
251
                healthy,
252
                "Retiring persistent worker"
253
            );
254
3
            worker.shutdown(self.config.shutdown_grace).await;
255
2
            count_slot.decrement_now();
256
2
            return;
257
2
        }
258
2
        let mut state = self.state.lock();
259
2
        let entry = state.entry(key).or_insert_with(KeyState::new);
260
2
        entry.idle.push(worker);
261
4
    }
262
}
263
264
/// The pool itself.
265
#[derive(Clone, Debug)]
266
pub struct PersistentWorkerPool {
267
    inner: Arc<PoolInner>,
268
}
269
270
impl Default for PersistentWorkerPool {
271
32
    fn default() -> Self {
272
32
        Self::new(PoolConfig::default())
273
32
    }
274
}
275
276
impl PersistentWorkerPool {
277
34
    pub fn new(config: PoolConfig) -> Self {
278
34
        Self {
279
34
            inner: Arc::new(PoolInner {
280
34
                config,
281
34
                state: Mutex::new(HashMap::new()),
282
34
            }),
283
34
        }
284
34
    }
285
286
    /// Acquire a worker for `key`. If an idle worker is available it is
287
    /// returned. Otherwise a new one is spawned, capped at
288
    /// `max_workers_per_key`. If the cap is reached and no idle worker exists,
289
    /// returns `Err(Code::ResourceExhausted)` — the caller should fall back to
290
    /// the one-shot subprocess path or wait and retry.
291
    ///
292
    /// `working_dir` is a stable worker process directory. Per-request action
293
    /// directories are passed through `WorkRequest::sandbox_dir`, because the
294
    /// action directory is deleted after each action completes.
295
4
    pub async fn acquire(
296
4
        &self,
297
4
        key: WorkerKey,
298
4
        executable_path: &Path,
299
4
        working_dir: &Path,
300
6
    ) -> Result<Lease, Error> {
301
        // First, try to take an idle worker without blocking on spawn.
302
        {
303
6
            let mut state = self.inner.state.lock();
304
6
            let entry = state.entry(key.clone()).or_insert_with(KeyState::new);
305
6
            while let Some(
mut worker1
) = entry.idle.pop() {
306
1
                if worker.is_dead() {
307
                    // Process died while idle (e.g. JVM OOM). Drop it.
308
0
                    entry.total_count = entry.total_count.saturating_sub(1);
309
0
                    continue;
310
1
                }
311
1
                return Ok(Lease {
312
1
                    inner: Some(LeaseInner {
313
1
                        worker,
314
1
                        pool: self.inner.clone(),
315
1
                        key,
316
1
                    }),
317
1
                });
318
            }
319
5
            if entry.total_count >= self.inner.config.max_workers_per_key {
320
1
                return Err(make_err!(
321
1
                    Code::ResourceExhausted,
322
1
                    "Persistent worker pool for {key:?} is at capacity ({} workers)",
323
1
                    self.inner.config.max_workers_per_key
324
1
                ));
325
4
            }
326
            // Reserve a slot before releasing the lock so a concurrent
327
            // acquire can't over-commit.
328
4
            entry.total_count += 1;
329
        }
330
331
        // Lock released; spawn outside the critical section.
332
4
        let count_slot = CountSlotGuard::new(self.inner.clone(), key.clone());
333
4
        let spawn_result = LiveWorker::spawn(
334
4
            executable_path,
335
4
            &key.startup_args,
336
4
            key.wire_format,
337
4
            working_dir,
338
4
        )
339
4
        .await;
340
4
        let worker = match spawn_result {
341
4
            Ok(w) => w,
342
0
            Err(err) => {
343
0
                return Err(err).err_tip(|| format!("Spawning persistent worker for {key:?}"));
344
            }
345
        };
346
4
        count_slot.disarm();
347
4
        info!(?key, "Spawned new persistent worker");
348
4
        Ok(Lease {
349
4
            inner: Some(LeaseInner {
350
4
                worker,
351
4
                pool: self.inner.clone(),
352
4
                key,
353
4
            }),
354
4
        })
355
6
    }
356
357
    /// Drop all idle workers whose `last_used` is older than `idle_timeout`.
358
    /// Intended to be called periodically from a background task.
359
0
    pub async fn sweep_idle(&self) {
360
0
        let threshold = std::time::Instant::now()
361
0
            .checked_sub(self.inner.config.idle_timeout)
362
0
            .unwrap_or_else(std::time::Instant::now);
363
0
        let evicted: Vec<LiveWorker> = {
364
0
            let mut state = self.inner.state.lock();
365
0
            let mut evicted = Vec::new();
366
0
            for entry in state.values_mut() {
367
0
                let (keep, drop_): (Vec<_>, Vec<_>) = mem::take(&mut entry.idle)
368
0
                    .into_iter()
369
0
                    .partition(|w| w.last_used() >= threshold);
370
0
                entry.total_count = entry.total_count.saturating_sub(drop_.len());
371
0
                entry.idle = keep;
372
0
                evicted.extend(drop_);
373
            }
374
0
            evicted
375
        };
376
0
        for w in evicted {
377
0
            w.shutdown(self.inner.config.shutdown_grace).await;
378
        }
379
0
    }
380
}
381
382
#[cfg(test)]
383
mod tests {
384
    #[cfg(unix)]
385
    use std::io::Write as _;
386
387
    #[cfg(unix)]
388
    use nativelink_macro::nativelink_test;
389
390
    use super::*;
391
392
    #[cfg(unix)]
393
2
    fn shell_script(working_dir: &Path, body: &str) -> PathBuf {
394
2
        let path = working_dir.join("worker.sh");
395
2
        let mut file = std::fs::File::create(&path).unwrap();
396
2
        file.write_all(body.as_bytes()).unwrap();
397
2
        file.sync_all().unwrap();
398
2
        path
399
2
    }
400
401
    #[cfg(unix)]
402
2
    fn shell_worker_key(script: &Path) -> WorkerKey {
403
2
        WorkerKey {
404
2
            executable: PathBuf::from("/bin/sh"),
405
2
            startup_args: vec![script.display().to_string()],
406
2
            wire_format: WireFormat::Json,
407
2
        }
408
2
    }
409
410
    #[test]
411
1
    fn worker_key_excludes_argfile_and_later_args() {
412
1
        let argv: Vec<String> = ["javac", "-source", "21", "@args.txt", "Foo.java"]
413
1
            .iter()
414
5
            .
map1
(|s| (*s).to_string())
415
1
            .collect();
416
1
        let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap();
417
1
        assert_eq!(key.executable, PathBuf::from("javac"));
418
1
        assert_eq!(key.startup_args, vec!["-source".to_string(), "21".into()]);
419
1
    }
420
421
    #[test]
422
1
    fn worker_key_handles_empty_startup_args() {
423
1
        let argv: Vec<String> = ["javac", "@args.txt"]
424
1
            .iter()
425
2
            .
map1
(|s| (*s).to_string())
426
1
            .collect();
427
1
        let key = WorkerKey::from_argv(&argv, WireFormat::Json).unwrap();
428
1
        assert!(key.startup_args.is_empty());
429
1
        assert_eq!(key.wire_format, WireFormat::Json);
430
1
    }
431
432
    #[test]
433
1
    fn worker_key_handles_no_argfile() {
434
        // No @argfile means *all* trailing args are startup flags. Unusual but
435
        // legal — the action just doesn't use a response file.
436
1
        let argv: Vec<String> = ["scalac", "-deprecation", "-Xfatal-warnings"]
437
1
            .iter()
438
3
            .
map1
(|s| (*s).to_string())
439
1
            .collect();
440
1
        let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap();
441
1
        assert_eq!(key.startup_args.len(), 2);
442
1
    }
443
444
    #[test]
445
1
    fn worker_key_rejects_empty_argv() {
446
1
        let argv: Vec<String> = Vec::new();
447
1
        assert!(WorkerKey::from_argv(&argv, WireFormat::Proto).is_err());
448
1
    }
449
450
    #[test]
451
1
    fn worker_key_equality_collapses_compatible_actions() {
452
1
        let a = WorkerKey::from_argv(
453
1
            &["javac", "-source", "21", "@a"].map(String::from),
454
1
            WireFormat::Proto,
455
        )
456
1
        .unwrap();
457
1
        let b = WorkerKey::from_argv(
458
1
            &["javac", "-source", "21", "@b"].map(String::from),
459
1
            WireFormat::Proto,
460
        )
461
1
        .unwrap();
462
1
        assert_eq!(a, b);
463
1
    }
464
465
    #[test]
466
1
    fn worker_key_distinguishes_different_startup_args() {
467
1
        let a = WorkerKey::from_argv(
468
1
            &["javac", "-source", "21", "@a"].map(String::from),
469
1
            WireFormat::Proto,
470
        )
471
1
        .unwrap();
472
1
        let b = WorkerKey::from_argv(
473
1
            &["javac", "-source", "17", "@a"].map(String::from),
474
1
            WireFormat::Proto,
475
        )
476
1
        .unwrap();
477
1
        assert_ne!(a, b);
478
1
    }
479
480
    #[test]
481
1
    fn worker_key_distinguishes_wire_formats() {
482
1
        let a =
483
1
            WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Proto).unwrap();
484
1
        let b = WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Json).unwrap();
485
1
        assert_ne!(a, b);
486
1
    }
487
488
    #[nativelink_test]
489
    #[cfg(unix)]
490
    async fn dropping_unhealthy_release_frees_pool_slot() {
491
        let dir = tempfile::tempdir().unwrap();
492
        let script = shell_script(dir.path(), "exec sleep 60\n");
493
        let key = shell_worker_key(&script);
494
        let pool = PersistentWorkerPool::new(PoolConfig {
495
            max_workers_per_key: 1,
496
            shutdown_grace: Duration::from_secs(1),
497
            ..PoolConfig::default()
498
        });
499
500
        let lease = pool
501
            .acquire(key.clone(), Path::new("/bin/sh"), dir.path())
502
            .await
503
            .unwrap();
504
        let release_handle = tokio::spawn(lease.release(false));
505
        tokio::time::sleep(Duration::from_millis(50)).await;
506
        release_handle.abort();
507
        assert!(release_handle.await.unwrap_err().is_cancelled());
508
509
        let next_lease = tokio::time::timeout(
510
            Duration::from_secs(2),
511
            pool.acquire(key, Path::new("/bin/sh"), dir.path()),
512
        )
513
        .await
514
        .unwrap()
515
        .unwrap();
516
        next_lease.release(false).await;
517
    }
518
519
    #[nativelink_test]
520
    #[cfg(unix)]
521
    async fn concurrent_acquire_respects_per_key_cap() {
522
        let dir = tempfile::tempdir().unwrap();
523
        let script = shell_script(dir.path(), "exec sleep 60\n");
524
        let key = shell_worker_key(&script);
525
        let pool = PersistentWorkerPool::new(PoolConfig {
526
            max_workers_per_key: 1,
527
            shutdown_grace: Duration::from_millis(100),
528
            ..PoolConfig::default()
529
        });
530
531
        let lease = pool
532
            .acquire(key.clone(), Path::new("/bin/sh"), dir.path())
533
            .await
534
            .unwrap();
535
        let err = pool
536
            .acquire(key, Path::new("/bin/sh"), dir.path())
537
            .await
538
            .unwrap_err();
539
        assert_eq!(err.code, Code::ResourceExhausted);
540
        lease.release(false).await;
541
    }
542
}