anything-llm/server/endpoints/workspaces.js
Timothy Carambat fdc3add53c
Api session id support (#2158)
* Refactor api endpoint chat handler to its own function
remove legacy `chatWithWorkspace` and cleanup `index.js`

* Add `sessionId` in dev API to partition chats logically statelessly
2024-08-21 15:25:47 -07:00

980 lines
30 KiB
JavaScript

const path = require("path");
const fs = require("fs");
const {
reqBody,
multiUserMode,
userFromSession,
safeJsonParse,
} = require("../utils/http");
const { normalizePath, isWithin } = require("../utils/files");
const { Workspace } = require("../models/workspace");
const { Document } = require("../models/documents");
const { DocumentVectors } = require("../models/vectors");
const { WorkspaceChats } = require("../models/workspaceChats");
const { getVectorDbClass } = require("../utils/helpers");
const { handleFileUpload, handlePfpUpload } = require("../utils/files/multer");
const { validatedRequest } = require("../utils/middleware/validatedRequest");
const { Telemetry } = require("../models/telemetry");
const {
flexUserRoleValid,
ROLES,
} = require("../utils/middleware/multiUserProtected");
const { EventLogs } = require("../models/eventLogs");
const {
WorkspaceSuggestedMessages,
} = require("../models/workspacesSuggestedMessages");
const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace");
const { convertToChatHistory } = require("../utils/helpers/chat/responses");
const { CollectorApi } = require("../utils/collectorApi");
const {
determineWorkspacePfpFilepath,
fetchPfp,
} = require("../utils/files/pfp");
const { getTTSProvider } = require("../utils/TextToSpeech");
const { WorkspaceThread } = require("../models/workspaceThread");
const truncate = require("truncate");
const { purgeDocument } = require("../utils/files/purgeDocument");
/**
 * Registers the workspace HTTP routes (creation, updates, uploads, chats,
 * suggested messages, TTS, profile pictures, thread forking) on `app`.
 *
 * @param {object} app - Object exposing `post`, `get`, `put` and `delete`
 *   route-registration methods (presumably an Express app — confirm against
 *   the caller). When falsy, nothing is registered.
 */
function workspaceEndpoints(app) {
// Without an app there is nothing to attach routes to.
if (!app) return;
// In-memory cache shared by the handlers below: the TTS route stores audio
// under `${slug}:${chatId}` and the profile-picture route stores images under
// the bare workspace slug.
const responseCache = new Map();
/**
 * POST /workspace/new
 * Creates a workspace through `Workspace.new` for the session user, reports
 * the creation to telemetry and the event log, and answers with the created
 * workspace plus whatever message the model layer returned.
 */
app.post(
  "/workspace/new",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const sessionUser = await userFromSession(req, res);
      const {
        name: requestedName = null,
        onboardingComplete: finishedOnboarding = false,
      } = reqBody(req);

      const created = await Workspace.new(requestedName, sessionUser?.id);
      const { workspace, message } = created;

      // Snapshot of the active provider selections, with the same fallbacks
      // used when the corresponding environment variable is unset.
      const telemetryPayload = {
        multiUserMode: multiUserMode(res),
        LLMSelection: process.env.LLM_PROVIDER || "openai",
        Embedder: process.env.EMBEDDING_ENGINE || "inherit",
        VectorDbSelection: process.env.VECTOR_DB || "lancedb",
        TTSSelection: process.env.TTS_PROVIDER || "native",
      };
      await Telemetry.sendTelemetry(
        "workspace_created",
        telemetryPayload,
        sessionUser?.id
      );

      const eventPayload = {
        workspaceName: workspace?.name || "Unknown Workspace",
      };
      await EventLogs.logEvent(
        "workspace_created",
        eventPayload,
        sessionUser?.id
      );

      // Only a strict boolean `true` marks onboarding as finished.
      if (finishedOnboarding === true) {
        await Telemetry.sendTelemetry("onboarding_complete");
      }

      res.status(200).json({ workspace, message });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/update
 * Looks up the workspace by slug (scoped to the session user in multi-user
 * mode), records the change through `Workspace.trackChange`, applies the
 * request body via `Workspace.update`, and returns the result.
 * Responds 400 when the workspace cannot be found.
 */
app.post(
  "/workspace/:slug/update",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const sessionUser = await userFromSession(req, res);
      const { slug = null } = req.params;
      const updates = reqBody(req);

      let target;
      if (multiUserMode(res)) {
        target = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        target = await Workspace.get({ slug });
      }

      if (!target) {
        return res.sendStatus(400).end();
      }

      await Workspace.trackChange(target, updates, sessionUser);
      const { workspace, message } = await Workspace.update(target.id, updates);
      res.status(200).json({ workspace, message });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/upload
 * Hands the uploaded file (by its original name) to the collector for
 * processing. Responds 500 with an error payload when the collector is
 * offline or processing fails; otherwise records telemetry and an event-log
 * entry and responds `{ success: true, error: null }`.
 */
app.post(
  "/workspace/:slug/upload",
  [
    validatedRequest,
    flexUserRoleValid([ROLES.admin, ROLES.manager]),
    handleFileUpload,
  ],
  async (req, res) => {
    try {
      const collector = new CollectorApi();
      const { originalname: fileName } = req.file;

      const collectorIsOnline = await collector.online();
      if (!collectorIsOnline) {
        const offlineError = `Document processing API is not online. Document ${fileName} will not be processed automatically.`;
        res.status(500).json({ success: false, error: offlineError }).end();
        return;
      }

      const { success: processed, reason: failureReason } =
        await collector.processDocument(fileName);
      if (!processed) {
        res.status(500).json({ success: false, error: failureReason }).end();
        return;
      }

      collector.log(
        `Document ${fileName} uploaded processed and successfully. It is now available in documents.`
      );
      await Telemetry.sendTelemetry("document_uploaded");
      await EventLogs.logEvent(
        "document_uploaded",
        { documentName: fileName },
        res.locals?.user?.id
      );
      res.status(200).json({ success: true, error: null });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/upload-link
 * Sends the `link` from the request body to the collector for processing.
 * Responds 500 with an error payload when the collector is offline or the
 * link could not be processed; otherwise records telemetry and an event-log
 * entry and responds `{ success: true, error: null }`.
 */
app.post(
  "/workspace/:slug/upload-link",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const collector = new CollectorApi();
      const { link = "" } = reqBody(req);

      const collectorIsOnline = await collector.online();
      if (!collectorIsOnline) {
        const offlineError = `Document processing API is not online. Link ${link} will not be processed automatically.`;
        res.status(500).json({ success: false, error: offlineError }).end();
        return;
      }

      const { success: processed, reason: failureReason } =
        await collector.processLink(link);
      if (!processed) {
        res.status(500).json({ success: false, error: failureReason }).end();
        return;
      }

      collector.log(
        `Link ${link} uploaded processed and successfully. It is now available in documents.`
      );
      await Telemetry.sendTelemetry("link_uploaded");
      await EventLogs.logEvent("link_uploaded", { link }, res.locals?.user?.id);
      res.status(200).json({ success: true, error: null });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/update-embeddings
 * Removes the documents listed in `deletes` and then adds those listed in
 * `adds` for the resolved workspace. Responds with the re-fetched workspace
 * and, when some documents failed to embed, a message listing the errors
 * (otherwise `message` is null). Responds 400 when the workspace is unknown.
 */
app.post(
  "/workspace/:slug/update-embeddings",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const sessionUser = await userFromSession(req, res);
      const { slug = null } = req.params;
      const { adds: toAdd = [], deletes: toRemove = [] } = reqBody(req);

      let target;
      if (multiUserMode(res)) {
        target = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        target = await Workspace.get({ slug });
      }
      if (!target) {
        return res.sendStatus(400).end();
      }

      // Removals run before additions.
      await Document.removeDocuments(target, toRemove, res.locals?.user?.id);
      const { failedToEmbed = [], errors = [] } = await Document.addDocuments(
        target,
        toAdd,
        res.locals?.user?.id
      );

      const refreshed = await Workspace.get({ id: target.id });

      let message = null;
      if (failedToEmbed.length > 0) {
        const details = errors.map((msg) => `${msg}`).join("\n\n");
        message = `${failedToEmbed.length} documents failed to add.\n\n${details}`;
      }

      res.status(200).json({ workspace: refreshed, message });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * DELETE /workspace/:slug
 * Deletes the workspace's chats, document vectors, documents and the
 * workspace record itself, logs the deletion, then attempts to drop the
 * vector-database namespace named after the slug. A failure of that last
 * step is only logged. Responds 400 when the workspace is unknown.
 */
app.delete(
  "/workspace/:slug",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const { slug = "" } = req.params;
      const sessionUser = await userFromSession(req, res);
      const vectorDb = getVectorDbClass();

      let workspace;
      if (multiUserMode(res)) {
        workspace = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        workspace = await Workspace.get({ slug });
      }
      if (!workspace) {
        return res.sendStatus(400).end();
      }

      const numericId = Number(workspace.id);
      await WorkspaceChats.delete({ workspaceId: numericId });
      await DocumentVectors.deleteForWorkspace(workspace.id);
      await Document.delete({ workspaceId: numericId });
      await Workspace.delete({ id: numericId });

      await EventLogs.logEvent(
        "workspace_deleted",
        { workspaceName: workspace?.name || "Unknown Workspace" },
        res.locals?.user?.id
      );

      // Best effort: the request still succeeds if the namespace removal fails.
      try {
        await vectorDb["delete-namespace"]({ namespace: slug });
      } catch (namespaceErr) {
        console.error(namespaceErr.message);
      }

      res.sendStatus(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * DELETE /workspace/:slug/reset-vector-db
 * Deletes the workspace's document vectors and document records, logs the
 * reset, then attempts to drop the vector-database namespace named after the
 * slug (a failure there is only logged). The workspace itself is kept.
 * Responds 400 when the workspace is unknown.
 */
app.delete(
  "/workspace/:slug/reset-vector-db",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (req, res) => {
    try {
      const { slug = "" } = req.params;
      const sessionUser = await userFromSession(req, res);
      const vectorDb = getVectorDbClass();

      let workspace;
      if (multiUserMode(res)) {
        workspace = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        workspace = await Workspace.get({ slug });
      }
      if (!workspace) {
        return res.sendStatus(400).end();
      }

      await DocumentVectors.deleteForWorkspace(workspace.id);
      await Document.delete({ workspaceId: Number(workspace.id) });

      const eventPayload = {
        workspaceName: workspace?.name || "Unknown Workspace",
      };
      await EventLogs.logEvent(
        "workspace_vectors_reset",
        eventPayload,
        res.locals?.user?.id
      );

      // Best effort: the request still succeeds if the namespace removal fails.
      try {
        await vectorDb["delete-namespace"]({ namespace: slug });
      } catch (namespaceErr) {
        console.error(namespaceErr.message);
      }

      res.sendStatus(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * GET /workspaces
 * Lists workspaces: via `Workspace.whereWithUser` for the session user in
 * multi-user mode, otherwise via an unfiltered `Workspace.where`.
 */
app.get(
  "/workspaces",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    try {
      const sessionUser = await userFromSession(req, res);

      let workspaces;
      if (multiUserMode(res)) {
        workspaces = await Workspace.whereWithUser(sessionUser);
      } else {
        workspaces = await Workspace.where();
      }

      res.status(200).json({ workspaces });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * GET /workspace/:slug
 * Returns the workspace matching the slug (scoped to the session user in
 * multi-user mode). The lookup result is returned as-is with status 200,
 * whatever it is.
 */
app.get(
  "/workspace/:slug",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    try {
      const { slug } = req.params;
      const sessionUser = await userFromSession(req, res);

      let workspace;
      if (multiUserMode(res)) {
        workspace = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        workspace = await Workspace.get({ slug });
      }

      res.status(200).json({ workspace });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * GET /workspace/:slug/chats
 * Returns the chat history of a workspace, converted with
 * `convertToChatHistory`. In multi-user mode both the workspace lookup and
 * the chat query are scoped to the session user. Responds 400 when the
 * workspace is unknown.
 */
app.get(
  "/workspace/:slug/chats",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    try {
      const { slug } = req.params;
      const sessionUser = await userFromSession(req, res);

      let workspace;
      if (multiUserMode(res)) {
        workspace = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        workspace = await Workspace.get({ slug });
      }
      if (!workspace) {
        return res.sendStatus(400).end();
      }

      let rawHistory;
      if (multiUserMode(res)) {
        rawHistory = await WorkspaceChats.forWorkspaceByUser(
          workspace.id,
          sessionUser.id
        );
      } else {
        rawHistory = await WorkspaceChats.forWorkspace(workspace.id);
      }

      res.status(200).json({ history: convertToChatHistory(rawHistory) });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * DELETE /workspace/:slug/delete-chats
 * Deletes the chats whose ids are listed in `chatIds`, restricted to the
 * current workspace and the session user (or `user_id: null` when there is
 * no user). Responds 400 when the workspace is missing or `chatIds` is not
 * an array.
 */
app.delete(
  "/workspace/:slug/delete-chats",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async (req, res) => {
    try {
      const { chatIds = [] } = reqBody(req);
      const sessionUser = await userFromSession(req, res);
      const { workspace } = res.locals;

      const requestIsValid = workspace && Array.isArray(chatIds);
      if (!requestIsValid) {
        return res.sendStatus(400).end();
      }

      // Workspace chats and thread chats live in the same table, so matching
      // on workspace + user is enough to cover both.
      const numericIds = chatIds.map((rawId) => Number(rawId));
      await WorkspaceChats.delete({
        id: { in: numericIds },
        user_id: sessionUser?.id ?? null,
        workspaceId: workspace.id,
      });

      res.sendStatus(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * DELETE /workspace/:slug/delete-edited-chats
 * Deletes every non-thread chat (thread_id null) of the session user in the
 * current workspace whose id is greater than or equal to `startingId`.
 */
app.delete(
  "/workspace/:slug/delete-edited-chats",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async (req, res) => {
    try {
      const { startingId } = reqBody(req);
      const sessionUser = await userFromSession(req, res);
      const { workspace } = res.locals;

      const deletionFilter = {
        workspaceId: workspace.id,
        thread_id: null,
        user_id: sessionUser?.id,
        id: { gte: Number(startingId) },
      };
      await WorkspaceChats.delete(deletionFilter);

      res.sendStatus(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/update-chat
 * Replaces the `text` field of a stored chat response with `newText`.
 * Only non-thread chats of the session user in the current workspace are
 * eligible. Empty text, an unknown chat, or an unparsable stored response
 * all raise and end up as a 500.
 */
app.post(
  "/workspace/:slug/update-chat",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async (req, res) => {
    try {
      const { chatId, newText = null } = reqBody(req);

      const hasText = Boolean(newText) && String(newText).trim().length > 0;
      if (!hasText) {
        throw new Error("Cannot save empty response");
      }

      const sessionUser = await userFromSession(req, res);
      const { workspace } = res.locals;
      const storedChat = await WorkspaceChats.get({
        workspaceId: workspace.id,
        thread_id: null,
        user_id: sessionUser?.id,
        id: Number(chatId),
      });
      if (!storedChat) {
        throw new Error("Invalid chat.");
      }

      const parsedResponse = safeJsonParse(storedChat.response, null);
      if (!parsedResponse) {
        throw new Error("Failed to parse chat response");
      }

      // Keep every other field of the stored response; only swap the text.
      const updatedResponse = { ...parsedResponse, text: String(newText) };
      await WorkspaceChats._update(storedChat.id, {
        response: JSON.stringify(updatedResponse),
      });

      res.sendStatus(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * POST /workspace/:slug/chat-feedback/:chatId
 * Stores the `feedback` value from the request body on a chat belonging to
 * the current workspace. Responds 404 when no such chat exists, otherwise
 * `{ success: <result of updateFeedbackScore> }`.
 */
app.post(
  "/workspace/:slug/chat-feedback/:chatId",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async (req, res) => {
    try {
      const { chatId } = req.params;
      const { feedback = null } = reqBody(req);

      const lookup = {
        id: Number(chatId),
        workspaceId: res.locals.workspace.id,
      };
      const storedChat = await WorkspaceChats.get(lookup);
      if (!storedChat) {
        return res.status(404).end();
      }

      const outcome = await WorkspaceChats.updateFeedbackScore(
        chatId,
        feedback
      );
      res.status(200).json({ success: outcome });
    } catch (err) {
      console.error("Error updating chat feedback:", err);
      res.status(500).end();
    }
  }
);
/**
 * GET /workspace/:slug/suggested-messages
 * Returns the suggested messages stored for the workspace slug, as fetched
 * by `WorkspaceSuggestedMessages.getMessages`.
 */
app.get(
  "/workspace/:slug/suggested-messages",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    try {
      const workspaceSlug = req.params.slug;
      const suggestedMessages =
        await WorkspaceSuggestedMessages.getMessages(workspaceSlug);
      res.status(200).json({ success: true, suggestedMessages });
    } catch (err) {
      console.error("Error fetching suggested messages:", err);
      const failure = { success: false, message: "Internal server error" };
      res.status(500).json(failure);
    }
  }
);
/**
 * POST /workspace/:slug/suggested-messages
 * Replaces the suggested messages of a workspace with the `messages` array
 * from the request body.
 *
 * Responses:
 *  - 400 `{ success: false, message }` when `messages` is not an array.
 *  - 200 `{ success: true, message }` once the messages were saved.
 *  - 500 `{ success: false, message }` when saving throws.
 */
app.post(
  "/workspace/:slug/suggested-messages",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async (request, response) => {
    try {
      const { messages = [] } = reqBody(request);
      const { slug } = request.params;
      if (!Array.isArray(messages)) {
        return response.status(400).json({
          success: false,
          message: "Invalid message format. Expected an array of messages.",
        });
      }
      await WorkspaceSuggestedMessages.saveAll(messages, slug);
      return response.status(200).json({
        success: true,
        message: "Suggested messages saved successfully.",
      });
    } catch (error) {
      console.error("Error processing the suggested messages:", error);
      // The failure payload must not claim success: a client that inspects
      // the `success` flag rather than the HTTP status would otherwise treat
      // a failed save as saved.
      return response.status(500).json({
        success: false,
        message: "Error saving the suggested messages.",
      });
    }
  }
);
/**
 * POST /workspace/:slug/update-pin
 * Sets the `pinned` flag of the workspace document identified by `docPath`
 * to `pinStatus` (false when omitted). Responds 404 when the document is not
 * found in the current workspace.
 */
app.post(
  "/workspace/:slug/update-pin",
  [
    validatedRequest,
    flexUserRoleValid([ROLES.admin, ROLES.manager]),
    validWorkspaceSlug,
  ],
  async (req, res) => {
    try {
      const { docPath, pinStatus: shouldPin = false } = reqBody(req);
      const { workspace } = res.locals;

      const lookup = { workspaceId: workspace.id, docpath: docPath };
      const pinnable = await Document.get(lookup);
      if (!pinnable) {
        return res.sendStatus(404).end();
      }

      await Document.update(pinnable.id, { pinned: shouldPin });
      return res.status(200).end();
    } catch (err) {
      console.error("Error processing the pin status update:", err);
      return res.status(500).end();
    }
  }
);
/**
 * GET /workspace/:slug/tts/:chatId
 * Streams text-to-speech audio for the `text` of a stored chat response.
 *
 * Responses:
 *  - 404 when the chat does not exist in the current workspace.
 *  - 200 with the audio buffer, served from `responseCache` when a previous
 *    request already generated it, otherwise freshly generated and cached
 *    under `${workspace.slug}:${chatId}`.
 *  - 204 when the chat has no text or the provider returned no audio.
 *  - 500 `{ message }` on unexpected errors.
 */
app.get(
  "/workspace/:slug/tts/:chatId",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async function (request, response) {
    try {
      const { chatId } = request.params;
      const workspace = response.locals.workspace;
      const cacheKey = `${workspace.slug}:${chatId}`;
      const wsChat = await WorkspaceChats.get({
        id: Number(chatId),
        workspaceId: workspace.id,
      });

      // An unknown or foreign chat id used to crash on `wsChat.response`
      // below and surface as a generic 500. Answer with an explicit 404
      // instead, and do it before the cache lookup so a chat that no longer
      // exists is not served from cache.
      if (!wsChat) return response.sendStatus(404).end();

      const cachedResponse = responseCache.get(cacheKey);
      if (cachedResponse) {
        response.writeHead(200, {
          "Content-Type": cachedResponse.mime || "audio/mpeg",
        });
        response.end(cachedResponse.buffer);
        return;
      }

      const text = safeJsonParse(wsChat.response, null)?.text;
      if (!text) return response.sendStatus(204).end();

      const TTSProvider = getTTSProvider();
      const buffer = await TTSProvider.ttsBuffer(text);
      if (buffer === null) return response.sendStatus(204).end();

      responseCache.set(cacheKey, { buffer, mime: "audio/mpeg" });
      response.writeHead(200, {
        "Content-Type": "audio/mpeg",
      });
      response.end(buffer);
      return;
    } catch (error) {
      console.error("Error processing the TTS request:", error);
      response.status(500).json({ message: "TTS could not be completed" });
    }
  }
);
/**
 * GET /workspace/:slug/pfp
 * Serves the workspace profile picture. A cached entry for the slug is
 * served directly; otherwise the file path is resolved, the image is loaded
 * with `fetchPfp`, cached under the slug, and sent. Responds 204 when no
 * path can be determined or the image is not found.
 */
app.get(
  "/workspace/:slug/pfp",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    // Writes an image payload, falling back to PNG when no mime is known.
    const sendImage = (imageBuffer, imageMime) => {
      res.writeHead(200, { "Content-Type": imageMime || "image/png" });
      res.end(imageBuffer);
    };

    try {
      const { slug } = req.params;

      const cachedEntry = responseCache.get(slug);
      if (cachedEntry) {
        sendImage(cachedEntry.buffer, cachedEntry.mime);
        return;
      }

      const pfpPath = await determineWorkspacePfpFilepath(slug);
      if (!pfpPath) {
        return res.sendStatus(204).end();
      }

      const { found, buffer, mime } = fetchPfp(pfpPath);
      if (!found) {
        return res.sendStatus(204).end();
      }

      responseCache.set(slug, { buffer, mime });
      sendImage(buffer, mime);
      return;
    } catch (err) {
      console.error("Error processing the logo request:", err);
      res.status(500).json({ message: "Internal server error" });
    }
  }
);
/**
 * POST /workspace/:slug/upload-pfp
 * Stores a newly uploaded profile picture for a workspace. The upload
 * middleware is expected to expose the stored file name as
 * `request.randomFileName`.
 *
 * Steps: remove the previous picture file (if any, and only when it resolves
 * inside the pfp storage directory), point the workspace record at the new
 * file name, and drop the cached picture for this slug.
 *
 * Responses:
 *  - 400 `{ message }` when no uploaded file name is present.
 *  - 404 `{ message }` when the workspace slug is unknown.
 *  - 200 / 500 `{ message }` depending on whether the record update worked.
 */
app.post(
  "/workspace/:slug/upload-pfp",
  [
    validatedRequest,
    flexUserRoleValid([ROLES.admin, ROLES.manager]),
    handlePfpUpload,
  ],
  async function (request, response) {
    try {
      const { slug } = request.params;
      const uploadedFileName = request.randomFileName;
      if (!uploadedFileName) {
        return response.status(400).json({ message: "File upload failed." });
      }

      const workspaceRecord = await Workspace.get({
        slug,
      });
      // Previously a missing record was dereferenced and surfaced as a
      // generic 500; report it explicitly instead.
      if (!workspaceRecord) {
        return response.status(404).json({ message: "Workspace not found." });
      }

      const oldPfpFilename = workspaceRecord.pfpFilename;
      if (oldPfpFilename) {
        const storagePath = path.join(__dirname, "../storage/assets/pfp");
        const oldPfpPath = path.join(
          storagePath,
          normalizePath(oldPfpFilename)
        );
        // Refuse to delete anything that resolves outside the pfp directory.
        if (!isWithin(path.resolve(storagePath), path.resolve(oldPfpPath)))
          throw new Error("Invalid path name");
        if (fs.existsSync(oldPfpPath)) fs.unlinkSync(oldPfpPath);
      }

      const { workspace, message } = await Workspace._update(
        workspaceRecord.id,
        {
          pfpFilename: uploadedFileName,
        }
      );

      // The pfp route caches pictures by slug. Without dropping the entry
      // here, that route keeps serving the replaced picture after an upload.
      responseCache.delete(slug);

      return response.status(workspace ? 200 : 500).json({
        message: workspace
          ? "Profile picture uploaded successfully."
          : message,
      });
    } catch (error) {
      console.error("Error processing the profile picture upload:", error);
      response.status(500).json({ message: "Internal server error" });
    }
  }
);
/**
 * DELETE /workspace/:slug/remove-pfp
 * Removes the profile picture of a workspace: deletes the stored file (only
 * when it resolves inside the pfp storage directory), clears `pfpFilename`
 * on the workspace record, and drops the cached picture for this slug.
 *
 * Responses:
 *  - 404 `{ message }` when the workspace slug is unknown.
 *  - 200 / 500 `{ message }` depending on whether the record update worked.
 */
app.delete(
  "/workspace/:slug/remove-pfp",
  [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
  async function (request, response) {
    try {
      const { slug } = request.params;
      const workspaceRecord = await Workspace.get({
        slug,
      });
      // Previously a missing record was dereferenced and surfaced as a
      // generic 500; report it explicitly instead.
      if (!workspaceRecord) {
        return response.status(404).json({ message: "Workspace not found." });
      }

      const oldPfpFilename = workspaceRecord.pfpFilename;
      if (oldPfpFilename) {
        const storagePath = path.join(__dirname, "../storage/assets/pfp");
        const oldPfpPath = path.join(
          storagePath,
          normalizePath(oldPfpFilename)
        );
        // Refuse to delete anything that resolves outside the pfp directory.
        if (!isWithin(path.resolve(storagePath), path.resolve(oldPfpPath)))
          throw new Error("Invalid path name");
        if (fs.existsSync(oldPfpPath)) fs.unlinkSync(oldPfpPath);
      }

      const { workspace, message } = await Workspace._update(
        workspaceRecord.id,
        {
          pfpFilename: null,
        }
      );

      // Clear the cache so the removed picture is no longer served.
      responseCache.delete(slug);

      return response.status(workspace ? 200 : 500).json({
        message: workspace
          ? "Profile picture removed successfully."
          : message,
      });
    } catch (error) {
      console.error("Error processing the profile picture removal:", error);
      response.status(500).json({ message: "Internal server error" });
    }
  }
);
/**
 * POST /workspace/:slug/thread/fork
 * Creates a new thread containing copies of the session user's chats up to
 * and including `chatId`, taken from the thread named by `threadSlug` or,
 * when that is absent or not found, from chats with no thread.
 * The new thread is renamed after the last copied response text (truncated)
 * or "Forked Thread". Responds with `{ newThreadSlug }`.
 */
app.post(
  "/workspace/:slug/thread/fork",
  [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug],
  async (req, res) => {
    try {
      const sessionUser = await userFromSession(req, res);
      const { workspace } = res.locals;
      const { chatId, threadSlug } = reqBody(req);
      if (!chatId) {
        return res.status(400).json({ message: "chatId is required" });
      }

      // Resolve the source thread only when a slug was sent; an unknown slug
      // falls back to null.
      let sourceThreadId = null;
      if (threadSlug) {
        const sourceThread = await WorkspaceThread.get({
          slug: String(threadSlug),
          workspace_id: workspace.id,
        });
        sourceThreadId = sourceThread?.id ?? null;
      }

      const forkFilter = {
        workspaceId: workspace.id,
        user_id: sessionUser?.id,
        include: true, // only duplicate visible chats
        thread_id: sourceThreadId,
        api_session_id: null, // Do not include API session chats.
        id: { lte: Number(chatId) },
      };
      const chatsToFork = await WorkspaceChats.where(forkFilter, null, {
        id: "asc",
      });

      const created = await WorkspaceThread.new(workspace, sessionUser?.id);
      const { thread: newThread, message: threadError } = created;
      if (threadError) {
        return res.status(500).json({ error: threadError });
      }

      // Copy each chat into the new thread, remembering the text of the last
      // response that has one so it can name the thread.
      let lastMessageText = "";
      const copies = [];
      for (const chat of chatsToFork) {
        const parsed = safeJsonParse(chat.response, {});
        if (parsed?.text) lastMessageText = parsed.text;
        copies.push({
          workspaceId: workspace.id,
          prompt: chat.prompt,
          response: JSON.stringify(parsed),
          user_id: sessionUser?.id,
          thread_id: newThread.id,
        });
      }
      await WorkspaceChats.bulkCreate(copies);

      let forkedName = "Forked Thread";
      if (lastMessageText) {
        forkedName = truncate(lastMessageText, 22);
      }
      await WorkspaceThread.update(newThread, { name: forkedName });

      await Telemetry.sendTelemetry("thread_forked");
      await EventLogs.logEvent(
        "thread_forked",
        {
          workspaceName: workspace?.name || "Unknown Workspace",
          threadName: newThread.name,
        },
        sessionUser?.id
      );

      res.status(200).json({ newThreadSlug: newThread.slug });
    } catch (err) {
      console.error(err.message, err);
      res.status(500).json({ message: "Internal server error" });
    }
  }
);
/**
 * PUT /workspace/workspace-chats/:id
 * Marks a chat owned by the session user (or with `user_id: null` when there
 * is no user) as `include: false`. Responds 404 when the chat is not found.
 */
app.put(
  "/workspace/workspace-chats/:id",
  [validatedRequest, flexUserRoleValid([ROLES.all])],
  async (req, res) => {
    try {
      const chatId = req.params.id;
      const sessionUser = await userFromSession(req, res);

      const ownedChat = await WorkspaceChats.get({
        id: Number(chatId),
        user_id: sessionUser?.id ?? null,
      });
      if (!ownedChat) {
        const notFound = { success: false, error: "Chat not found." };
        return res.status(404).json(notFound);
      }

      await WorkspaceChats._update(ownedChat.id, { include: false });
      res.json({ success: true, error: null });
    } catch (err) {
      console.error(err.message, err);
      res.status(500).json({ success: false, error: "Server error" });
    }
  }
);
/**
 * POST /workspace/:slug/upload-and-embed
 * One-call upload + embed, used by drag-and-drop in the chat container.
 * The uploaded file is processed by the collector and its first resulting
 * document is then added to the workspace.
 *
 * Responds 400 for an unknown workspace, 500 when the collector is offline
 * or processing fails / yields no documents, and 200 with
 * `{ success, error, document }` describing the embedding outcome.
 */
app.post(
  "/workspace/:slug/upload-and-embed",
  [
    validatedRequest,
    flexUserRoleValid([ROLES.admin, ROLES.manager]),
    handleFileUpload,
  ],
  async (req, res) => {
    try {
      const { slug = null } = req.params;
      const sessionUser = await userFromSession(req, res);

      let target;
      if (multiUserMode(res)) {
        target = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        target = await Workspace.get({ slug });
      }
      if (!target) {
        return res.sendStatus(400).end();
      }

      const collector = new CollectorApi();
      const { originalname: fileName } = req.file;

      const collectorIsOnline = await collector.online();
      if (!collectorIsOnline) {
        const offlineError = `Document processing API is not online. Document ${fileName} will not be processed automatically.`;
        res.status(500).json({ success: false, error: offlineError }).end();
        return;
      }

      const {
        success: processed,
        reason: failureReason,
        documents,
      } = await collector.processDocument(fileName);
      const nothingProduced = documents?.length === 0;
      if (!processed || nothingProduced) {
        res.status(500).json({ success: false, error: failureReason }).end();
        return;
      }

      collector.log(
        `Document ${fileName} uploaded processed and successfully. It is now available in documents.`
      );
      await Telemetry.sendTelemetry("document_uploaded");
      await EventLogs.logEvent(
        "document_uploaded",
        { documentName: fileName },
        res.locals?.user?.id
      );

      // Only the first processed document is embedded and reported back.
      const [firstDocument] = documents;
      const { failedToEmbed = [], errors = [] } = await Document.addDocuments(
        target,
        [firstDocument.location],
        res.locals?.user?.id
      );

      if (failedToEmbed.length > 0) {
        return res
          .status(200)
          .json({ success: false, error: errors?.[0], document: null });
      }

      res.status(200).json({
        success: true,
        error: null,
        document: { id: firstDocument.id, location: firstDocument.location },
      });
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
/**
 * DELETE /workspace/:slug/remove-and-unembed
 * Purges the document at `documentLocation` (from the request body) through
 * `purgeDocument`. Responds 400 when the workspace is unknown or no
 * `documentLocation` was supplied.
 */
app.delete(
  "/workspace/:slug/remove-and-unembed",
  [
    validatedRequest,
    flexUserRoleValid([ROLES.admin, ROLES.manager]),
    handleFileUpload,
  ],
  async (req, res) => {
    try {
      const { slug = null } = req.params;
      const payload = reqBody(req);
      const sessionUser = await userFromSession(req, res);

      let target;
      if (multiUserMode(res)) {
        target = await Workspace.getWithUser(sessionUser, { slug });
      } else {
        target = await Workspace.get({ slug });
      }

      const requestIsValid = target && payload.documentLocation;
      if (!requestIsValid) {
        return res.sendStatus(400).end();
      }

      // Removes the document from the whole system and un-embeds it.
      await purgeDocument(payload.documentLocation);
      res.status(200).end();
    } catch (err) {
      console.error(err.message, err);
      res.sendStatus(500).end();
    }
  }
);
}
module.exports = { workspaceEndpoints };