ruzstd/streaming_decoder.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
use core::borrow::BorrowMut;
use crate::frame_decoder::{BlockDecodingStrategy, FrameDecoder, FrameDecoderError};
use crate::io::{Error, ErrorKind, Read};
/// Reader-style decoder for a single Zstandard frame.
///
/// The type implements `io::Read`, so the decompressed content can be pulled out with
/// `io::Read::read_to_end` / `io::Read::read_exact`, or the decoder can be handed to
/// any other library / module that consumes a reader.
///
/// When finer control over the decompression process is required, use the lower level
/// [FrameDecoder] directly. In that case the caller is responsible for calling
/// [FrameDecoder::decode_blocks] repeatedly until the whole frame has been decoded.
///
/// ## Caveat
/// [StreamingDecoder] expects the underlying stream to contain exactly one frame,
/// although the specification allows a single archive to hold several frames.
///
/// To decode every frame of a finite stream, the calling code has to create a new
/// decoder instance per frame and handle
/// [crate::frame::ReadFrameHeaderError::SkipFrame]
/// errors by skipping forward by the reported `length` in bytes,
/// see <https://github.com/KillingSpark/zstd-rs/issues/57>
///
/// ```no_run
/// // `read_to_end` is not implemented by the no_std implementation.
/// #[cfg(feature = "std")]
/// {
/// use std::fs::File;
/// use std::io::Read;
/// use ruzstd::{StreamingDecoder};
///
/// // Read a Zstandard archive from the filesystem then decompress it into a vec.
/// let mut f: File = todo!("Read a .zstd archive from somewhere");
/// let mut decoder = StreamingDecoder::new(f).unwrap();
/// let mut result = Vec::new();
/// Read::read_to_end(&mut decoder, &mut result).unwrap();
/// }
/// ```
pub struct StreamingDecoder<READ, DEC>
where
    READ: Read,
    DEC: BorrowMut<FrameDecoder>,
{
    /// The frame decoder (owned, or anything that mutably borrows as a
    /// [FrameDecoder]) that performs the actual decompression.
    pub decoder: DEC,
    /// Reader that supplies the compressed input.
    source: READ,
}
impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
    /// Builds a streaming decoder around a caller-supplied frame decoder.
    ///
    /// `decoder` can be any value that mutably borrows as a [FrameDecoder].
    /// It is initialised from `source` via [FrameDecoder::init] before the
    /// two are bundled together.
    ///
    /// # Errors
    /// Propagates the [FrameDecoderError] returned by [FrameDecoder::init].
    pub fn new_with_decoder(
        mut source: READ,
        mut decoder: DEC,
    ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
        // Initialise through an explicit, short-lived borrow so `decoder`
        // can be moved into the new instance afterwards.
        let frame_decoder: &mut FrameDecoder = decoder.borrow_mut();
        frame_decoder.init(&mut source)?;

        Ok(Self { decoder, source })
    }
}
impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
    /// Builds a streaming decoder that owns a freshly created [FrameDecoder].
    ///
    /// The new frame decoder is initialised from `source` via
    /// [FrameDecoder::init] before the instance is returned.
    ///
    /// # Errors
    /// Propagates the [FrameDecoderError] returned by [FrameDecoder::init].
    pub fn new(
        mut source: READ,
    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
        let mut frame_decoder = FrameDecoder::new();
        frame_decoder.init(&mut source)?;

        Ok(Self {
            decoder: frame_decoder,
            source,
        })
    }
}
impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
    /// Returns a shared reference to the wrapped reader.
    pub fn get_ref(&self) -> &READ {
        let Self { source, .. } = self;
        source
    }

    /// Returns a mutable reference to the wrapped reader.
    ///
    /// Reading from the underlying reader directly is inadvisable.
    pub fn get_mut(&mut self) -> &mut READ {
        let Self { source, .. } = self;
        source
    }

    /// Consumes the decoder and hands back the wrapped reader.
    pub fn into_inner(self) -> READ
    where
        READ: Sized,
    {
        let Self { source, .. } = self;
        source
    }

    /// Consumes the decoder and hands back both the wrapped reader and the
    /// [FrameDecoder], in that order.
    pub fn into_parts(self) -> (READ, DEC)
    where
        READ: Sized,
    {
        let Self { decoder, source } = self;
        (source, decoder)
    }

    /// Consumes the decoder and hands back the inner [FrameDecoder].
    pub fn into_frame_decoder(self) -> DEC {
        let Self { decoder, .. } = self;
        decoder
    }
}
impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let decoder = self.decoder.borrow_mut();
if decoder.is_finished() && decoder.can_collect() == 0 {
//No more bytes can ever be decoded
return Ok(0);
}
// need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
// The first few calls can result in just filling the decode buffer but these bytes can not be collected.
// So we need to call this until we can actually collect enough bytes
// TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
while decoder.can_collect() < buf.len() && !decoder.is_finished() {
//More bytes can be decoded
let additional_bytes_needed = buf.len() - decoder.can_collect();
match decoder.decode_blocks(
&mut self.source,
BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
) {
Ok(_) => { /*Nothing to do*/ }
Err(e) => {
let err;
#[cfg(feature = "std")]
{
err = Error::new(ErrorKind::Other, e);
}
#[cfg(not(feature = "std"))]
{
err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
}
return Err(err);
}
}
}
decoder.read(buf)
}
}