/build/source/nativelink-util/src/task.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use core::task::{Context as TaskContext, Poll}; |
17 | | |
18 | | use futures::Future; |
19 | | use hyper::rt::Executor; |
20 | | use hyper_util::rt::tokio::TokioExecutor; |
21 | | use opentelemetry::context::{Context, FutureExt}; |
22 | | use tokio::task::{JoinError, JoinHandle, spawn_blocking}; |
23 | | pub use tracing::error_span as __error_span; |
24 | | use tracing::{Instrument, Span}; |
25 | | |
26 | 1.53k | pub fn __spawn_with_span_and_context<F, T>(f: F, span: Span, ctx: Option<Context>) -> JoinHandle<T> |
27 | 1.53k | where |
28 | 1.53k | T: Send + 'static, |
29 | 1.53k | F: Future<Output = T> + Send + 'static, |
30 | | { |
31 | 1.53k | let future = f.instrument(span); |
32 | 1.53k | let future = if let Some(ctx) = ctx { Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 28, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 7, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 6, False: 0]
Branch (32:25): [True: 1.22k, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 26, False: 0]
Branch (32:25): [Folded - Ignored]
Branch (32:25): [True: 7, False: 0]
Branch (32:25): [True: 2, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 13, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 2, False: 0]
Branch (32:25): [True: 2, False: 0]
Branch (32:25): [True: 2, False: 0]
Branch (32:25): [True: 2, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 15, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 4, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 6, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [Folded - Ignored]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 0, False: 0]
Branch (32:25): [True: 13, False: 0]
Branch (32:25): [True: 13, False: 0]
Branch (32:25): [True: 93, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 1, False: 0]
Branch (32:25): [True: 20, False: 0]
Branch (32:25): [True: 1, False: 0]
|
33 | 1.53k | future.with_context(ctx) |
34 | | } else { |
35 | 0 | future.with_current_context() |
36 | | }; |
37 | | |
38 | | #[expect(clippy::disallowed_methods, reason = "purpose of the method")] |
39 | 1.53k | tokio::spawn(future) |
40 | 1.53k | } |
41 | | |
42 | 1.53k | pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T> |
43 | 1.53k | where |
44 | 1.53k | T: Send + 'static, |
45 | 1.53k | F: Future<Output = T> + Send + 'static, |
46 | | { |
47 | 1.53k | let current_ctx = Context::current(); |
48 | 1.53k | __spawn_with_span_and_context(f, span, Some(current_ctx)) |
49 | 1.53k | } |
50 | | |
51 | 736 | pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T> |
52 | 736 | where |
53 | 736 | F: FnOnce() -> T + Send + 'static, |
54 | 736 | T: Send + 'static, |
55 | | { |
56 | | #[expect(clippy::disallowed_methods, reason = "purpose of the method")] |
57 | 736 | spawn_blocking(move || span.in_scope(f)) |
58 | 736 | } |
59 | | |
60 | | #[macro_export] |
61 | | macro_rules! background_spawn { |
62 | | ($name:expr, $fut:expr) => {{ |
63 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name)) |
64 | | }}; |
65 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
66 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name, $($fields)*)) |
67 | | }}; |
68 | | (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
69 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)) |
70 | | }}; |
71 | | (span: $span:expr, ctx: $ctx:expr, fut: $fut:expr) => {{ |
72 | | $crate::task::__spawn_with_span_and_context($fut, $span, $ctx) |
73 | | }}; |
74 | | } |
75 | | |
76 | | #[macro_export] |
77 | | macro_rules! spawn { |
78 | | ($name:expr, $fut:expr) => {{ |
79 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut)) |
80 | | }}; |
81 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
82 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*)) |
83 | | }}; |
84 | | (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
85 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, target: $target, $($fields)*)) |
86 | | }}; |
87 | | } |
88 | | |
89 | | #[macro_export] |
90 | | macro_rules! spawn_blocking { |
91 | | ($name:expr, $fut:expr) => {{ |
92 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name))) |
93 | | }}; |
94 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
95 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*))) |
96 | | }}; |
97 | | ($name:expr, $fut:expr, target: $target:expr) => {{ |
98 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name))) |
99 | | }}; |
100 | | ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
101 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*))) |
102 | | }}; |
103 | | } |
104 | | |
105 | | /// Simple wrapper that will abort a future that is running in another spawn in the |
106 | | /// event that this handle gets dropped. |
107 | | #[derive(Debug)] |
108 | | #[must_use] |
109 | | pub struct JoinHandleDropGuard<T> { |
110 | | inner: JoinHandle<T>, |
111 | | } |
112 | | |
113 | | impl<T> JoinHandleDropGuard<T> { |
114 | 2.11k | pub const fn new(inner: JoinHandle<T>) -> Self { |
115 | 2.11k | Self { inner } |
116 | 2.11k | } |
117 | | } |
118 | | |
119 | | impl<T> Future for JoinHandleDropGuard<T> { |
120 | | type Output = Result<T, JoinError>; |
121 | | |
122 | 4.11k | fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> { |
123 | 4.11k | Pin::new(&mut self.inner).poll(cx) |
124 | 4.11k | } |
125 | | } |
126 | | |
127 | | impl<T> Drop for JoinHandleDropGuard<T> { |
128 | 2.11k | fn drop(&mut self) { |
129 | 2.11k | self.inner.abort(); |
130 | 2.11k | } |
131 | | } |
132 | | |
133 | | #[derive(Debug, Clone)] |
134 | | pub struct TaskExecutor(TokioExecutor); |
135 | | |
136 | | impl TaskExecutor { |
137 | 0 | pub fn new() -> Self { |
138 | 0 | Self(TokioExecutor::new()) |
139 | 0 | } |
140 | | } |
141 | | |
142 | | impl Default for TaskExecutor { |
143 | 0 | fn default() -> Self { |
144 | 0 | Self::new() |
145 | 0 | } |
146 | | } |
147 | | |
148 | | impl<F> Executor<F> for TaskExecutor |
149 | | where |
150 | | F: Future + Send + 'static, |
151 | | F::Output: Send + 'static, |
152 | | { |
153 | 0 | fn execute(&self, fut: F) { |
154 | 0 | background_spawn!("http_executor", fut); |
155 | 0 | } |
156 | | } |