~chiefnoah/bufring-rs

71fae827c8eff7d4951359a2c52de6fa972692e2 — Noah Pederson 1 year, 5 months ago 4b7dd3b master
Improvements, but it's broken!
3 files changed, 249 insertions(+), 43 deletions(-)

M src/.lib.rs.rustfmt
A src/errors.rs
M src/lib.rs
M src/.lib.rs.rustfmt => src/.lib.rs.rustfmt +48 -8
@@ 1,21 1,23 @@
use std::collections::VecDeque;
use std::default::Default;
use std::hash::BuildHasherDefault;
use std::hash::Hasher;
use std::rc::Rc;
use zerocopy::{FromBytes, AsBytes};
use crc::{Crc, CRC_32_CKSUM};
use twox_hash::XxHash64;
use zerocopy::{AsBytes, FromBytes};

const CHECKSUM: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
type ChecksumHasher = BuildHasherDefault<XxHash64>;

// 4MB
const DEFAULT_PAGE_SIZE: usize = 4 * 1024 * 1024;

#[repr(packed)]
#[repr(C)]
struct RingBufferConfig {
    /// The number of pages in this buffer
    page_count: u16,
    /// The size in bytes of each page
    page_size: u32,
    /// The number of records stored in each page
    /// The maximum number of records stored in each page
    records_per_page: u32,
    /// A special flag indicating how many pages are currently in the buffer.
    /// This value must be 0 through 2.


@@ 23,11 25,16 @@ struct RingBufferConfig {
    /// A 1 indicates there are *exactly one* pages serialized to the buffer
    /// A 2 indicates there are *at least two* pages serialized to the buffer
    /// A 3 indicates this buffer has been filled at least once and looped back around
    /// 4 and up are reserved for later use
    page_flag: u8,
    /// The index of the head record.
    /// If this is <0, the head was *not* saved properly and we must assume every record in the
    /// buffer has not been drained. If it is 0 or greater, it is the index of the head record
    head: i32,
    /// The index of the tail record.
    /// If this is <0, the tail was *not* saved properly and we must scan the buffer until we find
    /// the tail record. This is always *optional*.
    tail: i32,
}

impl Default for RingBufferConfig {


@@ 38,25 45,58 @@ impl Default for RingBufferConfig {
            records_per_page: 0,
            page_flag: 0,
            head: 0,
            tail: -1,
        }
    }
}

#[repr(packed)]
/// Represents a single page of records, the primary unit of serialization in a `RingBuffer`.
#[repr(C)]
struct BufferPage<T: AsBytes + FromBytes> {
    /// The sequence number for this page. This should either be decrementing from `page_count` as
    /// specified in this `RingBuffer`'s `RingBufferConfig` or incrementing from 0 to `page_count`.
    /// It's primary function is to determine the tail of the buffer without having a serialized
    /// offset stored anywhere.
    seq: u16,
    crc: u32,
    /// A xxHash64 checksup of the records and `seq` fields of this page.
    cksum: u64,
    /// Control flags for this page.
    /// TODO: document the meaning of each bit
    flags: u8,
    /// A double-ended queue of the records. It must no larger than `records_per_page`. If it is
    /// less, an appropriate flag must be set
    /// TODO: implement this with a more efficient/safer buffer, particularly for use with
    /// zerocopy.
    records: VecDeque<T>,
}

impl<T: AsBytes + FromBytes> BufferPage<T> {
    /// Calculates the appropriate 64-bit checksum hash for `BufferPage` and returns it.
    fn calculate_checksum(&self) -> u64 {
        let mut hasher = XxHash64::default();
        // Using zerocopy, write the contents of records to the hasher
        self.records.iter().for_each(|x| hasher.write(x.as_bytes()));
        // Write the sequence number *last*
        hasher.write_u16(self.seq);
        // Return the hash digest
        hasher.finish()
    }

    /// Updates the `cksum` property on self.
    fn update_checksum(&mut self) -> () {
        let cksum = self.calculate_checksum();
        self.cksum = cksum;
    }
}

impl<T: AsBytes + FromBytes> BufferPage<T> {
    fn new(seq: u16) -> Self {
        Self {
            /// seq needs to be provided by the buffer
            seq: seq,
            flags: 0,
            /// TODO: calculate this when we go to serialize
            crc: 0,
            cksum: 0,
            // TODO: use with_capacity and set it to the correct alignment capacity
            records: VecDeque::new(),
        }

A src/errors.rs => src/errors.rs +47 -0
@@ 0,0 1,47 @@
use std::{fmt::Display, path::PathBuf};

#[derive(Debug, Clone)]
pub enum FatalError {
    Unknown,
}

impl Display for FatalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            _ => write!(f, "An unknown error occurred."),
        }
    }
}

#[derive(Debug, Clone)]
pub enum RecoverableError {
    FileError(PathBuf),
    Unknown,
}

impl Display for RecoverableError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RecoverableError::FileError(p) => {
                write!(f, "Unable to open file {}", p.to_string_lossy())
            },
            Unknown => write!(f, "An unknown, recoverable error ocurred")
        }
    }
}

#[derive(Debug, Clone)]
pub enum Error {
    Fatal(FatalError),
    Recoverable(RecoverableError),
}

impl Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pass through to the inner errors
        match self {
            Error::Recoverable(e) => e.fmt(f),
            Error::Fatal(e) => e.fmt(f),
        }
    }
}

M src/lib.rs => src/lib.rs +154 -35
@@ 1,30 1,36 @@
#![feature(generic_const_exprs)]
pub mod errors;
use crate::errors::{Error, FatalError, RecoverableError};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::default::Default;
use std::hash::BuildHasherDefault;
use std::hash::Hasher;
use std::mem::replace;
use std::rc::Rc;
use twox_hash::XxHash64;
use zerocopy::{AsBytes, FromBytes};
use zerocopy::{AsBytes, FromBytes, transmute};

type ChecksumHasher = BuildHasherDefault<XxHash64>;

// 4MB
const DEFAULT_PAGE_SIZE: usize = 4 * 1024 * 1024;

const DEFAULT_RECORD_SIZE = DEFAULT_PAGE_SIZE - 11;

#[repr(C)]
struct RingBufferConfig {
    /// The number of pages in this buffer
    page_count: u16,
    /// The size in bytes of each page
    page_size: u32,
    /// The number of records stored in each page
    records_per_page: u32,
    /// A special flag indicating how many pages are currently in the buffer.
    /// This value must be 0 through 2.
    /// A 0 indicates there are *no* pages serialized to the buffer
    /// A 1 indicates there are *exactly one* pages serialized to the buffer
    /// A 2 indicates there are *at least two* pages serialized to the buffer
    /// A 3 indicates this buffer has been filled at least once and looped back around
    /// 4 and up are reserved for later use
    page_flag: u8,
    /// The index of the head record.
    /// If this is <0, the head was *not* saved properly and we must assume every record in the


@@ 33,7 39,7 @@ struct RingBufferConfig {
    /// The index of the tail record.
    /// If this is <0, the tail was *not* saved properly and we must scan the buffer until we find
    /// the tail record. This is always *optional*.
    tail: i32,
    length: i32,
}

impl Default for RingBufferConfig {


@@ 41,61 47,175 @@ impl Default for RingBufferConfig {
        Self {
            page_count: Default::default(),
            page_size: DEFAULT_PAGE_SIZE as u32,
            records_per_page: 0,
            page_flag: 0,
            head: 0,
            tail: -1,
            length: -1,
        }
    }
}

type Records<T> = VecDeque<T>;

#[repr(C)]
struct BufferPage<T: AsBytes + FromBytes> {
/// Represents a single page of records, the primary unit of serialization in a `RingBuffer`.
#[repr(C, align(4194304))] // align to 4MB
#[derive(Clone, AsBytes, FromBytes)]
struct BufferPage<const RECORD_SIZE: usize = DEFAULT_RECORD_SIZE> {
    /// The sequence number for this page. This should either be decrementing from `page_count` as
    /// specified in this `RingBuffer`'s `RingBufferConfig` or incrementing from 0 to `page_count`.
    /// It's primary function is to determine the tail of the buffer without having a serialized
    /// offset stored anywhere.
    seq: u16,
    crc: u64,
    records: VecDeque<T>,
    /// A xxHash64 checksup of the records and `seq` fields of this page.
    cksum: u64,
    /// Control flags for this page.
    /// TODO: document the meaning of each bit
    /// [0:1]: Whether this page is incomplete. If this bit is set to 1, the should be "cast" to a
    /// `CountedBufferPage`
    flags: u8,
    /// A double-ended queue of the records. It must no larger than `records_per_page`. If it is
    /// less, an appropriate flag must be set
    /// TODO: implement this with a more efficient/safer buffer, particularly for use with
    /// zerocopy.
    body: [u8; RECORD_SIZE],
}

impl<T: AsBytes + FromBytes> BufferPage<T> {
#[derive(Debug, Clone)]
struct PageFull;

impl<const RECORD_SIZE: usize> BufferPage<RECORD_SIZE> {
    fn new(seq: u16, body: [u8; RECORD_SIZE]) -> Self {
        Self {
            /// seq needs to be provided by the buffer
            seq,
            flags: 0,
            /// TODO: calculate this when we go to serialize
            cksum: 0,
            body,
        }
    }
    /// Calculates the appropriate 64-bit checksum hash for `BufferPage` and returns it.
    fn calculate_checksum(&self) -> u64 {
        let mut hasher = XxHash64::default();
        self.records.iter().map(|x| hasher.write(x.as_bytes()));
        // Using zerocopy, write the contents of records to the hasher
        hasher.write(self.body.as_bytes());
        // Write the sequence number *last*
        hasher.write_u16(self.seq);
        // Return the hash digest
        hasher.finish()
    }
}

impl<T: AsBytes + FromBytes> BufferPage<T> {
    fn new(seq: u16) -> Self {
        Self {
            /// seq needs to be provided by the buffer
            seq: seq,
            /// TODO: calculate this when we go to serialize
            crc: 0,
            // TODO: use with_capacity and set it to the correct alignment capacity
            records: VecDeque::new(),
        }
    /// Updates the `cksum` property based on contents of self.
    fn update_checksum(&mut self) -> () {
        let cksum = self.calculate_checksum();
        self.cksum = cksum;
    }
}

pub struct RingBuffer<T: AsBytes + FromBytes> {
pub struct SerializedRingBuffer {
    /// The configuration for this buffer
    config: RingBufferConfig,
    /// The index of the tail record
    /// Whether pages should be flushed immediately. If this is true, when `add_record` is called,
    /// the generated `BufferPage` is serialized to disk immediately, regardless of whether `next`
    /// is None or not. This effectively also controls the usage of `next` as well. Records are
    /// always written to and read from disk if this is true.
    /// If this is false, an `add_record` call may simply store the `BufferPage` in `next` which
    /// could be subsequently retrieved with `get_record`.
    flush_immediately: bool,
    /// The index of the tail record. This is where records should be written to.
    tail: u16,
    /// The index of the head record. This index should be read next and then incremented if a
    /// `get_record` call happens and `next` is None.
    head: u16,
    /// The sequence number of the *last serialized `BufferPage`*
    seq: u16,
    /// The number of records currently in the buffer.
    /// The tail of the buffer can be calculated by head - length and wrapping around per normal
    /// ring-buffer semantics.
    length: u16,
    /// An optional cached page to be read from next.
    /// An optional cached page to be read from next. This page is always
    /// If this is `Option::None`, we *must* read a page from disk
    /// It *may* hold a reference to the same `BufferPage` as `last`.
    next: Option<Rc<BufferPage<T>>>,
    /// The tail page of the buffer. New records should be inserted into this page.
    /// It *may* hold a reference to the same `BufferPage` as `last`.
    last: Rc<BufferPage<T>>,
    next: Option<Box<BufferPage>>,
    /// Whether newly created `BufferPage`s should be incrementing or decrementing their `seq`s.
    inc: bool,
}

pub trait RingBuffer<T> {
    fn add_record(&mut self, record: T) -> Result<(), Error>;
    fn get_record(&mut self) -> Result<Option<T>, Error>;
}

impl<T: AsBytes + FromBytes + Clone> RingBuffer<T> for SerializedRingBuffer {
    /// Creates a new page and adds the record to it.
    /// This may or may not serialize to disk, depending on
    fn add_record(&mut self, record: T) -> Result<(), Error> {
        if self.length < self.config.page_count {
            self.length += 1;
        }
        if !self.flush_immediately && self.next.is_none() {
            let (seq, flip) = self.gen_next_seq();
            if flip {
                self.inc = !self.inc
            }
            self.next = Some(Box::new(BufferPage::new(seq, record.as_bytes())));
        } else {
            let (seq, flip) = self.gen_next_seq();
            if flip {
                self.inc = !self.inc
            }
            let page = BufferPage::new(seq, record);
            self.serialize_page(page)?;
        }
        Ok(())
    }

    fn get_record(&mut self) -> Result<Option<T>, Error> {
        if self.length == 0 {
            return Ok(None);
        }
        let page = if self.next.is_some() {
            replace(&mut self.next, None).unwrap()
        } else {
            self.read_page()?
        };
        self.length -= 1;
        Ok(Some(transmute!(*page.body)))
    }
}

impl SerializedRingBuffer {

    fn serialize_page(&mut self, page: BufferPage) -> Result<u16, Error> {
        self.seq = page.seq;
        todo!("Implement page serialization")
    }

    fn read_page(&mut self) -> Result<Box<BufferPage>, Error> {
        todo!("Implement page deserialization")
    }

    fn gen_next_seq(&self) -> (u16, bool) {
        // This is some very conditional code here... let's explain it

        // If we're incrementing the sequences and we aren't at the end
        // We just return the last seq + 1 and *don't* indicate we should flip the inc flag
        if self.inc && self.seq < self.config.page_count - 1 {
            (self.seq + 1, false)
        // If we're incrementing, but we're at the end of the buffer/max seq value,
        // indicate we should flip the inc flag and return the page_count as our new
        // seq, which is now decrementing from page_count.
        } else if self.inc && self.seq == self.config.page_count - 1 {
            (self.config.page_count, true)
        // If we're decrementing, and the last seq wasn't at 0 (the end of the buffer, when
        // decrementing), return last seq - 1 and don't flip the inc flag
        } else if !self.inc && self.seq > 0 {
            (self.seq - 1, false)

        // If we're decrementing and the last sequence was the end of the buffer (0),
        // indicate we should flip the inc flag and and return 0
        } else
        /* !self.inc && self.seq == 0*/
        {
            (0, true)
        }
    }
}

#[cfg(test)]


@@ 104,7 224,6 @@ mod tests {

    #[test]
    fn it_works() {
        let result = add(2, 2);
        assert_eq!(result, 4);
        todo!()
    }
}