/build/source/nativelink-store/src/compression_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::cmp;
use core::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use bincode::serde::{decode_from_slice, encode_to_vec};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, BytesMut};
use futures::future::FutureExt;
use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size};
use nativelink_config::stores::CompressionSpec;
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
};
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
use nativelink_util::spawn;
use nativelink_util::store_trait::{
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
};
use serde::{Deserialize, Serialize};

use crate::cas_utils::is_zero_digest;

// If the bytestream format changes, this number should be incremented to prevent
// backwards-compatibility issues.
pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;

// Default block size the stream will be sliced into.
pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024;

const U32_SZ: u64 = size_of::<u32>() as u64;

// We use a custom frame format here because I wanted the ability in the future to:
// * Read a random part of the data without needing to parse the entire file.
// * Compress the data on the fly without needing to know the exact input size.
//
// The frame format that LZ4 uses does not contain an index of where the different
// blocks are located. This would mean in the event we only wanted the last byte of
// a file, we'd need to seek to the header of each block to find where the next block
// offset is until we got to the last block, then decompress it.
//
// By using this custom frame format we keep the ability to have each block reference
// the next block (for efficiency), but also after all blocks we have a footer frame
// which contains an index of each block in the stream. This means that we can read a
// fixed number of bytes of the file, which always contain the size of the index, then
// with that size we know the exact size of the footer frame, which contains the entire
// index. Once this footer frame is loaded, we can then do some math to figure out
// which block each byte is in and the offset of each block within the compressed file.
//
// The frame format is as follows:
// |----------------------------------HEADER-----------------------------------------|
// |  version(u8) |  block_size (u32) |  upload_size_type (u32) |  upload_size (u32) |
// |----------------------------------BLOCK------------------------------------------|
// |  frame_type(u8) 0x00 |  compressed_data_size (u32) |        ...DATA...          |
// |                                ...DATA...                                       |
// | [Possibly repeat block]                                                         |
// |----------------------------------FOOTER-----------------------------------------|
// |  frame_type(u8) 0x01 |    footer_size (u32) |           index_count1 (u64)      |
// |      ...[pos_from_prev_index (u32) - repeat for count {index_count*}]...        |
// |  index_count2 (u32) |    uncompressed_data_sz (u64) |    block_size (u32)       |
// | version (u8) |------------------------------------------------------------------|
// |---------------------------------------------------------------------------------|
//
// version              - A constant number used to define what version of this format is being
//                        used. Version in header and footer must match.
// block_size           - Uncompressed size of each block except the last. This means that every
//                        block is a constant size uncompressed, except the last block, which may
//                        be variable size. Block size in header and footer must match.
// upload_size_type     - Value of 0 = UploadSizeInfo::ExactSize, 1 = UploadSizeInfo::MaxSize.
//                        This is for debug reasons only.
// upload_size          - The size of the data. WARNING: Do not rely on this being the uncompressed
//                        payload size. It is a debug field and a "best guess" on how large the data
//                        is. The header does not contain the upload data size. This value is the
//                        value counterpart to the `upload_size_type` field.
// frame_type           - Type of each frame. 0 = BLOCK frame, 1 = FOOTER frame. The header frame
//                        always starts at the first byte of the stream, so it needs no magic number.
// compressed_data_size - The size of this block. The bytes after this field should be read
//                        in sequence to get all of this block's data.
// footer_size          - Size, in bytes, of the footer data that follows this field.
// index_count1         - Number of items in the index. ({index_count1} * 4) represents the number
//                        of bytes that should be read after this field in order to get all index
//                        data.
// pos_from_prev_index  - Index of an individual block in the stream relative to the previous
//                        block. This field may repeat {index_count1} times.
// index_count2         - Same as {index_count1} and should always be equal. This field might be
//                        useful if you want to read a random byte from this stream, have random
//                        access to it and know the size of the stream during reading, because
//                        this field is always in the exact same place in the stream relative to
//                        the last byte.
// uncompressed_data_sz - Size of the original uncompressed data.
//
// Note: All fields are little-endian.

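// Illustrative sketch of the "some math" mentioned above (helper names are ours;
// not used by the store): because every block except the last is exactly
// `block_size` bytes once uncompressed, the block holding a given uncompressed
// `offset` is plain integer division, and a block's position in the compressed
// stream is the running sum of the footer's `pos_from_prev_index` deltas.
#[allow(dead_code)]
const fn block_index_for_offset(offset: u64, block_size: u32) -> u64 {
    offset / block_size as u64
}

#[allow(dead_code)]
fn compressed_offset_of_block(pos_from_prev_index: &[u32], block_index: u64) -> u64 {
    // Sum of the per-block deltas before `block_index`; add the header length and
    // per-frame overhead separately when seeking in a real file.
    pos_from_prev_index
        .iter()
        .take(block_index as usize)
        .map(|&delta| u64::from(delta))
        .sum()
}
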
/// Number representing a chunk.
pub const CHUNK_FRAME_TYPE: u8 = 0;

/// Number representing the footer.
pub const FOOTER_FRAME_TYPE: u8 = 1;

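// Minimal sketch of how a single BLOCK frame is laid out (see the diagram above):
// frame type, little-endian compressed size, then the LZ4 block data. This is a
// safe, standalone illustration that uses a zero-initialized scratch buffer; the
// real `update()` below writes into uninitialized capacity instead to avoid the
// extra fill. The helper name is ours and is not part of the store's API.
#[allow(dead_code)]
fn encode_chunk_frame_sketch(chunk: &[u8]) -> Result<bytes::Bytes, Error> {
    let mut scratch = vec![0u8; get_maximum_output_size(chunk.len())];
    let compressed_sz = compress_into(chunk, &mut scratch)
        .map_err(|e| make_err!(Code::Internal, "Compression error {:?}", e))?;
    let mut frame = BytesMut::with_capacity(1 + 4 + compressed_sz);
    frame.put_u8(CHUNK_FRAME_TYPE);
    frame.put_u32_le(u32::try_from(compressed_sz).unwrap_or(u32::MAX));
    frame.extend_from_slice(&scratch[..compressed_sz]);
    Ok(frame.freeze())
}
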
/// This is a partial mirror of `nativelink_config::stores::Lz4Config`.
/// We cannot use that natively here because it could cause our
/// serialized format to change if we added more configs.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Copy, Clone)]
pub struct Lz4Config {
    pub block_size: u32,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
pub struct Header {
    pub version: u8,
    pub config: Lz4Config,
    pub upload_size: UploadSizeInfo,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Clone, Copy)]
pub struct SliceIndex {
    pub position_from_prev_index: u32,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct Footer {
    pub indexes: Vec<SliceIndex>,
    pub index_count: u32,
    pub uncompressed_data_size: u64,
    pub config: Lz4Config,
    pub version: u8,
}

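// Minimal round-trip sketch, assuming the same legacy (fixint, little-endian)
// bincode config the store uses. The helper name and values are ours and this
// is not part of the store's API.
#[allow(dead_code)]
fn footer_round_trip_sketch() -> Result<(), Error> {
    let footer = Footer {
        indexes: vec![SliceIndex {
            position_from_prev_index: 123,
        }],
        index_count: 1,
        uncompressed_data_size: 64 * 1024,
        config: Lz4Config {
            block_size: DEFAULT_BLOCK_SIZE,
        },
        version: CURRENT_STREAM_FORMAT_VERSION,
    };
    let encoded = encode_to_vec(&footer, bincode::config::legacy())
        .map_err(|e| make_err!(Code::Internal, "Failed to serialize footer : {:?}", e))?;
    let (decoded, _) = decode_from_slice::<Footer, _>(&encoded, bincode::config::legacy())
        .map_err(|e| make_err!(Code::Internal, "Failed to deserialize footer : {:?}", e))?;
    assert_eq!(footer, decoded);
    Ok(())
}
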
/// `lz4_flex::block::get_maximum_output_size()` way over-estimates, so we use the
/// one provided here: <https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61>
/// Local testing shows this gives quite an accurate worst case given random input.
const fn lz4_compress_bound(input_size: u64) -> u64 {
    input_size + (input_size / 255) + 16
}

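// Usage sketch (ours, not part of the store): both bounds for the default block
// size, side by side. The store uses `lz4_compress_bound` when estimating the
// total upload size and `get_maximum_output_size` when sizing the per-chunk
// scratch buffer.
#[allow(dead_code)]
fn compress_bounds_for_default_block() -> (u64, usize) {
    (
        lz4_compress_bound(u64::from(DEFAULT_BLOCK_SIZE)),
        get_maximum_output_size(DEFAULT_BLOCK_SIZE as usize),
    )
}
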
fn serialized_size<C>(value: &impl Serialize, config: C) -> Result<u64, Error>
where
    C: bincode::config::Config,
{
    let mut size_writer = bincode::enc::write::SizeWriter { bytes_written: 0 };
    bincode::serde::encode_into_writer(value, &mut size_writer, config).map_err(|e| {
        make_err!(
            Code::Internal,
            "Failed to calculate serialized size: {:?}",
            e
        )
    })?;
    Ok(size_writer.bytes_written as u64)
}

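// Usage sketch (helper name is ours): this mirrors how `get_part()` below sizes
// its fixed-length header read before decoding. The concrete byte count depends
// on the legacy fixint encoding of `Header`; only `serialized_size` is relied on.
#[allow(dead_code)]
fn header_size_sketch() -> Result<u64, Error> {
    let header = Header {
        version: CURRENT_STREAM_FORMAT_VERSION,
        config: Lz4Config {
            block_size: DEFAULT_BLOCK_SIZE,
        },
        upload_size: UploadSizeInfo::ExactSize(0),
    };
    serialized_size(&header, bincode::config::legacy())
}
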
struct UploadState {
    header: Header,
    footer: Footer,
    max_output_size: u64,
    input_max_size: u64,
}

impl UploadState {
    pub(crate) fn new(
        store: &CompressionStore,
        upload_size: UploadSizeInfo,
    ) -> Result<Self, Error> {
        let input_max_size = match upload_size {
            UploadSizeInfo::MaxSize(sz) | UploadSizeInfo::ExactSize(sz) => sz,
        };

        let max_index_count = (input_max_size / u64::from(store.config.block_size)) + 1;

        let header = Header {
            version: CURRENT_STREAM_FORMAT_VERSION,
            config: Lz4Config {
                block_size: store.config.block_size,
            },
            upload_size,
        };
        let footer = Footer {
            indexes: vec![
                SliceIndex::default();
                usize::try_from(max_index_count)
                    .err_tip(|| "Could not convert max_index_count to usize")?
            ],
            index_count: u32::try_from(max_index_count)
                .err_tip(|| "Could not convert max_index_count to u32")?,
            uncompressed_data_size: 0, // Updated later.
            config: header.config,
            version: CURRENT_STREAM_FORMAT_VERSION,
        };

        // This is a more accurate estimate than what get_maximum_output_size calculates.
        let max_block_size = lz4_compress_bound(u64::from(store.config.block_size)) + U32_SZ + 1;

        let max_output_size = {
            let header_size = serialized_size(&header, store.bincode_config)?;
            let max_content_size = max_block_size * max_index_count;
            let max_footer_size = U32_SZ + 1 + serialized_size(&footer, store.bincode_config)?;
            header_size + max_content_size + max_footer_size
        };

        Ok(Self {
            header,
            footer,
            max_output_size,
            input_max_size,
        })
    }
}

// TODO(jaroeichler): Use the default `standard` config.
type LegacyBincodeConfig = bincode::config::Configuration<
    bincode::config::LittleEndian,
    bincode::config::Fixint,
    bincode::config::NoLimit,
>;

/// This store will compress data before sending it on to the inner store.
/// Note: Currently, reading only part of the data via `get_part()` will still
/// read the entire contents from the inner store, but only the requested part
/// is sent to the caller.
#[derive(MetricsComponent)]
pub struct CompressionStore {
    #[metric(group = "inner_store")]
    inner_store: Store,
    config: nativelink_config::stores::Lz4Config,
    bincode_config: LegacyBincodeConfig,
}

impl core::fmt::Debug for CompressionStore {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("CompressionStore")
            .field("inner_store", &self.inner_store)
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

impl CompressionStore {
    pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> {
        let lz4_config = match spec.compression_algorithm {
            nativelink_config::stores::CompressionAlgorithm::Lz4(mut lz4_config) => {
                if lz4_config.block_size == 0 {
                    lz4_config.block_size = DEFAULT_BLOCK_SIZE;
                }
                if lz4_config.max_decode_block_size == 0 {
                    lz4_config.max_decode_block_size = lz4_config.block_size;
                }
                lz4_config
            }
        };
        Ok(Arc::new(Self {
            inner_store,
            config: lz4_config,
            bincode_config: bincode::config::legacy(),
        }))
    }
}

#[async_trait]
impl StoreDriver for CompressionStore {
    async fn has_with_results(
        self: Pin<&Self>,
        digests: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        self.inner_store.has_with_results(digests, results).await
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let mut output_state = UploadState::new(&self, upload_size)?;

        let (mut tx, rx) = make_buf_channel_pair();

        let inner_store = self.inner_store.clone();
        let key = key.into_owned();
        let update_fut = spawn!("compression_store_update_spawn", async move {
            inner_store
                .update(
                    key,
                    rx,
                    UploadSizeInfo::MaxSize(output_state.max_output_size),
                )
                .await
                .err_tip(|| "Inner store update in compression store failed")
        })
        .map(
            |result| match result.err_tip(|| "Failed to run compression update spawn") {
                Ok(inner_result) => {
                    inner_result.err_tip(|| "Compression underlying store update failed")
                }
                Err(e) => Err(e),
            },
        );

        let write_fut = async move {
            {
                // Write Header.
                let serialized_header = encode_to_vec(output_state.header, self.bincode_config)
                    .map_err(|e| {
                        make_err!(Code::Internal, "Failed to serialize header : {:?}", e)
                    })?;
                tx.send(serialized_header.into())
                    .await
                    .err_tip(|| "Failed to write compression header on upload")?;
            }

            let mut received_amt = 0;
            let mut index_count: u32 = 0;
            for index in &mut output_state.footer.indexes {
                let chunk = reader
                    .consume(Some(self.config.block_size as usize))
                    .await
                    .err_tip(|| "Failed to read take in update in compression store")?;
                if chunk.is_empty() {
                    break; // EOF.
                }

                received_amt += u64::try_from(chunk.len())
                    .err_tip(|| "Could not convert chunk.len() to u64")?;
                error_if!(
                    received_amt > output_state.input_max_size,
                    "Got more data than stated in compression store upload request"
                );

                let max_output_size = get_maximum_output_size(self.config.block_size as usize);
                let mut compressed_data_buf = BytesMut::with_capacity(max_output_size);
                compressed_data_buf.put_u8(CHUNK_FRAME_TYPE);
                compressed_data_buf.put_u32_le(0); // Filled later.

                // For efficiency reasons we do some raw slice manipulation so we can write directly
                // into our buffer instead of having to do another allocation.
                let raw_compressed_data = unsafe {
                    core::slice::from_raw_parts_mut(
                        compressed_data_buf.chunk_mut().as_mut_ptr(),
                        max_output_size,
                    )
                };

                let compressed_data_sz = compress_into(&chunk, raw_compressed_data)
                    .map_err(|e| make_err!(Code::Internal, "Compression error {:?}", e))?;
                unsafe {
                    compressed_data_buf.advance_mut(compressed_data_sz);
                }

                // Now fill the size in our slice.
                LittleEndian::write_u32(
                    &mut compressed_data_buf[1..5],
                    u32::try_from(compressed_data_sz).unwrap_or(u32::MAX),
                );

                // Now send our chunk.
                tx.send(compressed_data_buf.freeze())
                    .await
                    .err_tip(|| "Failed to write chunk to inner store in compression store")?;

                index.position_from_prev_index =
                    u32::try_from(compressed_data_sz).unwrap_or(u32::MAX);

                index_count += 1;
            }
            // Index 0 is actually a pointer to the second chunk. This is because we don't need
            // an index for the first item, since it starts at position `{header_len}`.
            // The code above causes us to create 1 more index than we actually need, so we
            // remove the last index from our vector here, because at this point we are always
            // one index too many.
            // Note: We need to be careful that if we don't have any data (zero bytes) it
            // doesn't go to -1.
            index_count = index_count.saturating_sub(1);
            output_state
                .footer
                .indexes
                .resize(index_count as usize, SliceIndex::default());
            output_state.footer.index_count =
                u32::try_from(output_state.footer.indexes.len()).unwrap_or(u32::MAX);
            output_state.footer.uncompressed_data_size = received_amt;
            {
                // Write Footer.
                let serialized_footer = encode_to_vec(output_state.footer, self.bincode_config)
                    .map_err(|e| {
                        make_err!(Code::Internal, "Failed to serialize footer : {:?}", e)
                    })?;

                let mut footer = BytesMut::with_capacity(1 + 4 + serialized_footer.len());
                footer.put_u8(FOOTER_FRAME_TYPE);
                footer.put_u32_le(u32::try_from(serialized_footer.len()).unwrap_or(u32::MAX));
                footer.extend_from_slice(&serialized_footer);

                tx.send(footer.freeze())
                    .await
                    .err_tip(|| "Failed to write footer to inner store in compression store")?;
                tx.send_eof()
                    .err_tip(|| "Failed writing EOF in compression store update")?;
            }

            Result::<(), Error>::Ok(())
        };
        let (write_result, update_result) = tokio::join!(write_fut, update_fut);
        write_result.merge(update_result)
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        if is_zero_digest(key.borrow()) {
            writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in compression store get_part")?;
            return Ok(());
        }

        let (tx, mut rx) = make_buf_channel_pair();

        let inner_store = self.inner_store.clone();
        let key = key.into_owned();
        let get_part_fut = spawn!("compression_store_get_part_spawn", async move {
            inner_store
                .get_part(key, tx, 0, None)
                .await
                .err_tip(|| "Inner store get in compression store failed")
        })
        .map(
            |result| match result.err_tip(|| "Failed to run compression get spawn") {
                Ok(inner_result) => {
                    inner_result.err_tip(|| "Compression underlying store get failed")
                }
                Err(e) => Err(e),
            },
        );
        let read_fut = async move {
            let header = {
                // Read header.
                const EMPTY_HEADER: Header = Header {
                    version: CURRENT_STREAM_FORMAT_VERSION,
                    config: Lz4Config { block_size: 0 },
                    upload_size: UploadSizeInfo::ExactSize(0),
                };
                let header_size = serialized_size(&EMPTY_HEADER, self.bincode_config)?;
                let chunk = rx
                    .consume(Some(usize::try_from(header_size).unwrap_or(usize::MAX)))
                    .await
                    .err_tip(|| "Failed to read header in get_part compression store")?;
                error_if!(
                    chunk.len() as u64 != header_size,
                    "Expected inner store to return the proper amount of data in compression store {} != {}",
                    chunk.len(),
                    header_size,
                );

                let (header, _) = decode_from_slice::<Header, _>(&chunk, self.bincode_config)
                    .map_err(|e| {
                        make_err!(Code::Internal, "Failed to deserialize header : {:?}", e)
                    })?;
                header
            };

            error_if!(
                header.version != CURRENT_STREAM_FORMAT_VERSION,
                "Expected header version to match in get compression, got {}, want {}",
                header.version,
                CURRENT_STREAM_FORMAT_VERSION
            );
            error_if!(
                header.config.block_size > self.config.max_decode_block_size,
                "Block size is too large in compression, got {} > {}",
                header.config.block_size,
                self.config.max_decode_block_size
            );

            let mut chunk = rx
                .consume(Some(1 + 4))
                .await
                .err_tip(|| "Failed to read init frame info in compression store")?;
            error_if!(
                chunk.len() < 1 + 4,
                "Received EOF too early while reading init frame info in compression store"
            );

            let mut frame_type = chunk.get_u8();
            let mut frame_sz = chunk.get_u32_le();

            let mut uncompressed_data_sz: u64 = 0;
            let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX);
            let mut chunks_count: u32 = 0;
            while frame_type != FOOTER_FRAME_TYPE {
                error_if!(
                    frame_type != CHUNK_FRAME_TYPE,
                    "Expected frame to be BODY in compression store, got {} at {}",
                    frame_type,
                    chunks_count
                );

                let chunk = rx
                    .consume(Some(frame_sz as usize))
                    .await
                    .err_tip(|| "Failed to read chunk in get_part compression store")?;
                if chunk.len() < frame_sz as usize {
                    return Err(make_err!(
                        Code::Internal,
                        "Got EOF earlier than expected. Maybe the data is not compressed or is in a different format?"
                    ));
                }
                {
                    let max_output_size =
                        get_maximum_output_size(header.config.block_size as usize);
                    let mut uncompressed_data = BytesMut::with_capacity(max_output_size);

                    // For efficiency reasons we do some raw slice manipulation so we can write directly
                    // into our buffer instead of having to do another allocation.
                    let raw_decompressed_data = unsafe {
                        core::slice::from_raw_parts_mut(
                            uncompressed_data.chunk_mut().as_mut_ptr(),
                            max_output_size,
                        )
                    };

                    let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data)
                        .map_err(|e| make_err!(Code::Internal, "Decompression error {:?}", e))?;
                    unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) };
                    let new_uncompressed_data_sz =
                        uncompressed_data_sz + uncompressed_chunk_sz as u64;
                    if new_uncompressed_data_sz >= offset && remaining_bytes_to_send > 0 {
                        let start_pos =
                            usize::try_from(offset.saturating_sub(uncompressed_data_sz))
                                .unwrap_or(usize::MAX);
                        let end_pos = cmp::min(
                            start_pos.saturating_add(
                                usize::try_from(remaining_bytes_to_send).unwrap_or(usize::MAX),
                            ),
                            uncompressed_chunk_sz,
                        );
                        if end_pos != start_pos {
                            // Make sure we don't send an EOF by accident.
                            writer
                                .send(uncompressed_data.freeze().slice(start_pos..end_pos))
                                .await
                                .err_tip(|| "Failed sending chunk in compression store")?;
                        }
                        remaining_bytes_to_send -= (end_pos - start_pos) as u64;
                    }
                    uncompressed_data_sz = new_uncompressed_data_sz;
                }
                chunks_count += 1;

                let mut chunk = rx
                    .consume(Some(1 + 4))
                    .await
                    .err_tip(|| "Failed to read frame info in compression store")?;
                error_if!(
                    chunk.len() < 1 + 4,
                    "Received EOF too early while reading frame info in compression store"
                );

                frame_type = chunk.get_u8();
                frame_sz = chunk.get_u32_le();
            }
            // `chunks_count` will always be one greater than the footer's index count
            // (unless the data is zero bytes long).
            chunks_count = chunks_count.saturating_sub(1);
            {
                // Read and validate footer.
                let chunk = rx
                    .consume(Some(frame_sz as usize))
                    .await
                    .err_tip(|| "Failed to read chunk in get_part compression store")?;
                error_if!(
                    chunk.len() < frame_sz as usize,
                    "Unexpected EOF when reading footer in compression store get_part"
                );

                let (footer, _) = decode_from_slice::<Footer, _>(&chunk, self.bincode_config)
                    .map_err(|e| {
                        make_err!(Code::Internal, "Failed to deserialize footer : {:?}", e)
                    })?;

                error_if!(
                    header.version != footer.version,
                    "Expected header and footer versions to match compression store get_part, {} != {}",
                    header.version,
                    footer.version
                );
                error_if!(
                    footer.indexes.len() != footer.index_count as usize,
                    "Expected index counts to match in compression store footer in get_part, {} != {}",
                    footer.indexes.len(),
                    footer.index_count
                );
                error_if!(
                    footer.index_count != chunks_count,
                    concat!(
                        "Expected index counts to match received chunks count ",
                        "in compression store footer in get_part, {} != {}"
                    ),
                    footer.index_count,
                    chunks_count
                );
                error_if!(
                    footer.uncompressed_data_size != uncompressed_data_sz,
                    "Expected uncompressed data sizes to match in compression store footer in get_part, {} != {}",
                    footer.uncompressed_data_size,
                    uncompressed_data_sz
                );
            }

            writer
                .send_eof()
                .err_tip(|| "Failed to send eof in compression store write")?;
            Ok(())
        };

        let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut);
        if let Err(mut e) = read_result {
            // We may need to propagate the error from reading the data through first.
            if let Err(err) = get_part_fut_result {
                e = err.merge(e);
            }
            return Err(e);
        }
        Ok(())
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }

    fn register_remove_callback(
        self: Arc<Self>,
        callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        self.inner_store.register_remove_callback(callback)
    }
}

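// Minimal sketch of the slicing math used in `get_part()` above (names are ours;
// not part of the store's API): given how many uncompressed bytes were already
// seen before this chunk, the caller's `offset`, and the bytes still owed, pick
// which byte range of the freshly decompressed chunk should be forwarded.
#[allow(dead_code)]
fn slice_chunk_for_read(
    chunk_len: usize,
    bytes_seen_before_chunk: u64,
    offset: u64,
    remaining_bytes_to_send: u64,
) -> core::ops::Range<usize> {
    let new_bytes_seen = bytes_seen_before_chunk + chunk_len as u64;
    if new_bytes_seen < offset || remaining_bytes_to_send == 0 {
        // Chunk ends before the requested offset, or nothing is left to send.
        return 0..0;
    }
    let start_pos =
        usize::try_from(offset.saturating_sub(bytes_seen_before_chunk)).unwrap_or(usize::MAX);
    let end_pos = cmp::min(
        start_pos.saturating_add(usize::try_from(remaining_bytes_to_send).unwrap_or(usize::MAX)),
        chunk_len,
    );
    start_pos..end_pos
}
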
default_health_status_indicator!(CompressionStore);