~nch/glue

4c697886fd4e4ea5be038631bb9eb7a6ffa97919 — nc a month ago 6c754b5
add sdl2 scheduler
1 files changed, 104 insertions(+), 0 deletions(-)

A sdl2_scheduler.py
A sdl2_scheduler.py => sdl2_scheduler.py +104 -0
@@ 0,0 1,104 @@
import logging
import threading

from typing import Any, Optional

from rx.core import typing
from rx.internal import PriorityQueue
from rx.internal.constants import DELTA_ZERO

from ..scheduleditem import ScheduledItem
from ..periodicscheduler import PeriodicScheduler


log = logging.getLogger("Rx")


class PyGameScheduler(PeriodicScheduler):
    """A scheduler that schedules works for PyGame.
    Note that this class expects the caller to invoke run() repeatedly.
    http://www.pygame.org/docs/ref/time.html
    http://www.pygame.org/docs/ref/event.html"""

    def __init__(self, pygame: Any):
        """Create a new PyGameScheduler.
        Args:
            pygame: The PyGame module to use; typically, you would get this by
                import pygame
        """

        super().__init__()
        self._pygame = pygame  # TODO not used, refactor to actually use pygame?
        self._lock = threading.Lock()
        self._queue: PriorityQueue[ScheduledItem] = PriorityQueue()

    def schedule(self,
                 action: typing.ScheduledAction,
                 state: Optional[typing.TState] = None
                 ) -> typing.Disposable:
        """Schedules an action to be executed.
        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.
        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        log.debug("PyGameScheduler.schedule(state=%s)", state)
        return self.schedule_absolute(self.now, action, state=state)

    def schedule_relative(self,
                          duetime: typing.RelativeTime,
                          action: typing.ScheduledAction,
                          state: Optional[typing.TState] = None
                          ) -> typing.Disposable:
        """Schedules an action to be executed after duetime.
        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.
        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = max(DELTA_ZERO, self.to_timedelta(duetime))
        return self.schedule_absolute(self.now + duetime, action, state=state)

    def schedule_absolute(self,
                          duetime: typing.AbsoluteTime,
                          action: typing.ScheduledAction,
                          state: Optional[typing.TState] = None
                          ) -> typing.Disposable:
        """Schedules an action to be executed at duetime.
        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.
        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_datetime(duetime)
        si: ScheduledItem = ScheduledItem(self, state, action, duetime)

        with self._lock:
            self._queue.enqueue(si)

        return si.disposable

    def run(self) -> None:
        while self._queue:
            with self._lock:
                item: ScheduledItem = self._queue.peek()
                diff = item.duetime - self.now

                if diff > DELTA_ZERO:
                    break

                item = self._queue.dequeue()

            if not item.is_cancelled():
                item.invoke()