diff --git a/assets/schemas.json b/assets/schemas.json index 2d39546f..d71d0794 100755 --- a/assets/schemas.json +++ b/assets/schemas.json @@ -684108,6 +684108,11 @@ "type": "object", "properties": { "type": { + "enum": [ + "audio", + "screen", + "video" + ], "type": "string" }, "rid": { @@ -694118,6 +694123,7 @@ "type": { "enum": [ "audio", + "screen", "video" ], "type": "string" @@ -749017,42 +749023,6 @@ "items": { "type": "string" } - }, - "message_reference": { - "type": "object", - "properties": { - "message_id": { - "type": "string" - }, - "channel_id": { - "type": "string" - }, - "guild_id": { - "type": "string" - }, - "fail_if_not_exists": { - "type": "boolean" - } - }, - "additionalProperties": false, - "required": [ - "message_id" - ] - }, - "sticker_ids": { - "type": "array", - "items": { - "type": "string" - } - }, - "nonce": { - "type": "string" - }, - "enforce_nonce": { - "type": "boolean" - }, - "poll": { - "$ref": "#/definitions/PollCreationSchema" } }, "additionalProperties": false, diff --git a/package.json b/package.json index 01bf56c1..acf8ca4d 100644 --- a/package.json +++ b/package.json @@ -124,7 +124,7 @@ "optionalDependencies": { "@yukikaze-bot/erlpack": "^1.0.1", "jimp": "^1.6.0", - "@dank074/medooze-media-server": "1.156.3", + "@dank074/medooze-media-server": "1.156.4", "semantic-sdp": "^3.31.1", "mysql": "^2.18.1", "nodemailer-mailgun-transport": "^2.1.5", diff --git a/src/gateway/opcodes/StreamCreate.ts b/src/gateway/opcodes/StreamCreate.ts index bb493ed5..ea0ada04 100644 --- a/src/gateway/opcodes/StreamCreate.ts +++ b/src/gateway/opcodes/StreamCreate.ts @@ -1,5 +1,17 @@ -import { Payload, WebSocket } from "@spacebar/gateway"; -import { Config, emitEvent, Region, VoiceState } from "@spacebar/util"; +import { + genVoiceToken, + Payload, + WebSocket, + generateStreamKey, +} from "@spacebar/gateway"; +import { + Config, + emitEvent, + Region, + Snowflake, + Stream, + StreamSession, +} from "@spacebar/util"; interface StreamCreateSchema { type: "guild" | "call"; @@ -11,37 +23,65 @@ interface StreamCreateSchema { export async function onStreamCreate(this: WebSocket, data: Payload) { const body = data.d as StreamCreateSchema; - // first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection - if (!this.voiceWs || !this.voiceWs.webrtcConnected) return; + // TODO: first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection + if (body.channel_id.trim().length === 0) return; // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild - // TODO: create a new entry in db (StreamState?) containing the token for authenticating user in stream gateway IDENTIFY - // TODO: actually apply preferred_region from the event payload const regions = Config.get().regions; const guildRegion = regions.available.filter( (r) => r.id === regions.default, )[0]; - const streamKey = `${body.type}${body.type === "guild" ? ":" + body.guild_id : ""}:${body.channel_id}:${this.user_id}`; + // create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY + const stream = Stream.create({ + id: Snowflake.generate(), + owner_id: this.user_id, + channel_id: body.channel_id, + endpoint: guildRegion.endpoint, + }); + + await stream.save(); + + const token = genVoiceToken(); + + const streamSession = StreamSession.create({ + stream_id: stream.id, + user_id: this.user_id, + session_id: this.session_id, + token, + }); + + await streamSession.save(); + + const streamKey = generateStreamKey( + body.type, + body.guild_id, + body.channel_id, + this.user_id, + ); await emitEvent({ event: "STREAM_CREATE", data: { stream_key: streamKey, - rtc_server_id: "lol", // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number + rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number + viewer_ids: [], + region: guildRegion.name, + paused: false, }, guild_id: body.guild_id, - //user_id: this.user_id, + channel_id: body.channel_id, }); await emitEvent({ event: "STREAM_SERVER_UPDATE", data: { - token: "TEST", + token: streamSession.token, stream_key: streamKey, - endpoint: guildRegion.endpoint, + guild_id: null, // not sure why its always null + endpoint: stream.endpoint, }, user_id: this.user_id, }); diff --git a/src/gateway/opcodes/StreamDelete.ts b/src/gateway/opcodes/StreamDelete.ts index 050b0f98..e33204ad 100644 --- a/src/gateway/opcodes/StreamDelete.ts +++ b/src/gateway/opcodes/StreamDelete.ts @@ -1,4 +1,5 @@ -import { Payload, WebSocket } from "@spacebar/gateway"; +import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; +import { emitEvent, Stream } from "@spacebar/util"; interface StreamDeleteSchema { stream_key: string; @@ -7,23 +8,28 @@ interface StreamDeleteSchema { export async function onStreamDelete(this: WebSocket, data: Payload) { const body = data.d as StreamDeleteSchema; - const splitStreamKey = body.stream_key.split(":"); - if (splitStreamKey.length < 3) { - return this.close(4000, "Invalid stream key"); - } + const { userId, channelId, guildId, type } = parseStreamKey( + body.stream_key, + ); - const type = splitStreamKey.shift()!; - let guild_id: string; - - if (type === "guild") { - guild_id = splitStreamKey.shift()!; - } - const channel_id = splitStreamKey.shift()!; - const user_id = splitStreamKey.shift()!; - - if (this.user_id !== user_id) { + if (this.user_id !== userId) { return this.close(4000, "Cannot delete stream for another user"); } - // TODO: actually delete stream + const stream = await Stream.findOne({ + where: { channel_id: channelId, owner_id: userId }, + }); + + if (!stream) return this.close(4000, "Invalid stream key"); + + await stream.remove(); + + await emitEvent({ + event: "STREAM_DELETE", + data: { + stream_key: body.stream_key, + }, + guild_id: guildId, + channel_id: channelId, + }); } diff --git a/src/gateway/opcodes/StreamWatch.ts b/src/gateway/opcodes/StreamWatch.ts index b5f86635..f94a2f07 100644 --- a/src/gateway/opcodes/StreamWatch.ts +++ b/src/gateway/opcodes/StreamWatch.ts @@ -1,4 +1,10 @@ -import { Payload, WebSocket } from "@spacebar/gateway"; +import { + genVoiceToken, + parseStreamKey, + Payload, + WebSocket, +} from "@spacebar/gateway"; +import { Config, emitEvent, Stream, StreamSession } from "@spacebar/util"; interface StreamWatchSchema { stream_key: string; @@ -7,17 +13,60 @@ interface StreamWatchSchema { export async function onStreamWatch(this: WebSocket, data: Payload) { const body = data.d as StreamWatchSchema; - const splitStreamKey = body.stream_key.split(":"); - if (splitStreamKey.length < 3) { + // TODO: apply perms: check if user is allowed to watch + try { + const { type, channelId, guildId, userId } = parseStreamKey( + body.stream_key, + ); + + const stream = await Stream.findOneOrFail({ + where: { channel_id: channelId, owner_id: userId }, + }); + + const streamSession = StreamSession.create({ + stream_id: stream.id, + user_id: this.user_id, + session_id: this.session_id, + token: genVoiceToken(), + }); + + await streamSession.save(); + + const regions = Config.get().regions; + const guildRegion = regions.available.find( + (r) => r.endpoint === stream.endpoint, + ); + + if (!guildRegion) return this.close(4000, "Unknown region"); + + const viewers = await StreamSession.find({ + where: { stream_id: stream.id }, + }); + + await emitEvent({ + event: "STREAM_CREATE", + data: { + stream_key: body.stream_key, + rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number + viewer_ids: viewers.map((v) => v.user_id), + region: guildRegion.name, + paused: false, + }, + guild_id: guildId, + channel_id: channelId, + }); + + await emitEvent({ + event: "STREAM_SERVER_UPDATE", + data: { + token: streamSession.token, + stream_key: body.stream_key, + guild_id: null, // not sure why its always null + endpoint: stream.endpoint, + }, + user_id: this.user_id, + }); + } catch (e) { return this.close(4000, "Invalid stream key"); } - - const type = splitStreamKey.shift()!; - let guild_id: string; - - if (type === "guild") { - guild_id = splitStreamKey.shift()!; - } - const channel_id = splitStreamKey.shift()!; - const user_id = splitStreamKey.shift()!; } diff --git a/src/gateway/opcodes/VoiceStateUpdate.ts b/src/gateway/opcodes/VoiceStateUpdate.ts index 1c69958c..0e9267b5 100644 --- a/src/gateway/opcodes/VoiceStateUpdate.ts +++ b/src/gateway/opcodes/VoiceStateUpdate.ts @@ -42,6 +42,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { const isNew = body.channel_id === null && body.guild_id === null; let isChanged = false; + let prevState; + let voiceState: VoiceState; try { voiceState = await VoiceState.findOneOrFail({ @@ -60,6 +62,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { //If a user change voice channel between guild we should send a left event first if ( + voiceState.guild_id && voiceState.guild_id !== body.guild_id && voiceState.session_id === this.session_id ) { @@ -71,7 +74,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { } //The event send by Discord's client on channel leave has both guild_id and channel_id as null - if (body.guild_id === null) body.guild_id = voiceState.guild_id; + //if (body.guild_id === null) body.guild_id = voiceState.guild_id; + prevState = { ...voiceState }; voiceState.assign(body); } catch (error) { voiceState = VoiceState.create({ @@ -83,34 +87,46 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { }); } - // 'Fix' for this one voice state error. TODO: Find out why this is sent - // It seems to be sent on client load, - // so maybe its trying to find which server you were connected to before disconnecting, if any? - if (body.guild_id == null) { - return; + // if user left voice channel, send an update to previous channel/guild to let other people know that the user left + if ( + voiceState.session_id === this.session_id && + body.guild_id == null && + body.channel_id == null && + (prevState?.guild_id || prevState?.channel_id) + ) { + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: { ...voiceState, channel_id: null, guild_id: null }, + guild_id: prevState?.guild_id, + channel_id: prevState?.channel_id, + }); } //TODO the member should only have these properties: hoisted_role, deaf, joined_at, mute, roles, user //TODO the member.user should only have these properties: avatar, discriminator, id, username //TODO this may fail - voiceState.member = await Member.findOneOrFail({ - where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, - relations: ["user", "roles"], - }); + if (body.guild_id) { + voiceState.member = await Member.findOneOrFail({ + where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, + relations: ["user", "roles"], + }); + } //If the session changed we generate a new token if (voiceState.session_id !== this.session_id) voiceState.token = genVoiceToken(); voiceState.session_id = this.session_id; - const { id, ...newObj } = voiceState; + const { id, member, ...newObj } = voiceState; await Promise.all([ voiceState.save(), emitEvent({ event: "VOICE_STATE_UPDATE", - data: newObj, + data: { ...newObj, member: member?.toPublicMember() }, guild_id: voiceState.guild_id, + channel_id: voiceState.channel_id, + user_id: voiceState.user_id, } as VoiceStateUpdateEvent), ]); @@ -137,8 +153,10 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { token: voiceState.token, guild_id: voiceState.guild_id, endpoint: guildRegion.endpoint, + channel_id: voiceState.guild_id + ? undefined + : voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null }, - guild_id: voiceState.guild_id, user_id: voiceState.user_id, } as VoiceServerUpdateEvent); } diff --git a/src/gateway/util/Constants.ts b/src/gateway/util/Constants.ts index 0fc9545b..26c90dbe 100644 --- a/src/gateway/util/Constants.ts +++ b/src/gateway/util/Constants.ts @@ -16,8 +16,6 @@ along with this program. If not, see . */ -import { VoiceOPCodes } from "@spacebar/webrtc"; - export enum OPCODES { Dispatch = 0, Heartbeat = 1, @@ -63,7 +61,7 @@ export enum CLOSECODES { } export interface Payload { - op: OPCODES | VoiceOPCodes; + op: OPCODES; // eslint-disable-next-line @typescript-eslint/no-explicit-any d?: any; s?: number; diff --git a/src/gateway/util/Utils.ts b/src/gateway/util/Utils.ts new file mode 100644 index 00000000..38da3ab9 --- /dev/null +++ b/src/gateway/util/Utils.ts @@ -0,0 +1,43 @@ +export function parseStreamKey(streamKey: string): { + type: "guild" | "call"; + channelId: string; + guildId?: string; + userId: string; +} { + const streamKeyArray = streamKey.split(":"); + + const type = streamKeyArray.shift(); + + if (type !== "guild" && type !== "call") { + throw new Error(`Invalid stream key type: ${type}`); + } + + if ( + (type === "guild" && streamKeyArray.length < 3) || + (type === "call" && streamKeyArray.length < 2) + ) + throw new Error(`Invalid stream key: ${streamKey}`); // invalid stream key + + let guildId: string | undefined; + if (type === "guild") { + guildId = streamKeyArray.shift(); + } + const channelId = streamKeyArray.shift(); + const userId = streamKeyArray.shift(); + + if (!channelId || !userId) { + throw new Error(`Invalid stream key: ${streamKey}`); + } + return { type, channelId, guildId, userId }; +} + +export function generateStreamKey( + type: "guild" | "call", + guildId: string | undefined, + channelId: string, + userId: string, +): string { + const streamKey = `${type}${type === "guild" ? `:${guildId}` : ""}:${channelId}:${userId}`; + + return streamKey; +} diff --git a/src/gateway/util/WebSocket.ts b/src/gateway/util/WebSocket.ts index 4f41bf73..5c110840 100644 --- a/src/gateway/util/WebSocket.ts +++ b/src/gateway/util/WebSocket.ts @@ -20,7 +20,6 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util"; import WS from "ws"; import { Deflate, Inflate } from "fast-zlib"; import { Capabilities } from "./Capabilities"; -import { WebRtcClient } from "@spacebar/webrtc"; export interface WebSocket extends WS { version: number; @@ -42,7 +41,5 @@ export interface WebSocket extends WS { member_events: Record unknown>; listen_options: ListenEventOpts; capabilities?: Capabilities; - voiceWs?: WebRtcClient; - streamWs?: WebRtcClient; large_threshold: number; } diff --git a/src/gateway/util/index.ts b/src/gateway/util/index.ts index 6ef694d9..5a8c906b 100644 --- a/src/gateway/util/index.ts +++ b/src/gateway/util/index.ts @@ -22,3 +22,4 @@ export * from "./SessionUtils"; export * from "./Heartbeat"; export * from "./WebSocket"; export * from "./Capabilities"; +export * from "./Utils"; diff --git a/src/util/entities/Stream.ts b/src/util/entities/Stream.ts new file mode 100644 index 00000000..2787e3ce --- /dev/null +++ b/src/util/entities/Stream.ts @@ -0,0 +1,42 @@ +import { + Column, + Entity, + JoinColumn, + ManyToOne, + OneToMany, + RelationId, +} from "typeorm"; +import { BaseClass } from "./BaseClass"; +import { dbEngine } from "../util/Database"; +import { User } from "./User"; +import { Channel } from "./Channel"; +import { StreamSession } from "./StreamSession"; + +@Entity({ + name: "streams", + engine: dbEngine, +}) +export class Stream extends BaseClass { + @Column() + @RelationId((stream: Stream) => stream.owner) + owner_id: string; + + @JoinColumn({ name: "owner_id" }) + @ManyToOne(() => User, { + onDelete: "CASCADE", + }) + owner: User; + + @Column() + @RelationId((stream: Stream) => stream.channel) + channel_id: string; + + @JoinColumn({ name: "channel_id" }) + @ManyToOne(() => Channel, { + onDelete: "CASCADE", + }) + channel: Channel; + + @Column() + endpoint: string; +} diff --git a/src/util/entities/StreamSession.ts b/src/util/entities/StreamSession.ts new file mode 100644 index 00000000..6d7ccf9d --- /dev/null +++ b/src/util/entities/StreamSession.ts @@ -0,0 +1,48 @@ +import { + Column, + Entity, + JoinColumn, + ManyToOne, + OneToMany, + RelationId, +} from "typeorm"; +import { BaseClass } from "./BaseClass"; +import { dbEngine } from "../util/Database"; +import { User } from "./User"; +import { Stream } from "./Stream"; + +@Entity({ + name: "stream_sessions", + engine: dbEngine, +}) +export class StreamSession extends BaseClass { + @Column() + @RelationId((session: StreamSession) => session.stream) + stream_id: string; + + @JoinColumn({ name: "stream_id" }) + @ManyToOne(() => Stream, { + onDelete: "CASCADE", + }) + stream: Stream; + + @Column() + @RelationId((session: StreamSession) => session.user) + user_id: string; + + @JoinColumn({ name: "user_id" }) + @ManyToOne(() => User, { + onDelete: "CASCADE", + }) + user: User; + + @Column({ nullable: true }) + token: string; + + // this is for gateway session + @Column() + session_id: string; + + @Column({ default: false }) + used: boolean; +} diff --git a/src/util/entities/index.ts b/src/util/entities/index.ts index b2356aa7..6f132084 100644 --- a/src/util/entities/index.ts +++ b/src/util/entities/index.ts @@ -47,6 +47,8 @@ export * from "./SecurityKey"; export * from "./Session"; export * from "./Sticker"; export * from "./StickerPack"; +export * from "./Stream"; +export * from "./StreamSession"; export * from "./Team"; export * from "./TeamMember"; export * from "./Template"; diff --git a/src/util/interfaces/Event.ts b/src/util/interfaces/Event.ts index 40870614..b6101b9a 100644 --- a/src/util/interfaces/Event.ts +++ b/src/util/interfaces/Event.ts @@ -440,8 +440,9 @@ export interface VoiceServerUpdateEvent extends Event { event: "VOICE_SERVER_UPDATE"; data: { token: string; - guild_id: string; + guild_id: string | null; endpoint: string; + channel_id?: string; }; } @@ -700,6 +701,7 @@ export type EVENT = | "VOICE_SERVER_UPDATE" | "STREAM_CREATE" | "STREAM_SERVER_UPDATE" + | "STREAM_DELETE" | "APPLICATION_COMMAND_CREATE" | "APPLICATION_COMMAND_UPDATE" | "APPLICATION_COMMAND_DELETE" diff --git a/src/util/schemas/VoiceIdentifySchema.ts b/src/util/schemas/VoiceIdentifySchema.ts index e4436bf1..82f846c3 100644 --- a/src/util/schemas/VoiceIdentifySchema.ts +++ b/src/util/schemas/VoiceIdentifySchema.ts @@ -23,7 +23,7 @@ export interface VoiceIdentifySchema { token: string; video?: boolean; streams?: { - type: string; + type: "video" | "audio" | "screen"; rid: string; quality: number; }[]; diff --git a/src/util/schemas/VoiceVideoSchema.ts b/src/util/schemas/VoiceVideoSchema.ts index 0f43adc0..c621431b 100644 --- a/src/util/schemas/VoiceVideoSchema.ts +++ b/src/util/schemas/VoiceVideoSchema.ts @@ -22,7 +22,7 @@ export interface VoiceVideoSchema { rtx_ssrc?: number; user_id?: string; streams?: { - type: "video" | "audio"; + type: "video" | "audio" | "screen"; rid: string; ssrc: number; active: boolean; diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index bbc93aac..f56d6664 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -23,9 +23,9 @@ import { EVENT, Event } from "../interfaces"; export const events = new EventEmitter(); export async function emitEvent(payload: Omit) { - const id = (payload.channel_id || - payload.user_id || - payload.guild_id) as string; + const id = (payload.guild_id || + payload.channel_id || + payload.user_id) as string; if (!id) return console.error("event doesn't contain any id", payload); if (RabbitMQ.connection) { diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 07949f8c..6127aef4 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -22,6 +22,7 @@ import http from "http"; import ws from "ws"; import { Connection } from "./events/Connection"; import { mediaServer } from "./util/MediaServer"; +import { green, yellow } from "picocolors"; dotenv.config(); export class Server { @@ -70,16 +71,22 @@ export class Server { await initDatabase(); await Config.init(); await initEvent(); + + // if we failed to load webrtc library + if (!mediaServer) { + console.log(`[WebRTC] ${yellow("WEBRTC disabled")}`); + return Promise.resolve(); + } await mediaServer.start(); if (!this.server.listening) { this.server.listen(this.port); - console.log(`[WebRTC] online on 0.0.0.0:${this.port}`); + console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`); } } async stop() { closeDatabase(); this.server.close(); - mediaServer.stop(); + mediaServer?.stop(); } } diff --git a/src/webrtc/events/Connection.ts b/src/webrtc/events/Connection.ts index 6c5bab03..a068a8fd 100644 --- a/src/webrtc/events/Connection.ts +++ b/src/webrtc/events/Connection.ts @@ -16,11 +16,11 @@ along with this program. If not, see . */ -import { CLOSECODES, Send, setHeartbeat, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES, setHeartbeat } from "@spacebar/gateway"; import { IncomingMessage } from "http"; import { URL } from "url"; import WS from "ws"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, WebRtcWebSocket, Send } from "../util"; import { onClose } from "./Close"; import { onMessage } from "./Message"; @@ -30,7 +30,7 @@ import { onMessage } from "./Message"; export async function Connection( this: WS.Server, - socket: WebSocket, + socket: WebRtcWebSocket, request: IncomingMessage, ) { try { diff --git a/src/webrtc/events/Message.ts b/src/webrtc/events/Message.ts index 6c805805..f503bd1e 100644 --- a/src/webrtc/events/Message.ts +++ b/src/webrtc/events/Message.ts @@ -16,10 +16,10 @@ along with this program. If not, see . */ -import { CLOSECODES, Payload, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES } from "@spacebar/gateway"; import { Tuple } from "lambert-server"; import OPCodeHandlers from "../opcodes"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util"; const PayloadSchema = { op: Number, @@ -28,9 +28,9 @@ const PayloadSchema = { $t: String, }; -export async function onMessage(this: WebSocket, buffer: Buffer) { +export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) { try { - const data: Payload = JSON.parse(buffer.toString()); + const data: VoicePayload = JSON.parse(buffer.toString()); if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated); diff --git a/src/webrtc/medooze/MedoozeSignalingDelegate.ts b/src/webrtc/medooze/MedoozeSignalingDelegate.ts index c3bf2566..9e05fb45 100644 --- a/src/webrtc/medooze/MedoozeSignalingDelegate.ts +++ b/src/webrtc/medooze/MedoozeSignalingDelegate.ts @@ -1,13 +1,7 @@ import { CodecInfo, MediaInfo, SDPInfo } from "semantic-sdp"; import { SignalingDelegate } from "../util/SignalingDelegate"; import { Codec, WebRtcClient } from "../util/WebRtcClient"; -import { - MediaServer, - IncomingStream, - OutgoingStream, - Transport, - Endpoint, -} from "@dank074/medooze-media-server"; +import { MediaServer, Endpoint } from "@dank074/medooze-media-server"; import { VoiceRoom } from "./VoiceRoom"; import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient"; @@ -48,8 +42,26 @@ export class MedoozeSignalingDelegate implements SignalingDelegate { rtcServerId: string, userId: string, ws: any, + type: "guild-voice" | "dm-voice" | "stream", ): WebRtcClient { - const existingClient = this.getClientForUserId(userId); + // make sure user isn't already in a room of the same type + // user can be in two simultanous rooms of different type though (can be in a voice channel and watching a stream for example) + const rooms = this.rooms + .values() + .filter((room) => + type === "stream" + ? room.type === "stream" + : room.type === "dm-voice" || room.type === "guild-voice", + ); + let existingClient; + + for (const room of rooms) { + let result = room.getClientById(userId); + if (result) { + existingClient = result; + break; + } + } if (existingClient) { console.log("client already connected, disconnect.."); @@ -58,7 +70,7 @@ export class MedoozeSignalingDelegate implements SignalingDelegate { if (!this._rooms.has(rtcServerId)) { console.debug("no channel created, creating one..."); - this.createChannel(rtcServerId); + this.createRoom(rtcServerId, type); } const room = this._rooms.get(rtcServerId)!; @@ -208,8 +220,11 @@ export class MedoozeSignalingDelegate implements SignalingDelegate { throw new Error("Method not implemented."); } - public createChannel(rtcServerId: string): void { - this._rooms.set(rtcServerId, new VoiceRoom(rtcServerId, this)); + public createRoom( + rtcServerId: string, + type: "guild-voice" | "dm-voice" | "stream", + ): void { + this._rooms.set(rtcServerId, new VoiceRoom(rtcServerId, type, this)); } public disposeRoom(rtcServerId: string): void { diff --git a/src/webrtc/medooze/VoiceRoom.ts b/src/webrtc/medooze/VoiceRoom.ts index e220fcb3..55eb6fea 100644 --- a/src/webrtc/medooze/VoiceRoom.ts +++ b/src/webrtc/medooze/VoiceRoom.ts @@ -11,10 +11,15 @@ export class VoiceRoom { private _clients: Map; private _id: string; private _sfu: MedoozeSignalingDelegate; + private _type: "guild-voice" | "dm-voice" | "stream"; - constructor(id: string, sfu: MedoozeSignalingDelegate) { + constructor( + id: string, + type: "guild-voice" | "dm-voice" | "stream", + sfu: MedoozeSignalingDelegate, + ) { this._id = id; - + this._type = type; this._clients = new Map(); this._sfu = sfu; } @@ -98,6 +103,10 @@ export class VoiceRoom { return this._id; } + get type(): "guild-voice" | "dm-voice" | "stream" { + return this._type; + } + public dispose(): void { const clients = this._clients.values(); for (const client of clients) { diff --git a/src/webrtc/opcodes/BackendVersion.ts b/src/webrtc/opcodes/BackendVersion.ts index 60de3e58..c97f4b49 100644 --- a/src/webrtc/opcodes/BackendVersion.ts +++ b/src/webrtc/opcodes/BackendVersion.ts @@ -16,10 +16,12 @@ along with this program. If not, see . */ -import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util"; -export async function onBackendVersion(this: WebSocket, data: Payload) { +export async function onBackendVersion( + this: WebRtcWebSocket, + data: VoicePayload, +) { await Send(this, { op: VoiceOPCodes.VOICE_BACKEND_VERSION, d: { voice: "0.8.43", rtc_worker: "0.3.26" }, diff --git a/src/webrtc/opcodes/Heartbeat.ts b/src/webrtc/opcodes/Heartbeat.ts index 3d8e187b..ef3cae44 100644 --- a/src/webrtc/opcodes/Heartbeat.ts +++ b/src/webrtc/opcodes/Heartbeat.ts @@ -16,16 +16,10 @@ along with this program. If not, see . */ -import { - CLOSECODES, - Payload, - Send, - setHeartbeat, - WebSocket, -} from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { CLOSECODES, setHeartbeat } from "@spacebar/gateway"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util"; -export async function onHeartbeat(this: WebSocket, data: Payload) { +export async function onHeartbeat(this: WebRtcWebSocket, data: VoicePayload) { setHeartbeat(this); if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error); diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index 367c3537..3cadd865 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -16,31 +16,79 @@ along with this program. If not, see . */ -import { CLOSECODES, Payload, Send, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES } from "@spacebar/gateway"; import { + StreamSession, validateSchema, VoiceIdentifySchema, VoiceState, } from "@spacebar/util"; -import { mediaServer, VoiceOPCodes } from "@spacebar/webrtc"; +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + Send, +} from "@spacebar/webrtc"; -export async function onIdentify(this: WebSocket, data: Payload) { +export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { clearTimeout(this.readyTimeout); const { server_id, user_id, session_id, token, streams, video } = validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema; - const voiceState = await VoiceState.findOne({ - where: { guild_id: server_id, user_id, token, session_id }, + // server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel + // not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db + // luckily we will only have to determine this once + let type: "guild-voice" | "dm-voice" | "stream"; + let authenticated = false; + + // first check if its a guild voice connection or DM voice call + let voiceState = await VoiceState.findOne({ + where: [ + { guild_id: server_id, user_id, token, session_id }, + { channel_id: server_id, user_id, token, session_id }, + ], }); - if (!voiceState) return this.close(CLOSECODES.Authentication_failed); + + if (voiceState) { + type = voiceState.guild_id === server_id ? "guild-voice" : "dm-voice"; + authenticated = true; + } else { + // if its not a guild/dm voice connection, check if it is a go live stream + const streamSession = await StreamSession.findOne({ + where: { + stream_id: server_id, + user_id, + token, + session_id, + used: false, + }, + }); + + if (streamSession) { + type = "stream"; + authenticated = true; + streamSession.used = true; + await streamSession.save(); + + this.once("close", async () => { + await streamSession.remove(); + }); + } + } + + // if it doesnt match any then not valid token + if (!authenticated) return this.close(CLOSECODES.Authentication_failed); this.user_id = user_id; this.session_id = session_id; - this.voiceWs = mediaServer.join(voiceState.channel_id, this.user_id, this); + this.type = type!; + this.webRtcClient = mediaServer.join(server_id, this.user_id, this, type!); this.on("close", () => { - mediaServer.onClientClose(this.voiceWs!); + // ice-lite media server relies on this to know when the peer went away + mediaServer.onClientClose(this.webRtcClient!); }); await Send(this, { diff --git a/src/webrtc/opcodes/SelectProtocol.ts b/src/webrtc/opcodes/SelectProtocol.ts index b419ffe7..16cfbaec 100644 --- a/src/webrtc/opcodes/SelectProtocol.ts +++ b/src/webrtc/opcodes/SelectProtocol.ts @@ -15,13 +15,20 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - -import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { SelectProtocolSchema, validateSchema } from "@spacebar/util"; -import { VoiceOPCodes, mediaServer } from "@spacebar/webrtc"; +import { + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + mediaServer, + Send, +} from "@spacebar/webrtc"; -export async function onSelectProtocol(this: WebSocket, payload: Payload) { - if (!this.voiceWs) return; +export async function onSelectProtocol( + this: WebRtcWebSocket, + payload: VoicePayload, +) { + if (!this.webRtcClient) return; const data = validateSchema( "SelectProtocolSchema", @@ -29,7 +36,7 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) { ) as SelectProtocolSchema; const answer = await mediaServer.onOffer( - this.voiceWs, + this.webRtcClient, data.sdp!, data.codecs ?? [], ); diff --git a/src/webrtc/opcodes/Speaking.ts b/src/webrtc/opcodes/Speaking.ts index 9332a026..d44c83a5 100644 --- a/src/webrtc/opcodes/Speaking.ts +++ b/src/webrtc/opcodes/Speaking.ts @@ -16,16 +16,23 @@ along with this program. If not, see . */ -import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { mediaServer, VoiceOPCodes } from "../util"; +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + Send, +} from "../util"; // {"speaking":1,"delay":5,"ssrc":2805246727} -export async function onSpeaking(this: WebSocket, data: Payload) { - if (!this.voiceWs) return; +export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { + if (!this.webRtcClient) return; mediaServer - .getClientsForRtcServer(this.voiceWs.rtc_server_id) + .getClientsForRtcServer( + this.webRtcClient.rtc_server_id, + ) .forEach((client) => { if (client.user_id === this.user_id) return; diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index f3b4d03a..61be1e60 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -15,14 +15,20 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - -import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; -import { mediaServer, VoiceOPCodes, WebRtcClient } from "@spacebar/webrtc"; +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcClient, + WebRtcWebSocket, + Send, +} from "@spacebar/webrtc"; -export async function onVideo(this: WebSocket, payload: Payload) { - if (!this.voiceWs || !this.voiceWs.webrtcConnected) return; - const { rtc_server_id: channel_id } = this.voiceWs; +export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { + if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return; + + const { rtc_server_id } = this.webRtcClient; const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; @@ -30,9 +36,9 @@ export async function onVideo(this: WebSocket, payload: Payload) { await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); - const ssrcs = this.voiceWs.getIncomingStreamSSRCs(); + const ssrcs = this.webRtcClient.getIncomingStreamSSRCs(); - const clientsThatNeedUpdate = new Set>(); + const clientsThatNeedUpdate = new Set>(); // check if client has signaled that it will send audio if (d.audio_ssrc !== 0) { @@ -41,12 +47,14 @@ export async function onVideo(this: WebSocket, payload: Payload) { console.log( `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, ); - this.voiceWs.publishTrack("audio", { audio_ssrc: d.audio_ssrc }); + this.webRtcClient.publishTrack("audio", { + audio_ssrc: d.audio_ssrc, + }); } // now check that all clients have outgoing media for this ssrcs - for (const client of mediaServer.getClientsForRtcServer( - channel_id, + for (const client of mediaServer.getClientsForRtcServer( + rtc_server_id, )) { if (client.user_id === this.user_id) continue; @@ -55,7 +63,7 @@ export async function onVideo(this: WebSocket, payload: Payload) { console.log( `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, ); - client.subscribeToTrack(this.voiceWs.user_id, "audio"); + client.subscribeToTrack(this.webRtcClient.user_id, "audio"); clientsThatNeedUpdate.add(client); } @@ -68,26 +76,26 @@ export async function onVideo(this: WebSocket, payload: Payload) { console.log( `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, ); - this.voiceWs.publishTrack("video", { + this.webRtcClient.publishTrack("video", { video_ssrc: d.video_ssrc, rtx_ssrc: d.rtx_ssrc, }); } // now check that all clients have outgoing media for this ssrcs - for (const client of mediaServer.getClientsForRtcServer( - channel_id, + for (const client of mediaServer.getClientsForRtcServer( + rtc_server_id, )) { if (client.user_id === this.user_id) continue; const ssrcs = client.getOutgoingStreamSSRCsForUser( - this.voiceWs.user_id, + this.webRtcClient.user_id, ); if (ssrcs.video_ssrc != d.video_ssrc) { console.log( `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, ); - client.subscribeToTrack(this.voiceWs.user_id, "video"); + client.subscribeToTrack(this.webRtcClient.user_id, "video"); clientsThatNeedUpdate.add(client); } diff --git a/src/webrtc/opcodes/index.ts b/src/webrtc/opcodes/index.ts index 217bd5ab..71c5f2e7 100644 --- a/src/webrtc/opcodes/index.ts +++ b/src/webrtc/opcodes/index.ts @@ -16,8 +16,7 @@ along with this program. If not, see . */ -import { Payload, WebSocket } from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util"; import { onBackendVersion } from "./BackendVersion"; import { onHeartbeat } from "./Heartbeat"; import { onIdentify } from "./Identify"; @@ -25,7 +24,7 @@ import { onSelectProtocol } from "./SelectProtocol"; import { onSpeaking } from "./Speaking"; import { onVideo } from "./Video"; -export type OPCodeHandler = (this: WebSocket, data: Payload) => any; +export type OPCodeHandler = (this: WebRtcWebSocket, data: VoicePayload) => any; export default { [VoiceOPCodes.HEARTBEAT]: onHeartbeat, diff --git a/src/webrtc/util/Constants.ts b/src/webrtc/util/Constants.ts index dba1c511..b28ec94e 100644 --- a/src/webrtc/util/Constants.ts +++ b/src/webrtc/util/Constants.ts @@ -16,6 +16,8 @@ along with this program. If not, see . */ +import { Payload } from "@spacebar/gateway"; + export enum VoiceStatus { CONNECTED = 0, CONNECTING = 1, @@ -42,3 +44,5 @@ export enum VoiceOPCodes { VOICE_BACKEND_VERSION = 16, CHANNEL_OPTIONS_UPDATE = 17, } + +export type VoicePayload = Omit & { op: VoiceOPCodes }; diff --git a/src/webrtc/util/MediaServer.ts b/src/webrtc/util/MediaServer.ts index 5c13a50a..befacbce 100644 --- a/src/webrtc/util/MediaServer.ts +++ b/src/webrtc/util/MediaServer.ts @@ -18,6 +18,7 @@ //import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate"; import { SignalingDelegate } from "./SignalingDelegate"; +import { green, red } from "picocolors"; export let mediaServer: SignalingDelegate; @@ -27,9 +28,13 @@ export let mediaServer: SignalingDelegate; mediaServer = new ( await import("../medooze/MedoozeSignalingDelegate") ).MedoozeSignalingDelegate(); + + console.log( + `[WebRTC] ${green("Succesfully loaded MedoozeSignalingDelegate")}`, + ); } catch (e) { - console.error("Failed to import MedoozeSignalingDelegate", e); - // Fallback to a different implementation or handle the error - // For example, you could set mediaServer to null or throw an error + console.log( + `[WebRTC] ${red("Failed to import MedoozeSignalingDelegate")}`, + ); } })(); diff --git a/src/webrtc/util/Send.ts b/src/webrtc/util/Send.ts new file mode 100644 index 00000000..7f8ab4dd --- /dev/null +++ b/src/webrtc/util/Send.ts @@ -0,0 +1,27 @@ +import { JSONReplacer } from "@spacebar/util"; +import { VoicePayload } from "./Constants"; +import { WebRtcWebSocket } from "./WebRtcWebSocket"; + +export function Send(socket: WebRtcWebSocket, data: VoicePayload) { + if (process.env.WRTC_WS_VERBOSE) + console.log(`[WebRTC] Outgoing message: ${JSON.stringify(data)}`); + + let buffer: Buffer | string; + + // TODO: encode circular object + if (socket.encoding === "json") buffer = JSON.stringify(data, JSONReplacer); + else return; + + return new Promise((res, rej) => { + if (socket.readyState !== 1) { + // return rej("socket not open"); + socket.close(); + return; + } + + socket.send(buffer, (err) => { + if (err) return rej(err); + return res(null); + }); + }); +} diff --git a/src/webrtc/util/SignalingDelegate.ts b/src/webrtc/util/SignalingDelegate.ts index 6f050f53..e2f010d2 100644 --- a/src/webrtc/util/SignalingDelegate.ts +++ b/src/webrtc/util/SignalingDelegate.ts @@ -3,7 +3,12 @@ import { Codec, WebRtcClient } from "./WebRtcClient"; export interface SignalingDelegate { start: () => Promise; stop: () => Promise; - join(rtcServerId: string, userId: string, ws: T): WebRtcClient; + join( + rtcServerId: string, + userId: string, + ws: T, + type: "guild-voice" | "dm-voice" | "stream", + ): WebRtcClient; onOffer( client: WebRtcClient, offer: string, diff --git a/src/webrtc/util/WebRtcWebSocket.ts b/src/webrtc/util/WebRtcWebSocket.ts new file mode 100644 index 00000000..086e1247 --- /dev/null +++ b/src/webrtc/util/WebRtcWebSocket.ts @@ -0,0 +1,7 @@ +import { WebSocket } from "@spacebar/gateway"; +import { WebRtcClient } from "./WebRtcClient"; + +export interface WebRtcWebSocket extends WebSocket { + type: "guild-voice" | "dm-voice" | "stream"; + webRtcClient?: WebRtcClient; +} diff --git a/src/webrtc/util/index.ts b/src/webrtc/util/index.ts index e50c72d1..aa33ad2d 100644 --- a/src/webrtc/util/index.ts +++ b/src/webrtc/util/index.ts @@ -19,3 +19,5 @@ export * from "./Constants"; export * from "./MediaServer"; export * from "./WebRtcClient"; +export * from "./WebRtcWebSocket"; +export * from "./Send";