~kf5jwc/sms-printer

4a1d852f9a38f157710d4950364292dee33c144f — Kyle Jones 2 years ago 88d91e1
Refactor for environment configuration
M sms_printer/application.py => sms_printer/application.py +1 -2
@@ 3,8 3,7 @@ import logging
from flask import Flask, Response, json, request

from .sms import NoMatchingSchema, parse_sms
from .utils import log_request, allowed_sender, log_request, print_message

from .utils import allowed_sender, log_request, print_message

APP = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)

A sms_printer/config.py => sms_printer/config.py +80 -0
@@ 0,0 1,80 @@
from math import floor
from os import getenv
from typing import List

from xdg import BaseDirectory

def first_defined_argument(*args):
    return next((item for item in args if item is not None), None)
first = first_defined_argument

class Environment(object):
    """
    This is where all of the configuration options (should) live.

    Please review the class variables for available configuration
    options through their respective environment variables.
    """

    ARCHIVE = getenv('ARCHIVE_DIR', BaseDirectory.save_data_path(__package__))
    ARCHIVE_SUFFIX_LENGTH = int(getenv('SUFFIX_LENGTH', 6))
    ALLOWED_SENDERS = [
        num for num in getenv('ALLOWED_SENDERS', '').split(',') if num
    ]

    MESSAGE_FORMAT = getenv('MESSAGE_FORMAT', '{message}')

    PRINT_COMMAND = getenv('PRINT_COMMAND', 'lp')
    PRINTER_NAME = getenv('PRINTER_NAME', None)
    PRINT_JOB_TIMEOUT = int(getenv('PRINT_JOB_TIMEOUT', 5)) # seconds

    CPI = int(getenv('CPI', 10))
    LPI = int(getenv('LPI', 6))
    TEXT_SCALE = float(getenv('TEXT_SCALE', 1.0))

    # Set in points; each point is 1/72 inch or 0.35mm.
    MARGINS = getenv('MARGINS', None)
    MARGIN_SIDES = first(getenv('MARGIN_SIDES', None), MARGINS)
    MARGIN_TOP = first(getenv('MARGIN_TOP', None), MARGINS)
    MARGIN_BOTTOM = first(getenv('MARGIN_BOTTOM', None), MARGINS)
    MARGIN_LEFT = first(getenv('MARGIN_LEFT', None), MARGIN_SIDES, MARGINS)
    MARGIN_RIGHT = first(getenv('MARGIN_RIGHT', None), MARGIN_SIDES, MARGINS)

    def print_command(self, file_to_print=None) -> List[str]:
        command = [self.PRINT_COMMAND]

        if self.PRINTER_NAME is not None:
            command.append(
                "-d {}".format(self.PRINTER_NAME))

        if self.TEXT_SCALE is not 1.0:
            # Scale horizontally
            cpi = floor(self.CPI*(1/self.TEXT_SCALE))
            command.append(
                "-o cpi={}".format(cpi))

            # Scale vertically
            lpi = floor(self.LPI*(1/self.TEXT_SCALE))
            command.append(
                "-o lpi={}".format(lpi))

        if self.MARGIN_TOP is not None:
            command.append(
                "-o page-top={}".format(self.MARGIN_TOP))

        if self.MARGIN_BOTTOM is not None:
            command.append(
                "-o page-bottom={}".format(self.MARGIN_BOTTOM))

        if self.MARGIN_LEFT is not None:
            command.append(
                "-o page-left={}".format(self.MARGIN_LEFT))

        if self.MARGIN_RIGHT is not None:
            command.append(
                "-o page-right={}".format(self.MARGIN_RIGHT))

        if file_to_print is not None:
            command.append(file_to_print)

        return command

M sms_printer/sms/__init__.py => sms_printer/sms/__init__.py +2 -2
@@ 3,12 3,12 @@ from jsonschema.exceptions import ValidationError
from flask import json

from .schemas import SCHEMAS
from .schemas.types import sms_base
from .schemas.types import Parser_Base

class NoMatchingSchema(Exception):
    pass

def parse_sms(sms_input: str) -> sms_base:
def parse_sms(sms_input: str) -> Parser_Base:
    for schema, sms_parser in SCHEMAS.items():
        try:
            validate(sms_input, json.loads(schema))

M sms_printer/sms/schemas/__init__.py => sms_printer/sms/schemas/__init__.py +4 -4
@@ 2,13 2,13 @@
This module provides the filtering and parsing. They currently only
provide a list of messages with the sender number and message text.

Each module has a SCHEMA and a Parser(sms_base)
Each module has a `SCHEMA` and a `Parser(Parser_Base)`

The SCHEMA is matched against incoming messages to determine the
The `SCHEMA` is matched against incoming messages to determine the
correct parser to use for a message.

Adding more is simple enough. Define a new parser and its schema in
its own module next to the others, and add it to SCHEMAS here.
its own module next to the others, and add it to `SCHEMAS` here.

I use `genson` to generate schemas based on an example message.
"""


@@ 22,7 22,7 @@ from . import bandwidth_v1
from . import bandwidth_v2


SCHEMAS: Dict[str, Type[types.sms_base]] = {}
SCHEMAS = {} # type: Dict[str, Type[types.Parser_Base]]

SCHEMAS[bandwidth_v1.SCHEMA] = bandwidth_v1.Parser
SCHEMAS[bandwidth_v2.SCHEMA] = bandwidth_v2.Parser

M sms_printer/sms/schemas/bandwidth_v1.py => sms_printer/sms/schemas/bandwidth_v1.py +4 -5
@@ 1,12 1,11 @@
from typing import Dict, List
from typing import Dict

from .types import sms, sms_base
from .types import Parser_Base, sms


class Parser(sms_base):
    messages: List[sms] = []

class Parser(Parser_Base):
    def __init__(self, sms_input: Dict) -> None:
        super().__init__()
        msg = sms(
            sms_input['from'],
            sms_input['text'])

M sms_printer/sms/schemas/bandwidth_v2.py => sms_printer/sms/schemas/bandwidth_v2.py +4 -5
@@ 1,12 1,11 @@
from typing import Dict, List
from typing import List

from .types import sms, sms_base
from .types import Parser_Base, sms


class Parser(sms_base):
    messages: List[sms] = []

class Parser(Parser_Base):
    def __init__(self, sms_json: List) -> None:
        super().__init__()
        for msg_input in sms_json:
            msg = sms(
                msg_input['message']['from'],

M sms_printer/sms/schemas/types.py => sms_printer/sms/schemas/types.py +6 -7
@@ 2,20 2,19 @@ from typing import List


class sms(object):
    sender: str
    text: str
    sender = "" # type: str
    text = "" # type: str

    def __init__(self, sender: str, text: str) -> None:
        self.sender = sender
        self.text = text


class sms_base(object):
    schema: str
    messages: List[sms]
class Parser_Base(object):
    messages = [] # type: List[sms]

    def __init__(self, sms_input):
        pass
    def __init__(self):
        self.messages = []

    def __iter__(self):
        return self

M sms_printer/utils.py => sms_printer/utils.py +37 -29
@@ 1,50 1,58 @@
"""
This file feels like it has way too much configuration to make sense
in its current state.
"""


import os
from math import floor
from os import path
from random import choice as rand_choice
from string import ascii_uppercase, digits
from subprocess import CalledProcessError, run
from tempfile import NamedTemporaryFile
from time import strftime

from flask import json
from xdg import BaseDirectory

ARCHIVE = os.getenv('ARCHIVE_DIR', BaseDirectory.save_data_path(__name__))
ALLOWED_SENDERS = [
    num for num in os.getenv('ALLOWED_SENDERS', '').split(',') if num
]
PRINTER_FORMAT = "\n\n" + "      {message}"*2
TEXT_SCALE = 0.6
CPI_DEFAULT = 10
LPI_DEFAULT = 6
TOP_MARGIN = None
SIDE_MARGIN = None
from .config import Environment

ENV = Environment()

def suffix(length=ENV.ARCHIVE_SUFFIX_LENGTH, chars=ascii_uppercase+digits):
    "Random suffix string"
    return ''.join(rand_choice(chars) for x in range(length))

def allowed_sender(sender: str) -> bool:
    return (not bool(ALLOWED_SENDERS) or sender in ALLOWED_SENDERS)
    """
    Checks the sender against our environment $ALLOWED_SENDERS

    ALLOWED_SENDERS is a comma-delimited list, with numbers strictly
    in the format copied from your provider. +12223334444 (string)
    """
    return (not bool(ENV.ALLOWED_SENDERS)
            or sender in ENV.ALLOWED_SENDERS)

def log_request(data: str):
    timestamp = strftime("%Y%m%d-%H%M%S")
    with open(os.path.join(ARCHIVE, timestamp), 'w+') as archive_file:
    """
    Log this request to the data path for this application.

    The format for these logs is the current timestamp to the second
    with a random suffix.
    """
    msg_id = "{}-{}".format(strftime("%Y%m%d-%H%M%S"), suffix())
    with open(path.join(ENV.ARCHIVE, msg_id), 'w+') as archive_file:
        json.dump(data, archive_file)

def print_message(message: str) -> bool:
    """
    Sends a plaintext message to CUPS for printing.

    The message format can be customized by setting the environment's
    MESSAGE_FORMAT to a string, where `{message}` is replaced with
    the message text.
    """
    with NamedTemporaryFile(mode='w+') as tmp_file:
        print(PRINTER_FORMAT.format(message=message), file=tmp_file)
        tmp_file.write(ENV.MESSAGE_FORMAT.format(message=message))
        tmp_file.flush()

        command = [
            "lp",
            "-o cpi={}".format(floor(CPI_DEFAULT*(1/TEXT_SCALE))),
            "-o lpi={}".format(floor(LPI_DEFAULT*(1/TEXT_SCALE))),
            tmp_file.name]
        try:
            run(command, check=True, timeout=10)
            run(
                ENV.print_command(tmp_file.name),
                check=True,
                timeout=ENV.PRINT_JOB_TIMEOUT)

        except CalledProcessError:
            return False