Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/task.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::task::{Context as TaskContext, Poll};
17
18
use futures::Future;
19
use hyper::rt::Executor;
20
use hyper_util::rt::tokio::TokioExecutor;
21
use opentelemetry::context::{Context, FutureExt};
22
use tokio::task::{JoinError, JoinHandle, spawn_blocking};
23
pub use tracing::error_span as __error_span;
24
use tracing::{Instrument, Span};
25
26
1.53k
pub fn __spawn_with_span_and_context<F, T>(f: F, span: Span, ctx: Option<Context>) -> JoinHandle<T>
27
1.53k
where
28
1.53k
    T: Send + 'static,
29
1.53k
    F: Future<Output = T> + Send + 'static,
30
{
31
1.53k
    let future = f.instrument(span);
32
1.53k
    let future = if let Some(ctx) = ctx {
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 28, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 7, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 6, False: 0]
  Branch (32:25): [True: 1.22k, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 26, False: 0]
  Branch (32:25): [Folded - Ignored]
  Branch (32:25): [True: 7, False: 0]
  Branch (32:25): [True: 2, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 13, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 2, False: 0]
  Branch (32:25): [True: 2, False: 0]
  Branch (32:25): [True: 2, False: 0]
  Branch (32:25): [True: 2, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 15, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 4, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 6, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [Folded - Ignored]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 0, False: 0]
  Branch (32:25): [True: 13, False: 0]
  Branch (32:25): [True: 13, False: 0]
  Branch (32:25): [True: 93, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 1, False: 0]
  Branch (32:25): [True: 20, False: 0]
  Branch (32:25): [True: 1, False: 0]
33
1.53k
        future.with_context(ctx)
34
    } else {
35
0
        future.with_current_context()
36
    };
37
38
    #[expect(clippy::disallowed_methods, reason = "purpose of the method")]
39
1.53k
    tokio::spawn(future)
40
1.53k
}
41
42
1.53k
pub fn __spawn_with_span<F, T>(f: F, span: Span) -> JoinHandle<T>
43
1.53k
where
44
1.53k
    T: Send + 'static,
45
1.53k
    F: Future<Output = T> + Send + 'static,
46
{
47
1.53k
    let current_ctx = Context::current();
48
1.53k
    __spawn_with_span_and_context(f, span, Some(current_ctx))
49
1.53k
}
50
51
736
pub fn __spawn_blocking<F, T>(f: F, span: Span) -> JoinHandle<T>
52
736
where
53
736
    F: FnOnce() -> T + Send + 'static,
54
736
    T: Send + 'static,
55
{
56
    #[expect(clippy::disallowed_methods, reason = "purpose of the method")]
57
736
    spawn_blocking(move || span.in_scope(f))
58
736
}
59
60
#[macro_export]
61
macro_rules! background_spawn {
62
    ($name:expr, $fut:expr) => {{
63
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name))
64
    }};
65
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
66
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!($name, $($fields)*))
67
    }};
68
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
69
        $crate::task::__spawn_with_span($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*))
70
    }};
71
    (span: $span:expr, ctx: $ctx:expr, fut: $fut:expr) => {{
72
        $crate::task::__spawn_with_span_and_context($fut, $span, $ctx)
73
    }};
74
}
75
76
#[macro_export]
77
macro_rules! spawn {
78
    ($name:expr, $fut:expr) => {{
79
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut))
80
    }};
81
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
82
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, $($fields)*))
83
    }};
84
    (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{
85
        $crate::task::JoinHandleDropGuard::new($crate::background_spawn!($name, $fut, target: $target, $($fields)*))
86
    }};
87
}
88
89
#[macro_export]
90
macro_rules! spawn_blocking {
91
    ($name:expr, $fut:expr) => {{
92
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name)))
93
    }};
94
    ($name:expr, $fut:expr, $($fields:tt)*) => {{
95
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!($name, $($fields)*)))
96
    }};
97
    ($name:expr, $fut:expr, target: $target:expr) => {{
98
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name)))
99
    }};
100
    ($name:expr, $fut:expr, target: $target:expr, $($fields:tt)*) => {{
101
        $crate::task::JoinHandleDropGuard::new($crate::task::__spawn_blocking($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)))
102
    }};
103
}
104
105
/// Simple wrapper that will abort a future that is running in another spawn in the
106
/// event that this handle gets dropped.
107
#[derive(Debug)]
108
#[must_use]
109
pub struct JoinHandleDropGuard<T> {
110
    inner: JoinHandle<T>,
111
}
112
113
impl<T> JoinHandleDropGuard<T> {
114
2.11k
    pub const fn new(inner: JoinHandle<T>) -> Self {
115
2.11k
        Self { inner }
116
2.11k
    }
117
}
118
119
impl<T> Future for JoinHandleDropGuard<T> {
120
    type Output = Result<T, JoinError>;
121
122
4.11k
    fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
123
4.11k
        Pin::new(&mut self.inner).poll(cx)
124
4.11k
    }
125
}
126
127
impl<T> Drop for JoinHandleDropGuard<T> {
128
2.11k
    fn drop(&mut self) {
129
2.11k
        self.inner.abort();
130
2.11k
    }
131
}
132
133
#[derive(Debug, Clone)]
134
pub struct TaskExecutor(TokioExecutor);
135
136
impl TaskExecutor {
137
0
    pub fn new() -> Self {
138
0
        Self(TokioExecutor::new())
139
0
    }
140
}
141
142
impl Default for TaskExecutor {
143
0
    fn default() -> Self {
144
0
        Self::new()
145
0
    }
146
}
147
148
impl<F> Executor<F> for TaskExecutor
149
where
150
    F: Future + Send + 'static,
151
    F::Output: Send + 'static,
152
{
153
0
    fn execute(&self, fut: F) {
154
0
        background_spawn!("http_executor", fut);
155
0
    }
156
}