RSA-Signing on server<->collector communication via API (#1005)

* WIP integrity check between processes

* Implement integrity checking on document processor payloads
This commit is contained in:
Timothy Carambat 2024-04-01 13:56:35 -07:00 committed by GitHub
parent 200bd7f061
commit f4088d9348
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 278 additions and 72 deletions

View File

@ -2,6 +2,7 @@
"cSpell.words": [
"anythingllm",
"Astra",
"comkey",
"Dockerized",
"Embeddable",
"GROQ",
@ -20,4 +21,4 @@
],
"eslint.experimental.useFlatConfig": true,
"docker.languageserver.formatter.ignoreMultilineInstructions": true
}
}

View File

@ -1,9 +1,10 @@
const { verifyPayloadIntegrity } = require("../middleware/verifyIntegrity");
const { reqBody } = require("../utils/http");
function extensions(app) {
if (!app) return;
app.post("/ext/github-repo", async function (request, response) {
app.post("/ext/github-repo", [verifyPayloadIntegrity], async function (request, response) {
try {
const loadGithubRepo = require("../utils/extensions/GithubRepo");
const { success, reason, data } = await loadGithubRepo(reqBody(request));
@ -24,7 +25,7 @@ function extensions(app) {
});
// gets all branches for a specific repo
app.post("/ext/github-repo/branches", async function (request, response) {
app.post("/ext/github-repo/branches", [verifyPayloadIntegrity], async function (request, response) {
try {
const GithubRepoLoader = require("../utils/extensions/GithubRepo/RepoLoader");
const allBranches = await (new GithubRepoLoader(reqBody(request))).getRepoBranches()
@ -48,7 +49,7 @@ function extensions(app) {
return;
});
app.post("/ext/youtube-transcript", async function (request, response) {
app.post("/ext/youtube-transcript", [verifyPayloadIntegrity], async function (request, response) {
try {
const loadYouTubeTranscript = require("../utils/extensions/YoutubeTranscript");
const { success, reason, data } = await loadYouTubeTranscript(reqBody(request));

View File

@ -13,6 +13,7 @@ const { processLink } = require("./processLink");
const { wipeCollectorStorage } = require("./utils/files");
const extensions = require("./extensions");
const { processRawText } = require("./processRawText");
const { verifyPayloadIntegrity } = require("./middleware/verifyIntegrity");
const app = express();
app.use(cors({ origin: true }));
@ -24,71 +25,83 @@ app.use(
})
);
app.post("/process", async function (request, response) {
const { filename, options = {} } = reqBody(request);
try {
const targetFilename = path
.normalize(filename)
.replace(/^(\.\.(\/|\\|$))+/, "");
const {
success,
reason,
documents = [],
} = await processSingleFile(targetFilename, options);
response
.status(200)
.json({ filename: targetFilename, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
filename: filename,
success: false,
reason: "A processing error occurred.",
documents: [],
});
app.post(
"/process",
[verifyPayloadIntegrity],
async function (request, response) {
const { filename, options = {} } = reqBody(request);
try {
const targetFilename = path
.normalize(filename)
.replace(/^(\.\.(\/|\\|$))+/, "");
const {
success,
reason,
documents = [],
} = await processSingleFile(targetFilename, options);
response
.status(200)
.json({ filename: targetFilename, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
filename: filename,
success: false,
reason: "A processing error occurred.",
documents: [],
});
}
return;
}
return;
});
);
app.post("/process-link", async function (request, response) {
const { link } = reqBody(request);
try {
const { success, reason, documents = [] } = await processLink(link);
response.status(200).json({ url: link, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
url: link,
success: false,
reason: "A processing error occurred.",
documents: [],
});
app.post(
"/process-link",
[verifyPayloadIntegrity],
async function (request, response) {
const { link } = reqBody(request);
try {
const { success, reason, documents = [] } = await processLink(link);
response.status(200).json({ url: link, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
url: link,
success: false,
reason: "A processing error occurred.",
documents: [],
});
}
return;
}
return;
});
);
app.post("/process-raw-text", async function (request, response) {
const { textContent, metadata } = reqBody(request);
try {
const {
success,
reason,
documents = [],
} = await processRawText(textContent, metadata);
response
.status(200)
.json({ filename: metadata.title, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
filename: metadata?.title || "Unknown-doc.txt",
success: false,
reason: "A processing error occurred.",
documents: [],
});
app.post(
"/process-raw-text",
[verifyPayloadIntegrity],
async function (request, response) {
const { textContent, metadata } = reqBody(request);
try {
const {
success,
reason,
documents = [],
} = await processRawText(textContent, metadata);
response
.status(200)
.json({ filename: metadata.title, success, reason, documents });
} catch (e) {
console.error(e);
response.status(200).json({
filename: metadata?.title || "Unknown-doc.txt",
success: false,
reason: "A processing error occurred.",
documents: [],
});
}
return;
}
return;
});
);
extensions(app);

View File

@ -0,0 +1,21 @@
const { CommunicationKey } = require("../utils/comKey");
function verifyPayloadIntegrity(request, response, next) {
const comKey = new CommunicationKey();
if (process.env.NODE_ENV === "development") {
comKey.log('verifyPayloadIntegrity is skipped in development.')
next();
return;
}
const signature = request.header("X-Integrity");
if (!signature) return response.status(400).json({ msg: 'Failed integrity signature check.' })
const validSignedPayload = comKey.verify(signature, request.body);
if (!validSignedPayload) return response.status(400).json({ msg: 'Failed integrity signature check.' })
next();
}
module.exports = {
verifyPayloadIntegrity
}

View File

@ -4,11 +4,26 @@ const {
WATCH_DIRECTORY,
SUPPORTED_FILETYPE_CONVERTERS,
} = require("../utils/constants");
const { trashFile, isTextType } = require("../utils/files");
const {
trashFile,
isTextType,
normalizePath,
isWithin,
} = require("../utils/files");
const RESERVED_FILES = ["__HOTDIR__.md"];
async function processSingleFile(targetFilename, options = {}) {
const fullFilePath = path.resolve(WATCH_DIRECTORY, targetFilename);
const fullFilePath = path.resolve(
WATCH_DIRECTORY,
normalizePath(targetFilename)
);
if (!isWithin(path.resolve(WATCH_DIRECTORY), fullFilePath))
return {
success: false,
reason: "Filename is a not a valid path to process.",
documents: [],
};
if (RESERVED_FILES.includes(targetFilename))
return {
success: false,

View File

@ -0,0 +1,42 @@
const crypto = require("crypto");
const fs = require("fs");
const path = require("path");
const keyPath =
process.env.NODE_ENV === "development"
? path.resolve(__dirname, `../../../server/storage/comkey`)
: path.resolve(process.env.STORAGE_DIR, `comkey`);
// Verification-only counterpart of the server's CommunicationKey class.
// The server signs outbound payloads with its private key; this class
// checks those signatures against the shared public key so the collector
// only acts on requests that originated from the server.
class CommunicationKey {
  #pubKeyName = "ipc-pub.pem";
  #storageLoc = keyPath;

  constructor() {}

  log(text, ...args) {
    console.log(`\x1b[36m[CommunicationKeyVerify]\x1b[0m ${text}`, ...args);
  }

  #readPublicKey() {
    return fs.readFileSync(path.resolve(this.#storageLoc, this.#pubKeyName));
  }

  // Verifies that `signature` (hex-encoded) was produced by signing
  // `textData` with the server's private key. Non-string payloads are
  // JSON-stringified first, since the server signs the stringified body.
  // Any failure (missing key file, malformed signature, …) yields false.
  verify(signature = "", textData = "") {
    try {
      const payload =
        typeof textData === "string" ? textData : JSON.stringify(textData);
      return crypto.verify(
        "RSA-SHA256",
        Buffer.from(payload),
        this.#readPublicKey(),
        Buffer.from(signature, "hex")
      );
    } catch {
      return false;
    }
  }
}

module.exports = { CommunicationKey };

View File

@ -108,10 +108,33 @@ async function wipeCollectorStorage() {
return;
}
/**
* Checks if a given path is within another path.
* @param {string} outer - The outer path (should be resolved).
* @param {string} inner - The inner path (should be resolved).
* @returns {boolean} - Returns true if the inner path is within the outer path, false otherwise.
*/
/**
 * Checks if a given path is within another path.
 * @param {string} outer - The outer path (should be resolved).
 * @param {string} inner - The inner path (should be resolved).
 * @returns {boolean} - Returns true if the inner path is within the outer path, false otherwise.
 */
function isWithin(outer, inner) {
  if (outer === inner) return false;
  const rel = path.relative(outer, inner);
  // Use path.sep so the traversal check also holds on Windows ("..\"),
  // reject absolute relatives (e.g. different drive letters), and treat
  // an empty relative path (same location, e.g. trailing-slash variant)
  // as not-within — matching the outer === inner guard above.
  return (
    rel !== "" &&
    rel !== ".." &&
    !rel.startsWith(`..${path.sep}`) &&
    !path.isAbsolute(rel)
  );
}
function normalizePath(filepath = "") {
const result = path
.normalize(filepath.trim())
.replace(/^(\.\.(\/|\\|$))+/, "")
.trim();
if (["..", ".", "/"].includes(result)) throw new Error("Invalid path.");
return result;
}
// File-system helpers shared across the collector. normalizePath and
// isWithin are the path-traversal guards used when resolving
// user-supplied filenames (see processSingleFile).
module.exports = {
  trashFile,
  isTextType,
  createdDate,
  writeToServerDocuments,
  wipeCollectorStorage,
  normalizePath,
  isWithin,
};

1
server/.gitignore vendored
View File

@ -3,6 +3,7 @@
storage/assets/*
!storage/assets/anything-llm.png
storage/documents/*
storage/comkey/*
storage/tmp/*
storage/vector-cache/*.json
storage/exports

View File

@ -1,4 +1,5 @@
const { Telemetry } = require("../../models/telemetry");
const { CommunicationKey } = require("../comKey");
const setupTelemetry = require("../telemetry");
function bootSSL(app, port = 3001) {
@ -16,6 +17,7 @@ function bootSSL(app, port = 3001) {
.createServer(credentials, app)
.listen(port, async () => {
await setupTelemetry();
new CommunicationKey(true);
console.log(`Primary server in HTTPS mode listening on port ${port}`);
})
.on("error", catchSigTerms);
@ -40,6 +42,7 @@ function bootHTTP(app, port = 3001) {
app
.listen(port, async () => {
await setupTelemetry();
new CommunicationKey(true);
console.log(`Primary server in HTTP mode listening on port ${port}`);
})
.on("error", catchSigTerms);

View File

@ -5,6 +5,8 @@
class CollectorApi {
constructor() {
const { CommunicationKey } = require("../comKey");
this.comkey = new CommunicationKey();
this.endpoint = `http://0.0.0.0:${process.env.COLLECTOR_PORT || 8888}`;
}
@ -40,15 +42,19 @@ class CollectorApi {
async processDocument(filename = "") {
if (!filename) return false;
const data = JSON.stringify({
filename,
options: this.#attachOptions(),
});
return await fetch(`${this.endpoint}/process`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Integrity": this.comkey.sign(data),
},
body: JSON.stringify({
filename,
options: this.#attachOptions(),
}),
body: data,
})
.then((res) => {
if (!res.ok) throw new Error("Response could not be completed");
@ -64,12 +70,14 @@ class CollectorApi {
async processLink(link = "") {
if (!link) return false;
const data = JSON.stringify({ link });
return await fetch(`${this.endpoint}/process-link`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Integrity": this.comkey.sign(data),
},
body: JSON.stringify({ link }),
body: data,
})
.then((res) => {
if (!res.ok) throw new Error("Response could not be completed");
@ -83,12 +91,14 @@ class CollectorApi {
}
async processRawText(textContent = "", metadata = {}) {
const data = JSON.stringify({ textContent, metadata });
return await fetch(`${this.endpoint}/process-raw-text`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Integrity": this.comkey.sign(data),
},
body: JSON.stringify({ textContent, metadata }),
body: data,
})
.then((res) => {
if (!res.ok) throw new Error("Response could not be completed");
@ -110,6 +120,7 @@ class CollectorApi {
body, // Stringified JSON!
headers: {
"Content-Type": "application/json",
"X-Integrity": this.comkey.sign(body),
},
})
.then((res) => {

View File

@ -0,0 +1,75 @@
const crypto = require("crypto");
const fs = require("fs");
const path = require("path");
const keyPath =
process.env.NODE_ENV === "development"
? path.resolve(__dirname, `../../storage/comkey`)
: path.resolve(process.env.STORAGE_DIR, `comkey`);
// What does this class do?
// This class generates a hashed version of some text (typically a JSON payload) using a rolling RSA key
// that can then be appended as a header value to do integrity checking on a payload. Given the
// nature of this class and that keys are rolled constantly, this protects the request
// integrity of requests sent to the collector as only the server can sign these requests.
// This keeps accidental misconfigurations of AnythingLLM that leave port 8888 open from
// being abused or SSRF'd — for example, by users scraping malicious sites that embed a loopback request in a <script> tag.
// Since each request to the collector must be signed to be valid, unsigned requests directly to the collector
// will be dropped and must go through the /server endpoint directly.
// Signing half of the server<->collector com-key pair. The server rolls
// a fresh RSA key pair on boot and signs every payload it sends to the
// collector; the collector verifies with the matching public key.
class CommunicationKey {
  #privKeyName = "ipc-priv.pem";
  #pubKeyName = "ipc-pub.pem";
  #storageLoc = keyPath;

  /**
   * @param {boolean} generate - When true, roll a fresh RSA key pair.
   * Typically passed on server boot so keys are fresh each process start.
   */
  constructor(generate = false) {
    if (generate) this.#generate();
  }

  log(text, ...args) {
    console.log(`\x1b[36m[CommunicationKey]\x1b[0m ${text}`, ...args);
  }

  #readPrivateKey() {
    return fs.readFileSync(path.resolve(this.#storageLoc, this.#privKeyName));
  }

  // Creates a 2048-bit RSA key pair (PKCS#1 PEM) and persists both halves
  // to storage. The collector reads only the public half for verification.
  #generate() {
    const { privateKey, publicKey } = crypto.generateKeyPairSync("rsa", {
      modulusLength: 2048,
      publicKeyEncoding: { type: "pkcs1", format: "pem" },
      privateKeyEncoding: { type: "pkcs1", format: "pem" },
    });

    if (!fs.existsSync(this.#storageLoc))
      fs.mkdirSync(this.#storageLoc, { recursive: true });

    fs.writeFileSync(
      path.resolve(this.#storageLoc, this.#privKeyName),
      privateKey
    );
    fs.writeFileSync(
      path.resolve(this.#storageLoc, this.#pubKeyName),
      publicKey
    );
    this.log(
      "RSA key pair generated for signed payloads within AnythingLLM services."
    );
  }

  // Produces a hex-encoded RSA-SHA256 signature of textData using the
  // private key. textData is typically the JSON-stringified request body.
  // This class only signs; verification lives in the collector's copy.
  sign(textData = "") {
    return crypto
      .sign("RSA-SHA256", Buffer.from(textData), this.#readPrivateKey())
      .toString("hex");
  }
}

module.exports = { CommunicationKey };