anything-llm/server/jobs/sync-watched-documents.js

154 lines
6.7 KiB
JavaScript
Raw Normal View History

[BETA] Live document sync (#1719) * wip bg workers for live document sync * Add ability to re-embed specific documents across many workspaces via background queue bgworkser is gated behind expieremental system setting flag that needs to be explictly enabled UI for watching/unwatching docments that are embedded. TODO: UI to easily manage all bg tasks and see run results TODO: UI to enable this feature and background endpoints to manage it * create frontend views and paths Move elements to correct experimental scope * update migration to delete runs on removal of watched document * Add watch support to YouTube transcripts (#1716) * Add watch support to YouTube transcripts refactor how sync is done for supported types * Watch specific files in Confluence space (#1718) Add failure-prune check for runs * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * dual build update copy of alert modals * update job interval * Add support for live-sync of Github files * update copy for document sync feature * hide Experimental features from UI * update docs links * [FEAT] Implement new settings menu for experimental features (#1735) * implement new settings menu for experimental features * remove unused context save bar --------- Co-authored-by: timothycarambat <rambat1010@gmail.com> * dont run job on boot * unset workflow changes * Add persistent encryption service Relay key to collector so persistent encryption can be used Encrypt any private data in chunkSources used for replay during resync jobs * update jsDOC * Linting and organization * update modal copy for feature --------- Co-authored-by: Sean Hatfield <seanhatfield5@gmail.com>
2024-06-21 22:38:50 +02:00
const { Document } = require('../models/documents.js');
const { DocumentSyncQueue } = require('../models/documentSyncQueue.js');
const { CollectorApi } = require('../utils/collectorApi');
const { fileData } = require("../utils/files");
const { log, conclude, updateSourceDocument } = require('./helpers/index.js');
const { getVectorDbClass } = require('../utils/helpers/index.js');
const { DocumentSyncRun } = require('../models/documentSyncRun.js');
(async () => {
try {
const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues();
if (queuesToProcess.length === 0) {
log('No outstanding documents to sync. Exiting.');
return;
}
const collector = new CollectorApi();
if (!(await collector.online())) {
log('Could not reach collector API. Exiting.');
return;
}
log(`${queuesToProcess.length} watched documents have been found to be stale and will be updated now.`)
for (const queue of queuesToProcess) {
let newContent = null;
const document = queue.workspaceDoc;
const workspace = document.workspace;
const { metadata, type, source } = Document.parseDocumentTypeAndSource(document);
if (!metadata || !DocumentSyncQueue.validFileTypes.includes(type)) {
// Document is either broken, invalid, or not supported so drop it from future queues.
log(`Document ${document.filename} has no metadata, is broken, or invalid and has been removed from all future runs.`)
await DocumentSyncQueue.unwatch(document);
continue;
}
if (type === 'link' || type === 'youtube') {
const response = await collector.forwardExtensionRequest({
endpoint: "/ext/resync-source-document",
method: "POST",
body: JSON.stringify({
type,
options: { link: source }
})
});
newContent = response?.content;
}
if (type === 'confluence' || type === 'github') {
const response = await collector.forwardExtensionRequest({
endpoint: "/ext/resync-source-document",
method: "POST",
body: JSON.stringify({
type,
options: { chunkSource: metadata.chunkSource }
})
});
newContent = response?.content;
}
if (!newContent) {
// Check if the last "x" runs were all failures (not exits!). If so - remove the job entirely since it is broken.
const failedRunCount = (await DocumentSyncRun.where({ queueId: queue.id }, DocumentSyncQueue.maxRepeatFailures, { createdAt: 'desc' })).filter((run) => run.status === DocumentSyncRun.statuses.failed).length;
if (failedRunCount >= DocumentSyncQueue.maxRepeatFailures) {
log(`Document ${document.filename} has failed to refresh ${failedRunCount} times continuously and will now be removed from the watched document set.`)
await DocumentSyncQueue.unwatch(document);
continue;
}
log(`Failed to get a new content response from collector for source ${source}. Skipping, but will retry next worker interval. Attempt ${failedRunCount === 0 ? 1 : failedRunCount}/${DocumentSyncQueue.maxRepeatFailures}`);
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.failed, { filename: document.filename, workspacesModified: [], reason: 'No content found.' })
continue;
}
const currentDocumentData = await fileData(document.docpath)
if (currentDocumentData.pageContent === newContent) {
const nextSync = DocumentSyncQueue.calcNextSync(queue)
log(`Source ${source} is unchanged and will be skipped. Next sync will be ${nextSync.toLocaleString()}.`);
await DocumentSyncQueue._update(
queue.id,
{
lastSyncedAt: new Date().toISOString(),
nextSyncAt: nextSync.toISOString(),
}
);
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.exited, { filename: document.filename, workspacesModified: [], reason: 'Content unchanged.' })
continue;
}
// update the defined document and workspace vectorDB with the latest information
// it will skip cache and create a new vectorCache file.
const vectorDatabase = getVectorDbClass();
await vectorDatabase.deleteDocumentFromNamespace(workspace.slug, document.docId);
await vectorDatabase.addDocumentToNamespace(
workspace.slug,
{ ...currentDocumentData, pageContent: newContent, docId: document.docId },
document.docpath,
true
);
updateSourceDocument(
document.docpath,
{
...currentDocumentData,
pageContent: newContent,
docId: document.docId,
published: (new Date).toLocaleString(),
// Todo: Update word count and token_estimate?
}
)
log(`Workspace "${workspace.name}" vectors of ${source} updated. Document and vector cache updated.`)
// Now we can bloom the results to all matching documents in all other workspaces
const workspacesModified = [workspace.slug];
const moreReferences = await Document.where({
id: { not: document.id },
filename: document.filename
}, null, null, { workspace: true });
if (moreReferences.length !== 0) {
log(`${source} is referenced in ${moreReferences.length} other workspaces. Updating those workspaces as well...`)
for (const additionalDocumentRef of moreReferences) {
const additionalWorkspace = additionalDocumentRef.workspace;
workspacesModified.push(additionalWorkspace.slug);
await vectorDatabase.deleteDocumentFromNamespace(additionalWorkspace.slug, additionalDocumentRef.docId);
await vectorDatabase.addDocumentToNamespace(
additionalWorkspace.slug,
{ ...currentDocumentData, pageContent: newContent, docId: additionalDocumentRef.docId },
additionalDocumentRef.docpath,
);
log(`Workspace "${additionalWorkspace.name}" vectors for ${source} was also updated with the new content from cache.`)
}
}
const nextRefresh = DocumentSyncQueue.calcNextSync(queue);
log(`${source} has been refreshed in all workspaces it is currently referenced in. Next refresh will be ${nextRefresh.toLocaleString()}.`)
await DocumentSyncQueue._update(
queue.id,
{
lastSyncedAt: new Date().toISOString(),
nextSyncAt: nextRefresh.toISOString(),
}
);
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.success, { filename: document.filename, workspacesModified })
}
} catch (e) {
console.error(e)
log(`errored with ${e.message}`)
} finally {
conclude();
}
})();