mirror of
https://github.com/Mintplex-Labs/anything-llm.git
synced 2024-11-15 19:00:33 +01:00
dc4ad6b5a9
* wip bg workers for live document sync * Add ability to re-embed specific documents across many workspaces via background queue bgworkser is gated behind expieremental system setting flag that needs to be explictly enabled UI for watching/unwatching docments that are embedded. TODO: UI to easily manage all bg tasks and see run results TODO: UI to enable this feature and background endpoints to manage it * create frontend views and paths Move elements to correct experimental scope * update migration to delete runs on removal of watched document * Add watch support to YouTube transcripts (#1716) * Add watch support to YouTube transcripts refactor how sync is done for supported types * Watch specific files in Confluence space (#1718) Add failure-prune check for runs * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * dual build update copy of alert modals * update job interval * Add support for live-sync of Github files * update copy for document sync feature * hide Experimental features from UI * update docs links * [FEAT] Implement new settings menu for experimental features (#1735) * implement new settings menu for experimental features * remove unused context save bar --------- Co-authored-by: timothycarambat <rambat1010@gmail.com> * dont run job on boot * unset workflow changes * Add persistent encryption service Relay key to collector so persistent encryption can be used Encrypt any private data in chunkSources used for replay during resync jobs * update jsDOC * Linting and organization * update modal copy for feature --------- Co-authored-by: Sean Hatfield <seanhatfield5@gmail.com>
154 lines
6.7 KiB
JavaScript
154 lines
6.7 KiB
JavaScript
const { Document } = require('../models/documents.js');
|
|
const { DocumentSyncQueue } = require('../models/documentSyncQueue.js');
|
|
const { CollectorApi } = require('../utils/collectorApi');
|
|
const { fileData } = require("../utils/files");
|
|
const { log, conclude, updateSourceDocument } = require('./helpers/index.js');
|
|
const { getVectorDbClass } = require('../utils/helpers/index.js');
|
|
const { DocumentSyncRun } = require('../models/documentSyncRun.js');
|
|
|
|
(async () => {
|
|
try {
|
|
const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues();
|
|
if (queuesToProcess.length === 0) {
|
|
log('No outstanding documents to sync. Exiting.');
|
|
return;
|
|
}
|
|
|
|
const collector = new CollectorApi();
|
|
if (!(await collector.online())) {
|
|
log('Could not reach collector API. Exiting.');
|
|
return;
|
|
}
|
|
|
|
log(`${queuesToProcess.length} watched documents have been found to be stale and will be updated now.`)
|
|
for (const queue of queuesToProcess) {
|
|
let newContent = null;
|
|
const document = queue.workspaceDoc;
|
|
const workspace = document.workspace;
|
|
const { metadata, type, source } = Document.parseDocumentTypeAndSource(document);
|
|
|
|
if (!metadata || !DocumentSyncQueue.validFileTypes.includes(type)) {
|
|
// Document is either broken, invalid, or not supported so drop it from future queues.
|
|
log(`Document ${document.filename} has no metadata, is broken, or invalid and has been removed from all future runs.`)
|
|
await DocumentSyncQueue.unwatch(document);
|
|
continue;
|
|
}
|
|
|
|
if (type === 'link' || type === 'youtube') {
|
|
const response = await collector.forwardExtensionRequest({
|
|
endpoint: "/ext/resync-source-document",
|
|
method: "POST",
|
|
body: JSON.stringify({
|
|
type,
|
|
options: { link: source }
|
|
})
|
|
});
|
|
newContent = response?.content;
|
|
}
|
|
|
|
if (type === 'confluence' || type === 'github') {
|
|
const response = await collector.forwardExtensionRequest({
|
|
endpoint: "/ext/resync-source-document",
|
|
method: "POST",
|
|
body: JSON.stringify({
|
|
type,
|
|
options: { chunkSource: metadata.chunkSource }
|
|
})
|
|
});
|
|
newContent = response?.content;
|
|
}
|
|
|
|
if (!newContent) {
|
|
// Check if the last "x" runs were all failures (not exits!). If so - remove the job entirely since it is broken.
|
|
const failedRunCount = (await DocumentSyncRun.where({ queueId: queue.id }, DocumentSyncQueue.maxRepeatFailures, { createdAt: 'desc' })).filter((run) => run.status === DocumentSyncRun.statuses.failed).length;
|
|
if (failedRunCount >= DocumentSyncQueue.maxRepeatFailures) {
|
|
log(`Document ${document.filename} has failed to refresh ${failedRunCount} times continuously and will now be removed from the watched document set.`)
|
|
await DocumentSyncQueue.unwatch(document);
|
|
continue;
|
|
}
|
|
|
|
log(`Failed to get a new content response from collector for source ${source}. Skipping, but will retry next worker interval. Attempt ${failedRunCount === 0 ? 1 : failedRunCount}/${DocumentSyncQueue.maxRepeatFailures}`);
|
|
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.failed, { filename: document.filename, workspacesModified: [], reason: 'No content found.' })
|
|
continue;
|
|
}
|
|
|
|
const currentDocumentData = await fileData(document.docpath)
|
|
if (currentDocumentData.pageContent === newContent) {
|
|
const nextSync = DocumentSyncQueue.calcNextSync(queue)
|
|
log(`Source ${source} is unchanged and will be skipped. Next sync will be ${nextSync.toLocaleString()}.`);
|
|
await DocumentSyncQueue._update(
|
|
queue.id,
|
|
{
|
|
lastSyncedAt: new Date().toISOString(),
|
|
nextSyncAt: nextSync.toISOString(),
|
|
}
|
|
);
|
|
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.exited, { filename: document.filename, workspacesModified: [], reason: 'Content unchanged.' })
|
|
continue;
|
|
}
|
|
|
|
// update the defined document and workspace vectorDB with the latest information
|
|
// it will skip cache and create a new vectorCache file.
|
|
const vectorDatabase = getVectorDbClass();
|
|
await vectorDatabase.deleteDocumentFromNamespace(workspace.slug, document.docId);
|
|
await vectorDatabase.addDocumentToNamespace(
|
|
workspace.slug,
|
|
{ ...currentDocumentData, pageContent: newContent, docId: document.docId },
|
|
document.docpath,
|
|
true
|
|
);
|
|
updateSourceDocument(
|
|
document.docpath,
|
|
{
|
|
...currentDocumentData,
|
|
pageContent: newContent,
|
|
docId: document.docId,
|
|
published: (new Date).toLocaleString(),
|
|
// Todo: Update word count and token_estimate?
|
|
}
|
|
)
|
|
log(`Workspace "${workspace.name}" vectors of ${source} updated. Document and vector cache updated.`)
|
|
|
|
|
|
// Now we can bloom the results to all matching documents in all other workspaces
|
|
const workspacesModified = [workspace.slug];
|
|
const moreReferences = await Document.where({
|
|
id: { not: document.id },
|
|
filename: document.filename
|
|
}, null, null, { workspace: true });
|
|
|
|
if (moreReferences.length !== 0) {
|
|
log(`${source} is referenced in ${moreReferences.length} other workspaces. Updating those workspaces as well...`)
|
|
for (const additionalDocumentRef of moreReferences) {
|
|
const additionalWorkspace = additionalDocumentRef.workspace;
|
|
workspacesModified.push(additionalWorkspace.slug);
|
|
|
|
await vectorDatabase.deleteDocumentFromNamespace(additionalWorkspace.slug, additionalDocumentRef.docId);
|
|
await vectorDatabase.addDocumentToNamespace(
|
|
additionalWorkspace.slug,
|
|
{ ...currentDocumentData, pageContent: newContent, docId: additionalDocumentRef.docId },
|
|
additionalDocumentRef.docpath,
|
|
);
|
|
log(`Workspace "${additionalWorkspace.name}" vectors for ${source} was also updated with the new content from cache.`)
|
|
}
|
|
}
|
|
|
|
const nextRefresh = DocumentSyncQueue.calcNextSync(queue);
|
|
log(`${source} has been refreshed in all workspaces it is currently referenced in. Next refresh will be ${nextRefresh.toLocaleString()}.`)
|
|
await DocumentSyncQueue._update(
|
|
queue.id,
|
|
{
|
|
lastSyncedAt: new Date().toISOString(),
|
|
nextSyncAt: nextRefresh.toISOString(),
|
|
}
|
|
);
|
|
await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.success, { filename: document.filename, workspacesModified })
|
|
}
|
|
} catch (e) {
|
|
console.error(e)
|
|
log(`errored with ${e.message}`)
|
|
} finally {
|
|
conclude();
|
|
}
|
|
})();
|