anything-llm/server/models/documents.js
Timothy Carambat dc4ad6b5a9
[BETA] Live document sync (#1719)
* wip bg workers for live document sync

* Add ability to re-embed specific documents across many workspaces via background queue
bgworkser is gated behind expieremental system setting flag that needs to be explictly enabled
UI for watching/unwatching docments that are embedded.
TODO: UI to easily manage all bg tasks and see run results
TODO: UI to enable this feature and background endpoints to manage it

* create frontend views and paths
Move elements to correct experimental scope

* update migration to delete runs on removal of watched document

* Add watch support to YouTube transcripts (#1716)

* Add watch support to YouTube transcripts
refactor how sync is done for supported types

* Watch specific files in Confluence space (#1718)

Add failure-prune check for runs

* create tmp workflow modifications for beta image

* create tmp workflow modifications for beta image

* create tmp workflow modifications for beta image

* dual build
update copy of alert modals

* update job interval

* Add support for live-sync of Github files

* update copy for document sync feature

* hide Experimental features from UI

* update docs links

* [FEAT] Implement new settings menu for experimental features (#1735)

* implement new settings menu for experimental features

* remove unused context save bar

---------

Co-authored-by: timothycarambat <rambat1010@gmail.com>

* dont run job on boot

* unset workflow changes

* Add persistent encryption service
Relay key to collector so persistent encryption can be used
Encrypt any private data in chunkSources used for replay during resync jobs

* update jsDOC

* Linting and organization

* update modal copy for feature

---------

Co-authored-by: Sean Hatfield <seanhatfield5@gmail.com>
2024-06-21 13:38:50 -07:00

272 lines
7.8 KiB
JavaScript

const { v4: uuidv4 } = require("uuid");
const { getVectorDbClass } = require("../utils/helpers");
const prisma = require("../utils/prisma");
const { Telemetry } = require("./telemetry");
const { EventLogs } = require("./eventLogs");
const { safeJsonParse } = require("../utils/http");
const Document = {
writable: ["pinned", "watched", "lastUpdatedAt"],
/**
* @param {import("@prisma/client").workspace_documents} document - Document PrismaRecord
* @returns {{
* metadata: (null|object),
* type: import("./documentSyncQueue.js").validFileType,
* source: string
* }}
*/
parseDocumentTypeAndSource: function (document) {
const metadata = safeJsonParse(document.metadata, null);
if (!metadata) return { metadata: null, type: null, source: null };
// Parse the correct type of source and its original source path.
const idx = metadata.chunkSource.indexOf("://");
const [type, source] = [
metadata.chunkSource.slice(0, idx),
metadata.chunkSource.slice(idx + 3),
];
return { metadata, type, source: this._stripSource(source, type) };
},
forWorkspace: async function (workspaceId = null) {
if (!workspaceId) return [];
return await prisma.workspace_documents.findMany({
where: { workspaceId },
});
},
delete: async function (clause = {}) {
try {
await prisma.workspace_documents.deleteMany({ where: clause });
return true;
} catch (error) {
console.error(error.message);
return false;
}
},
get: async function (clause = {}) {
try {
const document = await prisma.workspace_documents.findFirst({
where: clause,
});
return document || null;
} catch (error) {
console.error(error.message);
return null;
}
},
getOnlyWorkspaceIds: async function (clause = {}) {
try {
const workspaceIds = await prisma.workspace_documents.findMany({
where: clause,
select: {
workspaceId: true,
},
});
return workspaceIds.map((record) => record.workspaceId) || [];
} catch (error) {
console.error(error.message);
return [];
}
},
where: async function (
clause = {},
limit = null,
orderBy = null,
include = null
) {
try {
const results = await prisma.workspace_documents.findMany({
where: clause,
...(limit !== null ? { take: limit } : {}),
...(orderBy !== null ? { orderBy } : {}),
...(include !== null ? { include } : {}),
});
return results;
} catch (error) {
console.error(error.message);
return [];
}
},
addDocuments: async function (workspace, additions = [], userId = null) {
const VectorDb = getVectorDbClass();
if (additions.length === 0) return { failed: [], embedded: [] };
const { fileData } = require("../utils/files");
const embedded = [];
const failedToEmbed = [];
const errors = new Set();
for (const path of additions) {
const data = await fileData(path);
if (!data) continue;
const docId = uuidv4();
const { pageContent, ...metadata } = data;
const newDoc = {
docId,
filename: path.split("/")[1],
docpath: path,
workspaceId: workspace.id,
metadata: JSON.stringify(metadata),
};
const { vectorized, error } = await VectorDb.addDocumentToNamespace(
workspace.slug,
{ ...data, docId },
path
);
if (!vectorized) {
console.error(
"Failed to vectorize",
metadata?.title || newDoc.filename
);
failedToEmbed.push(metadata?.title || newDoc.filename);
errors.add(error);
continue;
}
try {
await prisma.workspace_documents.create({ data: newDoc });
embedded.push(path);
} catch (error) {
console.error(error.message);
}
}
await Telemetry.sendTelemetry("documents_embedded_in_workspace", {
LLMSelection: process.env.LLM_PROVIDER || "openai",
Embedder: process.env.EMBEDDING_ENGINE || "inherit",
VectorDbSelection: process.env.VECTOR_DB || "lancedb",
});
await EventLogs.logEvent(
"workspace_documents_added",
{
workspaceName: workspace?.name || "Unknown Workspace",
numberOfDocumentsAdded: additions.length,
},
userId
);
return { failedToEmbed, errors: Array.from(errors), embedded };
},
removeDocuments: async function (workspace, removals = [], userId = null) {
const VectorDb = getVectorDbClass();
if (removals.length === 0) return;
for (const path of removals) {
const document = await this.get({
docpath: path,
workspaceId: workspace.id,
});
if (!document) continue;
await VectorDb.deleteDocumentFromNamespace(
workspace.slug,
document.docId
);
try {
await prisma.workspace_documents.delete({
where: { id: document.id, workspaceId: workspace.id },
});
await prisma.document_vectors.deleteMany({
where: { docId: document.docId },
});
} catch (error) {
console.error(error.message);
}
}
await Telemetry.sendTelemetry("documents_removed_in_workspace", {
LLMSelection: process.env.LLM_PROVIDER || "openai",
Embedder: process.env.EMBEDDING_ENGINE || "inherit",
VectorDbSelection: process.env.VECTOR_DB || "lancedb",
});
await EventLogs.logEvent(
"workspace_documents_removed",
{
workspaceName: workspace?.name || "Unknown Workspace",
numberOfDocuments: removals.length,
},
userId
);
return true;
},
count: async function (clause = {}, limit = null) {
try {
const count = await prisma.workspace_documents.count({
where: clause,
...(limit !== null ? { take: limit } : {}),
});
return count;
} catch (error) {
console.error("FAILED TO COUNT DOCUMENTS.", error.message);
return 0;
}
},
update: async function (id = null, data = {}) {
if (!id) throw new Error("No workspace document id provided for update");
const validKeys = Object.keys(data).filter((key) =>
this.writable.includes(key)
);
if (validKeys.length === 0)
return { document: { id }, message: "No valid fields to update!" };
try {
const document = await prisma.workspace_documents.update({
where: { id },
data,
});
return { document, message: null };
} catch (error) {
console.error(error.message);
return { document: null, message: error.message };
}
},
_updateAll: async function (clause = {}, data = {}) {
try {
await prisma.workspace_documents.updateMany({
where: clause,
data,
});
return true;
} catch (error) {
console.error(error.message);
return false;
}
},
content: async function (docId) {
if (!docId) throw new Error("No workspace docId provided!");
const document = await this.get({ docId: String(docId) });
if (!document) throw new Error(`Could not find a document by id ${docId}`);
const { fileData } = require("../utils/files");
const data = await fileData(document.docpath);
return { title: data.title, content: data.pageContent };
},
contentByDocPath: async function (docPath) {
const { fileData } = require("../utils/files");
const data = await fileData(docPath);
return { title: data.title, content: data.pageContent };
},
// Some data sources have encoded params in them we don't want to log - so strip those details.
_stripSource: function (sourceString, type) {
if (["confluence", "github"].includes(type)) {
const _src = new URL(sourceString);
_src.search = ""; // remove all search params that are encoded for resync.
return _src.toString();
}
return sourceString;
},
};
module.exports = { Document };