can now watch golive streams

This commit is contained in:
dank074 2025-04-24 00:25:25 -05:00
parent 4e5c40bad2
commit 1b526e20d3
6 changed files with 97 additions and 16 deletions

View File

@ -8,11 +8,14 @@ import {
Channel, Channel,
Config, Config,
emitEvent, emitEvent,
Member,
Region, Region,
Snowflake, Snowflake,
Stream, Stream,
StreamCreateSchema, StreamCreateSchema,
StreamSession, StreamSession,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util"; } from "@spacebar/util";
import { check } from "./instanceOf"; import { check } from "./instanceOf";
@ -20,9 +23,22 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
check.call(this, StreamCreateSchema, data.d); check.call(this, StreamCreateSchema, data.d);
const body = data.d as StreamCreateSchema; const body = data.d as StreamCreateSchema;
// TODO: first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
if (body.channel_id.trim().length === 0) return; if (body.channel_id.trim().length === 0) return;
// first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});
if (!voiceState || !voiceState.channel_id) return;
if (body.guild_id) {
voiceState.member = await Member.findOneOrFail({
where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
relations: ["user", "roles"],
});
}
// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild
const channel = await Channel.findOne({ const channel = await Channel.findOne({
@ -41,6 +57,11 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
(r) => r.id === regions.default, (r) => r.id === regions.default,
)[0]; )[0];
// first make sure theres no other streams for this user that somehow didnt get cleared
await Stream.delete({
owner_id: this.user_id,
});
// create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY // create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY
const stream = Stream.create({ const stream = Stream.create({
id: Snowflake.generate(), id: Snowflake.generate(),
@ -78,8 +99,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
region: guildRegion.name, region: guildRegion.name,
paused: false, paused: false,
}, },
guild_id: body.guild_id, user_id: this.user_id,
channel_id: body.channel_id,
}); });
await emitEvent({ await emitEvent({
@ -92,6 +112,16 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
}, },
user_id: this.user_id, user_id: this.user_id,
}); });
voiceState.self_stream = true;
await voiceState.save();
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: { ...voiceState },
guild_id: voiceState.guild_id,
channel_id: voiceState.channel_id,
} as VoiceStateUpdateEvent);
} }
//stream key: //stream key:

View File

@ -1,5 +1,11 @@
import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import { emitEvent, Stream, StreamDeleteSchema } from "@spacebar/util"; import {
emitEvent,
Stream,
StreamDeleteSchema,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf"; import { check } from "./instanceOf";
export async function onStreamDelete(this: WebSocket, data: Payload) { export async function onStreamDelete(this: WebSocket, data: Payload) {
@ -33,6 +39,22 @@ export async function onStreamDelete(this: WebSocket, data: Payload) {
await stream.remove(); await stream.remove();
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});
if (voiceState) {
voiceState.self_stream = false;
await voiceState.save();
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: { ...voiceState },
guild_id: guildId,
channel_id: channelId,
} as VoiceStateUpdateEvent);
}
await emitEvent({ await emitEvent({
event: "STREAM_DELETE", event: "STREAM_DELETE",
data: { data: {

View File

@ -79,8 +79,8 @@ export async function onStreamWatch(this: WebSocket, data: Payload) {
region: guildRegion.name, region: guildRegion.name,
paused: false, paused: false,
}, },
guild_id: guildId,
channel_id: channelId, channel_id: channelId,
user_id: this.user_id,
}); });
await emitEvent({ await emitEvent({

View File

@ -29,8 +29,10 @@ import {
VoicePayload, VoicePayload,
WebRtcWebSocket, WebRtcWebSocket,
Send, Send,
generateSsrc,
} from "@spacebar/webrtc"; } from "@spacebar/webrtc";
import { subscribeToProducers } from "./Video"; import { subscribeToProducers } from "./Video";
import { SSRCs } from "spacebar-webrtc-types";
export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
clearTimeout(this.readyTimeout); clearTimeout(this.readyTimeout);
@ -110,15 +112,19 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
await subscribeToProducers.call(this); await subscribeToProducers.call(this);
}); });
// the server generates a unique ssrc for the audio and video stream. Must be unique among users connected to same server
// UDP clients will respect this ssrc, but websocket clients will generate and replace it with their own
const generatedSsrc: SSRCs = {
audio_ssrc: generateSsrc(),
video_ssrc: generateSsrc(),
rtx_ssrc: generateSsrc(),
};
this.webRtcClient.initIncomingSSRCs(generatedSsrc);
await Send(this, { await Send(this, {
op: VoiceOPCodes.READY, op: VoiceOPCodes.READY,
d: { d: {
streams: streams?.map((x) => ({ ssrc: generatedSsrc.audio_ssrc,
...x,
ssrc: 2,
rtx_ssrc: 3,
})),
ssrc: 1,
port: mediaServer.port, port: mediaServer.port,
modes: [ modes: [
"aead_aes256_gcm_rtpsize", "aead_aes256_gcm_rtpsize",
@ -131,6 +137,12 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
], ],
ip: mediaServer.ip, ip: mediaServer.ip,
experiments: [], experiments: [],
streams: streams?.map((x) => ({
...x,
ssrc: generatedSsrc.video_ssrc,
rtx_ssrc: generatedSsrc.rtx_ssrc,
type: "video", // client expects this to be overriden for some reason???
})),
}, },
}); });
} }

View File

@ -92,7 +92,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
} }
// check if client has signaled that it will send video // check if client has signaled that it will send video
if (wantsToProduceVideo) { if (wantsToProduceVideo) {
this.webRtcClient!.videoStream = stream; this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response
// check if we are already publishing video, if not, publish a new video track for it // check if we are already publishing video, if not, publish a new video track for it
if (!this.webRtcClient!.isProducingVideo()) { if (!this.webRtcClient!.isProducingVideo()) {
console.log( console.log(
@ -129,13 +129,16 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
op: VoiceOPCodes.VIDEO, op: VoiceOPCodes.VIDEO,
d: { d: {
user_id: this.user_id, user_id: this.user_id,
audio_ssrc: ssrcs.audio_ssrc ?? 0, audio_ssrc:
ssrcs.audio_ssrc ??
this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc, // can never send audio ssrc as 0, it will mess up client state for some reason
video_ssrc: ssrcs.video_ssrc ?? 0, video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: d.streams?.map((x) => ({ streams: d.streams?.map((x) => ({
...x, ...x,
ssrc: ssrcs.video_ssrc ?? 0, ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
type: "video",
})), })),
} as VoiceVideoSchema, } as VoiceVideoSchema,
}); });
@ -157,6 +160,8 @@ export async function subscribeToProducers(
Array.from(clients).map((client) => { Array.from(clients).map((client) => {
let needsUpdate = false; let needsUpdate = false;
if (client.user_id === this.user_id) return Promise.resolve(); // cannot subscribe to self
if (!client.isProducingAudio() && !client.isProducingVideo) if (!client.isProducingAudio() && !client.isProducingVideo)
return Promise.resolve(); return Promise.resolve();
@ -186,17 +191,19 @@ export async function subscribeToProducers(
op: VoiceOPCodes.VIDEO, op: VoiceOPCodes.VIDEO,
d: { d: {
user_id: client.user_id, user_id: client.user_id,
audio_ssrc: ssrcs.audio_ssrc ?? 0, audio_ssrc:
ssrcs.audio_ssrc ??
client.getIncomingStreamSSRCs().audio_ssrc, // can never send audio ssrc as 0, it will mess up client state for some reason
video_ssrc: ssrcs.video_ssrc ?? 0, video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: [ streams: [
client.videoStream ?? { client.videoStream ?? {
type: this.type === "stream" ? "screen" : "video", type: "video",
rid: "100", rid: "100",
ssrc: ssrcs.video_ssrc ?? 0, ssrc: ssrcs.video_ssrc ?? 0,
active: client.isProducingVideo(), active: client.isProducingVideo(),
quality: 100, quality: 100,
rtx_ssrc: ssrcs.rtx_ssrc, rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
max_bitrate: 2500000, max_bitrate: 2500000,
max_framerate: 20, max_framerate: 20,
max_resolution: { max_resolution: {

View File

@ -65,3 +65,13 @@ export const loadWebRtcLibrary = async () => {
return Promise.reject(); return Promise.reject();
} }
}; };
const MAX_INT32BIT = 2 ** 32;
let count = 1;
export const generateSsrc = () => {
count++;
if (count >= MAX_INT32BIT) count = 1;
return count;
};