~fabrixxm/activist

d13b6bc43efdc31bba6ae6b24f93cde433075615 — fabrixxm 7 months ago 820d2ac wip-taskrunner-threadpool
taskrunner: run tasks in a threadpool
2 files changed, 23 insertions(+), 12 deletions(-)

M activist/commands/admin.py
M activist/tasks.py
M activist/commands/admin.py => activist/commands/admin.py +22 -11
@@ 1,4 1,7 @@
import time
import concurrent.futures
import threading
import logging

from climatik import command
from cryptography.hazmat.primitives import serialization


@@ 9,11 12,15 @@ from inotify_simple import INotify, flags   # type: ignore
from ..config import settings
from .. import background
from .. import tasks    # register backround tasks
from ..app import app
from .. import activitystream
from .. import activitypub
from .. import db


logger = logging.getLogger(__name__)


@command("generate-keypair")
def generate_keypair() -> int:
    """


@@ 64,15 71,19 @@ def taskrunner():
    """
    Execute pending tasks in a loop
    """
    waitfunc = lambda : time.sleep(30)
    try:
        inotify = INotify()
        wd = inotify.add_watch(background.Background.taskstore(), flags.CREATE)
        waitfunc = lambda : inotify.read(timeout=60000, read_delay=100)
    except Exception as e:
        print("inotify not available, using sleep()")
    def _runner(task):
        logger.info("Run task on thread #%s",  threading.get_ident())
        with app.app_context():
            background.background.execute(task)
            

    while True:
        res = background.execute()
        if res is None:
            waitfunc()
    logger.info("Start task runner on thread #%s",  threading.get_ident())
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        while True:
            task = background.background.get_next()
            if task is not None:
                executor.submit(_runner, task)
                time.sleep(.5)
            else:
                time.sleep(10)

M activist/tasks.py => activist/tasks.py +1 -1
@@ 115,7 115,7 @@ def update_actors(actorid:Optional[str] = None):
        if data is not None:
            activitypub.save_recursive(data)

        next_execution = datetime.datetime.now() + datetime.timedelta(minutes=15)
        next_execution = datetime.datetime.now()
        actorid = actor.id
    else:
        next_execution = datetime.datetime.now() + datetime.timedelta(days=1)