/build/source/nativelink-store/src/compression_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use async_trait::async_trait; |
20 | | use bincode::serde::{decode_from_slice, encode_to_vec}; |
21 | | use byteorder::{ByteOrder, LittleEndian}; |
22 | | use bytes::{Buf, BufMut, BytesMut}; |
23 | | use futures::future::FutureExt; |
24 | | use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; |
25 | | use nativelink_config::stores::CompressionSpec; |
26 | | use nativelink_error::{Code, Error, ResultExt, error_if, make_err}; |
27 | | use nativelink_metric::MetricsComponent; |
28 | | use nativelink_util::buf_channel::{ |
29 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
30 | | }; |
31 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
32 | | use nativelink_util::spawn; |
33 | | use nativelink_util::store_trait::{ |
34 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
35 | | }; |
36 | | use serde::{Deserialize, Serialize}; |
37 | | |
38 | | use crate::cas_utils::is_zero_digest; |
39 | | |
40 | | // If the bytestream format changes, this number should be incremented to prevent
41 | | // backwards-compatibility issues.
42 | | pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1; |
43 | | |
44 | | // Default block size that the stream will be sliced into.
45 | | pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024; |
46 | | |
47 | | const U32_SZ: u64 = size_of::<u32>() as u64;
48 | | |
49 | | // We use a custom frame format here because we wanted the ability in the future to:
50 | | // * Read a random part of the data without needing to parse entire file. |
51 | | // * Compress the data on the fly without needing to know the exact input size. |
52 | | // |
53 | | // The frame format that LZ4 uses does not contain an index of where the different |
54 | | // blocks are located. This means that if we only wanted the last byte of a file,
55 | | // we would need to walk the header of each block to find the next block's offset
56 | | // until we reached the last block, and only then decompress it.
57 | | // |
58 | | // By using this custom frame format we keep the ability to have each block reference |
59 | | // the next block (for efficiency), but also after all blocks we have a footer frame |
60 | | // which contains an index of each block in the stream. This means that we can read a |
61 | | // fixed number of bytes of the file, which always contain the size of the index, then |
62 | | // with that size we know the exact size of the footer frame, which contains the entire |
63 | | // index. Once this footer frame is loaded, we could then do some math to figure out |
64 | | // which block each byte is in and the offset of each block within the compressed file. |
65 | | // |
66 | | // The frame format is as follows: |
67 | | // |----------------------------------HEADER-----------------------------------------| |
68 | | // | version(u8) | block_size (u32) | upload_size_type (u32) | upload_size (u32) | |
69 | | // |----------------------------------BLOCK------------------------------------------| |
70 | | // | frame_type(u8) 0x00 | compressed_data_size (u32) | ...DATA... | |
71 | | // | ...DATA... | |
72 | | // | [Possibly repeat block] | |
73 | | // |----------------------------------FOOTER-----------------------------------------| |
74 | | // | frame_type(u8) 0x01 | footer_size (u32) | index_count1 (u64) | |
75 | | // | ...[pos_from_prev_index (u32) - repeat for count {index_count1}]... |
76 | | // | index_count2 (u32) | uncompressed_data_sz (u64) | block_size (u32) | |
77 | | // | version (u8) |------------------------------------------------------------------| |
78 | | // |---------------------------------------------------------------------------------| |
79 | | // |
80 | | // version - A constant number used to define what version of this format is being |
81 | | // used. Version in header and footer must match. |
82 | | // block_size - Uncompressed size of each block except the last. Every block
83 | | // decompresses to this constant size except the last block, which may
84 | | // be of variable size. Block size in header and footer must match.
85 | | // upload_size_type - Value of 0 = UploadSizeInfo::ExactSize, 1 = UploadSizeInfo::MaxSize. |
86 | | // This is for debug reasons only. |
87 | | // upload_size - The expected size of the data. WARNING: Do not rely on this being the
88 | | // uncompressed payload size. It is a debug field and a "best guess" at how
89 | | // large the data is; the actual uncompressed size is stored in the footer.
90 | | // This value is the counterpart to the `upload_size_type` field.
91 | | // frame_type - Type of each frame. 0 = BLOCK frame, 1 = FOOTER frame. The header always
92 | | // starts at the first byte of the stream, so it needs no frame-type marker.
93 | | // compressed_data_size - The compressed size of this block. The bytes after this field
94 | | // should be read in sequence to get all of the block's data.
95 | | // footer_size - Size of the footer, i.e. the number of bytes that follow this field.
96 | | // index_count1 - Number of items in the index. ({index_count1} * 4) represents the number |
97 | | // of bytes that should be read after this field in order to get all index |
98 | | // data. |
99 | | // pos_from_prev_index - Index of an individual block in the stream relative to the previous |
100 | | // block. This field may repeat {index_count} times. |
101 | | // index_count2 - Same as {index_count1}; the two should always be equal. This copy is
102 | | // useful if you want to read a random byte from the stream, have random
103 | | // access to it, and know the total stream size, because this field always
104 | | // sits at the same position relative to the last byte, so the footer can
105 | | // be located without scanning the blocks.
106 | | // uncompressed_data_sz - Size of the original uncompressed data. |
107 | | // |
108 | | // Note: All fields are little-endian.
109 | | |
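// As a concrete illustration of the footer layout (a sketch, not used by the store
// itself): the last four footer fields are fixed-width under the legacy fixint
// bincode encoding, so the length of the serialized footer can be recovered from
// the final 17 bytes of the stream alone. `tail` is assumed to hold exactly those
// bytes: index_count2 (u32), uncompressed_data_sz (u64), block_size (u32) and
// version (u8), all little-endian.
#[allow(dead_code)]
fn example_footer_payload_len(tail: &[u8; 17]) -> u64 {
    let index_count = u64::from(LittleEndian::read_u32(&tail[0..4]));
    // Vec length prefix (u64) + 4 bytes per index entry + index_count (u32)
    //   + uncompressed_data_sz (u64) + block_size (u32) + version (u8).
    8 + 4 * index_count + 4 + 8 + 4 + 1
}
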
110 | | /// Number representing a chunk. |
111 | | pub const CHUNK_FRAME_TYPE: u8 = 0; |
112 | | |
113 | | /// Number representing the footer. |
114 | | pub const FOOTER_FRAME_TYPE: u8 = 1; |
115 | | |
116 | | /// This is a partial mirror of `nativelink_config::stores::Lz4Config`. |
117 | | /// We cannot use that type directly here because adding more config fields
118 | | /// to it would change our serialized format.
119 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Copy, Clone)] |
120 | | pub struct Lz4Config { |
121 | | pub block_size: u32, |
122 | | } |
123 | | |
124 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)] |
125 | | pub struct Header { |
126 | | pub version: u8, |
127 | | pub config: Lz4Config, |
128 | | pub upload_size: UploadSizeInfo, |
129 | | } |
130 | | |
131 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Clone, Copy)] |
132 | | pub struct SliceIndex { |
133 | | pub position_from_prev_index: u32, |
134 | | } |
135 | | |
136 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default)] |
137 | | pub struct Footer { |
138 | | pub indexes: Vec<SliceIndex>, |
139 | | pub index_count: u32, |
140 | | pub uncompressed_data_size: u64, |
141 | | pub config: Lz4Config, |
142 | | pub version: u8, |
143 | | } |
144 | | |
145 | | /// `lz4_flex::block::get_maximum_output_size()` significantly overestimates, so we use
146 | | /// the bound provided here: <https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61>
147 | | /// Local testing shows this gives a fairly accurate worst case for random input.
148 | 6 | const fn lz4_compress_bound(input_size: u64) -> u64 { |
149 | 6 | input_size + (input_size / 255) + 16 |
150 | 6 | } |
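
// Worked example (illustrative, not part of the original logic): for the default
// 64 KiB block size the bound is 65_536 + 65_536 / 255 + 16 = 65_536 + 257 + 16
// = 65_809 bytes.
const _: () = assert!(lz4_compress_bound(DEFAULT_BLOCK_SIZE as u64) == 65_809);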
151 | | |
152 | 1.24k | fn serialized_size<C>(value: &impl Serialize, config: C) -> Result<u64, Error> |
153 | 1.24k | where |
154 | 1.24k | C: bincode::config::Config, |
155 | | { |
156 | 1.24k | let mut size_writer = bincode::enc::write::SizeWriter { bytes_written: 0 }; |
157 | 1.24k | bincode::serde::encode_into_writer(value, &mut size_writer, config).map_err(|e| {0 |
158 | 0 | make_err!( |
159 | 0 | Code::Internal, |
160 | | "Failed to calculate serialized size: {:?}", |
161 | | e |
162 | | ) |
163 | 0 | })?; |
164 | 1.24k | Ok(size_writer.bytes_written as u64) |
165 | 1.24k | } |
166 | | |
167 | | struct UploadState { |
168 | | header: Header, |
169 | | footer: Footer, |
170 | | max_output_size: u64, |
171 | | input_max_size: u64, |
172 | | } |
173 | | |
174 | | impl UploadState { |
175 | 6 | pub(crate) fn new( |
176 | 6 | store: &CompressionStore, |
177 | 6 | upload_size: UploadSizeInfo, |
178 | 6 | ) -> Result<Self, Error> { |
179 | 6 | let input_max_size = match upload_size { |
180 | 6 | UploadSizeInfo::MaxSize(sz) | UploadSizeInfo::ExactSize(sz) => sz,
181 | | }; |
182 | | |
183 | 6 | let max_index_count = (input_max_size / u64::from(store.config.block_size)) + 1; |
184 | | |
185 | 6 | let header = Header { |
186 | 6 | version: CURRENT_STREAM_FORMAT_VERSION, |
187 | 6 | config: Lz4Config { |
188 | 6 | block_size: store.config.block_size, |
189 | 6 | }, |
190 | 6 | upload_size, |
191 | 6 | }; |
192 | 6 | let footer = Footer { |
193 | 6 | indexes: vec![ |
194 | 6 | SliceIndex::default(); |
195 | 6 | usize::try_from(max_index_count) |
196 | 6 | .err_tip(|| "Could not convert max_index_count to usize")?0 |
197 | | ], |
198 | 6 | index_count: u32::try_from(max_index_count) |
199 | 6 | .err_tip(|| "Could not convert max_index_count to u32")?0 , |
200 | | uncompressed_data_size: 0, // Updated later. |
201 | 6 | config: header.config, |
202 | | version: CURRENT_STREAM_FORMAT_VERSION, |
203 | | }; |
204 | | |
205 | | // This is a more accurate estimate than what `get_maximum_output_size()` calculates.
206 | 6 | let max_block_size = lz4_compress_bound(u64::from(store.config.block_size)) + U32_SZ + 1; |
207 | | |
208 | 6 | let max_output_size = { |
209 | 6 | let header_size = serialized_size(&header, store.bincode_config)?0 ; |
210 | 6 | let max_content_size = max_block_size * max_index_count; |
211 | 6 | let max_footer_size = U32_SZ + 1 + serialized_size(&footer, store.bincode_config)?0 ; |
212 | 6 | header_size + max_content_size + max_footer_size |
213 | | }; |
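// Worked example (illustrative): with the default 64 KiB block size, a 1 MiB
// `MaxSize` upload gives `max_index_count = 16 + 1 = 17`, so the bound handed to
// the inner store covers 17 frames of at most `lz4_compress_bound(65_536) + 5`
// bytes each, plus the serialized header and a footer carrying 17 index entries.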
214 | | |
215 | 6 | Ok(Self { |
216 | 6 | header, |
217 | 6 | footer, |
218 | 6 | max_output_size, |
219 | 6 | input_max_size, |
220 | 6 | }) |
221 | 6 | } |
222 | | } |
223 | | |
224 | | // TODO(jaroeichler): Use the default `standard` config. |
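// `bincode::config::legacy()` (assigned in `CompressionStore::new` below) produces
// exactly this configuration: little-endian byte order, fixed-width integers, and
// no size limit, which keeps the serialized header/footer layout described above
// stable.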
225 | | type LegacyBincodeConfig = bincode::config::Configuration< |
226 | | bincode::config::LittleEndian, |
227 | | bincode::config::Fixint, |
228 | | bincode::config::NoLimit, |
229 | | >; |
230 | | |
231 | | /// This store will compress data before sending it on to the inner store. |
232 | | /// Note: Currently, using `get_part()` to read only part of the data results in
233 | | /// the entire contents being read from the inner store; only the requested
234 | | /// range is forwarded to the writer.
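/// For example, `get_part(key, writer, 10, Some(10))` still streams and decompresses
/// the blob from the beginning, but forwards only those ten bytes to the writer.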
235 | | #[derive(MetricsComponent)] |
236 | | pub struct CompressionStore { |
237 | | #[metric(group = "inner_store")] |
238 | | inner_store: Store, |
239 | | config: nativelink_config::stores::Lz4Config, |
240 | | bincode_config: LegacyBincodeConfig, |
241 | | } |
242 | | |
243 | | impl core::fmt::Debug for CompressionStore { |
244 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
245 | 0 | f.debug_struct("CompressionStore") |
246 | 0 | .field("inner_store", &self.inner_store) |
247 | 0 | .field("config", &self.config) |
248 | 0 | .finish_non_exhaustive() |
249 | 0 | } |
250 | | } |
251 | | |
252 | | impl CompressionStore { |
253 | 7 | pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> { |
254 | 7 | let lz4_config = match spec.compression_algorithm { |
255 | 7 | nativelink_config::stores::CompressionAlgorithm::Lz4(mut lz4_config) => { |
256 | 7 | if lz4_config.block_size == 0 { Branch (256:20): [True: 3, False: 4]
Branch (256:20): [Folded - Ignored]
|
257 | 3 | lz4_config.block_size = DEFAULT_BLOCK_SIZE; |
258 | 4 | } |
259 | 7 | if lz4_config.max_decode_block_size == 0 { Branch (259:20): [True: 7, False: 0]
Branch (259:20): [Folded - Ignored]
|
260 | 7 | lz4_config.max_decode_block_size = lz4_config.block_size; |
261 | 7 | }0 |
262 | 7 | lz4_config |
263 | | } |
264 | | }; |
265 | 7 | Ok(Arc::new(Self { |
266 | 7 | inner_store, |
267 | 7 | config: lz4_config, |
268 | 7 | bincode_config: bincode::config::legacy(), |
269 | 7 | })) |
270 | 7 | } |
271 | | } |
272 | | |
273 | | #[async_trait] |
274 | | impl StoreDriver for CompressionStore { |
275 | | async fn has_with_results( |
276 | | self: Pin<&Self>, |
277 | | digests: &[StoreKey<'_>], |
278 | | results: &mut [Option<u64>], |
279 | 0 | ) -> Result<(), Error> { |
280 | | self.inner_store.has_with_results(digests, results).await |
281 | 0 | } |
282 | | |
283 | | async fn update( |
284 | | self: Pin<&Self>, |
285 | | key: StoreKey<'_>, |
286 | | mut reader: DropCloserReadHalf, |
287 | | upload_size: UploadSizeInfo, |
288 | 6 | ) -> Result<(), Error> { |
289 | | let mut output_state = UploadState::new(&self, upload_size)?; |
290 | | |
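// Pipeline sketch: compressed frames are written into `tx` below while a spawned
// task forwards `rx` into the inner store, so compression and the inner upload run
// concurrently and their results are joined at the end.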
291 | | let (mut tx, rx) = make_buf_channel_pair(); |
292 | | |
293 | | let inner_store = self.inner_store.clone(); |
294 | | let key = key.into_owned(); |
295 | 6 | let update_fut = spawn!("compression_store_update_spawn", async move { |
296 | 6 | inner_store |
297 | 6 | .update( |
298 | 6 | key, |
299 | 6 | rx, |
300 | 6 | UploadSizeInfo::MaxSize(output_state.max_output_size), |
301 | 6 | ) |
302 | 6 | .await |
303 | 6 | .err_tip(|| "Inner store update in compression store failed") |
304 | 6 | }) |
305 | | .map( |
306 | 6 | |result| match result.err_tip(|| "Failed to run compression update spawn") { |
307 | 6 | Ok(inner_result) => { |
308 | 6 | inner_result.err_tip(|| "Compression underlying store update failed") |
309 | | } |
310 | 0 | Err(e) => Err(e), |
311 | 6 | }, |
312 | | ); |
313 | | |
314 | 6 | let write_fut = async move { |
315 | | { |
316 | | // Write Header. |
317 | 6 | let serialized_header = encode_to_vec(output_state.header, self.bincode_config) |
318 | 6 | .map_err(|e| {0 |
319 | 0 | make_err!(Code::Internal, "Failed to serialize header : {:?}", e) |
320 | 0 | })?; |
321 | 6 | tx.send(serialized_header.into()) |
322 | 6 | .await |
323 | 6 | .err_tip(|| "Failed to write compression header on upload")?0 ; |
324 | | } |
325 | | |
326 | 6 | let mut received_amt = 0; |
327 | 6 | let mut index_count: u32 = 0; |
328 | 99 | for index98 in &mut output_state.footer.indexes { |
329 | 98 | let chunk = reader |
330 | 98 | .consume(Some(self.config.block_size as usize)) |
331 | 98 | .await |
332 | 98 | .err_tip(|| "Failed to read take in update in compression store")?0 ; |
333 | 98 | if chunk.is_empty() { Branch (333:20): [True: 5, False: 93]
Branch (333:20): [Folded - Ignored]
|
334 | 5 | break; // EOF. |
335 | 93 | } |
336 | | |
337 | 93 | received_amt += u64::try_from(chunk.len()) |
338 | 93 | .err_tip(|| "Could not convert chunk.len() to u64")?0 ; |
339 | 0 | error_if!( |
340 | 93 | received_amt > output_state.input_max_size, Branch (340:21): [True: 0, False: 93]
Branch (340:21): [Folded - Ignored]
|
341 | | "Got more data than stated in compression store upload request" |
342 | | ); |
343 | | |
344 | 93 | let max_output_size = get_maximum_output_size(self.config.block_size as usize); |
345 | 93 | let mut compressed_data_buf = BytesMut::with_capacity(max_output_size); |
346 | 93 | compressed_data_buf.put_u8(CHUNK_FRAME_TYPE); |
347 | 93 | compressed_data_buf.put_u32_le(0); // Filled later. |
348 | | |
349 | | // For efficiency reasons we do some raw slice manipulation so we can write directly |
350 | | // into our buffer instead of having to do another allocation. |
351 | 93 | let raw_compressed_data = unsafe { |
352 | 93 | core::slice::from_raw_parts_mut( |
353 | 93 | compressed_data_buf.chunk_mut().as_mut_ptr(), |
354 | 93 | max_output_size, |
355 | 93 | ) |
356 | | }; |
357 | | |
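// `compress_into` reports how many bytes it actually wrote, and only that many
// bytes are marked as initialized via `advance_mut` below.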
358 | 93 | let compressed_data_sz = compress_into(&chunk, raw_compressed_data) |
359 | 93 | .map_err(|e| make_err!(Code::Internal0 , "Compression error {:?}", e))?0 ; |
360 | 93 | unsafe { |
361 | 93 | compressed_data_buf.advance_mut(compressed_data_sz); |
362 | 93 | } |
363 | | |
364 | | // Now fill the size in our slice. |
365 | 93 | LittleEndian::write_u32( |
366 | 93 | &mut compressed_data_buf[1..5], |
367 | 93 | u32::try_from(compressed_data_sz).unwrap_or(u32::MAX), |
368 | | ); |
369 | | |
370 | | // Now send our chunk. |
371 | 93 | tx.send(compressed_data_buf.freeze()) |
372 | 93 | .await |
373 | 93 | .err_tip(|| "Failed to write chunk to inner store in compression store")?0 ; |
374 | | |
375 | 93 | index.position_from_prev_index = |
376 | 93 | u32::try_from(compressed_data_sz).unwrap_or(u32::MAX); |
377 | | |
378 | 93 | index_count += 1; |
379 | | } |
380 | | // Index 0 is actually a pointer to the second chunk. This is because we don't need |
381 | | // an index for the first item, since it starts at position `{header_len}`. |
382 | | // The code above causes us to create 1 more index than we actually need, so we |
383 | | // remove the last index from our vector here, because at this point we are always |
384 | | // one index too many. |
385 | | // Note: We need to be careful that if we don't have any data (zero bytes) it |
386 | | // doesn't go to -1. |
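// For example, three data blocks leave two index entries after trimming: the
// compressed sizes of blocks 0 and 1.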
387 | 6 | index_count = index_count.saturating_sub(1); |
388 | 6 | output_state |
389 | 6 | .footer |
390 | 6 | .indexes |
391 | 6 | .resize(index_count as usize, SliceIndex::default()); |
392 | 6 | output_state.footer.index_count = |
393 | 6 | u32::try_from(output_state.footer.indexes.len()).unwrap_or(u32::MAX); |
394 | 6 | output_state.footer.uncompressed_data_size = received_amt; |
395 | | { |
396 | | // Write Footer. |
397 | 6 | let serialized_footer = encode_to_vec(output_state.footer, self.bincode_config) |
398 | 6 | .map_err(|e| {0 |
399 | 0 | make_err!(Code::Internal, "Failed to serialize footer : {:?}", e)
400 | 0 | })?; |
401 | | |
402 | 6 | let mut footer = BytesMut::with_capacity(1 + 4 + serialized_footer.len()); |
403 | 6 | footer.put_u8(FOOTER_FRAME_TYPE); |
404 | 6 | footer.put_u32_le(u32::try_from(serialized_footer.len()).unwrap_or(u32::MAX)); |
405 | 6 | footer.extend_from_slice(&serialized_footer); |
406 | | |
407 | 6 | tx.send(footer.freeze()) |
408 | 6 | .await |
409 | 6 | .err_tip(|| "Failed to write footer to inner store in compression store")?0 ; |
410 | 6 | tx.send_eof() |
411 | 6 | .err_tip(|| "Failed writing EOF in compression store update")?0 ; |
412 | | } |
413 | | |
414 | 6 | Result::<(), Error>::Ok(()) |
415 | 6 | }; |
416 | | let (write_result, update_result) = tokio::join!(write_fut, update_fut); |
417 | | write_result.merge(update_result) |
418 | 6 | } |
419 | | |
420 | | async fn get_part( |
421 | | self: Pin<&Self>, |
422 | | key: StoreKey<'_>, |
423 | | writer: &mut DropCloserWriteHalf, |
424 | | offset: u64, |
425 | | length: Option<u64>, |
426 | 1.22k | ) -> Result<(), Error> { |
427 | | if is_zero_digest(key.borrow()) { |
428 | | writer |
429 | | .send_eof() |
430 | | .err_tip(|| "Failed to send zero EOF in compression store get_part")?;
431 | | return Ok(()); |
432 | | } |
433 | | |
434 | | let (tx, mut rx) = make_buf_channel_pair(); |
435 | | |
436 | | let inner_store = self.inner_store.clone(); |
437 | | let key = key.into_owned(); |
438 | 1.22k | let get_part_fut = spawn!("compression_store_get_part_spawn", async move { |
439 | 1.22k | inner_store |
440 | 1.22k | .get_part(key, tx, 0, None) |
441 | 1.22k | .await |
442 | 1.22k | .err_tip(|| "Inner store get in compression store failed") |
443 | 1.22k | }) |
444 | | .map( |
445 | 1.22k | |result| match result.err_tip(|| "Failed to run compression get spawn") { |
446 | 1.22k | Ok(inner_result) => { |
447 | 1.22k | inner_result.err_tip(|| "Compression underlying store get failed") |
448 | | } |
449 | 0 | Err(e) => Err(e), |
450 | 1.22k | }, |
451 | | ); |
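// Pipeline sketch: the spawned task above streams the raw compressed bytes from the
// inner store into `rx`; `read_fut` below parses the header, decompresses each block,
// and forwards only the requested `offset..offset + length` window to `writer`.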
452 | 1.22k | let read_fut = async move { |
453 | 1.22k | let header = { |
454 | | // Read header. |
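// With the legacy fixint bincode config every `Header` serializes to the same
// number of bytes, so measuring this empty header tells us exactly how many bytes
// to consume from the stream.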
455 | | const EMPTY_HEADER: Header = Header { |
456 | | version: CURRENT_STREAM_FORMAT_VERSION, |
457 | | config: Lz4Config { block_size: 0 }, |
458 | | upload_size: UploadSizeInfo::ExactSize(0), |
459 | | }; |
460 | 1.22k | let header_size = serialized_size(&EMPTY_HEADER, self.bincode_config)?0 ; |
461 | 1.22k | let chunk = rx |
462 | 1.22k | .consume(Some(usize::try_from(header_size).unwrap_or(usize::MAX))) |
463 | 1.22k | .await |
464 | 1.22k | .err_tip(|| "Failed to read header in get_part compression store")?0 ; |
465 | 0 | error_if!( |
466 | 1.22k | chunk.len() as u64 != header_size, Branch (466:21): [True: 0, False: 1.22k]
Branch (466:21): [Folded - Ignored]
|
467 | | "Expected inner store to return the proper amount of data in compression store {} != {}", |
468 | 0 | chunk.len(), |
469 | | header_size, |
470 | | ); |
471 | | |
472 | 1.22k | let (header, _) = decode_from_slice::<Header, _>(&chunk, self.bincode_config) |
473 | 1.22k | .map_err(|e| {0 |
474 | 0 | make_err!(Code::Internal, "Failed to deserialize header : {:?}", e) |
475 | 0 | })?; |
476 | 1.22k | header |
477 | | }; |
478 | | |
479 | 0 | error_if!( |
480 | 1.22k | header.version != CURRENT_STREAM_FORMAT_VERSION, Branch (480:17): [True: 0, False: 1.22k]
Branch (480:17): [Folded - Ignored]
|
481 | | "Expected header version to match in get compression, got {}, want {}", |
482 | | header.version, |
483 | | CURRENT_STREAM_FORMAT_VERSION |
484 | | ); |
485 | 0 | error_if!( |
486 | 1.22k | header.config.block_size > self.config.max_decode_block_size, Branch (486:17): [True: 0, False: 1.22k]
Branch (486:17): [Folded - Ignored]
|
487 | | "Block size is too large in compression, got {} > {}", |
488 | | header.config.block_size, |
489 | 0 | self.config.max_decode_block_size |
490 | | ); |
491 | | |
492 | 1.22k | let mut chunk = rx |
493 | 1.22k | .consume(Some(1 + 4)) |
494 | 1.22k | .await |
495 | 1.22k | .err_tip(|| "Failed to read init frame info in compression store")?0 ; |
496 | 0 | error_if!( |
497 | 1.22k | chunk.len() < 1 + 4, Branch (497:17): [True: 0, False: 1.22k]
Branch (497:17): [Folded - Ignored]
|
498 | | "Received EOF too early while reading init frame info in compression store" |
499 | | ); |
500 | | |
501 | 1.22k | let mut frame_type = chunk.get_u8(); |
502 | 1.22k | let mut frame_sz = chunk.get_u32_le(); |
503 | | |
504 | 1.22k | let mut uncompressed_data_sz: u64 = 0; |
505 | 1.22k | let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX); |
506 | 1.22k | let mut chunks_count: u32 = 0; |
507 | 4.98k | while frame_type != FOOTER_FRAME_TYPE { Branch (507:19): [True: 3.75k, False: 1.22k]
Branch (507:19): [Folded - Ignored]
|
508 | 0 | error_if!( |
509 | 3.75k | frame_type != CHUNK_FRAME_TYPE, Branch (509:21): [True: 0, False: 3.75k]
Branch (509:21): [Folded - Ignored]
|
510 | | "Expected frame to be BODY in compression store, got {} at {}", |
511 | | frame_type, |
512 | | chunks_count |
513 | | ); |
514 | | |
515 | 3.75k | let chunk = rx |
516 | 3.75k | .consume(Some(frame_sz as usize)) |
517 | 3.75k | .await |
518 | 3.75k | .err_tip(|| "Failed to read chunk in get_part compression store")?0 ; |
519 | 3.75k | if chunk.len() < frame_sz as usize { Branch (519:20): [True: 0, False: 3.75k]
Branch (519:20): [Folded - Ignored]
|
520 | 0 | return Err(make_err!( |
521 | 0 | Code::Internal, |
522 | 0 | "Got EOF earlier than expected. Maybe the data is not compressed or different format?" |
523 | 0 | )); |
524 | 3.75k | } |
525 | | { |
526 | 3.75k | let max_output_size = |
527 | 3.75k | get_maximum_output_size(header.config.block_size as usize); |
528 | 3.75k | let mut uncompressed_data = BytesMut::with_capacity(max_output_size); |
529 | | |
530 | | // For efficiency reasons we do some raw slice manipulation so we can write directly |
531 | | // into our buffer instead of having to do another allocation. |
532 | 3.75k | let raw_decompressed_data = unsafe { |
533 | 3.75k | core::slice::from_raw_parts_mut( |
534 | 3.75k | uncompressed_data.chunk_mut().as_mut_ptr(), |
535 | 3.75k | max_output_size, |
536 | 3.75k | ) |
537 | | }; |
538 | | |
539 | 3.75k | let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data) |
540 | 3.75k | .map_err(|e| make_err!(Code::Internal0 , "Decompression error {:?}", e))?0 ; |
541 | 3.75k | unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) }; |
542 | 3.75k | let new_uncompressed_data_sz = |
543 | 3.75k | uncompressed_data_sz + uncompressed_chunk_sz as u64; |
544 | 3.75k | if new_uncompressed_data_sz >= offset && remaining_bytes_to_send > 0 { Branch (544:24): [True: 2.28k, False: 1.47k]
Branch (544:62): [True: 1.95k, False: 328]
Branch (544:24): [Folded - Ignored]
Branch (544:62): [Folded - Ignored]
|
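// `start_pos` is where the requested range begins within this decompressed chunk
// (zero once `offset` has already been passed); `end_pos` is additionally capped by
// the bytes still owed to the caller and by the chunk's decompressed size.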
545 | 1.95k | let start_pos = |
546 | 1.95k | usize::try_from(offset.saturating_sub(uncompressed_data_sz)) |
547 | 1.95k | .unwrap_or(usize::MAX); |
548 | 1.95k | let end_pos = cmp::min( |
549 | 1.95k | start_pos.saturating_add( |
550 | 1.95k | usize::try_from(remaining_bytes_to_send).unwrap_or(usize::MAX), |
551 | | ), |
552 | 1.95k | uncompressed_chunk_sz, |
553 | | ); |
554 | 1.95k | if end_pos != start_pos { Branch (554:28): [True: 1.85k, False: 102]
Branch (554:28): [Folded - Ignored]
|
555 | | // Make sure we don't send an EOF by accident. |
556 | 1.85k | writer |
557 | 1.85k | .send(uncompressed_data.freeze().slice(start_pos..end_pos)) |
558 | 1.85k | .await |
559 | 1.85k | .err_tip(|| "Failed sending chunk in compression store")?0 ; |
560 | 102 | } |
561 | 1.95k | remaining_bytes_to_send -= (end_pos - start_pos) as u64; |
562 | 1.79k | } |
563 | 3.75k | uncompressed_data_sz = new_uncompressed_data_sz; |
564 | | } |
565 | 3.75k | chunks_count += 1; |
566 | | |
567 | 3.75k | let mut chunk = rx |
568 | 3.75k | .consume(Some(1 + 4)) |
569 | 3.75k | .await |
570 | 3.75k | .err_tip(|| "Failed to read frame info in compression store")?0 ; |
571 | 0 | error_if!( |
572 | 3.75k | chunk.len() < 1 + 4, Branch (572:21): [True: 0, False: 3.75k]
Branch (572:21): [Folded - Ignored]
|
573 | | "Received EOF too early while reading frame info in compression store" |
574 | | ); |
575 | | |
576 | 3.75k | frame_type = chunk.get_u8(); |
577 | 3.75k | frame_sz = chunk.get_u32_le(); |
578 | | } |
579 | | // `chunks_count` is always one more than the footer's index count (unless the stream is zero bytes long).
580 | 1.22k | chunks_count = chunks_count.saturating_sub(1); |
581 | | { |
582 | | // Read and validate footer. |
583 | 1.22k | let chunk = rx |
584 | 1.22k | .consume(Some(frame_sz as usize)) |
585 | 1.22k | .await |
586 | 1.22k | .err_tip(|| "Failed to read chunk in get_part compression store")?0 ; |
587 | 0 | error_if!( |
588 | 1.22k | chunk.len() < frame_sz as usize, Branch (588:21): [True: 0, False: 1.22k]
Branch (588:21): [Folded - Ignored]
|
589 | | "Unexpected EOF when reading footer in compression store get_part" |
590 | | ); |
591 | | |
592 | 1.22k | let (footer, _) = decode_from_slice::<Footer, _>(&chunk, self.bincode_config) |
593 | 1.22k | .map_err(|e| {0 |
594 | 0 | make_err!(Code::Internal, "Failed to deserialize footer : {:?}", e) |
595 | 0 | })?; |
596 | | |
597 | 0 | error_if!( |
598 | 1.22k | header.version != footer.version, Branch (598:21): [True: 0, False: 1.22k]
Branch (598:21): [Folded - Ignored]
|
599 | | "Expected header and footer versions to match compression store get_part, {} != {}", |
600 | | header.version, |
601 | | footer.version |
602 | | ); |
603 | 0 | error_if!( |
604 | 1.22k | footer.indexes.len() != footer.index_count as usize, Branch (604:21): [True: 0, False: 1.22k]
Branch (604:21): [Folded - Ignored]
|
605 | | "Expected index counts to match in compression store footer in get_part, {} != {}", |
606 | 0 | footer.indexes.len(), |
607 | | footer.index_count |
608 | | ); |
609 | 0 | error_if!( |
610 | 1.22k | footer.index_count != chunks_count, Branch (610:21): [True: 0, False: 1.22k]
Branch (610:21): [Folded - Ignored]
|
611 | | concat!( |
612 | | "Expected index counts to match received chunks count ", |
613 | | "in compression store footer in get_part, {} != {}" |
614 | | ), |
615 | | footer.index_count, |
616 | | chunks_count |
617 | | ); |
618 | 0 | error_if!( |
619 | 1.22k | footer.uncompressed_data_size != uncompressed_data_sz, Branch (619:21): [True: 0, False: 1.22k]
Branch (619:21): [Folded - Ignored]
|
620 | | "Expected uncompressed data sizes to match in compression store footer in get_part, {} != {}", |
621 | | footer.uncompressed_data_size, |
622 | | uncompressed_data_sz |
623 | | ); |
624 | | } |
625 | | |
626 | 1.22k | writer |
627 | 1.22k | .send_eof() |
628 | 1.22k | .err_tip(|| "Failed to send eof in compression store write")?0 ; |
629 | 1.22k | Ok(()) |
630 | 1.22k | }; |
631 | | |
632 | | let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut); |
633 | | if let Err(mut e) = read_result { |
634 | | // We may need to propagate the error from reading the data through first. |
635 | | if let Err(err) = get_part_fut_result { |
636 | | e = err.merge(e); |
637 | | } |
638 | | return Err(e); |
639 | | } |
640 | | Ok(()) |
641 | 1.22k | } |
642 | | |
643 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
644 | 0 | self |
645 | 0 | } |
646 | | |
647 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { |
648 | 0 | self |
649 | 0 | } |
650 | | |
651 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
652 | 0 | self |
653 | 0 | } |
654 | | |
655 | 0 | fn register_remove_callback( |
656 | 0 | self: Arc<Self>, |
657 | 0 | callback: Arc<dyn RemoveItemCallback>, |
658 | 0 | ) -> Result<(), Error> { |
659 | 0 | self.inner_store.register_remove_callback(callback) |
660 | 0 | } |
661 | | } |
662 | | |
663 | | default_health_status_indicator!(CompressionStore); |