import base64
import datetime
import email.utils
import os
import quopri
import re
from email.errors import HeaderParseError
from email.header import decode_header, make_header
from mailbox import mbox, mboxMessage

from bs4 import BeautifulSoup
from slugify import slugify

from scripts.utils import tokenize
from scripts.watch.utils import (
    file_creation_time,
    guid,
    move_source,
    write_to_server_documents,
)


def _decode_payload(raw: bytes, charset=None) -> str:
    """Decode raw payload bytes into text.

    Tries the charset declared by the MIME part first (when there is one),
    then UTF-8, and finally latin-1, which maps every byte and therefore
    cannot fail.
    """
    for codec in (charset, "utf-8"):
        if not codec:
            continue
        try:
            return raw.decode(codec)
        except (LookupError, UnicodeDecodeError):
            # Unknown codec name or bytes that do not fit it: try the next one.
            continue
    return raw.decode("latin-1")


def get_content(message: mboxMessage) -> str:
    """Return the readable text of a mail message.

    The first ``text/plain` part found while walking the message wins. If
    there is none, the text extracted from the last ``text/html`` part seen
    is used instead.

    Args:
        message: The message to extract text from.

    Returns:
        The decoded body text, or an empty string when the message has no
        textual part, so callers can detect and skip content-less mails.
    """
    html_text = ""
    for part in message.walk():
        content_type = part.get_content_type()
        if content_type not in ("text/plain", "text/html"):
            continue

        payload = part.get_payload(decode=True)
        if payload is None:
            # Nothing decodable in this part; keep looking.
            continue

        if content_type == "text/plain":
            return _decode_payload(payload, part.get_content_charset())

        # HTML is only a fallback: remember it and keep searching for plain text.
        html_text = BeautifulSoup(payload, "html.parser").get_text()

    return html_text


def parse_subject(subject: str) -> str:
    """Decode RFC 2047 encoded words in a ``Subject`` header value.

    Handles Q and B encodings, several encoded words in one header, and
    encoded words mixed with plain text. A value without encoded words is
    returned unchanged.

    Args:
        subject: The raw header value (a ``Header`` object is accepted too).

    Returns:
        The decoded subject, or the raw text when it cannot be decoded.
    """
    # Header lookups may yield a Header object instead of str; normalise first.
    subject = str(subject)
    if "=?" not in subject:
        return subject

    try:
        return str(make_header(decode_header(subject)))
    except (HeaderParseError, LookupError, ValueError):
        # Malformed encoding or unknown charset: keep the raw text rather
        # than aborting the whole conversion.
        return subject


# Process all mbox-related documents.
def as_mbox(**kwargs):
    """Convert every message of an mbox file into a server document.

    Each message that has text content is written out through
    ``write_to_server_documents``; afterwards the source file is handed to
    ``move_source``.

    Keyword Args:
        directory: Folder that contains the mbox file (default ``"hotdir"``).
        filename: Name of the mbox file without its extension (required).
        ext: Extension of the file, including the dot (default ``".mbox"``).
        remove_on_complete: Passed to ``move_source`` as ``remove``
            (default ``False``).

    Returns:
        ``(True, None)`` when the file was processed, or ``(False, reason)``
        when no filename was given or the mbox file does not exist.
    """
    from mailbox import NoSuchMailboxError

    parent_dir = kwargs.get("directory", "hotdir")
    filename = kwargs.get("filename")
    ext = kwargs.get("ext", ".mbox")
    remove = kwargs.get("remove_on_complete", False)

    if filename is None:
        print("[ERROR]: No filename provided.")
        return (False, "No filename provided.")
    filename = str(filename)

    fullpath = f"{parent_dir}/{filename}{ext}"
    print(f"-- Working {fullpath} --")

    try:
        # create=False: never silently create an empty mailbox for a bad path.
        box = mbox(fullpath, create=False)
    except NoSuchMailboxError:
        print(f"[ERROR]: {fullpath} does not exist.")
        return (False, f"{fullpath} does not exist.")

    try:
        for message in box:
            content = get_content(message)
            content = content.strip().replace("\r\n", "\n")

            if not content:
                print("[WARNING]: Mail with no content. Ignored.")
                continue

            date_sent = None
            date_tuple = email.utils.parsedate_tz(message["Date"])
            if date_tuple:
                try:
                    local_date = datetime.datetime.fromtimestamp(
                        email.utils.mktime_tz(date_tuple)
                    )
                    date_sent = local_date.strftime("%a, %d %b %Y %H:%M:%S")
                except (OverflowError, OSError, ValueError):
                    # An out-of-range Date header must not abort the whole file.
                    date_sent = None

            subject = message["Subject"]
            if subject is None:
                print("[WARNING]: Mail with no subject. But has content.")
                subject = "None"
            else:
                subject = parse_subject(subject)

            abs_path = os.path.abspath(
                f"{parent_dir}/processed/{slugify(filename)}-{guid()}{ext}"
            )
            data = {
                "id": guid(),
                "url": f"file://{abs_path}",
                "title": subject,
                "docAuthor": message["From"],
                "description": f"email from {message['From']} to {message['To']}",
                "docSource": "mbox file uploaded by the user.",
                "published": file_creation_time(fullpath),
                "sender": message["From"],
                "recipient": message["To"],
                "subject": subject,
                "date_sent": date_sent,
                # NOTE(review): this is a character count, not a word count;
                # kept as-is because consumers of the field are not visible here.
                "wordCount": len(content),
                "pageContent": content,
                "token_count_estimate": len(tokenize(content)),
            }

            write_to_server_documents(data, f"{slugify(filename)}-{data.get('id')}")
    finally:
        # Release the file handle before the source file is moved or removed.
        box.close()

    move_source(parent_dir, f"{filename}{ext}", remove=remove)
    print(f"[SUCCESS]: {filename}{ext} converted & ready for embedding.\n")
    return (True, None)