anything-llm/server/models/documentSyncQueue.js
Timothy Carambat b44889a843
PR#2355 Continued + expanded scope (#2365)
* #2317 Fetch pinned documents once per folder to reduce the number of queries.

* Reorder the lines to keep const declarations together.

* Add some comments to functions
move pinned document fetch for folder to function
move watched documents per-folder to also function the same
remove unused function in documents model

---------

Co-authored-by: Błażej Owczarczyk <blazeyy@gmail.com>
2024-09-24 15:55:54 -07:00

245 lines
7.6 KiB
JavaScript

const { BackgroundService } = require("../utils/BackgroundWorkers");
const prisma = require("../utils/prisma");
const { SystemSettings } = require("./systemSettings");
const { Telemetry } = require("./telemetry");
/**
* @typedef {('link'|'youtube'|'confluence'|'github')} validFileType
*/
/**
 * Model helpers for the `document_sync_queues` table: creating/removing watch
 * queues for workspace documents, basic CRUD wrappers, and lookup of queues
 * that are due for a re-sync.
 */
const DocumentSyncQueue = {
  // System setting label read by `enabled()` to decide if the feature is on.
  featureKey: "experimental_live_file_sync",
  // update the validFileTypes and .canWatch properties when adding elements here.
  validFileTypes: ["link", "youtube", "confluence", "github"],
  // Milliseconds (7 days) added to "now" for the first `nextSyncAt` of a new queue.
  defaultStaleAfter: 604800000,
  maxRepeatFailures: 5, // How many times a run can fail in a row before pruning.
  // NOTE(review): not referenced anywhere in this file - presumably the list of
  // fields allowed in updates, as in sibling models; confirm before relying on it.
  writable: [],

  /** Instantiate a BackgroundService and boot it. */
  bootWorkers: function () {
    new BackgroundService().boot();
  },

  /** Instantiate a BackgroundService and stop it. */
  killWorkers: function () {
    new BackgroundService().stop();
  },

  /**
   * Check if the Document Sync/Watch feature is enabled and can be used.
   * @returns {Promise<boolean>} true only when the `featureKey` system setting has the value "enabled"
   */
  enabled: async function () {
    return (
      (await SystemSettings.get({ label: this.featureKey }))?.value ===
      "enabled"
    );
  },

  /**
   * Compute the next sync time for a queue: now + the record's `staleAfterMs`.
   * @param {import("@prisma/client").document_sync_queues} queueRecord - queue record to calculate for
   * @returns {Date} the time at which the queue should be considered stale again
   */
  calcNextSync: function (queueRecord) {
    return new Date(Number(new Date()) + queueRecord.staleAfterMs);
  },

  /**
   * Check if the document can be watched based on the metadata fields
   * @param {object} metadata - metadata to check
   * @param {string} metadata.title - title of the document
   * @param {string} metadata.chunkSource - chunk source of the document
   * @returns {boolean} - true if the document can be watched, false otherwise
   */
  canWatch: function ({ title, chunkSource = null } = {}) {
    // `chunkSource` defaults to null and metadata may be partial. Without a
    // string source there is no scheme to match, so the document is not
    // watchable (previously this threw a TypeError on `null.startsWith`).
    if (typeof chunkSource !== "string") return false;
    if (
      chunkSource.startsWith("link://") &&
      typeof title === "string" &&
      title.endsWith(".html")
    )
      return true; // If is web-link material (prior to feature most chunkSources were links://)
    if (chunkSource.startsWith("youtube://")) return true; // If is a youtube link
    if (chunkSource.startsWith("confluence://")) return true; // If is a confluence document link
    if (chunkSource.startsWith("github://")) return true; // If is a Github file reference
    return false;
  },

  /**
   * Creates Queue record and updates document watch status to true on Document record
   * @param {import("@prisma/client").workspace_documents} document - document record to watch, must have `id`
   * @returns {Promise<import("@prisma/client").document_sync_queues|null|false>}
   * the created queue, `null` if creation failed or a queue already exists for
   * this filename, or `false` when no document was provided.
   */
  watch: async function (document = null) {
    if (!document) return false;
    try {
      // Required lazily inside the function rather than at module top.
      // NOTE(review): presumably to avoid a circular import with ./documents - confirm.
      const { Document } = require("./documents");

      // Get all documents that are watched and share the same unique filename. If this value is
      // non-zero then we exit early so that we do not have duplicated watch queues for the same file
      // across many workspaces.
      const workspaceDocIds = (
        await Document.where({ filename: document.filename, watched: true })
      ).map((rec) => rec.id);
      const hasRecords =
        (await this.count({ workspaceDocId: { in: workspaceDocIds } })) > 0;
      if (hasRecords)
        throw new Error(
          `Cannot watch this document again - it already has a queue set.`
        );

      const queue = await prisma.document_sync_queues.create({
        data: {
          workspaceDocId: document.id,
          nextSyncAt: new Date(Number(new Date()) + this.defaultStaleAfter),
        },
      });
      // Flag every document record with this filename as watched, not just
      // the one the queue was attached to.
      await Document._updateAll(
        { filename: document.filename },
        { watched: true }
      );
      return queue || null;
    } catch (error) {
      console.error(error.message);
      return null;
    }
  },

  /**
   * Deletes Queue record and updates document watch status to false on Document record
   * @param {import("@prisma/client").workspace_documents} document - document record to unwatch, must have `id`
   * @returns {Promise<boolean>} true when the unwatch steps ran without throwing,
   * false when no document was provided or an error was thrown.
   */
  unwatch: async function (document = null) {
    if (!document) return false;
    try {
      const { Document } = require("./documents");

      // We could have been given a document to unwatch which is a clone of one that is already being watched but by another workspaceDocument id.
      // so in this instance we need to delete any queues related to this document by any WorkspaceDocumentId it is referenced by.
      const workspaceDocIds = (
        await Document.where({ filename: document.filename, watched: true })
      ).map((rec) => rec.id);
      await this.delete({ workspaceDocId: { in: workspaceDocIds } });
      await Document._updateAll(
        { filename: document.filename },
        { watched: false }
      );
      return true;
    } catch (error) {
      console.error(error.message);
      return false;
    }
  },

  /**
   * Update a single queue record by id.
   * @param {number|null} id - id of the queue record to update
   * @param {object} data - fields to write, passed through to Prisma unchanged
   * @returns {Promise<boolean>} true on success, false if the update threw
   * @throws {Error} when no id is provided (thrown before the try/catch, so it reaches the caller)
   */
  _update: async function (id = null, data = {}) {
    if (!id) throw new Error("No id provided for update");

    try {
      await prisma.document_sync_queues.update({
        where: { id },
        data,
      });
      return true;
    } catch (error) {
      console.error(error.message);
      return false;
    }
  },

  /**
   * Find the first queue record matching the clause.
   * @param {object} clause - Prisma `where` clause
   * @returns {Promise<import("@prisma/client").document_sync_queues|null>} the record, or null if none matched or the query threw
   */
  get: async function (clause = {}) {
    try {
      const queue = await prisma.document_sync_queues.findFirst({
        where: clause,
      });
      return queue || null;
    } catch (error) {
      console.error(error.message);
      return null;
    }
  },

  /**
   * Find all queue records matching the clause.
   * @param {object} clause - Prisma `where` clause
   * @param {number|null} limit - max records to return (`take`); omitted when null
   * @param {object|null} orderBy - Prisma `orderBy`; omitted when null
   * @param {object|null} include - Prisma `include`; omitted only when explicitly null
   * @returns {Promise<import("@prisma/client").document_sync_queues[]>} matching records, or an empty array if the query threw
   */
  where: async function (
    clause = {},
    limit = null,
    orderBy = null,
    include = {}
  ) {
    try {
      const results = await prisma.document_sync_queues.findMany({
        where: clause,
        ...(limit !== null ? { take: limit } : {}),
        ...(orderBy !== null ? { orderBy } : {}),
        ...(include !== null ? { include } : {}),
      });
      return results;
    } catch (error) {
      console.error(error.message);
      return [];
    }
  },

  /**
   * Count queue records matching the clause.
   * @param {object} clause - Prisma `where` clause
   * @param {number|null} limit - optional `take` passed to the count query; omitted when null
   * @returns {Promise<number>} number of matching records, or 0 if the query threw
   */
  count: async function (clause = {}, limit = null) {
    try {
      const count = await prisma.document_sync_queues.count({
        where: clause,
        ...(limit !== null ? { take: limit } : {}),
      });
      return count;
    } catch (error) {
      console.error("FAILED TO COUNT DOCUMENTS.", error.message);
      return 0;
    }
  },

  /**
   * Delete every queue record matching the clause.
   * @param {object} clause - Prisma `where` clause
   * @returns {Promise<boolean>} true on success, false if the delete threw
   */
  delete: async function (clause = {}) {
    try {
      await prisma.document_sync_queues.deleteMany({ where: clause });
      return true;
    } catch (error) {
      console.error(error.message);
      return false;
    }
  },

  /**
   * Gets the "stale" queues where the queue's nextSyncAt is less than or equal to the current time
   * @returns {Promise<(
   *   import("@prisma/client").document_sync_queues &
   *   { workspaceDoc: import("@prisma/client").workspace_documents &
   *     { workspace: import("@prisma/client").workspaces }
   *   }
   * )[]>}
   */
  staleDocumentQueues: async function () {
    const queues = await this.where(
      {
        nextSyncAt: {
          lte: new Date().toISOString(),
        },
      },
      null,
      null,
      {
        // Eager-load the workspace document and its workspace with each queue.
        workspaceDoc: {
          include: {
            workspace: true,
          },
        },
      }
    );
    return queues;
  },

  /**
   * Record the outcome of a sync run by delegating to `DocumentSyncRun.save`.
   * @param {number|null} queueId - id of the queue the run belongs to
   * @param {string|null} status - run status, passed through unchanged
   * @param {object} result - run result payload, passed through unchanged
   * @returns {Promise<*>} whatever `DocumentSyncRun.save` resolves to
   */
  saveRun: async function (queueId = null, status = null, result = {}) {
    const { DocumentSyncRun } = require("./documentSyncRun");
    return DocumentSyncRun.save(queueId, status, result);
  },

  /**
   * Updates document to be watched/unwatched & creates or deletes any queue records and updated Document record `watched` status
   * @param {import("@prisma/client").workspace_documents} documentRecord
   * @param {boolean} watchStatus - indicate if queue record should be created or not.
   * @returns {Promise<void>}
   */
  toggleWatchStatus: async function (documentRecord, watchStatus = false) {
    if (!watchStatus) {
      await Telemetry.sendTelemetry("document_unwatched");
      await this.unwatch(documentRecord);
      return;
    }

    await this.watch(documentRecord);
    await Telemetry.sendTelemetry("document_watched");
    return;
  },
};
module.exports = { DocumentSyncQueue };