/build/source/nativelink-service/src/cas_server.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::convert::Into;
use core::pin::Pin;
use std::collections::{HashMap, VecDeque};

use bytes::Bytes;
use futures::stream::{FuturesUnordered, Stream};
use futures::{StreamExt, TryStreamExt};
use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName};
use nativelink_error::{Code, Error, ResultExt, error_if, make_input_err};
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
    ContentAddressableStorage, ContentAddressableStorageServer as Server,
};
use nativelink_proto::build::bazel::remote::execution::v2::{
    BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
    BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse,
    GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response,
    compressor,
};
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
use nativelink_util::store_trait::{Store, StoreLike};
use opentelemetry::context::FutureExt;
use tonic::{Request, Response, Status};
use tracing::{Instrument, Level, debug, error_span, instrument};
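
/// Serves the REv2 `ContentAddressableStorage` gRPC API, dispatching each
/// request to the store configured for its `instance_name`.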
#[derive(Debug)]
pub struct CasServer {
    stores: HashMap<String, Store>,
}

type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;

impl CasServer {
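    /// Builds the server from per-instance CAS configs, resolving each
    /// config's `cas_store` name through the `StoreManager`. Returns an
    /// input error if any referenced store does not exist.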
    pub fn new(
        configs: &[WithInstanceName<CasStoreConfig>],
        store_manager: &StoreManager,
    ) -> Result<Self, Error> {
        let mut stores = HashMap::with_capacity(configs.len());
        for config in configs {
            let store = store_manager.get_store(&config.cas_store).ok_or_else(|| {
                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
            })?;
            stores.insert(config.instance_name.to_string(), store);
        }
        Ok(Self { stores })
    }

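    /// Wraps this `CasServer` in the tonic-generated
    /// `ContentAddressableStorageServer` so it can be mounted on a gRPC
    /// router. A minimal wiring sketch (illustrative only; the `configs`,
    /// `store_manager`, and listen-address values are assumed to be supplied
    /// by the caller):
    ///
    /// ```ignore
    /// let cas_server = CasServer::new(&configs, &store_manager)?;
    /// tonic::transport::Server::builder()
    ///     .add_service(cas_server.into_service())
    ///     .serve("127.0.0.1:50051".parse()?)
    ///     .await?;
    /// ```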
    pub fn into_service(self) -> Server<Self> {
        Server::new(self)
    }

    async fn inner_find_missing_blobs(
        &self,
        request: FindMissingBlobsRequest,
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
        let instance_name = &request.instance_name;
        let store = self
            .stores
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
            .clone();

        let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
        for digest in &request.blob_digests {
            requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
        }
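        // Query the store once for the whole batch: `has_many` yields one
        // entry per requested digest, and `None` marks a missing blob.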
        let sizes = store
            .has_many(&requested_blobs)
            .await
            .err_tip(|| "In find_missing_blobs")?;
        let missing_blob_digests = sizes
            .into_iter()
            .zip(request.blob_digests)
            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
            .collect();

        Ok(Response::new(FindMissingBlobsResponse {
            missing_blob_digests,
        }))
    }

    async fn inner_batch_update_blobs(
        &self,
        request: BatchUpdateBlobsRequest,
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
        let instance_name = &request.instance_name;

        let store = self
            .stores
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
            .clone();

        // If we are a GrpcStore we shortcut here, as this is a special store.
        // Note: We don't know the digests here, so we try to perform a very
        // shallow check to see if it's a grpc store.
        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
            return grpc_store.batch_update_blobs(Request::new(request)).await;
        }

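        // Uploads run concurrently. A failed store write is reported in that
        // entry's `status` rather than failing the whole RPC; malformed
        // requests (e.g. a missing or invalid digest) fail the RPC itself.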
        let store_ref = &store;
        let update_futures: FuturesUnordered<_> = request
            .requests
            .into_iter()
            .map(|request| async move {
                let digest = request
                    .digest
                    .clone()
                    .err_tip(|| "Digest not found in request")?;
                let request_data = request.data;
                let digest_info = DigestInfo::try_from(digest.clone())?;
                let size_bytes = usize::try_from(digest_info.size_bytes())
                    .err_tip(|| "Digest size_bytes was not convertible to usize")?;
                error_if!(
                    size_bytes != request_data.len(),
                    "Digest for upload had mismatching sizes, digest said {} data said {}",
                    size_bytes,
                    request_data.len()
                );
                let result = store_ref
                    .update_oneshot(digest_info, request_data)
                    .await
                    .err_tip(|| "Error writing to store");
                Ok::<_, Error>(batch_update_blobs_response::Response {
                    digest: Some(digest),
                    status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
                })
            })
            .collect();
        let responses = update_futures
            .try_collect::<Vec<batch_update_blobs_response::Response>>()
            .await?;

        Ok(Response::new(BatchUpdateBlobsResponse { responses }))
    }

    async fn inner_batch_read_blobs(
        &self,
        request: BatchReadBlobsRequest,
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
        let instance_name = &request.instance_name;

        let store = self
            .stores
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
            .clone();

        // If we are a GrpcStore we shortcut here, as this is a special store.
        // Note: We don't know the digests here, so we try to perform a very
        // shallow check to see if it's a grpc store.
        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
            return grpc_store.batch_read_blobs(Request::new(request)).await;
        }

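        // Reads run concurrently. A failed store read is reported in that
        // entry's `status` (with `NotFound` errors trimmed to one message)
        // and empty `data`, rather than failing the whole RPC.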
        let store_ref = &store;
        let read_futures: FuturesUnordered<_> = request
            .digests
            .into_iter()
            .map(|digest| async move {
                let digest_copy = DigestInfo::try_from(digest.clone())?;
                // TODO(palfrey) There is a security risk here of someone taking all the memory on the instance.
                let result = store_ref
                    .get_part_unchunked(digest_copy, 0, None)
                    .await
                    .err_tip(|| "Error reading from store");
                let (status, data) = result.map_or_else(
                    |mut e| {
                        if e.code == Code::NotFound {
                            // Trim the error code. Not Found is quite common and we don't want to send a large
                            // error (debug) message for something that is common. We resize to just the last
                            // message as it will be the most relevant.
                            e.messages.resize_with(1, String::new);
                        }
                        (e.into(), Bytes::new())
                    },
                    |v| (GrpcStatus::default(), v),
                );
                Ok::<_, Error>(batch_read_blobs_response::Response {
                    status: Some(status),
                    digest: Some(digest),
                    compressor: compressor::Value::Identity.into(),
                    data,
                })
            })
            .collect();
        let responses = read_futures
            .try_collect::<Vec<batch_read_blobs_response::Response>>()
            .await?;

        Ok(Response::new(BatchReadBlobsResponse { responses }))
    }

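    /// Walks the directory tree rooted at `root_digest` breadth-first and
    /// returns up to `page_size` directories per page (a `page_size` of 0
    /// disables paging). `page_token` is the `{hash_str}-{size_bytes}` digest
    /// of the directory to resume from.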
    async fn inner_get_tree(
        &self,
        request: GetTreeRequest,
    ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> {
        let instance_name = &request.instance_name;

        let store = self
            .stores
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
            .clone();

        // If we are a GrpcStore we shortcut here, as this is a special store.
        // Note: We don't know the digests here, so we try to perform a very
        // shallow check to see if it's a grpc store.
        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
            let stream = grpc_store
                .get_tree(Request::new(request))
                .await?
                .into_inner();
            return Ok(stream.left_stream());
        }
        let root_digest: DigestInfo = request
            .root_digest
            .err_tip(|| "Expected root_digest to exist in GetTreeRequest")?
            .try_into()
            .err_tip(|| "In GetTreeRequest::root_digest")?;

        let mut deque: VecDeque<DigestInfo> = VecDeque::new();
        let mut directories: Vec<Directory> = Vec::new();
        // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
        let page_token_digest = if request.page_token.is_empty() {
            root_digest
        } else {
            let mut page_token_parts = request.page_token.split('-');
            DigestInfo::try_new(
                page_token_parts
                    .next()
                    .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
                page_token_parts
                    .next()
                    .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
                    .parse::<i64>()
                    .err_tip(|| "Failed to parse `size_bytes` as i64")?,
            )
            .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?
        };
        let page_size = request.page_size;
        // If `page_size` is 0, paging is not necessary.
        let mut page_token_matched = page_size == 0;
        deque.push_back(root_digest);

        while !deque.is_empty() {
            let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
            let directory = get_and_decode_digest::<Directory>(&store, digest.into())
                .await
                .err_tip(|| "Converting digest to Directory")?;
            if digest == page_token_digest {
                page_token_matched = true;
            }
            for directory in &directory.directories {
                let digest: DigestInfo = directory
                    .digest
                    .clone()
                    .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
                    .try_into()
                    .err_tip(|| "In Directory::file::digest")?;
                deque.push_back(digest);
            }

            let page_size_usize = usize::try_from(page_size).unwrap_or(usize::MAX);

            if page_token_matched {
                directories.push(directory);
                if directories.len() == page_size_usize {
                    break;
                }
            }
        }
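        // Each page re-walks the tree from the root; directories are only
        // collected once the `page_token` digest has been dequeued.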
        // `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
        // It will be an empty string when it has reached the end of the directory tree.
        let next_page_token: String = deque
            .front()
            .map_or_else(String::new, |value| format!("{value}"));

        Ok(futures::stream::once(async {
            Ok(GetTreeResponse {
                directories,
                next_page_token,
            })
        })
        .right_stream())
    }
}

#[tonic::async_trait]
impl ContentAddressableStorage for CasServer {
    type GetTreeStream = GetTreeStream;

    #[instrument(
        err,
        ret(level = Level::DEBUG),
        level = Level::ERROR,
        skip_all,
        fields(
            // Mostly to skip request.blob_digests which is sometimes enormous
            request.instance_name = ?grpc_request.get_ref().instance_name,
            request.digest_function = ?grpc_request.get_ref().digest_function
        )
    )]
    async fn find_missing_blobs(
        &self,
        grpc_request: Request<FindMissingBlobsRequest>,
    ) -> Result<Response<FindMissingBlobsResponse>, Status> {
        let request = grpc_request.into_inner();
        let digest_function = request.digest_function;
        self.inner_find_missing_blobs(request)
            .instrument(error_span!("cas_server_find_missing_blobs"))
            .with_context(
                make_ctx_for_hash_func(digest_function)
                    .err_tip(|| "In CasServer::find_missing_blobs")?,
            )
            .await
            .err_tip(|| "Failed on find_missing_blobs() command")
            .map_err(Into::into)
    }

    #[instrument(
        err,
        ret(level = Level::DEBUG),
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    async fn batch_update_blobs(
        &self,
        grpc_request: Request<BatchUpdateBlobsRequest>,
    ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
        let request = grpc_request.into_inner();
        let digest_function = request.digest_function;

        self.inner_batch_update_blobs(request)
            .instrument(error_span!("cas_server_batch_update_blobs"))
            .with_context(
                make_ctx_for_hash_func(digest_function)
                    .err_tip(|| "In CasServer::batch_update_blobs")?,
            )
            .await
            .err_tip(|| "Failed on batch_update_blobs() command")
            .map_err(Into::into)
    }

    #[instrument(
        err,
        ret(level = Level::INFO),
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    async fn batch_read_blobs(
        &self,
        grpc_request: Request<BatchReadBlobsRequest>,
    ) -> Result<Response<BatchReadBlobsResponse>, Status> {
        let request = grpc_request.into_inner();
        let digest_function = request.digest_function;

        self.inner_batch_read_blobs(request)
            .instrument(error_span!("cas_server_batch_read_blobs"))
            .with_context(
                make_ctx_for_hash_func(digest_function)
                    .err_tip(|| "In CasServer::batch_read_blobs")?,
            )
            .await
            .err_tip(|| "Failed on batch_read_blobs() command")
            .map_err(Into::into)
    }

    #[instrument(
        err,
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    async fn get_tree(
        &self,
        grpc_request: Request<GetTreeRequest>,
    ) -> Result<Response<Self::GetTreeStream>, Status> {
        let request = grpc_request.into_inner();
        let digest_function = request.digest_function;

        let resp = self
            .inner_get_tree(request)
            .instrument(error_span!("cas_server_get_tree"))
            .with_context(
                make_ctx_for_hash_func(digest_function).err_tip(|| "In CasServer::get_tree")?,
            )
            .await
            .err_tip(|| "Failed on get_tree() command")
            .map(|stream| -> Response<Self::GetTreeStream> { Response::new(Box::pin(stream)) })
            .map_err(Into::into);

        if resp.is_ok() {
            debug!(return = "Ok(<stream>)");
        }
        resp
    }
}
418  |  | }  |