mirror of
https://github.com/Mintplex-Labs/anything-llm.git
synced 2024-11-09 00:10:10 +01:00
125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
import os
|
|
import datetime
|
|
import email.utils
|
|
import re
|
|
import quopri
|
|
import base64
|
|
from mailbox import mbox, mboxMessage
|
|
from slugify import slugify
|
|
from bs4 import BeautifulSoup
|
|
from scripts.watch.utils import (
|
|
guid,
|
|
file_creation_time,
|
|
write_to_server_documents,
|
|
move_source,
|
|
)
|
|
from scripts.utils import tokenize
|
|
|
|
|
|
def get_content(message: mboxMessage) -> str:
    """Extract the readable text body of an email message.

    Every MIME part of *message* is visited in ``walk()`` order. The first
    ``text/plain`` part wins and ends the search; until one is found, each
    ``text/html`` part encountered replaces the result with its text content
    (markup removed via BeautifulSoup).

    Args:
        message: The parsed email message to read.

    Returns:
        The body text. When the message has neither a ``text/plain`` nor a
        ``text/html`` part the literal string ``"None"`` is returned (kept
        as-is because callers already receive that sentinel).
    """
    content = "None"
    declared_charset = None

    for part in message.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            content = part.get_payload(decode=True)
            # Remember the charset announced in the part's Content-Type so
            # the bytes are decoded the way the sender encoded them.
            declared_charset = part.get_content_charset()
            break
        if content_type == "text/html":
            soup = BeautifulSoup(part.get_payload(decode=True), "html.parser")
            content = soup.get_text()

    if not isinstance(content, bytes):
        return content

    # Prefer the declared charset; fall back to UTF-8 when it is missing,
    # unknown to Python (LookupError) or does not match the actual bytes.
    for codec in (declared_charset, "utf-8"):
        if codec is None:
            continue
        try:
            return content.decode(codec)
        except (LookupError, UnicodeDecodeError):
            continue

    # latin-1 maps every byte value, so this last resort cannot fail.
    return content.decode("latin-1")
|
|
|
|
|
|
def parse_subject(subject: str) -> str:
    """Decode RFC 2047 encoded words in an email subject.

    Handles Q (quoted-printable, where ``_`` stands for a space) and B
    (base64) encoded words, several encoded words in one subject, and
    subjects that mix plain text with encoded words.

    Args:
        subject: The raw ``Subject`` header value.

    Returns:
        The decoded subject. The input is returned unchanged (as ``str``)
        when it contains no encoded word or when decoding fails because of
        a malformed payload or an unknown charset.
    """
    # Function-scope imports: the module header only imports ``email.utils``.
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    subject = str(subject)
    if "=?" not in subject:
        # No encoded word can be present; skip the parser entirely.
        return subject

    try:
        return str(make_header(decode_header(subject)))
    except (HeaderParseError, LookupError, ValueError):
        # Bad base64, unknown charset or undecodable bytes: keep the raw
        # header text rather than aborting the whole conversion.
        return subject
|
|
|
|
|
|
# Process all mbox-related documents.
|
|
def as_mbox(**kwargs):
    """Convert every message of an mbox file into a server document.

    Keyword Args:
        directory: Folder holding the source file (default ``"hotdir"``).
        filename: Base name of the mbox file without extension (required).
        ext: File extension including the dot (default ``".mbox"``).
        remove_on_complete: Forwarded to ``move_source`` as ``remove``
            (default ``False``).

    Returns:
        ``(True, None)`` once all messages were handled, or
        ``(False, reason)`` when no filename was given or the file does not
        exist.
    """
    parent_dir = kwargs.get("directory", "hotdir")
    filename = kwargs.get("filename")
    ext = kwargs.get("ext", ".mbox")
    remove = kwargs.get("remove_on_complete", False)

    if filename is None:
        print("[ERROR]: No filename provided.")
        return (False, "No filename provided.")
    filename = str(filename)

    fullpath = f"{parent_dir}/{filename}{ext}"
    print(f"-- Working {fullpath} --")

    # mbox() creates a missing file by default, which would turn a wrong
    # path into an empty mailbox and a bogus success report.
    if not os.path.isfile(fullpath):
        print(f"[ERROR]: {fullpath} does not exist.")
        return (False, f"{fullpath} does not exist.")

    box = mbox(fullpath, create=False)
    try:
        for message in box:
            content = get_content(message)
            content = content.strip().replace("\r\n", "\n")

            if not content:
                print("[WARNING]: Mail with no content. Ignored.")
                continue

            subject = message["Subject"]
            if subject is None:
                print("[WARNING]: Mail with no subject. But has content.")
                subject = "None"
            else:
                # Headers carrying raw 8-bit bytes come back as Header
                # objects rather than str; normalise before parsing.
                subject = parse_subject(str(subject))

            abs_path = os.path.abspath(
                f"{parent_dir}/processed/{slugify(filename)}-{guid()}{ext}"
            )
            data = {
                "id": guid(),
                "url": f"file://{abs_path}",
                "title": subject,
                "docAuthor": message["From"],
                "description": f"email from {message['From']} to {message['To']}",
                "docSource": "mbox file uploaded by the user.",
                "chunkSource": subject,
                # Taken from the mbox file itself, not the message's Date.
                "published": file_creation_time(fullpath),
                # NOTE(review): this is a character count, not a word
                # count; left unchanged to keep the output format stable.
                "wordCount": len(content),
                "pageContent": content,
                "token_count_estimate": len(tokenize(content)),
            }

            write_to_server_documents(data, f"{slugify(filename)}-{data.get('id')}")
    finally:
        # Release the file handle before the source file is moved/removed.
        box.close()

    move_source(parent_dir, f"{filename}{ext}", remove=remove)
    print(f"[SUCCESS]: {filename}{ext} converted & ready for embedding.\n")
    return (True, None)
|