create getPipeline function + clean up

This commit is contained in:
Flam3rboy 2021-04-08 18:52:57 +02:00
parent 9d9a4c7e72
commit f268e9c42f

View File

@ -2,6 +2,7 @@ import { db, Event, MongooseCache, UserModel, getPermission, Permissions } from
import { OPCODES } from "../util/Constants"; import { OPCODES } from "../util/Constants";
import { Send } from "../util/Send"; import { Send } from "../util/Send";
import WebSocket from "../util/WebSocket"; import WebSocket from "../util/WebSocket";
import "missing-native-js-functions";
// TODO: close connection on Invalidated Token // TODO: close connection on Invalidated Token
// TODO: check intent // TODO: check intent
@ -12,43 +13,39 @@ import WebSocket from "../util/WebSocket";
// https://discord.com/developers/docs/topics/gateway#sharding // https://discord.com/developers/docs/topics/gateway#sharding
export interface DispatchOpts { export interface DispatchOpts {
eventStream: any; eventStream: MongooseCache;
user_guilds: Array<string>; guilds: Array<string>;
shard_count?: bigint; }
shard_id?: bigint;
function getPipeline(this: WebSocket, guilds: string[]) {
if (this.shard_count) {
guilds = guilds.filter((x) => (BigInt(x) >> 22n) % this.shard_count === this.shard_id);
}
return [
{
$match: {
$or: [{ "fullDocument.guild_id": { $in: guilds } }, { "fullDocument.user_id": this.user_id }],
},
},
];
} }
export async function setupListener(this: WebSocket) { export async function setupListener(this: WebSocket) {
const user = await UserModel.findOne({ id: this.user_id }).lean().exec(); const user = await UserModel.findOne({ id: this.user_id }).lean().exec();
var user_guilds = user.guilds; var guilds = user.guilds;
const shard_count = 10n;
const shard_id = 0n;
if (shard_count) { const eventStream = new MongooseCache(db.collection("events"), getPipeline.call(this, guilds), {
user_guilds = user.guilds.filter((x) => (BigInt(x) >> 22n) % shard_count === shard_id); onlyEvents: true,
} });
const eventStream = new MongooseCache(
db.collection("events"),
[
{
$match: {
$or: [{ "fullDocument.guild_id": { $in: user_guilds } }, { "fullDocument.user_id": user.id }]
}
}
],
{
onlyEvents: true
}
);
await eventStream.init(); await eventStream.init();
eventStream.on("insert", (document) => dispatch.call(this, { eventStream, user_guilds, shard_count, shard_id }, document)); eventStream.on("insert", (document: Event) => dispatch.call(this, document, { eventStream, guilds }));
this.once("close", () => eventStream.destroy()); this.once("close", () => eventStream.destroy());
} }
export async function dispatch(this: WebSocket, { eventStream, user_guilds, shard_count, shard_id }: DispatchOpts, document: Event) { export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) {
var permission = new Permissions("ADMINISTRATOR"); // default permission for dms var permission = new Permissions("ADMINISTRATOR"); // default permission for dms
console.log("event", document); console.log("event", document);
@ -59,13 +56,11 @@ export async function dispatch(this: WebSocket, { eventStream, user_guilds, shar
} }
if (document.event === "GUILD_CREATE") { if (document.event === "GUILD_CREATE") {
user_guilds.push(document.guild_id); guilds.push(document.guild_id);
eventStream.changeStream(getPipeline.call(this, guilds));
if (shard_count) { } else if (document.event === "GUILD_DELETE") {
user_guilds = user_guilds.filter((x) => (BigInt(x) >> 22n) % shard_count === shard_id); guilds.remove(document.guild);
} eventStream.changeStream(getPipeline.call(this, guilds));
eventStream.changeStream([{ $match: { $or: [{ "fullDocument.guild_id": { $in: user_guilds } }, { "fullDocument.user_id": document.user_id }] } }]);
} }
// check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents // check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents
@ -200,6 +195,6 @@ export async function dispatch(this: WebSocket, { eventStream, user_guilds, shar
op: OPCODES.Dispatch, op: OPCODES.Dispatch,
t: document.event, t: document.event,
d: document.data, d: document.data,
s: this.sequence++ s: this.sequence++,
}); });
} }