Refactor LLM chat backend (#717)

* refactor stream/chat/embed-stream to be a single execution logic path so that it is easier to maintain and build upon (a rough sketch of the unified path follows below)

* no thread in sync chat since only the API uses it
* adjust import locations
Timothy Carambat 2024-02-14 12:32:07 -08:00 committed by GitHub
parent 161dc5f901
commit c59ab9da0a
20 changed files with 287 additions and 468 deletions
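
Note: to make "a single execution logic path" concrete, here is a rough sketch of the flow that sync chat, streamed chat, and embed chat now share. It is illustrative only: `runChatPath` is a hypothetical name, and the `compressMessages`/`getChatCompletion` argument shapes are assumptions rather than lines taken from this diff; `recentChatHistory` and `chatPrompt` are the real helpers touched by this commit, injected here so the sketch stays self-contained.

```js
// Hypothetical outline of the single logic path: fetch history, search the
// vector DB only when embeddings exist, compress into a prompt, then answer
// with the LLM connector. All dependencies are injected for self-containment.
async function runChatPath({
  workspace,
  message,
  chatMode = "chat",
  user = null,
  thread = null,
  LLMConnector,
  VectorDb,
  recentChatHistory, // merged helper from utils/chats/index.js
  chatPrompt,        // system prompt builder from utils/chats/index.js
}) {
  const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);

  // Query mode against an empty workspace exits early; the old
  // emptyEmbeddingChat/streamEmptyEmbeddingChat fallbacks are removed.
  if (embeddingsCount === 0 && chatMode === "query") {
    return { type: "textResponse", sources: [], textResponse: "No relevant information in this workspace." };
  }

  const { rawHistory, chatHistory } = await recentChatHistory({
    user, workspace, thread, messageLimit: workspace?.openAiHistory || 20, chatMode,
  });

  const {
    contextTexts = [],
    sources = [],
    message: searchError,
  } = embeddingsCount !== 0
    ? await VectorDb.performSimilaritySearch({ namespace: workspace.slug, input: message, LLMConnector })
    : { contextTexts: [], sources: [], message: null };

  if (searchError) {
    // The real code returns or streams an error chunk here; the sketch just surfaces it.
    return { type: "textResponse", sources: [], textResponse: null, error: searchError };
  }

  const messages = await LLMConnector.compressMessages(
    { systemPrompt: chatPrompt(workspace), userPrompt: message, contextTexts, chatHistory },
    rawHistory
  );
  const textResponse = await LLMConnector.getChatCompletion(messages, { temperature: workspace?.openAiTemp });
  return { type: "textResponse", sources, textResponse };
}
```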

View File

@ -4,19 +4,19 @@ const { Telemetry } = require("../../../models/telemetry");
const { DocumentVectors } = require("../../../models/vectors");
const { Workspace } = require("../../../models/workspace");
const { WorkspaceChats } = require("../../../models/workspaceChats");
const {
convertToChatHistory,
chatWithWorkspace,
} = require("../../../utils/chats");
const { chatWithWorkspace } = require("../../../utils/chats");
const { getVectorDbClass } = require("../../../utils/helpers");
const { multiUserMode, reqBody } = require("../../../utils/http");
const { validApiKey } = require("../../../utils/middleware/validApiKey");
const {
streamChatWithWorkspace,
writeResponseChunk,
VALID_CHAT_MODE,
} = require("../../../utils/chats/stream");
const { EventLogs } = require("../../../models/eventLogs");
const {
convertToChatHistory,
writeResponseChunk,
} = require("../../../utils/helpers/chat/responses");
function apiWorkspaceEndpoints(app) {
if (!app) return;

View File

@ -7,7 +7,6 @@ const { SystemSettings } = require("../models/systemSettings");
const { Telemetry } = require("../models/telemetry");
const {
streamChatWithWorkspace,
writeResponseChunk,
VALID_CHAT_MODE,
} = require("../utils/chats/stream");
const {
@ -18,6 +17,7 @@ const { EventLogs } = require("../models/eventLogs");
const {
validWorkspaceAndThreadSlug,
} = require("../utils/middleware/validWorkspace");
const { writeResponseChunk } = require("../utils/helpers/chat/responses");
function chatEndpoints(app) {
if (!app) return;

View File

@ -1,15 +1,17 @@
const { v4: uuidv4 } = require("uuid");
const { reqBody, multiUserMode } = require("../../utils/http");
const { Telemetry } = require("../../models/telemetry");
const { writeResponseChunk } = require("../../utils/chats/stream");
const { streamChatWithForEmbed } = require("../../utils/chats/embed");
const { convertToChatHistory } = require("../../utils/chats");
const { EmbedChats } = require("../../models/embedChats");
const {
validEmbedConfig,
canRespond,
setConnectionMeta,
} = require("../../utils/middleware/embedMiddleware");
const {
convertToChatHistory,
writeResponseChunk,
} = require("../../utils/helpers/chat/responses");
function embeddedEndpoints(app) {
if (!app) return;

View File

@ -12,7 +12,7 @@ const {
validWorkspaceAndThreadSlug,
} = require("../utils/middleware/validWorkspace");
const { WorkspaceChats } = require("../models/workspaceChats");
const { convertToChatHistory } = require("../utils/chats");
const { convertToChatHistory } = require("../utils/helpers/chat/responses");
function workspaceThreadEndpoints(app) {
if (!app) return;

View File

@ -3,7 +3,6 @@ const { Workspace } = require("../models/workspace");
const { Document } = require("../models/documents");
const { DocumentVectors } = require("../models/vectors");
const { WorkspaceChats } = require("../models/workspaceChats");
const { convertToChatHistory } = require("../utils/chats");
const { getVectorDbClass } = require("../utils/helpers");
const { setupMulter } = require("../utils/files/multer");
const {
@ -22,6 +21,7 @@ const {
WorkspaceSuggestedMessages,
} = require("../models/workspacesSuggestedMessages");
const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace");
const { convertToChatHistory } = require("../utils/helpers/chat/responses");
const { handleUploads } = setupMulter();
function workspaceEndpoints(app) {

View File

@ -1,6 +1,6 @@
const { AzureOpenAiEmbedder } = require("../../EmbeddingEngines/azureOpenAi");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
class AzureOpenAiLLM {
constructor(embedder = null, _modelPreference = null) {

View File

@ -1,5 +1,5 @@
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
class GeminiLLM {
constructor(embedder = null, modelPreference = null) {

View File

@ -1,7 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
class HuggingFaceLLM {
constructor(embedder = null, _modelPreference = null) {

View File

@ -1,5 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
const { handleDefaultStreamResponse } = require("../../helpers/chat/responses");
// hybrid of openAi LLM chat completion for LMStudio
class LMStudioLLM {

View File

@ -1,5 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
const { handleDefaultStreamResponse } = require("../../helpers/chat/responses");
class LocalAiLLM {
constructor(embedder = null, modelPreference = null) {

View File

@ -1,5 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
const { handleDefaultStreamResponse } = require("../../helpers/chat/responses");
class MistralLLM {
constructor(embedder = null, modelPreference = null) {

View File

@ -2,7 +2,7 @@ const fs = require("fs");
const path = require("path");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
// Docs: https://api.js.langchain.com/classes/chat_models_llama_cpp.ChatLlamaCpp.html
const ChatLlamaCpp = (...args) =>

View File

@ -1,6 +1,6 @@
const { chatPrompt } = require("../../chats");
const { StringOutputParser } = require("langchain/schema/output_parser");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
// Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md
class OllamaAILLM {

View File

@ -1,6 +1,6 @@
const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi");
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
const { handleDefaultStreamResponse } = require("../../helpers/chat/responses");
class OpenAiLLM {
constructor(embedder = null, modelPreference = null) {

View File

@ -1,5 +1,5 @@
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
const { writeResponseChunk } = require("../../helpers/chat/responses");
function togetherAiModels() {
const { MODELS } = require("./models.js");

View File

@ -1,8 +1,11 @@
const { v4: uuidv4 } = require("uuid");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { chatPrompt, convertToPromptHistory } = require(".");
const { writeResponseChunk } = require("./stream");
const { chatPrompt } = require("./index");
const { EmbedChats } = require("../../models/embedChats");
const {
convertToPromptHistory,
writeResponseChunk,
} = require("../helpers/chat/responses");
async function streamChatWithForEmbed(
response,
@ -44,30 +47,20 @@ async function streamChatWithForEmbed(
const messageLimit = 20;
const hasVectorizedSpace = await VectorDb.hasNamespace(embed.workspace.slug);
const embeddingsCount = await VectorDb.namespaceCount(embed.workspace.slug);
if (!hasVectorizedSpace || embeddingsCount === 0) {
if (chatMode === "query") {
writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse:
"I do not have enough information to answer that. Try another question.",
sources: [],
close: true,
error: null,
});
return;
}
// If there are no embeddings - chat like a normal LLM chat interface.
return await streamEmptyEmbeddingChat({
response,
uuid,
sessionId,
message,
embed,
messageLimit,
LLMConnector,
// User is trying to query-mode chat a workspace that has no data in it - so
// we should exit early as no information can be found under these conditions.
if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse:
"I do not have enough information to answer that. Try another question.",
sources: [],
close: true,
error: null,
});
return;
}
let completeText;
@ -77,17 +70,24 @@ async function streamChatWithForEmbed(
messageLimit,
chatMode
);
const {
contextTexts = [],
sources = [],
message: error,
} = await VectorDb.performSimilaritySearch({
namespace: embed.workspace.slug,
input: message,
LLMConnector,
similarityThreshold: embed.workspace?.similarityThreshold,
topN: embed.workspace?.topN,
});
} = embeddingsCount !== 0 // if there are no embeddings, don't bother searching.
? await VectorDb.performSimilaritySearch({
namespace: embed.workspace.slug,
input: message,
LLMConnector,
similarityThreshold: embed.workspace?.similarityThreshold,
topN: embed.workspace?.topN,
})
: {
contextTexts: [],
sources: [],
message: null,
};
// Failed similarity search.
if (!!error) {
@ -176,7 +176,7 @@ async function recentEmbedChatHistory(
messageLimit = 20,
chatMode = null
) {
if (chatMode === "query") return [];
if (chatMode === "query") return { rawHistory: [], chatHistory: [] };
const rawHistory = (
await EmbedChats.forEmbedByUser(embed.id, sessionId, messageLimit, {
id: "desc",
@ -185,65 +185,6 @@ async function recentEmbedChatHistory(
return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) };
}
async function streamEmptyEmbeddingChat({
response,
uuid,
sessionId,
message,
embed,
messageLimit,
LLMConnector,
}) {
let completeText;
const { rawHistory, chatHistory } = await recentEmbedChatHistory(
sessionId,
embed,
messageLimit
);
if (LLMConnector.streamingEnabled() !== true) {
console.log(
`\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
);
completeText = await LLMConnector.sendChat(
chatHistory,
message,
embed.workspace,
rawHistory
);
writeResponseChunk(response, {
uuid,
type: "textResponseChunk",
textResponse: completeText,
sources: [],
close: true,
error: false,
});
}
const stream = await LLMConnector.streamChat(
chatHistory,
message,
embed.workspace,
rawHistory
);
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources: [],
});
await EmbedChats.new({
embedId: embed.id,
prompt: message,
response: { text: completeText, type: "chat" },
connection_information: response.locals.connection
? { ...response.locals.connection }
: {},
sessionId,
});
return;
}
module.exports = {
streamChatWithForEmbed,
};
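
Note: the embed path above and the workspace paths below share the same guard, skipping the vector search entirely when the namespace has no embeddings and falling back to an empty result shape. A minimal sketch of that pattern, using the hypothetical helper name `searchOrEmpty` (not part of the commit):

```js
// Illustrative helper capturing the conditional-search pattern used in
// streamChatWithForEmbed and chatWithWorkspace: only hit the vector DB when
// the namespace actually has embeddings, otherwise return an empty result.
async function searchOrEmpty({
  VectorDb,
  namespace,
  input,
  LLMConnector,
  embeddingsCount,
  similarityThreshold,
  topN,
}) {
  if (embeddingsCount === 0) {
    return { contextTexts: [], sources: [], message: null };
  }
  return await VectorDb.performSimilaritySearch({
    namespace,
    input,
    LLMConnector,
    similarityThreshold,
    topN,
  });
}
```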

View File

@ -1,46 +1,8 @@
const { v4: uuidv4 } = require("uuid");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { resetMemory } = require("./commands/reset");
const moment = require("moment");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
function convertToChatHistory(history = []) {
const formattedHistory = [];
history.forEach((history) => {
const { prompt, response, createdAt, feedbackScore = null, id } = history;
const data = JSON.parse(response);
formattedHistory.push([
{
role: "user",
content: prompt,
sentAt: moment(createdAt).unix(),
},
{
role: "assistant",
content: data.text,
sources: data.sources || [],
chatId: id,
sentAt: moment(createdAt).unix(),
feedbackScore,
},
]);
});
return formattedHistory.flat();
}
function convertToPromptHistory(history = []) {
const formattedHistory = [];
history.forEach((history) => {
const { prompt, response } = history;
const data = JSON.parse(response);
formattedHistory.push([
{ role: "user", content: prompt },
{ role: "assistant", content: data.text },
]);
});
return formattedHistory.flat();
}
const { convertToPromptHistory } = require("../helpers/chat/responses");
const VALID_COMMANDS = {
"/reset": resetMemory,
@ -64,7 +26,8 @@ async function chatWithWorkspace(
workspace,
message,
chatMode = "chat",
user = null
user = null,
thread = null
) {
const uuid = uuidv4();
const command = grepCommand(message);
@ -92,49 +55,51 @@ async function chatWithWorkspace(
const messageLimit = workspace?.openAiHistory || 20;
const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug);
const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);
if (!hasVectorizedSpace || embeddingsCount === 0) {
if (chatMode === "query") {
return {
id: uuid,
type: "textResponse",
sources: [],
close: true,
error: null,
textResponse:
"There is no relevant information in this workspace to answer your query.",
};
}
// If there are no embeddings - chat like a normal LLM chat interface.
return await emptyEmbeddingChat({
uuid,
user,
message,
workspace,
messageLimit,
LLMConnector,
});
// User is trying to query-mode chat a workspace that has no data in it - so
// we should exit early as no information can be found under these conditions.
if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
return {
id: uuid,
type: "textResponse",
sources: [],
close: true,
error: null,
textResponse:
"There is no relevant information in this workspace to answer your query.",
};
}
const { rawHistory, chatHistory } = await recentChatHistory(
// If we are here we know that we are in a workspace that is:
// 1. Chatting in "chat" mode and may or may _not_ have embeddings
// 2. Chatting in "query" mode and has at least 1 embedding
const { rawHistory, chatHistory } = await recentChatHistory({
user,
workspace,
thread,
messageLimit,
chatMode
);
chatMode,
});
const {
contextTexts = [],
sources = [],
message: error,
} = await VectorDb.performSimilaritySearch({
namespace: workspace.slug,
input: message,
LLMConnector,
similarityThreshold: workspace?.similarityThreshold,
topN: workspace?.topN,
});
} = embeddingsCount !== 0 // if there are no embeddings, don't bother searching.
? await VectorDb.performSimilaritySearch({
namespace: workspace.slug,
input: message,
LLMConnector,
similarityThreshold: workspace?.similarityThreshold,
topN: workspace?.topN,
})
: {
contextTexts: [],
sources: [],
message: null,
};
// Failed similarity search.
// Failed similarity search if it was run at all and failed.
if (!!error) {
return {
id: uuid,
@ -147,7 +112,7 @@ async function chatWithWorkspace(
}
// If in query mode and no sources are found, do not
// let the LLM try to hallucinate a response or use general knowledge
// let the LLM try to hallucinate a response or use general knowledge and exit early
if (chatMode === "query" && sources.length === 0) {
return {
id: uuid,
@ -160,7 +125,7 @@ async function chatWithWorkspace(
};
}
// Compress message to ensure prompt passes token limit with room for response
// Compress & Assemble message to ensure prompt passes token limit with room for response
// and build system messages based on inputs and history.
const messages = await LLMConnector.compressMessages(
{
@ -187,10 +152,12 @@ async function chatWithWorkspace(
error: "No text completion could be completed with this input.",
};
}
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: textResponse, sources, type: chatMode },
threadId: thread?.id || null,
user,
});
return {
@ -204,41 +171,14 @@ async function chatWithWorkspace(
};
}
// On query we don't return message history. All other chat modes and when chatting
// with no embeddings we return history.
// TODO: Refactor to just run a .where on WorkspaceChat to simplify what is going on here.
// see recentThreadChatHistory
async function recentChatHistory(
async function recentChatHistory({
user = null,
workspace,
thread = null,
messageLimit = 20,
chatMode = null
) {
if (chatMode === "query") return [];
const rawHistory = (
user
? await WorkspaceChats.forWorkspaceByUser(
workspace.id,
user.id,
messageLimit,
{ id: "desc" }
)
: await WorkspaceChats.forWorkspace(workspace.id, messageLimit, {
id: "desc",
})
).reverse();
return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) };
}
// Extension of recentChatHistory that supports threads
async function recentThreadChatHistory(
user = null,
workspace,
thread,
messageLimit = 20,
chatMode = null
) {
if (chatMode === "query") return [];
chatMode = null,
}) {
if (chatMode === "query") return { rawHistory: [], chatHistory: [] };
const rawHistory = (
await WorkspaceChats.where(
{
@ -254,42 +194,6 @@ async function recentThreadChatHistory(
return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) };
}
async function emptyEmbeddingChat({
uuid,
user,
message,
workspace,
messageLimit,
LLMConnector,
}) {
const { rawHistory, chatHistory } = await recentChatHistory(
user,
workspace,
messageLimit
);
const textResponse = await LLMConnector.sendChat(
chatHistory,
message,
workspace,
rawHistory
);
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: textResponse, sources: [], type: "chat" },
user,
});
return {
id: uuid,
type: "textResponse",
sources: [],
close: true,
error: null,
chatId: chat.id,
textResponse,
};
}
function chatPrompt(workspace) {
return (
workspace?.openAiPrompt ??
@ -299,9 +203,6 @@ function chatPrompt(workspace) {
module.exports = {
recentChatHistory,
recentThreadChatHistory,
convertToPromptHistory,
convertToChatHistory,
chatWithWorkspace,
chatPrompt,
grepCommand,
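
Note: `recentChatHistory` and the removed `recentThreadChatHistory` are now a single helper that takes one options object, and `chatWithWorkspace` accepts an optional `thread` as its fifth argument. A hedged usage sketch follows; the require path and the wrapper function are illustrative, while the argument shapes come from the diff above.

```js
// Usage sketch for the merged helpers (paths relative to the server root).
const { chatWithWorkspace, recentChatHistory } = require("./utils/chats");

async function demoHistoryAndChat({ workspace, user, thread, message }) {
  // Thread-aware history; previously this required the separate recentThreadChatHistory.
  const { rawHistory, chatHistory } = await recentChatHistory({
    user,
    workspace,
    thread,           // may be null for non-thread chats
    messageLimit: workspace?.openAiHistory || 20,
    chatMode: "chat", // "query" mode short-circuits to empty history
  });

  // Sync (non-streaming) chat now also accepts the thread as its fifth argument.
  const result = await chatWithWorkspace(workspace, message, "chat", user, thread);
  return { rawHistory, chatHistory, result };
}
```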

View File

@ -1,19 +1,15 @@
const { v4: uuidv4 } = require("uuid");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { writeResponseChunk } = require("../helpers/chat/responses");
const {
grepCommand,
recentChatHistory,
VALID_COMMANDS,
chatPrompt,
recentThreadChatHistory,
} = require(".");
recentChatHistory,
} = require("./index");
const VALID_CHAT_MODE = ["chat", "query"];
function writeResponseChunk(response, data) {
response.write(`data: ${JSON.stringify(data)}\n\n`);
return;
}
async function streamChatWithWorkspace(
response,
@ -58,59 +54,53 @@ async function streamChatWithWorkspace(
const messageLimit = workspace?.openAiHistory || 20;
const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug);
const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);
if (!hasVectorizedSpace || embeddingsCount === 0) {
if (chatMode === "query") {
writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse:
"There is no relevant information in this workspace to answer your query.",
sources: [],
close: true,
error: null,
});
return;
}
// If there are no embeddings - chat like a normal LLM chat interface.
// no need to pass in chat mode - because if we are here we are in
// "chat" mode + have embeddings.
return await streamEmptyEmbeddingChat({
response,
uuid,
user,
message,
workspace,
messageLimit,
LLMConnector,
thread,
// User is trying to query-mode chat a workspace that has no data in it - so
// we should exit early as no information can be found under these conditions.
if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse:
"There is no relevant information in this workspace to answer your query.",
sources: [],
close: true,
error: null,
});
return;
}
// If we are here we know that we are in a workspace that is:
// 1. Chatting in "chat" mode and may or may _not_ have embeddings
// 2. Chatting in "query" mode and has at least 1 embedding
let completeText;
const { rawHistory, chatHistory } = thread
? await recentThreadChatHistory(
user,
workspace,
thread,
messageLimit,
chatMode
)
: await recentChatHistory(user, workspace, messageLimit, chatMode);
const { rawHistory, chatHistory } = await recentChatHistory({
user,
workspace,
thread,
messageLimit,
chatMode,
});
const {
contextTexts = [],
sources = [],
message: error,
} = await VectorDb.performSimilaritySearch({
namespace: workspace.slug,
input: message,
LLMConnector,
similarityThreshold: workspace?.similarityThreshold,
topN: workspace?.topN,
});
} = embeddingsCount !== 0 // if there are no embeddings, don't bother searching.
? await VectorDb.performSimilaritySearch({
namespace: workspace.slug,
input: message,
LLMConnector,
similarityThreshold: workspace?.similarityThreshold,
topN: workspace?.topN,
})
: {
contextTexts: [],
sources: [],
message: null,
};
// Failed similarity search.
// Failed similarity search if it was run at all and failed.
if (!!error) {
writeResponseChunk(response, {
id: uuid,
@ -124,7 +114,7 @@ async function streamChatWithWorkspace(
}
// If in query mode and no sources are found, do not
// let the LLM try to hallucinate a response or use general knowledge
// let the LLM try to hallucinate a response or use general knowledge and exit early
if (chatMode === "query" && sources.length === 0) {
writeResponseChunk(response, {
id: uuid,
@ -138,7 +128,7 @@ async function streamChatWithWorkspace(
return;
}
// Compress message to ensure prompt passes token limit with room for response
// Compress & Assemble message to ensure prompt passes token limit with room for response
// and build system messages based on inputs and history.
const messages = await LLMConnector.compressMessages(
{
@ -181,7 +171,7 @@ async function streamChatWithWorkspace(
workspaceId: workspace.id,
prompt: message,
response: { text: completeText, sources, type: chatMode },
threadId: thread?.id,
threadId: thread?.id || null,
user,
});
@ -195,166 +185,7 @@ async function streamChatWithWorkspace(
return;
}
async function streamEmptyEmbeddingChat({
response,
uuid,
user,
message,
workspace,
messageLimit,
LLMConnector,
thread = null,
}) {
let completeText;
const { rawHistory, chatHistory } = thread
? await recentThreadChatHistory(user, workspace, thread, messageLimit)
: await recentChatHistory(user, workspace, messageLimit);
// If streaming is not explicitly enabled for connector
// we do regular waiting of a response and send a single chunk.
if (LLMConnector.streamingEnabled() !== true) {
console.log(
`\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
);
completeText = await LLMConnector.sendChat(
chatHistory,
message,
workspace,
rawHistory
);
writeResponseChunk(response, {
uuid,
type: "textResponseChunk",
textResponse: completeText,
sources: [],
close: true,
error: false,
});
} else {
const stream = await LLMConnector.streamChat(
chatHistory,
message,
workspace,
rawHistory
);
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources: [],
});
}
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: completeText, sources: [], type: "chat" },
threadId: thread?.id,
user,
});
writeResponseChunk(response, {
uuid,
type: "finalizeResponseStream",
close: true,
error: false,
chatId: chat.id,
});
return;
}
// The default way to handle a stream response. Functions best with OpenAI.
function handleDefaultStreamResponse(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data: /, "");
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
module.exports = {
VALID_CHAT_MODE,
streamChatWithWorkspace,
writeResponseChunk,
handleDefaultStreamResponse,
};
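
Note: `writeResponseChunk` now lives in `utils/helpers/chat/responses` and simply writes server-sent-event frames (`data: <json>\n\n`). The sketch below shows how an endpoint might drive `streamChatWithWorkspace` over SSE; the route wrapper, the exact parameter order after `response`, and the `abort` chunk type are assumptions, not code from this commit.

```js
// Illustrative Express-style handler: set SSE headers, then let
// streamChatWithWorkspace push chunks through writeResponseChunk.
const {
  streamChatWithWorkspace,
  VALID_CHAT_MODE,
} = require("./utils/chats/stream");
const { writeResponseChunk } = require("./utils/helpers/chat/responses");

async function streamChatHandler(request, response, { workspace, user = null, thread = null }) {
  // Assumes a JSON body parser has already populated request.body.
  const { message, mode = "chat" } = request.body;
  if (!VALID_CHAT_MODE.includes(mode)) {
    response.status(400).json({ error: `${mode} is not a valid mode.` });
    return;
  }

  response.setHeader("Content-Type", "text/event-stream");
  response.setHeader("Cache-Control", "no-cache");
  response.setHeader("Connection", "keep-alive");

  try {
    // Parameter order after `response` is assumed from the call sites in the diff.
    await streamChatWithWorkspace(response, workspace, message, mode, user, thread);
  } catch (e) {
    // Surface the failure as a final SSE chunk; the "abort" type is an assumption.
    writeResponseChunk(response, {
      id: "error",
      type: "abort",
      textResponse: null,
      sources: [],
      close: true,
      error: e.message,
    });
  }
  response.end();
}
```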

View File

@ -1,5 +1,5 @@
const { convertToPromptHistory } = require("../../chats");
const { TokenManager } = require("../tiktoken");
const { convertToPromptHistory } = require("./responses");
/*
What is the message Array compressor?

View File

@ -0,0 +1,144 @@
const { v4: uuidv4 } = require("uuid");
const moment = require("moment");
// The default way to handle a stream response. Functions best with OpenAI.
// Currently used for LMStudio, LocalAI, Mistral API, and OpenAI
function handleDefaultStreamResponse(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data: /, "");
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
function convertToChatHistory(history = []) {
const formattedHistory = [];
history.forEach((history) => {
const { prompt, response, createdAt, feedbackScore = null, id } = history;
const data = JSON.parse(response);
formattedHistory.push([
{
role: "user",
content: prompt,
sentAt: moment(createdAt).unix(),
},
{
role: "assistant",
content: data.text,
sources: data.sources || [],
chatId: id,
sentAt: moment(createdAt).unix(),
feedbackScore,
},
]);
});
return formattedHistory.flat();
}
function convertToPromptHistory(history = []) {
const formattedHistory = [];
history.forEach((history) => {
const { prompt, response } = history;
const data = JSON.parse(response);
formattedHistory.push([
{ role: "user", content: prompt },
{ role: "assistant", content: data.text },
]);
});
return formattedHistory.flat();
}
function writeResponseChunk(response, data) {
response.write(`data: ${JSON.stringify(data)}\n\n`);
return;
}
module.exports = {
handleDefaultStreamResponse,
convertToChatHistory,
convertToPromptHistory,
writeResponseChunk,
};
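
Note: a small worked example (hypothetical data) of the two history converters defined above. Stored chats keep the assistant reply as a JSON string, so both helpers `JSON.parse` it before flattening into user/assistant pairs; `convertToPromptHistory` keeps only roles and content for the LLM window, while `convertToChatHistory` adds `sentAt`, `chatId`, `sources`, and `feedbackScore` for the frontend.

```js
// Example rows mimic a WorkspaceChats/EmbedChats record; values are made up.
const {
  convertToChatHistory,
  convertToPromptHistory,
} = require("./utils/helpers/chat/responses");

const rows = [
  {
    id: 42,
    prompt: "What is AnythingLLM?",
    response: JSON.stringify({ text: "A private chat-with-your-docs app.", sources: [] }),
    createdAt: "2024-02-14T20:00:00.000Z",
    feedbackScore: null,
  },
];

// For the LLM prompt window: [{ role: "user", ... }, { role: "assistant", ... }]
console.log(convertToPromptHistory(rows));

// For the frontend: the same pair plus sentAt, chatId, sources, and feedbackScore.
console.log(convertToChatHistory(rows));
```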