improve performance of identify + listener

This commit is contained in:
Flam3rboy 2021-10-17 00:37:06 +02:00
parent aec3834fe5
commit d53a4048d0
2 changed files with 152 additions and 79 deletions

View File

@ -6,6 +6,9 @@ import {
EventOpts, EventOpts,
ListenEventOpts, ListenEventOpts,
Member, Member,
EVENTEnum,
Relationship,
RelationshipType,
} from "@fosscord/util"; } from "@fosscord/util";
import { OPCODES } from "../util/Constants"; import { OPCODES } from "../util/Constants";
import { Send } from "../util/Send"; import { Send } from "../util/Send";
@ -21,22 +24,45 @@ import { Recipient } from "@fosscord/util";
// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards // Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards
// https://discord.com/developers/docs/topics/gateway#sharding // https://discord.com/developers/docs/topics/gateway#sharding
export function handlePresenceUpdate(
this: WebSocket,
{ event, acknowledge, data }: EventOpts
) {
acknowledge?.();
if (event === EVENTEnum.PresenceUpdate) {
return Send(this, {
op: OPCODES.Dispatch,
t: event,
d: data,
s: this.sequence++,
});
}
}
// TODO: use already queried guilds/channels of Identify and don't fetch them again // TODO: use already queried guilds/channels of Identify and don't fetch them again
export async function setupListener(this: WebSocket) { export async function setupListener(this: WebSocket) {
const members = await Member.find({ const [members, recipients, relationships] = await Promise.all([
where: { id: this.user_id }, Member.find({
relations: ["guild", "guild.channels"], where: { id: this.user_id },
}); relations: ["guild", "guild.channels"],
}),
Recipient.find({
where: { user_id: this.user_id, closed: false },
relations: ["channel"],
}),
Relationship.find({
from_id: this.user_id,
type: RelationshipType.friends,
}),
]);
const guilds = members.map((x) => x.guild); const guilds = members.map((x) => x.guild);
const recipients = await Recipient.find({
where: { user_id: this.user_id, closed: false },
relations: ["channel"],
});
const dm_channels = recipients.map((x) => x.channel); const dm_channels = recipients.map((x) => x.channel);
const opts: { acknowledge: boolean; channel?: AMQChannel } = { const opts: { acknowledge: boolean; channel?: AMQChannel } = {
acknowledge: true, acknowledge: true,
}; };
this.listen_options = opts;
const consumer = consume.bind(this); const consumer = consume.bind(this);
if (RabbitMQ.connection) { if (RabbitMQ.connection) {
@ -47,45 +73,44 @@ export async function setupListener(this: WebSocket) {
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
for (const channel of dm_channels) { relationships.forEach(async (relationship) => {
this.events[relationship.to_id] = await listenEvent(
relationship.to_id,
handlePresenceUpdate.bind(this),
opts
);
});
dm_channels.forEach(async (channel) => {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts); this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
} });
for (const guild of guilds) { guilds.forEach(async (guild) => {
// contains guild and dm channels const permission = await getPermission(this.user_id, guild.id);
this.permissions[guild.id] = permission;
this.events[guild.id] = await listenEvent(guild.id, consumer, opts);
getPermission(this.user_id, guild.id) guild.channels.forEach(async (channel) => {
.then(async (x) => { if (
this.permissions[guild.id] = x; permission
this.listeners; .overwriteChannel(channel.permission_overwrites!)
this.events[guild.id] = await listenEvent( .has("VIEW_CHANNEL")
guild.id, ) {
this.events[channel.id] = await listenEvent(
channel.id,
consumer, consumer,
opts opts
); );
}
for (const channel of guild.channels) { });
if ( });
x
.overwriteChannel(channel.permission_overwrites!)
.has("VIEW_CHANNEL")
) {
this.events[channel.id] = await listenEvent(
channel.id,
consumer,
opts
);
}
}
})
.catch((e) =>
console.log("couldn't get permission for guild " + guild, e)
);
}
this.once("close", () => { this.once("close", () => {
if (opts.channel) opts.channel.close(); if (opts.channel) opts.channel.close();
else Object.values(this.events).forEach((x) => x()); else {
Object.values(this.events).forEach((x) => x());
Object.values(this.member_events).forEach((x) => x());
}
}); });
} }
@ -97,10 +122,23 @@ async function consume(this: WebSocket, opts: EventOpts) {
const consumer = consume.bind(this); const consumer = consume.bind(this);
const listenOpts = opts as ListenEventOpts; const listenOpts = opts as ListenEventOpts;
opts.acknowledge?.();
// console.log("event", event); // console.log("event", event);
// subscription managment // subscription managment
switch (event) { switch (event) {
case "GUILD_MEMBER_REMOVE":
this.member_events[data.user.id]?.();
delete this.member_events[data.user.id];
case "GUILD_MEMBER_ADD":
if (this.member_events[data.user.id]) break; // already subscribed
this.member_events[data.user.id] = await listenEvent(
data.user.id,
handlePresenceUpdate.bind(this),
this.listen_options
);
break;
case "RELATIONSHIP_REMOVE":
case "CHANNEL_DELETE": case "CHANNEL_DELETE":
case "GUILD_DELETE": case "GUILD_DELETE":
delete this.events[id]; delete this.events[id];
@ -196,5 +234,4 @@ async function consume(this: WebSocket, opts: EventOpts) {
d: data, d: data,
s: this.sequence++, s: this.sequence++,
}); });
opts.acknowledge?.();
} }

View File

@ -13,6 +13,11 @@ import {
PrivateUserProjection, PrivateUserProjection,
ReadState, ReadState,
Application, Application,
emitEvent,
SessionsReplace,
PrivateSessionProjection,
MemberPrivateProjection,
PresenceUpdateEvent,
} from "@fosscord/util"; } from "@fosscord/util";
import { Send } from "../util/Send"; import { Send } from "../util/Send";
import { CLOSECODES, OPCODES } from "../util/Constants"; import { CLOSECODES, OPCODES } from "../util/Constants";
@ -43,11 +48,56 @@ export async function onIdentify(this: WebSocket, data: Payload) {
} }
this.user_id = decoded.id; this.user_id = decoded.id;
const user = await User.findOneOrFail({ const session_id = genSessionId();
where: { id: this.user_id }, this.session_id = session_id; //Set the session of the WebSocket object
relations: ["relationships", "relationships.to"],
select: [...PrivateUserProjection, "relationships"], const [user, read_states, members, recipients, session, application] =
}); await Promise.all([
User.findOneOrFail({
where: { id: this.user_id },
relations: ["relationships", "relationships.to"],
select: [...PrivateUserProjection, "relationships"],
}),
ReadState.find({ user_id: this.user_id }),
Member.find({
where: { id: this.user_id },
select: MemberPrivateProjection,
relations: [
"guild",
"guild.channels",
"guild.emojis",
"guild.emojis.user",
"guild.roles",
"guild.stickers",
"user",
"roles",
],
}),
Recipient.find({
where: { user_id: this.user_id, closed: false },
relations: [
"channel",
"channel.recipients",
"channel.recipients.user",
],
// TODO: public user selection
}),
// save the session and delete it when the websocket is closed
new Session({
user_id: this.user_id,
session_id: session_id,
// TODO: check if status is only one of: online, dnd, offline, idle
status: identify.presence?.status || "online", //does the session always start as online?
client_info: {
//TODO read from identity
client: "desktop",
os: identify.properties?.os,
version: 0,
},
}).save(),
Application.findOne({ id: this.user_id }),
]);
if (!user) return this.close(CLOSECODES.Authentication_failed); if (!user) return this.close(CLOSECODES.Authentication_failed);
if (!identify.intents) identify.intents = BigInt("0b11111111111111"); if (!identify.intents) identify.intents = BigInt("0b11111111111111");
@ -68,19 +118,6 @@ export async function onIdentify(this: WebSocket, data: Payload) {
} }
var users: PublicUser[] = []; var users: PublicUser[] = [];
const members = await Member.find({
where: { id: this.user_id },
relations: [
"guild",
"guild.channels",
"guild.emojis",
"guild.emojis.user",
"guild.roles",
"guild.stickers",
"user",
"roles",
],
});
const merged_members = members.map((x: Member) => { const merged_members = members.map((x: Member) => {
return [ return [
{ {
@ -112,11 +149,6 @@ export async function onIdentify(this: WebSocket, data: Payload) {
const user_guild_settings_entries = members.map((x) => x.settings); const user_guild_settings_entries = members.map((x) => x.settings);
const recipients = await Recipient.find({
where: { user_id: this.user_id, closed: false },
relations: ["channel", "channel.recipients", "channel.recipients.user"],
// TODO: public user selection
});
const channels = recipients.map((x) => { const channels = recipients.map((x) => {
// @ts-ignore // @ts-ignore
x.channel.recipients = x.channel.recipients?.map((x) => x.user); x.channel.recipients = x.channel.recipients?.map((x) => x.user);
@ -144,24 +176,28 @@ export async function onIdentify(this: WebSocket, data: Payload) {
users.push(public_related_user); users.push(public_related_user);
} }
const session_id = genSessionId(); setImmediate(async () => {
this.session_id = session_id; //Set the session of the WebSocket object // run in seperate "promise context" because ready payload is not dependent on those events
const session = new Session({ emitEvent({
user_id: this.user_id, event: "SESSIONS_REPLACE",
session_id: session_id, user_id: this.user_id,
status: "online", //does the session always start as online? data: await Session.find({
client_info: { where: { user_id: this.user_id },
//TODO read from identity select: PrivateSessionProjection,
client: "desktop", }),
os: "linux", } as SessionsReplace);
version: 0, emitEvent({
}, event: "PRESENCE_UPDATE",
user_id: this.user_id,
data: {
user: await User.getPublicUser(this.user_id),
activities: session.activities,
client_status: session?.client_info,
status: session.status,
},
} as PresenceUpdateEvent);
}); });
//We save the session and we delete it when the websocket is closed
await session.save();
const read_states = await ReadState.find({ user_id: this.user_id });
read_states.forEach((s: any) => { read_states.forEach((s: any) => {
s.id = s.channel_id; s.id = s.channel_id;
delete s.user_id; delete s.user_id;
@ -192,7 +228,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
const d: ReadyEventData = { const d: ReadyEventData = {
v: 8, v: 8,
application: await Application.findOne({ id: this.user_id }), application,
user: privateUser, user: privateUser,
user_settings: user.settings, user_settings: user.settings,
// @ts-ignore // @ts-ignore