~fabrixxm/activist

1cbb979c922528c075e59e206002c9e5b174f342 — fabrixxm 10 months ago 5bc616f
Rework background task code
2 files changed, 143 insertions(+), 88 deletions(-)

M activist/background.py
M activist/commands/admin.py
M activist/background.py => activist/background.py +141 -86
@@ 1,14 1,21 @@
from typing import Optional, TypedDict, Dict, Any, Callable, List
import dataclasses
import logging
import json
from pathlib import Path
from datetime import datetime, timedelta

from activist import tasks

from .config import settings


logger = logging.getLogger(__name__)


# background task machinery

"""
TaskType = TypedDict("TaskType",{
    'fnc':str, 
    'args':List[Any], 


@@ 18,95 25,143 @@ TaskType = TypedDict("TaskType",{
    'lasttry': str,
    'error':Optional[Dict[str,str]]
}, total=False)

registered_tasks:Dict[str, Callable] = {}

def background(fnc:Callable, retry:int=10):
    def _new_background(*args, **kwargs):
        new(fnc, args, kwargs, retry)
    return _new_background


def new(fnc:Callable, args:List[Any], kwargs:Dict[str, Any], retry:int=10) -> None:
    """
    Add new task
    """
    registered_tasks[fnc.__name__] = fnc
    now = datetime.now()
    data:TaskType = {
        'fnc': fnc.__name__,
        'args': args,
        'kwargs': kwargs,
        'created': now.isoformat(),
        'lasttry': "",
        'retry': retry,
    }
    taskfile = settings.STORE / "tasks/pending" / now.isoformat()
    taskfile.parent.mkdir(parents=True, exist_ok=True)
    with taskfile.open(mode="w") as f:
        json.dump(data, f)


def get_next() -> Optional[Path]:
    pending_path = settings.STORE / "tasks/pending"
    now = datetime.now().isoformat()
    # get tasks pending in past
    tasks = [p for p in pending_path.glob("*") if p.name < now]
    if len(tasks) == 0:
        return None
    return tasks[-1]


class TaskCtx:
    def __init__(self, taskfile:Path) -> None:
        self.taskfile = taskfile

    def _move_to(self, state:str, newname = None) -> None:
        destpath = settings.STORE / "tasks" / state / self.taskfile.name
        if newname:
            destpath = destpath.with_name(newname)
"""

@dataclasses.dataclass
class TaskType:
    fnc:str
    args:List[Any]
    kwargs:Dict[str,Any]
    retry:int
    created:str
    lasttry:str
    error:Optional[Dict[str,str]] = None

    _file:Optional[Path] = None

    @classmethod
    def from_file(self, file:Path) -> 'TaskType':
        d = json.loads(file.read_text())
        return TaskType(_file=file, **d)

    def to_json(self) -> str:
        return json.dumps(dict((k,v) for k,v in self.__dict__.items() if not k.startswith("_")))

    def _set_state(self, state:str, newname = None) -> None:
        logger.debug("move taskfile to state %r , optional new name %r", state, newname)
        if newname is None and self._file is None:
            newname = datetime.now().isoformat()
        elif newname is None and self._file:
            newname = self._file.name

        destpath = settings.STORE / "tasks" / state / newname
        
        destpath.parent.mkdir(parents=True, exist_ok=True)
        print(f"{self.taskfile} => {destpath}")
        self.taskfile.rename(destpath)
        self.taskfile = destpath
    
    def __enter__(self) -> TaskType:
        self._move_to("processing")
        return json.loads(self.taskfile.read_text())

    def __exit__(self, extype, value, traceback) -> Optional[bool]:
        if extype is None:
            self.taskfile.unlink()
        logger.debug("move %r to %r", self._file, destpath)
        if self._file:
            self._file.rename(destpath)
        self._file = destpath

    def on_processing(self) -> None:
        self._set_state("processing")

    def on_error(self, e:Exception) -> None:
        self.retry -= 1
        self.lasttry = datetime.now().isoformat()
        self.error = {
            'type': type(e).__name__,
            'value': str(e),
            'traceback': str(e.__traceback__)
        }

        if self._file:
            self._file.write_text(self.to_json())

        if self.retry == 0:
            logger.info("task failed too many times: failed")
            self._set_state("failed")
        else:
            # seconds to wait to retry.
            # given a retry value 1-9, this results in a delay
            # of circa 82 secs to 74 hours (3 days)
            next = 30 * 2.75 ** (10-self.retry)
            retry_at = datetime.now() + timedelta(seconds=next)
            logger.info("retry task not before %s, retry #%d", retry_at.isoformat(), self.retry)
            self._set_state("pending", newname=retry_at.isoformat())

    def on_success(self):
        self._set_state("success")


class Background:
    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__ + ".Background")
        self.registered_tasks:Dict[str, Callable] = {}

    def __call__(self, fnc:Callable, retry:int=10) -> Callable:
        self.registered_tasks[fnc.__name__] = fnc
        def _new_background(*args, **kwargs):
            self.new(fnc, args, kwargs, retry)
        return _new_background

    def new(self, fnc:Callable, args:List[Any], kwargs:Dict[str, Any], retry:int=10) -> None:
        """
        Add new task
        """
        self.logger.info("new task %r %r %r", fnc, args, kwargs)
        now = datetime.now()
        task = TaskType(
            fnc = fnc.__name__,
            args = args,
            kwargs = kwargs,
            created = now.isoformat(),
            lasttry = "",
            retry = retry,
        )
        taskfile = settings.STORE / "tasks/pending" / now.isoformat()
        taskfile.parent.mkdir(parents=True, exist_ok=True)
        taskfile.write_text(task.to_json())
        self.logger.debug("new task in %r", taskfile)

    def get_next(self) -> Optional[TaskType]:
        pending_path = settings.STORE / "tasks/pending"
        now = datetime.now().isoformat()
        logger.debug("get_next task before %r", now)
        # get tasks pending in past
        tasksfiles = [p for p in pending_path.glob("*") if p.name < now]
        if len(tasksfiles) == 0:
            logger.debug("get_next no task")
            return None
        
        taskfile = tasksfiles[-1]
        logger.debug("get_next taskfile %r", taskfile)

        task = TaskType.from_file(taskfile)
        return task

    def execute(self, task:TaskType) -> None:
        task.on_processing()
        try:
            function = self.registered_tasks[task.fnc]
            function(*task.args, **task.kwargs)
        except KeyError as e:
            logger.exception(e)
            task.on_error(e)
        except Exception as e:
            logger.exception(e)
            task.on_error(e)
        else:
            data:TaskType = json.loads(self.taskfile.read_text())
            data['retry'] = data['retry'] - 1
            data['lasttry'] = datetime.now().isoformat()
            data['error'] = {
                'type': repr(extype),
                'value': str(value),
                'traceback': str(traceback)
            }
            with self.taskfile.open(mode="w") as f:
                json.dump(data, f)
            if data['retry'] == 0:
                self._move_to("failed")
            else:
                retry_at = datetime.now() + timedelta(days=1)
                self._move_to("pending", newname=retry_at.isoformat())

        return True
            task.on_success()



# global background tasks register
background = Background()


def execute():
    taskfile = get_next()
    print("task file:", taskfile)
    if taskfile is None:
    task = background.get_next()
    if task is None:
        return
    
    with TaskCtx(taskfile) as task:
        fncname, args, kwargs = task['fnc'], task['args'], task['kwargs']
        fnc = registered_tasks.get(fncname, None)
        print("\t", fnc, args, kwargs)
        if fnc:
            fnc(*args, **kwargs)
    
\ No newline at end of file
    background.execute(task)


M activist/commands/admin.py => activist/commands/admin.py +2 -2
@@ 4,7 4,7 @@ from ..config import settings
from .. import activitystream
from .. import store
from .. import tasks

from .. import background

@command
def runserver(host:str="127.0.0.1", port:int=5000):


@@ 20,7 20,7 @@ def runtask():
    """
    Run the oldest background task
    """
    tasks.execute()
    background.execute()


@command