anything-llm/server/utils/agents/aibitat/index.js

810 lines
23 KiB
JavaScript
Raw Normal View History

const { EventEmitter } = require("events");
const { APIError } = require("./error.js");
const Providers = require("./providers/index.js");
const { Telemetry } = require("../../../models/telemetry.js");
/**
* AIbitat is a class that manages the conversation between agents.
* It is designed to solve a task with LLM.
*
* Guiding the chat through a graph of agents.
*/
class AIbitat {
emitter = new EventEmitter();
provider = null;
defaultProvider = null;
defaultInterrupt;
maxRounds;
_chats;
agents = new Map();
channels = new Map();
functions = new Map();
constructor(props = {}) {
const {
chats = [],
interrupt = "NEVER",
maxRounds = 100,
provider = "openai",
handlerProps = {}, // Inherited props we can spread so aibitat can access.
...rest
} = props;
this._chats = chats;
this.defaultInterrupt = interrupt;
this.maxRounds = maxRounds;
this.handlerProps = handlerProps;
this.defaultProvider = {
provider,
...rest,
};
this.provider = this.defaultProvider.provider;
this.model = this.defaultProvider.model;
}
/**
* Get the chat history between agents and channels.
*/
get chats() {
return this._chats;
}
/**
* Install a plugin.
*/
use(plugin) {
plugin.setup(this);
return this;
}
/**
* Add a new agent to the AIbitat.
*
* @param name
* @param config
* @returns
*/
agent(name = "", config = {}) {
this.agents.set(name, config);
return this;
}
/**
* Add a new channel to the AIbitat.
*
* @param name
* @param members
* @param config
* @returns
*/
channel(name = "", members = [""], config = {}) {
this.channels.set(name, {
members,
...config,
});
return this;
}
/**
* Get the specific agent configuration.
*
* @param agent The name of the agent.
* @throws When the agent configuration is not found.
* @returns The agent configuration.
*/
getAgentConfig(agent = "") {
const config = this.agents.get(agent);
if (!config) {
throw new Error(`Agent configuration "${agent}" not found`);
}
return {
role: "You are a helpful AI assistant.",
// role: `You are a helpful AI assistant.
// Solve tasks using your coding and language skills.
// In the following cases, suggest typescript code (in a typescript coding block) or shell script (in a sh coding block) for the user to execute.
// 1. When you need to collect info, use the code to output the info you need, for example, browse or search the web, download/read a file, print the content of a webpage or a file, get the current date/time, check the operating system. After sufficient info is printed and the task is ready to be solved based on your language skill, you can solve the task by yourself.
// 2. When you need to perform some task with code, use the code to perform the task and output the result. Finish the task smartly.
// Solve the task step by step if you need to. If a plan is not provided, explain your plan first. Be clear which step uses code, and which step uses your language skill.
// When using code, you must indicate the script type in the code block. The user cannot provide any other feedback or perform any other action beyond executing the code you suggest. The user can't modify your code. So do not suggest incomplete code which requires users to modify. Don't use a code block if it's not intended to be executed by the user.
// If you want the user to save the code in a file before executing it, put # filename: <filename> inside the code block as the first line. Don't include multiple code blocks in one response. Do not ask users to copy and paste the result. Instead, use 'print' function for the output when relevant. Check the execution result returned by the user.
// If the result indicates there is an error, fix the error and output the code again. Suggest the full code instead of partial code or code changes. If the error can't be fixed or if the task is not solved even after the code is executed successfully, analyze the problem, revisit your assumption, collect additional info you need, and think of a different approach to try.
// When you find an answer, verify the answer carefully. Include verifiable evidence in your response if possible.
// Reply "TERMINATE" when everything is done.`,
...config,
};
}
/**
* Get the specific channel configuration.
*
* @param channel The name of the channel.
* @throws When the channel configuration is not found.
* @returns The channel configuration.
*/
getChannelConfig(channel = "") {
const config = this.channels.get(channel);
if (!config) {
throw new Error(`Channel configuration "${channel}" not found`);
}
return {
maxRounds: 10,
role: "",
...config,
};
}
/**
* Get the members of a group.
* @throws When the group is not defined as an array in the connections.
* @param node The name of the group.
* @returns The members of the group.
*/
getGroupMembers(node = "") {
const group = this.getChannelConfig(node);
return group.members;
}
/**
* Triggered when a plugin, socket, or command is aborted.
*
* @param listener
* @returns
*/
onAbort(listener = () => null) {
this.emitter.on("abort", listener);
return this;
}
/**
* Abort the running of any plugins that may still be pending (Langchain summarize)
*/
abort() {
this.emitter.emit("abort", null, this);
}
/**
* Triggered when a chat is terminated. After this, the chat can't be continued.
*
* @param listener
* @returns
*/
onTerminate(listener = () => null) {
this.emitter.on("terminate", listener);
return this;
}
/**
* Terminate the chat. After this, the chat can't be continued.
*
* @param node Last node to chat with
*/
terminate(node = "") {
this.emitter.emit("terminate", node, this);
}
/**
* Triggered when a chat is interrupted by a node.
*
* @param listener
* @returns
*/
onInterrupt(listener = () => null) {
this.emitter.on("interrupt", listener);
return this;
}
/**
* Interruption the chat.
*
* @param route The nodes that participated in the interruption.
* @returns
*/
interrupt(route) {
this._chats.push({
...route,
state: "interrupt",
});
this.emitter.emit("interrupt", route, this);
}
/**
* Triggered when a message is added to the chat history.
* This can either be the first message or a reply to a message.
*
* @param listener
* @returns
*/
onMessage(listener = (chat) => null) {
this.emitter.on("message", listener);
return this;
}
/**
* Register a new successful message in the chat history.
* This will trigger the `onMessage` event.
*
* @param message
*/
newMessage(message) {
const chat = {
...message,
state: "success",
};
this._chats.push(chat);
this.emitter.emit("message", chat, this);
}
/**
* Triggered when an error occurs during the chat.
*
* @param listener
* @returns
*/
onError(
listener = (
/**
* The error that occurred.
*
* Native errors are:
* - `APIError`
* - `AuthorizationError`
* - `UnknownError`
* - `RateLimitError`
* - `ServerError`
*/
error = null,
/**
* The message when the error occurred.
*/
{}
) => null
) {
this.emitter.on("replyError", listener);
return this;
}
/**
* Register an error in the chat history.
* This will trigger the `onError` event.
*
* @param route
* @param error
*/
newError(route, error) {
const chat = {
...route,
content: error instanceof Error ? error.message : String(error),
state: "error",
};
this._chats.push(chat);
this.emitter.emit("replyError", error, chat);
}
/**
* Triggered when a chat is interrupted by a node.
*
* @param listener
* @returns
*/
onStart(listener = (chat, aibitat) => null) {
this.emitter.on("start", listener);
return this;
}
/**
* Start a new chat.
*
* @param message The message to start the chat.
*/
async start(message) {
// register the message in the chat history
this.newMessage(message);
this.emitter.emit("start", message, this);
// ask the node to reply
await this.chat({
to: message.from,
from: message.to,
});
return this;
}
/**
* Recursively chat between two nodes.
*
* @param route
* @param keepAlive Whether to keep the chat alive.
*/
async chat(route, keepAlive = true) {
// check if the message is for a group
// if it is, select the next node to chat with from the group
// and then ask them to reply.
if (this.channels.get(route.from)) {
// select a node from the group
let nextNode;
try {
nextNode = await this.selectNext(route.from);
} catch (error) {
if (error instanceof APIError) {
return this.newError({ from: route.from, to: route.to }, error);
}
throw error;
}
if (!nextNode) {
// TODO: should it throw an error or keep the chat alive when there is no node to chat with in the group?
// maybe it should wrap up the chat and reply to the original node
// For now, it will terminate the chat
this.terminate(route.from);
return;
}
const nextChat = {
from: nextNode,
to: route.from,
};
if (this.shouldAgentInterrupt(nextNode)) {
this.interrupt(nextChat);
return;
}
// get chats only from the group's nodes
const history = this.getHistory({ to: route.from });
const group = this.getGroupMembers(route.from);
const rounds = history.filter((chat) => group.includes(chat.from)).length;
const { maxRounds } = this.getChannelConfig(route.from);
if (rounds >= maxRounds) {
this.terminate(route.to);
return;
}
await this.chat(nextChat);
return;
}
// If it's a direct message, reply to the message
let reply = "";
try {
reply = await this.reply(route);
} catch (error) {
if (error instanceof APIError) {
return this.newError({ from: route.from, to: route.to }, error);
}
throw error;
}
if (
reply === "TERMINATE" ||
this.hasReachedMaximumRounds(route.from, route.to)
) {
this.terminate(route.to);
return;
}
const newChat = { to: route.from, from: route.to };
if (
reply === "INTERRUPT" ||
(this.agents.get(route.to) && this.shouldAgentInterrupt(route.to))
) {
this.interrupt(newChat);
return;
}
if (keepAlive) {
// keep the chat alive by replying to the other node
await this.chat(newChat, true);
}
}
/**
* Check if the agent should interrupt the chat based on its configuration.
*
* @param agent
* @returns {boolean} Whether the agent should interrupt the chat.
*/
shouldAgentInterrupt(agent = "") {
const config = this.getAgentConfig(agent);
return this.defaultInterrupt === "ALWAYS" || config.interrupt === "ALWAYS";
}
/**
* Select the next node to chat with from a group. The node will be selected based on the history of chats.
* It will select the node that has not reached the maximum number of rounds yet and has not chatted with the channel in the last round.
* If it could not determine the next node, it will return a random node.
*
* @param channel The name of the group.
* @returns The name of the node to chat with.
*/
async selectNext(channel = "") {
// get all members of the group
const nodes = this.getGroupMembers(channel);
const channelConfig = this.getChannelConfig(channel);
// TODO: move this to when the group is created
// warn if the group is underpopulated
if (nodes.length < 3) {
console.warn(
`- Group (${channel}) is underpopulated with ${nodes.length} agents. Direct communication would be more efficient.`
);
}
// get the nodes that have not reached the maximum number of rounds
const availableNodes = nodes.filter(
(node) => !this.hasReachedMaximumRounds(channel, node)
);
// remove the last node that chatted with the channel, so it doesn't chat again
const lastChat = this._chats.filter((c) => c.to === channel).at(-1);
if (lastChat) {
const index = availableNodes.indexOf(lastChat.from);
if (index > -1) {
availableNodes.splice(index, 1);
}
}
// TODO: what should it do when there is no node to chat with?
if (!availableNodes.length) return;
// get the provider that will be used for the channel
// if the channel has a provider, use that otherwise
// use the GPT-4 because it has a better reasoning
const provider = this.getProviderForConfig({
// @ts-expect-error
model: "gpt-4",
...this.defaultProvider,
...channelConfig,
});
const history = this.getHistory({ to: channel });
// build the messages to send to the provider
const messages = [
{
role: "system",
content: channelConfig.role,
},
{
role: "user",
content: `You are in a role play game. The following roles are available:
${availableNodes
.map((node) => `@${node}: ${this.getAgentConfig(node).role}`)
.join("\n")}.
Read the following conversation.
CHAT HISTORY
${history.map((c) => `@${c.from}: ${c.content}`).join("\n")}
Then select the next role from that is going to speak next.
Only return the role.
`,
},
];
// ask the provider to select the next node to chat with
// and remove the @ from the response
const { result } = await provider.complete(messages);
const name = result?.replace(/^@/g, "");
if (this.agents.get(name)) {
return name;
}
// if the name is not in the nodes, return a random node
return availableNodes[Math.floor(Math.random() * availableNodes.length)];
}
/**
*
* @param {string} pluginName this name of the plugin being called
* @returns string of the plugin to be called compensating for children denoted by # in the string.
* eg: sql-agent:list-database-connections
* or is a custom plugin
* eg: @@custom-plugin-name
*/
#parseFunctionName(pluginName = "") {
if (!pluginName.includes("#") && !pluginName.startsWith("@@"))
return pluginName;
if (pluginName.startsWith("@@")) return pluginName.replace("@@", "");
return pluginName.split("#")[1];
}
/**
* Check if the chat has reached the maximum number of rounds.
*/
hasReachedMaximumRounds(from = "", to = "") {
return this.getHistory({ from, to }).length >= this.maxRounds;
}
/**
* Ask the for the AI provider to generate a reply to the chat.
*
* @param route.to The node that sent the chat.
* @param route.from The node that will reply to the chat.
*/
async reply(route) {
// get the provider for the node that will reply
const fromConfig = this.getAgentConfig(route.from);
const chatHistory =
// if it is sending message to a group, send the group chat history to the provider
// otherwise, send the chat history between the two nodes
this.channels.get(route.to)
? [
{
role: "user",
content: `You are in a whatsapp group. Read the following conversation and then reply.
Do not add introduction or conclusion to your reply because this will be a continuous conversation. Don't introduce yourself.
CHAT HISTORY
${this.getHistory({ to: route.to })
.map((c) => `@${c.from}: ${c.content}`)
.join("\n")}
@${route.from}:`,
},
]
: this.getHistory(route).map((c) => ({
content: c.content,
role: c.from === route.to ? "user" : "assistant",
}));
// build the messages to send to the provider
const messages = [
{
content: fromConfig.role,
role: "system",
},
// get the history of chats between the two nodes
...chatHistory,
];
// get the functions that the node can call
const functions = fromConfig.functions
?.map((name) => this.functions.get(this.#parseFunctionName(name)))
.filter((a) => !!a);
const provider = this.getProviderForConfig({
...this.defaultProvider,
...fromConfig,
});
// get the chat completion
const content = await this.handleExecution(
provider,
messages,
functions,
route.from
);
this.newMessage({ ...route, content });
return content;
}
async handleExecution(
provider,
messages = [],
functions = [],
byAgent = null
) {
// get the chat completion
const completion = await provider.complete(messages, functions);
if (completion.functionCall) {
const { name, arguments: args } = completion.functionCall;
const fn = this.functions.get(name);
// if provider hallucinated on the function name
// ask the provider to complete again
if (!fn) {
return await this.handleExecution(
provider,
[
...messages,
{
name,
role: "function",
content: `Function "${name}" not found. Try again.`,
},
],
functions,
byAgent
);
}
// Execute the function and return the result to the provider
fn.caller = byAgent || "agent";
// For OSS LLMs we really need to keep tabs on what they are calling
// so we can log it here.
if (provider?.verbose) {
this?.introspect?.(
`[debug]: ${fn.caller} is attempting to call \`${name}\` tool`
);
this.handlerProps.log(
`[debug]: ${fn.caller} is attempting to call \`${name}\` tool`
);
}
const result = await fn.handler(args);
Telemetry.sendTelemetry("agent_tool_call", { tool: name }, null, true);
return await this.handleExecution(
provider,
[
...messages,
{
name,
role: "function",
content: result,
},
],
functions,
byAgent
);
}
return completion?.result;
}
/**
* Continue the chat from the last interruption.
* If the last chat was not an interruption, it will throw an error.
* Provide a feedback where it was interrupted if you want to.
*
* @param feedback The feedback to the interruption if any.
* @returns
*/
async continue(feedback) {
const lastChat = this._chats.at(-1);
if (!lastChat || lastChat.state !== "interrupt") {
throw new Error("No chat to continue");
}
// remove the last chat's that was interrupted
this._chats.pop();
const { from, to } = lastChat;
if (this.hasReachedMaximumRounds(from, to)) {
throw new Error("Maximum rounds reached");
}
if (feedback) {
const message = {
from,
to,
content: feedback,
};
// register the message in the chat history
this.newMessage(message);
// ask the node to reply
await this.chat({
to: message.from,
from: message.to,
});
} else {
await this.chat({ from, to });
}
return this;
}
/**
* Retry the last chat that threw an error.
* If the last chat was not an error, it will throw an error.
*/
async retry() {
const lastChat = this._chats.at(-1);
if (!lastChat || lastChat.state !== "error") {
throw new Error("No chat to retry");
}
// remove the last chat's that threw an error
const { from, to } = this?._chats?.pop();
await this.chat({ from, to });
return this;
}
/**
* Get the chat history between two nodes or all chats to/from a node.
*/
getHistory({ from, to }) {
return this._chats.filter((chat) => {
const isSuccess = chat.state === "success";
// return all chats to the node
if (!from) {
return isSuccess && chat.to === to;
}
// get all chats from the node
if (!to) {
return isSuccess && chat.from === from;
}
// check if the chat is between the two nodes
const hasSent = chat.from === from && chat.to === to;
const hasReceived = chat.from === to && chat.to === from;
const mutual = hasSent || hasReceived;
return isSuccess && mutual;
});
}
/**
* Get provider based on configurations.
* If the provider is a string, it will return the default provider for that string.
*
* @param config The provider configuration.
*/
getProviderForConfig(config) {
if (typeof config.provider === "object") {
return config.provider;
}
switch (config.provider) {
case "openai":
return new Providers.OpenAIProvider({ model: config.model });
case "anthropic":
return new Providers.AnthropicProvider({ model: config.model });
case "lmstudio":
return new Providers.LMStudioProvider({});
case "ollama":
return new Providers.OllamaProvider({ model: config.model });
case "groq":
return new Providers.GroqProvider({ model: config.model });
case "togetherai":
return new Providers.TogetherAIProvider({ model: config.model });
case "azure":
return new Providers.AzureOpenAiProvider({ model: config.model });
case "koboldcpp":
return new Providers.KoboldCPPProvider({});
case "localai":
return new Providers.LocalAIProvider({ model: config.model });
case "openrouter":
return new Providers.OpenRouterProvider({ model: config.model });
case "mistral":
return new Providers.MistralProvider({ model: config.model });
case "generic-openai":
return new Providers.GenericOpenAiProvider({ model: config.model });
case "perplexity":
return new Providers.PerplexityProvider({ model: config.model });
case "textgenwebui":
return new Providers.TextWebGenUiProvider({});
case "bedrock":
return new Providers.AWSBedrockProvider({});
case "fireworksai":
return new Providers.FireworksAIProvider({ model: config.model });
case "deepseek":
return new Providers.DeepSeekProvider({ model: config.model });
case "apipie":
return new Providers.ApiPieProvider({ model: config.model });
default:
throw new Error(
`Unknown provider: ${config.provider}. Please use "openai"`
);
}
}
/**
* Register a new function to be called by the AIbitat agents.
* You are also required to specify the which node can call the function.
* @param functionConfig The function configuration.
*/
function(functionConfig) {
this.functions.set(functionConfig.name, functionConfig);
return this;
}
}
module.exports = AIbitat;