Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-util/src/task.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::task::{Context, Poll};
18
19
use futures::Future;
20
use hyper::rt::Executor;
21
use hyper_util::rt::tokio::TokioExecutor;
22
use tokio::task::{spawn_blocking, JoinError, JoinHandle};
23
pub use tracing::error_span as __error_span;
24
use tracing::{Instrument, Span};
25
26
use crate::origin_context::{ActiveOriginContext, ContextAwareFuture, OriginContext};
27
28
#[inline(always)]
29
1.46k
pub fn __spawn_with_span_and_context<F, T>(
30
1.46k
    f: F,
31
1.46k
    span: Span,
32
1.46k
    ctx: Option<Arc<OriginContext>>,
33
1.46k
) -> JoinHandle<T>
34
1.46k
where
35
1.46k
    T: Send + 'static,
36
1.46k
    F: Future<Output = T> + Send + 'static,
37
1.46k
{
38
1.46k
    #[allow(clippy::disallowed_methods)]
39
1.46k
    tokio::spawn(ContextAwareFuture::new(ctx, f.instrument(span)))
40
1.46k
}
41
42
#[inline(always)]
43
1.46k
pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T>
44
1.46k
where
45
1.46k
    T: Send + 'static,
46
1.46k
    F: Future<Output = T> + Send + 'static,
47
1.46k
{
48
1.46k
    __spawn_with_span_and_context(f, span, ActiveOriginContext::get())
49
1.46k
}
50
51
#[inline(always)]
52
479
pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T>
53
479
where
54
479
    F: FnOnce() -> T + Send + 'static,
55
479
    T: Send + 'static,
56
479
{
57
479
    #[allow(clippy::disallowed_methods)]
58
479
    spawn_blocking(move || 
span.in_scope(f)478
)
59
479
}
60
61
/// Spawns a future inside an error-level tracing span and evaluates to the
/// raw `JoinHandle` (no drop guard is added; see `spawn!` for that).
///
/// Accepted forms:
/// * `background_spawn!(name, fut)`
/// * `background_spawn!(name, fut, fields...)`
/// * `background_spawn!(name: n, fut: f, target: t, fields...)` - the comma
///   after the target expression is required even when no fields follow.
/// * `background_spawn!(span: s, ctx: c, fut: f)` - caller supplies the span
///   and origin context explicitly.
#[macro_export]
macro_rules! background_spawn {
    ($span_name:expr, $task:expr) => {{
        $crate::task::__spawn_with_span(
            $task,
            $crate::task::__error_span!($span_name),
        )
    }};
    ($span_name:expr, $task:expr, $($span_fields:tt)*) => {{
        $crate::task::__spawn_with_span(
            $task,
            $crate::task::__error_span!($span_name, $($span_fields)*),
        )
    }};
    (name: $span_name:expr, fut: $task:expr, target: $span_target:expr, $($span_fields:tt)*) => {{
        $crate::task::__spawn_with_span(
            $task,
            $crate::task::__error_span!(target: $span_target, $span_name, $($span_fields)*),
        )
    }};
    (span: $explicit_span:expr, ctx: $origin_ctx:expr, fut: $task:expr) => {{
        $crate::task::__spawn_with_span_and_context($task, $explicit_span, $origin_ctx)
    }};
}
76
77
/// Spawns a future like `background_spawn!`, but wraps the resulting handle in
/// a `JoinHandleDropGuard`, so the task is aborted when the guard is dropped.
///
/// Accepted forms:
/// * `spawn!(name, fut)`
/// * `spawn!(name, fut, fields...)`
/// * `spawn!(name: n, fut: f, target: t, fields...)` - the comma after the
///   target expression is required even when no fields follow.
#[macro_export]
macro_rules! spawn {
    ($name:expr, $fut:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut))
    }};
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*))
    }};
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
        // Forward using the keyword form. Forwarding positionally
        // (`$name, $fut, target: ...`) would match `background_spawn!`'s
        // generic fields arm and hand `target: ...` to `error_span!` after the
        // span name, where it is not accepted as a target specifier.
        $crate::task::JoinHandleDropGuard::new(
            $crate::background_spawn!(name: $name, fut: $fut, target: $target, $($fields)*)
        )
    }};
}
89
90
/// Runs a blocking closure via `__spawn_blocking` inside an error-level
/// tracing span and wraps the handle in a `JoinHandleDropGuard`.
///
/// Accepted forms:
/// * `spawn_blocking!(name, closure)`
/// * `spawn_blocking!(name, closure, target: t)`
/// * `spawn_blocking!(name, closure, target: t, fields...)`
/// * `spawn_blocking!(name, closure, fields...)`
///
/// Arm order matters: `macro_rules!` tries arms top to bottom and the generic
/// `fields...` arm accepts any token tail, so the `target:` arms must be
/// listed before it or they can never be selected.
#[macro_export]
macro_rules! spawn_blocking {
    ($name:expr, $fut:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name)))
    }};
    ($name:expr, $fut:expr, target: $target:expr) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name)))
    }};
    ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)))
    }};
    // Catch-all for plain span fields; kept last so it does not shadow the
    // `target:` arms above.
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*)))
    }};
}
105
106
/// Simple wrapper that will abort a future that is running in another spawn in the
107
/// event that this handle gets dropped.
108
#[derive(Debug)]
109
#[must_use]
110
pub struct JoinHandleDropGuard<T> {
111
    inner: JoinHandle<T>,
112
}
113
114
impl<T> JoinHandleDropGuard<T> {
115
1.83k
    pub fn new(inner: JoinHandle<T>) -> Self {
116
1.83k
        Self { inner }
117
1.83k
    }
118
}
119
120
impl<T> Future for JoinHandleDropGuard<T> {
121
    type Output = Result<T, JoinError>;
122
123
3.57k
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124
3.57k
        Pin::new(&mut self.inner).poll(cx)
125
3.57k
    }
126
}
127
128
impl<T> Drop for JoinHandleDropGuard<T> {
129
1.83k
    fn drop(&mut self) {
130
1.83k
        self.inner.abort();
131
1.83k
    }
132
}
133
134
#[derive(Clone)]
135
pub struct TaskExecutor(TokioExecutor);
136
137
impl TaskExecutor {
138
0
    pub fn new() -> Self {
139
0
        Self(TokioExecutor::new())
140
0
    }
141
}
142
143
impl Default for TaskExecutor {
144
0
    fn default() -> Self {
145
0
        Self::new()
146
0
    }
147
}
148
149
impl<F> Executor<F> for TaskExecutor
150
where
151
    F: Future + Send + 'static,
152
    F::Output: Send + 'static,
153
{
154
0
    fn execute(&self, fut: F) {
155
0
        background_spawn!("http_executor", fut);
156
0
    }
157
}