diff --git a/src/gateway/opcodes/StreamCreate.ts b/src/gateway/opcodes/StreamCreate.ts index ea0ada04..82c2c243 100644 --- a/src/gateway/opcodes/StreamCreate.ts +++ b/src/gateway/opcodes/StreamCreate.ts @@ -5,6 +5,7 @@ import { generateStreamKey, } from "@spacebar/gateway"; import { + Channel, Config, emitEvent, Region, @@ -28,6 +29,16 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild + const channel = await Channel.findOne({ + where: { id: body.channel_id }, + }); + + if ( + !channel || + (body.type === "guild" && channel.guild_id != body.guild_id) + ) + return this.close(4000, "invalid channel"); + // TODO: actually apply preferred_region from the event payload const regions = Config.get().regions; const guildRegion = regions.available.filter( diff --git a/src/gateway/opcodes/StreamWatch.ts b/src/gateway/opcodes/StreamWatch.ts index 341076ae..2b42f46a 100644 --- a/src/gateway/opcodes/StreamWatch.ts +++ b/src/gateway/opcodes/StreamWatch.ts @@ -40,6 +40,13 @@ export async function onStreamWatch(this: WebSocket, data: Payload) { if (type === "guild" && stream.channel.guild_id != guildId) return this.close(4000, "Invalid stream key"); + const regions = Config.get().regions; + const guildRegion = regions.available.find( + (r) => r.endpoint === stream.endpoint, + ); + + if (!guildRegion) return this.close(4000, "Unknown region"); + const streamSession = StreamSession.create({ stream_id: stream.id, user_id: this.user_id, @@ -49,13 +56,6 @@ export async function onStreamWatch(this: WebSocket, data: Payload) { await streamSession.save(); - const regions = Config.get().regions; - const guildRegion = regions.available.find( - (r) => r.endpoint === stream.endpoint, - ); - - if (!guildRegion) return this.close(4000, "Unknown region"); - const viewers = await StreamSession.find({ where: { stream_id: stream.id }, }); diff --git a/src/webrtc/opcodes/Speaking.ts b/src/webrtc/opcodes/Speaking.ts index d44c83a5..da55b5ac 100644 --- a/src/webrtc/opcodes/Speaking.ts +++ b/src/webrtc/opcodes/Speaking.ts @@ -29,16 +29,17 @@ import { export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { if (!this.webRtcClient) return; - mediaServer - .getClientsForRtcServer( - this.webRtcClient.rtc_server_id, - ) - .forEach((client) => { - if (client.user_id === this.user_id) return; + Promise.all( + Array.from( + mediaServer.getClientsForRtcServer( + this.webRtcClient.rtc_server_id, + ), + ).map((client) => { + if (client.user_id === this.user_id) return Promise.resolve(); const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id); - Send(client.websocket, { + return Send(client.websocket, { op: VoiceOPCodes.SPEAKING, d: { user_id: this.user_id, @@ -46,5 +47,6 @@ export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { ssrc: ssrc.audio_ssrc ?? 0, }, }); - }); + }), + ); } diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 1f21be83..391255bf 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -15,7 +15,7 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; +import { Stream, validateSchema, VoiceVideoSchema } from "@spacebar/util"; import { mediaServer, VoiceOPCodes, @@ -32,6 +32,15 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; + if (this.type === "stream") { + const stream = await Stream.findOne({ + where: { id: rtc_server_id }, + }); + + // only the stream owner can publish to a go live stream + if (stream?.owner_id != this.user_id) return; + } + const stream = d.streams?.find((element) => element !== undefined); await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); @@ -102,22 +111,24 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } } - for (const client of clientsThatNeedUpdate) { - const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); + await Promise.all( + Array.from(clientsThatNeedUpdate).map((client) => { + const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); - Send(client.websocket, { - op: VoiceOPCodes.VIDEO, - d: { - user_id: this.user_id, - audio_ssrc: ssrcs.audio_ssrc ?? 0, - video_ssrc: ssrcs.video_ssrc ?? 0, - rtx_ssrc: ssrcs.rtx_ssrc ?? 0, - streams: d.streams?.map((x) => ({ - ...x, - ssrc: ssrcs.video_ssrc ?? 0, + return Send(client.websocket, { + op: VoiceOPCodes.VIDEO, + d: { + user_id: this.user_id, + audio_ssrc: ssrcs.audio_ssrc ?? 0, + video_ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, - })), - } as VoiceVideoSchema, - }); - } + streams: d.streams?.map((x) => ({ + ...x, + ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + })), + } as VoiceVideoSchema, + }); + }), + ); }