vertically scale bundle

This commit is contained in:
Flam3rboy 2021-10-18 17:38:31 +02:00
parent dc22db6902
commit 45efdb7ce4
4 changed files with 71 additions and 37 deletions

View File

@ -7,7 +7,7 @@ config();
import { FosscordServer } from "./Server"; import { FosscordServer } from "./Server";
import cluster from "cluster"; import cluster from "cluster";
import os from "os"; import os from "os";
const cores = Number(process.env.threads) || os.cpus().length; const cores = Number(process.env.THREADS) || os.cpus().length;
if (cluster.isMaster && process.env.NODE_ENV == "production") { if (cluster.isMaster && process.env.NODE_ENV == "production") {
console.log(`Primary ${process.pid} is running`); console.log(`Primary ${process.pid} is running`);

View File

@ -12,7 +12,7 @@ import { Config, initDatabase } from "@fosscord/util";
const app = express(); const app = express();
const server = http.createServer(); const server = http.createServer();
const port = Number(process.env.PORT) || 3001; const port = Number(process.env.PORT) || 3001;
const production = false; const production = true;
server.on("request", app); server.on("request", app);
// @ts-ignore // @ts-ignore
@ -23,6 +23,7 @@ const cdn = new CDNServer({ server, port, production, app });
const gateway = new Gateway.Server({ server, port, production }); const gateway = new Gateway.Server({ server, port, production });
async function main() { async function main() {
server.listen(port);
await initDatabase(); await initDatabase();
await Config.init(); await Config.init();
// only set endpointPublic, if not already set // only set endpointPublic, if not already set

View File

@ -1,6 +1,6 @@
// process.env.MONGOMS_DEBUG = "true"; // process.env.MONGOMS_DEBUG = "true";
import "reflect-metadata"; import "reflect-metadata";
import cluster from "cluster"; import cluster, { Worker } from "cluster";
import os from "os"; import os from "os";
import { red, bold, yellow, cyan } from "nanocolors"; import { red, bold, yellow, cyan } from "nanocolors";
import { initStats } from "./stats"; import { initStats } from "./stats";
@ -8,20 +8,21 @@ import { config } from "dotenv";
config(); config();
import { execSync } from "child_process"; import { execSync } from "child_process";
// TODO: add tcp socket event transmission // TODO: add socket event transmission
const cores = 1 || Number(process.env.threads) || os.cpus().length; let cores = Number(process.env.THREADS) || os.cpus().length;
function getCommitOrFail() { if (cluster.isMaster) {
try { function getCommitOrFail() {
return execSync("git rev-parse HEAD").toString().trim(); try {
} catch (e) { return execSync("git rev-parse HEAD").toString().trim();
return null; } catch (e) {
return null;
}
} }
} const commit = getCommitOrFail();
const commit = getCommitOrFail();
console.log( console.log(
bold(` bold(`
@ -38,32 +39,40 @@ console.log(
)} )}
Current commit: ${ Current commit: ${
commit !== null commit !== null
? `${cyan(commit)} (${yellow(commit.slice(0, 7))})` ? `${cyan(commit)} (${yellow(commit.slice(0, 7))})`
: "Unknown (Git cannot be found)" : "Unknown (Git cannot be found)"
}
`)
);
if (commit == null)
console.log(yellow(`Warning: Git is not installed or not in PATH.`));
if (cluster.isMaster && !process.env.masterStarted) {
process.env.masterStarted = "true";
(async () => {
initStats();
if (cores === 1) {
require("./Server");
return;
} }
`)
);
if (commit == null) {
console.log(yellow(`Warning: Git is not installed or not in PATH.`));
}
initStats();
console.log(`[Process] starting with ${cores} threads`);
if (cores === 1) {
require("./Server");
} else {
process.env.EVENT_TRANSMISSION = "process";
// Fork workers. // Fork workers.
for (let i = 0; i < cores; i++) { for (let i = 0; i < cores; i++) {
cluster.fork(); cluster.fork();
console.log(`[Process] worker ${i} started.`);
} }
cluster.on("message", (sender: Worker, message: any) => {
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (worker === sender || !worker) continue;
worker.send(message);
}
});
cluster.on("exit", (worker: any, code: any, signal: any) => { cluster.on("exit", (worker: any, code: any, signal: any) => {
console.log( console.log(
`[Worker] ${red( `[Worker] ${red(
@ -72,7 +81,7 @@ if (cluster.isMaster && !process.env.masterStarted) {
); );
cluster.fork(); cluster.fork();
}); });
})(); }
} else { } else {
require("./Server"); require("./Server");
} }

View File

@ -15,6 +15,8 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist // assertQueue isn't needed, because a queue will automatically created if it doesn't exist
const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
if (!successful) throw new Error("failed to send event"); if (!successful) throw new Error("failed to send event");
} else if (process.env.EVENT_TRANSMISSION === "process") {
process.send?.({ type: "event", event: payload, id } as ProcessEvent);
} else { } else {
events.emit(id, payload); events.emit(id, payload);
} }
@ -25,6 +27,7 @@ export async function initEvent() {
if (RabbitMQ.connection) { if (RabbitMQ.connection) {
} else { } else {
// use event emitter // use event emitter
// use process messages
} }
} }
@ -39,17 +42,38 @@ export interface ListenEventOpts {
acknowledge?: boolean; acknowledge?: boolean;
} }
export interface ProcessEvent {
type: "event";
event: Event;
id: string;
}
export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) { export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) {
if (RabbitMQ.connection) { if (RabbitMQ.connection) {
// @ts-ignore // @ts-ignore
return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge }); return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge });
} else { } else if (process.env.EVENT_TRANSMISSION === "process") {
const cancel = () => { const cancel = () => {
events.removeListener(event, callback); process.removeListener("message", listener);
process.setMaxListeners(process.getMaxListeners() - 1);
};
const listener = (msg: ProcessEvent) => {
msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel });
};
process.addListener("message", listener);
process.setMaxListeners(process.getMaxListeners() + 1);
return cancel;
} else {
const listener = (opts: any) => callback({ ...opts, cancel });
const cancel = () => {
events.removeListener(event, listener);
events.setMaxListeners(events.getMaxListeners() - 1); events.setMaxListeners(events.getMaxListeners() - 1);
}; };
events.setMaxListeners(events.getMaxListeners() + 1); events.setMaxListeners(events.getMaxListeners() + 1);
events.addListener(event, (opts) => callback({ ...opts, cancel })); events.addListener(event, listener);
return cancel; return cancel;
} }