anything-llm/server/models/documentSyncQueue.js
Timothy Carambat dc4ad6b5a9
[BETA] Live document sync (#1719)
* wip bg workers for live document sync

* Add ability to re-embed specific documents across many workspaces via background queue
bgworkser is gated behind expieremental system setting flag that needs to be explictly enabled
UI for watching/unwatching docments that are embedded.
TODO: UI to easily manage all bg tasks and see run results
TODO: UI to enable this feature and background endpoints to manage it

* create frontend views and paths
Move elements to correct experimental scope

* update migration to delete runs on removal of watched document

* Add watch support to YouTube transcripts (#1716)

* Add watch support to YouTube transcripts
refactor how sync is done for supported types

* Watch specific files in Confluence space (#1718)

Add failure-prune check for runs

* create tmp workflow modifications for beta image

* create tmp workflow modifications for beta image

* create tmp workflow modifications for beta image

* dual build
update copy of alert modals

* update job interval

* Add support for live-sync of Github files

* update copy for document sync feature

* hide Experimental features from UI

* update docs links

* [FEAT] Implement new settings menu for experimental features (#1735)

* implement new settings menu for experimental features

* remove unused context save bar

---------

Co-authored-by: timothycarambat <rambat1010@gmail.com>

* dont run job on boot

* unset workflow changes

* Add persistent encryption service
Relay key to collector so persistent encryption can be used
Encrypt any private data in chunkSources used for replay during resync jobs

* update jsDOC

* Linting and organization

* update modal copy for feature

---------

Co-authored-by: Sean Hatfield <seanhatfield5@gmail.com>
2024-06-21 13:38:50 -07:00

238 lines
7.2 KiB
JavaScript

const { BackgroundService } = require("../utils/BackgroundWorkers");
const prisma = require("../utils/prisma");
const { SystemSettings } = require("./systemSettings");
const { Telemetry } = require("./telemetry");
/**
* @typedef {('link'|'youtube'|'confluence'|'github')} validFileType
*/
const DocumentSyncQueue = {
featureKey: "experimental_live_file_sync",
// update the validFileTypes and .canWatch properties when adding elements here.
validFileTypes: ["link", "youtube", "confluence", "github"],
defaultStaleAfter: 604800000,
maxRepeatFailures: 5, // How many times a run can fail in a row before pruning.
writable: [],
bootWorkers: function () {
new BackgroundService().boot();
},
killWorkers: function () {
new BackgroundService().stop();
},
/** Check is the Document Sync/Watch feature is enabled and can be used. */
enabled: async function () {
return (
(await SystemSettings.get({ label: this.featureKey }))?.value ===
"enabled"
);
},
/**
* @param {import("@prisma/client").document_sync_queues} queueRecord - queue record to calculate for
*/
calcNextSync: function (queueRecord) {
return new Date(Number(new Date()) + queueRecord.staleAfterMs);
},
canWatch: function ({ title, chunkSource = null } = {}) {
if (chunkSource.startsWith("link://") && title.endsWith(".html"))
return true; // If is web-link material (prior to feature most chunkSources were links://)
if (chunkSource.startsWith("youtube://")) return true; // If is a youtube link
if (chunkSource.startsWith("confluence://")) return true; // If is a confluence document link
if (chunkSource.startsWith("github://")) return true; // If is a Github file reference
return false;
},
/**
* Creates Queue record and updates document watch status to true on Document record
* @param {import("@prisma/client").workspace_documents} document - document record to watch, must have `id`
*/
watch: async function (document = null) {
if (!document) return false;
try {
const { Document } = require("./documents");
// Get all documents that are watched and share the same unique filename. If this value is
// non-zero then we exit early so that we do not have duplicated watch queues for the same file
// across many workspaces.
const workspaceDocIds = (
await Document.where({ filename: document.filename, watched: true })
).map((rec) => rec.id);
const hasRecords =
(await this.count({ workspaceDocId: { in: workspaceDocIds } })) > 0;
if (hasRecords)
throw new Error(
`Cannot watch this document again - it already has a queue set.`
);
const queue = await prisma.document_sync_queues.create({
data: {
workspaceDocId: document.id,
nextSyncAt: new Date(Number(new Date()) + this.defaultStaleAfter),
},
});
await Document._updateAll(
{ filename: document.filename },
{ watched: true }
);
return queue || null;
} catch (error) {
console.error(error.message);
return null;
}
},
/**
* Deletes Queue record and updates document watch status to false on Document record
* @param {import("@prisma/client").workspace_documents} document - document record to unwatch, must have `id`
*/
unwatch: async function (document = null) {
if (!document) return false;
try {
const { Document } = require("./documents");
// We could have been given a document to unwatch which is a clone of one that is already being watched but by another workspaceDocument id.
// so in this instance we need to delete any queues related to this document by any WorkspaceDocumentId it is referenced by.
const workspaceDocIds = (
await Document.where({ filename: document.filename, watched: true })
).map((rec) => rec.id);
await this.delete({ workspaceDocId: { in: workspaceDocIds } });
await Document._updateAll(
{ filename: document.filename },
{ watched: false }
);
return true;
} catch (error) {
console.error(error.message);
return false;
}
},
_update: async function (id = null, data = {}) {
if (!id) throw new Error("No id provided for update");
try {
await prisma.document_sync_queues.update({
where: { id },
data,
});
return true;
} catch (error) {
console.error(error.message);
return false;
}
},
get: async function (clause = {}) {
try {
const queue = await prisma.document_sync_queues.findFirst({
where: clause,
});
return queue || null;
} catch (error) {
console.error(error.message);
return null;
}
},
where: async function (
clause = {},
limit = null,
orderBy = null,
include = {}
) {
try {
const results = await prisma.document_sync_queues.findMany({
where: clause,
...(limit !== null ? { take: limit } : {}),
...(orderBy !== null ? { orderBy } : {}),
...(include !== null ? { include } : {}),
});
return results;
} catch (error) {
console.error(error.message);
return [];
}
},
count: async function (clause = {}, limit = null) {
try {
const count = await prisma.document_sync_queues.count({
where: clause,
...(limit !== null ? { take: limit } : {}),
});
return count;
} catch (error) {
console.error("FAILED TO COUNT DOCUMENTS.", error.message);
return 0;
}
},
delete: async function (clause = {}) {
try {
await prisma.document_sync_queues.deleteMany({ where: clause });
return true;
} catch (error) {
console.error(error.message);
return false;
}
},
/**
* Gets the "stale" queues where the queue's nextSyncAt is less than the current time
* @returns {Promise<(
* import("@prisma/client").document_sync_queues &
* { workspaceDoc: import("@prisma/client").workspace_documents &
* { workspace: import("@prisma/client").workspaces }
* })[]}>}
*/
staleDocumentQueues: async function () {
const queues = await this.where(
{
nextSyncAt: {
lte: new Date().toISOString(),
},
},
null,
null,
{
workspaceDoc: {
include: {
workspace: true,
},
},
}
);
return queues;
},
saveRun: async function (queueId = null, status = null, result = {}) {
const { DocumentSyncRun } = require("./documentSyncRun");
return DocumentSyncRun.save(queueId, status, result);
},
/**
* Updates document to be watched/unwatched & creates or deletes any queue records and updated Document record `watched` status
* @param {import("@prisma/client").workspace_documents} documentRecord
* @param {boolean} watchStatus - indicate if queue record should be created or not.
* @returns
*/
toggleWatchStatus: async function (documentRecord, watchStatus = false) {
if (!watchStatus) {
await Telemetry.sendTelemetry("document_unwatched");
await this.unwatch(documentRecord);
return;
}
await this.watch(documentRecord);
await Telemetry.sendTelemetry("document_watched");
return;
},
};
module.exports = { DocumentSyncQueue };