From 0e46a11cb65844640111c44d7588ef7d3861ffba Mon Sep 17 00:00:00 2001 From: Timothy Carambat Date: Tue, 12 Mar 2024 15:21:27 -0700 Subject: [PATCH] Stop generation button during stream-response (#892) * Stop generation button during stream-response * add custom stop icon * add stop to thread chats --- .../StopGenerationButton/index.jsx | 50 +++++++++++++++++++ .../PromptInput/StopGenerationButton/stop.svg | 4 ++ .../ChatContainer/PromptInput/index.jsx | 26 +++++----- .../WorkspaceChat/ChatContainer/index.jsx | 6 +-- frontend/src/models/workspace.js | 11 ++++ frontend/src/models/workspaceThread.js | 11 ++++ frontend/src/utils/chat/index.js | 18 +++++++ server/utils/AiProviders/anthropic/index.js | 13 ++++- server/utils/AiProviders/azureOpenAi/index.js | 14 +++++- server/utils/AiProviders/gemini/index.js | 14 +++++- server/utils/AiProviders/huggingface/index.js | 16 +++++- server/utils/AiProviders/native/index.js | 14 +++++- server/utils/AiProviders/ollama/index.js | 17 ++++++- server/utils/AiProviders/openRouter/index.js | 15 +++++- server/utils/AiProviders/togetherAi/index.js | 15 +++++- server/utils/helpers/chat/responses.js | 19 +++++++ 16 files changed, 236 insertions(+), 27 deletions(-) create mode 100644 frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/index.jsx create mode 100644 frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/stop.svg diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/index.jsx new file mode 100644 index 000000000..09a7c2ceb --- /dev/null +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/index.jsx @@ -0,0 +1,50 @@ +import { ABORT_STREAM_EVENT } from "@/utils/chat"; +import { Tooltip } from "react-tooltip"; + +export default function StopGenerationButton() { + function emitHaltEvent() { + window.dispatchEvent(new CustomEvent(ABORT_STREAM_EVENT)); + } + + return ( + <> + + + + ); +} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/stop.svg b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/stop.svg new file mode 100644 index 000000000..ab98895c2 --- /dev/null +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/StopGenerationButton/stop.svg @@ -0,0 +1,4 @@ + + + + diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/index.jsx index 2b9c5ca4f..52e870123 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/index.jsx @@ -1,4 +1,3 @@ -import { CircleNotch, PaperPlaneRight } from "@phosphor-icons/react"; import React, { useState, useRef } from "react"; import SlashCommandsButton, { SlashCommands, @@ -6,6 +5,8 @@ import SlashCommandsButton, { } from "./SlashCommands"; import { isMobile } from "react-device-detect"; import debounce from "lodash.debounce"; +import { PaperPlaneRight } from "@phosphor-icons/react"; +import StopGenerationButton from "./StopGenerationButton"; export default function PromptInput({ workspace, @@ -83,19 +84,18 @@ export default function PromptInput({ className="cursor-text max-h-[100px] md:min-h-[40px] mx-2 md:mx-0 py-2 w-full text-[16px] md:text-md text-white bg-transparent 
placeholder:text-white/60 resize-none active:outline-none focus:outline-none flex-grow" placeholder={"Send a message"} /> - + Send message + + )}
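
Note: the JSX body of the new `StopGenerationButton` component (and the markup inside `stop.svg`) did not survive above; only the `emitHaltEvent` handler is visible. As a rough orientation only — not the PR's actual markup, with the icon import, class names, and tooltip wiring all assumed — the component amounts to a button that dispatches `ABORT_STREAM_EVENT` and is swapped in for the `PaperPlaneRight` send button while a response is streaming:

```jsx
import { ABORT_STREAM_EVENT } from "@/utils/chat";
import { Tooltip } from "react-tooltip";
// Assumed import style: with Vite's default asset handling this resolves to the file's URL.
import StopIcon from "./stop.svg";

export default function StopGenerationButton() {
  function emitHaltEvent() {
    // Any listener registered for ABORT_STREAM_EVENT (see workspace.js below)
    // aborts the in-flight fetchEventSource request.
    window.dispatchEvent(new CustomEvent(ABORT_STREAM_EVENT));
  }

  return (
    <>
      <button
        type="button"
        onClick={emitHaltEvent}
        data-tooltip-id="stop-generation-button"
        data-tooltip-content="Stop generating response"
        aria-label="Stop generating response"
      >
        <img src={StopIcon} alt="Stop generation" className="w-6 h-6" />
      </button>
      <Tooltip id="stop-generation-button" place="bottom" />
    </>
  );
}
```

The `Tooltip` usage follows react-tooltip's v5 API (`data-tooltip-id` on the trigger, a matching `Tooltip` with the same `id`); the real component's class names and styling differ.
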
diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx index 293da491f..209fed5d6 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx @@ -68,11 +68,7 @@ export default function ChatContainer({ workspace, knownHistory = [] }) { const remHistory = chatHistory.length > 0 ? chatHistory.slice(0, -1) : []; var _chatHistory = [...remHistory]; - if (!promptMessage || !promptMessage?.userMessage) { - setLoadingResponse(false); - return false; - } - + if (!promptMessage || !promptMessage?.userMessage) return false; if (!!threadSlug) { await Workspace.threads.streamChat( { workspaceSlug: workspace.slug, threadSlug }, diff --git a/frontend/src/models/workspace.js b/frontend/src/models/workspace.js index 6786abffd..ae2cd5590 100644 --- a/frontend/src/models/workspace.js +++ b/frontend/src/models/workspace.js @@ -3,6 +3,7 @@ import { baseHeaders } from "@/utils/request"; import { fetchEventSource } from "@microsoft/fetch-event-source"; import WorkspaceThread from "@/models/workspaceThread"; import { v4 } from "uuid"; +import { ABORT_STREAM_EVENT } from "@/utils/chat"; const Workspace = { new: async function (data = {}) { @@ -75,6 +76,16 @@ const Workspace = { }, streamChat: async function ({ slug }, message, handleChat) { const ctrl = new AbortController(); + + // Listen for the ABORT_STREAM_EVENT key to be emitted by the client + // to early abort the streaming response. On abort we send a special `stopGeneration` + // event to be handled which resets the UI for us to be able to send another message. + // The backend response abort handling is done in each LLM's handleStreamResponse. + window.addEventListener(ABORT_STREAM_EVENT, () => { + ctrl.abort(); + handleChat({ id: v4(), type: "stopGeneration" }); + }); + await fetchEventSource(`${API_BASE}/workspace/${slug}/stream-chat`, { method: "POST", body: JSON.stringify({ message }), diff --git a/frontend/src/models/workspaceThread.js b/frontend/src/models/workspaceThread.js index f9fad3173..b1bcaf644 100644 --- a/frontend/src/models/workspaceThread.js +++ b/frontend/src/models/workspaceThread.js @@ -1,3 +1,4 @@ +import { ABORT_STREAM_EVENT } from "@/utils/chat"; import { API_BASE } from "@/utils/constants"; import { baseHeaders } from "@/utils/request"; import { fetchEventSource } from "@microsoft/fetch-event-source"; @@ -80,6 +81,16 @@ const WorkspaceThread = { handleChat ) { const ctrl = new AbortController(); + + // Listen for the ABORT_STREAM_EVENT key to be emitted by the client + // to early abort the streaming response. On abort we send a special `stopGeneration` + // event to be handled which resets the UI for us to be able to send another message. + // The backend response abort handling is done in each LLM's handleStreamResponse. + window.addEventListener(ABORT_STREAM_EVENT, () => { + ctrl.abort(); + handleChat({ id: v4(), type: "stopGeneration" }); + }); + await fetchEventSource( `${API_BASE}/workspace/${workspaceSlug}/thread/${threadSlug}/stream-chat`, { diff --git a/frontend/src/utils/chat/index.js b/frontend/src/utils/chat/index.js index f1df11fea..37237c9ec 100644 --- a/frontend/src/utils/chat/index.js +++ b/frontend/src/utils/chat/index.js @@ -1,3 +1,5 @@ +export const ABORT_STREAM_EVENT = "abort-chat-stream"; + // For handling of chat responses in the frontend by their various types. 
export default function handleChat( chatResult, @@ -108,6 +110,22 @@ export default function handleChat( _chatHistory[chatIdx] = updatedHistory; } setChatHistory([..._chatHistory]); + setLoadingResponse(false); + } else if (type === "stopGeneration") { + const chatIdx = _chatHistory.length - 1; + const existingHistory = { ..._chatHistory[chatIdx] }; + const updatedHistory = { + ...existingHistory, + sources: [], + closed: true, + error: null, + animate: false, + pending: false, + }; + _chatHistory[chatIdx] = updatedHistory; + + setChatHistory([..._chatHistory]); + setLoadingResponse(false); } } diff --git a/server/utils/AiProviders/anthropic/index.js b/server/utils/AiProviders/anthropic/index.js index a48058e81..fea083329 100644 --- a/server/utils/AiProviders/anthropic/index.js +++ b/server/utils/AiProviders/anthropic/index.js @@ -1,6 +1,9 @@ const { v4 } = require("uuid"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); class AnthropicLLM { constructor(embedder = null, modelPreference = null) { if (!process.env.ANTHROPIC_API_KEY) @@ -150,6 +153,13 @@ class AnthropicLLM { let fullText = ""; const { uuid = v4(), sources = [] } = responseProps; + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + stream.on("streamEvent", (message) => { const data = message; if ( @@ -181,6 +191,7 @@ class AnthropicLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } }); diff --git a/server/utils/AiProviders/azureOpenAi/index.js b/server/utils/AiProviders/azureOpenAi/index.js index 2ac6de3a1..21fc5cd91 100644 --- a/server/utils/AiProviders/azureOpenAi/index.js +++ b/server/utils/AiProviders/azureOpenAi/index.js @@ -1,6 +1,9 @@ const { AzureOpenAiEmbedder } = require("../../EmbeddingEngines/azureOpenAi"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); class AzureOpenAiLLM { constructor(embedder = null, _modelPreference = null) { @@ -174,6 +177,14 @@ class AzureOpenAiLLM { return new Promise(async (resolve) => { let fullText = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. 
+ const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + for await (const event of stream) { for (const choice of event.choices) { const delta = choice.delta?.content; @@ -198,6 +209,7 @@ class AzureOpenAiLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); }); } diff --git a/server/utils/AiProviders/gemini/index.js b/server/utils/AiProviders/gemini/index.js index bd84a3856..3d334b291 100644 --- a/server/utils/AiProviders/gemini/index.js +++ b/server/utils/AiProviders/gemini/index.js @@ -1,5 +1,8 @@ const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); class GeminiLLM { constructor(embedder = null, modelPreference = null) { @@ -198,6 +201,14 @@ class GeminiLLM { return new Promise(async (resolve) => { let fullText = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + for await (const chunk of stream) { fullText += chunk.text(); writeResponseChunk(response, { @@ -218,6 +229,7 @@ class GeminiLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); }); } diff --git a/server/utils/AiProviders/huggingface/index.js b/server/utils/AiProviders/huggingface/index.js index 416e622a3..751d3595c 100644 --- a/server/utils/AiProviders/huggingface/index.js +++ b/server/utils/AiProviders/huggingface/index.js @@ -1,7 +1,10 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); class HuggingFaceLLM { constructor(embedder = null, _modelPreference = null) { @@ -172,6 +175,14 @@ class HuggingFaceLLM { return new Promise((resolve) => { let fullText = ""; let chunk = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. 
+ const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + stream.data.on("data", (data) => { const lines = data ?.toString() @@ -218,6 +229,7 @@ class HuggingFaceLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } else { let error = null; @@ -241,6 +253,7 @@ class HuggingFaceLLM { close: true, error, }); + response.removeListener("close", handleAbort); resolve(""); return; } @@ -266,6 +279,7 @@ class HuggingFaceLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } } diff --git a/server/utils/AiProviders/native/index.js b/server/utils/AiProviders/native/index.js index 157fb7520..5764d4ee2 100644 --- a/server/utils/AiProviders/native/index.js +++ b/server/utils/AiProviders/native/index.js @@ -2,7 +2,10 @@ const fs = require("fs"); const path = require("path"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); // Docs: https://api.js.langchain.com/classes/chat_models_llama_cpp.ChatLlamaCpp.html const ChatLlamaCpp = (...args) => @@ -176,6 +179,14 @@ class NativeLLM { return new Promise(async (resolve) => { let fullText = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + for await (const chunk of stream) { if (chunk === undefined) throw new Error( @@ -202,6 +213,7 @@ class NativeLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); }); } diff --git a/server/utils/AiProviders/ollama/index.js b/server/utils/AiProviders/ollama/index.js index 035d4a9d0..6bd857b4e 100644 --- a/server/utils/AiProviders/ollama/index.js +++ b/server/utils/AiProviders/ollama/index.js @@ -1,6 +1,9 @@ const { chatPrompt } = require("../../chats"); const { StringOutputParser } = require("langchain/schema/output_parser"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); // Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md class OllamaAILLM { @@ -180,8 +183,16 @@ class OllamaAILLM { const { uuid = uuidv4(), sources = [] } = responseProps; return new Promise(async (resolve) => { + let fullText = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + try { - let fullText = ""; for await (const chunk of stream) { if (chunk === undefined) throw new Error( @@ -210,6 +221,7 @@ class OllamaAILLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } catch (error) { writeResponseChunk(response, { @@ -222,6 +234,7 @@ class OllamaAILLM { error?.cause ?? 
error.message }`, }); + response.removeListener("close", handleAbort); } }); } diff --git a/server/utils/AiProviders/openRouter/index.js b/server/utils/AiProviders/openRouter/index.js index 38a6f9f09..a1f606f60 100644 --- a/server/utils/AiProviders/openRouter/index.js +++ b/server/utils/AiProviders/openRouter/index.js @@ -1,7 +1,10 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { chatPrompt } = require("../../chats"); const { v4: uuidv4 } = require("uuid"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); function openRouterModels() { const { MODELS } = require("./models.js"); @@ -195,6 +198,13 @@ class OpenRouterLLM { let chunk = ""; let lastChunkTime = null; // null when first token is still not received. + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + // NOTICE: Not all OpenRouter models will return a stop reason // which keeps the connection open and so the model never finalizes the stream // like the traditional OpenAI response schema does. So in the case the response stream @@ -220,6 +230,7 @@ class OpenRouterLLM { error: false, }); clearInterval(timeoutCheck); + response.removeListener("close", handleAbort); resolve(fullText); } }, 500); @@ -269,6 +280,7 @@ class OpenRouterLLM { error: false, }); clearInterval(timeoutCheck); + response.removeListener("close", handleAbort); resolve(fullText); } else { let finishReason = null; @@ -305,6 +317,7 @@ class OpenRouterLLM { error: false, }); clearInterval(timeoutCheck); + response.removeListener("close", handleAbort); resolve(fullText); } } diff --git a/server/utils/AiProviders/togetherAi/index.js b/server/utils/AiProviders/togetherAi/index.js index 15b254a15..def03df96 100644 --- a/server/utils/AiProviders/togetherAi/index.js +++ b/server/utils/AiProviders/togetherAi/index.js @@ -1,5 +1,8 @@ const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../helpers/chat/responses"); +const { + writeResponseChunk, + clientAbortedHandler, +} = require("../../helpers/chat/responses"); function togetherAiModels() { const { MODELS } = require("./models.js"); @@ -185,6 +188,14 @@ class TogetherAiLLM { return new Promise((resolve) => { let fullText = ""; let chunk = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. 
+ const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + stream.data.on("data", (data) => { const lines = data ?.toString() @@ -230,6 +241,7 @@ class TogetherAiLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } else { let finishReason = null; @@ -263,6 +275,7 @@ class TogetherAiLLM { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } } diff --git a/server/utils/helpers/chat/responses.js b/server/utils/helpers/chat/responses.js index c4371d818..e2ec7bd0d 100644 --- a/server/utils/helpers/chat/responses.js +++ b/server/utils/helpers/chat/responses.js @@ -1,6 +1,14 @@ const { v4: uuidv4 } = require("uuid"); const moment = require("moment"); +function clientAbortedHandler(resolve, fullText) { + console.log( + "\x1b[43m\x1b[34m[STREAM ABORTED]\x1b[0m Client requested to abort stream. Exiting LLM stream handler early." + ); + resolve(fullText); + return; +} + // The default way to handle a stream response. Functions best with OpenAI. // Currently used for LMStudio, LocalAI, Mistral API, and OpenAI function handleDefaultStreamResponse(response, stream, responseProps) { @@ -9,6 +17,14 @@ function handleDefaultStreamResponse(response, stream, responseProps) { return new Promise((resolve) => { let fullText = ""; let chunk = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => clientAbortedHandler(resolve, fullText); + response.on("close", handleAbort); + stream.data.on("data", (data) => { const lines = data ?.toString() @@ -52,6 +68,7 @@ function handleDefaultStreamResponse(response, stream, responseProps) { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } else { let finishReason = null; @@ -85,6 +102,7 @@ function handleDefaultStreamResponse(response, stream, responseProps) { close: true, error: false, }); + response.removeListener("close", handleAbort); resolve(fullText); } } @@ -141,4 +159,5 @@ module.exports = { convertToChatHistory, convertToPromptHistory, writeResponseChunk, + clientAbortedHandler, };
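
The server-side half of the change hinges on one Node behavior: when the browser calls `ctrl.abort()` on the `fetchEventSource` request, the Express response object emits `"close"`, so the new `handleAbort` listener resolves each provider's stream promise early with whatever text has been generated so far. Each provider also removes that listener right before resolving normally, so the `"close"` that follows a completed response does not re-enter the abort path. A minimal, self-contained illustration of the pattern (a toy `/stream` route with a fake token stream and assumed names — not code from this repository):

```js
const express = require("express");

// Same idea as clientAbortedHandler in server/utils/helpers/chat/responses.js:
// resolve the pending stream promise with the partial text instead of hanging.
function clientAbortedHandler(resolve, fullText) {
  console.log("[STREAM ABORTED] Client closed the connection; resolving early.");
  resolve(fullText);
}

const app = express();

app.get("/stream", async (_req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");

  const finalText = await new Promise((resolve) => {
    let fullText = "";
    const handleAbort = () => clientAbortedHandler(resolve, fullText);
    res.on("close", handleAbort); // fires when the client aborts mid-stream

    // Stand-in for an LLM token stream.
    let count = 0;
    const timer = setInterval(() => {
      const token = `token-${count++} `;
      fullText += token;
      res.write(`data: ${JSON.stringify({ textResponse: token })}\n\n`);
      if (count === 50) {
        clearInterval(timer);
        // Normal completion: detach the abort listener before resolving so the
        // connection's eventual "close" cannot re-trigger the abort handler.
        res.removeListener("close", handleAbort);
        resolve(fullText);
      }
    }, 100);

    // Stop producing fake tokens if the client goes away mid-generation.
    res.once("close", () => clearInterval(timer));
  });

  // finalText holds either the full response or the partial text captured at abort,
  // so it can still be persisted to chat history at this point.
  if (!res.writableEnded && !res.destroyed) res.end();
  console.log(`Streamed ${finalText.length} characters.`);
});

app.listen(3001);
```
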