# -*- coding: utf-8 -*-
"""Unit tests for OpenSshdParser."""
from pytest import raises
from furemcape.transformer.processor.open_sshd_parser import OpenSshdParser
def test_init():
    """Check the attribute defaults of a parser built without arguments."""
    parser = OpenSshdParser()
    assert parser.ignore_missing is False
    assert parser.property == "message"
def test_equals():
    """Parsers with the same ``property`` compare equal; differing ones do not."""
    default_parser = OpenSshdParser()
    foo_parser = OpenSshdParser(property="foo")

    # Always compare against a second, separately built instance so the
    # check exercises value equality rather than identity.
    assert default_parser == OpenSshdParser()
    assert foo_parser != OpenSshdParser()
    assert foo_parser == OpenSshdParser(property="foo")
    assert foo_parser != OpenSshdParser(property="bar")
def test_process_empty():
    """An empty message comes back without any extra fields."""
    result = OpenSshdParser().process({"message": ""})
    assert result == {"message": ""}
def test_process_miss():
    """A message matching no known pattern comes back unchanged."""
    line = "Not an expected message!"
    result = OpenSshdParser().process({"message": line})
    assert result == {"message": line}
def test_process_invalid_user():
    """Parse ``Invalid user <name> from <ip>`` lines, including odd usernames.

    Covers a plain username, a username with a leading space, and an empty
    username. Each case expects the ``ip``, ``user`` and ``error`` fields.
    """
    processor = OpenSshdParser()

    message = "Invalid user mc from 50.247.88.233"
    assert processor.process({"message": message}) == {
        "message": message,
        "ip": "50.247.88.233",
        "user": "mc",
        "error": "invalid user",
    }

    # The expected username keeps its leading space, so the line needs two
    # spaces after "user": the separator plus the one belonging to the name.
    # NOTE(review): the doubled space was restored here because a single
    # space cannot yield both "mc" above and " 0101" below — confirm against
    # the parser's pattern.
    message = "Invalid user  0101 from 5.101.40.10"
    assert processor.process({"message": message}) == {
        "message": message,
        "ip": "5.101.40.10",
        "user": " 0101",
        "error": "invalid user",
    }

    # An empty username leaves the two separators adjacent to each other.
    message = "Invalid user  from 139.162.122.110"
    assert processor.process({"message": message}) == {
        "message": message,
        "ip": "139.162.122.110",
        "user": "",
        "error": "invalid user",
    }
def test_process_input_userauth_request_invalid_user():
    """Parse ``input_userauth_request: invalid user <name> [preauth]`` lines.

    Covers a plain username and an empty username. Each case expects the
    ``user``, ``action`` and ``error`` fields.
    """
    processor = OpenSshdParser()

    message = "input_userauth_request: invalid user test [preauth]"
    assert processor.process({"message": message}) == {
        "message": message,
        "user": "test",
        "action": "input_userauth_request [preauth]",
        "error": "invalid user",
    }

    # An empty username leaves the separators on either side of it adjacent,
    # hence the two spaces before "[preauth]".
    # NOTE(review): the doubled space was restored for consistency with the
    # "<name> [preauth]" shape above — confirm against the parser's pattern.
    message = "input_userauth_request: invalid user  [preauth]"
    assert processor.process({"message": message}) == {
        "message": message,
        "user": "",
        "action": "input_userauth_request [preauth]",
        "error": "invalid user",
    }
def test_process_received_disconnect():
    """Parse ``Received disconnect from`` lines with and without a reason."""
    parser = OpenSshdParser()
    cases = [
        (
            "error: Received disconnect from 216.68.184.194 port 50530:14: No supported authentication methods available [preauth]",  # noqa: E501
            {
                "ip": "216.68.184.194",
                "action": "received disconnect [preauth]",
                "error": "no supported authentication methods available",
            },
        ),
        (
            "Received disconnect from 103.94.185.58 port 54998:11: Normal Shutdown, Thank you for playing [preauth]",  # noqa: E501
            {
                "ip": "103.94.185.58",
                "action": "received disconnect [preauth]",
                "error": "normal Shutdown, Thank you for playing",
            },
        ),
        (
            # No reason text and no [preauth] suffix.
            "Received disconnect from 221.194.47.205 port 44526:11: ",
            {
                "ip": "221.194.47.205",
                "action": "received disconnect",
                "error": "",
            },
        ),
    ]
    for line, fields in cases:
        expected = {"message": line}
        expected.update(fields)
        assert parser.process({"message": line}) == expected
def test_process_starting_session():
    """Parse ``Starting session`` lines for subsystem, shell and command."""
    parser = OpenSshdParser()
    cases = [
        (
            "Starting session: subsystem 'sftp' for dee from 216.68.184.194 port 50666 id 0",  # noqa: E501
            {"ip": "216.68.184.194", "user": "dee", "resource": "subsystem 'sftp'"},
        ),
        (
            "Starting session: shell on pts/0 for justin from 172.92.156.105 port 47178 id 0",  # noqa: E501
            {"ip": "172.92.156.105", "user": "justin", "resource": "shell on pts/0"},
        ),
        (
            "Starting session: command for justin from 172.92.156.105 port 47184 id 0",
            {"ip": "172.92.156.105", "user": "justin", "resource": "command"},
        ),
    ]
    for line, fields in cases:
        # Every variant shares the same action; only ip/user/resource vary.
        expected = {"message": line, "action": "starting session"}
        expected.update(fields)
        assert parser.process({"message": line}) == expected
def test_process_did_not_receive_id_string():
    """Parse a ``Did not receive identification string`` line."""
    line = "Did not receive identification string from 114.72.109.225"
    expected = {
        "message": line,
        "ip": "114.72.109.225",
        "error": "did not receive identification string",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connection_from():
    """Parse a ``Connection from`` line; only the remote address is expected."""
    line = "Connection from 50.247.88.233 port 37782 on 10.10.10.18 port 22"
    expected = {
        "message": line,
        "ip": "50.247.88.233",
        "action": "connection",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_disconnected_from():
    """Parse a ``Disconnected from`` line that names no user."""
    line = "Disconnected from 172.92.156.105 port 47184"
    expected = {
        "message": line,
        "ip": "172.92.156.105",
        "action": "disconnected",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_disconnected_from_user():
    """Parse a ``Disconnected from user`` line, which also yields the user."""
    line = "Disconnected from user justin 172.92.156.105 port 47184"
    expected = {
        "message": line,
        "user": "justin",
        "ip": "172.92.156.105",
        "action": "disconnected",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_accepted_publickey():
    """Parse an ``Accepted publickey`` line; the key is the resource."""
    line = "Accepted publickey for dee from 74.133.6.0 port 59053 ssh2: RSA SHA256:kNQvizvVRmwyXTVioT58isAzLS3zd4BqkhGStn8v/GI"  # noqa: E501
    expected = {
        "message": line,
        "ip": "74.133.6.0",
        "user": "dee",
        "action": "accepted publickey",
        "resource": "RSA SHA256:kNQvizvVRmwyXTVioT58isAzLS3zd4BqkhGStn8v/GI",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_failed_publickey():
    """Parse a ``Failed publickey`` line; the key is the resource."""
    line = "Failed publickey for justin from 152.12.158.105 port 47178 ssh2: RSA SHA256:A3B5qHQ8L3VF2+o68iIUkoUitk+jXASog4YK2TsQkrs"  # noqa: E501
    expected = {
        "message": line,
        "ip": "152.12.158.105",
        "user": "justin",
        "action": "failed publickey",
        "resource": "RSA SHA256:A3B5qHQ8L3VF2+o68iIUkoUitk+jXASog4YK2TsQkrs",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_accepted_key():
    """Parse an ``Accepted key ... found at`` line; no ip or user expected."""
    line = "Accepted key ED25519 SHA256:TP18lkc7zWi1Q1PDV0/5QzhOFVQ60EC5pJD8kS12XQp found at /home/dee/.ssh/authorized_keys:3"  # noqa: E501
    expected = {
        "message": line,
        "action": "accepted key",
        "resource": "ED25519 SHA256:TP18lkc7zWi1Q1PDV0/5QzhOFVQ60EC5pJD8kS12XQp",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_postponed_publickey():
    """Parse a ``Postponed publickey`` line carrying a ``[preauth]`` suffix."""
    line = "Postponed publickey for justin from 172.92.156.105 port 47184 ssh2 [preauth]"  # noqa: E501
    expected = {
        "message": line,
        "ip": "172.92.156.105",
        "user": "justin",
        "action": "postponed publickey [preauth]",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_session_opened():
    """Parse a pam_unix ``session opened`` line."""
    line = "pam_unix(sshd:session): session opened for user dee by (uid=0)"
    expected = {
        "message": line,
        "user": "dee",
        "action": "session opened",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_session_closed():
    """Parse a pam_unix ``session closed`` line."""
    line = "pam_unix(sshd:session): session closed for user justin"
    expected = {
        "message": line,
        "user": "justin",
        "action": "session closed",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_close_session():
    """Parse a ``Close session`` line."""
    line = "Close session: user justin from 172.92.156.105 port 47184 id 0"
    expected = {
        "message": line,
        "ip": "172.92.156.105",
        "user": "justin",
        "action": "close session",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_closing_connection():
    """Parse a ``Closing connection to`` line."""
    line = "Closing connection to 74.133.6.0 port 5905"
    expected = {
        "message": line,
        "ip": "74.133.6.0",
        "action": "closing connection",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_max_auth_attempts():
    """Parse ``maximum authentication attempts exceeded`` lines.

    Covers a plain user and the ``invalid user`` variant; in both cases only
    the bare username is expected in ``user``.
    """
    parser = OpenSshdParser()
    cases = [
        (
            "error: maximum authentication attempts exceeded for root from 192.141.189.131 port 4522 ssh2 [preauth]",  # noqa: E501
            "192.141.189.131",
            "root",
        ),
        (
            "error: maximum authentication attempts exceeded for invalid user admin from 103.210.135.136 port 39608 ssh2 [preauth]",  # noqa: E501
            "103.210.135.136",
            "admin",
        ),
    ]
    for line, ip, user in cases:
        expected = {
            "message": line,
            "ip": ip,
            "user": user,
            "error": "maximum authentication attempts exceeded",
        }
        assert parser.process({"message": line}) == expected
def test_process_bad_protocol_version():
    """Parse a ``Bad protocol version identification`` line.

    The quoted identification bytes are expected verbatim inside ``error``.
    """
    line = "Bad protocol version identification '\\026\\003\\001\\001\"\\001' from 164.52.24.138 port 50628"  # noqa: E501
    expected = {
        "message": line,
        "ip": "164.52.24.138",
        "error": "bad protocol version identification '\\026\\003\\001\\001\"\\001'",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_disconnecting():
    """Parse a ``Disconnecting:`` line with a reason and ``[preauth]``."""
    line = "Disconnecting: Too many authentication failures [preauth]"
    expected = {
        "message": line,
        "action": "disconnecting [preauth]",
        "error": "too many authentication failures",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connection_reset():
    """Parse a ``Connection reset by`` line."""
    line = "Connection reset by 221.194.44.211 port 60243 [preauth]"
    expected = {
        "message": line,
        "ip": "221.194.44.211",
        "action": "connection reset [preauth]",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connection_closed():
    """Parse a ``Connection closed by`` line."""
    line = "Connection closed by 116.93.102.204 port 52290 [preauth]"
    expected = {
        "message": line,
        "ip": "116.93.102.204",
        "action": "connection closed [preauth]",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_unable_to_negotiate():
    """Parse an ``Unable to negotiate`` line.

    The peer's offered cipher list is expected in ``resource`` and the
    failure reason in ``error``.
    """
    line = "fatal: Unable to negotiate with 5.101.0.51 port 51762: no matching cipher found. Their offer: aes256-cbc,rijndael-cbc@lysator.liu.se,aes192-cbc,aes128-cbc,arcfour128,arcfour,3des-cbc,none [preauth]"  # noqa: E501
    expected = {
        "message": line,
        "ip": "5.101.0.51",
        "action": "unable to negotiate [preauth]",
        "resource": "aes256-cbc,rijndael-cbc@lysator.liu.se,aes192-cbc,aes128-cbc,arcfour128,arcfour,3des-cbc,none",  # noqa: E501
        "error": "no matching cipher found",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_read_error():
    """Parse ``Read error from remote host`` lines with and without a port."""
    parser = OpenSshdParser()
    lines = (
        "Read error from remote host 66.42.179.151: Connection timed out",
        "Read error from remote host 66.42.179.151 port 55580: Connection timed out",
    )
    # Both variants are expected to produce the same extracted fields.
    for line in lines:
        expected = {
            "message": line,
            "ip": "66.42.179.151",
            "action": "read error",
            "error": "connection timed out",
        }
        assert parser.process({"message": line}) == expected
def test_process_key_exchange_identification():
    """Parse a ``kex_exchange_identification`` error line."""
    line = "error: kex_exchange_identification: Connection closed by remote host"
    expected = {
        "message": line,
        "action": "kex_exchange_identification",
        "error": "connection closed by remote host",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connect_failed():
    """Parse a channel ``connect failed`` line."""
    line = "channel 3: open failed: connect failed: Connection refused"
    expected = {
        "message": line,
        "action": "connect failed",
        "error": "connection refused",
    }
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connection_transferred():
    """Parse a ``Transferred:`` line; only the action is expected."""
    line = "Transferred: sent 13208, received 1656 bytes"
    expected = {"message": line, "action": "transferred"}
    assert OpenSshdParser().process({"message": line}) == expected
def test_process_connection_user_child():
    """Parse a ``User child is on pid`` line; only the action is expected."""
    line = "User child is on pid 20025"
    expected = {"message": line, "action": "user child"}
    assert OpenSshdParser().process({"message": line}) == expected
def test_missing_expected_property():
    """Check handling of events that lack the configured property.

    A ``KeyError`` naming the property and the event is expected by default;
    with ``ignore_missing=True`` the event is returned as-is.
    """
    default_parser = OpenSshdParser()
    with raises(KeyError, match="missing message property: {}"):  # noqa: P103
        default_parser.process({})

    foo_parser = OpenSshdParser(property="foo")
    with raises(KeyError, match="missing foo property: {'message': 'bar'}"):
        foo_parser.process({"message": "bar"})

    lenient_parser = OpenSshdParser(ignore_missing=True)
    assert lenient_parser.process({}) == {}