1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use ::{Error, Reader, Result, Writer};

use std::io::{Read, Write};
use std::mem::{size_of, transmute};

const FILE_HEADER: &'static [u8] = b"DFSN";

/// is a reader that reads from a file.
/// This file needs to be generated by a corresponding writer.
/// Currently it does not support a growing file, e.g. files cannot be modified during read.
#[derive(Debug)]
pub struct FileReader<T>
    where T: Read
{
    file: T,
}

impl<T> FileReader<T> where T: Read {
    /// returns a new instance of `FileReader`.
    /// It returns error if the file is found corrupted.
    pub fn new(mut file: T) -> Result<FileReader<T>> {
        let mut header = vec![0u8; FILE_HEADER.len()];
        let read_length = try!(file.read(&mut header));
        if read_length == FILE_HEADER.len() && &*header.into_boxed_slice() == FILE_HEADER {
            return Ok(FileReader { file: file });
        } else {
            return Err(Error::CorruptSegmentHeader);
        }
    }
}

impl<T> Reader for FileReader<T> where T: Read {
    fn read(&mut self) -> Result<Option<Vec<u8>>> {
        // Rust currently does not support constexpr.
        let mut header = [0u8; 4];
        let header_read_length = try!(self.file.read(&mut header));
        if header_read_length == size_of::<i32>() {
            let header_ptr: *const i32 = unsafe { transmute(&header[0]) };
            let body_length_number = unsafe { ::std::ptr::read::<i32>(header_ptr) };
            let body_length = body_length_number as usize;
            let mut remaining_length = body_length;
            let mut full_buffer = Vec::with_capacity(body_length);
            while remaining_length > 0 {
                let mut buffer: Vec<u8> = Vec::with_capacity(remaining_length);
                unsafe {
                    buffer.set_len(remaining_length);
                }
                let read_length = try!(self.file.read(&mut buffer));
                if read_length == body_length {
                    // Optimize for getting data with only one read.
                    return Ok(Some(buffer));
                } else if read_length == 0 {
                    return Err(Error::InsufficientLength(remaining_length));
                } else {
                    remaining_length -= read_length;
                }
                buffer.truncate(read_length);
                full_buffer.extend(buffer);
            }
            return Ok(Some(full_buffer));
        } else if header_read_length == 0 {
            return Ok(None);
        } else {
            return Err(Error::CorruptMsgHeader);
        }
    }
}

impl<T> Iterator for FileReader<T> where T: Read {
    type Item = Result<Vec<u8>>;
    fn next(&mut self) -> Option<Result<Vec<u8>>> {
        match self.read() {
            Ok(Some(data)) => Some(Ok(data)),
            Ok(None) => None,
            Err(error) => Some(Err(error)),
        }
    }
}

/// is a writer for file.
/// It can only start to write a new file but not append to an existing file.
#[derive(Debug)]
pub struct FileWriter<T>
    where T: Write
{
    file: T,
}

impl<T> FileWriter<T> where T: Write {
    /// returns a new file writer instance.
    /// It returns error if there is IO error during the process.
    pub fn new(mut file: T) -> Result<FileWriter<T>> {
        if try!(file.write(FILE_HEADER)) == FILE_HEADER.len() {
            Ok(FileWriter { file: file })
        } else {
            Err(Error::CorruptSegmentHeader)
        }
    }
}

impl<T> Writer for FileWriter<T> where T: Write {
    fn write(&mut self, buf: &[u8]) -> Result<()> {
        let value = buf.len() as i32;
        let header_ptr: *const u8 = unsafe { transmute(&value) };
        let header_length = size_of::<i32>();
        let slice = unsafe { ::std::slice::from_raw_parts(header_ptr, header_length) };
        // TODO: Check insufficient write. Or even need to add new error types. Restore upon
        // failuer? And add tests.
        try!(self.file.write(slice));
        try!(self.file.write(buf));
        Ok(())
    }
}