anything-llm/collector/scripts/watch/convert/as_mbox.py

128 lines
4.0 KiB
Python
Raw Normal View History

import os
import datetime
import email.utils
import re
import quopri
import base64
from mailbox import mbox, mboxMessage
from slugify import slugify
from bs4 import BeautifulSoup
from scripts.watch.utils import (
guid,
file_creation_time,
write_to_server_documents,
move_source,
)
from scripts.utils import tokenize
def get_content(message: mboxMessage) -> str:
    """Return the readable text body of *message*.

    Every MIME part of the message is visited. The first ``text/plain``
    part wins and stops the search; until one is found, each ``text/html``
    part seen replaces the current candidate with its tag-stripped text.
    If the message has neither kind of part the literal string ``"None"``
    is returned.

    Bytes payloads are decoded with the charset declared on the part when
    there is one, then with UTF-8, and finally with latin-1 (which accepts
    any byte sequence), so the function always returns ``str``.
    """
    content = "None"
    declared_charset = None

    for part in message.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            content = part.get_payload(decode=True)
            # Remember the charset the sender declared so the bytes are not
            # blindly assumed to be UTF-8 / latin-1.
            declared_charset = part.get_content_charset()
            break
        elif content_type == "text/html":
            soup = BeautifulSoup(part.get_payload(decode=True), "html.parser")
            content = soup.get_text()

    if isinstance(content, bytes):
        for encoding in (declared_charset, "utf-8"):
            if not encoding:
                continue
            try:
                return content.decode(encoding)
            except (LookupError, UnicodeDecodeError):
                # LookupError: the declared charset is not a codec Python
                # knows; UnicodeDecodeError: the bytes do not match it.
                continue
        # latin-1 maps every byte to a code point, so this cannot fail.
        content = content.decode("latin-1")

    return content
def parse_subject(subject: str) -> str:
    """Decode RFC 2047 "encoded words" in a Subject header value.

    Handles both Q (quoted-printable, where ``_`` stands for a space) and
    B (base64) encodings, several encoded words in a row, and encoded words
    mixed with plain text such as ``"Re: =?utf-8?Q?...?="``.

    A subject without any encoded word is returned unchanged. If decoding
    fails (unknown charset, malformed base64, bytes that do not match the
    declared charset) the raw subject is returned instead of raising, so a
    single odd header does not abort the caller.
    """
    # Imported locally so this function does not rely on names that the
    # module's import block does not already provide.
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    # A header value may arrive as an email.header.Header object rather
    # than a plain string; normalise it first.
    subject = str(subject)

    # Fast path: nothing that even looks like an encoded word.
    if "=?" not in subject:
        return subject

    try:
        return str(make_header(decode_header(subject)))
    except (HeaderParseError, LookupError, ValueError):
        # LookupError: unknown charset; ValueError covers UnicodeError for
        # bytes that do not decode with the declared charset.
        return subject
# Process all mbox-related documents.
def as_mbox(**kwargs):
    """Convert each message of an mbox file into a document record.

    Keyword arguments:
        directory: folder containing the mbox file (default ``"hotdir"``).
        filename: file name without extension; required.
        ext: file extension including the dot (default ``".mbox"``).
        remove_on_complete: forwarded to ``move_source`` as ``remove``
            (default ``False``).

    For every message with a non-empty body a metadata dict is built and
    handed to ``write_to_server_documents``; messages whose body is empty
    after stripping are skipped with a warning. Once all messages have been
    handled the source file is passed to ``move_source``.

    Returns:
        ``(True, None)`` on success, or ``(False, "No filename provided.")``
        when ``filename`` is missing.
    """
    parent_dir = kwargs.get("directory", "hotdir")
    filename = kwargs.get("filename")
    ext = kwargs.get("ext", ".mbox")
    remove = kwargs.get("remove_on_complete", False)

    if filename is None:
        print("[ERROR]: No filename provided.")
        return (False, "No filename provided.")
    filename = str(filename)

    fullpath = f"{parent_dir}/{filename}{ext}"
    print(f"-- Working {fullpath} --")
    box = mbox(fullpath)

    try:
        for message in box:
            content = get_content(message)
            content = content.strip().replace("\r\n", "\n")

            if len(content) == 0:
                print("[WARNING]: Mail with no content. Ignored.")
                continue

            # Date parsing is best-effort: a missing, malformed or
            # out-of-range Date header yields None instead of aborting the
            # whole conversion.
            date_sent = None
            raw_date = message["Date"]
            if raw_date:
                try:
                    date_tuple = email.utils.parsedate_tz(str(raw_date))
                    if date_tuple:
                        local_date = datetime.datetime.fromtimestamp(
                            email.utils.mktime_tz(date_tuple)
                        )
                        date_sent = local_date.strftime("%a, %d %b %Y %H:%M:%S")
                except (TypeError, ValueError, IndexError, OverflowError, OSError):
                    date_sent = None

            subject = message["Subject"]
            if subject is None:
                print("[WARNING]: Mail with no subject. But has content.")
                subject = "None"
            else:
                subject = parse_subject(subject)

            # NOTE(review): this path uses its own guid(), distinct from the
            # "id" below — confirm that is intended.
            abs_path = os.path.abspath(
                f"{parent_dir}/processed/{slugify(filename)}-{guid()}{ext}"
            )
            data = {
                "id": guid(),
                "url": f"file://{abs_path}",
                "title": subject,
                "docAuthor": message["From"],
                "description": f"email from {message['From']} to {message['To']}",
                "docSource": "mbox file uploaded by the user.",
                "published": file_creation_time(fullpath),
                "sender": message["From"],
                "recipient": message["To"],
                "subject": subject,
                "date_sent": date_sent,
                # NOTE(review): this is the character count of the body,
                # not a word count — confirm what consumers expect.
                "wordCount": len(content),
                "pageContent": content,
                "token_count_estimate": len(tokenize(content)),
            }

            write_to_server_documents(data, f"{slugify(filename)}-{data.get('id')}")
    finally:
        # Release the file handle before the source file is moved/removed.
        box.close()

    move_source(parent_dir, f"{filename}{ext}", remove=remove)
    print(f"[SUCCESS]: {filename}{ext} converted & ready for embedding.\n")
    return (True, None)