diff --git a/src/gateway/opcodes/StreamCreate.ts b/src/gateway/opcodes/StreamCreate.ts index 608b6792..cf1ad579 100644 --- a/src/gateway/opcodes/StreamCreate.ts +++ b/src/gateway/opcodes/StreamCreate.ts @@ -8,11 +8,14 @@ import { Channel, Config, emitEvent, + Member, Region, Snowflake, Stream, StreamCreateSchema, StreamSession, + VoiceState, + VoiceStateUpdateEvent, } from "@spacebar/util"; import { check } from "./instanceOf"; @@ -20,9 +23,22 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { check.call(this, StreamCreateSchema, data.d); const body = data.d as StreamCreateSchema; - // TODO: first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection if (body.channel_id.trim().length === 0) return; + // first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection + const voiceState = await VoiceState.findOne({ + where: { user_id: this.user_id }, + }); + + if (!voiceState || !voiceState.channel_id) return; + + if (body.guild_id) { + voiceState.member = await Member.findOneOrFail({ + where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, + relations: ["user", "roles"], + }); + } + // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild const channel = await Channel.findOne({ @@ -41,6 +57,11 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { (r) => r.id === regions.default, )[0]; + // first make sure theres no other streams for this user that somehow didnt get cleared + await Stream.delete({ + owner_id: this.user_id, + }); + // create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY const stream = Stream.create({ id: Snowflake.generate(), @@ -78,8 +99,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { region: guildRegion.name, paused: false, }, - guild_id: body.guild_id, - channel_id: body.channel_id, + user_id: this.user_id, }); await emitEvent({ @@ -92,6 +112,16 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { }, user_id: this.user_id, }); + + voiceState.self_stream = true; + await voiceState.save(); + + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: { ...voiceState }, + guild_id: voiceState.guild_id, + channel_id: voiceState.channel_id, + } as VoiceStateUpdateEvent); } //stream key: diff --git a/src/gateway/opcodes/StreamDelete.ts b/src/gateway/opcodes/StreamDelete.ts index c6f006ef..e9d0f6b3 100644 --- a/src/gateway/opcodes/StreamDelete.ts +++ b/src/gateway/opcodes/StreamDelete.ts @@ -1,5 +1,11 @@ import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; -import { emitEvent, Stream, StreamDeleteSchema } from "@spacebar/util"; +import { + emitEvent, + Stream, + StreamDeleteSchema, + VoiceState, + VoiceStateUpdateEvent, +} from "@spacebar/util"; import { check } from "./instanceOf"; export async function onStreamDelete(this: WebSocket, data: Payload) { @@ -33,6 +39,22 @@ export async function onStreamDelete(this: WebSocket, data: Payload) { await stream.remove(); + const voiceState = await VoiceState.findOne({ + where: { user_id: this.user_id }, + }); + + if (voiceState) { + voiceState.self_stream = false; + await voiceState.save(); + + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: { ...voiceState }, + guild_id: guildId, + channel_id: channelId, + } as VoiceStateUpdateEvent); + } + await emitEvent({ event: "STREAM_DELETE", data: { diff --git a/src/gateway/opcodes/StreamWatch.ts b/src/gateway/opcodes/StreamWatch.ts index 119d4cad..1eee4cbd 100644 --- a/src/gateway/opcodes/StreamWatch.ts +++ b/src/gateway/opcodes/StreamWatch.ts @@ -79,8 +79,8 @@ export async function onStreamWatch(this: WebSocket, data: Payload) { region: guildRegion.name, paused: false, }, - guild_id: guildId, channel_id: channelId, + user_id: this.user_id, }); await emitEvent({ diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index adf47da3..65bfad06 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -29,8 +29,10 @@ import { VoicePayload, WebRtcWebSocket, Send, + generateSsrc, } from "@spacebar/webrtc"; import { subscribeToProducers } from "./Video"; +import { SSRCs } from "spacebar-webrtc-types"; export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { clearTimeout(this.readyTimeout); @@ -110,15 +112,19 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { await subscribeToProducers.call(this); }); + // the server generates a unique ssrc for the audio and video stream. Must be unique among users connected to same server + // UDP clients will respect this ssrc, but websocket clients will generate and replace it with their own + const generatedSsrc: SSRCs = { + audio_ssrc: generateSsrc(), + video_ssrc: generateSsrc(), + rtx_ssrc: generateSsrc(), + }; + this.webRtcClient.initIncomingSSRCs(generatedSsrc); + await Send(this, { op: VoiceOPCodes.READY, d: { - streams: streams?.map((x) => ({ - ...x, - ssrc: 2, - rtx_ssrc: 3, - })), - ssrc: 1, + ssrc: generatedSsrc.audio_ssrc, port: mediaServer.port, modes: [ "aead_aes256_gcm_rtpsize", @@ -131,6 +137,12 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { ], ip: mediaServer.ip, experiments: [], + streams: streams?.map((x) => ({ + ...x, + ssrc: generatedSsrc.video_ssrc, + rtx_ssrc: generatedSsrc.rtx_ssrc, + type: "video", // client expects this to be overriden for some reason??? + })), }, }); } diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 55802b2f..e0379506 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -92,7 +92,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } // check if client has signaled that it will send video if (wantsToProduceVideo) { - this.webRtcClient!.videoStream = stream; + this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response // check if we are already publishing video, if not, publish a new video track for it if (!this.webRtcClient!.isProducingVideo()) { console.log( @@ -129,13 +129,16 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { op: VoiceOPCodes.VIDEO, d: { user_id: this.user_id, - audio_ssrc: ssrcs.audio_ssrc ?? 0, + audio_ssrc: + ssrcs.audio_ssrc ?? + this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc, // can never send audio ssrc as 0, it will mess up client state for some reason video_ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, streams: d.streams?.map((x) => ({ ...x, ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + type: "video", })), } as VoiceVideoSchema, }); @@ -157,6 +160,8 @@ export async function subscribeToProducers( Array.from(clients).map((client) => { let needsUpdate = false; + if (client.user_id === this.user_id) return Promise.resolve(); // cannot subscribe to self + if (!client.isProducingAudio() && !client.isProducingVideo) return Promise.resolve(); @@ -186,17 +191,19 @@ export async function subscribeToProducers( op: VoiceOPCodes.VIDEO, d: { user_id: client.user_id, - audio_ssrc: ssrcs.audio_ssrc ?? 0, + audio_ssrc: + ssrcs.audio_ssrc ?? + client.getIncomingStreamSSRCs().audio_ssrc, // can never send audio ssrc as 0, it will mess up client state for some reason video_ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, streams: [ client.videoStream ?? { - type: this.type === "stream" ? "screen" : "video", + type: "video", rid: "100", ssrc: ssrcs.video_ssrc ?? 0, active: client.isProducingVideo(), quality: 100, - rtx_ssrc: ssrcs.rtx_ssrc, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, max_bitrate: 2500000, max_framerate: 20, max_resolution: { diff --git a/src/webrtc/util/MediaServer.ts b/src/webrtc/util/MediaServer.ts index 35abf3db..848b3f73 100644 --- a/src/webrtc/util/MediaServer.ts +++ b/src/webrtc/util/MediaServer.ts @@ -65,3 +65,13 @@ export const loadWebRtcLibrary = async () => { return Promise.reject(); } }; + +const MAX_INT32BIT = 2 ** 32; + +let count = 1; +export const generateSsrc = () => { + count++; + if (count >= MAX_INT32BIT) count = 1; + + return count; +};