@@ 1,21 1,23 @@
use std::collections::VecDeque;
use std::default::Default;
+use std::hash::BuildHasherDefault;
+use std::hash::Hasher;
use std::rc::Rc;
-use zerocopy::{FromBytes, AsBytes};
-use crc::{Crc, CRC_32_CKSUM};
+use twox_hash::XxHash64;
+use zerocopy::{AsBytes, FromBytes};
-const CHECKSUM: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
+type ChecksumHasher = BuildHasherDefault<XxHash64>;
// 4MB
const DEFAULT_PAGE_SIZE: usize = 4 * 1024 * 1024;
-#[repr(packed)]
+#[repr(C)]
struct RingBufferConfig {
/// The number of pages in this buffer
page_count: u16,
/// The size in bytes of each page
page_size: u32,
- /// The number of records stored in each page
+ /// The maximum number of records stored in each page
records_per_page: u32,
/// A special flag indicating how many pages are currently in the buffer.
/// This value must be 0 through 3.
@@ 23,11 25,16 @@ struct RingBufferConfig {
/// A 1 indicates there is *exactly one* page serialized to the buffer
/// A 2 indicates there are *at least two* pages serialized to the buffer
/// A 3 indicates this buffer has been filled at least once and looped back around
+ /// 4 and up are reserved for later use
page_flag: u8,
/// The index of the head record.
/// If this is <0, the head was *not* saved properly and we must assume every record in the
/// buffer has not been drained. If it is 0 or greater, it is the index of the head record
head: i32,
+ /// The index of the tail record.
+ /// If this is <0, the tail was *not* saved properly and we must scan the buffer until we find
+ /// the tail record. This is always *optional*.
+ tail: i32,
}
impl Default for RingBufferConfig {
@@ 38,25 45,58 @@ impl Default for RingBufferConfig {
records_per_page: 0,
page_flag: 0,
head: 0,
+ tail: -1,
}
}
}
-#[repr(packed)]
+/// Represents a single page of records, the primary unit of serialization in a `RingBuffer`.
+#[repr(C)]
struct BufferPage<T: AsBytes + FromBytes> {
+ /// The sequence number for this page. This should either be decrementing from `page_count` as
+ /// specified in this `RingBuffer`'s `RingBufferConfig` or incrementing from 0 to `page_count`.
+ /// Its primary function is to determine the tail of the buffer without having a serialized
+ /// offset stored anywhere.
seq: u16,
- crc: u32,
+ /// A xxHash64 checksum of the records and `seq` fields of this page.
+ cksum: u64,
+ /// Control flags for this page.
+ /// TODO: document the meaning of each bit
+ flags: u8,
+ /// A double-ended queue of the records. It must be no larger than `records_per_page`. If it is
+ /// less, an appropriate flag must be set
+ /// TODO: implement this with a more efficient/safer buffer, particularly for use with
+ /// zerocopy.
records: VecDeque<T>,
}
impl<T: AsBytes + FromBytes> BufferPage<T> {
+ /// Calculates the appropriate 64-bit checksum hash for `BufferPage` and returns it.
+ fn calculate_checksum(&self) -> u64 {
+ let mut hasher = XxHash64::default();
+ // Using zerocopy, write the contents of records to the hasher
+ self.records.iter().for_each(|x| hasher.write(x.as_bytes()));
+ // Write the sequence number *last*
+ hasher.write_u16(self.seq);
+ // Return the hash digest
+ hasher.finish()
+ }
+ /// Updates the `cksum` property on self.
+ fn update_checksum(&mut self) -> () {
+ let cksum = self.calculate_checksum();
+ self.cksum = cksum;
+ }
+}
+
+impl<T: AsBytes + FromBytes> BufferPage<T> {
fn new(seq: u16) -> Self {
Self {
// seq needs to be provided by the buffer
seq: seq,
+ flags: 0,
// TODO: calculate this when we go to serialize
- crc: 0,
+ cksum: 0,
// TODO: use with_capacity and set it to the correct alignment capacity
records: VecDeque::new(),
}
@@ 0,0 1,47 @@
+use std::{fmt::Display, path::PathBuf};
+
+#[derive(Debug, Clone)]
+pub enum FatalError {
+ Unknown,
+}
+
+impl Display for FatalError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ _ => write!(f, "An unknown error occurred."),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum RecoverableError {
+ FileError(PathBuf),
+ Unknown,
+}
+
+impl Display for RecoverableError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ RecoverableError::FileError(p) => {
+ write!(f, "Unable to open file {}", p.to_string_lossy())
+ },
+ RecoverableError::Unknown => write!(f, "An unknown, recoverable error occurred")
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum Error {
+ Fatal(FatalError),
+ Recoverable(RecoverableError),
+}
+
+impl Display for Error {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ // Pass through to the inner errors
+ match self {
+ Error::Recoverable(e) => e.fmt(f),
+ Error::Fatal(e) => e.fmt(f),
+ }
+ }
+}
@@ 1,30 1,36 @@
+#![feature(generic_const_exprs)]
+pub mod errors;
+use crate::errors::{Error, FatalError, RecoverableError};
+use std::cell::RefCell;
use std::collections::VecDeque;
use std::default::Default;
use std::hash::BuildHasherDefault;
use std::hash::Hasher;
+use std::mem::replace;
use std::rc::Rc;
use twox_hash::XxHash64;
-use zerocopy::{AsBytes, FromBytes};
+use zerocopy::{AsBytes, FromBytes, transmute};
type ChecksumHasher = BuildHasherDefault<XxHash64>;
// 4MB
const DEFAULT_PAGE_SIZE: usize = 4 * 1024 * 1024;
+const DEFAULT_RECORD_SIZE: usize = DEFAULT_PAGE_SIZE - 11;
+
#[repr(C)]
struct RingBufferConfig {
/// The number of pages in this buffer
page_count: u16,
/// The size in bytes of each page
page_size: u32,
- /// The number of records stored in each page
- records_per_page: u32,
/// A special flag indicating how many pages are currently in the buffer.
/// This value must be 0 through 3.
/// A 0 indicates there are *no* pages serialized to the buffer
/// A 1 indicates there is *exactly one* page serialized to the buffer
/// A 2 indicates there are *at least two* pages serialized to the buffer
/// A 3 indicates this buffer has been filled at least once and looped back around
+ /// 4 and up are reserved for later use
page_flag: u8,
/// The index of the head record.
/// If this is <0, the head was *not* saved properly and we must assume every record in the
@@ 33,7 39,7 @@ struct RingBufferConfig {
/// The index of the tail record.
/// If this is <0, the tail was *not* saved properly and we must scan the buffer until we find
/// the tail record. This is always *optional*.
- tail: i32,
+ length: i32,
}
impl Default for RingBufferConfig {
@@ 41,61 47,175 @@ impl Default for RingBufferConfig {
Self {
page_count: Default::default(),
page_size: DEFAULT_PAGE_SIZE as u32,
- records_per_page: 0,
page_flag: 0,
head: 0,
- tail: -1,
+ length: -1,
}
}
}
-type Records<T> = VecDeque<T>;
-
-#[repr(C)]
-struct BufferPage<T: AsBytes + FromBytes> {
+/// Represents a single page of records, the primary unit of serialization in a `RingBuffer`.
+#[repr(C, align(4194304))] // align to 4MB
+#[derive(Clone, AsBytes, FromBytes)]
+struct BufferPage<const RECORD_SIZE: usize = DEFAULT_RECORD_SIZE> {
+ /// The sequence number for this page. This should either be decrementing from `page_count` as
+ /// specified in this `RingBuffer`'s `RingBufferConfig` or incrementing from 0 to `page_count`.
+ /// Its primary function is to determine the tail of the buffer without having a serialized
+ /// offset stored anywhere.
seq: u16,
- crc: u64,
- records: VecDeque<T>,
+ /// A xxHash64 checksum of the records and `seq` fields of this page.
+ cksum: u64,
+ /// Control flags for this page.
+ /// TODO: document the meaning of each bit
+ /// [0:1]: Whether this page is incomplete. If this bit is set to 1, the page should be "cast" to a
+ /// `CountedBufferPage`
+ flags: u8,
+ /// The raw serialized bytes of the records in this page. If the page holds fewer records
+ /// than it can fit, an appropriate flag must be set
+ /// TODO: implement this with a more efficient/safer buffer, particularly for use with
+ /// zerocopy.
+ body: [u8; RECORD_SIZE],
}
-impl<T: AsBytes + FromBytes> BufferPage<T> {
+#[derive(Debug, Clone)]
+struct PageFull;
+
+impl<const RECORD_SIZE: usize> BufferPage<RECORD_SIZE> {
+ fn new(seq: u16, body: [u8; RECORD_SIZE]) -> Self {
+ Self {
+ // seq needs to be provided by the buffer
+ seq,
+ flags: 0,
+ // TODO: calculate this when we go to serialize
+ cksum: 0,
+ body,
+ }
+ }
+ /// Calculates the appropriate 64-bit checksum hash for `BufferPage` and returns it.
fn calculate_checksum(&self) -> u64 {
let mut hasher = XxHash64::default();
- self.records.iter().map(|x| hasher.write(x.as_bytes()));
+ // Using zerocopy, write the contents of records to the hasher
+ hasher.write(self.body.as_bytes());
+ // Write the sequence number *last*
hasher.write_u16(self.seq);
+ // Return the hash digest
hasher.finish()
}
-}
-impl<T: AsBytes + FromBytes> BufferPage<T> {
- fn new(seq: u16) -> Self {
- Self {
- /// seq needs to be provided by the buffer
- seq: seq,
- /// TODO: calculate this when we go to serialize
- crc: 0,
- // TODO: use with_capacity and set it to the correct alignment capacity
- records: VecDeque::new(),
- }
+ /// Updates the `cksum` property based on contents of self.
+ fn update_checksum(&mut self) -> () {
+ let cksum = self.calculate_checksum();
+ self.cksum = cksum;
}
}
-pub struct RingBuffer<T: AsBytes + FromBytes> {
+pub struct SerializedRingBuffer {
/// The configuration for this buffer
config: RingBufferConfig,
- /// The index of the tail record
+ /// Whether pages should be flushed immediately. If this is true, when `add_record` is called,
+ /// the generated `BufferPage` is serialized to disk immediately, regardless of whether `next`
+ /// is None or not. This effectively also controls the usage of `next` as well. Records are
+ /// always written to and read from disk if this is true.
+ /// If this is false, an `add_record` call may simply store the `BufferPage` in `next` which
+ /// could be subsequently retrieved with `get_record`.
+ flush_immediately: bool,
+ /// The index of the tail record. This is where records should be written to.
tail: u16,
+ /// The index of the head record. This index should be read next and then incremented if a
+ /// `get_record` call happens and `next` is None.
+ head: u16,
+ /// The sequence number of the *last serialized `BufferPage`*
+ seq: u16,
/// The number of records currently in the buffer.
/// The tail of the buffer can be calculated by head - length and wrapping around per normal
/// ring-buffer semantics.
length: u16,
- /// An optional cached page to be read from next.
+ /// An optional cached page to be read from next.
/// If this is `Option::None`, we *must* read a page from disk
- /// It *may* hold a reference to the same `BufferPage` as `last`.
- next: Option<Rc<BufferPage<T>>>,
- /// The tail page of the buffer. New records should be inserted into this page.
- /// It *may* hold a reference to the same `BufferPage` as `last`.
- last: Rc<BufferPage<T>>,
+ next: Option<Box<BufferPage>>,
+ /// Whether newly created `BufferPage`s should be incrementing or decrementing their `seq`s.
+ inc: bool,
+}
+
+pub trait RingBuffer<T> {
+ fn add_record(&mut self, record: T) -> Result<(), Error>;
+ fn get_record(&mut self) -> Result<Option<T>, Error>;
+}
+
+impl<T: AsBytes + FromBytes + Clone> RingBuffer<T> for SerializedRingBuffer {
+ /// Creates a new page and adds the record to it.
+ /// This may or may not serialize to disk, depending on `flush_immediately`.
+ fn add_record(&mut self, record: T) -> Result<(), Error> {
+ if self.length < self.config.page_count {
+ self.length += 1;
+ }
+ if !self.flush_immediately && self.next.is_none() {
+ let (seq, flip) = self.gen_next_seq();
+ if flip {
+ self.inc = !self.inc
+ }
+ self.next = Some(Box::new(BufferPage::new(seq, record.as_bytes())));
+ } else {
+ let (seq, flip) = self.gen_next_seq();
+ if flip {
+ self.inc = !self.inc
+ }
+ let page = BufferPage::new(seq, record);
+ self.serialize_page(page)?;
+ }
+ Ok(())
+ }
+
+ fn get_record(&mut self) -> Result<Option<T>, Error> {
+ if self.length == 0 {
+ return Ok(None);
+ }
+ let page = if self.next.is_some() {
+ replace(&mut self.next, None).unwrap()
+ } else {
+ self.read_page()?
+ };
+ self.length -= 1;
+ Ok(Some(transmute!(page.body)))
+ }
+}
+
+impl SerializedRingBuffer {
+
+ fn serialize_page(&mut self, page: BufferPage) -> Result<u16, Error> {
+ self.seq = page.seq;
+ todo!("Implement page serialization")
+ }
+
+ fn read_page(&mut self) -> Result<Box<BufferPage>, Error> {
+ todo!("Implement page deserialization")
+ }
+
+ fn gen_next_seq(&self) -> (u16, bool) {
+ // This is some very conditional code here... let's explain it
+
+ // If we're incrementing the sequences and we aren't at the end
+ // We just return the last seq + 1 and *don't* indicate we should flip the inc flag
+ if self.inc && self.seq < self.config.page_count - 1 {
+ (self.seq + 1, false)
+ // If we're incrementing, but we're at the end of the buffer/max seq value,
+ // indicate we should flip the inc flag and return the page_count as our new
+ // seq, which is now decrementing from page_count.
+ } else if self.inc && self.seq == self.config.page_count - 1 {
+ (self.config.page_count, true)
+ // If we're decrementing, and the last seq wasn't at 0 (the end of the buffer, when
+ // decrementing), return last seq - 1 and don't flip the inc flag
+ } else if !self.inc && self.seq > 0 {
+ (self.seq - 1, false)
+
+ // If we're decrementing and the last sequence was the end of the buffer (0),
+ // indicate we should flip the inc flag and return 0
+ } else
+ /* !self.inc && self.seq == 0*/
+ {
+ (0, true)
+ }
+ }
}
#[cfg(test)]
@@ 104,7 224,6 @@ mod tests {
#[test]
fn it_works() {
- let result = add(2, 2);
- assert_eq!(result, 4);
+ todo!()
}
}