~emersion/python-emailthreads

835ec9858f5c2da32cdb81b29e29f91abb0c6d7d — emersion 2 years ago 6a883f4
Split into separate files
6 files changed, 226 insertions(+), 152 deletions(-)

M __init__.py
R emailthreads.py => quotes.py
M test/test_blocks.py
A threads.py
M try.py
A util.py
M __init__.py => __init__.py +1 -2
@@ 1,2 1,1 @@
from .emailthreads import Text, Quote, parse_blocks, match_quotes, trim_noisy_text
from .emailthreads import Thread, parse
from .threads import Thread, parse

R emailthreads.py => quotes.py +1 -140
@@ 2,43 2,7 @@ import re
import sys
from email.message import EmailMessage

def get_message_by_id(msgs, msg_id):
	"""Return the first message in msgs whose "message-id" header equals msg_id.

	msgs is an iterable of objects supporting header lookup by name.
	Returns None when msg_id is None or when no message matches.
	"""
	# email.message objects yield None for a missing header. Without this
	# guard, a lookup for an absent In-Reply-To (None) would wrongly match
	# any message that itself lacks a Message-ID.
	if msg_id is None:
		return None
	# TODO: handle weird brackets stuff
	for msg in msgs:
		if msg["message-id"] == msg_id:
			return msg
	return None

def strip_prefix(s, prefix):
	"""Return s with one leading occurrence of prefix removed.

	s is returned unchanged when it does not start with prefix.
	"""
	return s[len(prefix):] if s.startswith(prefix) else s

def flatten_header_field(value):
	"""Normalize a header value to a single line without leading "Re:" prefixes.

	Surrounding whitespace is dropped, every leading "Re:" is removed, and
	the remaining lines are stripped and joined with single spaces.
	A missing header (None) is treated as an empty value and yields "".
	"""
	# Header lookup on email messages returns None when the header is
	# absent; treat that as empty instead of failing on .strip().
	if value is None:
		return ""
	value = value.strip()
	# TODO: more of these
	while value.startswith("Re:"):
		value = value[len("Re:"):].strip()
	return " ".join(line.strip() for line in value.splitlines())

def get_text_part(msg):
	"""Return the first part yielded by msg.walk() whose content type is text/plain.

	Returns None when no such part exists.
	"""
	plain_parts = (part for part in msg.walk() if part.get_content_type() == "text/plain")
	return next(plain_parts, None)

def normalize_whitespace(text):
	"""Return text with every no-break space (U+00A0) replaced by a plain space."""
	# TODO: more of these
	return " ".join(text.split("\xa0"))

def get_text(msg):
	"""Return the decoded text/plain body of msg with whitespace normalized.

	The body is decoded with the charset declared on the text part
	(UTF-8 when none is declared). If that charset is unknown or the
	payload does not decode with it, UTF-8 with replacement characters is
	used instead. Returns "" when msg has no text/plain part.
	"""
	text_part = get_text_part(msg)
	if text_part is None:
		# get_text_part() signals "no text/plain part" with None.
		return ""
	payload = text_part.get_payload(decode=True)
	# Honor the declared charset instead of assuming UTF-8 for every mail.
	charset = text_part.get_content_charset("utf-8")
	try:
		text = payload.decode(charset)
	except (LookupError, UnicodeDecodeError):
		# Unknown or mislabelled charset: keep going with a lossy decode.
		text = payload.decode("utf-8", errors="replace")
	return normalize_whitespace(text)
from util import *

def trim_empty_lines(block):
	start = 0


@@ 57,14 21,6 @@ def trim_empty_lines(block):

	return block

def lines_as_list(lines):
	"""Coerce lines to a list.

	A str is split on "\\n", a list is returned as-is (same object), and
	any other iterable is materialized with list().
	"""
	if isinstance(lines, str):
		return lines.split("\n")
	return lines if isinstance(lines, list) else list(lines)

class Text:
	def __init__(self, region, lines=[]):
		self.region = region


@@ 261,98 217,3 @@ def merge_blocks(blocks):
			last_block = block

	return merged

def quote_str(s):
	"""Return s with "| " prepended to each of its "\\n"-separated lines."""
	return "\n".join("| " + line for line in s.split("\n"))

class Thread:
	"""A run of lines taken from one message, plus the threads replying to it.

	source_region is a (start, end) pair of line indices into source_msg;
	at() treats it as half-open. index, when set, is the line offset inside
	the parent thread after which this thread is rendered by the parent's
	__repr__.
	"""

	def __init__(self, lines, source_msg, source_region, index=None):
		self.lines = lines_as_list(lines)
		self.source_msg = source_msg
		self.source_region = source_region
		self.index = index
		self.children = []

	def at(self, msg, index):
		"""Return the thread covering line index of msg, searching depth-first.

		This thread is checked before its children; None means no match.
		"""
		if self.source_msg == msg and self.source_region[0] <= index < self.source_region[1]:
			return self

		hits = (child.at(msg, index) for child in self.children)
		return next((hit for hit in hits if hit is not None), None)

	def __repr__(self):
		# Children with a usable index are shown right after that line;
		# every other child is appended after the last line.
		inline = {}
		standalone = []
		for child in self.children:
			if child.index is None or child.index >= len(self.lines):
				standalone.append(child)
			else:
				inline.setdefault(child.index, []).append(child)

		out = []
		for lineno, line in enumerate(self.lines):
			out.append(line)
			for child in inline.get(lineno, ()):
				out.extend([
					"[inline thread by " + child.source_msg["from"] + " at " + child.source_msg["date"] + "]",
					quote_str(str(child)),
				])

		for child in standalone:
			out.extend([
				"[standalone thread by " + child.source_msg["from"] + " at " + child.source_msg["date"] + "]",
				quote_str(str(child)),
			])

		return "\n".join(out)

def parse(msg, refs=[]):
	"""Build the Thread tree for msg, resolving its ancestors through refs.

	When msg has no In-Reply-To target in refs, or the target's flattened
	subject differs, msg becomes a root thread holding its whole text.
	Otherwise the replied-to message is parsed first (recursively) and each
	text block of msg is attached to that tree: below the quoted line it
	follows when that line can be located, else as a child of the root.
	"""
	# For some reason Python strips "Re:" prefixes
	subject = flatten_header_field(msg["subject"])

	replied_to = get_message_by_id(refs, msg['in-reply-to'])
	is_reply = replied_to is not None and flatten_header_field(replied_to["subject"]) == subject
	if not is_reply:
		body_lines = get_text(msg).splitlines()
		return Thread(body_lines, msg, (0, len(body_lines)))

	blocks = parse_blocks(msg)
	blocks = trim_quotes_footer(blocks)
	blocks = match_quotes(blocks, replied_to)
	blocks = trim_noisy_text(blocks)
	# print("\n".join([str(block) for block in blocks]))
	blocks = merge_blocks(blocks)

	root = parse(replied_to, refs)

	pending_quote = None
	for block in blocks:
		if not isinstance(block, Text):
			if isinstance(block, Quote):
				pending_quote = block
			continue

		parent = None
		if pending_quote is not None:
			assert(pending_quote.parent_region is not None)
			# Last line of the quoted region in the replied-to message.
			quoted_line = pending_quote.parent_region[1] - 1
			parent = root.at(replied_to, quoted_line)
			pending_quote = None

		if parent is None:
			# TODO: include previous quote, if any
			root.children.append(Thread(block.lines, msg, block.region))
		else:
			offset = quoted_line - parent.source_region[0]
			parent.children.append(Thread(block.lines, msg, block.region, offset))

	return root

M test/test_blocks.py => test/test_blocks.py +9 -9
@@ 1,4 1,4 @@
import emailthreads
from emailthreads import quotes
import unittest
from email.message import EmailMessage



@@ 7,28 7,28 @@ class ParseBlocksTestCase(unittest.TestCase):
		msg = EmailMessage()
		msg.set_content(text)

		got = emailthreads.parse_blocks(msg)
		got = quotes.parse_blocks(msg)

		self.assertEqual(got, want)

	def test_only_text(self):
		# Input without any ">" lines; the expectation is a single Text block
		# with region (0, 3).
		text = "a\nb\nc"
		# NOTE(review): this assignment is overwritten by the next line; it
		# looks like the pre-change side of a diff — confirm before keeping both.
		want = [emailthreads.Text((0, 3), text)]
		want = [quotes.Text((0, 3), text)]
		self._test(text, want)

	def test_only_quote(self):
		# Every line starts with ">"; the expectation is a single Quote block
		# with region (0, 3) whose content has the ">" markers and the
		# whitespace after them removed.
		text = ">a\n>  b\n> c"
		# NOTE(review): this assignment is overwritten by the next line; it
		# looks like the pre-change side of a diff — confirm before keeping both.
		want = [emailthreads.Quote((0, 3), "a\nb\nc")]
		want = [quotes.Quote((0, 3), "a\nb\nc")]
		self._test(text, want)

	def test_with_quotes(self):
		# Text and quoted lines interleaved; consecutive ">" lines are expected
		# to form one Quote block (here region (3, 5)).
		text = "a\n>b\nc\n> d\n> e\nf"
		# NOTE(review): the list holds both emailthreads.* and quotes.* entries
		# (ten items for five expected blocks); this looks like both sides of a
		# diff interleaved — confirm which set is intended.
		want = [
			emailthreads.Text((0, 1), "a"),
			emailthreads.Quote((1, 2), "b"),
			emailthreads.Text((2, 3), "c"),
			emailthreads.Quote((3, 5), "d\ne"),
			emailthreads.Text((5, 6), "f"),
			quotes.Text((0, 1), "a"),
			quotes.Quote((1, 2), "b"),
			quotes.Text((2, 3), "c"),
			quotes.Quote((3, 5), "d\ne"),
			quotes.Text((5, 6), "f"),
		]
		self._test(text, want)


A threads.py => threads.py +164 -0
@@ 0,0 1,164 @@
import re
import sys
from email.message import EmailMessage

from util import *
from quotes import *

def get_message_by_id(msgs, msg_id):
	"""Return the first message in msgs whose "message-id" header equals msg_id.

	msgs is an iterable of objects supporting header lookup by name.
	Returns None when msg_id is None or when no message matches.
	"""
	# email.message objects yield None for a missing header. Without this
	# guard, a lookup for an absent In-Reply-To (None) would wrongly match
	# any message that itself lacks a Message-ID.
	if msg_id is None:
		return None
	# TODO: handle weird brackets stuff
	for msg in msgs:
		if msg["message-id"] == msg_id:
			return msg
	return None

def strip_prefix(s, prefix):
	"""Return s with one leading occurrence of prefix removed.

	s is returned unchanged when it does not start with prefix.
	"""
	return s[len(prefix):] if s.startswith(prefix) else s

def flatten_header_field(value):
	"""Normalize a header value to a single line without leading "Re:" prefixes.

	Surrounding whitespace is dropped, every leading "Re:" is removed, and
	the remaining lines are stripped and joined with single spaces.
	A missing header (None) is treated as an empty value and yields "".
	"""
	# Header lookup on email messages returns None when the header is
	# absent; treat that as empty instead of failing on .strip().
	if value is None:
		return ""
	value = value.strip()
	# TODO: more of these
	while value.startswith("Re:"):
		value = value[len("Re:"):].strip()
	return " ".join(line.strip() for line in value.splitlines())

def get_text_part(msg):
	"""Return the first part yielded by msg.walk() whose content type is text/plain.

	Returns None when no such part exists.
	"""
	plain_parts = (part for part in msg.walk() if part.get_content_type() == "text/plain")
	return next(plain_parts, None)

def normalize_whitespace(text):
	"""Return text with every no-break space (U+00A0) replaced by a plain space."""
	# TODO: more of these
	return " ".join(text.split("\xa0"))

def get_text(msg):
	"""Return the decoded text/plain body of msg with whitespace normalized.

	The body is decoded with the charset declared on the text part
	(UTF-8 when none is declared). If that charset is unknown or the
	payload does not decode with it, UTF-8 with replacement characters is
	used instead. Returns "" when msg has no text/plain part.
	"""
	text_part = get_text_part(msg)
	if text_part is None:
		# get_text_part() signals "no text/plain part" with None.
		return ""
	payload = text_part.get_payload(decode=True)
	# Honor the declared charset instead of assuming UTF-8 for every mail.
	charset = text_part.get_content_charset("utf-8")
	try:
		text = payload.decode(charset)
	except (LookupError, UnicodeDecodeError):
		# Unknown or mislabelled charset: keep going with a lossy decode.
		text = payload.decode("utf-8", errors="replace")
	return normalize_whitespace(text)

def trim_empty_lines(block):
	"""Return block without its leading and trailing "" entries.

	Empty entries between non-empty ones are kept.
	"""
	# Skip past the leading run of empty lines.
	first = 0
	while first < len(block) and block[first] == "":
		first += 1
	block = block[first:]

	# Walk back over the trailing run of empty lines.
	last = len(block)
	while last > 0 and block[last - 1] == "":
		last -= 1

	return block[:last]

def lines_as_list(lines):
	"""Coerce lines to a list.

	A str is split on "\\n", a list is returned as-is (same object), and
	any other iterable is materialized with list().
	"""
	if isinstance(lines, str):
		return lines.split("\n")
	return lines if isinstance(lines, list) else list(lines)

def quote_str(s):
	"""Return s with "| " prepended to each of its "\\n"-separated lines."""
	return "\n".join("| " + line for line in s.split("\n"))

class Thread:
	"""A run of lines taken from one message, plus the threads replying to it.

	source_region is a (start, end) pair of line indices into source_msg;
	at() treats it as half-open. index, when set, is the line offset inside
	the parent thread after which this thread is rendered by the parent's
	__repr__.
	"""

	def __init__(self, lines, source_msg, source_region, index=None):
		self.lines = lines_as_list(lines)
		self.source_msg = source_msg
		self.source_region = source_region
		self.index = index
		self.children = []

	def at(self, msg, index):
		"""Return the thread covering line index of msg, searching depth-first.

		This thread is checked before its children; None means no match.
		"""
		if self.source_msg == msg and self.source_region[0] <= index < self.source_region[1]:
			return self

		hits = (child.at(msg, index) for child in self.children)
		return next((hit for hit in hits if hit is not None), None)

	def __repr__(self):
		# Children with a usable index are shown right after that line;
		# every other child is appended after the last line.
		inline = {}
		standalone = []
		for child in self.children:
			if child.index is None or child.index >= len(self.lines):
				standalone.append(child)
			else:
				inline.setdefault(child.index, []).append(child)

		out = []
		for lineno, line in enumerate(self.lines):
			out.append(line)
			for child in inline.get(lineno, ()):
				out.extend([
					"[inline thread by " + child.source_msg["from"] + " at " + child.source_msg["date"] + "]",
					quote_str(str(child)),
				])

		for child in standalone:
			out.extend([
				"[standalone thread by " + child.source_msg["from"] + " at " + child.source_msg["date"] + "]",
				quote_str(str(child)),
			])

		return "\n".join(out)

def parse(msg, refs=[]):
	"""Build the Thread tree for msg, resolving its ancestors through refs.

	When msg has no In-Reply-To target in refs, or the target's flattened
	subject differs, msg becomes a root thread holding its whole text.
	Otherwise the replied-to message is parsed first (recursively) and each
	text block of msg is attached to that tree: below the quoted line it
	follows when that line can be located, else as a child of the root.
	"""
	# For some reason Python strips "Re:" prefixes
	subject = flatten_header_field(msg["subject"])

	replied_to = get_message_by_id(refs, msg['in-reply-to'])
	is_reply = replied_to is not None and flatten_header_field(replied_to["subject"]) == subject
	if not is_reply:
		body_lines = get_text(msg).splitlines()
		return Thread(body_lines, msg, (0, len(body_lines)))

	blocks = parse_blocks(msg)
	blocks = trim_quotes_footer(blocks)
	blocks = match_quotes(blocks, replied_to)
	blocks = trim_noisy_text(blocks)
	# print("\n".join([str(block) for block in blocks]))
	blocks = merge_blocks(blocks)

	root = parse(replied_to, refs)

	pending_quote = None
	for block in blocks:
		if not isinstance(block, Text):
			if isinstance(block, Quote):
				pending_quote = block
			continue

		parent = None
		if pending_quote is not None:
			assert(pending_quote.parent_region is not None)
			# Last line of the quoted region in the replied-to message.
			quoted_line = pending_quote.parent_region[1] - 1
			parent = root.at(replied_to, quoted_line)
			pending_quote = None

		if parent is None:
			# TODO: include previous quote, if any
			root.children.append(Thread(block.lines, msg, block.region))
		else:
			offset = quoted_line - parent.source_region[0]
			parent.children.append(Thread(block.lines, msg, block.region, offset))

	return root

M try.py => try.py +2 -1
@@ 2,7 2,8 @@

import mailbox

from emailthreads import *
from util import *
from threads import *

def get_message_references(msg):
	# TODO: handle spaces in message IDs

A util.py => util.py +49 -0
@@ 0,0 1,49 @@
import re
import sys
from email.message import EmailMessage

def get_message_by_id(msgs, msg_id):
	"""Return the first message in msgs whose "message-id" header equals msg_id.

	msgs is an iterable of objects supporting header lookup by name.
	Returns None when msg_id is None or when no message matches.
	"""
	# email.message objects yield None for a missing header. Without this
	# guard, a lookup for an absent In-Reply-To (None) would wrongly match
	# any message that itself lacks a Message-ID.
	if msg_id is None:
		return None
	# TODO: handle weird brackets stuff
	for msg in msgs:
		if msg["message-id"] == msg_id:
			return msg
	return None

def strip_prefix(s, prefix):
	"""Return s with one leading occurrence of prefix removed.

	s is returned unchanged when it does not start with prefix.
	"""
	return s[len(prefix):] if s.startswith(prefix) else s

def flatten_header_field(value):
	"""Normalize a header value to a single line without leading "Re:" prefixes.

	Surrounding whitespace is dropped, every leading "Re:" is removed, and
	the remaining lines are stripped and joined with single spaces.
	A missing header (None) is treated as an empty value and yields "".
	"""
	# Header lookup on email messages returns None when the header is
	# absent; treat that as empty instead of failing on .strip().
	if value is None:
		return ""
	value = value.strip()
	# TODO: more of these
	while value.startswith("Re:"):
		value = value[len("Re:"):].strip()
	return " ".join(line.strip() for line in value.splitlines())

def get_text_part(msg):
	"""Return the first part yielded by msg.walk() whose content type is text/plain.

	Returns None when no such part exists.
	"""
	plain_parts = (part for part in msg.walk() if part.get_content_type() == "text/plain")
	return next(plain_parts, None)

def normalize_whitespace(text):
	"""Return text with every no-break space (U+00A0) replaced by a plain space."""
	# TODO: more of these
	return " ".join(text.split("\xa0"))

def get_text(msg):
	"""Return the decoded text/plain body of msg with whitespace normalized.

	The body is decoded with the charset declared on the text part
	(UTF-8 when none is declared). If that charset is unknown or the
	payload does not decode with it, UTF-8 with replacement characters is
	used instead. Returns "" when msg has no text/plain part.
	"""
	text_part = get_text_part(msg)
	if text_part is None:
		# get_text_part() signals "no text/plain part" with None.
		return ""
	payload = text_part.get_payload(decode=True)
	# Honor the declared charset instead of assuming UTF-8 for every mail.
	charset = text_part.get_content_charset("utf-8")
	try:
		text = payload.decode(charset)
	except (LookupError, UnicodeDecodeError):
		# Unknown or mislabelled charset: keep going with a lossy decode.
		text = payload.decode("utf-8", errors="replace")
	return normalize_whitespace(text)

def lines_as_list(lines):
	"""Coerce lines to a list.

	A str is split on "\\n", a list is returned as-is (same object), and
	any other iterable is materialized with list().
	"""
	if isinstance(lines, str):
		return lines.split("\n")
	return lines if isinstance(lines, list) else list(lines)