From c845d2070cf8e8c2cdff8ae8198234c3501efe45 Mon Sep 17 00:00:00 2001 From: dank074 Date: Thu, 17 Apr 2025 23:31:42 -0500 Subject: [PATCH] updates --- package-lock.json | 2 +- src/webrtc/opcodes/Identify.ts | 6 ++ src/webrtc/opcodes/SelectProtocol.ts | 6 +- src/webrtc/opcodes/Video.ts | 92 +++++++++++++++++++++++----- 4 files changed, 87 insertions(+), 19 deletions(-) diff --git a/package-lock.json b/package-lock.json index 66cc2c18..46b8b2d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9377,7 +9377,7 @@ }, "node_modules/spacebar-webrtc-types": { "version": "1.0.1", - "resolved": "git+ssh://git@github.com/dank074/spacebar-webrtc-types.git#20e9d4d838e053101487813e666b759f3c486fce", + "resolved": "git+ssh://git@github.com/dank074/spacebar-webrtc-types.git#04044902e71f2c80fc39c256ec5d27fdb8ca425a", "dev": true, "license": "ISC" }, diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index 3cadd865..2c93e924 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -30,6 +30,7 @@ import { WebRtcWebSocket, Send, } from "@spacebar/webrtc"; +import { subscribeToProducers } from "./Video"; export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { clearTimeout(this.readyTimeout); @@ -91,6 +92,11 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { mediaServer.onClientClose(this.webRtcClient!); }); + // once connected subscribe to tracks from other users + this.webRtcClient.emitter.once("connected", async () => { + await subscribeToProducers.call(this); + }); + await Send(this, { op: VoiceOPCodes.READY, d: { diff --git a/src/webrtc/opcodes/SelectProtocol.ts b/src/webrtc/opcodes/SelectProtocol.ts index 16cfbaec..dfd9714f 100644 --- a/src/webrtc/opcodes/SelectProtocol.ts +++ b/src/webrtc/opcodes/SelectProtocol.ts @@ -35,7 +35,7 @@ export async function onSelectProtocol( payload.d, ) as SelectProtocolSchema; - const answer = await mediaServer.onOffer( + const response = await mediaServer.onOffer( this.webRtcClient, data.sdp!, data.codecs ?? [], @@ -44,8 +44,8 @@ export async function onSelectProtocol( await Send(this, { op: VoiceOPCodes.SESSION_DESCRIPTION, d: { - video_codec: "H264", - sdp: answer, + video_codec: response.selectedVideoCodec, + sdp: response.sdp, media_session_id: this.session_id, audio_codec: "opus", }, diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 326c7b05..8a0745b8 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -45,18 +45,16 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } } - const stream = d.streams?.find((element) => element !== undefined); + const stream = d.streams?.find((element) => element.active); await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); - const ssrcs = this.webRtcClient.getIncomingStreamSSRCs(); - const clientsThatNeedUpdate = new Set>(); // check if client has signaled that it will send audio if (d.audio_ssrc !== 0) { - // check if we already have incoming media for this ssrcs, if not, publish a new audio track for it - if (ssrcs.audio_ssrc != d.audio_ssrc) { + // check if we are already producing audio, if not, publish a new audio track for it + if (!this.webRtcClient!.isProducingAudio()) { console.log( `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, ); @@ -65,14 +63,13 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { }); } - // now check that all clients have outgoing media for this ssrcs + // now check that all clients have subscribed to our audio for (const client of mediaServer.getClientsForRtcServer( rtc_server_id, )) { if (client.user_id === this.user_id) continue; - const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); - if (ssrcs.audio_ssrc != d.audio_ssrc) { + if (!client.isSubscribedToTrack(this.user_id, "audio")) { console.log( `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, ); @@ -84,8 +81,9 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } // check if client has signaled that it will send video if (d.video_ssrc !== 0 && stream?.active) { - // check if we already have incoming media for this ssrcs, if not, publish a new video track for it - if (ssrcs.video_ssrc != d.video_ssrc) { + this.webRtcClient!.videoStream = stream; + // check if we are already publishing video, if not, publish a new video track for it + if (!this.webRtcClient!.isProducingVideo()) { console.log( `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, ); @@ -95,16 +93,13 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { }); } - // now check that all clients have outgoing media for this ssrcs + // now check that all clients have subscribed to our video track for (const client of mediaServer.getClientsForRtcServer( rtc_server_id, )) { if (client.user_id === this.user_id) continue; - const ssrcs = client.getOutgoingStreamSSRCsForUser( - this.webRtcClient.user_id, - ); - if (ssrcs.video_ssrc != d.video_ssrc) { + if (!client.isSubscribedToTrack(this.user_id, "video")) { console.log( `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, ); @@ -136,3 +131,70 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { }), ); } + +// check if we are not subscribed to producers in this server, if not, subscribe +export async function subscribeToProducers( + this: WebRtcWebSocket, +): Promise { + const clients = mediaServer.getClientsForRtcServer( + this.webRtcClient!.rtc_server_id, + ); + + await Promise.all( + Array.from(clients).map((client) => { + let needsUpdate = false; + + if (!client.isProducingAudio() && !client.isProducingVideo) + return Promise.resolve(); + + if ( + client.isProducingAudio() && + !this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio") + ) { + this.webRtcClient!.subscribeToTrack(client.user_id, "audio"); + needsUpdate = true; + } + + if ( + client.isProducingVideo() && + !this.webRtcClient!.isSubscribedToTrack(client.user_id, "video") + ) { + this.webRtcClient!.subscribeToTrack(client.user_id, "video"); + needsUpdate = true; + } + + if (!needsUpdate) return Promise.resolve(); + + const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser( + client.user_id, + ); + + return Send(this, { + op: VoiceOPCodes.VIDEO, + d: { + user_id: client.user_id, + audio_ssrc: ssrcs.audio_ssrc ?? 0, + video_ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + streams: [ + client.videoStream ?? { + type: this.type === "stream" ? "screen" : "video", + rid: "100", + ssrc: ssrcs.video_ssrc ?? 0, + active: client.isProducingVideo(), + quality: 100, + rtx_ssrc: ssrcs.rtx_ssrc, + max_bitrate: 2500000, + max_framerate: 20, + max_resolution: { + type: "fixed", + width: 1280, + height: 720, + }, + }, + ], + } as VoiceVideoSchema, + }); + }), + ); +}