This commit is contained in:
dank074 2025-04-16 23:40:38 -05:00
parent 5bae44faf0
commit 1ff38287a6
4 changed files with 56 additions and 32 deletions

View File

@ -5,6 +5,7 @@ import {
generateStreamKey, generateStreamKey,
} from "@spacebar/gateway"; } from "@spacebar/gateway";
import { import {
Channel,
Config, Config,
emitEvent, emitEvent,
Region, Region,
@ -28,6 +29,16 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild
const channel = await Channel.findOne({
where: { id: body.channel_id },
});
if (
!channel ||
(body.type === "guild" && channel.guild_id != body.guild_id)
)
return this.close(4000, "invalid channel");
// TODO: actually apply preferred_region from the event payload // TODO: actually apply preferred_region from the event payload
const regions = Config.get().regions; const regions = Config.get().regions;
const guildRegion = regions.available.filter( const guildRegion = regions.available.filter(

View File

@ -40,6 +40,13 @@ export async function onStreamWatch(this: WebSocket, data: Payload) {
if (type === "guild" && stream.channel.guild_id != guildId) if (type === "guild" && stream.channel.guild_id != guildId)
return this.close(4000, "Invalid stream key"); return this.close(4000, "Invalid stream key");
const regions = Config.get().regions;
const guildRegion = regions.available.find(
(r) => r.endpoint === stream.endpoint,
);
if (!guildRegion) return this.close(4000, "Unknown region");
const streamSession = StreamSession.create({ const streamSession = StreamSession.create({
stream_id: stream.id, stream_id: stream.id,
user_id: this.user_id, user_id: this.user_id,
@ -49,13 +56,6 @@ export async function onStreamWatch(this: WebSocket, data: Payload) {
await streamSession.save(); await streamSession.save();
const regions = Config.get().regions;
const guildRegion = regions.available.find(
(r) => r.endpoint === stream.endpoint,
);
if (!guildRegion) return this.close(4000, "Unknown region");
const viewers = await StreamSession.find({ const viewers = await StreamSession.find({
where: { stream_id: stream.id }, where: { stream_id: stream.id },
}); });

View File

@ -29,16 +29,17 @@ import {
export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) {
if (!this.webRtcClient) return; if (!this.webRtcClient) return;
mediaServer Promise.all(
.getClientsForRtcServer<WebRtcWebSocket>( Array.from(
mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.rtc_server_id, this.webRtcClient.rtc_server_id,
) ),
.forEach((client) => { ).map((client) => {
if (client.user_id === this.user_id) return; if (client.user_id === this.user_id) return Promise.resolve();
const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id); const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id);
Send(client.websocket, { return Send(client.websocket, {
op: VoiceOPCodes.SPEAKING, op: VoiceOPCodes.SPEAKING,
d: { d: {
user_id: this.user_id, user_id: this.user_id,
@ -46,5 +47,6 @@ export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) {
ssrc: ssrc.audio_ssrc ?? 0, ssrc: ssrc.audio_ssrc ?? 0,
}, },
}); });
}); }),
);
} }

View File

@ -15,7 +15,7 @@
You should have received a copy of the GNU Affero General Public License You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; import { Stream, validateSchema, VoiceVideoSchema } from "@spacebar/util";
import { import {
mediaServer, mediaServer,
VoiceOPCodes, VoiceOPCodes,
@ -32,6 +32,15 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
if (this.type === "stream") {
const stream = await Stream.findOne({
where: { id: rtc_server_id },
});
// only the stream owner can publish to a go live stream
if (stream?.owner_id != this.user_id) return;
}
const stream = d.streams?.find((element) => element !== undefined); const stream = d.streams?.find((element) => element !== undefined);
await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
@ -102,10 +111,11 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
} }
} }
for (const client of clientsThatNeedUpdate) { await Promise.all(
Array.from(clientsThatNeedUpdate).map((client) => {
const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id);
Send(client.websocket, { return Send(client.websocket, {
op: VoiceOPCodes.VIDEO, op: VoiceOPCodes.VIDEO,
d: { d: {
user_id: this.user_id, user_id: this.user_id,
@ -119,5 +129,6 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
})), })),
} as VoiceVideoSchema, } as VoiceVideoSchema,
}); });
} }),
);
} }