mirror of
https://github.com/Mintplex-Labs/anything-llm.git
synced 2024-11-15 10:50:31 +01:00
dc4ad6b5a9
* wip bg workers for live document sync * Add ability to re-embed specific documents across many workspaces via background queue bgworkser is gated behind expieremental system setting flag that needs to be explictly enabled UI for watching/unwatching docments that are embedded. TODO: UI to easily manage all bg tasks and see run results TODO: UI to enable this feature and background endpoints to manage it * create frontend views and paths Move elements to correct experimental scope * update migration to delete runs on removal of watched document * Add watch support to YouTube transcripts (#1716) * Add watch support to YouTube transcripts refactor how sync is done for supported types * Watch specific files in Confluence space (#1718) Add failure-prune check for runs * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * create tmp workflow modifications for beta image * dual build update copy of alert modals * update job interval * Add support for live-sync of Github files * update copy for document sync feature * hide Experimental features from UI * update docs links * [FEAT] Implement new settings menu for experimental features (#1735) * implement new settings menu for experimental features * remove unused context save bar --------- Co-authored-by: timothycarambat <rambat1010@gmail.com> * dont run job on boot * unset workflow changes * Add persistent encryption service Relay key to collector so persistent encryption can be used Encrypt any private data in chunkSources used for replay during resync jobs * update jsDOC * Linting and organization * update modal copy for feature --------- Co-authored-by: Sean Hatfield <seanhatfield5@gmail.com>
238 lines
7.2 KiB
JavaScript
238 lines
7.2 KiB
JavaScript
const { BackgroundService } = require("../utils/BackgroundWorkers");
|
|
const prisma = require("../utils/prisma");
|
|
const { SystemSettings } = require("./systemSettings");
|
|
const { Telemetry } = require("./telemetry");
|
|
|
|
/**
|
|
* @typedef {('link'|'youtube'|'confluence'|'github')} validFileType
|
|
*/
|
|
|
|
const DocumentSyncQueue = {
|
|
featureKey: "experimental_live_file_sync",
|
|
// update the validFileTypes and .canWatch properties when adding elements here.
|
|
validFileTypes: ["link", "youtube", "confluence", "github"],
|
|
defaultStaleAfter: 604800000,
|
|
maxRepeatFailures: 5, // How many times a run can fail in a row before pruning.
|
|
writable: [],
|
|
|
|
bootWorkers: function () {
|
|
new BackgroundService().boot();
|
|
},
|
|
|
|
killWorkers: function () {
|
|
new BackgroundService().stop();
|
|
},
|
|
|
|
/** Check is the Document Sync/Watch feature is enabled and can be used. */
|
|
enabled: async function () {
|
|
return (
|
|
(await SystemSettings.get({ label: this.featureKey }))?.value ===
|
|
"enabled"
|
|
);
|
|
},
|
|
|
|
/**
|
|
* @param {import("@prisma/client").document_sync_queues} queueRecord - queue record to calculate for
|
|
*/
|
|
calcNextSync: function (queueRecord) {
|
|
return new Date(Number(new Date()) + queueRecord.staleAfterMs);
|
|
},
|
|
|
|
canWatch: function ({ title, chunkSource = null } = {}) {
|
|
if (chunkSource.startsWith("link://") && title.endsWith(".html"))
|
|
return true; // If is web-link material (prior to feature most chunkSources were links://)
|
|
if (chunkSource.startsWith("youtube://")) return true; // If is a youtube link
|
|
if (chunkSource.startsWith("confluence://")) return true; // If is a confluence document link
|
|
if (chunkSource.startsWith("github://")) return true; // If is a Github file reference
|
|
return false;
|
|
},
|
|
|
|
/**
|
|
* Creates Queue record and updates document watch status to true on Document record
|
|
* @param {import("@prisma/client").workspace_documents} document - document record to watch, must have `id`
|
|
*/
|
|
watch: async function (document = null) {
|
|
if (!document) return false;
|
|
try {
|
|
const { Document } = require("./documents");
|
|
|
|
// Get all documents that are watched and share the same unique filename. If this value is
|
|
// non-zero then we exit early so that we do not have duplicated watch queues for the same file
|
|
// across many workspaces.
|
|
const workspaceDocIds = (
|
|
await Document.where({ filename: document.filename, watched: true })
|
|
).map((rec) => rec.id);
|
|
const hasRecords =
|
|
(await this.count({ workspaceDocId: { in: workspaceDocIds } })) > 0;
|
|
if (hasRecords)
|
|
throw new Error(
|
|
`Cannot watch this document again - it already has a queue set.`
|
|
);
|
|
|
|
const queue = await prisma.document_sync_queues.create({
|
|
data: {
|
|
workspaceDocId: document.id,
|
|
nextSyncAt: new Date(Number(new Date()) + this.defaultStaleAfter),
|
|
},
|
|
});
|
|
await Document._updateAll(
|
|
{ filename: document.filename },
|
|
{ watched: true }
|
|
);
|
|
return queue || null;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return null;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Deletes Queue record and updates document watch status to false on Document record
|
|
* @param {import("@prisma/client").workspace_documents} document - document record to unwatch, must have `id`
|
|
*/
|
|
unwatch: async function (document = null) {
|
|
if (!document) return false;
|
|
try {
|
|
const { Document } = require("./documents");
|
|
|
|
// We could have been given a document to unwatch which is a clone of one that is already being watched but by another workspaceDocument id.
|
|
// so in this instance we need to delete any queues related to this document by any WorkspaceDocumentId it is referenced by.
|
|
const workspaceDocIds = (
|
|
await Document.where({ filename: document.filename, watched: true })
|
|
).map((rec) => rec.id);
|
|
await this.delete({ workspaceDocId: { in: workspaceDocIds } });
|
|
await Document._updateAll(
|
|
{ filename: document.filename },
|
|
{ watched: false }
|
|
);
|
|
return true;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return false;
|
|
}
|
|
},
|
|
|
|
_update: async function (id = null, data = {}) {
|
|
if (!id) throw new Error("No id provided for update");
|
|
|
|
try {
|
|
await prisma.document_sync_queues.update({
|
|
where: { id },
|
|
data,
|
|
});
|
|
return true;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return false;
|
|
}
|
|
},
|
|
|
|
get: async function (clause = {}) {
|
|
try {
|
|
const queue = await prisma.document_sync_queues.findFirst({
|
|
where: clause,
|
|
});
|
|
return queue || null;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return null;
|
|
}
|
|
},
|
|
|
|
where: async function (
|
|
clause = {},
|
|
limit = null,
|
|
orderBy = null,
|
|
include = {}
|
|
) {
|
|
try {
|
|
const results = await prisma.document_sync_queues.findMany({
|
|
where: clause,
|
|
...(limit !== null ? { take: limit } : {}),
|
|
...(orderBy !== null ? { orderBy } : {}),
|
|
...(include !== null ? { include } : {}),
|
|
});
|
|
return results;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return [];
|
|
}
|
|
},
|
|
|
|
count: async function (clause = {}, limit = null) {
|
|
try {
|
|
const count = await prisma.document_sync_queues.count({
|
|
where: clause,
|
|
...(limit !== null ? { take: limit } : {}),
|
|
});
|
|
return count;
|
|
} catch (error) {
|
|
console.error("FAILED TO COUNT DOCUMENTS.", error.message);
|
|
return 0;
|
|
}
|
|
},
|
|
|
|
delete: async function (clause = {}) {
|
|
try {
|
|
await prisma.document_sync_queues.deleteMany({ where: clause });
|
|
return true;
|
|
} catch (error) {
|
|
console.error(error.message);
|
|
return false;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Gets the "stale" queues where the queue's nextSyncAt is less than the current time
|
|
* @returns {Promise<(
|
|
* import("@prisma/client").document_sync_queues &
|
|
* { workspaceDoc: import("@prisma/client").workspace_documents &
|
|
* { workspace: import("@prisma/client").workspaces }
|
|
* })[]}>}
|
|
*/
|
|
staleDocumentQueues: async function () {
|
|
const queues = await this.where(
|
|
{
|
|
nextSyncAt: {
|
|
lte: new Date().toISOString(),
|
|
},
|
|
},
|
|
null,
|
|
null,
|
|
{
|
|
workspaceDoc: {
|
|
include: {
|
|
workspace: true,
|
|
},
|
|
},
|
|
}
|
|
);
|
|
return queues;
|
|
},
|
|
|
|
saveRun: async function (queueId = null, status = null, result = {}) {
|
|
const { DocumentSyncRun } = require("./documentSyncRun");
|
|
return DocumentSyncRun.save(queueId, status, result);
|
|
},
|
|
|
|
/**
|
|
* Updates document to be watched/unwatched & creates or deletes any queue records and updated Document record `watched` status
|
|
* @param {import("@prisma/client").workspace_documents} documentRecord
|
|
* @param {boolean} watchStatus - indicate if queue record should be created or not.
|
|
* @returns
|
|
*/
|
|
toggleWatchStatus: async function (documentRecord, watchStatus = false) {
|
|
if (!watchStatus) {
|
|
await Telemetry.sendTelemetry("document_unwatched");
|
|
await this.unwatch(documentRecord);
|
|
return;
|
|
}
|
|
|
|
await this.watch(documentRecord);
|
|
await Telemetry.sendTelemetry("document_watched");
|
|
return;
|
|
},
|
|
};
|
|
|
|
module.exports = { DocumentSyncQueue };
|