Enabled use of @agent (and skills) via dev API calls (#2161)

* Use `@agent` via dev API

* Move EphemeralEventListener to same file as agent
Timothy Carambat 2024-08-22 13:12:09 -07:00 committed by GitHub
parent fdc3add53c
commit 2de9e492ec
5 changed files with 595 additions and 35 deletions
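
For context, the headline change means an agent (and its skills) can now be invoked from the developer API simply by prefixing the chat message with @agent. A rough sketch of such a call follows; the endpoint path, port, and auth header are illustrative assumptions and should be checked against the dev API docs rather than read off this commit.

// Illustrative only: endpoint path, port, and header values are assumptions, not defined in this commit.
const res = await fetch(
  "http://localhost:3001/api/v1/workspace/my-workspace/chat",
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: "Bearer <DEV_API_KEY>",
    },
    body: JSON.stringify({
      message: "@agent summarize the documents in this workspace",
      mode: "chat",
    }),
  }
);
// With this change, the agent path resolves once the run closes, shaped like:
// { id, type: "textResponse", textResponse, thoughts, sources: [], close: true, error: null }
const result = await res.json();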

@@ -0,0 +1,87 @@
const chalk = require("chalk");
const { RetryError } = require("../error");
const { Telemetry } = require("../../../../models/telemetry");
/**
* HTTP interface plugin for Aibitat that emulates a websocket interface in the agent
* framework so we don't have to modify how messages and responses are passed
* between REST and WSS.
*/
const httpSocket = {
name: "httpSocket",
startupConfig: {
params: {
handler: {
required: true,
},
muteUserReply: {
required: false,
default: true,
},
introspection: {
required: false,
default: true,
},
},
},
plugin: function ({
handler,
muteUserReply = true, // When true, do not relay messages from "USER" back to the frontend.
introspection = false, // When enabled, attaches an .introspect method to the Aibitat object that reports status updates to the frontend.
}) {
return {
name: this.name,
setup(aibitat) {
aibitat.onError(async (error) => {
if (!!error?.message) {
console.error(chalk.red(` error: ${error.message}`), error);
aibitat.introspect(
`Error encountered while running: ${error.message}`
);
}
if (error instanceof RetryError) {
console.error(chalk.red(` retrying in 60 seconds...`));
setTimeout(() => {
aibitat.retry();
}, 60_000);
return;
}
});
aibitat.introspect = (messageText) => {
if (!introspection) return; // Dump thoughts when not wanted.
handler.send(
JSON.stringify({ type: "statusResponse", content: messageText })
);
};
// Expose a send function for socket-style messages across aibitat.
// The type param must be set or the message will not be shown or handled in the UI.
aibitat.socket = {
send: (type = "__unhandled", content = "") => {
handler.send(JSON.stringify({ type, content }));
},
};
// We can only return one message response over HTTP,
// so we end on the first response.
aibitat.onMessage((message) => {
if (message.from !== "USER")
Telemetry.sendTelemetry("agent_chat_sent");
if (message.from === "USER" && muteUserReply) return;
handler.send(JSON.stringify(message));
handler.close();
});
aibitat.onTerminate(() => {
handler.close();
});
},
};
},
};
module.exports = {
httpSocket,
};
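
To make the plugin's contract concrete: the handler only needs send(jsonString) and close() methods, which is exactly what the EphemeralEventListener further down provides. A minimal sketch (illustrative, not part of this commit):

// Any object exposing send()/close() can back the httpSocket plugin.
const demoHandler = {
  send: (jsonString) => {
    const { type, content } = JSON.parse(jsonString);
    console.log(`[${type}]`, content); // statusResponse thoughts or the final agent reply
  },
  close: () => console.log("agent run finished"),
};
// Hypothetical wiring inside an AIbitat setup:
// aibitat.use(httpSocket.plugin({ handler: demoHandler, introspection: true }));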

@@ -22,36 +22,48 @@ const WORKSPACE_AGENT = {
AgentPlugins.webScraping.name, // Collector web-scraping
];
const _setting = (
await SystemSettings.get({ label: "default_agent_skills" })
)?.value;
safeJsonParse(_setting, []).forEach((skillName) => {
if (!AgentPlugins.hasOwnProperty(skillName)) return;
// This is a plugin module with many child plugins that
// need to be named via the `${parent}#${child}` naming convention.
if (Array.isArray(AgentPlugins[skillName].plugin)) {
for (const subPlugin of AgentPlugins[skillName].plugin) {
defaultFunctions.push(
`${AgentPlugins[skillName].name}#${subPlugin.name}`
);
}
return;
}
// This is a normal single-stage plugin.
defaultFunctions.push(AgentPlugins[skillName].name);
});
return {
role: Provider.systemPrompt(provider),
functions: defaultFunctions,
functions: [
...defaultFunctions,
...(await agentSkillsFromSystemSettings()),
],
};
},
};
/**
* Fetches and preloads the names/identifiers for plugins that will be dynamically
* loaded later
* @returns {Promise<string[]>}
*/
async function agentSkillsFromSystemSettings() {
const systemFunctions = [];
const _setting = (await SystemSettings.get({ label: "default_agent_skills" }))
?.value;
safeJsonParse(_setting, []).forEach((skillName) => {
if (!AgentPlugins.hasOwnProperty(skillName)) return;
// This is a plugin module with many child plugins that
// need to be named via the `${parent}#${child}` naming convention.
if (Array.isArray(AgentPlugins[skillName].plugin)) {
for (const subPlugin of AgentPlugins[skillName].plugin) {
systemFunctions.push(
`${AgentPlugins[skillName].name}#${subPlugin.name}`
);
}
return;
}
// This is a normal single-stage plugin.
systemFunctions.push(AgentPlugins[skillName].name);
});
return systemFunctions;
}
module.exports = {
USER_AGENT,
WORKSPACE_AGENT,
agentSkillsFromSystemSettings,
};
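
As a sketch of what this resolves to (the skill keys below are hypothetical placeholders, and we assume each plugin's .name matches its key): the default_agent_skills setting is stored as a JSON array of plugin keys, and any key whose .plugin property is an array of sub-plugins expands into parent#child identifiers.

// Hypothetical example, for illustration only.
// Stored setting value: '["memory", "web-search"]'
// If AgentPlugins["web-search"].plugin were [{ name: "query" }, { name: "scrape" }], then:
const names = await agentSkillsFromSystemSettings();
// names === ["memory", "web-search#query", "web-search#scrape"]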

@@ -0,0 +1,351 @@
const AIbitat = require("./aibitat");
const AgentPlugins = require("./aibitat/plugins");
const { httpSocket } = require("./aibitat/plugins/http-socket.js");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { safeJsonParse } = require("../http");
const {
USER_AGENT,
WORKSPACE_AGENT,
agentSkillsFromSystemSettings,
} = require("./defaults");
const { AgentHandler } = require(".");
const {
WorkspaceAgentInvocation,
} = require("../../models/workspaceAgentInvocation");
/**
* This is a fully functional agent handler, but it does not use
* sessions or websockets; it is instead a single one-off agent run that does
* not persist between invocations.
*/
class EphemeralAgentHandler extends AgentHandler {
#invocationUUID = null;
#workspace = null;
#userId = null;
#threadId = null;
#sessionId = null;
#prompt = null;
#funcsToLoad = [];
aibitat = null;
channel = null;
provider = null;
model = null;
constructor({
uuid,
workspace,
prompt,
userId = null,
threadId = null,
sessionId = null,
}) {
super({ uuid });
this.#invocationUUID = uuid;
this.#workspace = workspace;
this.#prompt = prompt;
this.#userId = userId;
this.#threadId = threadId;
this.#sessionId = sessionId;
}
log(text, ...args) {
console.log(`\x1b[36m[EphemeralAgentHandler]\x1b[0m ${text}`, ...args);
}
closeAlert() {
this.log(`End ${this.#invocationUUID}::${this.provider}:${this.model}`);
}
async #chatHistory(limit = 10) {
try {
const rawHistory = (
await WorkspaceChats.where(
{
workspaceId: this.#workspace.id,
user_id: this.#userId || null,
thread_id: this.#threadId || null,
api_session_id: this.#sessionId,
include: true,
},
limit,
{ id: "desc" }
)
).reverse();
const agentHistory = [];
rawHistory.forEach((chatLog) => {
agentHistory.push(
{
from: USER_AGENT.name,
to: WORKSPACE_AGENT.name,
content: chatLog.prompt,
state: "success",
},
{
from: WORKSPACE_AGENT.name,
to: USER_AGENT.name,
content: safeJsonParse(chatLog.response)?.text || "",
state: "success",
}
);
});
return agentHistory;
} catch (e) {
this.log("Error loading chat history", e.message);
return [];
}
}
/**
* Finds or assumes the model preference value to use for API calls.
* If multi-model loading is supported, we use the workspace's agent model selection.
* If not supported, we attempt to fall back to the system provider value for the LLM preference,
* and if that fails we assume a reasonable base model exists.
* @returns {string} the model preference value to use in API calls
*/
#fetchModel() {
if (!Object.keys(this.noProviderModelDefault).includes(this.provider))
return this.#workspace.agentModel || this.providerDefault();
// Provider has no reliable default (can't load many models), so we need to look at the system
// for the model param.
const sysModelKey = this.noProviderModelDefault[this.provider];
if (!!sysModelKey)
return process.env[sysModelKey] ?? this.providerDefault();
// If all else fails - look at the provider default list
return this.providerDefault();
}
#providerSetupAndCheck() {
this.provider = this.#workspace.agentProvider;
this.model = this.#fetchModel();
this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
this.checkSetup();
}
#attachPlugins(args) {
for (const name of this.#funcsToLoad) {
// Load child plugin
if (name.includes("#")) {
const [parent, childPluginName] = name.split("#");
if (!AgentPlugins.hasOwnProperty(parent)) {
this.log(
`${parent} is not a valid plugin. Skipping inclusion to agent cluster.`
);
continue;
}
const childPlugin = AgentPlugins[parent].plugin.find(
(child) => child.name === childPluginName
);
if (!childPlugin) {
this.log(
`${parent} does not have child plugin named ${childPluginName}. Skipping inclusion to agent cluster.`
);
continue;
}
const callOpts = this.parseCallOptions(
args,
childPlugin?.startupConfig?.params,
name
);
this.aibitat.use(childPlugin.plugin(callOpts));
this.log(
`Attached ${parent}:${childPluginName} plugin to Agent cluster`
);
continue;
}
// Load single-stage plugin.
if (!AgentPlugins.hasOwnProperty(name)) {
this.log(
`${name} is not a valid plugin. Skipping inclusion to agent cluster.`
);
continue;
}
const callOpts = this.parseCallOptions(
args,
AgentPlugins[name].startupConfig.params
);
const AIbitatPlugin = AgentPlugins[name];
this.aibitat.use(AIbitatPlugin.plugin(callOpts));
this.log(`Attached ${name} plugin to Agent cluster`);
}
}
async #loadAgents() {
// Default User agent and workspace agent
this.log(`Attaching user and default agent to Agent cluster.`);
this.aibitat.agent(USER_AGENT.name, await USER_AGENT.getDefinition());
this.aibitat.agent(
WORKSPACE_AGENT.name,
await WORKSPACE_AGENT.getDefinition(this.provider)
);
this.#funcsToLoad = [
AgentPlugins.docSummarizer.name,
AgentPlugins.webScraping.name,
...(await agentSkillsFromSystemSettings()),
];
}
async init() {
this.#providerSetupAndCheck();
return this;
}
async createAIbitat(
args = {
handler,
}
) {
this.aibitat = new AIbitat({
provider: this.provider ?? "openai",
model: this.model ?? "gpt-4o",
chats: await this.#chatHistory(20),
handlerProps: {
log: this.log,
},
});
// Attach the HTTP handler via the httpSocket plugin so agent events can be relayed over HTTP.
this.log(`Attached ${httpSocket.name} plugin to Agent cluster`);
this.aibitat.use(
httpSocket.plugin({
handler: args.handler,
muteUserReply: true,
introspection: true,
})
);
// Load required agents (Default + custom)
await this.#loadAgents();
// Attach all required plugins for functions to operate.
this.#attachPlugins(args);
}
startAgentCluster() {
return this.aibitat.start({
from: USER_AGENT.name,
to: this.channel ?? WORKSPACE_AGENT.name,
content: this.#prompt,
});
}
/**
* Determine if the message provided is an agent invocation.
* @param {{message:string}} parameters
* @returns {boolean}
*/
static isAgentInvocation({ message }) {
const agentHandles = WorkspaceAgentInvocation.parseAgents(message);
if (agentHandles.length > 0) return true;
return false;
}
}
const EventEmitter = require("node:events");
const { writeResponseChunk } = require("../helpers/chat/responses");
/**
* This is a special EventEmitter used in the Aibitat agent handler that lets us relay
* all .introspect and .send events back over HTTP instead of websockets, like we do on
* the frontend. It mocks the websocket interface for the methods used and binds them to
* an HTTP handler so that the developer API can invoke agent calls.
*/
class EphemeralEventListener extends EventEmitter {
messages = [];
constructor() {
super();
}
send(jsonData) {
const data = JSON.parse(jsonData);
this.messages.push(data);
this.emit("chunk", data);
}
close() {
this.emit("closed");
}
/**
* Compacts all messages in class and returns them in a condensed format.
* @returns {{thoughts: string[], textResponse: string}}
*/
packMessages() {
const thoughts = [];
let textResponse = null;
for (let msg of this.messages) {
if (msg.type !== "statusResponse") {
textResponse = msg.content;
} else {
thoughts.push(msg.content);
}
}
return { thoughts, textResponse };
}
/**
* Waits on the HTTP plugin to emit the 'closed' event from the agentHandler
* so that we can compact and return all the messages in the current queue.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
*/
async waitForClose() {
return new Promise((resolve) => {
this.once("closed", () => resolve(this.packMessages()));
});
}
/**
* Streams the events with `writeResponseChunk` over HTTP chunked encoding
* and returns when the close event is emitted.
* ----------
* DevNote: Agents do not stream, so here we simply emit
* the thoughts and text response as soon as we get them.
* @param {import("express").Response} response
* @param {string} uuid - Unique identifier that is the same across chunks.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
*/
async streamAgentEvents(response, uuid) {
const onChunkHandler = (data) => {
if (data.type === "statusResponse") {
return writeResponseChunk(response, {
id: uuid,
type: "agentThought",
thought: data.content,
sources: [],
attachments: [],
close: false,
error: null,
});
}
return writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse: data.content,
sources: [],
attachments: [],
close: true,
error: null,
});
};
this.on("chunk", onChunkHandler);
// Wait for close, then remove the chunk listener.
return this.waitForClose().then((closedResponse) => {
this.removeListener("chunk", onChunkHandler);
return closedResponse;
});
}
}
module.exports = { EphemeralAgentHandler, EphemeralEventListener };
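
A condensed usage sketch of the pair exported here, mirroring the wiring added to the API chat handlers below (inside an async route handler; variable names are illustrative):

// Illustrative usage; the real wiring appears in the chat handler diff below.
const listener = new EphemeralEventListener();
const agent = new EphemeralAgentHandler({ uuid, workspace, prompt: message });
await agent.init();
await agent.createAIbitat({ handler: listener });
agent.startAgentCluster();

// Blocking style: collect everything once the run closes.
const { thoughts, textResponse } = await listener.waitForClose();

// Streaming style: relay chunks over the HTTP response as they arrive.
// const { thoughts, textResponse } = await listener.streamAgentEvents(response, uuid);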

@@ -10,7 +10,7 @@ const { USER_AGENT, WORKSPACE_AGENT } = require("./defaults");
class AgentHandler {
#invocationUUID;
#funcsToLoad = [];
#noProviderModelDefault = {
noProviderModelDefault = {
azure: "OPEN_MODEL_PREF",
lmstudio: "LMSTUDIO_MODEL_PREF",
textgenwebui: null, // does not even use `model` in API req
@@ -74,7 +74,7 @@
}
}
#checkSetup() {
checkSetup() {
switch (this.provider) {
case "openai":
if (!process.env.OPEN_AI_KEY)
@@ -163,7 +163,7 @@
}
}
#providerDefault() {
providerDefault() {
switch (this.provider) {
case "openai":
return "gpt-4o";
@@ -210,24 +210,24 @@
* @returns {string} the model preference value to use in API calls
*/
#fetchModel() {
if (!Object.keys(this.#noProviderModelDefault).includes(this.provider))
return this.invocation.workspace.agentModel || this.#providerDefault();
if (!Object.keys(this.noProviderModelDefault).includes(this.provider))
return this.invocation.workspace.agentModel || this.providerDefault();
// Provider has no reliable default (can't load many models), so we need to look at the system
// for the model param.
const sysModelKey = this.#noProviderModelDefault[this.provider];
const sysModelKey = this.noProviderModelDefault[this.provider];
if (!!sysModelKey)
return process.env[sysModelKey] ?? this.#providerDefault();
return process.env[sysModelKey] ?? this.providerDefault();
// If all else fails - look at the provider default list
return this.#providerDefault();
return this.providerDefault();
}
#providerSetupAndCheck() {
this.provider = this.invocation.workspace.agentProvider;
this.model = this.#fetchModel();
this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
this.#checkSetup();
this.checkSetup();
}
async #validInvocation() {
@@ -239,7 +239,7 @@
this.invocation = invocation ?? null;
}
#parseCallOptions(args, config = {}, pluginName) {
parseCallOptions(args, config = {}, pluginName) {
const callOpts = {};
for (const [param, definition] of Object.entries(config)) {
if (
@@ -280,7 +280,7 @@
continue;
}
const callOpts = this.#parseCallOptions(
const callOpts = this.parseCallOptions(
args,
childPlugin?.startupConfig?.params,
name
@@ -300,7 +300,7 @@
continue;
}
const callOpts = this.#parseCallOptions(
const callOpts = this.parseCallOptions(
args,
AgentPlugins[name].startupConfig.params
);
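
The visibility changes above drop the # prefix because JavaScript hash-private class members are not accessible from subclasses, and EphemeralAgentHandler (which extends AgentHandler) needs checkSetup, providerDefault, parseCallOptions, and noProviderModelDefault. A minimal sketch of the constraint:

// Why the rename was needed: # members are hard-private to the declaring class.
class Base {
  #hidden() { return "only Base can call this"; }
  visible() { return "subclasses can call this"; }
}

class Child extends Base {
  run() {
    // return this.#hidden(); // SyntaxError: a private member must be declared in Child itself
    return this.visible(); // OK, which is why checkSetup() and friends were made public
  }
}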

@@ -4,6 +4,11 @@ const { WorkspaceChats } = require("../../models/workspaceChats");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { writeResponseChunk } = require("../helpers/chat/responses");
const { chatPrompt, sourceIdentifier, recentChatHistory } = require("./index");
const {
EphemeralAgentHandler,
EphemeralEventListener,
} = require("../agents/ephemeral");
const { Telemetry } = require("../../models/telemetry");
/**
* @typedef ResponseObject
@@ -37,6 +42,59 @@ async function chatSync({
}) {
const uuid = uuidv4();
const chatMode = mode ?? "chat";
if (EphemeralAgentHandler.isAgentInvocation({ message })) {
await Telemetry.sendTelemetry("agent_chat_started");
// Initialize the EphemeralAgentHandler to handle non-continuous
// conversations with agents since this is over REST.
const agentHandler = new EphemeralAgentHandler({
uuid,
workspace,
prompt: message,
userId: user?.id || null,
threadId: thread?.id || null,
sessionId,
});
// Establish an event listener that emulates websocket calls
// so that we can keep the same interface in Aibitat
// but use HTTP.
const eventListener = new EphemeralEventListener();
await agentHandler.init();
await agentHandler.createAIbitat({ handler: eventListener });
agentHandler.startAgentCluster();
// The cluster has started and now we wait for the close event since
// this is a synchronous agent call, so we return everything at once.
// After this, we conclude the call as we normally do.
return await eventListener
.waitForClose()
.then(async ({ thoughts, textResponse }) => {
await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: String(message),
response: {
text: textResponse,
sources: [],
type: chatMode,
thoughts,
},
include: false,
apiSessionId: sessionId,
});
return {
id: uuid,
type: "textResponse",
sources: [],
close: true,
error: null,
textResponse,
thoughts,
};
});
}
const LLMConnector = getLLMProvider({
provider: workspace?.chatProvider,
model: workspace?.chatModel,
@@ -257,6 +315,58 @@ async function streamChat({
}) {
const uuid = uuidv4();
const chatMode = mode ?? "chat";
if (EphemeralAgentHandler.isAgentInvocation({ message })) {
await Telemetry.sendTelemetry("agent_chat_started");
// Initialize the EphemeralAgentHandler to handle non-continuous
// conversations with agents since this is over REST.
const agentHandler = new EphemeralAgentHandler({
uuid,
workspace,
prompt: message,
userId: user?.id || null,
threadId: thread?.id || null,
sessionId,
});
// Establish an event listener that emulates websocket calls
// so that we can keep the same interface in Aibitat
// but use HTTP.
const eventListener = new EphemeralEventListener();
await agentHandler.init();
await agentHandler.createAIbitat({ handler: eventListener });
agentHandler.startAgentCluster();
// The cluster has started and now we wait for the close event
// and stream back any results we get from agents as they come in.
return eventListener
.streamAgentEvents(response, uuid)
.then(async ({ thoughts, textResponse }) => {
console.log({ thoughts, textResponse });
await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: String(message),
response: {
text: textResponse,
sources: [],
type: chatMode,
thoughts,
},
include: false,
apiSessionId: sessionId,
});
writeResponseChunk(response, {
uuid,
type: "finalizeResponseStream",
textResponse,
thoughts,
close: true,
error: false,
});
});
}
const LLMConnector = getLLMProvider({
provider: workspace?.chatProvider,
model: workspace?.chatModel,