anything-llm/server/utils/agents/aibitat/plugins/websocket.js

157 lines
4.7 KiB
JavaScript
Raw Normal View History

const chalk = require("chalk");
const { RetryError } = require("../error");
const { Telemetry } = require("../../../../models/telemetry");
const SOCKET_TIMEOUT_MS = 300 * 1_000; // 5 mins
/**
* Websocket Interface plugin. It prints the messages on the console and asks for feedback
* while the conversation is running in the background.
*/
// export interface AIbitatWebSocket extends ServerWebSocket<unknown> {
// askForFeedback?: any
// awaitResponse?: any
// handleFeedback?: (message: string) => void;
// }
const WEBSOCKET_BAIL_COMMANDS = [
"exit",
"/exit",
"stop",
"/stop",
"halt",
"/halt",
];
const websocket = {
name: "websocket",
startupConfig: {
params: {
socket: {
required: true,
},
muteUserReply: {
required: false,
default: true,
},
introspection: {
required: false,
default: true,
},
},
},
plugin: function ({
socket, // @type AIbitatWebSocket
muteUserReply = true, // Do not post messages to "USER" back to frontend.
introspection = false, // when enabled will attach socket to Aibitat object with .introspect method which reports status updates to frontend.
}) {
return {
name: this.name,
setup(aibitat) {
aibitat.onError(async (error) => {
if (!!error?.message) {
console.error(chalk.red(` error: ${error.message}`), error);
aibitat.introspect(
`Error encountered while running: ${error.message}`
);
}
if (error instanceof RetryError) {
console.error(chalk.red(` retrying in 60 seconds...`));
setTimeout(() => {
aibitat.retry();
}, 60000);
return;
}
});
aibitat.introspect = (messageText) => {
if (!introspection) return; // Dump thoughts when not wanted.
socket.send(
JSON.stringify({ type: "statusResponse", content: messageText })
);
};
// expose function for sockets across aibitat
// type param must be set or else msg will not be shown or handled in UI.
aibitat.socket = {
send: (type = "__unhandled", content = "") => {
socket.send(JSON.stringify({ type, content }));
},
};
// aibitat.onStart(() => {
// console.log("🚀 starting chat ...");
// });
aibitat.onMessage((message) => {
if (message.from !== "USER")
Telemetry.sendTelemetry("agent_chat_sent");
if (message.from === "USER" && muteUserReply) return;
socket.send(JSON.stringify(message));
});
aibitat.onTerminate(() => {
// console.log("🚀 chat finished");
socket.close();
});
aibitat.onInterrupt(async (node) => {
const feedback = await socket.askForFeedback(socket, node);
if (WEBSOCKET_BAIL_COMMANDS.includes(feedback)) {
socket.close();
return;
}
await aibitat.continue(feedback);
});
/**
* Socket wait for feedback on socket
*
* @param socket The content to summarize. // AIbitatWebSocket & { receive: any, echo: any }
* @param node The chat node // { from: string; to: string }
* @returns The summarized content.
*/
socket.askForFeedback = (socket, node) => {
socket.awaitResponse = (question = "waiting...") => {
socket.send(JSON.stringify({ type: "WAITING_ON_INPUT", question }));
return new Promise(function (resolve) {
let socketTimeout = null;
socket.handleFeedback = (message) => {
const data = JSON.parse(message);
if (data.type !== "awaitingFeedback") return;
delete socket.handleFeedback;
clearTimeout(socketTimeout);
resolve(data.feedback);
return;
};
socketTimeout = setTimeout(() => {
console.log(
chalk.red(
`Client took too long to respond, chat thread is dead after ${SOCKET_TIMEOUT_MS}ms`
)
);
resolve("exit");
return;
}, SOCKET_TIMEOUT_MS);
});
};
return socket.awaitResponse(`Provide feedback to ${chalk.yellow(
node.to
)} as ${chalk.yellow(node.from)}.
Press enter to skip and use auto-reply, or type 'exit' to end the conversation: \n`);
};
// console.log("🚀 WS plugin is complete.");
},
};
},
};
module.exports = {
websocket,
WEBSOCKET_BAIL_COMMANDS,
};