/build/source/nativelink-util/src/task.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::task::{Context, Poll}; |
18 | | |
19 | | use futures::Future; |
20 | | use hyper::rt::Executor; |
21 | | use hyper_util::rt::tokio::TokioExecutor; |
22 | | use tokio::task::{spawn_blocking, JoinError, JoinHandle}; |
23 | | pub use tracing::error_span as __error_span; |
24 | | use tracing::{Instrument, Span}; |
25 | | |
26 | | use crate::origin_context::{ActiveOriginContext, ContextAwareFuture, OriginContext}; |
27 | | |
28 | 1.46k | pub fn __spawn_with_span_and_context<F, T>( |
29 | 1.46k | f: F, |
30 | 1.46k | span: Span, |
31 | 1.46k | ctx: Option<Arc<OriginContext>>, |
32 | 1.46k | ) -> JoinHandle<T> |
33 | 1.46k | where |
34 | 1.46k | T: Send + 'static, |
35 | 1.46k | F: Future<Output = T> + Send + 'static, |
36 | 1.46k | { |
37 | 1.46k | #[allow(clippy::disallowed_methods)] |
38 | 1.46k | tokio::spawn(ContextAwareFuture::new(ctx, f.instrument(span))) |
39 | 1.46k | } |
40 | | |
41 | 1.46k | pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T> |
42 | 1.46k | where |
43 | 1.46k | T: Send + 'static, |
44 | 1.46k | F: Future<Output = T> + Send + 'static, |
45 | 1.46k | { |
46 | 1.46k | __spawn_with_span_and_context(f, span, ActiveOriginContext::get()) |
47 | 1.46k | } |
48 | | |
49 | 478 | pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T> |
50 | 478 | where |
51 | 478 | F: FnOnce() -> T + Send + 'static, |
52 | 478 | T: Send + 'static, |
53 | 478 | { |
54 | 478 | #[allow(clippy::disallowed_methods)] |
55 | 478 | spawn_blocking(move || span.in_scope(f)) |
56 | 478 | } |
57 | | |
58 | | #[macro_export] |
59 | | macro_rules! background_spawn { |
60 | | ($name:expr, $fut:expr) => {{ |
61 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name)) |
62 | | }}; |
63 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
64 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name, $($fields)*)) |
65 | | }}; |
66 | | (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
67 | | $crate::task::__spawn_with_span($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)) |
68 | | }}; |
69 | | (span: $span:expr, ctx: $ctx:expr, fut: $fut:expr) => {{ |
70 | | $crate::task::__spawn_with_span_and_context($fut, $span, $ctx) |
71 | | }}; |
72 | | } |
73 | | |
74 | | #[macro_export] |
75 | | macro_rules! spawn { |
76 | | ($name:expr, $fut:expr) => {{ |
77 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut)) |
78 | | }}; |
79 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
80 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*)) |
81 | | }}; |
82 | | (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
83 | | $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, target: $target, $($fields)*)) |
84 | | }}; |
85 | | } |
86 | | |
87 | | #[macro_export] |
88 | | macro_rules! spawn_blocking { |
89 | | ($name:expr, $fut:expr) => {{ |
90 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name))) |
91 | | }}; |
92 | | ($name:expr, $fut:expr, $($fields:tt)*) => {{ |
93 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*))) |
94 | | }}; |
95 | | ($name:expr, $fut:expr, target: $target:expr) => {{ |
96 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name))) |
97 | | }}; |
98 | | ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{ |
99 | | $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*))) |
100 | | }}; |
101 | | } |
102 | | |
103 | | /// Simple wrapper that will abort a future that is running in another spawn in the |
104 | | /// event that this handle gets dropped. |
105 | | #[derive(Debug)] |
106 | | #[must_use] |
107 | | pub struct JoinHandleDropGuard<T> { |
108 | | inner: JoinHandle<T>, |
109 | | } |
110 | | |
111 | | impl<T> JoinHandleDropGuard<T> { |
112 | 1.83k | pub fn new(inner: JoinHandle<T>) -> Self { |
113 | 1.83k | Self { inner } |
114 | 1.83k | } |
115 | | } |
116 | | |
117 | | impl<T> Future for JoinHandleDropGuard<T> { |
118 | | type Output = Result<T, JoinError>; |
119 | | |
120 | 3.57k | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
121 | 3.57k | Pin::new(&mut self.inner).poll(cx) |
122 | 3.57k | } |
123 | | } |
124 | | |
125 | | impl<T> Drop for JoinHandleDropGuard<T> { |
126 | 1.83k | fn drop(&mut self) { |
127 | 1.83k | self.inner.abort(); |
128 | 1.83k | } |
129 | | } |
130 | | |
131 | | #[derive(Clone)] |
132 | | pub struct TaskExecutor(TokioExecutor); |
133 | | |
134 | | impl TaskExecutor { |
135 | 0 | pub fn new() -> Self { |
136 | 0 | Self(TokioExecutor::new()) |
137 | 0 | } |
138 | | } |
139 | | |
140 | | impl Default for TaskExecutor { |
141 | 0 | fn default() -> Self { |
142 | 0 | Self::new() |
143 | 0 | } |
144 | | } |
145 | | |
146 | | impl<F> Executor<F> for TaskExecutor |
147 | | where |
148 | | F: Future + Send + 'static, |
149 | | F::Output: Send + 'static, |
150 | | { |
151 | 0 | fn execute(&self, fut: F) { |
152 | 0 | background_spawn!("http_executor", fut); |
153 | 0 | } |
154 | | } |