/build/source/nativelink-worker/src/persistent_worker/pool.rs
Line | Count | Source |
1 | | // Copyright 2024 Trace Machina, Inc. All rights reserved. |
2 | | // |
3 | | // Licensed under the Business Source License, Version 1.1 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may requested a copy of the License by emailing contact@nativelink.com. |
6 | | // |
7 | | // Use of this module requires an enterprise license agreement, which can be |
8 | | // attained by emailing contact@nativelink.com or signing up for Nativelink |
9 | | // Cloud at app.nativelink.com. |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, software |
12 | | // distributed under the License is distributed on an "AS IS" BASIS, |
13 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | // See the License for the specific language governing permissions and |
15 | | // limitations under the License. |
16 | | |
17 | | //! `PersistentWorkerPool`: per-`LocalWorker` pool of `LiveWorker`s keyed by |
18 | | //! `WorkerKey`. Mirrors Bazel's notion of `WorkerKey` — two actions whose |
19 | | //! executable + startup-flag prefix + wire format are identical share a worker |
20 | | //! process. |
21 | | |
22 | | use core::mem; |
23 | | use core::time::Duration; |
24 | | use std::collections::HashMap; |
25 | | use std::path::{Path, PathBuf}; |
26 | | use std::sync::Arc; |
27 | | |
28 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
29 | | use nativelink_util::background_spawn; |
30 | | use parking_lot::Mutex; |
31 | | use tracing::{debug, info, warn}; |
32 | | |
33 | | use super::live_worker::LiveWorker; |
34 | | use super::protocol::WireFormat; |
35 | | |
36 | | /// Default per-key bound on live workers. Matches Bazel's `worker_max_instances` |
37 | | /// default and keeps memory pressure bounded without operator intervention. |
38 | | pub const DEFAULT_MAX_WORKERS_PER_KEY: usize = 4; |
39 | | |
40 | | /// Default idle timeout. A worker that has not handled a request for this |
41 | | /// duration is shut down by the background sweeper. |
42 | | pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 5); |
43 | | |
44 | | /// Default per-worker request cap. After this many `dispatch` calls a worker is |
45 | | /// dropped on `release` (helps recycle accumulated JVM state). |
46 | | pub const DEFAULT_MAX_REQUESTS_PER_WORKER: u64 = 200; |
47 | | |
48 | | /// Identity by which two persistent-worker actions are considered compatible. |
49 | | /// Same `WorkerKey` => same worker can serve both. |
50 | | #[derive(Clone, Debug, PartialEq, Eq, Hash)] |
51 | | pub struct WorkerKey { |
52 | | /// Resolved tool executable. Mirrors `Action.arguments[0]`. |
53 | | pub executable: PathBuf, |
54 | | /// Leading arguments that become tool startup flags. Computed as the prefix |
55 | | /// of `Action.arguments[1..]` up to (excluding) the first arg starting |
56 | | /// with `@` (Bazel's response-file convention). |
57 | | pub startup_args: Vec<String>, |
58 | | /// Wire format from the action's `requires-worker-protocol`. |
59 | | pub wire_format: WireFormat, |
60 | | } |
61 | | |
62 | | impl WorkerKey { |
63 | | /// Derive a `WorkerKey` from a full action `argv` and the negotiated wire |
64 | | /// format. |
65 | | /// |
66 | | /// `argv[0]` is the executable. Subsequent arguments are scanned |
67 | | /// left-to-right and treated as startup flags until the first |
68 | | /// `@`-prefixed argument (Bazel's response file), at which point the rest |
69 | | /// are considered per-request inputs and excluded from the key. |
70 | 12 | pub fn from_argv(argv: &[String], wire_format: WireFormat) -> Result<Self, Error> { |
71 | 12 | let (exe11 , rest11 ) = argv.split_first().ok_or_else(|| {1 |
72 | 1 | make_err!( |
73 | 1 | Code::InvalidArgument, |
74 | | "Cannot derive WorkerKey from empty argument list" |
75 | | ) |
76 | 1 | })?; |
77 | 11 | let startup_args: Vec<String> = rest |
78 | 11 | .iter() |
79 | 22 | .take_while11 (|a| !a.starts_with('@')) |
80 | 11 | .cloned() |
81 | 11 | .collect(); |
82 | 11 | Ok(Self { |
83 | 11 | executable: PathBuf::from(exe), |
84 | 11 | startup_args, |
85 | 11 | wire_format, |
86 | 11 | }) |
87 | 12 | } |
88 | | } |
89 | | |
90 | | /// Configuration knobs for a `PersistentWorkerPool`. Defaults are sensible for |
91 | | /// a JVM-heavy workload; operators can override in `nativelink-config`. |
92 | | #[derive(Clone, Copy, Debug)] |
93 | | pub struct PoolConfig { |
94 | | pub max_workers_per_key: usize, |
95 | | pub idle_timeout: Duration, |
96 | | pub max_requests_per_worker: u64, |
97 | | /// SIGKILL grace when shutting a worker down. |
98 | | pub shutdown_grace: Duration, |
99 | | } |
100 | | |
101 | | impl Default for PoolConfig { |
102 | 34 | fn default() -> Self { |
103 | 34 | Self { |
104 | 34 | max_workers_per_key: DEFAULT_MAX_WORKERS_PER_KEY, |
105 | 34 | idle_timeout: DEFAULT_IDLE_TIMEOUT, |
106 | 34 | max_requests_per_worker: DEFAULT_MAX_REQUESTS_PER_WORKER, |
107 | 34 | shutdown_grace: Duration::from_secs(5), |
108 | 34 | } |
109 | 34 | } |
110 | | } |
111 | | |
112 | | /// Reservation handle returned by `acquire`. Holding it removes the worker |
113 | | /// from the pool's idle set; `release` returns it (unless the request marked |
114 | | /// it as dead). Drop-without-release is treated as "worker is dead": it is |
115 | | /// shut down rather than re-pooled. |
116 | | #[derive(Debug)] |
117 | | pub struct Lease { |
118 | | inner: Option<LeaseInner>, |
119 | | } |
120 | | |
121 | | #[derive(Debug)] |
122 | | struct LeaseInner { |
123 | | worker: LiveWorker, |
124 | | pool: Arc<PoolInner>, |
125 | | key: WorkerKey, |
126 | | } |
127 | | |
128 | | #[derive(Debug)] |
129 | | struct CountSlotGuard { |
130 | | pool: Arc<PoolInner>, |
131 | | key: WorkerKey, |
132 | | active: bool, |
133 | | } |
134 | | |
135 | | impl CountSlotGuard { |
136 | 7 | const fn new(pool: Arc<PoolInner>, key: WorkerKey) -> Self { |
137 | 7 | Self { |
138 | 7 | pool, |
139 | 7 | key, |
140 | 7 | active: true, |
141 | 7 | } |
142 | 7 | } |
143 | | |
144 | 4 | fn disarm(mut self) { |
145 | 4 | self.active = false; |
146 | 4 | } |
147 | | |
148 | 2 | fn decrement_now(mut self) { |
149 | 2 | if self.active { |
150 | 2 | self.pool.decrement_count(&self.key); |
151 | 2 | self.active = false; |
152 | 2 | }0 |
153 | 2 | } |
154 | | } |
155 | | |
156 | | impl Drop for CountSlotGuard { |
157 | 7 | fn drop(&mut self) { |
158 | 7 | if self.active { |
159 | 1 | self.pool.decrement_count(&self.key); |
160 | 6 | } |
161 | 7 | } |
162 | | } |
163 | | |
164 | | impl Lease { |
165 | 2 | pub const fn worker(&mut self) -> &mut LiveWorker { |
166 | 2 | &mut self |
167 | 2 | .inner |
168 | 2 | .as_mut() |
169 | 2 | .expect("lease used after release") |
170 | 2 | .worker |
171 | 2 | } |
172 | | |
173 | | /// Return the worker to the pool. Caller signals whether the worker is |
174 | | /// still healthy. Unhealthy workers are shut down asynchronously and not |
175 | | /// returned to the idle set. |
176 | 5 | pub async fn release(mut self, healthy: bool)3 { |
177 | 5 | let Some(inner) = self.inner.take() else { |
178 | 0 | return; |
179 | | }; |
180 | 5 | inner |
181 | 5 | .pool |
182 | 5 | .return_worker(inner.key, inner.worker, healthy) |
183 | 5 | .await; |
184 | 4 | } |
185 | | } |
186 | | |
187 | | impl Drop for Lease { |
188 | 5 | fn drop(&mut self) { |
189 | | // If the caller forgot to call `release`, treat the worker as |
190 | | // unhealthy and shut it down on a detached task. |
191 | 5 | if let Some(inner0 ) = self.inner.take() { |
192 | 0 | let pool = inner.pool.clone(); |
193 | 0 | let key = inner.key.clone(); |
194 | 0 | let worker = inner.worker; |
195 | 0 | let grace = pool.config.shutdown_grace; |
196 | 0 | background_spawn!("persistent_worker_lease_drop", async move { |
197 | 0 | let count_slot = CountSlotGuard::new(pool.clone(), key.clone()); |
198 | 0 | warn!( |
199 | | ?key, |
200 | | "Lease dropped without release; treating worker as unhealthy" |
201 | | ); |
202 | 0 | worker.shutdown(grace).await; |
203 | 0 | count_slot.decrement_now(); |
204 | 0 | }); |
205 | 5 | } |
206 | 5 | } |
207 | | } |
208 | | |
209 | | /// Per-key pool state. Wrapped in a `Mutex` because `acquire` and `release` |
210 | | /// run on different tokio tasks. |
211 | | #[derive(Debug)] |
212 | | struct KeyState { |
213 | | /// Idle workers ready to serve. Always size <= `total_count`. |
214 | | idle: Vec<LiveWorker>, |
215 | | /// Total live workers for this key (idle + leased). Used to enforce |
216 | | /// `max_workers_per_key`. |
217 | | total_count: usize, |
218 | | } |
219 | | |
220 | | impl KeyState { |
221 | 2 | const fn new() -> Self { |
222 | 2 | Self { |
223 | 2 | idle: Vec::new(), |
224 | 2 | total_count: 0, |
225 | 2 | } |
226 | 2 | } |
227 | | } |
228 | | |
229 | | #[derive(Debug)] |
230 | | struct PoolInner { |
231 | | config: PoolConfig, |
232 | | state: Mutex<HashMap<WorkerKey, KeyState>>, |
233 | | } |
234 | | |
235 | | impl PoolInner { |
236 | | /// Helper used by Drop on a forgotten Lease. |
237 | 3 | fn decrement_count(&self, key: &WorkerKey) { |
238 | 3 | let mut state = self.state.lock(); |
239 | 3 | if let Some(s) = state.get_mut(key) { |
240 | 3 | s.total_count = s.total_count.saturating_sub(1); |
241 | 3 | }0 |
242 | 3 | } |
243 | | |
244 | 5 | async fn return_worker(self: Arc<Self>, key: WorkerKey, worker: LiveWorker, healthy: bool) { |
245 | 5 | let exceeded_cap = worker.request_count() >= self.config.max_requests_per_worker; |
246 | 5 | if !healthy || exceeded_cap2 { |
247 | 3 | let count_slot = CountSlotGuard::new(self.clone(), key.clone()); |
248 | 3 | debug!( |
249 | | ?key, |
250 | 3 | request_count = worker.request_count(), |
251 | | healthy, |
252 | | "Retiring persistent worker" |
253 | | ); |
254 | 3 | worker.shutdown(self.config.shutdown_grace).await; |
255 | 2 | count_slot.decrement_now(); |
256 | 2 | return; |
257 | 2 | } |
258 | 2 | let mut state = self.state.lock(); |
259 | 2 | let entry = state.entry(key).or_insert_with(KeyState::new); |
260 | 2 | entry.idle.push(worker); |
261 | 4 | } |
262 | | } |
263 | | |
264 | | /// The pool itself. |
265 | | #[derive(Clone, Debug)] |
266 | | pub struct PersistentWorkerPool { |
267 | | inner: Arc<PoolInner>, |
268 | | } |
269 | | |
270 | | impl Default for PersistentWorkerPool { |
271 | 32 | fn default() -> Self { |
272 | 32 | Self::new(PoolConfig::default()) |
273 | 32 | } |
274 | | } |
275 | | |
276 | | impl PersistentWorkerPool { |
277 | 34 | pub fn new(config: PoolConfig) -> Self { |
278 | 34 | Self { |
279 | 34 | inner: Arc::new(PoolInner { |
280 | 34 | config, |
281 | 34 | state: Mutex::new(HashMap::new()), |
282 | 34 | }), |
283 | 34 | } |
284 | 34 | } |
285 | | |
286 | | /// Acquire a worker for `key`. If an idle worker is available it is |
287 | | /// returned. Otherwise a new one is spawned, capped at |
288 | | /// `max_workers_per_key`. If the cap is reached and no idle worker exists, |
289 | | /// returns `Err(Code::ResourceExhausted)` — the caller should fall back to |
290 | | /// the one-shot subprocess path or wait and retry. |
291 | | /// |
292 | | /// `working_dir` is a stable worker process directory. Per-request action |
293 | | /// directories are passed through `WorkRequest::sandbox_dir`, because the |
294 | | /// action directory is deleted after each action completes. |
295 | 4 | pub async fn acquire( |
296 | 4 | &self, |
297 | 4 | key: WorkerKey, |
298 | 4 | executable_path: &Path, |
299 | 4 | working_dir: &Path, |
300 | 6 | ) -> Result<Lease, Error> { |
301 | | // First, try to take an idle worker without blocking on spawn. |
302 | | { |
303 | 6 | let mut state = self.inner.state.lock(); |
304 | 6 | let entry = state.entry(key.clone()).or_insert_with(KeyState::new); |
305 | 6 | while let Some(mut worker1 ) = entry.idle.pop() { |
306 | 1 | if worker.is_dead() { |
307 | | // Process died while idle (e.g. JVM OOM). Drop it. |
308 | 0 | entry.total_count = entry.total_count.saturating_sub(1); |
309 | 0 | continue; |
310 | 1 | } |
311 | 1 | return Ok(Lease { |
312 | 1 | inner: Some(LeaseInner { |
313 | 1 | worker, |
314 | 1 | pool: self.inner.clone(), |
315 | 1 | key, |
316 | 1 | }), |
317 | 1 | }); |
318 | | } |
319 | 5 | if entry.total_count >= self.inner.config.max_workers_per_key { |
320 | 1 | return Err(make_err!( |
321 | 1 | Code::ResourceExhausted, |
322 | 1 | "Persistent worker pool for {key:?} is at capacity ({} workers)", |
323 | 1 | self.inner.config.max_workers_per_key |
324 | 1 | )); |
325 | 4 | } |
326 | | // Reserve a slot before releasing the lock so a concurrent |
327 | | // acquire can't over-commit. |
328 | 4 | entry.total_count += 1; |
329 | | } |
330 | | |
331 | | // Lock released; spawn outside the critical section. |
332 | 4 | let count_slot = CountSlotGuard::new(self.inner.clone(), key.clone()); |
333 | 4 | let spawn_result = LiveWorker::spawn( |
334 | 4 | executable_path, |
335 | 4 | &key.startup_args, |
336 | 4 | key.wire_format, |
337 | 4 | working_dir, |
338 | 4 | ) |
339 | 4 | .await; |
340 | 4 | let worker = match spawn_result { |
341 | 4 | Ok(w) => w, |
342 | 0 | Err(err) => { |
343 | 0 | return Err(err).err_tip(|| format!("Spawning persistent worker for {key:?}")); |
344 | | } |
345 | | }; |
346 | 4 | count_slot.disarm(); |
347 | 4 | info!(?key, "Spawned new persistent worker"); |
348 | 4 | Ok(Lease { |
349 | 4 | inner: Some(LeaseInner { |
350 | 4 | worker, |
351 | 4 | pool: self.inner.clone(), |
352 | 4 | key, |
353 | 4 | }), |
354 | 4 | }) |
355 | 6 | } |
356 | | |
357 | | /// Drop all idle workers whose `last_used` is older than `idle_timeout`. |
358 | | /// Intended to be called periodically from a background task. |
359 | 0 | pub async fn sweep_idle(&self) { |
360 | 0 | let threshold = std::time::Instant::now() |
361 | 0 | .checked_sub(self.inner.config.idle_timeout) |
362 | 0 | .unwrap_or_else(std::time::Instant::now); |
363 | 0 | let evicted: Vec<LiveWorker> = { |
364 | 0 | let mut state = self.inner.state.lock(); |
365 | 0 | let mut evicted = Vec::new(); |
366 | 0 | for entry in state.values_mut() { |
367 | 0 | let (keep, drop_): (Vec<_>, Vec<_>) = mem::take(&mut entry.idle) |
368 | 0 | .into_iter() |
369 | 0 | .partition(|w| w.last_used() >= threshold); |
370 | 0 | entry.total_count = entry.total_count.saturating_sub(drop_.len()); |
371 | 0 | entry.idle = keep; |
372 | 0 | evicted.extend(drop_); |
373 | | } |
374 | 0 | evicted |
375 | | }; |
376 | 0 | for w in evicted { |
377 | 0 | w.shutdown(self.inner.config.shutdown_grace).await; |
378 | | } |
379 | 0 | } |
380 | | } |
381 | | |
382 | | #[cfg(test)] |
383 | | mod tests { |
384 | | #[cfg(unix)] |
385 | | use std::io::Write as _; |
386 | | |
387 | | #[cfg(unix)] |
388 | | use nativelink_macro::nativelink_test; |
389 | | |
390 | | use super::*; |
391 | | |
392 | | #[cfg(unix)] |
393 | 2 | fn shell_script(working_dir: &Path, body: &str) -> PathBuf { |
394 | 2 | let path = working_dir.join("worker.sh"); |
395 | 2 | let mut file = std::fs::File::create(&path).unwrap(); |
396 | 2 | file.write_all(body.as_bytes()).unwrap(); |
397 | 2 | file.sync_all().unwrap(); |
398 | 2 | path |
399 | 2 | } |
400 | | |
401 | | #[cfg(unix)] |
402 | 2 | fn shell_worker_key(script: &Path) -> WorkerKey { |
403 | 2 | WorkerKey { |
404 | 2 | executable: PathBuf::from("/bin/sh"), |
405 | 2 | startup_args: vec![script.display().to_string()], |
406 | 2 | wire_format: WireFormat::Json, |
407 | 2 | } |
408 | 2 | } |
409 | | |
410 | | #[test] |
411 | 1 | fn worker_key_excludes_argfile_and_later_args() { |
412 | 1 | let argv: Vec<String> = ["javac", "-source", "21", "@args.txt", "Foo.java"] |
413 | 1 | .iter() |
414 | 5 | .map1 (|s| (*s).to_string()) |
415 | 1 | .collect(); |
416 | 1 | let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap(); |
417 | 1 | assert_eq!(key.executable, PathBuf::from("javac")); |
418 | 1 | assert_eq!(key.startup_args, vec!["-source".to_string(), "21".into()]); |
419 | 1 | } |
420 | | |
421 | | #[test] |
422 | 1 | fn worker_key_handles_empty_startup_args() { |
423 | 1 | let argv: Vec<String> = ["javac", "@args.txt"] |
424 | 1 | .iter() |
425 | 2 | .map1 (|s| (*s).to_string()) |
426 | 1 | .collect(); |
427 | 1 | let key = WorkerKey::from_argv(&argv, WireFormat::Json).unwrap(); |
428 | 1 | assert!(key.startup_args.is_empty()); |
429 | 1 | assert_eq!(key.wire_format, WireFormat::Json); |
430 | 1 | } |
431 | | |
432 | | #[test] |
433 | 1 | fn worker_key_handles_no_argfile() { |
434 | | // No @argfile means *all* trailing args are startup flags. Unusual but |
435 | | // legal — the action just doesn't use a response file. |
436 | 1 | let argv: Vec<String> = ["scalac", "-deprecation", "-Xfatal-warnings"] |
437 | 1 | .iter() |
438 | 3 | .map1 (|s| (*s).to_string()) |
439 | 1 | .collect(); |
440 | 1 | let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap(); |
441 | 1 | assert_eq!(key.startup_args.len(), 2); |
442 | 1 | } |
443 | | |
444 | | #[test] |
445 | 1 | fn worker_key_rejects_empty_argv() { |
446 | 1 | let argv: Vec<String> = Vec::new(); |
447 | 1 | assert!(WorkerKey::from_argv(&argv, WireFormat::Proto).is_err()); |
448 | 1 | } |
449 | | |
450 | | #[test] |
451 | 1 | fn worker_key_equality_collapses_compatible_actions() { |
452 | 1 | let a = WorkerKey::from_argv( |
453 | 1 | &["javac", "-source", "21", "@a"].map(String::from), |
454 | 1 | WireFormat::Proto, |
455 | | ) |
456 | 1 | .unwrap(); |
457 | 1 | let b = WorkerKey::from_argv( |
458 | 1 | &["javac", "-source", "21", "@b"].map(String::from), |
459 | 1 | WireFormat::Proto, |
460 | | ) |
461 | 1 | .unwrap(); |
462 | 1 | assert_eq!(a, b); |
463 | 1 | } |
464 | | |
465 | | #[test] |
466 | 1 | fn worker_key_distinguishes_different_startup_args() { |
467 | 1 | let a = WorkerKey::from_argv( |
468 | 1 | &["javac", "-source", "21", "@a"].map(String::from), |
469 | 1 | WireFormat::Proto, |
470 | | ) |
471 | 1 | .unwrap(); |
472 | 1 | let b = WorkerKey::from_argv( |
473 | 1 | &["javac", "-source", "17", "@a"].map(String::from), |
474 | 1 | WireFormat::Proto, |
475 | | ) |
476 | 1 | .unwrap(); |
477 | 1 | assert_ne!(a, b); |
478 | 1 | } |
479 | | |
480 | | #[test] |
481 | 1 | fn worker_key_distinguishes_wire_formats() { |
482 | 1 | let a = |
483 | 1 | WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Proto).unwrap(); |
484 | 1 | let b = WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Json).unwrap(); |
485 | 1 | assert_ne!(a, b); |
486 | 1 | } |
487 | | |
488 | | #[nativelink_test] |
489 | | #[cfg(unix)] |
490 | | async fn dropping_unhealthy_release_frees_pool_slot() { |
491 | | let dir = tempfile::tempdir().unwrap(); |
492 | | let script = shell_script(dir.path(), "exec sleep 60\n"); |
493 | | let key = shell_worker_key(&script); |
494 | | let pool = PersistentWorkerPool::new(PoolConfig { |
495 | | max_workers_per_key: 1, |
496 | | shutdown_grace: Duration::from_secs(1), |
497 | | ..PoolConfig::default() |
498 | | }); |
499 | | |
500 | | let lease = pool |
501 | | .acquire(key.clone(), Path::new("/bin/sh"), dir.path()) |
502 | | .await |
503 | | .unwrap(); |
504 | | let release_handle = tokio::spawn(lease.release(false)); |
505 | | tokio::time::sleep(Duration::from_millis(50)).await; |
506 | | release_handle.abort(); |
507 | | assert!(release_handle.await.unwrap_err().is_cancelled()); |
508 | | |
509 | | let next_lease = tokio::time::timeout( |
510 | | Duration::from_secs(2), |
511 | | pool.acquire(key, Path::new("/bin/sh"), dir.path()), |
512 | | ) |
513 | | .await |
514 | | .unwrap() |
515 | | .unwrap(); |
516 | | next_lease.release(false).await; |
517 | | } |
518 | | |
519 | | #[nativelink_test] |
520 | | #[cfg(unix)] |
521 | | async fn concurrent_acquire_respects_per_key_cap() { |
522 | | let dir = tempfile::tempdir().unwrap(); |
523 | | let script = shell_script(dir.path(), "exec sleep 60\n"); |
524 | | let key = shell_worker_key(&script); |
525 | | let pool = PersistentWorkerPool::new(PoolConfig { |
526 | | max_workers_per_key: 1, |
527 | | shutdown_grace: Duration::from_millis(100), |
528 | | ..PoolConfig::default() |
529 | | }); |
530 | | |
531 | | let lease = pool |
532 | | .acquire(key.clone(), Path::new("/bin/sh"), dir.path()) |
533 | | .await |
534 | | .unwrap(); |
535 | | let err = pool |
536 | | .acquire(key, Path::new("/bin/sh"), dir.path()) |
537 | | .await |
538 | | .unwrap_err(); |
539 | | assert_eq!(err.code, Code::ResourceExhausted); |
540 | | lease.release(false).await; |
541 | | } |
542 | | } |