mirror of
https://github.com/Mintplex-Labs/anything-llm.git
synced 2024-11-19 20:50:09 +01:00
238 lines
7.2 KiB
JavaScript
238 lines
7.2 KiB
JavaScript
|
const { BackgroundService } = require("../utils/BackgroundWorkers");
|
||
|
const prisma = require("../utils/prisma");
|
||
|
const { SystemSettings } = require("./systemSettings");
|
||
|
const { Telemetry } = require("./telemetry");
|
||
|
|
||
|
/**
|
||
|
* @typedef {('link'|'youtube'|'confluence'|'github')} validFileType
|
||
|
*/
|
||
|
|
||
|
const DocumentSyncQueue = {
|
||
|
featureKey: "experimental_live_file_sync",
|
||
|
// update the validFileTypes and .canWatch properties when adding elements here.
|
||
|
validFileTypes: ["link", "youtube", "confluence", "github"],
|
||
|
defaultStaleAfter: 604800000,
|
||
|
maxRepeatFailures: 5, // How many times a run can fail in a row before pruning.
|
||
|
writable: [],
|
||
|
|
||
|
bootWorkers: function () {
|
||
|
new BackgroundService().boot();
|
||
|
},
|
||
|
|
||
|
killWorkers: function () {
|
||
|
new BackgroundService().stop();
|
||
|
},
|
||
|
|
||
|
/** Check is the Document Sync/Watch feature is enabled and can be used. */
|
||
|
enabled: async function () {
|
||
|
return (
|
||
|
(await SystemSettings.get({ label: this.featureKey }))?.value ===
|
||
|
"enabled"
|
||
|
);
|
||
|
},
|
||
|
|
||
|
/**
|
||
|
* @param {import("@prisma/client").document_sync_queues} queueRecord - queue record to calculate for
|
||
|
*/
|
||
|
calcNextSync: function (queueRecord) {
|
||
|
return new Date(Number(new Date()) + queueRecord.staleAfterMs);
|
||
|
},
|
||
|
|
||
|
canWatch: function ({ title, chunkSource = null } = {}) {
|
||
|
if (chunkSource.startsWith("link://") && title.endsWith(".html"))
|
||
|
return true; // If is web-link material (prior to feature most chunkSources were links://)
|
||
|
if (chunkSource.startsWith("youtube://")) return true; // If is a youtube link
|
||
|
if (chunkSource.startsWith("confluence://")) return true; // If is a confluence document link
|
||
|
if (chunkSource.startsWith("github://")) return true; // If is a Github file reference
|
||
|
return false;
|
||
|
},
|
||
|
|
||
|
/**
|
||
|
* Creates Queue record and updates document watch status to true on Document record
|
||
|
* @param {import("@prisma/client").workspace_documents} document - document record to watch, must have `id`
|
||
|
*/
|
||
|
watch: async function (document = null) {
|
||
|
if (!document) return false;
|
||
|
try {
|
||
|
const { Document } = require("./documents");
|
||
|
|
||
|
// Get all documents that are watched and share the same unique filename. If this value is
|
||
|
// non-zero then we exit early so that we do not have duplicated watch queues for the same file
|
||
|
// across many workspaces.
|
||
|
const workspaceDocIds = (
|
||
|
await Document.where({ filename: document.filename, watched: true })
|
||
|
).map((rec) => rec.id);
|
||
|
const hasRecords =
|
||
|
(await this.count({ workspaceDocId: { in: workspaceDocIds } })) > 0;
|
||
|
if (hasRecords)
|
||
|
throw new Error(
|
||
|
`Cannot watch this document again - it already has a queue set.`
|
||
|
);
|
||
|
|
||
|
const queue = await prisma.document_sync_queues.create({
|
||
|
data: {
|
||
|
workspaceDocId: document.id,
|
||
|
nextSyncAt: new Date(Number(new Date()) + this.defaultStaleAfter),
|
||
|
},
|
||
|
});
|
||
|
await Document._updateAll(
|
||
|
{ filename: document.filename },
|
||
|
{ watched: true }
|
||
|
);
|
||
|
return queue || null;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return null;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
/**
|
||
|
* Deletes Queue record and updates document watch status to false on Document record
|
||
|
* @param {import("@prisma/client").workspace_documents} document - document record to unwatch, must have `id`
|
||
|
*/
|
||
|
unwatch: async function (document = null) {
|
||
|
if (!document) return false;
|
||
|
try {
|
||
|
const { Document } = require("./documents");
|
||
|
|
||
|
// We could have been given a document to unwatch which is a clone of one that is already being watched but by another workspaceDocument id.
|
||
|
// so in this instance we need to delete any queues related to this document by any WorkspaceDocumentId it is referenced by.
|
||
|
const workspaceDocIds = (
|
||
|
await Document.where({ filename: document.filename, watched: true })
|
||
|
).map((rec) => rec.id);
|
||
|
await this.delete({ workspaceDocId: { in: workspaceDocIds } });
|
||
|
await Document._updateAll(
|
||
|
{ filename: document.filename },
|
||
|
{ watched: false }
|
||
|
);
|
||
|
return true;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return false;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
_update: async function (id = null, data = {}) {
|
||
|
if (!id) throw new Error("No id provided for update");
|
||
|
|
||
|
try {
|
||
|
await prisma.document_sync_queues.update({
|
||
|
where: { id },
|
||
|
data,
|
||
|
});
|
||
|
return true;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return false;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
get: async function (clause = {}) {
|
||
|
try {
|
||
|
const queue = await prisma.document_sync_queues.findFirst({
|
||
|
where: clause,
|
||
|
});
|
||
|
return queue || null;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return null;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
where: async function (
|
||
|
clause = {},
|
||
|
limit = null,
|
||
|
orderBy = null,
|
||
|
include = {}
|
||
|
) {
|
||
|
try {
|
||
|
const results = await prisma.document_sync_queues.findMany({
|
||
|
where: clause,
|
||
|
...(limit !== null ? { take: limit } : {}),
|
||
|
...(orderBy !== null ? { orderBy } : {}),
|
||
|
...(include !== null ? { include } : {}),
|
||
|
});
|
||
|
return results;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return [];
|
||
|
}
|
||
|
},
|
||
|
|
||
|
count: async function (clause = {}, limit = null) {
|
||
|
try {
|
||
|
const count = await prisma.document_sync_queues.count({
|
||
|
where: clause,
|
||
|
...(limit !== null ? { take: limit } : {}),
|
||
|
});
|
||
|
return count;
|
||
|
} catch (error) {
|
||
|
console.error("FAILED TO COUNT DOCUMENTS.", error.message);
|
||
|
return 0;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
delete: async function (clause = {}) {
|
||
|
try {
|
||
|
await prisma.document_sync_queues.deleteMany({ where: clause });
|
||
|
return true;
|
||
|
} catch (error) {
|
||
|
console.error(error.message);
|
||
|
return false;
|
||
|
}
|
||
|
},
|
||
|
|
||
|
/**
|
||
|
* Gets the "stale" queues where the queue's nextSyncAt is less than the current time
|
||
|
* @returns {Promise<(
|
||
|
* import("@prisma/client").document_sync_queues &
|
||
|
* { workspaceDoc: import("@prisma/client").workspace_documents &
|
||
|
* { workspace: import("@prisma/client").workspaces }
|
||
|
* })[]}>}
|
||
|
*/
|
||
|
staleDocumentQueues: async function () {
|
||
|
const queues = await this.where(
|
||
|
{
|
||
|
nextSyncAt: {
|
||
|
lte: new Date().toISOString(),
|
||
|
},
|
||
|
},
|
||
|
null,
|
||
|
null,
|
||
|
{
|
||
|
workspaceDoc: {
|
||
|
include: {
|
||
|
workspace: true,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
);
|
||
|
return queues;
|
||
|
},
|
||
|
|
||
|
saveRun: async function (queueId = null, status = null, result = {}) {
|
||
|
const { DocumentSyncRun } = require("./documentSyncRun");
|
||
|
return DocumentSyncRun.save(queueId, status, result);
|
||
|
},
|
||
|
|
||
|
/**
|
||
|
* Updates document to be watched/unwatched & creates or deletes any queue records and updated Document record `watched` status
|
||
|
* @param {import("@prisma/client").workspace_documents} documentRecord
|
||
|
* @param {boolean} watchStatus - indicate if queue record should be created or not.
|
||
|
* @returns
|
||
|
*/
|
||
|
toggleWatchStatus: async function (documentRecord, watchStatus = false) {
|
||
|
if (!watchStatus) {
|
||
|
await Telemetry.sendTelemetry("document_unwatched");
|
||
|
await this.unwatch(documentRecord);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
await this.watch(documentRecord);
|
||
|
await Telemetry.sendTelemetry("document_watched");
|
||
|
return;
|
||
|
},
|
||
|
};
|
||
|
|
||
|
module.exports = { DocumentSyncQueue };
|