~jpastuszek/multistream-batch

cb400175a5f397354dcdce6e63e564023acbbf2f — Jakub Pastuszek 7 months ago 6ade43f + c6cfe69
Merge branch 'master' of github.com:jpastuszek/multistream-batch
M .gitignore => .gitignore +2 -1
@@ 1,3 1,4 @@
/target
**/*.rs.bk
Cargo.lock
\ No newline at end of file
Cargo.lock
.vimrc

M Cargo.toml => Cargo.toml +3 -3
@@ 1,9 1,9 @@
[package]
name = "multistream-batch"
version = "0.1.3-alpha.0"
version = "1.0.1-alpha.0"
authors = ["Jakub Pastuszek <jpastuszek@protonmail.com>"]
description = "Different types and implementations of batching algorithms."
repository = "https://github.com/jpastuszek/multistream-batch"
description = "Implementations of batching algorithms"
repository = "https://sr.ht/~jpastuszek/multistream-batch"
documentation = "https://docs.rs/multistream-batch"
readme = "README.md"
keywords = ["batch", "multiline", "transaction"]

M README.md => README.md +9 -8
@@ 1,14 1,15 @@
[![Latest Version]][crates.io] [![Documentation]][docs.rs] ![License]

Rust library that provides different types and implementations of batching algorithms.
Rust library of batching algorithm implementations.

Batching is based on collecting items and flushing them all together when batch has reached some limit or when manually flushed. This makes all items collected in single batch available at once for further processing (e.g. batch insert into a database).
Batching works by accumulating items and later automatically flushing them all together when the batch has reached a limit.
All items collected in a single batch are available at once for further processing (e.g. batch insert into a database).

This implementations will construct batches based on:
* maximum number of items collected,
* maximum time duration since first item was collected by the batch,
These implementations will construct batches based on:
* a limit on the number of items collected in a batch,
* a limit on the time elapsed since the first item was appended to the batch,
* calling one of the batch consuming methods,
* sending flush command between batch items (channel based batches).
* sending a flush command between batch items (channel-based implementations).

See [documentation](https://docs.rs/multistream-batch) of available algorithms.



@@ 22,8 23,8 @@ use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread and batcher with maximum size of 4 items (for each stream) and
// maximum batch duration since first received item of 200 ms.
// Create producer thread with a channel-based, multi-stream batching implementation configured with a maximum size
// of 4 items (for each stream) and a maximum batch duration since the first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
	// Send a sequence of `Append` commands with integer stream key and item value
	sender.send(Append(1, 1)).unwrap();
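The README above describes batches that complete on a size limit, a duration limit, or an explicit flush. As a minimal stdlib-only sketch of that idea (the names here are illustrative, not the crate's actual API):

```rust
use std::time::{Duration, Instant};

/// Illustrative sketch of the batching idea: collect items until a size
/// limit is hit or a duration limit has elapsed since the first item.
struct SimpleBatch<I> {
    items: Vec<I>,
    first_item: Option<Instant>,
    max_size: usize,
    max_duration: Duration,
}

impl<I> SimpleBatch<I> {
    fn new(max_size: usize, max_duration: Duration) -> Self {
        assert!(max_size > 0);
        SimpleBatch { items: Vec::new(), first_item: None, max_size, max_duration }
    }

    /// Append an item, recording the time the first one arrived.
    fn append(&mut self, item: I) {
        if self.first_item.is_none() {
            self.first_item = Some(Instant::now());
        }
        self.items.push(item);
    }

    /// The batch is ready when either limit is reached.
    fn is_ready(&self) -> bool {
        self.items.len() >= self.max_size
            || self.first_item.map_or(false, |t| t.elapsed() >= self.max_duration)
    }

    /// Take all items at once, starting a fresh batch.
    fn flush(&mut self) -> Vec<I> {
        self.first_item = None;
        std::mem::take(&mut self.items)
    }
}
```

The crate builds on this shape, adding channel endpoints and multi-stream keying on top.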

M src/buf_batch.rs => src/buf_batch.rs +15 -15
@@ 1,22 1,22 @@
//! This module provides `BufBatch` that will buffer items until batch is ready and provide them in
//! This module provides `BufBatch` that will buffer items until the batch is ready and provide them in
//! one go using `Drain` iterator.
use std::fmt::Debug;
use std::time::{Duration, Instant};
use std::vec::Drain;

/// Represents result from `poll` function call.
/// Represents result from the `poll` function call.
#[derive(Debug)]
pub enum PollResult {
    /// Batch is ready after reaching one of the limits.
    Ready,
    /// Batch has not reached one of its limits yet.
    /// Provides `Duration` after which `max_duration` limit will be reached if batch has
    /// Provides `Duration` after which `max_duration` limit will be reached if the batch has
    /// at least one item.
    NotReady(Option<Duration>),
}

/// Batches items in internal buffer up to `max_size` items or until `max_duration` has elapsed
/// since first item was appended to the batch.
/// since the first item appended to the batch.
///
/// This base implementation does not handle actual awaiting for batch duration timeout.
#[derive(Debug)]


@@ 28,10 28,10 @@ pub struct BufBatch<I: Debug> {
}

impl<I: Debug> BufBatch<I> {
    /// Creates batch given maximum batch size in number of items (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since first item appended to it.
    /// Creates batch given maximum batch size in the number of items stored (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since the first item appended to it.
    ///
    /// Panics if `max_size` == 0.
    /// Panics if `max_size == 0`.
    pub fn new(max_size: usize, max_duration: Duration) -> BufBatch<I> {
        assert!(max_size > 0, "BufBatch::new bad max_size");



@@ 43,11 43,11 @@ impl<I: Debug> BufBatch<I> {
        }
    }

    /// Checks if batch has reached one of its limits.
    /// Checks if the batch has reached one of its limits.
    ///
    /// Returns:
    /// * `PollResult::Ready` - batch has reached one of its limit and is ready to be consumed,
    /// * `PollResult::NotReady(None)` - batch is not ready yet and has no items appeded yet,
    /// * `PollResult::Ready` - batch has reached one of its limits and is ready to be consumed,
    /// * `PollResult::NotReady(None)` - batch is not ready yet and has no items appended yet,
    /// * `PollResult::NotReady(Some(duration))` - batch is not ready yet but it will be ready after time duration due to duration limit.
    pub fn poll(&self) -> PollResult {
        debug_assert!(self.items.is_empty() ^ self.first_item.is_some());


@@ 68,11 68,11 @@ impl<I: Debug> BufBatch<I> {
        PollResult::NotReady(None)
    }

    /// Appends item to batch and returns reference to that item.
    /// Appends item to batch and returns a reference to that item.
    ///
    /// It is an contract error to append batch that is ready according to `self.poll()`.
    /// It is a contract error to append batch that is ready according to `self.poll()`.
    ///
    /// Panics if batch has already reached its `max_size` limit.
    /// Panics if the batch has already reached its `max_size` limit.
    pub fn append(&mut self, item: I) -> &I {
        debug_assert!(self.items.is_empty() ^ self.first_item.is_some());
        assert!(


@@ 87,13 87,13 @@ impl<I: Debug> BufBatch<I> {
        self.items.last().unwrap()
    }

    /// Starts new batch dropping all buffered items.
    /// Starts a new batch by dropping all buffered items.
    pub fn clear(&mut self) {
        self.first_item = None;
        self.items.clear();
    }

    /// Starts new batch by draining all buffered items.
    /// Starts a new batch by draining all buffered items.
    pub fn drain(&mut self) -> Drain<I> {
        self.first_item = None;
        self.items.drain(0..)
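The `poll` contract documented above (`Ready` when a limit was reached, otherwise `NotReady` with the time left until the duration limit, or `None` for an empty batch) can be sketched with stdlib types; this is a simplified illustration of the documented semantics, not the crate's implementation:

```rust
use std::time::{Duration, Instant};

/// Illustrative re-creation of the documented `PollResult` shape.
#[derive(Debug, PartialEq)]
enum Poll {
    Ready,
    NotReady(Option<Duration>),
}

/// Decide batch readiness from the current item count, the size limit,
/// the time the first item arrived (if any), and the duration limit.
fn poll(len: usize, max_size: usize, first_item: Option<Instant>, max_duration: Duration) -> Poll {
    if len >= max_size {
        return Poll::Ready;
    }
    match first_item {
        // Empty batch: no duration limit is running yet.
        None => Poll::NotReady(None),
        Some(start) => {
            let elapsed = start.elapsed();
            if elapsed >= max_duration {
                Poll::Ready
            } else {
                // Report how long until the duration limit triggers.
                Poll::NotReady(Some(max_duration - elapsed))
            }
        }
    }
}
```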

M src/channel.rs => src/channel.rs +3 -3
@@ 1,6 1,6 @@
//! Batch implementations that use channels and threads to support simultaneously receiving items and awaiting on timeouts.
//!
//! This implementations are using `crossbeam_channel` to implement awaiting for items or timeout.
//! These implementations use `crossbeam_channel` to await items or a timeout.

pub mod buf_batch;
pub mod multi_buf_batch;


@@ 9,8 9,8 @@ pub mod tx_buf_batch;
use std::error::Error;
use std::fmt;

/// Error returned by channel based implementations when `Sender` end of
/// channel was dropped and no more outstanding data is left to be provided.
/// The error that is returned by channel based implementations when `Sender` end of
/// the channel was dropped and no more outstanding items are left to be provided.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct EndOfStreamError;
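The `EndOfStreamError` convention above — an error only once the `Sender` end is gone and no outstanding data remains — can be sketched with std's `mpsc` channel standing in for the crate's `crossbeam_channel`:

```rust
use std::fmt;
use std::sync::mpsc;

/// Sketch of the documented error type: sender dropped, nothing left.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
struct EndOfStreamError;

impl fmt::Display for EndOfStreamError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "end of stream: sender was dropped")
    }
}

/// Drain a receiver until the sender side disconnects, then report
/// `EndOfStreamError` alongside everything that was received.
fn drain_all(rx: mpsc::Receiver<i32>) -> (Vec<i32>, EndOfStreamError) {
    let mut out = Vec::new();
    loop {
        match rx.recv() {
            Ok(item) => out.push(item),
            // `RecvError` means all senders were dropped and no data is left.
            Err(mpsc::RecvError) => return (out, EndOfStreamError),
        }
    }
}
```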


M src/channel/buf_batch.rs => src/channel/buf_batch.rs +18 -14
@@ 1,10 1,10 @@
/*!
This module provides `BufBatchChannel` that will buffer items until batch is ready and provides them in
This module provides `BufBatchChannel` that will buffer items until the batch is ready and provide them in
one go using `Drain` iterator.

# Example

Collect batches of items by reaching different limits and using `Flush` command.
Collect batches of items by reaching different limits and by using the `Flush` command.

```rust
use multistream_batch::channel::buf_batch::BufBatchChannel;


@@ 12,8 12,8 @@ use multistream_batch::channel::buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread and batcher with maximum size of 4 items and
// maximum batch duration since first received item of 200 ms.
// Create producer thread with `BufBatchChannel` configured with a maximum size of 4 items and
// a maximum batch duration since the first received item of 200 ms.
let mut batch = BufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer item value
    sender.send(Append(1)).unwrap();


@@ 71,7 71,7 @@ use std::fmt::Debug;
use std::time::Duration;
use std::vec::Drain;

/// Commands that can be send to `BufBatchChannel` via `Sender` endpoint.
/// Commands that can be sent to `BufBatchChannel` via `Sender` endpoint.
#[derive(Debug)]
pub enum Command<I: Debug> {
    /// Append item `I` to batch.


@@ 81,7 81,7 @@ pub enum Command<I: Debug> {
}

/// Batches items in internal buffer up to `max_size` items or until `max_duration` has elapsed
/// since first item was appended to the batch.
/// since the first item appended to the batch.
#[derive(Debug)]
pub struct BufBatchChannel<I: Debug> {
    channel: Receiver<Command<I>>,


@@ 91,11 91,13 @@ pub struct BufBatchChannel<I: Debug> {
}

impl<I: Debug> BufBatchChannel<I> {
    /// Creates batch given maximum batch size in number of items (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since first item appended to it.
    /// It also returns `Sender` endpoint into which `Command`s can be sent.
    /// Creates batch given maximum batch size in the number of items stored (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since the first item appended to it.
    /// Parameter `channel_size` defines the maximum number of messages that can be buffered between sender and receiver.
    ///
    /// Panics if `max_size` == 0.
    /// This method also returns `Sender` endpoint that can be used to send `Command`s.
    ///
    /// Panics if `max_size == 0`.
    pub fn new(
        max_size: usize,
        max_duration: Duration,


@@ 113,7 115,8 @@ impl<I: Debug> BufBatchChannel<I> {
        )
    }

    /// Crates batch calling `producer` closure with `Sender` end of the channel in newly started thread.
    /// Calls `producer` closure with `Sender` end of the channel in a newly started thread and
    /// returns `BufBatchChannel` connected to that `Sender`.
    pub fn with_producer_thread(
        max_size: usize,
        max_duration: Duration,


@@ 134,7 137,8 @@ impl<I: Debug> BufBatchChannel<I> {
    ///
    /// This call will block until batch becomes ready.
    ///
    /// Returns `Err(EndOfStreamError)` after `Sender` end was dropped and all batched items were flushed.
/// When the `Sender` end has been dropped, this method returns `Err(EndOfStreamError)` after all
/// outstanding items have been flushed.
    pub fn next(&mut self) -> Result<Drain<I>, EndOfStreamError> {
        if self.disconnected {
            return Err(EndOfStreamError);


@@ 178,7 182,7 @@ impl<I: Debug> BufBatchChannel<I> {
        }
    }

    /// Checks if previous `self.next()` call found channel to be disconnected.
    /// Checks if previous `self.next()` call found the channel to be disconnected.
    pub fn is_disconnected(&self) -> bool {
        self.disconnected
    }


@@ 209,7 213,7 @@ impl<I: Debug> BufBatchChannel<I> {
        DrainToEnd(buffer.into_vec().into_iter(), channel)
    }

    /// Splits into `BufBatch` item buffer and channel `Receiver` end
    /// Splits into `BufBatch` item buffer and channel `Receiver` end.
    pub fn split(self) -> (BufBatch<I>, Receiver<Command<I>>) {
        (self.batch, self.channel)
    }
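The `with_producer_thread` pattern documented above can be sketched with std's `mpsc`: the closure gets the `Sender`, the caller keeps the receiving end, and a batch completes on a size limit, a `Flush` command, a timeout, or disconnection. Names are illustrative, and this simplification restarts the timeout on each receive, whereas the crate measures the duration from the first item of the batch:

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical command enum mirroring the documented `Append`/`Flush`
/// commands (simplified; the crate's real type differs).
#[derive(Debug)]
enum Command<I> {
    Append(I),
    Flush,
}

/// Spawn a producer thread and hand it the `Sender` end of the channel.
fn with_producer_thread<I, F>(producer: F) -> Receiver<Command<I>>
where
    I: Send + 'static,
    F: FnOnce(Sender<Command<I>>) + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || producer(tx));
    rx
}

/// Collect one batch: up to `max_size` items; `Flush`, a timeout, or
/// sender disconnection all complete the batch early.
fn next_batch<I>(rx: &Receiver<Command<I>>, max_size: usize, max_duration: Duration) -> Vec<I> {
    let mut items = Vec::new();
    while items.len() < max_size {
        match rx.recv_timeout(max_duration) {
            Ok(Command::Append(item)) => items.push(item),
            Ok(Command::Flush) => break,
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    items
}
```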

M src/channel/multi_buf_batch.rs => src/channel/multi_buf_batch.rs +16 -12
@@ 1,6 1,6 @@
/*!
This module provides `MultiBufBatchChannel` that will buffer items into multiple internal batches based on batch stream key until
one of the batches is ready and provides this items in one go along with the batch stream key using `Drain` iterator.
one of the batches is ready and provide them in one go, along with the batch stream key, using a `Drain` iterator.

# Example



@@ 12,8 12,8 @@ use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread and batcher with maximum size of 4 items (for each stream) and
// maximum batch duration since first received item of 200 ms.
// Create producer thread with `MultiBufBatchChannel` configured with a maximum size of 4 items (for each stream) and
// a maximum batch duration since the first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer stream key and item value
    sender.send(Append(1, 1)).unwrap();


@@ 101,7 101,7 @@ use std::hash::Hash;
use std::time::Duration;
use std::vec::Drain;

/// Commands that can be send to `MultiBufBatchChannel` via `Sender` endpoint.
/// Commands that can be sent to `MultiBufBatchChannel` via `Sender` endpoint.
#[derive(Debug)]
pub enum Command<K: Debug + Ord + Hash, I: Debug> {
    /// Append item `I` to batch with stream key `K`.


@@ 110,9 110,9 @@ pub enum Command<K: Debug + Ord + Hash, I: Debug> {
    Flush(K),
}

/// Collects items into multiple batches based on stream key.
/// Collects items into multiple batches based on the stream key.
/// A batch may become ready after collecting `max_size` number of items or until `max_duration` has elapsed
/// since first item was appended to the batch.
/// since the first item appended to the batch.
///
/// Batch item buffers are cached and reused to avoid allocations.
#[derive(Debug)]


@@ 129,10 129,12 @@ where
    I: Debug + Send + 'static,
{
    /// Crates new instance with given maximum batch size (`max_size`) and maximum duration (`max_duration`) that
    /// batch can last since first item appended to it.
    /// It also returns `Sender` endpoint into which `Command`s can be sent.
    /// batch can last since the first item appended to it.
    /// Parameter `channel_size` defines the maximum number of messages that can be buffered between sender and receiver.
    ///
    /// Panics if `max_size` == 0.
    /// This method also returns `Sender` endpoint that can be used to send `Command`s.
    ///
    /// Panics if `max_size == 0`.
    pub fn new(
        max_size: usize,
        max_duration: Duration,


@@ 150,7 152,8 @@ where
        )
    }

    /// Crates batch calling `producer` closure with `Sender` end of the channel in newly started thread.
    /// Calls `producer` closure with `Sender` end of the channel in a newly started thread and
    /// returns `MultiBufBatchChannel` connected to that `Sender`.
    pub fn with_producer_thread(
        max_size: usize,
        max_duration: Duration,


@@ 171,7 174,8 @@ where
    ///
    /// This call will block until one of the batches becomes ready.
    ///
    /// Returns `Err(EndOfStreamError)` after `Sender` end was dropped and all outstanding batches were flushed.
/// When the `Sender` end has been dropped, this method returns `Err(EndOfStreamError)` after all
/// outstanding items have been flushed.
    pub fn next<'i>(&'i mut self) -> Result<(K, Drain<I>), EndOfStreamError> {
        loop {
            if self.flush.is_some() {


@@ 268,7 272,7 @@ where
        self.batch.stats()
    }

    /// Splits into `MultiBufBatch` item buffer and channel `Receiver` end
    /// Splits into `MultiBufBatch` item buffer and channel `Receiver` end.
    pub fn split(self) -> (MultiBufBatch<K, I>, Receiver<Command<K, I>>) {
        (self.batch, self.channel)
    }
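The multi-stream behaviour documented above keys buffers by stream and completes each stream's batch independently. A hypothetical, size-limit-only sketch (the crate also tracks a per-batch duration limit and reuses buffers):

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Illustrative multi-stream batcher: one buffer per stream key.
struct MultiBatch<K: Eq + Hash + Clone, I> {
    streams: HashMap<K, Vec<I>>,
    max_size: usize,
}

impl<K: Eq + Hash + Clone, I> MultiBatch<K, I> {
    fn new(max_size: usize) -> Self {
        assert!(max_size > 0);
        MultiBatch { streams: HashMap::new(), max_size }
    }

    /// Append an item to its stream; when that stream's buffer reaches
    /// `max_size`, the whole batch is returned together with its key.
    fn append(&mut self, key: K, item: I) -> Option<(K, Vec<I>)> {
        let buf = self.streams.entry(key.clone()).or_insert_with(Vec::new);
        buf.push(item);
        if buf.len() >= self.max_size {
            let items = self.streams.remove(&key).unwrap();
            Some((key, items))
        } else {
            None
        }
    }
}
```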

M src/channel/tx_buf_batch.rs => src/channel/tx_buf_batch.rs +18 -14
@@ 13,7 13,7 @@ use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread and batcher with maximum size of 4 items and
// maximum batch duration since first received item of 200 ms.
// maximum batch duration since the first received item of 200 ms.
let mut batch = TxBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer item value
    sender.send(Append(1)).unwrap();


@@ 93,7 93,7 @@ use std::fmt::Debug;
use std::time::Duration;
use std::vec::Drain;

/// Commands that can be send to `TxBufBatchChannel` via `Sender` endpoint.
/// Commands that can be sent to `TxBufBatchChannel` via `Sender` endpoint.
#[derive(Debug)]
pub enum Command<I: Debug> {
    /// Append item `I` to batch.


@@ 107,17 107,17 @@ pub enum Command<I: Debug> {
pub struct Complete<'i, I: Debug>(&'i mut TxBufBatchChannel<I>);

impl<'i, I: Debug> Complete<'i, I> {
    /// Restarts batch making `TxBufBatchChannel.next()` to iterate already received items starting from oldest one in current batch.
    /// Restarts the batch. `TxBufBatchChannel.next()` will iterate already received items starting from the oldest one in the current batch.
    pub fn retry(&mut self) {
        self.0.retry()
    }

    /// Commits current batch by dropping all buffered items.
    /// Commits the current batch by dropping all buffered items.
    pub fn commit(&mut self) {
        self.0.clear()
    }

    /// Commits current batch by draining all buffered items.
    /// Commits the current batch by draining all buffered items.
    pub fn drain(&mut self) -> Drain<I> {
        self.0.drain()
    }


@@ 135,11 135,10 @@ pub enum TxBufBatchChannelResult<'i, I: Debug> {
}

/// Batches items in internal buffer up to `max_size` items or until `max_duration` has elapsed
/// since first item was appended to the batch. Reference to each item is returned for every
/// since the first item appended to the batch. A reference to each item is returned
/// as soon as it is received.
///
/// This batch can provide all the buffered item references in order as they were received again
/// after batch was completed but retried (not committed).
/// The current batch can be retried; iteration will then yield the uncommitted items again.
#[derive(Debug)]
pub struct TxBufBatchChannel<I: Debug> {
    channel: Receiver<Command<I>>,


@@ 151,10 150,13 @@ pub struct TxBufBatchChannel<I: Debug> {
}

impl<I: Debug> TxBufBatchChannel<I> {
    /// Creates batch given maximum batch size in number of items (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since first item appended to it.
    /// Creates batch given maximum batch size in the number of items stored (`max_size`)
    /// and maximum duration that batch can last (`max_duration`) since the first item appended to it.
    /// Parameter `channel_size` defines the maximum number of messages that can be buffered between sender and receiver.
    ///
    /// Panics if `max_size` == 0.
    /// This method also returns `Sender` endpoint that can be used to send `Command`s.
    ///
    /// Panics if `max_size == 0`.
    pub fn new(
        max_size: usize,
        max_duration: Duration,


@@ 173,7 175,8 @@ impl<I: Debug> TxBufBatchChannel<I> {
        )
    }

    /// Crates batch calling `producer` closure with `Sender` end of the channel in newly started thread.
    /// Calls `producer` closure with `Sender` end of the channel in a newly started thread and
    /// returns `TxBufBatchChannel` connected to that `Sender`.
    pub fn with_producer_thread(
        max_size: usize,
        max_duration: Duration,


@@ 195,7 198,8 @@ impl<I: Debug> TxBufBatchChannel<I> {
    ///
    /// This call will block until batch becomes ready.
    ///
    /// Returns `Err(EndOfStreamError)` after `Sender` end was dropped and all batched items were flushed.
    /// When the `Sender` end has been dropped, this method returns `Err(EndOfStreamError)` after all
    /// outstanding items have been flushed.
    pub fn next(&mut self) -> Result<TxBufBatchChannelResult<I>, EndOfStreamError> {
        // Yield internal messages if batch was retried
        if let Some(retry) = self.retry {


@@ 311,7 315,7 @@ impl<I: Debug> TxBufBatchChannel<I> {
        DrainToEnd(buffer.into_vec().into_iter(), channel)
    }

    /// Splits into `BufBatch` item buffer and channel `Receiver` end
    /// Splits into `BufBatch` item buffer and channel `Receiver` end.
    pub fn split(self) -> (BufBatch<I>, Receiver<Command<I>>) {
        (self.batch, self.channel)
    }
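The retry/commit semantics documented above can be sketched as a buffer with a replay cursor: items stay buffered until `commit`, and `retry` rewinds so uncommitted items are yielded again. This is an illustration of the idea, not the crate's `TxBufBatchChannel` API:

```rust
/// Illustrative transactional batch: buffered items plus a replay cursor.
struct TxBatch<I> {
    items: Vec<I>,
    cursor: usize,
}

impl<I: Clone> TxBatch<I> {
    fn new() -> Self {
        TxBatch { items: Vec::new(), cursor: 0 }
    }

    fn append(&mut self, item: I) {
        self.items.push(item);
    }

    /// Yield the next uncommitted item, advancing the cursor.
    fn next(&mut self) -> Option<I> {
        let item = self.items.get(self.cursor).cloned();
        if item.is_some() {
            self.cursor += 1;
        }
        item
    }

    /// Rewind so `next` replays the current batch from the oldest item.
    fn retry(&mut self) {
        self.cursor = 0;
    }

    /// Complete the batch by dropping all buffered items.
    fn commit(&mut self) {
        self.items.clear();
        self.cursor = 0;
    }
}
```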

M src/lib.rs => src/lib.rs +7 -6
@@ 1,13 1,14 @@
/*!
This Rust library provides different types and implementations of batching algorithms.
Implementations of batching algorithms.

Batching is based on collecting items and flushing them all together when batch has reached some limit or when manually flushed. This makes all items collected in single batch available at once for further processing (e.g. batch insert into a database).
Batching works by accumulating items and later automatically flushing them all together when the batch has reached a limit.
All items collected in a single batch are available at once for further processing (e.g. batch insert into a database).

This implementations will construct batches based on:
* maximum number of items collected,
* maximum time duration since first item was collected by the batch,
These implementations will construct batches based on:
* a limit on the number of items collected in a batch,
* a limit on the time elapsed since the first item was appended to the batch,
* calling one of the batch consuming methods,
* sending flush command between batch items (channel based batches).
* sending a flush command between batch items (channel-based implementations).

See submodules for documentation of the available algorithms.
!*/

M src/multi_buf_batch.rs => src/multi_buf_batch.rs +3 -3
@@ 1,12 1,12 @@
//! This module provides `MultiBufBatch` that will buffer items into multiple internal batches based on batch stream key until
//! one of the batches is ready and provides this items in one go along with the batch stream key using `Drain` iterator.
//! one of the batches is ready. It then provides the accumulated items in one go, along with the batch stream key, using a `Drain` iterator.
use linked_hash_map::LinkedHashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::time::{Duration, Instant};
use std::vec::Drain;

/// Represents outstanding batch with items buffer from cache and `Instant` at which it was crated.
/// An outstanding batch with an item buffer taken from the cache and the `Instant` at which it was created.
#[derive(Debug)]
struct OutstandingBatch<I: Debug> {
    items: Vec<I>,


@@ 79,7 79,7 @@ where
    /// Crates new instance with given maximum batch size (`max_size`) and maximum duration (`max_duration`) that
    /// batch can last since first item appended to it.
    ///
    /// Panics if `max_size` == 0.
    /// Panics if `max_size == 0`.
    pub fn new(max_size: usize, max_duration: Duration) -> MultiBufBatch<K, I> {
        assert!(max_size > 0, "MultiBufBatch::new bad max_size");
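The buffer caching noted above ("Batch item buffers are cached and reused to avoid allocations") can be sketched as a pool of cleared `Vec`s that keep their capacity between batches; illustrative only:

```rust
/// Illustrative buffer pool: drained `Vec`s go back to the cache and
/// retain their capacity for the next batch.
struct BufferCache<I> {
    cache: Vec<Vec<I>>,
}

impl<I> BufferCache<I> {
    fn new() -> Self {
        BufferCache { cache: Vec::new() }
    }

    /// Take a buffer from the cache, or allocate a fresh one.
    fn take(&mut self) -> Vec<I> {
        self.cache.pop().unwrap_or_else(Vec::new)
    }

    /// Return an emptied buffer; `clear` keeps its capacity for reuse.
    fn put_back(&mut self, mut buf: Vec<I>) {
        buf.clear();
        self.cache.push(buf);
    }
}
```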