M CHANGES.rst => CHANGES.rst +2 -0
@@ 11,6 11,8 @@ Unreleased
- Turn namedtuples ``Message`` and ``PriorityValue`` into dataclasses.
+- Added type hints.
+
Version 0.3
-----------
M src/syslogmp/facility.py => src/syslogmp/facility.py +3 -2
@@ 12,6 12,7 @@ Labels and numerical codes as seen in `RFC 3164`_.
"""
from enum import Enum, unique
+from typing import Dict
@unique
@@ 42,11 43,11 @@ class Facility(Enum):
local7 = 23
@property
- def description(self):
+ def description(self) -> str:
return DESCRIPTIONS[self.value]
-DESCRIPTIONS = {
+DESCRIPTIONS: Dict[int, str] = {
0: 'kernel messages',
1: 'user-level messages',
2: 'mail system',
M src/syslogmp/parser.py => src/syslogmp/parser.py +12 -10
@@ 24,8 24,10 @@ latter.
:License: MIT, see LICENSE for details.
"""
+from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
+from typing import Tuple
from .facility import Facility
from .message import Message
@@ 33,10 35,10 @@ from .severity import Severity
from .stream import Stream
-MAX_MESSAGE_LENGTH = 1024
+MAX_MESSAGE_LENGTH: int = 1024
-def parse(data):
+def parse(data: bytes) -> Message:
"""Parse a syslog message."""
parser = _Parser(data)
@@ 56,7 58,7 @@ def parse(data):
class _Parser:
"""Parse a syslog message."""
- def __init__(self, data):
+ def __init__(self, data: bytes) -> None:
ensure(isinstance(data, bytes), 'Data must be a byte string.')
ensure(
@@ 66,12 68,12 @@ class _Parser:
self.stream = Stream(data)
- def _parse_pri_part(self):
+ def _parse_pri_part(self) -> PriorityValue:
"""Extract facility and severity from the PRI part."""
pri_part = self.stream.read_until_inclusive(b'>')
return _create_priority_value_from_pri_part(pri_part)
- def _parse_header_part(self):
+ def _parse_header_part(self) -> Tuple[datetime, str]:
"""Extract timestamp and hostname from the HEADER part."""
timestamp = self._parse_timestamp()
@@ 85,7 87,7 @@ class _Parser:
return timestamp, hostname
- def _parse_timestamp(self):
+ def _parse_timestamp(self) -> datetime:
"""Parse timestamp into a `datetime` instance."""
timestamp_bytes = self.stream.read(15)
timestamp_ascii = timestamp_bytes.decode('ascii')
@@ 106,11 108,11 @@ class _Parser:
return timestamp
- def _parse_hostname(self):
+ def _parse_hostname(self) -> str:
hostname_bytes = self.stream.read_until(b' ')
return hostname_bytes.decode('ascii')
- def _parse_msg_part(self):
+ def _parse_msg_part(self) -> bytes:
return self.stream.read_remainder()
@@ 120,7 122,7 @@ class PriorityValue:
severity: Severity
-def _create_priority_value_from_pri_part(pri_part):
+def _create_priority_value_from_pri_part(pri_part: bytes) -> PriorityValue:
"""Create priority value from PRI part."""
ensure(len(pri_part) in {3, 4, 5}, 'PRI part must have 3, 4, or 5 bytes.')
@@ 151,7 153,7 @@ def _create_priority_value_from_pri_part(pri_part):
return PriorityValue(facility=facility, severity=severity)
-def ensure(expression, error_message):
+def ensure(expression: bool, error_message: str) -> None:
"""Raise exception if the expression evaluates to `False`."""
if not expression:
raise MessageFormatError(error_message)
M src/syslogmp/stream.py => src/syslogmp/stream.py +10 -9
@@ 10,17 10,18 @@ Treat binary data as a stream and provide methods to read from it.
"""
from itertools import islice, takewhile
+from typing import Callable, Iterator
class Stream:
- def __init__(self, data):
+ def __init__(self, data: bytes) -> None:
self.iterator = iter(data)
- def read(self, n):
+ def read(self, n: int) -> bytes:
"""Return the next `n` bytes."""
return bytes(islice(self.iterator, n))
- def read_until(self, stop_byte):
+ def read_until(self, stop_byte: bytes) -> bytes:
"""Return bytes until the first occurrence of the stop byte.
The stop byte is not returned, but silently dropped from the
@@ 29,12 30,12 @@ class Stream:
predicate = create_match_predicate(stop_byte)
return bytes(takewhile(predicate, self.iterator))
- def read_until_inclusive(self, stop_byte):
+ def read_until_inclusive(self, stop_byte: bytes) -> bytes:
"""Return bytes until, and including, the first occurrence of
the stop byte.
"""
- def inner():
+ def inner() -> Iterator[int]:
predicate = create_match_predicate(stop_byte)
for code_point in self.iterator:
yield code_point
@@ 43,11 44,11 @@ class Stream:
return bytes(inner())
- def read_remainder(self):
+ def read_remainder(self) -> bytes:
"""Return all remaining bytes."""
return bytes(self.iterator)
-def create_match_predicate(value_to_match):
- value_to_match = ord(value_to_match)
- return lambda cp: cp != value_to_match
+def create_match_predicate(value_to_match: bytes) -> Callable[[int], bool]:
+ code_point = ord(value_to_match)
+ return lambda cp: cp != code_point