/build/source/nativelink-util/src/task.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::task::{Context, Poll}; |
18 | | |
19 | | use futures::Future; |
20 | | use hyper::rt::Executor; |
21 | | use hyper_util::rt::tokio::TokioExecutor; |
22 | | use tokio::task::{spawn_blocking, JoinError, JoinHandle}; |
23 | | pub use tracing::error_span as __error_span; |
24 | | use tracing::{Instrument, Span}; |
25 | | |
26 | | use crate::origin_context::{ActiveOriginContext, ContextAwareFuture, OriginContext}; |
27 | | |
28 | | #[inline(always)] |
29 | 1.46k | pub fn __spawn_with_span_and_context<F, T>( |
30 | 1.46k | f: F, |
31 | 1.46k | span: Span, |
32 | 1.46k | ctx: Option<Arc<OriginContext>>, |
33 | 1.46k | ) -> JoinHandle<T> |
34 | 1.46k | where |
35 | 1.46k | T: Send + 'static, |
36 | 1.46k | F: Future<Output = T> + Send + 'static, |
37 | 1.46k | { |
38 | 1.46k | #[allow(clippy::disallowed_methods)] |
39 | 1.46k | tokio::spawn(ContextAwareFuture::new(ctx, f.instrument(span))) |
40 | 1.46k | } |
41 | | |
42 | | #[inline(always)] |
43 | 1.46k | pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T> |
44 | 1.46k | where |
45 | 1.46k | T: Send + 'static, |
46 | 1.46k | F: Future<Output = T> + Send + 'static, |
47 | 1.46k | { |
48 | 1.46k | __spawn_with_span_and_context(f, span, ActiveOriginContext::get()) |
49 | 1.46k | } |
50 | | |
51 | | #[inline(always)] |
52 | 479 | pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T> |
53 | 479 | where |
54 | 479 | F: FnOnce() -> T + Send + 'static, |
55 | 479 | T: Send + 'static, |
56 | 479 | { |
57 | 479 | #[allow(clippy::disallowed_methods)] |
58 | 479 | spawn_blocking(move || span.in_scope(f)478 ) |
59 | 479 | } |
60 | | |
/// Spawns a future as a detached task and evaluates to its raw `JoinHandle`.
///
/// Supported invocation forms:
/// * `background_spawn!(name, fut)` -- runs `fut` under an error-level span called `name`.
/// * `background_spawn!(name, fut, fields...)` -- same, with extra span fields.
/// * `background_spawn!(name: n, fut: f, target: t, fields...)` -- same, with an
///   explicit span target.
/// * `background_spawn!(span: s, ctx: c, fut: f)` -- uses a caller-supplied span
///   and origin context.
#[macro_export]
macro_rules! background_spawn {
    ($span_name:expr, $future:expr) => {{
        $crate::task::__spawn_with_span(
            $future,
            $crate::task::__error_span!($span_name),
        )
    }};
    ($span_name:expr, $future:expr, $($span_fields:tt)*) => {{
        $crate::task::__spawn_with_span(
            $future,
            $crate::task::__error_span!($span_name, $($span_fields)*),
        )
    }};
    (name: $span_name:expr, fut: $future:expr, target: $span_target:expr, $($span_fields:tt)*) => {{
        $crate::task::__spawn_with_span(
            $future,
            $crate::task::__error_span!(target: $span_target, $span_name, $($span_fields)*),
        )
    }};
    (span: $task_span:expr, ctx: $origin_ctx:expr, fut: $future:expr) => {{
        $crate::task::__spawn_with_span_and_context($future, $task_span, $origin_ctx)
    }};
}
76 | | |
/// Spawns a future like `background_spawn!`, but wraps the resulting handle in
/// a `JoinHandleDropGuard` so the task is aborted when the guard is dropped.
///
/// Supported invocation forms:
/// * `spawn!(name, fut)`
/// * `spawn!(name, fut, fields...)`
/// * `spawn!(name: n, fut: f, target: t, fields...)`
#[macro_export]
macro_rules! spawn {
    ($name:expr, $fut:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut))
    }};
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*))
    }};
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
        // Forward in keyword form so `background_spawn!` selects its `target:`
        // arm. Forwarding positionally would match its generic-fields arm and
        // hand `target: ...` to the span macro as ordinary fields.
        $crate::task::JoinHandleDropGuard::new(
            $crate::background_spawn!(name: $name, fut: $fut, target: $target, $($fields)*)
        )
    }};
}
89 | | |
/// Runs a closure via `__spawn_blocking` inside an error-level span and wraps
/// the handle in a `JoinHandleDropGuard`.
///
/// Supported invocation forms:
/// * `spawn_blocking!(name, f)`
/// * `spawn_blocking!(name, f, target: t)`
/// * `spawn_blocking!(name, f, target: t, fields...)`
/// * `spawn_blocking!(name, f, fields...)`
///
/// NOTE(review): the guard calls `abort()` on drop; tokio documents that a
/// blocking task which has already started running cannot be aborted -- confirm
/// callers do not rely on cancellation here.
#[macro_export]
macro_rules! spawn_blocking {
    ($name:expr, $fut:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name)))
    }};
    // The `target:` arms must precede the generic-fields arm: `macro_rules!`
    // tries arms in order, and `$($fields:tt)*` would otherwise swallow
    // `target: ...` and leave these arms unreachable.
    ($name:expr, $fut:expr, target: $target:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name)))
    }};
    ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)))
    }};
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*)))
    }};
}
105 | | |
106 | | /// Simple wrapper that will abort a future that is running in another spawn in the |
107 | | /// event that this handle gets dropped. |
108 | | #[derive(Debug)] |
109 | | #[must_use] |
110 | | pub struct JoinHandleDropGuard<T> { |
111 | | inner: JoinHandle<T>, |
112 | | } |
113 | | |
114 | | impl<T> JoinHandleDropGuard<T> { |
115 | 1.83k | pub fn new(inner: JoinHandle<T>) -> Self { |
116 | 1.83k | Self { inner } |
117 | 1.83k | } |
118 | | } |
119 | | |
120 | | impl<T> Future for JoinHandleDropGuard<T> { |
121 | | type Output = Result<T, JoinError>; |
122 | | |
123 | 3.57k | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
124 | 3.57k | Pin::new(&mut self.inner).poll(cx) |
125 | 3.57k | } |
126 | | } |
127 | | |
128 | | impl<T> Drop for JoinHandleDropGuard<T> { |
129 | 1.83k | fn drop(&mut self) { |
130 | 1.83k | self.inner.abort(); |
131 | 1.83k | } |
132 | | } |
133 | | |
134 | | #[derive(Clone)] |
135 | | pub struct TaskExecutor(TokioExecutor); |
136 | | |
137 | | impl TaskExecutor { |
138 | 0 | pub fn new() -> Self { |
139 | 0 | Self(TokioExecutor::new()) |
140 | 0 | } |
141 | | } |
142 | | |
143 | | impl Default for TaskExecutor { |
144 | 0 | fn default() -> Self { |
145 | 0 | Self::new() |
146 | 0 | } |
147 | | } |
148 | | |
149 | | impl<F> Executor<F> for TaskExecutor |
150 | | where |
151 | | F: Future + Send + 'static, |
152 | | F::Output: Send + 'static, |
153 | | { |
154 | 0 | fn execute(&self, fut: F) { |
155 | 0 | background_spawn!("http_executor", fut); |
156 | 0 | } |
157 | | } |