anything-llm/server/utils/agents/ephemeral.js

380 lines
11 KiB
JavaScript
Raw Normal View History

const AIbitat = require("./aibitat");
const AgentPlugins = require("./aibitat/plugins");
const ImportedPlugin = require("./imported");
const { httpSocket } = require("./aibitat/plugins/http-socket.js");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { safeJsonParse } = require("../http");
const {
USER_AGENT,
WORKSPACE_AGENT,
agentSkillsFromSystemSettings,
} = require("./defaults");
const { AgentHandler } = require(".");
const {
WorkspaceAgentInvocation,
} = require("../../models/workspaceAgentInvocation");
/**
* This is an instance and functional Agent handler, but it does not utilize
* sessions or websocket's and is instead a singular one-off agent run that does
* not persist between invocations
*/
class EphemeralAgentHandler extends AgentHandler {
#invocationUUID = null;
#workspace = null;
#userId = null;
#threadId = null;
#sessionId = null;
#prompt = null;
#funcsToLoad = [];
aibitat = null;
channel = null;
provider = null;
model = null;
constructor({
uuid,
workspace,
prompt,
userId = null,
threadId = null,
sessionId = null,
}) {
super({ uuid });
this.#invocationUUID = uuid;
this.#workspace = workspace;
this.#prompt = prompt;
this.#userId = userId;
this.#threadId = threadId;
this.#sessionId = sessionId;
}
log(text, ...args) {
console.log(`\x1b[36m[EphemeralAgentHandler]\x1b[0m ${text}`, ...args);
}
closeAlert() {
this.log(`End ${this.#invocationUUID}::${this.provider}:${this.model}`);
}
async #chatHistory(limit = 10) {
try {
const rawHistory = (
await WorkspaceChats.where(
{
workspaceId: this.#workspace.id,
user_id: this.#userId || null,
thread_id: this.#threadId || null,
api_session_id: this.#sessionId,
include: true,
},
limit,
{ id: "desc" }
)
).reverse();
const agentHistory = [];
rawHistory.forEach((chatLog) => {
agentHistory.push(
{
from: USER_AGENT.name,
to: WORKSPACE_AGENT.name,
content: chatLog.prompt,
state: "success",
},
{
from: WORKSPACE_AGENT.name,
to: USER_AGENT.name,
content: safeJsonParse(chatLog.response)?.text || "",
state: "success",
}
);
});
return agentHistory;
} catch (e) {
this.log("Error loading chat history", e.message);
return [];
}
}
/**
* Finds or assumes the model preference value to use for API calls.
* If multi-model loading is supported, we use their agent model selection of the workspace
* If not supported, we attempt to fallback to the system provider value for the LLM preference
* and if that fails - we assume a reasonable base model to exist.
* @returns {string} the model preference value to use in API calls
*/
#fetchModel() {
if (!Object.keys(this.noProviderModelDefault).includes(this.provider))
return this.#workspace.agentModel || this.providerDefault();
// Provider has no reliable default (cant load many models) - so we need to look at system
// for the model param.
const sysModelKey = this.noProviderModelDefault[this.provider];
if (!!sysModelKey)
return process.env[sysModelKey] ?? this.providerDefault();
// If all else fails - look at the provider default list
return this.providerDefault();
}
#providerSetupAndCheck() {
this.provider = this.#workspace.agentProvider;
this.model = this.#fetchModel();
this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
this.checkSetup();
}
#attachPlugins(args) {
for (const name of this.#funcsToLoad) {
// Load child plugin
if (name.includes("#")) {
const [parent, childPluginName] = name.split("#");
if (!AgentPlugins.hasOwnProperty(parent)) {
this.log(
`${parent} is not a valid plugin. Skipping inclusion to agent cluster.`
);
continue;
}
const childPlugin = AgentPlugins[parent].plugin.find(
(child) => child.name === childPluginName
);
if (!childPlugin) {
this.log(
`${parent} does not have child plugin named ${childPluginName}. Skipping inclusion to agent cluster.`
);
continue;
}
const callOpts = this.parseCallOptions(
args,
childPlugin?.startupConfig?.params,
name
);
this.aibitat.use(childPlugin.plugin(callOpts));
this.log(
`Attached ${parent}:${childPluginName} plugin to Agent cluster`
);
continue;
}
// Load imported plugin. This is marked by `@@` in the array of functions to load.
// and is the @@hubID of the plugin.
if (name.startsWith("@@")) {
const hubId = name.replace("@@", "");
const valid = ImportedPlugin.validateImportedPluginHandler(hubId);
if (!valid) {
this.log(
`Imported plugin by hubId ${hubId} not found in plugin directory. Skipping inclusion to agent cluster.`
);
continue;
}
const plugin = ImportedPlugin.loadPluginByHubId(hubId);
const callOpts = plugin.parseCallOptions();
this.aibitat.use(plugin.plugin(callOpts));
this.log(
`Attached ${plugin.name} (${hubId}) imported plugin to Agent cluster`
);
continue;
}
// Load single-stage plugin.
if (!AgentPlugins.hasOwnProperty(name)) {
this.log(
`${name} is not a valid plugin. Skipping inclusion to agent cluster.`
);
continue;
}
const callOpts = this.parseCallOptions(
args,
AgentPlugins[name].startupConfig.params
);
const AIbitatPlugin = AgentPlugins[name];
this.aibitat.use(AIbitatPlugin.plugin(callOpts));
this.log(`Attached ${name} plugin to Agent cluster`);
}
}
async #loadAgents() {
// Default User agent and workspace agent
this.log(`Attaching user and default agent to Agent cluster.`);
this.aibitat.agent(USER_AGENT.name, await USER_AGENT.getDefinition());
this.aibitat.agent(
WORKSPACE_AGENT.name,
await WORKSPACE_AGENT.getDefinition(this.provider)
);
this.#funcsToLoad = [
AgentPlugins.memory.name,
AgentPlugins.docSummarizer.name,
AgentPlugins.webScraping.name,
...(await agentSkillsFromSystemSettings()),
...(await ImportedPlugin.activeImportedPlugins()),
];
}
async init() {
this.#providerSetupAndCheck();
return this;
}
async createAIbitat(
args = {
handler,
}
) {
this.aibitat = new AIbitat({
provider: this.provider ?? "openai",
model: this.model ?? "gpt-4o",
chats: await this.#chatHistory(20),
handlerProps: {
invocation: {
workspace: this.#workspace,
workspace_id: this.#workspace.id,
},
log: this.log,
},
});
// Attach HTTP response object if defined for chunk streaming.
this.log(`Attached ${httpSocket.name} plugin to Agent cluster`);
this.aibitat.use(
httpSocket.plugin({
handler: args.handler,
muteUserReply: true,
introspection: true,
})
);
// Load required agents (Default + custom)
await this.#loadAgents();
// Attach all required plugins for functions to operate.
this.#attachPlugins(args);
}
startAgentCluster() {
return this.aibitat.start({
from: USER_AGENT.name,
to: this.channel ?? WORKSPACE_AGENT.name,
content: this.#prompt,
});
}
/**
* Determine if the message provided is an agent invocation.
* @param {{message:string}} parameters
* @returns {boolean}
*/
static isAgentInvocation({ message }) {
const agentHandles = WorkspaceAgentInvocation.parseAgents(message);
if (agentHandles.length > 0) return true;
return false;
}
}
const EventEmitter = require("node:events");
const { writeResponseChunk } = require("../helpers/chat/responses");
/**
* This is a special EventEmitter specifically used in the Aibitat agent handler
* that enables us to use HTTP to relay all .introspect and .send events back to an
* http handler instead of websockets, like we do on the frontend. This interface is meant to
* mock a websocket interface for the methods used and bind them to an HTTP method so that the developer
* API can invoke agent calls.
*/
class EphemeralEventListener extends EventEmitter {
messages = [];
constructor() {
super();
}
send(jsonData) {
const data = JSON.parse(jsonData);
this.messages.push(data);
this.emit("chunk", data);
}
close() {
this.emit("closed");
}
/**
* Compacts all messages in class and returns them in a condensed format.
* @returns {{thoughts: string[], textResponse: string}}
*/
packMessages() {
const thoughts = [];
let textResponse = null;
for (let msg of this.messages) {
if (msg.type !== "statusResponse") {
textResponse = msg.content;
} else {
thoughts.push(msg.content);
}
}
return { thoughts, textResponse };
}
/**
* Waits on the HTTP plugin to emit the 'closed' event from the agentHandler
* so that we can compact and return all the messages in the current queue.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
*/
async waitForClose() {
return new Promise((resolve) => {
this.once("closed", () => resolve(this.packMessages()));
});
}
/**
* Streams the events with `writeResponseChunk` over HTTP chunked encoding
* and returns on the close event emission.
* ----------
* DevNote: Agents do not stream so in here we are simply
* emitting the thoughts and text response as soon as we get them.
* @param {import("express").Response} response
* @param {string} uuid - Unique identifier that is the same across chunks.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
*/
async streamAgentEvents(response, uuid) {
const onChunkHandler = (data) => {
if (data.type === "statusResponse") {
return writeResponseChunk(response, {
id: uuid,
type: "agentThought",
thought: data.content,
sources: [],
attachments: [],
close: false,
error: null,
});
}
return writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse: data.content,
sources: [],
attachments: [],
close: true,
error: null,
});
};
this.on("chunk", onChunkHandler);
// Wait for close and after remove chunk listener
return this.waitForClose().then((closedResponse) => {
this.removeListener("chunk", onChunkHandler);
return closedResponse;
});
}
}
module.exports = { EphemeralAgentHandler, EphemeralEventListener };