const { v4: uuidv4 } = require("uuid");
const moment = require("moment");

function clientAbortedHandler(resolve, fullText) {
  console.log(
    "\x1b[43m\x1b[34m[STREAM ABORTED]\x1b[0m Client requested to abort stream. Exiting LLM stream handler early."
  );
  resolve(fullText);
  return;
}

function handleDefaultStreamResponseV2(response, stream, responseProps) {
  const { uuid = uuidv4(), sources = [] } = responseProps;

  return new Promise(async (resolve) => {
    let fullText = "";

    // Establish listener to early-abort a streaming response
    // in case things go sideways or the user does not like the response.
    // We preserve the generated text but continue as if chat was completed
    // to preserve previously generated content.
    const handleAbort = () => clientAbortedHandler(resolve, fullText);
    response.on("close", handleAbort);

    for await (const chunk of stream) {
      const message = chunk?.choices?.[0];
      const token = message?.delta?.content;

      if (token) {
        fullText += token;
        writeResponseChunk(response, {
          uuid,
          sources: [],
          type: "textResponseChunk",
          textResponse: token,
          close: false,
          error: false,
        });
      }

      // LocalAI returns '' and others return null while the stream is still
      // open, so any other value means generation has finished. Optional
      // chaining keeps a malformed chunk from crashing the handler.
      if (message?.finish_reason !== "" && message?.finish_reason !== null) {
        writeResponseChunk(response, {
          uuid,
          sources,
          type: "textResponseChunk",
          textResponse: "",
          close: true,
          error: false,
        });
        response.removeListener("close", handleAbort);
        resolve(fullText);
      }
    }
  });
}

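// Illustrative usage (a sketch, not part of this module): an OpenAI-compatible
// provider can pass the async-iterable returned by the `openai` client's
// streaming call straight into the V2 handler above. `openai`, `messages`, and
// `requestUuid` are hypothetical caller-side names.
//
//   const stream = await openai.chat.completions.create({
//     model: "gpt-3.5-turbo",
//     messages,
//     stream: true,
//   });
//   const fullText = await handleDefaultStreamResponseV2(response, stream, {
//     uuid: requestUuid, // optional; a v4 uuid is generated when omitted
//     sources,           // citations attached to the final closing chunk
//   });
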
// TODO: Fully remove - deprecated.
// The default way to handle a stream response. Functions best with OpenAI.
// Currently used for LMStudio, LocalAI, Mistral API, and OpenAI.
function handleDefaultStreamResponse(response, stream, responseProps) {
  const { uuid = uuidv4(), sources = [] } = responseProps;

  return new Promise((resolve) => {
    let fullText = "";
    let chunk = "";

    // Establish listener to early-abort a streaming response
    // in case things go sideways or the user does not like the response.
    // We preserve the generated text but continue as if chat was completed
    // to preserve previously generated content.
    const handleAbort = () => clientAbortedHandler(resolve, fullText);
    response.on("close", handleAbort);

    stream.data.on("data", (data) => {
      const lines = data
        ?.toString()
        ?.split("\n")
        .filter((line) => line.trim() !== "");

      for (const line of lines) {
        let validJSON = false;
        const message = chunk + line.replace(/^data: /, "");

        // The "[DONE]" sentinel is not valid JSON, so it must be handled
        // before JSON validation or it would never be recognized.
        if (message === "[DONE]") {
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          response.removeListener("close", handleAbort);
          resolve(fullText);
          continue;
        }

        // JSON chunk is incomplete and has not ended yet
        // so we need to stitch it together. You would think JSON
        // chunks would only come complete - but they don't!
        try {
          JSON.parse(message);
          validJSON = true;
        } catch {}

        if (!validJSON) {
          // It is possible that the chunk decoding is running away
          // and the message chunk fails to append due to string length.
          // In this case abort the chunk and reset so we can continue.
          // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
          // Note: `message` already contains the carried-over `chunk`,
          // so we replace rather than append to avoid duplicating it.
          try {
            chunk = message;
          } catch (e) {
            console.error(`Chunk appending error`, e);
            chunk = "";
          }
          continue;
        } else {
          chunk = "";
        }

        let finishReason = null;
        let token = "";
        try {
          const json = JSON.parse(message);
          token = json?.choices?.[0]?.delta?.content;
          finishReason = json?.choices?.[0]?.finish_reason || null;
        } catch {
          continue;
        }

        if (token) {
          fullText += token;
          writeResponseChunk(response, {
            uuid,
            sources: [],
            type: "textResponseChunk",
            textResponse: token,
            close: false,
            error: false,
          });
        }

        if (finishReason !== null) {
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          response.removeListener("close", handleAbort);
          resolve(fullText);
        }
      }
    });
  });
}

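// For reference, the raw SSE lines the legacy parser above consumes follow
// the OpenAI streaming wire format (values illustrative):
//
//   data: {"choices":[{"delta":{"content":"Hello"},"finish_reason":null}]}
//   data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
//   data: [DONE]
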
function convertToChatHistory(history = []) {
  const formattedHistory = [];
  history.forEach((record) => {
    const { prompt, response, createdAt, feedbackScore = null, id } = record;
    const data = JSON.parse(response);
    formattedHistory.push([
      {
        role: "user",
        content: prompt,
        sentAt: moment(createdAt).unix(),
      },
      {
        type: data?.type || "chart",
        role: "assistant",
        content: data.text,
        sources: data.sources || [],
        chatId: id,
        sentAt: moment(createdAt).unix(),
        feedbackScore,
      },
    ]);
  });

  return formattedHistory.flat();
}

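// Sketch of the flattened output for a single stored chat row (values are
// illustrative; `type` falls back to "chart" when the stored response has
// no explicit type):
//
//   [
//     { role: "user", content: "Hi", sentAt: 1714500000 },
//     {
//       type: "chart",
//       role: "assistant",
//       content: "Hello!",
//       sources: [],
//       chatId: 1,
//       sentAt: 1714500000,
//       feedbackScore: null,
//     },
//   ]
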
function convertToPromptHistory(history = []) {
  const formattedHistory = [];
  history.forEach((record) => {
    const { prompt, response } = record;
    const data = JSON.parse(response);
    formattedHistory.push([
      { role: "user", content: prompt },
      { role: "assistant", content: data.text },
    ]);
  });
  return formattedHistory.flat();
}

// Writes a single server-sent-event (SSE) frame to the HTTP response.
function writeResponseChunk(response, data) {
  response.write(`data: ${JSON.stringify(data)}\n\n`);
  return;
}

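// A single frame emitted by writeResponseChunk looks like this (values
// illustrative), followed by a blank line as the SSE format requires:
//
//   data: {"uuid":"1b4d...","sources":[],"type":"textResponseChunk","textResponse":"Hello","close":false,"error":false}
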
module.exports = {
  handleDefaultStreamResponseV2,
  handleDefaultStreamResponse,
  convertToChatHistory,
  convertToPromptHistory,
  writeResponseChunk,
  clientAbortedHandler,
};