diff --git a/assets/schemas.json b/assets/schemas.json index 44c77a77..2d39546f 100755 --- a/assets/schemas.json +++ b/assets/schemas.json @@ -614199,10 +614199,7 @@ "type": "integer" }, "rtx_payload_type": { - "type": [ - "null", - "integer" - ] + "type": "integer" } }, "additionalProperties": false, @@ -684127,6 +684124,9 @@ "type" ] } + }, + "max_secure_frames_version": { + "type": "integer" } }, "additionalProperties": false, @@ -749017,6 +749017,42 @@ "items": { "type": "string" } + }, + "message_reference": { + "type": "object", + "properties": { + "message_id": { + "type": "string" + }, + "channel_id": { + "type": "string" + }, + "guild_id": { + "type": "string" + }, + "fail_if_not_exists": { + "type": "boolean" + } + }, + "additionalProperties": false, + "required": [ + "message_id" + ] + }, + "sticker_ids": { + "type": "array", + "items": { + "type": "string" + } + }, + "nonce": { + "type": "string" + }, + "enforce_nonce": { + "type": "boolean" + }, + "poll": { + "$ref": "#/definitions/PollCreationSchema" } }, "additionalProperties": false, diff --git a/package.json b/package.json index 69779141..01bf56c1 100644 --- a/package.json +++ b/package.json @@ -118,11 +118,14 @@ "@spacebar/api": "dist/api", "@spacebar/cdn": "dist/cdn", "@spacebar/gateway": "dist/gateway", - "@spacebar/util": "dist/util" + "@spacebar/util": "dist/util", + "@spacebar/webrtc": "dist/webrtc" }, "optionalDependencies": { "@yukikaze-bot/erlpack": "^1.0.1", "jimp": "^1.6.0", + "@dank074/medooze-media-server": "1.156.3", + "semantic-sdp": "^3.31.1", "mysql": "^2.18.1", "nodemailer-mailgun-transport": "^2.1.5", "nodemailer-mailjet-transport": "github:n0script22/nodemailer-mailjet-transport", diff --git a/src/bundle/Server.ts b/src/bundle/Server.ts index d281120d..b34322a3 100644 --- a/src/bundle/Server.ts +++ b/src/bundle/Server.ts @@ -22,6 +22,7 @@ process.on("uncaughtException", console.error); import http from "http"; import * as Api from "@spacebar/api"; import * as Gateway from "@spacebar/gateway"; +import * as Webrtc from "@spacebar/webrtc"; import { CDNServer } from "@spacebar/cdn"; import express from "express"; import { green, bold } from "picocolors"; @@ -36,12 +37,14 @@ server.on("request", app); const api = new Api.SpacebarServer({ server, port, production, app }); const cdn = new CDNServer({ server, port, production, app }); const gateway = new Gateway.Server({ server, port, production }); +const webrtc = new Webrtc.Server({ server: undefined, port: 3004, production }); process.on("SIGTERM", async () => { console.log("Shutting down due to SIGTERM"); await gateway.stop(); await cdn.stop(); await api.stop(); + await webrtc.stop(); server.close(); Sentry.close(); }); @@ -54,7 +57,12 @@ async function main() { await new Promise((resolve) => server.listen({ port }, () => resolve(undefined)), ); - await Promise.all([api.start(), cdn.start(), gateway.start()]); + await Promise.all([ + api.start(), + cdn.start(), + gateway.start(), + webrtc.start(), + ]); Sentry.errorHandler(app); diff --git a/src/gateway/opcodes/VoiceStateUpdate.ts b/src/gateway/opcodes/VoiceStateUpdate.ts index b45c8203..1c69958c 100644 --- a/src/gateway/opcodes/VoiceStateUpdate.ts +++ b/src/gateway/opcodes/VoiceStateUpdate.ts @@ -17,19 +17,19 @@ */ import { Payload, WebSocket } from "@spacebar/gateway"; -import { genVoiceToken } from "../util/SessionUtils"; -import { check } from "./instanceOf"; import { Config, emitEvent, Guild, Member, + Region, VoiceServerUpdateEvent, VoiceState, VoiceStateUpdateEvent, VoiceStateUpdateSchema, - Region, } from "@spacebar/util"; +import { genVoiceToken } from "../util/SessionUtils"; +import { check } from "./instanceOf"; // TODO: check if a voice server is setup // Notice: Bot users respect the voice channel's user limit, if set. @@ -39,6 +39,8 @@ import { export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { check.call(this, VoiceStateUpdateSchema, data.d); const body = data.d as VoiceStateUpdateSchema; + const isNew = body.channel_id === null && body.guild_id === null; + let isChanged = false; let voiceState: VoiceState; try { @@ -54,6 +56,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { return; } + if (voiceState.channel_id !== body.channel_id) isChanged = true; + //If a user change voice channel between guild we should send a left event first if ( voiceState.guild_id !== body.guild_id && @@ -111,7 +115,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { ]); //If it's null it means that we are leaving the channel and this event is not needed - if (voiceState.channel_id !== null) { + if ((isNew || isChanged) && voiceState.channel_id !== null) { const guild = await Guild.findOne({ where: { id: voiceState.guild_id }, }); @@ -135,6 +139,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { endpoint: guildRegion.endpoint, }, guild_id: voiceState.guild_id, + user_id: voiceState.user_id, } as VoiceServerUpdateEvent); } } diff --git a/src/gateway/util/Constants.ts b/src/gateway/util/Constants.ts index 5c0f134a..0fc9545b 100644 --- a/src/gateway/util/Constants.ts +++ b/src/gateway/util/Constants.ts @@ -16,7 +16,7 @@ along with this program. If not, see . */ -// import { VoiceOPCodes } from "@spacebar/webrtc"; +import { VoiceOPCodes } from "@spacebar/webrtc"; export enum OPCODES { Dispatch = 0, @@ -63,7 +63,7 @@ export enum CLOSECODES { } export interface Payload { - op: OPCODES /* | VoiceOPCodes */; + op: OPCODES | VoiceOPCodes; // eslint-disable-next-line @typescript-eslint/no-explicit-any d?: any; s?: number; diff --git a/src/gateway/util/WebSocket.ts b/src/gateway/util/WebSocket.ts index 8cfc5e08..17f7f4ac 100644 --- a/src/gateway/util/WebSocket.ts +++ b/src/gateway/util/WebSocket.ts @@ -20,7 +20,7 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util"; import WS from "ws"; import { Deflate, Inflate } from "fast-zlib"; import { Capabilities } from "./Capabilities"; -// import { Client } from "@spacebar/webrtc"; +import { WebRtcClient } from "@spacebar/webrtc"; export interface WebSocket extends WS { version: number; @@ -42,6 +42,6 @@ export interface WebSocket extends WS { member_events: Record unknown>; listen_options: ListenEventOpts; capabilities?: Capabilities; - // client?: Client; + client?: WebRtcClient; large_threshold: number; } diff --git a/src/util/schemas/SelectProtocolSchema.ts b/src/util/schemas/SelectProtocolSchema.ts index 09283619..d04adf71 100644 --- a/src/util/schemas/SelectProtocolSchema.ts +++ b/src/util/schemas/SelectProtocolSchema.ts @@ -31,7 +31,7 @@ export interface SelectProtocolSchema { type: "audio" | "video"; priority: number; payload_type: number; - rtx_payload_type?: number | null; + rtx_payload_type?: number; }[]; rtc_connection_id?: string; // uuid } diff --git a/src/util/schemas/VoiceIdentifySchema.ts b/src/util/schemas/VoiceIdentifySchema.ts index 618d6591..e4436bf1 100644 --- a/src/util/schemas/VoiceIdentifySchema.ts +++ b/src/util/schemas/VoiceIdentifySchema.ts @@ -27,4 +27,5 @@ export interface VoiceIdentifySchema { rid: string; quality: number; }[]; + max_secure_frames_version?: number; } diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 0ba2e41b..07949f8c 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -21,6 +21,7 @@ import dotenv from "dotenv"; import http from "http"; import ws from "ws"; import { Connection } from "./events/Connection"; +import { mediaServer } from "./util/MediaServer"; dotenv.config(); export class Server { @@ -69,6 +70,7 @@ export class Server { await initDatabase(); await Config.init(); await initEvent(); + await mediaServer.start(); if (!this.server.listening) { this.server.listen(this.port); console.log(`[WebRTC] online on 0.0.0.0:${this.port}`); @@ -78,5 +80,6 @@ export class Server { async stop() { closeDatabase(); this.server.close(); + mediaServer.stop(); } } diff --git a/src/webrtc/events/Message.ts b/src/webrtc/events/Message.ts index 22189e95..6c805805 100644 --- a/src/webrtc/events/Message.ts +++ b/src/webrtc/events/Message.ts @@ -30,14 +30,12 @@ const PayloadSchema = { export async function onMessage(this: WebSocket, buffer: Buffer) { try { - var data: Payload = JSON.parse(buffer.toString()); + const data: Payload = JSON.parse(buffer.toString()); if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated); - // @ts-ignore const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { - // @ts-ignore console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]); // TODO: if all opcodes are implemented comment this out: // this.close(CloseCodes.Unknown_opcode); @@ -49,7 +47,6 @@ export async function onMessage(this: WebSocket, buffer: Buffer) { data.op as VoiceOPCodes, ) ) { - // @ts-ignore console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]); } diff --git a/src/webrtc/medooze/MedoozeSignalingDelegate.ts b/src/webrtc/medooze/MedoozeSignalingDelegate.ts new file mode 100644 index 00000000..ad6f9175 --- /dev/null +++ b/src/webrtc/medooze/MedoozeSignalingDelegate.ts @@ -0,0 +1,253 @@ +import { CodecInfo, MediaInfo, SDPInfo } from "semantic-sdp"; +import { SignalingDelegate } from "../util/SignalingDelegate"; +import { Codec, WebRtcClient } from "../util/WebRtcClient"; +import { + MediaServer, + IncomingStream, + OutgoingStream, + Transport, + Endpoint, +} from "@dank074/medooze-media-server"; +import { VoiceChannel } from "./VoiceChannel"; +import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient"; + +export class MedoozeSignalingDelegate implements SignalingDelegate { + private _channels: Map = new Map(); + private _ip: string; + private _port: number; + private _endpoint: Endpoint; + + public start(): Promise { + MediaServer.enableLog(true); + + this._ip = process.env.PUBLIC_IP || "127.0.0.1"; + + try { + const range = process.env.WEBRTC_PORT_RANGE || "3690-3960"; + var ports = range.split("-"); + const min = Number(ports[0]); + const max = Number(ports[1]); + + MediaServer.setPortRange(min, max); + } catch (error) { + console.error( + "Invalid env var: WEBRTC_PORT_RANGE", + process.env.WEBRTC_PORT_RANGE, + error, + ); + process.exit(1); + } + + //MediaServer.setAffinity(2) + this._endpoint = MediaServer.createEndpoint(this._ip); + this._port = this._endpoint.getLocalPort(); + return Promise.resolve(); + } + + public join(channelId: string, userId: string, ws: any): WebRtcClient { + const existingClient = this.getClientForUserId(userId); + + if (existingClient) { + console.log("client already connected, disconnect.."); + this.onClientClose(existingClient); + } + + if (!this._channels.has(channelId)) { + console.debug("no channel created, creating one..."); + this.createChannel(channelId); + } + + const channel = this._channels.get(channelId)!; + + const client = new MedoozeWebRtcClient(userId, channelId, ws, channel); + + channel?.onClientJoin(client); + + return client; + } + + public async onOffer( + client: WebRtcClient, + sdpOffer: string, + codecs: Codec[], + ): Promise { + const channel = this._channels.get(client.channel_id); + + if (!channel) { + console.error( + "error, client sent an offer but has not authenticated", + ); + Promise.reject(); + } + + const offer = SDPInfo.parse("m=audio\n" + sdpOffer); + + const rtpHeaders = new Map(offer.medias[0].extensions); + + const getIdForHeader = ( + rtpHeaders: Map, + headerUri: string, + ) => { + for (const [key, value] of rtpHeaders) { + if (value == headerUri) return key; + } + return -1; + }; + + const audioMedia = new MediaInfo("0", "audio"); + const audioCodec = new CodecInfo( + "opus", + codecs.find((val) => val.name == "opus")?.payload_type ?? 111, + ); + audioCodec.addParam("minptime", "10"); + audioCodec.addParam("usedtx", "1"); + audioCodec.addParam("useinbandfec", "1"); + audioCodec.setChannels(2); + audioMedia.addCodec(audioCodec); + + audioMedia.addExtension( + getIdForHeader( + rtpHeaders, + "urn:ietf:params:rtp-hdrext:ssrc-audio-level", + ), + "urn:ietf:params:rtp-hdrext:ssrc-audio-level", + ); + if (audioCodec.type === 111) + // if this is chromium, apply this header + audioMedia.addExtension( + getIdForHeader( + rtpHeaders, + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", + ), + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", + ); + + const videoMedia = new MediaInfo("1", "video"); + const videoCodec = new CodecInfo( + "H264", + codecs.find((val) => val.name == "H264")?.payload_type ?? 102, + ); + videoCodec.setRTX( + codecs.find((val) => val.name == "H264")?.rtx_payload_type ?? 103, + ); + videoCodec.addParam("level-asymmetry-allowed", "1"); + videoCodec.addParam("packetization-mode", "1"); + videoCodec.addParam("profile-level-id", "42e01f"); + videoCodec.addParam("x-google-max-bitrate", "2500"); + videoMedia.addCodec(videoCodec); + + videoMedia.addExtension( + getIdForHeader( + rtpHeaders, + "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", + ), + "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", + ); + videoMedia.addExtension( + getIdForHeader(rtpHeaders, "urn:ietf:params:rtp-hdrext:toffset"), + "urn:ietf:params:rtp-hdrext:toffset", + ); + videoMedia.addExtension( + getIdForHeader( + rtpHeaders, + "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay", + ), + "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay", + ); + videoMedia.addExtension( + getIdForHeader( + rtpHeaders, + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", + ), + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", + ); + + if (audioCodec.type === 111) + // if this is chromium, apply this header + videoMedia.addExtension( + getIdForHeader(rtpHeaders, "urn:3gpp:video-orientation"), + "urn:3gpp:video-orientation", + ); + + offer.medias = [audioMedia, videoMedia]; + + const transport = this._endpoint.createTransport(offer); + + transport.setRemoteProperties(offer); + + channel?.onClientOffer(client, transport); + + const dtls = transport.getLocalDTLSInfo(); + const ice = transport.getLocalICEInfo(); + const fingerprint = dtls.getHash() + " " + dtls.getFingerprint(); + const candidates = transport.getLocalCandidates(); + const candidate = candidates[0]; + + const answer = + `m=audio ${this.port} ICE/SDP\n` + + `a=fingerprint:${fingerprint}\n` + + `c=IN IP4 ${this.ip}\n` + + `a=rtcp:${this.port}\n` + + `a=ice-ufrag:${ice.getUfrag()}\n` + + `a=ice-pwd:${ice.getPwd()}\n` + + `a=fingerprint:${fingerprint}\n` + + `a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host\n`; + + return Promise.resolve(answer); + } + + public onClientClose = (client: WebRtcClient) => { + this._channels.get(client.channel_id)?.onClientLeave(client); + }; + + public updateSDP(offer: string): void { + throw new Error("Method not implemented."); + } + + public createChannel(channelId: string): void { + this._channels.set(channelId, new VoiceChannel(channelId, this)); + } + + public disposeChannelRouter(channelId: string): void { + this._channels.delete(channelId); + } + + get channels(): Map { + return this._channels; + } + + public getClientsForChannel(channelId: string): Set> { + if (!this._channels.has(channelId)) { + return new Set(); + } + + return new Set(this._channels.get(channelId)?.clients.values())!; + } + + private getClientForUserId = ( + userId: string, + ): MedoozeWebRtcClient | undefined => { + for (const channel of this.channels.values()) { + let result = channel.getClientById(userId); + if (result) { + return result; + } + } + return undefined; + }; + + get ip(): string { + return this._ip; + } + get port(): number { + return this._port; + } + + get endpoint(): Endpoint { + return this._endpoint; + } + + public stop(): Promise { + return Promise.resolve(); + } +} diff --git a/src/webrtc/medooze/MedoozeWebRtcClient.ts b/src/webrtc/medooze/MedoozeWebRtcClient.ts new file mode 100644 index 00000000..9441aa02 --- /dev/null +++ b/src/webrtc/medooze/MedoozeWebRtcClient.ts @@ -0,0 +1,152 @@ +import { + IncomingStream, + OutgoingStream, + Transport, +} from "@dank074/medooze-media-server"; +import { SSRCs, WebRtcClient } from "webrtc/util"; +import { VoiceChannel } from "./VoiceChannel"; + +export class MedoozeWebRtcClient implements WebRtcClient { + websocket: any; + user_id: string; + channel_id: string; + webrtcConnected: boolean; + public transport?: Transport; + public incomingStream?: IncomingStream; + public outgoingStream?: OutgoingStream; + public channel?: VoiceChannel; + public isStopped?: boolean; + + constructor( + userId: string, + channelId: string, + websocket: any, + channel: VoiceChannel, + ) { + this.user_id = userId; + this.channel_id = channelId; + this.websocket = websocket; + this.channel = channel; + this.webrtcConnected = false; + this.isStopped = false; + } + + public isProducingAudio(): boolean { + if (!this.webrtcConnected) return false; + const audioTrack = this.incomingStream?.getTrack( + `audio-${this.user_id}`, + ); + + if (audioTrack) return true; + + return false; + } + + public isProducingVideo(): boolean { + if (!this.webrtcConnected) return false; + const videoTrack = this.incomingStream?.getTrack( + `video-${this.user_id}`, + ); + + if (videoTrack) return true; + + return false; + } + + public getIncomingStreamSSRCs(): SSRCs { + if (!this.webrtcConnected) + return { audio_ssrc: 0, video_ssrc: 0, rtx_ssrc: 0 }; + + const audioTrack = this.incomingStream?.getTrack( + `audio-${this.user_id}`, + ); + const audio_ssrc = + audioTrack?.getSSRCs()[audioTrack.getDefaultEncoding().id]; + const videoTrack = this.incomingStream?.getTrack( + `video-${this.user_id}`, + ); + const video_ssrc = + videoTrack?.getSSRCs()[videoTrack.getDefaultEncoding().id]; + + return { + audio_ssrc: audio_ssrc?.media ?? 0, + video_ssrc: video_ssrc?.media ?? 0, + rtx_ssrc: video_ssrc?.rtx ?? 0, + }; + } + + public getOutgoingStreamSSRCsForUser(user_id: string): SSRCs { + const outgoingStream = this.outgoingStream; + + const audioTrack = outgoingStream?.getTrack(`audio-${user_id}`); + const audio_ssrc = audioTrack?.getSSRCs(); + const videoTrack = outgoingStream?.getTrack(`video-${user_id}`); + const video_ssrc = videoTrack?.getSSRCs(); + + return { + audio_ssrc: audio_ssrc?.media ?? 0, + video_ssrc: video_ssrc?.media ?? 0, + rtx_ssrc: video_ssrc?.rtx ?? 0, + }; + } + + public publishTrack(type: "audio" | "video", ssrc: SSRCs) { + if (!this.transport) return; + + const id = `${type}-${this.user_id}`; + const existingTrack = this.incomingStream?.getTrack(id); + + if (existingTrack) { + console.error(`error: attempted to create duplicate track ${id}`); + return; + } + let ssrcs; + if (type === "audio") { + ssrcs = { media: ssrc.audio_ssrc! }; + } else { + ssrcs = { media: ssrc.video_ssrc!, rtx: ssrc.rtx_ssrc }; + } + const track = this.transport?.createIncomingStreamTrack( + type, + { id, ssrcs: ssrcs, media: type }, + this.incomingStream, + ); + + //this.channel?.onClientPublishTrack(this, track, ssrcs); + } + + public subscribeToTrack(user_id: string, type: "audio" | "video") { + if (!this.transport) return; + + const id = `${type}-${user_id}`; + + const otherClient = this.channel?.getClientById(user_id); + const incomingStream = otherClient?.incomingStream; + const incomingTrack = incomingStream?.getTrack(id); + + if (!incomingTrack) { + console.error(`error subscribing, not track found ${id}`); + return; + } + + let ssrcs; + if (type === "audio") { + ssrcs = { + media: otherClient?.getIncomingStreamSSRCs().audio_ssrc!, + }; + } else { + ssrcs = { + media: otherClient?.getIncomingStreamSSRCs().video_ssrc!, + rtx: otherClient?.getIncomingStreamSSRCs().rtx_ssrc, + }; + } + + const outgoingTrack = this.transport?.createOutgoingStreamTrack( + incomingTrack.media, + { id, ssrcs, media: incomingTrack.media }, + this.outgoingStream, + ); + + outgoingTrack?.attachTo(incomingTrack); + } +} diff --git a/src/webrtc/medooze/VoiceChannel.ts b/src/webrtc/medooze/VoiceChannel.ts new file mode 100644 index 00000000..937d7bf8 --- /dev/null +++ b/src/webrtc/medooze/VoiceChannel.ts @@ -0,0 +1,100 @@ +import { MedoozeSignalingDelegate } from "./MedoozeSignalingDelegate"; +import { + IncomingStreamTrack, + SSRCs, + Transport, +} from "@dank074/medooze-media-server"; +import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient"; +import { StreamInfo } from "semantic-sdp"; + +export class VoiceChannel { + private _clients: Map; + private _id: string; + private _sfu: MedoozeSignalingDelegate; + + constructor(id: string, sfu: MedoozeSignalingDelegate) { + this._id = id; + + this._clients = new Map(); + this._sfu = sfu; + } + + onClientJoin = (client: MedoozeWebRtcClient) => { + // do shit here + this._clients.set(client.user_id, client); + }; + + onClientOffer = (client: MedoozeWebRtcClient, transport: Transport) => { + client.transport = transport; + + client.transport.on("dtlsstate", (state, self) => { + if (state === "connected") { + client.webrtcConnected = true; + console.log("connected"); + } + }); + + client.incomingStream = transport.createIncomingStream( + new StreamInfo(`in-${client.user_id}`), + ); + + client.outgoingStream = transport.createOutgoingStream( + new StreamInfo(`out-${client.user_id}`), + ); + + client.webrtcConnected = true; + + // subscribe to all current streams from this channel + // for(const otherClient of this._clients.values()) { + // const incomingStream = otherClient.incomingStream + + // if(!incomingStream) continue; + + // for(const track of (incomingStream.getTracks())) { + // client.subscribeToTrack(otherClient.user_id, track.media) + // } + // } + }; + + onClientLeave = (client: MedoozeWebRtcClient) => { + console.log("stopping client"); + this._clients.delete(client.user_id); + + // stop the client + if (!client.isStopped) { + client.isStopped = true; + + for (const otherClient of this.clients.values()) { + //remove outgoing track for this user + otherClient.outgoingStream + ?.getTrack(`audio-${client.user_id}`) + ?.stop(); + otherClient.outgoingStream + ?.getTrack(`video-${client.user_id}`) + ?.stop(); + } + + client.incomingStream?.stop(); + client.outgoingStream?.stop(); + + client.transport?.stop(); + client.channel = undefined; + client.incomingStream = undefined; + client.outgoingStream = undefined; + client.transport = undefined; + client.websocket = undefined; + } + }; + + get clients(): Map { + return this._clients; + } + + getClientById = (id: string) => { + return this._clients.get(id); + }; + + get id(): string { + return this._id; + } +} diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index 3f65127e..5c9f9876 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -22,9 +22,7 @@ import { VoiceIdentifySchema, VoiceState, } from "@spacebar/util"; -import { endpoint, getClients, VoiceOPCodes, PublicIP } from "@spacebar/webrtc"; -import SemanticSDP from "semantic-sdp"; -const defaultSDP = require("./sdp.json"); +import { mediaServer, VoiceOPCodes } from "@spacebar/webrtc"; export async function onIdentify(this: WebSocket, data: Payload) { clearTimeout(this.readyTimeout); @@ -38,53 +36,33 @@ export async function onIdentify(this: WebSocket, data: Payload) { this.user_id = user_id; this.session_id = session_id; - const sdp = SemanticSDP.SDPInfo.expand(defaultSDP); - sdp.setDTLS( - SemanticSDP.DTLSInfo.expand({ - setup: "actpass", - hash: "sha-256", - fingerprint: endpoint.getDTLSFingerprint(), - }), - ); - this.client = { - websocket: this, - out: { - tracks: new Map(), - }, - in: { - audio_ssrc: 0, - video_ssrc: 0, - rtx_ssrc: 0, - }, - sdp, - channel_id: voiceState.channel_id, - }; - - const clients = getClients(voiceState.channel_id)!; - clients.add(this.client); + this.client = mediaServer.join(voiceState.channel_id, this.user_id, this); this.on("close", () => { - clients.delete(this.client!); + mediaServer.onClientClose(this.client!); }); await Send(this, { op: VoiceOPCodes.READY, d: { - streams: [ - // { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false } - ], - ssrc: -1, - port: endpoint.getLocalPort(), + streams: streams?.map((x) => ({ + ...x, + ssrc: 2, + rtx_ssrc: 3, + })), + ssrc: 1, + port: mediaServer.port, modes: [ "aead_aes256_gcm_rtpsize", "aead_aes256_gcm", + "aead_xchacha20_poly1305_rtpsize", "xsalsa20_poly1305_lite_rtpsize", "xsalsa20_poly1305_lite", "xsalsa20_poly1305_suffix", "xsalsa20_poly1305", ], - ip: PublicIP, + ip: mediaServer.ip, experiments: [], }, }); diff --git a/src/webrtc/opcodes/SelectProtocol.ts b/src/webrtc/opcodes/SelectProtocol.ts index 0a06e722..d88f6711 100644 --- a/src/webrtc/opcodes/SelectProtocol.ts +++ b/src/webrtc/opcodes/SelectProtocol.ts @@ -18,8 +18,7 @@ import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { SelectProtocolSchema, validateSchema } from "@spacebar/util"; -import { PublicIP, VoiceOPCodes, endpoint } from "@spacebar/webrtc"; -import SemanticSDP, { MediaInfo, SDPInfo } from "semantic-sdp"; +import { VoiceOPCodes, mediaServer } from "@spacebar/webrtc"; export async function onSelectProtocol(this: WebSocket, payload: Payload) { if (!this.client) return; @@ -29,31 +28,11 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) { payload.d, ) as SelectProtocolSchema; - const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!); - this.client.sdp!.setICE(offer.getICE()); - this.client.sdp!.setDTLS(offer.getDTLS()); - - const transport = endpoint.createTransport(this.client.sdp!); - this.client.transport = transport; - transport.setRemoteProperties(this.client.sdp!); - transport.setLocalProperties(this.client.sdp!); - - const dtls = transport.getLocalDTLSInfo(); - const ice = transport.getLocalICEInfo(); - const port = endpoint.getLocalPort(); - const fingerprint = dtls.getHash() + " " + dtls.getFingerprint(); - const candidates = transport.getLocalCandidates(); - const candidate = candidates[0]; - - const answer = - `m=audio ${port} ICE/SDP` + - `a=fingerprint:${fingerprint}` + - `c=IN IP4 ${PublicIP}` + - `a=rtcp:${port}` + - `a=ice-ufrag:${ice.getUfrag()}` + - `a=ice-pwd:${ice.getPwd()}` + - `a=fingerprint:${fingerprint}` + - `a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host`; + const answer = await mediaServer.onOffer( + this.client, + data.sdp!, + data.codecs ?? [], + ); await Send(this, { op: VoiceOPCodes.SESSION_DESCRIPTION, diff --git a/src/webrtc/opcodes/Speaking.ts b/src/webrtc/opcodes/Speaking.ts index 97055e0a..22cff096 100644 --- a/src/webrtc/opcodes/Speaking.ts +++ b/src/webrtc/opcodes/Speaking.ts @@ -17,24 +17,27 @@ */ import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { getClients, VoiceOPCodes } from "../util"; +import { mediaServer, VoiceOPCodes } from "../util"; // {"speaking":1,"delay":5,"ssrc":2805246727} export async function onSpeaking(this: WebSocket, data: Payload) { if (!this.client) return; - getClients(this.client.channel_id).forEach((client) => { - if (client === this.client) return; - const ssrc = this.client!.out.tracks.get(client.websocket.user_id); + mediaServer + .getClientsForChannel(this.client.channel_id) + .forEach((client) => { + if (client.user_id === this.user_id) return; - Send(client.websocket, { - op: VoiceOPCodes.SPEAKING, - d: { - user_id: client.websocket.user_id, - speaking: data.d.speaking, - ssrc: ssrc?.audio_ssrc || 0, - }, + const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id); + + Send(client.websocket, { + op: VoiceOPCodes.SPEAKING, + d: { + user_id: this.user_id, + speaking: data.d.speaking, + ssrc: ssrc.audio_ssrc ?? 0, + }, + }); }); - }); } diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 3228d4ee..b8b8d38e 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -18,134 +18,98 @@ import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; -import { channels, getClients, VoiceOPCodes } from "@spacebar/webrtc"; -import { IncomingStreamTrack, SSRCs } from "medooze-media-server"; -import SemanticSDP from "semantic-sdp"; +import { mediaServer, VoiceOPCodes, WebRtcClient } from "@spacebar/webrtc"; export async function onVideo(this: WebSocket, payload: Payload) { - if (!this.client) return; - const { transport, channel_id } = this.client; - if (!transport) return; + if (!this.client || !this.client.webrtcConnected) return; + const { channel_id } = this.client; + const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; + const stream = d.streams?.find((element) => element !== undefined); + await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); - const id = "stream" + this.user_id; + const ssrcs = this.client.getIncomingStreamSSRCs(); - var stream = this.client.in.stream!; - if (!stream) { - stream = this.client.transport!.createIncomingStream( - // @ts-ignore - SemanticSDP.StreamInfo.expand({ - id, - // @ts-ignore - tracks: [], - }), - ); - this.client.in.stream = stream; + const clientsThatNeedUpdate = new Set>(); - const interval = setInterval(() => { - for (const track of stream.getTracks()) { - for (const layer of Object.values(track.getStats())) { - console.log(track.getId(), layer.total); - } + // check if client has signaled that it will send audio + if (d.audio_ssrc !== 0) { + // check if we already have incoming media for this ssrcs, if not, publish a new audio track for it + if (ssrcs.audio_ssrc != d.audio_ssrc) { + console.log( + `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, + ); + this.client.publishTrack("audio", { audio_ssrc: d.audio_ssrc }); + } + + // now check that all clients have outgoing media for this ssrcs + for (const client of mediaServer.getClientsForChannel( + channel_id, + )) { + if (client.user_id === this.user_id) continue; + + const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); + if (ssrcs.audio_ssrc != d.audio_ssrc) { + console.log( + `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, + ); + client.subscribeToTrack(this.client.user_id, "audio"); + + clientsThatNeedUpdate.add(client); } - }, 5000); - - stream.on("stopped", () => { - console.log("stream stopped"); - clearInterval(interval); - }); - this.on("close", () => { - transport!.stop(); - }); - const out = transport.createOutgoingStream( - // @ts-ignore - SemanticSDP.StreamInfo.expand({ - id: "out" + this.user_id, - // @ts-ignore - tracks: [], - }), - ); - this.client.out.stream = out; - - const clients = channels.get(channel_id)!; - - clients.forEach((client) => { - if (client.websocket.user_id === this.user_id) return; - if (!client.in.stream) return; - - client.in.stream?.getTracks().forEach((track) => { - attachTrack.call(this, track, client.websocket.user_id); + } + } + // check if client has signaled that it will send video + if (d.video_ssrc !== 0 && stream?.active) { + // check if we already have incoming media for this ssrcs, if not, publish a new video track for it + if (ssrcs.video_ssrc != d.video_ssrc) { + console.log( + `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, + ); + this.client.publishTrack("video", { + video_ssrc: d.video_ssrc, + rtx_ssrc: d.rtx_ssrc, }); - }); + } + + // now check that all clients have outgoing media for this ssrcs + for (const client of mediaServer.getClientsForChannel( + channel_id, + )) { + if (client.user_id === this.user_id) continue; + + const ssrcs = client.getOutgoingStreamSSRCsForUser( + this.client.user_id, + ); + if (ssrcs.video_ssrc != d.video_ssrc) { + console.log( + `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, + ); + client.subscribeToTrack(this.client.user_id, "video"); + + clientsThatNeedUpdate.add(client); + } + } } - if (d.audio_ssrc) { - handleSSRC.call(this, "audio", { - media: d.audio_ssrc, - rtx: d.audio_ssrc + 1, - }); - } - if (d.video_ssrc && d.rtx_ssrc) { - handleSSRC.call(this, "video", { - media: d.video_ssrc, - rtx: d.rtx_ssrc, - }); - } -} - -function attachTrack( - this: WebSocket, - track: IncomingStreamTrack, - user_id: string, -) { - if (!this.client) return; - const outTrack = this.client.transport!.createOutgoingStreamTrack( - track.getMedia(), - ); - outTrack.attachTo(track); - this.client.out.stream!.addTrack(outTrack); - var ssrcs = this.client.out.tracks.get(user_id)!; - if (!ssrcs) - ssrcs = this.client.out.tracks - .set(user_id, { audio_ssrc: 0, rtx_ssrc: 0, video_ssrc: 0 }) - .get(user_id)!; - - if (track.getMedia() === "audio") { - ssrcs.audio_ssrc = outTrack.getSSRCs().media!; - } else if (track.getMedia() === "video") { - ssrcs.video_ssrc = outTrack.getSSRCs().media!; - ssrcs.rtx_ssrc = outTrack.getSSRCs().rtx!; - } - - Send(this, { - op: VoiceOPCodes.VIDEO, - d: { - user_id: user_id, - ...ssrcs, - } as VoiceVideoSchema, - }); -} - -function handleSSRC(this: WebSocket, type: "audio" | "video", ssrcs: SSRCs) { - if (!this.client) return; - const stream = this.client.in.stream!; - const transport = this.client.transport!; - - const id = type + ssrcs.media; - var track = stream.getTrack(id); - if (!track) { - console.log("createIncomingStreamTrack", id); - track = transport.createIncomingStreamTrack(type, { id, ssrcs }); - stream.addTrack(track); - - const clients = getClients(this.client.channel_id)!; - clients.forEach((client) => { - if (client.websocket.user_id === this.user_id) return; - if (!client.out.stream) return; - - attachTrack.call(this, track, client.websocket.user_id); + for (const client of clientsThatNeedUpdate) { + const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); + + Send(client.websocket, { + op: VoiceOPCodes.VIDEO, + d: { + user_id: this.user_id, + audio_ssrc: ssrcs.audio_ssrc ?? 0, + video_ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + streams: d.streams?.map((x) => ({ + ...x, + ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + })), + } as VoiceVideoSchema, }); } } diff --git a/src/webrtc/opcodes/index.ts b/src/webrtc/opcodes/index.ts index 34681055..217bd5ab 100644 --- a/src/webrtc/opcodes/index.ts +++ b/src/webrtc/opcodes/index.ts @@ -34,4 +34,4 @@ export default { [VoiceOPCodes.VIDEO]: onVideo, [VoiceOPCodes.SPEAKING]: onSpeaking, [VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol, -}; +} as { [key: number]: OPCodeHandler }; diff --git a/src/webrtc/util/MediaServer.ts b/src/webrtc/util/MediaServer.ts index 0c12876c..7dfb9918 100644 --- a/src/webrtc/util/MediaServer.ts +++ b/src/webrtc/util/MediaServer.ts @@ -16,62 +16,7 @@ along with this program. If not, see . */ -import { WebSocket } from "@spacebar/gateway"; -import MediaServer, { - IncomingStream, - OutgoingStream, - Transport, -} from "medooze-media-server"; -import SemanticSDP from "semantic-sdp"; -MediaServer.enableLog(true); +import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate"; +import { SignalingDelegate } from "./SignalingDelegate"; -export const PublicIP = process.env.PUBLIC_IP || "127.0.0.1"; - -try { - const range = process.env.WEBRTC_PORT_RANGE || "4000"; - var ports = range.split("-"); - const min = Number(ports[0]); - const max = Number(ports[1]); - - MediaServer.setPortRange(min, max); -} catch (error) { - console.error( - "Invalid env var: WEBRTC_PORT_RANGE", - process.env.WEBRTC_PORT_RANGE, - error, - ); - process.exit(1); -} - -export const endpoint = MediaServer.createEndpoint(PublicIP); - -export const channels = new Map>(); - -export interface Client { - transport?: Transport; - websocket: WebSocket; - out: { - stream?: OutgoingStream; - tracks: Map< - string, - { - audio_ssrc: number; - video_ssrc: number; - rtx_ssrc: number; - } - >; - }; - in: { - stream?: IncomingStream; - audio_ssrc: number; - video_ssrc: number; - rtx_ssrc: number; - }; - sdp: SemanticSDP.SDPInfo; - channel_id: string; -} - -export function getClients(channel_id: string) { - if (!channels.has(channel_id)) channels.set(channel_id, new Set()); - return channels.get(channel_id)!; -} +export const mediaServer: SignalingDelegate = new MedoozeSignalingDelegate(); diff --git a/src/webrtc/util/SignalingDelegate.ts b/src/webrtc/util/SignalingDelegate.ts new file mode 100644 index 00000000..de57f319 --- /dev/null +++ b/src/webrtc/util/SignalingDelegate.ts @@ -0,0 +1,17 @@ +import { Codec, WebRtcClient } from "./WebRtcClient"; + +export interface SignalingDelegate { + start: () => Promise; + stop: () => Promise; + join(channelId: string, userId: string, ws: T): WebRtcClient; + onOffer( + client: WebRtcClient, + offer: string, + codecs: Codec[], + ): Promise; + onClientClose(client: WebRtcClient): void; + updateSDP(offer: string): void; + getClientsForChannel(channelId: string): Set>; + get ip(): string; + get port(): number; +} diff --git a/src/webrtc/util/WebRtcClient.ts b/src/webrtc/util/WebRtcClient.ts new file mode 100644 index 00000000..0662b315 --- /dev/null +++ b/src/webrtc/util/WebRtcClient.ts @@ -0,0 +1,31 @@ +export interface WebRtcClient { + websocket: T; + user_id: string; + channel_id: string; + webrtcConnected: boolean; + getIncomingStreamSSRCs: () => SSRCs; + getOutgoingStreamSSRCsForUser: (user_id: string) => SSRCs; + isProducingAudio: () => boolean; + isProducingVideo: () => boolean; + publishTrack: (type: "audio" | "video", ssrc: SSRCs) => void; + subscribeToTrack: (user_id: string, type: "audio" | "video") => void; +} + +export interface SSRCs { + audio_ssrc?: number; + video_ssrc?: number; + rtx_ssrc?: number; +} + +export interface RtpHeader { + uri: string; + id: number; +} + +export interface Codec { + name: "opus" | "VP8" | "VP9" | "H264"; + type: "audio" | "video"; + priority: number; + payload_type: number; + rtx_payload_type?: number; +} diff --git a/src/webrtc/util/index.ts b/src/webrtc/util/index.ts index 66126c1f..e50c72d1 100644 --- a/src/webrtc/util/index.ts +++ b/src/webrtc/util/index.ts @@ -18,3 +18,4 @@ export * from "./Constants"; export * from "./MediaServer"; +export * from "./WebRtcClient"; diff --git a/tsconfig.json b/tsconfig.json index 63b5e96c..5d408b24 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,5 +1,4 @@ { - "exclude": ["./src/webrtc"], "include": ["./src"], "compilerOptions": { /* Visit https://aka.ms/tsconfig to read more about this file */ @@ -37,7 +36,8 @@ "@spacebar/api*": ["./api"], "@spacebar/gateway*": ["./gateway"], "@spacebar/cdn*": ["./cdn"], - "@spacebar/util*": ["./util"] + "@spacebar/util*": ["./util"], + "@spacebar/webrtc*": ["./webrtc"] } /* Specify a set of entries that re-map imports to additional lookup locations. */, // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ // "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */