~cypheon/trakka

5f6aeb063e2ca293be2a1804a802bb61ade9ffbe — Johann Rudloff 2 years ago 3d56301
Use Celery task queue for background analysis processing
M README.md => README.md +8 -0
@@ 17,10 17,18 @@ poetry run ./manage.py migrate

## Starting the Server

Start the main development server:

```sh
poetry run ./manage.py runserver
```

Start the worker pool performing track analysis jobs:

```sh
poetry run celery --app trakka worker --loglevel DEBUG
```


## License


M activities/services.py => activities/services.py +0 -15
@@ 137,18 137,3 @@ def process_activity(act: Activity):

    duration = time.monotonic() - begin
    logger.info('finished processing activity %d in %d ms', act.id, duration * 1000)

def process_update_activity(act: Activity):
    try:
        process_activity(act)
        act.status = 'DONE'
        act.save()
    except:
        act.status = 'ERROR'
        act.save()
        logger.error(f'error processing activity {act.id}', exc_info=True)
        logger.error(f'activity content: {bytes(act.orig_upload.content[:512])}')

def queue_analysis(act: Activity):
    # TODO: actually queue, instead of processing synchronously
    process_update_activity(act)

A activities/tasks.py => activities/tasks.py +21 -0
@@ 0,0 1,21 @@
import logging

from celery import shared_task

from .models import Activity
from .services import process_activity

logger = logging.getLogger(__name__)

@shared_task
def process_update_activity(activity_id: int):
    act = Activity.objects.get(id=activity_id)
    try:
        process_activity(act)
        act.status = 'DONE'
        act.save()
    except:
        act.status = 'ERROR'
        act.save()
        logger.error(f'error processing activity {act.id}', exc_info=True)
        logger.error(f'activity content: {bytes(act.orig_upload.content[:512])}')

M activities/views.py => activities/views.py +5 -3
@@ 9,7 9,8 @@ from django.contrib.auth.models import User
from django import forms

from .models import Activity
from .services import UPLOAD_MAX_SIZE, create_activity, queue_analysis
from .services import UPLOAD_MAX_SIZE, create_activity
from .tasks import process_update_activity
from . import graph

import datetime


@@ 35,9 36,9 @@ def health(request):
    return render(request, 'health.html')

def schedule(request):
    unprocessed = Activity.objects.filter(status='PENDING').all()[:10]
    unprocessed = Activity.objects.filter(status='PENDING').all()[:1000]
    for act in unprocessed:
        queue_analysis(act)
        process_update_activity.delay(act.id)

    return HttpResponse('done')



@@ 74,6 75,7 @@ def upload(request):
            act = create_activity(request.user, title, body)
            act.orig_upload.save()
            act.save()
            process_update_activity.delay(act.id)
            return HttpResponse('upload OK')
        else:
            print(f'form invalid: {form.errors}')

M poetry.lock => poetry.lock +206 -5
@@ 1,4 1,15 @@
[[package]]
category = "main"
description = "Low-level AMQP client for Python (fork of amqplib)."
name = "amqp"
optional = false
python-versions = ">=3.6"
version = "5.0.5"

[package.dependencies]
vine = "5.0.0"

[[package]]
category = "dev"
description = "Disable App Nap on macOS >= 10.9"
marker = "sys_platform == \"darwin\" or platform_system == \"Darwin\" or python_version >= \"3.3\" and sys_platform == \"darwin\""


@@ 66,6 77,14 @@ python-versions = "*"
version = "0.2.0"

[[package]]
category = "main"
description = "Python multiprocessing fork with improvements and bugfixes"
name = "billiard"
optional = false
python-versions = "*"
version = "3.6.3.0"

[[package]]
category = "dev"
description = "An easy safelist-based HTML-sanitizing tool."
name = "bleach"


@@ 80,6 99,58 @@ webencodings = "*"

[[package]]
category = "main"
description = "Distributed Task Queue."
name = "celery"
optional = false
python-versions = ">=3.6,"
version = "5.0.5"

[package.dependencies]
billiard = ">=3.6.3.0,<4.0"
click = ">=7.0,<8.0"
click-didyoumean = ">=0.0.3"
click-plugins = ">=1.1.1"
click-repl = ">=0.1.6"
kombu = ">=5.0.0,<6.0"
pytz = ">0.0-dev"
vine = ">=5.0.0,<6.0"

[package.extras]
arangodb = ["pyArango (>=1.3.2)"]
auth = ["cryptography"]
azureblockblob = ["azure-storage (0.36.0)", "azure-common (1.1.5)", "azure-storage-common (1.1.0)"]
brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"]
cassandra = ["cassandra-driver (<3.21.0)"]
consul = ["python-consul"]
cosmosdbsql = ["pydocumentdb (2.3.2)"]
couchbase = ["couchbase (>=3.0.0)"]
couchdb = ["pycouchdb"]
django = ["Django (>=1.11)"]
dynamodb = ["boto3 (>=1.9.178)"]
elasticsearch = ["elasticsearch"]
eventlet = ["eventlet (>=0.26.1)"]
gevent = ["gevent (>=1.0.0)"]
librabbitmq = ["librabbitmq (>=1.5.0)"]
lzma = ["backports.lzma"]
memcache = ["pylibmc"]
mongodb = ["pymongo (>=3.3.0)"]
msgpack = ["msgpack"]
pymemcache = ["python-memcached"]
pyro = ["pyro4"]
pytest = ["pytest-celery"]
redis = ["redis (>=3.2.0)"]
s3 = ["boto3 (>=1.9.125)"]
slmq = ["softlayer-messaging (>=1.0.3)"]
solar = ["ephem"]
sqlalchemy = ["sqlalchemy"]
sqs = ["boto3 (>=1.9.125)", "pycurl (7.43.0.5)"]
tblib = ["tblib (>=1.3.0)", "tblib (>=1.5.0)"]
yaml = ["PyYAML (>=3.10)"]
zookeeper = ["kazoo (>=1.3.1)"]
zstd = ["zstandard"]

[[package]]
category = "main"
description = "Python package for providing Mozilla's CA Bundle."
name = "certifi"
optional = false


@@ 106,6 177,52 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
version = "4.0.0"

[[package]]
category = "main"
description = "Composable command line interface toolkit"
name = "click"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
version = "7.1.2"

[[package]]
category = "main"
description = "Enable git-like did-you-mean feature in click."
name = "click-didyoumean"
optional = false
python-versions = "*"
version = "0.0.3"

[package.dependencies]
click = "*"

[[package]]
category = "main"
description = "An extension module for click to enable registering CLI commands via setuptools entry-points."
name = "click-plugins"
optional = false
python-versions = "*"
version = "1.1.1"

[package.dependencies]
click = ">=4.0"

[package.extras]
dev = ["pytest (>=3.6)", "pytest-cov", "wheel", "coveralls"]

[[package]]
category = "main"
description = "REPL plugin for Click"
name = "click-repl"
optional = false
python-versions = "*"
version = "0.1.6"

[package.dependencies]
click = "*"
prompt-toolkit = "*"
six = "*"

[[package]]
category = "dev"
description = "Cross-platform colored terminal text."
marker = "python_version >= \"3.3\" and sys_platform == \"win32\" or sys_platform == \"win32\""


@@ 460,6 577,33 @@ version = "1.3.1"

[[package]]
category = "main"
description = "Messaging library for Python."
name = "kombu"
optional = false
python-versions = ">=3.6"
version = "5.0.2"

[package.dependencies]
amqp = ">=5.0.0,<6.0.0"

[package.extras]
azureservicebus = ["azure-servicebus (>=0.21.1)"]
azurestoragequeues = ["azure-storage-queue"]
consul = ["python-consul (>=0.6.0)"]
librabbitmq = ["librabbitmq (>=1.5.2)"]
mongodb = ["pymongo (>=3.3.0)"]
msgpack = ["msgpack"]
pyro = ["pyro4"]
qpid = ["qpid-python (>=0.26)", "qpid-tools (>=0.26)"]
redis = ["redis (>=3.3.11)"]
slmq = ["softlayer-messaging (>=1.0.3)"]
sqlalchemy = ["sqlalchemy"]
sqs = ["boto3 (>=1.4.4)", "pycurl (7.43.0.2)"]
yaml = ["PyYAML (>=3.10)"]
zookeeper = ["kazoo (>=1.3.1)"]

[[package]]
category = "main"
description = "Safely add untrusted strings to HTML/XML markup."
name = "markupsafe"
optional = false


@@ 662,7 806,6 @@ version = "1.4.3"
[[package]]
category = "dev"
description = "A Python Parser"
marker = "python_version >= \"3.3\""
name = "parso"
optional = false
python-versions = ">=3.6"


@@ 712,7 855,7 @@ version = "0.9.0"
twisted = ["twisted"]

[[package]]
category = "dev"
category = "main"
description = "Library for building powerful interactive command lines in Python"
name = "prompt-toolkit"
optional = false


@@ 733,7 876,7 @@ version = "2.8.6"
[[package]]
category = "dev"
description = "Run a subprocess in a pseudo terminal"
marker = "python_version >= \"3.3\" and sys_platform != \"win32\" or os_name != \"nt\""
marker = "python_version >= \"3.3\" and sys_platform != \"win32\" or sys_platform != \"win32\" or os_name != \"nt\" or python_version >= \"3.3\" and sys_platform != \"win32\" and (python_version >= \"3.3\" and sys_platform != \"win32\" or sys_platform != \"win32\")"
name = "ptyprocess"
optional = false
python-versions = "*"


@@ 861,6 1004,17 @@ version = "1.9.0"

[[package]]
category = "main"
description = "Python client for Redis key-value store"
name = "redis"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
version = "3.5.3"

[package.extras]
hiredis = ["hiredis (>=0.1.3)"]

[[package]]
category = "main"
description = "Python HTTP for Humans."
name = "requests"
optional = false


@@ 987,7 1141,15 @@ secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "cer
socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"]

[[package]]
category = "dev"
category = "main"
description = "Promises, promises, promises."
name = "vine"
optional = false
python-versions = ">=3.6"
version = "5.0.0"

[[package]]
category = "main"
description = "Measures the displayed width of unicode strings in a terminal"
name = "wcwidth"
optional = false


@@ 1014,10 1176,14 @@ version = "3.5.1"
notebook = ">=4.4.1"

[metadata]
content-hash = "7a9246ed4704c6849ade8cb9ea70d09690847960a115455585ea3dfef5176de6"
content-hash = "fbca03429ca1ee0ca62fd25a0015833d6642a70ab67583bb1bd4297f92c8e438"
python-versions = ">=3.9"

[metadata.files]
amqp = [
    {file = "amqp-5.0.5-py3-none-any.whl", hash = "sha256:1e759a7f202d910939de6eca45c23a107f6b71111f41d1282c648e9ac3d21901"},
    {file = "amqp-5.0.5.tar.gz", hash = "sha256:affdd263d8b8eb3c98170b78bf83867cdb6a14901d586e00ddb65bfe2f0c4e60"},
]
appnope = [
    {file = "appnope-0.1.2-py2.py3-none-any.whl", hash = "sha256:93aa393e9d6c54c5cd570ccadd8edad61ea0c4b9ea7a01409020c9aa019eb442"},
    {file = "appnope-0.1.2.tar.gz", hash = "sha256:dd83cd4b5b460958838f6eb3000c660b1f9caf2a5b1de4264e941512f603258a"},


@@ 1058,10 1224,18 @@ backcall = [
    {file = "backcall-0.2.0-py2.py3-none-any.whl", hash = "sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255"},
    {file = "backcall-0.2.0.tar.gz", hash = "sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e"},
]
billiard = [
    {file = "billiard-3.6.3.0-py3-none-any.whl", hash = "sha256:bff575450859a6e0fbc2f9877d9b715b0bbc07c3565bb7ed2280526a0cdf5ede"},
    {file = "billiard-3.6.3.0.tar.gz", hash = "sha256:d91725ce6425f33a97dfa72fb6bfef0e47d4652acd98a032bd1a7fbf06d5fa6a"},
]
bleach = [
    {file = "bleach-3.3.0-py2.py3-none-any.whl", hash = "sha256:6123ddc1052673e52bab52cdc955bcb57a015264a1c57d37bea2f6b817af0125"},
    {file = "bleach-3.3.0.tar.gz", hash = "sha256:98b3170739e5e83dd9dc19633f074727ad848cbedb6026708c8ac2d3b697a433"},
]
celery = [
    {file = "celery-5.0.5-py3-none-any.whl", hash = "sha256:5e8d364e058554e83bbb116e8377d90c79be254785f357cb2cec026e79febe13"},
    {file = "celery-5.0.5.tar.gz", hash = "sha256:f4efebe6f8629b0da2b8e529424de376494f5b7a743c321c8a2ddc2b1414921c"},
]
certifi = [
    {file = "certifi-2020.12.5-py2.py3-none-any.whl", hash = "sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830"},
    {file = "certifi-2020.12.5.tar.gz", hash = "sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c"},


@@ 1109,6 1283,21 @@ chardet = [
    {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"},
    {file = "chardet-4.0.0.tar.gz", hash = "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa"},
]
click = [
    {file = "click-7.1.2-py2.py3-none-any.whl", hash = "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc"},
    {file = "click-7.1.2.tar.gz", hash = "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a"},
]
click-didyoumean = [
    {file = "click-didyoumean-0.0.3.tar.gz", hash = "sha256:112229485c9704ff51362fe34b2d4f0b12fc71cc20f6d2b3afabed4b8bfa6aeb"},
]
click-plugins = [
    {file = "click-plugins-1.1.1.tar.gz", hash = "sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b"},
    {file = "click_plugins-1.1.1-py2.py3-none-any.whl", hash = "sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"},
]
click-repl = [
    {file = "click-repl-0.1.6.tar.gz", hash = "sha256:b9f29d52abc4d6059f8e276132a111ab8d94980afe6a5432b9d996544afa95d5"},
    {file = "click_repl-0.1.6-py3-none-any.whl", hash = "sha256:9c4c3d022789cae912aad8a3f5e1d7c2cdd016ee1225b5212ad3e8691563cda5"},
]
colorama = [
    {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"},
    {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"},


@@ 1246,6 1435,10 @@ kiwisolver = [
    {file = "kiwisolver-1.3.1-pp36-pypy36_pp73-win32.whl", hash = "sha256:401a2e9afa8588589775fe34fc22d918ae839aaaf0c0e96441c0fdbce6d8ebe6"},
    {file = "kiwisolver-1.3.1.tar.gz", hash = "sha256:950a199911a8d94683a6b10321f9345d5a3a8433ec58b217ace979e18f16e248"},
]
kombu = [
    {file = "kombu-5.0.2-py2.py3-none-any.whl", hash = "sha256:6dc509178ac4269b0e66ab4881f70a2035c33d3a622e20585f965986a5182006"},
    {file = "kombu-5.0.2.tar.gz", hash = "sha256:f4965fba0a4718d47d470beeb5d6446e3357a62402b16c510b6a2f251e05ac3c"},
]
markupsafe = [
    {file = "MarkupSafe-1.1.1-cp27-cp27m-macosx_10_6_intel.whl", hash = "sha256:09027a7803a62ca78792ad89403b1b7a73a01c8cb65909cd876f7fcebd79b161"},
    {file = "MarkupSafe-1.1.1-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:e249096428b3ae81b08327a63a485ad0878de3fb939049038579ac0ef61e17e7"},


@@ 1584,6 1777,10 @@ qtpy = [
    {file = "QtPy-1.9.0-py2.py3-none-any.whl", hash = "sha256:fa0b8363b363e89b2a6f49eddc162a04c0699ae95e109a6be3bb145a913190ea"},
    {file = "QtPy-1.9.0.tar.gz", hash = "sha256:2db72c44b55d0fe1407be8fba35c838ad0d6d3bb81f23007886dc1fc0f459c8d"},
]
redis = [
    {file = "redis-3.5.3-py2.py3-none-any.whl", hash = "sha256:432b788c4530cfe16d8d943a09d40ca6c16149727e4afe8c2c9d5580c59d9f24"},
    {file = "redis-3.5.3.tar.gz", hash = "sha256:0e7e0cfca8660dea8b7d5cd8c4f6c5e29e11f31158c0b0ae91a397f00e5a05a2"},
]
requests = [
    {file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"},
    {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"},


@@ 1717,6 1914,10 @@ urllib3 = [
    {file = "urllib3-1.22-py2.py3-none-any.whl", hash = "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b"},
    {file = "urllib3-1.22.tar.gz", hash = "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f"},
]
vine = [
    {file = "vine-5.0.0-py2.py3-none-any.whl", hash = "sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30"},
    {file = "vine-5.0.0.tar.gz", hash = "sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e"},
]
wcwidth = [
    {file = "wcwidth-0.2.5-py2.py3-none-any.whl", hash = "sha256:beb4802a9cebb9144e99086eff703a642a13d6a0052920003a230f3294bbe784"},
    {file = "wcwidth-0.2.5.tar.gz", hash = "sha256:c4d647b99872929fdb7bdcaa4fbe7f01413ed3d98077df798530e5b04f116c83"},

M pyproject.toml => pyproject.toml +2 -0
@@ 17,6 17,8 @@ django-oauth-toolkit = "^1.3.3"
gpxpy = "^1.4.2"
matplotlib = "^3.3.3"
scipy = "^1.6.1"
celery = "^5.0.5"
redis = "^3.5.3"

[tool.poetry.dev-dependencies]
mypy = "^0.812"

M trakka/__init__.py => trakka/__init__.py +5 -0
@@ 0,0 1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

A trakka/celery.py => trakka/celery.py +14 -0
@@ 0,0 1,14 @@
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'trakka.settings')

app = Celery('trakka')

# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

M trakka/settings.py => trakka/settings.py +4 -0
@@ 172,3 172,7 @@ LOGGING = {
        'level': 'DEBUG',
    },
}

# Celery task queue

CELERY_BROKER_URL = 'redis://localhost:6379/0'