~science-computing/butido

9cbbff2be1f5938c83c097c6cb1094c8b2bfc49e — Matthias Beyer 3 years ago 5c9f1d4
Implement scheduling with max jobs per endpoint

This patch implements support for a maximum number of jobs per endpoint.
The number of running jobs on an endpoint is tracked with a wrapper around the
Endpoint object, which increments the job counter on allocation and decrements
it on deallocation.

This way, the scheduler knows how many jobs are running on each endpoint and
can select the next endpoint accordingly.
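
To illustrate the idea, here is a minimal sketch of such an RAII counter
wrapper, stripped down from the EndpointHandle introduced in the diff below
(all other Endpoint fields are omitted here):

    use std::ops::Deref;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Simplified stand-in for the real Endpoint type.
    struct Endpoint {
        running_jobs: AtomicUsize,
    }

    // RAII wrapper: the counter goes up when a handle is created and back
    // down when the handle is dropped, so it always reflects the number of
    // live job allocations on this endpoint.
    struct EndpointHandle(Arc<Endpoint>);

    impl EndpointHandle {
        fn new(ep: Arc<Endpoint>) -> Self {
            ep.running_jobs.fetch_add(1, Ordering::Relaxed);
            EndpointHandle(ep)
        }
    }

    impl Drop for EndpointHandle {
        fn drop(&mut self) {
            self.0.running_jobs.fetch_sub(1, Ordering::Relaxed);
        }
    }

    impl Deref for EndpointHandle {
        type Target = Endpoint;

        fn deref(&self) -> &Self::Target {
            self.0.deref()
        }
    }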

The load-and-compare is not atomic with the counter update, so it can still
happen that more jobs run on one endpoint than configured, but this is a first
step in the right direction.
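
To make that caveat concrete: the check and the later increment are two
separate atomic operations, so two scheduling attempts can both observe a free
slot before either one bumps the counter. The following is a hypothetical
illustration (the function names try_claim and try_claim_cas are not from this
patch); the compare-and-swap variant is only noted as a possible follow-up:

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Endpoint {
        running_jobs: AtomicUsize,
    }

    // Check-then-act: the load and the fetch_add are not one atomic step,
    // so two tasks can both pass the check while the counter is at
    // max_jobs - 1, pushing the endpoint over its configured limit.
    fn try_claim(ep: &Endpoint, max_jobs: usize) -> bool {
        if ep.running_jobs.load(Ordering::Relaxed) < max_jobs {
            ep.running_jobs.fetch_add(1, Ordering::Relaxed);
            true
        } else {
            false
        }
    }

    // A compare-and-swap loop would close that window; this is not what the
    // patch does, just one way the race could be eliminated later.
    fn try_claim_cas(ep: &Endpoint, max_jobs: usize) -> bool {
        let mut current = ep.running_jobs.load(Ordering::Relaxed);
        loop {
            if current >= max_jobs {
                return false;
            }
            match ep.running_jobs.compare_exchange_weak(
                current,
                current + 1,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(observed) => current = observed,
            }
        }
    }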

Also, the selection happens in a tokio task that runs in a `loop {}`. Because
this would otherwise block the executor thread, we call
`tokio::task::yield_now()` as soon as there is no free endpoint, yielding
execution to other futures so that resources go to doing actual work rather
than scheduling.
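
The selection loop then looks roughly like this, a sketch using a similarly
simplified Endpoint (the max_jobs field here stands in for the configured
per-endpoint limit; the real code reads it from the endpoint configuration):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Endpoint {
        running_jobs: AtomicUsize,
        max_jobs: usize,
    }

    // Poll until some endpoint has a free slot, preferring the least loaded
    // one. When nothing is free, yield back to the executor so other futures
    // (the running jobs) can make progress and eventually free an endpoint,
    // instead of spinning on the executor thread.
    async fn select_free_endpoint(endpoints: &[Arc<Endpoint>]) -> Arc<Endpoint> {
        loop {
            let candidate = endpoints
                .iter()
                .filter(|ep| ep.running_jobs.load(Ordering::Relaxed) < ep.max_jobs)
                .min_by_key(|ep| ep.running_jobs.load(Ordering::Relaxed));

            if let Some(ep) = candidate {
                return Arc::clone(ep);
            }

            tokio::task::yield_now().await;
        }
    }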

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
2 files changed, 40 insertions(+), 27 deletions(-)

M src/endpoint/configured.rs
M src/endpoint/scheduler.rs
M src/endpoint/configured.rs => src/endpoint/configured.rs +29 -8
@@ -56,6 +56,9 @@ pub struct Endpoint {

    #[getset(get = "pub")]
    uri: String,

    #[builder(default)]
    running_jobs: std::sync::atomic::AtomicUsize,
}

impl Debug for Endpoint {


@@ -228,17 +231,35 @@ impl Endpoint {
        PreparedContainer::new(self, job, staging_store, release_stores).await
    }

    pub async fn number_of_running_containers(&self) -> Result<usize> {
        self.docker
            .containers()
            .list(&Default::default())
            .await
            .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
            .map_err(Error::from)
            .map(|list| list.len())
    pub fn running_jobs(&self) -> usize {
        self.running_jobs.load(std::sync::atomic::Ordering::Relaxed)
    }
}

pub struct EndpointHandle(Arc<Endpoint>);

impl EndpointHandle {
    pub fn new(ep: Arc<Endpoint>) -> Self {
        let _ = ep.running_jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        EndpointHandle(ep)
    }
}

impl Drop for EndpointHandle {
    fn drop(&mut self) {
        let _ = self.0.running_jobs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }
}

impl std::ops::Deref for EndpointHandle {
    type Target = Endpoint;

    fn deref(&self) -> &Self::Target {
        self.0.deref()
    }
}


#[derive(Getters)]
pub struct PreparedContainer<'a> {
    endpoint: &'a Endpoint,

M src/endpoint/scheduler.rs => src/endpoint/scheduler.rs +11 -19
@@ -29,6 +29,7 @@ use uuid::Uuid;

use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointHandle;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
use crate::filestore::ReleaseStore;


@@ -84,11 +85,7 @@ impl EndpointScheduler {
    /// # Warning
    ///
    /// This function blocks as long as there is no free endpoint available!
    pub async fn schedule_job(
        &self,
        job: RunnableJob,
        bar: indicatif::ProgressBar,
    ) -> Result<JobHandle> {
    pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
        let endpoint = self.select_free_endpoint().await?;

        Ok(JobHandle {


@@ -103,29 +100,24 @@ impl EndpointScheduler {
        })
    }

    async fn select_free_endpoint(&self) -> Result<Arc<Endpoint>> {
    async fn select_free_endpoint(&self) -> Result<EndpointHandle> {
        loop {
            let ep = self
                .endpoints
                .iter()
                .cloned()
                .map(|ep| async move {
                    ep.number_of_running_containers()
                        .await
                        .map(|num_running| (num_running, ep.clone()))
                .filter(|ep| { // filter out all running containers where the number of max jobs is reached
                    let r = ep.running_jobs() < ep.num_max_jobs();
                    trace!("Endpoint {} considered for scheduling job: {}", ep.name(), r);
                    r
                })
                .collect::<futures::stream::FuturesUnordered<_>>()
                .collect::<Result<Vec<_>>>()
                .await?
                .iter()
                .sorted_by(|tpla, tplb| tpla.0.cmp(&tplb.0))
                .map(|tpl| tpl.1.clone())
                .sorted_by(|ep1, ep2| ep1.running_jobs().cmp(&ep2.running_jobs()))
                .next();

            if let Some(endpoint) = ep {
                return Ok(endpoint);
                return Ok(EndpointHandle::new(endpoint.clone()));
            } else {
                trace!("No free endpoint found, retry...");
                tokio::task::yield_now().await
            }
        }
    }


@@ -133,7 +125,7 @@ impl EndpointScheduler {

pub struct JobHandle {
    log_dir: Option<PathBuf>,
    endpoint: Arc<Endpoint>,
    endpoint: EndpointHandle,
    job: RunnableJob,
    bar: ProgressBar,
    db: Arc<PgConnection>,