more stuff

This commit is contained in:
dank074 2025-04-09 14:37:19 -05:00
parent e2b9924d6e
commit 96e457323b
17 changed files with 8939 additions and 2241 deletions

10913
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,52 @@
import { Payload, WebSocket } from "@spacebar/gateway";
import { Config, emitEvent, Region, VoiceState } from "@spacebar/util";
interface StreamCreateSchema {
type: "guild" | "call";
channel_id: string;
guild_id?: string;
preferred_region?: string;
}
export async function onStreamCreate(this: WebSocket, data: Payload) {
const body = data.d as StreamCreateSchema;
// first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
if (!this.voiceWs || !this.voiceWs.webrtcConnected) return;
// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild
// TODO: create a new entry in db (StreamState?) containing the token for authenticating user in stream gateway IDENTIFY
// TODO: actually apply preferred_region from the event payload
const regions = Config.get().regions;
const guildRegion = regions.available.filter(
(r) => r.id === regions.default,
)[0];
const streamKey = `${body.type}${body.type === "guild" ? ":" + body.guild_id : ""}:${body.channel_id}:${this.user_id}`;
await emitEvent({
event: "STREAM_CREATE",
data: {
stream_key: streamKey,
rtc_server_id: "lol", // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
},
guild_id: body.guild_id,
//user_id: this.user_id,
});
await emitEvent({
event: "STREAM_SERVER_UPDATE",
data: {
token: "TEST",
stream_key: streamKey,
endpoint: guildRegion.endpoint,
},
user_id: this.user_id,
});
}
//stream key:
// guild:${guild_id}:${channel_id}:${user_id}
// call:${channel_id}:${user_id}

View File

@ -0,0 +1,29 @@
import { Payload, WebSocket } from "@spacebar/gateway";
interface StreamDeleteSchema {
stream_key: string;
}
export async function onStreamDelete(this: WebSocket, data: Payload) {
const body = data.d as StreamDeleteSchema;
const splitStreamKey = body.stream_key.split(":");
if (splitStreamKey.length < 3) {
return this.close(4000, "Invalid stream key");
}
const type = splitStreamKey.shift()!;
let guild_id: string;
if (type === "guild") {
guild_id = splitStreamKey.shift()!;
}
const channel_id = splitStreamKey.shift()!;
const user_id = splitStreamKey.shift()!;
if (this.user_id !== user_id) {
return this.close(4000, "Cannot delete stream for another user");
}
// TODO: actually delete stream
}

View File

@ -0,0 +1,23 @@
import { Payload, WebSocket } from "@spacebar/gateway";
interface StreamWatchSchema {
stream_key: string;
}
export async function onStreamWatch(this: WebSocket, data: Payload) {
const body = data.d as StreamWatchSchema;
const splitStreamKey = body.stream_key.split(":");
if (splitStreamKey.length < 3) {
return this.close(4000, "Invalid stream key");
}
const type = splitStreamKey.shift()!;
let guild_id: string;
if (type === "guild") {
guild_id = splitStreamKey.shift()!;
}
const channel_id = splitStreamKey.shift()!;
const user_id = splitStreamKey.shift()!;
}

View File

@ -25,6 +25,9 @@ import { onRequestGuildMembers } from "./RequestGuildMembers";
import { onResume } from "./Resume";
import { onVoiceStateUpdate } from "./VoiceStateUpdate";
import { onGuildSubscriptionsBulk } from "./GuildSubscriptionsBulk";
import { onStreamCreate } from "./StreamCreate";
import { onStreamDelete } from "./StreamDelete";
import { onStreamWatch } from "./StreamWatch";
export type OPCodeHandler = (this: WebSocket, data: Payload) => unknown;
@ -41,5 +44,8 @@ export default {
// 10: Hello
// 13: Dm_update
14: onLazyRequest,
18: onStreamCreate,
19: onStreamDelete,
20: onStreamWatch,
37: onGuildSubscriptionsBulk,
} as { [key: number]: OPCodeHandler };

View File

@ -42,6 +42,7 @@ export interface WebSocket extends WS {
member_events: Record<string, () => unknown>;
listen_options: ListenEventOpts;
capabilities?: Capabilities;
client?: WebRtcClient<WebSocket>;
voiceWs?: WebRtcClient<WebSocket>;
streamWs?: WebRtcClient<WebSocket>;
large_threshold: number;
}

View File

@ -445,6 +445,23 @@ export interface VoiceServerUpdateEvent extends Event {
};
}
export interface StreamCreateEvent extends Event {
event: "STREAM_CREATE";
data: {
stream_key: string;
rtc_server_id: string;
};
}
export interface StreamServerUpdateEvent extends Event {
event: "STREAM_SERVER_UPDATE";
data: {
token: string;
stream_key: string;
endpoint: string;
};
}
export interface WebhooksUpdateEvent extends Event {
event: "WEBHOOKS_UPDATE";
data: {
@ -681,6 +698,8 @@ export type EVENT =
| "INTERACTION_CREATE"
| "VOICE_STATE_UPDATE"
| "VOICE_SERVER_UPDATE"
| "STREAM_CREATE"
| "STREAM_SERVER_UPDATE"
| "APPLICATION_COMMAND_CREATE"
| "APPLICATION_COMMAND_UPDATE"
| "APPLICATION_COMMAND_DELETE"

View File

@ -8,11 +8,11 @@ import {
Transport,
Endpoint,
} from "@dank074/medooze-media-server";
import { VoiceChannel } from "./VoiceChannel";
import { VoiceRoom } from "./VoiceRoom";
import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient";
export class MedoozeSignalingDelegate implements SignalingDelegate {
private _channels: Map<string, VoiceChannel> = new Map();
private _rooms: Map<string, VoiceRoom> = new Map();
private _ip: string;
private _port: number;
private _endpoint: Endpoint;
@ -44,7 +44,11 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
return Promise.resolve();
}
public join(channelId: string, userId: string, ws: any): WebRtcClient<any> {
public join(
rtcServerId: string,
userId: string,
ws: any,
): WebRtcClient<any> {
const existingClient = this.getClientForUserId(userId);
if (existingClient) {
@ -52,16 +56,16 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
this.onClientClose(existingClient);
}
if (!this._channels.has(channelId)) {
if (!this._rooms.has(rtcServerId)) {
console.debug("no channel created, creating one...");
this.createChannel(channelId);
this.createChannel(rtcServerId);
}
const channel = this._channels.get(channelId)!;
const room = this._rooms.get(rtcServerId)!;
const client = new MedoozeWebRtcClient(userId, channelId, ws, channel);
const client = new MedoozeWebRtcClient(userId, rtcServerId, ws, room);
channel?.onClientJoin(client);
room?.onClientJoin(client);
return client;
}
@ -71,9 +75,9 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
sdpOffer: string,
codecs: Codec[],
): Promise<string> {
const channel = this._channels.get(client.channel_id);
const room = this._rooms.get(client.rtc_server_id);
if (!channel) {
if (!room) {
console.error(
"error, client sent an offer but has not authenticated",
);
@ -175,7 +179,7 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
transport.setRemoteProperties(offer);
channel?.onClientOffer(client, transport);
room?.onClientOffer(client, transport);
const dtls = transport.getLocalDTLSInfo();
const ice = transport.getLocalICEInfo();
@ -197,37 +201,39 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
}
public onClientClose = (client: WebRtcClient<any>) => {
this._channels.get(client.channel_id)?.onClientLeave(client);
this._rooms.get(client.rtc_server_id)?.onClientLeave(client);
};
public updateSDP(offer: string): void {
throw new Error("Method not implemented.");
}
public createChannel(channelId: string): void {
this._channels.set(channelId, new VoiceChannel(channelId, this));
public createChannel(rtcServerId: string): void {
this._rooms.set(rtcServerId, new VoiceRoom(rtcServerId, this));
}
public disposeChannelRouter(channelId: string): void {
this._channels.delete(channelId);
public disposeRoom(rtcServerId: string): void {
const room = this._rooms.get(rtcServerId);
room?.dispose();
this._rooms.delete(rtcServerId);
}
get channels(): Map<string, VoiceChannel> {
return this._channels;
get rooms(): Map<string, VoiceRoom> {
return this._rooms;
}
public getClientsForChannel(channelId: string): Set<WebRtcClient<any>> {
if (!this._channels.has(channelId)) {
public getClientsForRtcServer(rtcServerId: string): Set<WebRtcClient<any>> {
if (!this._rooms.has(rtcServerId)) {
return new Set();
}
return new Set(this._channels.get(channelId)?.clients.values())!;
return new Set(this._rooms.get(rtcServerId)?.clients.values())!;
}
private getClientForUserId = (
userId: string,
): MedoozeWebRtcClient | undefined => {
for (const channel of this.channels.values()) {
for (const channel of this.rooms.values()) {
let result = channel.getClientById(userId);
if (result) {
return result;

View File

@ -4,29 +4,29 @@ import {
Transport,
} from "@dank074/medooze-media-server";
import { SSRCs, WebRtcClient } from "webrtc/util";
import { VoiceChannel } from "./VoiceChannel";
import { VoiceRoom } from "./VoiceRoom";
export class MedoozeWebRtcClient implements WebRtcClient<any> {
websocket: any;
user_id: string;
channel_id: string;
rtc_server_id: string;
webrtcConnected: boolean;
public transport?: Transport;
public incomingStream?: IncomingStream;
public outgoingStream?: OutgoingStream;
public channel?: VoiceChannel;
public room?: VoiceRoom;
public isStopped?: boolean;
constructor(
userId: string,
channelId: string,
rtcServerId: string,
websocket: any,
channel: VoiceChannel,
room: VoiceRoom,
) {
this.user_id = userId;
this.channel_id = channelId;
this.rtc_server_id = rtcServerId;
this.websocket = websocket;
this.channel = channel;
this.room = room;
this.webrtcConnected = false;
this.isStopped = false;
}
@ -120,7 +120,7 @@ export class MedoozeWebRtcClient implements WebRtcClient<any> {
const id = `${type}-${user_id}`;
const otherClient = this.channel?.getClientById(user_id);
const otherClient = this.room?.getClientById(user_id);
const incomingStream = otherClient?.incomingStream;
const incomingTrack = incomingStream?.getTrack(id);

View File

@ -7,7 +7,7 @@ import {
import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient";
import { StreamInfo } from "semantic-sdp";
export class VoiceChannel {
export class VoiceRoom {
private _clients: Map<string, MedoozeWebRtcClient>;
private _id: string;
private _sfu: MedoozeSignalingDelegate;
@ -78,7 +78,7 @@ export class VoiceChannel {
client.outgoingStream?.stop();
client.transport?.stop();
client.channel = undefined;
client.room = undefined;
client.incomingStream = undefined;
client.outgoingStream = undefined;
client.transport = undefined;
@ -97,4 +97,14 @@ export class VoiceChannel {
get id(): string {
return this._id;
}
public dispose(): void {
const clients = this._clients.values();
for (const client of clients) {
this.onClientLeave(client);
}
this._clients.clear();
this._sfu = undefined!;
this._clients = undefined!;
}
}

View File

@ -37,10 +37,10 @@ export async function onIdentify(this: WebSocket, data: Payload) {
this.user_id = user_id;
this.session_id = session_id;
this.client = mediaServer.join(voiceState.channel_id, this.user_id, this);
this.voiceWs = mediaServer.join(voiceState.channel_id, this.user_id, this);
this.on("close", () => {
mediaServer.onClientClose(this.client!);
mediaServer.onClientClose(this.voiceWs!);
});
await Send(this, {

View File

@ -21,7 +21,7 @@ import { SelectProtocolSchema, validateSchema } from "@spacebar/util";
import { VoiceOPCodes, mediaServer } from "@spacebar/webrtc";
export async function onSelectProtocol(this: WebSocket, payload: Payload) {
if (!this.client) return;
if (!this.voiceWs) return;
const data = validateSchema(
"SelectProtocolSchema",
@ -29,7 +29,7 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) {
) as SelectProtocolSchema;
const answer = await mediaServer.onOffer(
this.client,
this.voiceWs,
data.sdp!,
data.codecs ?? [],
);

View File

@ -22,10 +22,10 @@ import { mediaServer, VoiceOPCodes } from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727}
export async function onSpeaking(this: WebSocket, data: Payload) {
if (!this.client) return;
if (!this.voiceWs) return;
mediaServer
.getClientsForChannel<WebSocket>(this.client.channel_id)
.getClientsForRtcServer<WebSocket>(this.voiceWs.rtc_server_id)
.forEach((client) => {
if (client.user_id === this.user_id) return;

View File

@ -21,8 +21,8 @@ import { validateSchema, VoiceVideoSchema } from "@spacebar/util";
import { mediaServer, VoiceOPCodes, WebRtcClient } from "@spacebar/webrtc";
export async function onVideo(this: WebSocket, payload: Payload) {
if (!this.client || !this.client.webrtcConnected) return;
const { channel_id } = this.client;
if (!this.voiceWs || !this.voiceWs.webrtcConnected) return;
const { rtc_server_id: channel_id } = this.voiceWs;
const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
@ -30,7 +30,7 @@ export async function onVideo(this: WebSocket, payload: Payload) {
await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
const ssrcs = this.client.getIncomingStreamSSRCs();
const ssrcs = this.voiceWs.getIncomingStreamSSRCs();
const clientsThatNeedUpdate = new Set<WebRtcClient<WebSocket>>();
@ -41,11 +41,11 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log(
`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`,
);
this.client.publishTrack("audio", { audio_ssrc: d.audio_ssrc });
this.voiceWs.publishTrack("audio", { audio_ssrc: d.audio_ssrc });
}
// now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForChannel<WebSocket>(
for (const client of mediaServer.getClientsForRtcServer<WebSocket>(
channel_id,
)) {
if (client.user_id === this.user_id) continue;
@ -55,7 +55,7 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log(
`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`,
);
client.subscribeToTrack(this.client.user_id, "audio");
client.subscribeToTrack(this.voiceWs.user_id, "audio");
clientsThatNeedUpdate.add(client);
}
@ -68,26 +68,26 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log(
`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`,
);
this.client.publishTrack("video", {
this.voiceWs.publishTrack("video", {
video_ssrc: d.video_ssrc,
rtx_ssrc: d.rtx_ssrc,
});
}
// now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForChannel<WebSocket>(
for (const client of mediaServer.getClientsForRtcServer<WebSocket>(
channel_id,
)) {
if (client.user_id === this.user_id) continue;
const ssrcs = client.getOutgoingStreamSSRCsForUser(
this.client.user_id,
this.voiceWs.user_id,
);
if (ssrcs.video_ssrc != d.video_ssrc) {
console.log(
`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`,
);
client.subscribeToTrack(this.client.user_id, "video");
client.subscribeToTrack(this.voiceWs.user_id, "video");
clientsThatNeedUpdate.add(client);
}

View File

@ -16,7 +16,20 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate";
//import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate";
import { SignalingDelegate } from "./SignalingDelegate";
export const mediaServer: SignalingDelegate = new MedoozeSignalingDelegate();
export let mediaServer: SignalingDelegate;
(async () => {
try {
//mediaServer = require('../medooze/MedoozeSignalingDelegate');
mediaServer = new (
await import("../medooze/MedoozeSignalingDelegate")
).MedoozeSignalingDelegate();
} catch (e) {
console.error("Failed to import MedoozeSignalingDelegate", e);
// Fallback to a different implementation or handle the error
// For example, you could set mediaServer to null or throw an error
}
})();

View File

@ -3,7 +3,7 @@ import { Codec, WebRtcClient } from "./WebRtcClient";
export interface SignalingDelegate {
start: () => Promise<void>;
stop: () => Promise<void>;
join<T>(channelId: string, userId: string, ws: T): WebRtcClient<T>;
join<T>(rtcServerId: string, userId: string, ws: T): WebRtcClient<T>;
onOffer<T>(
client: WebRtcClient<T>,
offer: string,
@ -11,7 +11,7 @@ export interface SignalingDelegate {
): Promise<string>;
onClientClose<T>(client: WebRtcClient<T>): void;
updateSDP(offer: string): void;
getClientsForChannel<T>(channelId: string): Set<WebRtcClient<T>>;
getClientsForRtcServer<T>(rtcServerId: string): Set<WebRtcClient<T>>;
get ip(): string;
get port(): number;
}

View File

@ -1,7 +1,7 @@
export interface WebRtcClient<T> {
websocket: T;
user_id: string;
channel_id: string;
rtc_server_id: string;
webrtcConnected: boolean;
getIncomingStreamSSRCs: () => SSRCs;
getOutgoingStreamSSRCsForUser: (user_id: string) => SSRCs;