~kf5jwc/sms-printer

ee6a28be8bf9f1ece16319f56f50fb00f2dc2207 — Kyle Jones 1 year, 8 months ago 001bc47
Enable namespace for parsers!
6 files changed, 111 insertions(+), 82 deletions(-)

M sms_printer/__init__.py
M sms_printer/config.py
A sms_printer/load.py
A sms_printer/parser_errors.py
A sms_printer/printer.py
D sms_printer/utils.py
M sms_printer/__init__.py => sms_printer/__init__.py +8 -5
@@ 1,6 1,9 @@
"""
This prints SMS messages (on a real printer) using CUPS.
"""An SMS printer!"""

This is a web server which accepts JSON-formatted SMS messages
at /sms-printer from brokers (such as bandwidth.com) for printing.
"""
from .load import load_parsers as __load_parsers
from .config import Environment as __Environment
from .printer import parse_messages, print_message
from . import parser_errors

PARSERS = __load_parsers()
ENV = __Environment()

M sms_printer/config.py => sms_printer/config.py +7 -13
@@ 2,14 2,12 @@ from math import floor
from os import getenv
from typing import List

from xdg import BaseDirectory


def first_defined_argument(*args):
def _first_defined_argument(*args):
    return next((item for item in args if item is not None), None)


first = first_defined_argument
_first = _first_defined_argument


class Environment(object):


@@ 20,10 18,6 @@ class Environment(object):
    options through their respective environment variables.
    """

    ARCHIVE = getenv("ARCHIVE_DIR", BaseDirectory.save_data_path(__package__))
    ARCHIVE_SUFFIX_LENGTH = int(getenv("SUFFIX_LENGTH", "6"))
    ALLOWED_SENDERS = [num for num in getenv("ALLOWED_SENDERS", "").split(",") if num]

    MESSAGE_FORMAT = getenv("MESSAGE_FORMAT", "{message}")

    PRINT_COMMAND = getenv("PRINT_COMMAND", "lp")


@@ 36,11 30,11 @@ class Environment(object):

    # Set in points; each point is 1/72 inch or 0.35mm.
    MARGINS = getenv("MARGINS", None)
    MARGIN_SIDES = first(getenv("MARGIN_SIDES", None), MARGINS)
    MARGIN_TOP = first(getenv("MARGIN_TOP", None), MARGINS)
    MARGIN_BOTTOM = first(getenv("MARGIN_BOTTOM", None), MARGINS)
    MARGIN_LEFT = first(getenv("MARGIN_LEFT", None), MARGIN_SIDES, MARGINS)
    MARGIN_RIGHT = first(getenv("MARGIN_RIGHT", None), MARGIN_SIDES, MARGINS)
    MARGIN_SIDES = _first(getenv("MARGIN_SIDES", None), MARGINS)
    MARGIN_TOP = _first(getenv("MARGIN_TOP", None), MARGINS)
    MARGIN_BOTTOM = _first(getenv("MARGIN_BOTTOM", None), MARGINS)
    MARGIN_LEFT = _first(getenv("MARGIN_LEFT", None), MARGIN_SIDES, MARGINS)
    MARGIN_RIGHT = _first(getenv("MARGIN_RIGHT", None), MARGIN_SIDES, MARGINS)

    def print_command(self, file_to_print=None) -> List[str]:
        command = [self.PRINT_COMMAND]

A sms_printer/load.py => sms_printer/load.py +34 -0
@@ 0,0 1,34 @@
import importlib
import pkgutil

from loguru import logger
import sms_printer.parsers as sms_parsers


@logger.catch
def load_parsers(namespace=sms_parsers):

    packages = tuple(pkgutil.iter_modules(namespace.__path__, namespace.__name__ + "."))
    logger.info("Parsers found: {}", len(packages))

    parsers = [importlib.import_module(name) for _, name, _ in packages]
    parsers = tuple(filter(__contains_parser_interface, parsers))

    if len(parsers):
        logger.info("Parsers loaded: {}", len(parsers))
    else:
        logger.warning("No parsers were loaded!")

    return parsers


def __contains_parser_interface(package):
    interface = [callable(getattr(package, func)) for func in ("validate", "parse")]

    if all(interface):
        logger.info("Loaded parser: {}", package.__name__)
    else:
        logger.warning("Could not load parser: {}", package.__name__)
        logger.warning("Parser did not implement the correct interface!")

    return all(interface)

A sms_printer/parser_errors.py => sms_printer/parser_errors.py +6 -0
@@ 0,0 1,6 @@
class NoParserForMessage(Exception):
    pass


class NoValidParserForMessage(NoParserForMessage):
    pass

A sms_printer/printer.py => sms_printer/printer.py +56 -0
@@ 0,0 1,56 @@
from subprocess import CalledProcessError, run
from tempfile import NamedTemporaryFile
from typing import List
from loguru import logger
from . import ENV, PARSERS
from .parser_errors import NoParserForMessage, NoValidParserForMessage


def parse_messages(data: str) -> List:
    logger.trace("Parsing: {}", data)

    parsers = [parser for parser in PARSERS if parser.validate(data)]
    logger.trace("Parsers for this mesage:")
    [logger.trace("{}", repr(parser)) for parser in parsers]
    logger.info("Parsers found for this message: {}", len(parsers))

    if not len(parsers):
        raise NoValidParserForMessage

    for parser in parsers:
        try:
            messages = parser.parse(data)
        except Exception as e:
            logger.trace("Parser encountered error while parsing message: {}", parser)
            logger.trace("Exception: {}", e)
        else:
            return messages

    raise NoParserForMessage


def print_message(message: str) -> bool:
    """
    Sends a plaintext message to CUPS for printing.

    The message format can be customized by setting the environment's
    MESSAGE_FORMAT to a string, where `{message}` is replaced with
    the message text.
    """
    logger.info("Printing message!")
    logger.debug("{}", message)

    with NamedTemporaryFile(mode="w+") as tmp_file:
        logger.debug("Print file: {}", tmp_file.name)
        tmp_file.write(ENV.MESSAGE_FORMAT.format(message=message))
        tmp_file.flush()

        try:
            print_cmd = ENV.print_command(tmp_file.name)
            logger.debug("Print command: {}", print_cmd)
            run(print_cmd, check=True, timeout=ENV.PRINT_JOB_TIMEOUT)

        except CalledProcessError:
            return False

    return True

D sms_printer/utils.py => sms_printer/utils.py +0 -64
@@ 1,64 0,0 @@
from os import path
from random import choice as rand_choice
from string import ascii_uppercase, digits
from subprocess import CalledProcessError, run
from tempfile import NamedTemporaryFile
from time import strftime

from flask import json

from .config import Environment

ENV = Environment()


def suffix(length=ENV.ARCHIVE_SUFFIX_LENGTH, chars=ascii_uppercase + digits) -> str:
    "Random suffix string"
    return "".join(rand_choice(chars) for x in range(length))


def allowed_sender(sender: str) -> bool:
    """
    Checks the sender against our environment $ALLOWED_SENDERS

    ALLOWED_SENDERS is a comma-delimited list, with numbers strictly
    in the format copied from your provider. +12223334444 (string)
    """
    return not bool(ENV.ALLOWED_SENDERS) or sender in ENV.ALLOWED_SENDERS


def log_request(data: str):
    """
    Log this request to the data path for this application.

    The format for these logs is the current timestamp to the second
    with a random suffix.
    """
    msg_id = "{}-{}".format(strftime("%Y%m%d-%H%M%S"), suffix())
    with open(path.join(ENV.ARCHIVE, msg_id), "w+") as archive_file:
        json.dump(data, archive_file)


def print_message(message: str) -> bool:
    """
    Sends a plaintext message to CUPS for printing.

    The message format can be customized by setting the environment's
    MESSAGE_FORMAT to a string, where `{message}` is replaced with
    the message text.
    """
    with NamedTemporaryFile(mode="w+") as tmp_file:
        tmp_file.write(ENV.MESSAGE_FORMAT.format(message=message))
        tmp_file.flush()

        try:
            run(
                ENV.print_command(tmp_file.name),
                check=True,
                timeout=ENV.PRINT_JOB_TIMEOUT,
            )

        except CalledProcessError:
            return False

    return True