# -*- coding: utf-8 -*-
"""Lumberjack (logstash) feeder."""
from datetime import datetime
from json import dumps as dump_json
from json import loads as load_json
from os import environ
from zlib import compress, decompress
from twisted.application.service import Service
from twisted.internet.protocol import Protocol, ServerFactory
from twisted.logger import Logger
from furemcape.feeder.tls import build_tls_options
from furemcape.transformer import Transformer
# Module-level Twisted logger; the reader protocol uses it for its debug and
# warning messages about received data, acks, and lost connections.
LOG = Logger()
class LumberjackFrameDecoder:
    """Stateless class that decodes bytes into frame objects.

    Every frame starts with a two-byte ASCII header: the protocol version
    digit followed by the frame-type letter (eg ``2J``). `decode` dispatches
    on that header to the method named ``decode_frame_<header>``
    (eg `decode_frame_2J`).
    """

    def decode(self, data, offset=0):
        """Decodes bytes into frame objects.

        This is the main entrypoint for decoding lumberjack data. If the data
        ends in the middle of a frame, the undecoded tail (starting at that
        frame's header) is appended as a final `LumberjackPartialFrame`.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.

        Returns:
            list: List of `LumberjackFrame` objects.

        Raises:
            AttributeError: If a frame header has no matching
                ``decode_frame_*`` method (unknown version or frame type).
            UnicodeDecodeError: If a frame header is not ASCII.
        """
        frames = []
        length = len(data)
        while offset < length:
            # remember where this frame began, so a partial frame keeps its header
            frame_offset = offset
            try:
                version_and_type, offset = self.extract(data, offset, 2)
                fn_name = "decode_frame_" + version_and_type.decode("ascii")
                decode_fn = getattr(self, fn_name)
                offset = decode_fn(data, offset, frames)
            except LumberjackPartialFrameException:
                frames.append(LumberjackPartialFrame(data[frame_offset:]))
                return frames
        return frames

    def decode_frame_1A(self, data, offset, frames, version=1):  # noqa: N802 SC200
        """Decodes a version 1 "ack" frame.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 1).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        sequence, offset = self.decode_unsigned_32(data, offset)
        frames.append(LumberjackAckFrame(version, "A", sequence))
        return offset

    def decode_frame_2A(self, data, offset, frames, version=2):  # noqa: N802 SC200
        """Decodes a version 2 "ack" frame (same layout as version 1).

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 2).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        return self.decode_frame_1A(data, offset, frames, version)  # noqa: SC200

    def decode_frame_1C(self, data, offset, frames, version=1):  # noqa: N802 SC200
        """Decodes a version 1 "compressed" frame.

        Appends both the decoded `LumberjackCompressedFrame`, as well as all
        frames contained by the frame, to the specified `frames` list.

        NOTE(review): if the decompressed payload itself ends mid-frame, the
        nested `decode` call yields a `LumberjackPartialFrame` for data that
        belongs to the compressed payload rather than to the outer stream;
        confirm callers handle that case.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`s.
            version (int): Version number to set on decoded frame (defaults to 1).

        Returns:
            int: Updated offset (specified offset + bytes decoded).

        Raises:
            zlib.error: If the payload is not valid zlib data.
        """
        length, offset = self.decode_unsigned_32(data, offset)
        compressed_data, offset = self.extract(data, offset, length)
        # an empty payload is not valid zlib data, so skip decompression
        uncompressed_data = decompress(compressed_data) if length else b""
        frames.append(
            LumberjackCompressedFrame(version, "C", compressed_data, uncompressed_data)
        )
        frames.extend(self.decode(uncompressed_data))
        return offset

    def decode_frame_2C(self, data, offset, frames, version=2):  # noqa: N802 SC200
        """Decodes a version 2 "compressed" frame (same layout as version 1).

        Appends both the decoded `LumberjackCompressedFrame`, as well as all
        frames contained by the frame, to the specified `frames` list.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`s.
            version (int): Version number to set on decoded frame (defaults to 2).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        return self.decode_frame_1C(data, offset, frames, version)  # noqa: SC200

    def decode_frame_1D(self, data, offset, frames, version=1):  # noqa: N802 SC200
        """Decodes a version 1 "data" frame.

        Layout after the header: sequence number (u32), pair count (u32), then
        that many key/value strings (each a u32 length + UTF-8 bytes).

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 1).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        sequence, offset = self.decode_unsigned_32(data, offset)
        count, offset = self.decode_unsigned_32(data, offset)
        content = {}
        for _ in range(count):
            key, offset = self.decode_string(data, offset)
            value, offset = self.decode_string(data, offset)
            content[key] = value
        frames.append(LumberjackDataFrame(version, "D", sequence, content))
        return offset

    def decode_frame_2D(self, data, offset, frames, version=2):  # noqa: N802 SC200
        """Decodes a version 2 "data" frame (same layout as version 1).

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 2).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        return self.decode_frame_1D(data, offset, frames, version)  # noqa: SC200

    def decode_frame_2J(self, data, offset, frames, version=2):  # noqa: N802 SC200
        """Decodes a version 2 "json" frame.

        Layout after the header: sequence number (u32), payload length (u32),
        then that many bytes of UTF-8 JSON.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 2).

        Returns:
            int: Updated offset (specified offset + bytes decoded).

        Raises:
            ValueError: If the payload is not valid UTF-8 JSON.
        """
        sequence, offset = self.decode_unsigned_32(data, offset)
        length, offset = self.decode_unsigned_32(data, offset)
        json, offset = self.extract(data, offset, length)
        content = load_json(json.decode("utf-8")) if length else {}
        frames.append(LumberjackJsonFrame(version, "J", sequence, content))
        return offset

    def decode_frame_1W(self, data, offset, frames, version=1):  # noqa: N802 SC200
        """Decodes a version 1 "window size" frame.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 1).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        size, offset = self.decode_unsigned_32(data, offset)
        frames.append(LumberjackWindowSizeFrame(version, "W", size))
        return offset

    def decode_frame_2W(self, data, offset, frames, version=2):  # noqa: N802 SC200
        """Decodes a version 2 "window size" frame (same layout as version 1).

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.
            frames (list): List into which to append decoded `LumberjackFrame`.
            version (int): Version number to set on decoded frame (defaults to 2).

        Returns:
            int: Updated offset (specified offset + bytes decoded).
        """
        return self.decode_frame_1W(data, offset, frames, version)  # noqa: SC200

    def decode_unsigned_32(self, data, offset):
        """Decodes a big-endian 32-bit unsigned integer.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.

        Returns:
            int: Decoded integer value.
            int: Updated offset (specified offset + bytes decoded).
        """
        value, offset = self.extract(data, offset, 4)
        return int.from_bytes(value, "big"), offset

    def decode_string(self, data, offset):
        """Decodes a length-prefixed (u32) UTF-8 string.

        Arguments:
            data (bytes): Bytes to decode.
            offset (int): Offset in bytes at which to start.

        Returns:
            str: Decoded string value.
            int: Updated offset (specified offset + bytes decoded).
        """
        length, offset = self.decode_unsigned_32(data, offset)
        value, offset = self.extract(data, offset, length)
        return value.decode("utf-8"), offset

    def extract(self, data, offset, length):
        """Extracts the specified number of bytes.

        Arguments:
            data (bytes): Bytes to extract.
            offset (int): Offset in bytes at which to start.
            length (int): Number of bytes to extract.

        Returns:
            bytes: Extracted bytes.
            int: Updated offset (specified offset + bytes decoded).

        Raises:
            LumberjackPartialFrameException: when fewer bytes are available
                than required.
        """
        end = offset + length
        if end > len(data):
            # report what is actually left after `offset`, not the whole buffer
            remaining = len(data) - offset
            message = "expected {} but {} bytes remaining".format(length, remaining)
            raise LumberjackPartialFrameException(message)
        return data[offset:end], end
class LumberjackFrame:
    """Common base for every lumberjack frame type.

    Stores the two pieces of header information all frames share: the
    protocol version and the frame-type indicator.
    """

    def __init__(self, version, frame_type):
        """Initializes the shared frame header fields.

        Arguments:
            version (int): Lumberjack protocol version number.
            frame_type (str): Frame type indicator (eg "A", "C", "D", etc).
        """
        self.version = version
        self.frame_type = frame_type

    # seems to be a bug with CCE001 here?
    def __eq__(self, other):  # noqa: CCE001
        """Compares header fields with another frame of the same class.

        Arguments:
            other: Object to compare against.

        Returns:
            bool: False unless `other` is an instance of this frame's class
            with the same version and frame type.
        """
        if not isinstance(other, self.__class__):
            return False
        return self.version == other.version and self.frame_type == other.frame_type

    def __str__(self):
        """Renders the header as version followed by type (eg "2J").

        Returns:
            str: Concatenated version and frame type.
        """
        return str(self.version) + str(self.frame_type)

    def __repr__(self):
        """Renders the class name wrapped around the header text.

        Returns:
            str: Text such as "LumberjackFrame(2J)".
        """
        header = str(self)
        return "{}({})".format(type(self).__name__, header)
class LumberjackAckFrame(LumberjackFrame):
    """Lumberjack "ack" frame carrying the acknowledged sequence number."""

    def __init__(self, version, frame_type, sequence_number):
        """Builds an ack frame.

        Arguments:
            version (int): Lumberjack protocol version number.
            frame_type (str): Frame type indicator (eg "A").
            sequence_number (int): Sequence number being acknowledged.
        """
        super().__init__(version, frame_type)
        self.sequence_number = sequence_number

    def get_bytes(self):
        """Serializes the frame: ASCII header then a big-endian u32 sequence.

        Returns:
            bytes: Wire representation of this frame.
        """
        header = (str(self.version) + self.frame_type).encode("ascii")
        body = self.sequence_number.to_bytes(4, "big")
        return header + body

    def __eq__(self, other):
        """Compares header fields and the acknowledged sequence number.

        Arguments:
            other: Object to compare against.

        Returns:
            bool: True when the headers match and sequence numbers are equal.
        """
        if not super().__eq__(other):
            return False
        return self.sequence_number == other.sequence_number
class LumberjackCompressedFrame(LumberjackFrame):
    """Lumberjack "compressed" frame wrapping a zlib-compressed payload."""

    def __init__(self, version, frame_type, compressed_data=b"", uncompressed_data=b""):
        """Builds a compressed frame.

        Arguments:
            version (int): Lumberjack protocol version number.
            frame_type (str): Frame type indicator (eg "C").
            compressed_data (bytes): Payload in compressed form (defaults to empty).
            uncompressed_data (bytes): Payload in raw form (defaults to empty).
        """
        super().__init__(version, frame_type)
        self.compressed_data = compressed_data
        self.uncompressed_data = uncompressed_data

    def get_bytes(self):
        """Serializes the frame: header, u32 payload length, compressed payload.

        If only the raw payload is present, it is compressed first and the
        result is cached on `compressed_data`.

        Returns:
            bytes: Wire representation of this frame.
        """
        header = (str(self.version) + self.frame_type).encode("ascii")
        needs_compression = self.uncompressed_data and not self.compressed_data
        if needs_compression:
            self.compressed_data = compress(self.uncompressed_data)
        payload = self.compressed_data
        return header + len(payload).to_bytes(4, "big") + payload

    def __eq__(self, other):
        """Compares header fields and the compressed payload bytes.

        Arguments:
            other: Object to compare against.

        Returns:
            bool: True when headers match and compressed payloads are equal.
        """
        if not super().__eq__(other):
            return False
        return self.compressed_data == other.compressed_data

    def __repr__(self):
        """Renders class name, header, and compressed/raw payload sizes.

        Returns:
            str: Text such as "LumberjackCompressedFrame(2C12/40)".
        """
        sizes = (len(self.compressed_data), len(self.uncompressed_data))
        return "{}({}{}/{})".format(type(self).__name__, str(self), *sizes)
class LumberjackDataFrame(LumberjackFrame):
    """Represents a lumberjack "data" frame."""

    def __init__(self, version, frame_type, sequence_number, content):
        """Creates the frame.

        Arguments:
            version (int): Lumberjack protocol version number.
            frame_type (str): Frame type indicator (eg "D").
            sequence_number (int): Frame sequence number.
            content (dict): Dict of key, value string pairs.
        """
        super().__init__(version, frame_type)
        self.sequence_number = sequence_number
        self.content = content

    def get_bytes(self):
        """Generates byte representation for this frame.

        Mirrors the layout read by `LumberjackFrameDecoder.decode_frame_1D`:
        header, sequence number (u32), pair count (u32), then each key and
        value as a u32 length followed by its UTF-8 bytes. Subclasses with a
        different wire layout (eg `LumberjackJsonFrame`) override this.

        Returns:
            bytes: Byte representation for this frame.
        """
        version_and_type = bytes(str(self.version) + self.frame_type, "ascii")
        parts = [
            version_and_type,
            self.sequence_number.to_bytes(4, "big"),
            len(self.content).to_bytes(4, "big"),
        ]
        for key, value in self.content.items():
            for text in (key, value):
                encoded = text.encode("utf-8")
                parts.append(len(encoded).to_bytes(4, "big"))
                parts.append(encoded)
        return b"".join(parts)

    def __eq__(self, other):
        """True if other frame has same sequence number and content.

        Arguments:
            other: Other object to compare to this object.

        Returns:
            bool: True if other frame has same sequence number and content.
        """
        return (
            super().__eq__(other)
            and self.sequence_number == other.sequence_number
            and self.content == other.content
        )

    def __repr__(self):
        """Class name plus version, type, sequence number, and content size.

        Returns:
            str: Class name plus version, type, sequence number, and number of
            content entries (eg "LumberjackDataFrame(1D5/3)").
        """
        return "{}({}{}/{})".format(
            self.__class__.__name__, str(self), self.sequence_number, len(self.content)
        )
class LumberjackJsonFrame(LumberjackDataFrame):
    """Lumberjack "json" frame: a data frame whose content is sent as JSON."""

    def get_bytes(self):
        """Serializes the frame: header, u32 sequence, u32 length, compact JSON.

        Returns:
            bytes: Wire representation of this frame.
        """
        header = bytes(str(self.version) + self.frame_type, "ascii")
        sequence = self.sequence_number.to_bytes(4, "big")
        # compact separators: no whitespace after "," or ":"
        payload = dump_json(self.content, separators=(",", ":")).encode("utf-8")
        return b"".join((header, sequence, len(payload).to_bytes(4, "big"), payload))
class LumberjackWindowSizeFrame(LumberjackFrame):
    """Represents a lumberjack "window size" frame."""

    def __init__(self, version, frame_type, window_size):
        """Creates the frame.

        Arguments:
            version (int): Lumberjack protocol version number.
            frame_type (str): Frame type indicator (eg "W").
            window_size (int): Ack window size.
        """
        super().__init__(version, frame_type)
        self.window_size = window_size

    def get_bytes(self):
        """Generates byte representation for this frame.

        Layout: ASCII header followed by the window size as a big-endian u32.

        Returns:
            bytes: Byte representation for this frame.
        """
        version_and_type = bytes(str(self.version) + self.frame_type, "ascii")
        return version_and_type + self.window_size.to_bytes(4, "big")

    def __eq__(self, other):
        """True if other frame has same window size.

        Arguments:
            other: Other object to compare to this object.

        Returns:
            bool: True if other frame has same window size.
        """
        return super().__eq__(other) and self.window_size == other.window_size

    def __repr__(self):
        """Class name plus version, type, and window size.

        Returns:
            str: Class name plus version, type, and window size
            (eg "LumberjackWindowSizeFrame(2W100)").
        """
        return "{}({}{})".format(self.__class__.__name__, str(self), self.window_size)
class LumberjackPartialFrame(LumberjackFrame):
    """Placeholder for a frame whose bytes have only partly arrived.

    Its raw bytes can be prepended to data received later, so decoding of the
    full frame can be retried.
    """

    def __init__(self, data):
        """Wraps the incomplete bytes.

        Arguments:
            data (bytes): Bytes received so far for the incomplete frame.
        """
        # version 0 and type "partial" mark this as a non-wire pseudo-frame
        super().__init__(0, "partial")
        self.data = data

    def get_bytes(self):
        """Returns the incomplete bytes unchanged.

        Returns:
            bytes: The stored partial data.
        """
        return self.data

    def __eq__(self, other):
        """Compares header fields and the stored partial bytes.

        Arguments:
            other: Object to compare against.

        Returns:
            bool: True when headers match and partial data is identical.
        """
        if not super().__eq__(other):
            return False
        return self.data == other.data

    def __repr__(self):
        """Renders the class name and the number of partial bytes held.

        Returns:
            str: Text such as "LumberjackPartialFrame(7)".
        """
        size = len(self.data)
        return "{}({})".format(type(self).__name__, size)
class LumberjackPartialFrameException(Exception):
    """Signals that a frame needs more bytes than are currently available.

    This is internal to `LumberjackFrameDecoder`: its byte-extraction helper
    raises it, and `LumberjackFrameDecoder.decode` catches it and records the
    undecoded remainder as a `LumberjackPartialFrame`.
    """
class LumberjackReaderProtocol(Protocol):
    """Twisted protocol for lumberjack feeder.

    Maintains several state attributes between calls to `dataReceived`.

    Attributes:
        partial (bytes): Partial frame data saved from previous `dataReceived` call.
        window_size (int): Maximum number of data frames that can be received
            before an ack must be sent.
        frames_since_ack (int): Number of data frames that have been processed
            since an ack has been sent.
    """

    def __init__(self, transformer):
        """Creates the protocol.

        Arguments:
            transformer (Transformer): Transformer instance.
        """
        super().__init__()
        self.transformer = transformer
        self.decoder = LumberjackFrameDecoder()
        self.partial = b""
        self.window_size = 1
        self.frames_since_ack = 0

    # override Protocol.dataReceived()
    def dataReceived(self, data):  # noqa: N802
        """Receives a chunk of bytes and feeds it to the transformer.

        After processing every decoded frame, the last data frame of the chunk
        is acked if any data frames are still unacknowledged. The ack is
        skipped if the per-frame window logic already acked it, so the same
        sequence number is never acked twice.

        Arguments:
            data (bytes): Received bytes.
        """
        timestamp = datetime.now()
        LOG.debug(
            "{bytes} bytes received from {source}",
            bytes=len(data),
            source=self.transport.getPeer().host,
        )

        # continue decoding previously partially-received frame
        if self.partial:
            data = self.partial + data
            self.partial = b""

        frames = self.decoder.decode(data)
        LOG.debug("unpacked into {frames}", frames=frames)

        last_data_frame = None
        for frame in frames:
            self.consume_data(frame, timestamp)
            self.send_ack(frame)
            if frame.frame_type in {"D", "J"}:
                last_data_frame = frame

        # ack the final data frame, unless the loop above already did (which
        # resets frames_since_ack to 0) -- avoids a duplicate ack
        if last_data_frame is not None and self.frames_since_ack:
            self._write_ack(last_data_frame)

    # override Protocol.connectionLost()
    def connectionLost(self, reason):  # noqa: N802
        """Called when socket closed/broken.

        Arguments:
            reason: Metadata about why the connection was lost.
        """
        source = self.transport.getPeer().host
        why = reason.value or "unknown reason"
        LOG.debug("connection lost from {source}: {reason}", source=source, reason=why)
        if self.partial:
            LOG.warning(
                "{length} partial bytes remaining from {source}: {reason}",
                length=len(self.partial),
                source=source,
                reason=why,
            )

    def consume_data(self, frame, timestamp):
        """Consumes the content of the frame.

        Partial frames are stashed for the next `dataReceived` call. The
        content of "D"/"J" frames is fed to the transformer, with an added
        "at" key taken from the content's "@timestamp" or, failing that, the
        receive time. All other frame types are ignored.

        NOTE(review): a partial frame produced from inside a truncated
        compressed payload would also be stashed here and prepended to the
        next outer chunk; confirm that cannot happen with real writers.

        Arguments:
            frame (LumberjackFrame): Frame to consume.
            timestamp (datetime): Datetime when frame was received.
        """
        if frame.frame_type == "partial":
            # save partial data for next call to dataReceived()
            self.partial = frame.get_bytes()
        elif frame.frame_type in {"D", "J"}:
            # feed content from "data" and "json" frames to transformer
            LOG.debug("processing frame {content}", content=frame.content)
            at = frame.content.get("@timestamp", timestamp)
            self.transformer.feed({**frame.content, "at": at})

    # cognitive complexity not too high
    def send_ack(self, frame, force=False):  # noqa: CCR001
        """Sends an ack frame to writer if necessary.

        "W" frames update the window size. "D"/"J" frames are counted, and an
        ack is sent once more than half the window has been processed (or
        immediately when `force` is set).

        Arguments:
            frame (LumberjackFrame): Frame to acknowledge.
            force (bool): True to send ack even if not necessary.
        """
        if frame.frame_type == "W":
            # save new window size
            self.window_size = frame.window_size
        elif frame.frame_type in {"D", "J"}:
            # send ack if have processed at least half the frames
            # that fit in the writer's window
            self.frames_since_ack += 1
            if force or self.frames_since_ack > self.window_size / 2:
                self._write_ack(frame)

    def _write_ack(self, frame):
        """Writes an ack for the frame's sequence number and resets the counter.

        Arguments:
            frame (LumberjackFrame): Data frame whose sequence number to ack.
        """
        LOG.debug("ack frame {number}", number=frame.sequence_number)
        ack = LumberjackAckFrame(frame.version, "A", frame.sequence_number)
        self.transport.write(ack.get_bytes())
        self.frames_since_ack = 0
class LumberjackReaderFactory(ServerFactory):
    """Twisted server factory producing one lumberjack protocol per connection."""

    def __init__(self, transformer):
        """Stores the transformer that every built protocol will share.

        Arguments:
            transformer (Transformer): Transformer instance.
        """
        super().__init__()
        self.transformer = transformer

    # override Factory.buildProtocol()
    def buildProtocol(self, addr):  # noqa: N802
        """Builds a protocol for a newly accepted connection.

        Arguments:
            addr: Peer address (ignored).

        Returns:
            LumberjackReaderProtocol: Fresh protocol bound to the shared transformer.
        """
        protocol = LumberjackReaderProtocol(self.transformer)
        return protocol
class LumberjackReaderService(Service):
    """Twisted service for lumberjack feeder."""

    # keyword args represent config props
    def __init__(  # noqa: CFQ002
        self, port=0, interfaces=None, app="", tls=None, transformer=None, reactor=None
    ):
        """Creates the service.

        Arguments:
            port (int): Port to listen on (defaults to the FUREMCAPE_FEEDER_PORT
                environment variable, or 5044).
            interfaces (list): List of interfaces to listen on (defaults to ["::"]).
            app (str): App name (defaults to the FUREMCAPE_FEEDER_APP
                environment variable, or lumberjack).
            tls (dict): Dict of TLS options:
                certificate (str): Optional path to TLS certificate file
                    (ie server cert). If specified, will use a TLS connection.
                private_key (str): Optional path to TLS key file
                    (ie server cert key). If specified, will use a TLS connection.
                authorities (list): Optional list of paths to TLS CA certificate files.
                    If specified, will require the client to use a TLS certificate
                    (ie client cert) signed by one of the CAs.
                minimium_version (str): Minimum TLS version to use
                    (defaults to "TLSv1_2").
            transformer (Transformer): Transformer instance (creates new instance
                by default).
            reactor (Reactor): Twisted reactor instance (uses default instance
                by default).

        Raises:
            ValueError: If the port (argument or environment variable) is not
                an integer.
        """
        super().__init__()
        self.app = app or environ.get("FUREMCAPE_FEEDER_APP", "lumberjack")
        super().setName(self.app)
        # environment variables are strings; the reactor needs an integer port
        self.port = int(port or environ.get("FUREMCAPE_FEEDER_PORT", 5044))
        self.interfaces = interfaces or ["::"]
        self.tls = build_tls_options(**(tls or {}))
        self.transformer = transformer or Transformer()
        if not reactor:
            from twisted.internet import reactor
        self.reactor = reactor
        # one listening port per interface, populated by startService()
        self.listeners = []
        # last port opened; kept for backward compatibility
        self.listener = None

    # override Service.startService()
    def startService(self):  # noqa: N802
        """Starts the service, listening on every configured interface."""
        super().startService()
        self.transformer.validate_db()
        self.listeners = []
        for interface in self.interfaces:
            factory = LumberjackReaderFactory(self.transformer)
            if self.tls:
                listener = self.reactor.listenSSL(
                    self.port, factory, self.tls, interface=interface
                )
            else:
                listener = self.reactor.listenTCP(self.port, factory, interface=interface)
            self.listeners.append(listener)
        self.listener = self.listeners[-1] if self.listeners else None

    # override Service.stopService()
    def stopService(self):  # noqa: N802
        """Stops the service, closing every listening port it opened.

        Returns:
            Deferred: Fires once all listening ports have stopped.
        """
        from twisted.internet.defer import gatherResults, maybeDeferred

        super().stopService()
        # maybeDeferred tolerates stopListening() implementations that do not
        # return a Deferred
        stopping = [maybeDeferred(listener.stopListening) for listener in self.listeners]
        self.listeners = []
        self.listener = None
        return gatherResults(stopping)