abstract Event emission

This commit is contained in:
Flam3rboy 2021-08-13 13:00:49 +02:00
parent 644430921a
commit 0157028af5
10 changed files with 119 additions and 281 deletions

View File

@ -7,9 +7,11 @@
"": {
"name": "@fosscord/gateway",
"version": "1.0.0",
"hasInstallScript": true,
"license": "ISC",
"dependencies": {
"@fosscord/server-util": "^1.3.51",
"@fosscord/util": "file:../util",
"ajv": "^8.5.0",
"amqplib": "^0.8.0",
"dotenv": "^8.2.0",
@ -32,6 +34,30 @@
"ts-node-dev": "^1.1.6"
}
},
"../util": {
"name": "@fosscord/util",
"version": "1.3.52",
"license": "GPLV3",
"dependencies": {
"@types/jsonwebtoken": "^8.5.0",
"@types/mongoose-autopopulate": "^0.10.1",
"@types/mongoose-lean-virtuals": "^0.5.1",
"@types/node": "^14.14.25",
"ajv": "^8.5.0",
"amqplib": "^0.8.0",
"dot-prop": "^6.0.1",
"env-paths": "^2.2.1",
"jsonwebtoken": "^8.5.1",
"missing-native-js-functions": "^1.2.2",
"mongodb": "^3.6.9",
"mongoose": "^5.13.7",
"mongoose-autopopulate": "^0.12.3",
"typescript": "^4.1.3"
},
"devDependencies": {
"@types/amqplib": "^0.8.1"
}
},
"node_modules/@fosscord/server-util": {
"version": "1.3.51",
"resolved": "https://registry.npmjs.org/@fosscord/server-util/-/server-util-1.3.51.tgz",
@ -53,6 +79,10 @@
"typescript": "^4.1.3"
}
},
"node_modules/@fosscord/util": {
"resolved": "../util",
"link": true
},
"node_modules/@types/amqplib": {
"version": "0.8.1",
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.1.tgz",
@ -2387,6 +2417,26 @@
"typescript": "^4.1.3"
}
},
"@fosscord/util": {
"version": "file:../util",
"requires": {
"@types/amqplib": "^0.8.1",
"@types/jsonwebtoken": "^8.5.0",
"@types/mongoose-autopopulate": "^0.10.1",
"@types/mongoose-lean-virtuals": "^0.5.1",
"@types/node": "^14.14.25",
"ajv": "^8.5.0",
"amqplib": "^0.8.0",
"dot-prop": "^6.0.1",
"env-paths": "^2.2.1",
"jsonwebtoken": "^8.5.1",
"missing-native-js-functions": "^1.2.2",
"mongodb": "^3.6.9",
"mongoose": "^5.13.7",
"mongoose-autopopulate": "^0.12.3",
"typescript": "^4.1.3"
}
},
"@types/amqplib": {
"version": "0.8.1",
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.1.tgz",

View File

@ -4,9 +4,11 @@
"description": "",
"main": "dist/index.js",
"scripts": {
"link": "npm run build && npm link",
"postinstall": "npm run --prefix ../util/ link && npm link @fosscord/util && npm run link",
"test": "echo \"Error: no test specified\" && exit 1",
"start": "npm run build && node dist/start.js",
"build": "npx tsc -b .",
"build": "tsc -b .",
"dev": "tsnd --respawn src/start.ts"
},
"keywords": [],
@ -14,6 +16,7 @@
"license": "ISC",
"dependencies": {
"@fosscord/server-util": "^1.3.51",
"@fosscord/util": "file:../util",
"ajv": "^8.5.0",
"amqplib": "^0.8.0",
"dotenv": "^8.2.0",

View File

@ -1,7 +1,7 @@
import "missing-native-js-functions";
import dotenv from "dotenv";
dotenv.config();
import { Config, db, RabbitMQ } from "@fosscord/server-util";
import { Config, db, initEvent, RabbitMQ } from "@fosscord/util";
import { Server as WebSocketServer } from "ws";
import { Connection } from "./events/Connection";
import http from "http";
@ -40,8 +40,7 @@ export class Server {
await (db as Promise<Connection>);
await this.setupSchema();
await Config.init();
await RabbitMQ.init();
console.log("[Database] connected");
await initEvent();
if (!this.server.listening) {
this.server.listen(this.port);
console.log(`[Gateway] online on 0.0.0.0:${this.port}`);

View File

@ -40,6 +40,7 @@ export async function Connection(this: Server, socket: WebSocket, request: Incom
socket.deflate.on("data", (chunk) => socket.send(chunk));
}
socket.events = {};
socket.permissions = {};
socket.sequence = 0;

View File

@ -1,63 +1,31 @@
import {
db,
Event,
MongooseCache,
UserModel,
getPermission,
Permissions,
ChannelModel,
RabbitMQ,
EVENT,
} from "@fosscord/server-util";
listenEvent,
EventOpts,
ListenEventOpts,
} from "@fosscord/util";
import { OPCODES } from "../util/Constants";
import { Send } from "../util/Send";
import WebSocket from "../util/WebSocket";
import "missing-native-js-functions";
import { ConsumeMessage } from "amqplib";
import { Channel } from "amqplib";
// TODO: close connection on Invalidated Token
// TODO: check intent
// TODO: Guild Member Update is sent for current-user updates regardless of whether the GUILD_MEMBERS intent is set.
// ? How to resubscribe MongooseCache for new dm channel events? Maybe directly send them to the user_id regardless of the channel_id? -> max overhead of creating 10 events in database for dm user group. Or a new field in event -> recipient_ids?
// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards
// https://discord.com/developers/docs/topics/gateway#sharding
export interface DispatchOpts {
eventStream: MongooseCache;
guilds: Array<string>;
}
function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) {
if (this.shard_count) {
guilds = guilds.filter((x) => (BigInt(x) >> 22n) % this.shard_count! === this.shard_id);
}
return [
{
$match: {
$or: [
{ "fullDocument.guild_id": { $in: guilds } },
{ "fullDocument.user_id": this.user_id },
{ "fullDocument.channel_id": { $in: channels } },
],
},
},
];
}
async function rabbitListen(this: WebSocket, id: string) {
await this.rabbitCh!.assertExchange(id, "fanout", { durable: false });
const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true });
this.rabbitCh!.bindQueue(q.queue, id, "");
this.rabbitCh!.consume(q.queue, consume.bind(this), {
noAck: false,
});
this.rabbitCh!.queues[id] = q.queue;
}
// TODO: use already required guilds/channels of Identify and don't fetch them again
// TODO: use already queried guilds/channels of Identify and don't fetch them again
export async function setupListener(this: WebSocket) {
const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec();
const channels = await ChannelModel.find(
@ -67,26 +35,32 @@ export async function setupListener(this: WebSocket) {
const dm_channels = channels.filter((x) => !x.guild_id);
const guild_channels = channels.filter((x) => x.guild_id);
if (RabbitMQ.connection) {
// @ts-ignore
this.rabbitCh = await RabbitMQ.connection.createChannel();
this.rabbitCh!.queues = {};
const opts: { acknowledge: boolean; channel?: Channel } = { acknowledge: true };
const consumer = consume.bind(this);
rabbitListen.call(this, this.user_id);
if (RabbitMQ.connection) {
opts.channel = await RabbitMQ.connection.createChannel();
// @ts-ignore
opts.channel.queues = {};
}
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
for (const channel of dm_channels) {
rabbitListen.call(this, channel.id);
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
}
for (const guild of user.guilds) {
// contains guild and dm channels
getPermission(this.user_id, guild)
.then((x) => {
.then(async (x) => {
this.permissions[guild] = x;
rabbitListen.call(this, guild);
this.listeners;
this.events[guild] = await listenEvent(guild, consumer, opts);
for (const channel of guild_channels) {
if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) {
rabbitListen.call(this, channel.id);
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
}
}
})
@ -94,63 +68,44 @@ export async function setupListener(this: WebSocket) {
}
this.once("close", () => {
this.rabbitCh!.close();
if (opts.channel) opts.channel.close();
else Object.values(this.events).forEach((x) => x());
});
} else {
const eventStream = new MongooseCache(
db.collection("events"),
getPipeline.call(
this,
user.guilds,
channels.map((x) => x.id)
),
{
onlyEvents: true,
}
);
await eventStream.init();
eventStream.on("insert", (document: Event) =>
dispatch.call(this, document, { eventStream, guilds: user.guilds })
);
this.once("close", () => eventStream.destroy());
}
}
// TODO: use rabbitmq to only receive events that are included in intents
function consume(this: WebSocket, opts: ConsumeMessage | null) {
if (!opts) return;
if (!this.rabbitCh) return;
const data = JSON.parse(opts.content.toString());
// TODO: only subscribe for events that are in the connection intents
function consume(this: WebSocket, opts: EventOpts) {
const { data, event } = opts;
const id = data.id as string;
const event = opts.properties.type as EVENT;
const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm
console.log("rabbitmq event", event);
const consumer = consume.bind(this);
const listenOpts = opts as ListenEventOpts;
console.log("event", event);
// subscription managment
switch (event) {
case "CHANNEL_DELETE":
case "GUILD_DELETE":
this.rabbitCh.cancel(id);
delete this.events[id];
opts.cancel();
break;
case "CHANNEL_CREATE":
if (!permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) return;
// TODO: check if user has permission to channel
case "GUILD_CREATE":
rabbitListen.call(this, id);
listenEvent(id, consumer, listenOpts);
break;
case "CHANNEL_UPDATE":
const queue_id = this.rabbitCh.queues[id];
const exists = this.events[id];
// @ts-ignore
const exists = this.rabbitCh.consumers[id];
if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
if (exists) break;
rabbitListen.call(this, id);
listenEvent(id, consumer, listenOpts);
} else {
if (!exists) break;
this.rabbitCh.cancel(queue_id);
this.rabbitCh.unbindQueue(queue_id, id, "");
if (!exists) return; // return -> do not send channel update events for hidden channels
opts.cancel(id);
delete this.events[id];
}
break;
}
@ -216,167 +171,5 @@ function consume(this: WebSocket, opts: ConsumeMessage | null) {
d: data,
s: this.sequence++,
});
this.rabbitCh.ack(opts);
}
// TODO: cache permission
// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries.
// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes.
// TODO: only subscribe for events that are in the connection intents
// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes)
export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) {
var permission = new Permissions("ADMINISTRATOR"); // default permission for dms
console.log("event", document);
var channel_id = document.channel_id || document.data?.channel_id;
// TODO: clean up
if (document.event === "GUILD_CREATE") {
guilds.push(document.data.id);
eventStream.changeStream(getPipeline.call(this, guilds));
} else if (document.event === "GUILD_DELETE") {
guilds.remove(document.guild_id!);
eventStream.changeStream(getPipeline.call(this, guilds));
} else if (document.event === "CHANNEL_DELETE") channel_id = null;
if (document.guild_id && !this.intents.has("GUILDS")) return;
try {
permission = await getPermission(this.user_id, document.guild_id, channel_id);
} catch (e) {
permission = new Permissions();
}
// check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents
switch (document.event) {
case "GUILD_DELETE":
case "GUILD_CREATE":
case "GUILD_UPDATE":
case "GUILD_ROLE_CREATE":
case "GUILD_ROLE_UPDATE":
case "GUILD_ROLE_DELETE":
case "CHANNEL_CREATE":
case "CHANNEL_DELETE":
case "CHANNEL_UPDATE":
// gets sent if GUILDS intent is set (already checked in if document.guild_id)
break;
case "GUILD_INTEGRATIONS_UPDATE":
if (!this.intents.has("GUILD_INTEGRATIONS")) return;
break;
case "WEBHOOKS_UPDATE":
if (!this.intents.has("GUILD_WEBHOOKS")) return;
break;
case "GUILD_EMOJI_UPDATE":
if (!this.intents.has("GUILD_EMOJIS")) return;
break;
// only send them, if the user subscribed for this part of the member list, or is a bot
case "GUILD_MEMBER_ADD":
case "GUILD_MEMBER_REMOVE":
case "GUILD_MEMBER_UPDATE":
if (!this.intents.has("GUILD_MEMBERS")) return;
break;
case "VOICE_STATE_UPDATE":
if (!this.intents.has("GUILD_VOICE_STATES")) return;
break;
case "GUILD_BAN_ADD":
case "GUILD_BAN_REMOVE":
if (!this.intents.has("GUILD_BANS")) return;
break;
case "INVITE_CREATE":
case "INVITE_DELETE":
if (!this.intents.has("GUILD_INVITES")) return;
case "PRESENCE_UPDATE":
if (!this.intents.has("GUILD_PRESENCES")) return;
break;
case "MESSAGE_CREATE":
case "MESSAGE_DELETE":
case "MESSAGE_DELETE_BULK":
case "MESSAGE_UPDATE":
case "CHANNEL_PINS_UPDATE":
if (!this.intents.has("GUILD_MESSAGES") && document.guild_id) return;
if (!this.intents.has("DIRECT_MESSAGES") && !document.guild_id) return;
break;
case "MESSAGE_REACTION_ADD":
case "MESSAGE_REACTION_REMOVE":
case "MESSAGE_REACTION_REMOVE_ALL":
case "MESSAGE_REACTION_REMOVE_EMOJI":
if (!this.intents.has("GUILD_MESSAGE_REACTIONS") && document.guild_id) return;
if (!this.intents.has("DIRECT_MESSAGE_REACTIONS") && !document.guild_id) return;
break;
case "TYPING_START":
if (!this.intents.has("GUILD_MESSAGE_TYPING") && document.guild_id) return;
if (!this.intents.has("DIRECT_MESSAGE_TYPING") && !document.guild_id) return;
break;
case "READY": // will be sent by the gateway
case "USER_UPDATE":
case "APPLICATION_COMMAND_CREATE":
case "APPLICATION_COMMAND_DELETE":
case "APPLICATION_COMMAND_UPDATE":
default:
// Any events not defined in an intent are considered "passthrough" and will always be sent to you.
break;
}
// check permissions
switch (document.event) {
case "GUILD_INTEGRATIONS_UPDATE":
if (!permission.has("MANAGE_GUILD")) return;
break;
case "WEBHOOKS_UPDATE":
if (!permission.has("MANAGE_WEBHOOKS")) return;
break;
case "GUILD_MEMBER_ADD":
case "GUILD_MEMBER_REMOVE":
case "GUILD_MEMBER_UPDATE":
// only send them, if the user subscribed for this part of the member list, or is a bot
break;
case "GUILD_BAN_ADD":
case "GUILD_BAN_REMOVE":
if (!permission.has("BAN_MEMBERS")) break;
break;
case "INVITE_CREATE":
case "INVITE_DELETE":
if (!permission.has("MANAGE_GUILD")) break;
case "PRESENCE_UPDATE":
break;
case "VOICE_STATE_UPDATE":
case "MESSAGE_CREATE":
case "MESSAGE_DELETE":
case "MESSAGE_DELETE_BULK":
case "MESSAGE_UPDATE":
case "CHANNEL_PINS_UPDATE":
case "MESSAGE_REACTION_ADD":
case "MESSAGE_REACTION_REMOVE":
case "MESSAGE_REACTION_REMOVE_ALL":
case "MESSAGE_REACTION_REMOVE_EMOJI":
case "TYPING_START":
// only gets send if the user is alowed to view the current channel
if (!permission.has("VIEW_CHANNEL")) return;
break;
case "GUILD_CREATE":
case "GUILD_DELETE":
case "GUILD_UPDATE":
case "GUILD_ROLE_CREATE":
case "GUILD_ROLE_UPDATE":
case "GUILD_ROLE_DELETE":
case "CHANNEL_CREATE":
case "CHANNEL_DELETE":
case "CHANNEL_UPDATE":
case "GUILD_EMOJI_UPDATE":
case "READY": // will be sent by the gateway
case "USER_UPDATE":
case "APPLICATION_COMMAND_CREATE":
case "APPLICATION_COMMAND_DELETE":
case "APPLICATION_COMMAND_UPDATE":
default:
// always gets sent
// Any events not defined in an intent are considered "passthrough" and will always be sent
break;
}
return Send(this, {
op: OPCODES.Dispatch,
t: document.event,
d: document.data,
s: this.sequence++,
});
opts.acknowledge?.();
}

View File

@ -12,7 +12,7 @@ import {
toObject,
EVENTEnum,
Config,
} from "@fosscord/server-util";
} from "@fosscord/util";
import { setupListener } from "../listener/listener";
import { IdentifySchema } from "../schema/Identify";
import { Send } from "../util/Send";

View File

@ -1,13 +1,5 @@
// @ts-nocheck WIP
import {
db,
getPermission,
MemberModel,
MongooseCache,
PublicUserProjection,
RoleModel,
toObject,
} from "@fosscord/server-util";
import { db, getPermission, PublicUserProjection, toObject } from "@fosscord/util";
import { LazyRequest } from "../schema/LazyRequest";
import { OPCODES, Payload } from "../util/Constants";
import { Send } from "../util/Send";

View File

@ -1,4 +1,4 @@
import { ActivityBodySchema } from "@fosscord/server-util";
import { ActivityBodySchema } from "@fosscord/util";
import { EmojiSchema } from "./Emoji";
export const ActivitySchema = {

View File

@ -1,6 +1,6 @@
// @ts-nocheck
import { Config } from "@fosscord/server-util";
import { getConfigPathForFile } from "@fosscord/server-util/dist/util/Config";
import { Config } from "@fosscord/util";
import { getConfigPathForFile } from "@fosscord/util/dist/util/Config";
import Ajv, { JSONSchemaType } from "ajv";
export interface DefaultOptions {

View File

@ -1,4 +1,4 @@
import { Intents, Permissions } from "@fosscord/server-util";
import { Intents, Permissions } from "@fosscord/util";
import WS, { Server, Data } from "ws";
import { Deflate } from "zlib";
import { Channel } from "amqplib";
@ -15,8 +15,8 @@ interface WebSocket extends WS {
readyTimeout: NodeJS.Timeout;
intents: Intents;
sequence: number;
rabbitCh?: Channel & { queues: Record<string, string> };
permissions: Record<string, Permissions>;
events: Record<string, Function>;
}
export default WebSocket;