start adding some abstraction

This commit is contained in:
dank074 2025-04-01 01:22:43 -05:00
parent 24292592c4
commit e2b9924d6e
23 changed files with 745 additions and 269 deletions

View File

@ -614199,10 +614199,7 @@
"type": "integer" "type": "integer"
}, },
"rtx_payload_type": { "rtx_payload_type": {
"type": [ "type": "integer"
"null",
"integer"
]
} }
}, },
"additionalProperties": false, "additionalProperties": false,
@ -684127,6 +684124,9 @@
"type" "type"
] ]
} }
},
"max_secure_frames_version": {
"type": "integer"
} }
}, },
"additionalProperties": false, "additionalProperties": false,
@ -749017,6 +749017,42 @@
"items": { "items": {
"type": "string" "type": "string"
} }
},
"message_reference": {
"type": "object",
"properties": {
"message_id": {
"type": "string"
},
"channel_id": {
"type": "string"
},
"guild_id": {
"type": "string"
},
"fail_if_not_exists": {
"type": "boolean"
}
},
"additionalProperties": false,
"required": [
"message_id"
]
},
"sticker_ids": {
"type": "array",
"items": {
"type": "string"
}
},
"nonce": {
"type": "string"
},
"enforce_nonce": {
"type": "boolean"
},
"poll": {
"$ref": "#/definitions/PollCreationSchema"
} }
}, },
"additionalProperties": false, "additionalProperties": false,

View File

@ -118,11 +118,14 @@
"@spacebar/api": "dist/api", "@spacebar/api": "dist/api",
"@spacebar/cdn": "dist/cdn", "@spacebar/cdn": "dist/cdn",
"@spacebar/gateway": "dist/gateway", "@spacebar/gateway": "dist/gateway",
"@spacebar/util": "dist/util" "@spacebar/util": "dist/util",
"@spacebar/webrtc": "dist/webrtc"
}, },
"optionalDependencies": { "optionalDependencies": {
"@yukikaze-bot/erlpack": "^1.0.1", "@yukikaze-bot/erlpack": "^1.0.1",
"jimp": "^1.6.0", "jimp": "^1.6.0",
"@dank074/medooze-media-server": "1.156.3",
"semantic-sdp": "^3.31.1",
"mysql": "^2.18.1", "mysql": "^2.18.1",
"nodemailer-mailgun-transport": "^2.1.5", "nodemailer-mailgun-transport": "^2.1.5",
"nodemailer-mailjet-transport": "github:n0script22/nodemailer-mailjet-transport", "nodemailer-mailjet-transport": "github:n0script22/nodemailer-mailjet-transport",

View File

@ -22,6 +22,7 @@ process.on("uncaughtException", console.error);
import http from "http"; import http from "http";
import * as Api from "@spacebar/api"; import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway"; import * as Gateway from "@spacebar/gateway";
import * as Webrtc from "@spacebar/webrtc";
import { CDNServer } from "@spacebar/cdn"; import { CDNServer } from "@spacebar/cdn";
import express from "express"; import express from "express";
import { green, bold } from "picocolors"; import { green, bold } from "picocolors";
@ -36,12 +37,14 @@ server.on("request", app);
const api = new Api.SpacebarServer({ server, port, production, app }); const api = new Api.SpacebarServer({ server, port, production, app });
const cdn = new CDNServer({ server, port, production, app }); const cdn = new CDNServer({ server, port, production, app });
const gateway = new Gateway.Server({ server, port, production }); const gateway = new Gateway.Server({ server, port, production });
const webrtc = new Webrtc.Server({ server: undefined, port: 3004, production });
process.on("SIGTERM", async () => { process.on("SIGTERM", async () => {
console.log("Shutting down due to SIGTERM"); console.log("Shutting down due to SIGTERM");
await gateway.stop(); await gateway.stop();
await cdn.stop(); await cdn.stop();
await api.stop(); await api.stop();
await webrtc.stop();
server.close(); server.close();
Sentry.close(); Sentry.close();
}); });
@ -54,7 +57,12 @@ async function main() {
await new Promise((resolve) => await new Promise((resolve) =>
server.listen({ port }, () => resolve(undefined)), server.listen({ port }, () => resolve(undefined)),
); );
await Promise.all([api.start(), cdn.start(), gateway.start()]); await Promise.all([
api.start(),
cdn.start(),
gateway.start(),
webrtc.start(),
]);
Sentry.errorHandler(app); Sentry.errorHandler(app);

View File

@ -17,19 +17,19 @@
*/ */
import { Payload, WebSocket } from "@spacebar/gateway"; import { Payload, WebSocket } from "@spacebar/gateway";
import { genVoiceToken } from "../util/SessionUtils";
import { check } from "./instanceOf";
import { import {
Config, Config,
emitEvent, emitEvent,
Guild, Guild,
Member, Member,
Region,
VoiceServerUpdateEvent, VoiceServerUpdateEvent,
VoiceState, VoiceState,
VoiceStateUpdateEvent, VoiceStateUpdateEvent,
VoiceStateUpdateSchema, VoiceStateUpdateSchema,
Region,
} from "@spacebar/util"; } from "@spacebar/util";
import { genVoiceToken } from "../util/SessionUtils";
import { check } from "./instanceOf";
// TODO: check if a voice server is setup // TODO: check if a voice server is setup
// Notice: Bot users respect the voice channel's user limit, if set. // Notice: Bot users respect the voice channel's user limit, if set.
@ -39,6 +39,8 @@ import {
export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
check.call(this, VoiceStateUpdateSchema, data.d); check.call(this, VoiceStateUpdateSchema, data.d);
const body = data.d as VoiceStateUpdateSchema; const body = data.d as VoiceStateUpdateSchema;
const isNew = body.channel_id === null && body.guild_id === null;
let isChanged = false;
let voiceState: VoiceState; let voiceState: VoiceState;
try { try {
@ -54,6 +56,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
return; return;
} }
if (voiceState.channel_id !== body.channel_id) isChanged = true;
//If a user change voice channel between guild we should send a left event first //If a user change voice channel between guild we should send a left event first
if ( if (
voiceState.guild_id !== body.guild_id && voiceState.guild_id !== body.guild_id &&
@ -111,7 +115,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
]); ]);
//If it's null it means that we are leaving the channel and this event is not needed //If it's null it means that we are leaving the channel and this event is not needed
if (voiceState.channel_id !== null) { if ((isNew || isChanged) && voiceState.channel_id !== null) {
const guild = await Guild.findOne({ const guild = await Guild.findOne({
where: { id: voiceState.guild_id }, where: { id: voiceState.guild_id },
}); });
@ -135,6 +139,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
endpoint: guildRegion.endpoint, endpoint: guildRegion.endpoint,
}, },
guild_id: voiceState.guild_id, guild_id: voiceState.guild_id,
user_id: voiceState.user_id,
} as VoiceServerUpdateEvent); } as VoiceServerUpdateEvent);
} }
} }

View File

@ -16,7 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
// import { VoiceOPCodes } from "@spacebar/webrtc"; import { VoiceOPCodes } from "@spacebar/webrtc";
export enum OPCODES { export enum OPCODES {
Dispatch = 0, Dispatch = 0,
@ -63,7 +63,7 @@ export enum CLOSECODES {
} }
export interface Payload { export interface Payload {
op: OPCODES /* | VoiceOPCodes */; op: OPCODES | VoiceOPCodes;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
d?: any; d?: any;
s?: number; s?: number;

View File

@ -20,7 +20,7 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util";
import WS from "ws"; import WS from "ws";
import { Deflate, Inflate } from "fast-zlib"; import { Deflate, Inflate } from "fast-zlib";
import { Capabilities } from "./Capabilities"; import { Capabilities } from "./Capabilities";
// import { Client } from "@spacebar/webrtc"; import { WebRtcClient } from "@spacebar/webrtc";
export interface WebSocket extends WS { export interface WebSocket extends WS {
version: number; version: number;
@ -42,6 +42,6 @@ export interface WebSocket extends WS {
member_events: Record<string, () => unknown>; member_events: Record<string, () => unknown>;
listen_options: ListenEventOpts; listen_options: ListenEventOpts;
capabilities?: Capabilities; capabilities?: Capabilities;
// client?: Client; client?: WebRtcClient<WebSocket>;
large_threshold: number; large_threshold: number;
} }

View File

@ -31,7 +31,7 @@ export interface SelectProtocolSchema {
type: "audio" | "video"; type: "audio" | "video";
priority: number; priority: number;
payload_type: number; payload_type: number;
rtx_payload_type?: number | null; rtx_payload_type?: number;
}[]; }[];
rtc_connection_id?: string; // uuid rtc_connection_id?: string; // uuid
} }

View File

@ -27,4 +27,5 @@ export interface VoiceIdentifySchema {
rid: string; rid: string;
quality: number; quality: number;
}[]; }[];
max_secure_frames_version?: number;
} }

View File

@ -21,6 +21,7 @@ import dotenv from "dotenv";
import http from "http"; import http from "http";
import ws from "ws"; import ws from "ws";
import { Connection } from "./events/Connection"; import { Connection } from "./events/Connection";
import { mediaServer } from "./util/MediaServer";
dotenv.config(); dotenv.config();
export class Server { export class Server {
@ -69,6 +70,7 @@ export class Server {
await initDatabase(); await initDatabase();
await Config.init(); await Config.init();
await initEvent(); await initEvent();
await mediaServer.start();
if (!this.server.listening) { if (!this.server.listening) {
this.server.listen(this.port); this.server.listen(this.port);
console.log(`[WebRTC] online on 0.0.0.0:${this.port}`); console.log(`[WebRTC] online on 0.0.0.0:${this.port}`);
@ -78,5 +80,6 @@ export class Server {
async stop() { async stop() {
closeDatabase(); closeDatabase();
this.server.close(); this.server.close();
mediaServer.stop();
} }
} }

View File

@ -30,14 +30,12 @@ const PayloadSchema = {
export async function onMessage(this: WebSocket, buffer: Buffer) { export async function onMessage(this: WebSocket, buffer: Buffer) {
try { try {
var data: Payload = JSON.parse(buffer.toString()); const data: Payload = JSON.parse(buffer.toString());
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id)
return this.close(CLOSECODES.Not_authenticated); return this.close(CLOSECODES.Not_authenticated);
// @ts-ignore
const OPCodeHandler = OPCodeHandlers[data.op]; const OPCodeHandler = OPCodeHandlers[data.op];
if (!OPCodeHandler) { if (!OPCodeHandler) {
// @ts-ignore
console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]); console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]);
// TODO: if all opcodes are implemented comment this out: // TODO: if all opcodes are implemented comment this out:
// this.close(CloseCodes.Unknown_opcode); // this.close(CloseCodes.Unknown_opcode);
@ -49,7 +47,6 @@ export async function onMessage(this: WebSocket, buffer: Buffer) {
data.op as VoiceOPCodes, data.op as VoiceOPCodes,
) )
) { ) {
// @ts-ignore
console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]); console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]);
} }

View File

@ -0,0 +1,253 @@
import { CodecInfo, MediaInfo, SDPInfo } from "semantic-sdp";
import { SignalingDelegate } from "../util/SignalingDelegate";
import { Codec, WebRtcClient } from "../util/WebRtcClient";
import {
MediaServer,
IncomingStream,
OutgoingStream,
Transport,
Endpoint,
} from "@dank074/medooze-media-server";
import { VoiceChannel } from "./VoiceChannel";
import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient";
export class MedoozeSignalingDelegate implements SignalingDelegate {
private _channels: Map<string, VoiceChannel> = new Map();
private _ip: string;
private _port: number;
private _endpoint: Endpoint;
public start(): Promise<void> {
MediaServer.enableLog(true);
this._ip = process.env.PUBLIC_IP || "127.0.0.1";
try {
const range = process.env.WEBRTC_PORT_RANGE || "3690-3960";
var ports = range.split("-");
const min = Number(ports[0]);
const max = Number(ports[1]);
MediaServer.setPortRange(min, max);
} catch (error) {
console.error(
"Invalid env var: WEBRTC_PORT_RANGE",
process.env.WEBRTC_PORT_RANGE,
error,
);
process.exit(1);
}
//MediaServer.setAffinity(2)
this._endpoint = MediaServer.createEndpoint(this._ip);
this._port = this._endpoint.getLocalPort();
return Promise.resolve();
}
public join(channelId: string, userId: string, ws: any): WebRtcClient<any> {
const existingClient = this.getClientForUserId(userId);
if (existingClient) {
console.log("client already connected, disconnect..");
this.onClientClose(existingClient);
}
if (!this._channels.has(channelId)) {
console.debug("no channel created, creating one...");
this.createChannel(channelId);
}
const channel = this._channels.get(channelId)!;
const client = new MedoozeWebRtcClient(userId, channelId, ws, channel);
channel?.onClientJoin(client);
return client;
}
public async onOffer(
client: WebRtcClient<any>,
sdpOffer: string,
codecs: Codec[],
): Promise<string> {
const channel = this._channels.get(client.channel_id);
if (!channel) {
console.error(
"error, client sent an offer but has not authenticated",
);
Promise.reject();
}
const offer = SDPInfo.parse("m=audio\n" + sdpOffer);
const rtpHeaders = new Map(offer.medias[0].extensions);
const getIdForHeader = (
rtpHeaders: Map<number, string>,
headerUri: string,
) => {
for (const [key, value] of rtpHeaders) {
if (value == headerUri) return key;
}
return -1;
};
const audioMedia = new MediaInfo("0", "audio");
const audioCodec = new CodecInfo(
"opus",
codecs.find((val) => val.name == "opus")?.payload_type ?? 111,
);
audioCodec.addParam("minptime", "10");
audioCodec.addParam("usedtx", "1");
audioCodec.addParam("useinbandfec", "1");
audioCodec.setChannels(2);
audioMedia.addCodec(audioCodec);
audioMedia.addExtension(
getIdForHeader(
rtpHeaders,
"urn:ietf:params:rtp-hdrext:ssrc-audio-level",
),
"urn:ietf:params:rtp-hdrext:ssrc-audio-level",
);
if (audioCodec.type === 111)
// if this is chromium, apply this header
audioMedia.addExtension(
getIdForHeader(
rtpHeaders,
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
),
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
);
const videoMedia = new MediaInfo("1", "video");
const videoCodec = new CodecInfo(
"H264",
codecs.find((val) => val.name == "H264")?.payload_type ?? 102,
);
videoCodec.setRTX(
codecs.find((val) => val.name == "H264")?.rtx_payload_type ?? 103,
);
videoCodec.addParam("level-asymmetry-allowed", "1");
videoCodec.addParam("packetization-mode", "1");
videoCodec.addParam("profile-level-id", "42e01f");
videoCodec.addParam("x-google-max-bitrate", "2500");
videoMedia.addCodec(videoCodec);
videoMedia.addExtension(
getIdForHeader(
rtpHeaders,
"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
),
"http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
);
videoMedia.addExtension(
getIdForHeader(rtpHeaders, "urn:ietf:params:rtp-hdrext:toffset"),
"urn:ietf:params:rtp-hdrext:toffset",
);
videoMedia.addExtension(
getIdForHeader(
rtpHeaders,
"http://www.webrtc.org/experiments/rtp-hdrext/playout-delay",
),
"http://www.webrtc.org/experiments/rtp-hdrext/playout-delay",
);
videoMedia.addExtension(
getIdForHeader(
rtpHeaders,
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
),
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
);
if (audioCodec.type === 111)
// if this is chromium, apply this header
videoMedia.addExtension(
getIdForHeader(rtpHeaders, "urn:3gpp:video-orientation"),
"urn:3gpp:video-orientation",
);
offer.medias = [audioMedia, videoMedia];
const transport = this._endpoint.createTransport(offer);
transport.setRemoteProperties(offer);
channel?.onClientOffer(client, transport);
const dtls = transport.getLocalDTLSInfo();
const ice = transport.getLocalICEInfo();
const fingerprint = dtls.getHash() + " " + dtls.getFingerprint();
const candidates = transport.getLocalCandidates();
const candidate = candidates[0];
const answer =
`m=audio ${this.port} ICE/SDP\n` +
`a=fingerprint:${fingerprint}\n` +
`c=IN IP4 ${this.ip}\n` +
`a=rtcp:${this.port}\n` +
`a=ice-ufrag:${ice.getUfrag()}\n` +
`a=ice-pwd:${ice.getPwd()}\n` +
`a=fingerprint:${fingerprint}\n` +
`a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host\n`;
return Promise.resolve(answer);
}
public onClientClose = (client: WebRtcClient<any>) => {
this._channels.get(client.channel_id)?.onClientLeave(client);
};
public updateSDP(offer: string): void {
throw new Error("Method not implemented.");
}
public createChannel(channelId: string): void {
this._channels.set(channelId, new VoiceChannel(channelId, this));
}
public disposeChannelRouter(channelId: string): void {
this._channels.delete(channelId);
}
get channels(): Map<string, VoiceChannel> {
return this._channels;
}
public getClientsForChannel(channelId: string): Set<WebRtcClient<any>> {
if (!this._channels.has(channelId)) {
return new Set();
}
return new Set(this._channels.get(channelId)?.clients.values())!;
}
private getClientForUserId = (
userId: string,
): MedoozeWebRtcClient | undefined => {
for (const channel of this.channels.values()) {
let result = channel.getClientById(userId);
if (result) {
return result;
}
}
return undefined;
};
get ip(): string {
return this._ip;
}
get port(): number {
return this._port;
}
get endpoint(): Endpoint {
return this._endpoint;
}
public stop(): Promise<void> {
return Promise.resolve();
}
}

View File

@ -0,0 +1,152 @@
import {
IncomingStream,
OutgoingStream,
Transport,
} from "@dank074/medooze-media-server";
import { SSRCs, WebRtcClient } from "webrtc/util";
import { VoiceChannel } from "./VoiceChannel";
export class MedoozeWebRtcClient implements WebRtcClient<any> {
websocket: any;
user_id: string;
channel_id: string;
webrtcConnected: boolean;
public transport?: Transport;
public incomingStream?: IncomingStream;
public outgoingStream?: OutgoingStream;
public channel?: VoiceChannel;
public isStopped?: boolean;
constructor(
userId: string,
channelId: string,
websocket: any,
channel: VoiceChannel,
) {
this.user_id = userId;
this.channel_id = channelId;
this.websocket = websocket;
this.channel = channel;
this.webrtcConnected = false;
this.isStopped = false;
}
public isProducingAudio(): boolean {
if (!this.webrtcConnected) return false;
const audioTrack = this.incomingStream?.getTrack(
`audio-${this.user_id}`,
);
if (audioTrack) return true;
return false;
}
public isProducingVideo(): boolean {
if (!this.webrtcConnected) return false;
const videoTrack = this.incomingStream?.getTrack(
`video-${this.user_id}`,
);
if (videoTrack) return true;
return false;
}
public getIncomingStreamSSRCs(): SSRCs {
if (!this.webrtcConnected)
return { audio_ssrc: 0, video_ssrc: 0, rtx_ssrc: 0 };
const audioTrack = this.incomingStream?.getTrack(
`audio-${this.user_id}`,
);
const audio_ssrc =
audioTrack?.getSSRCs()[audioTrack.getDefaultEncoding().id];
const videoTrack = this.incomingStream?.getTrack(
`video-${this.user_id}`,
);
const video_ssrc =
videoTrack?.getSSRCs()[videoTrack.getDefaultEncoding().id];
return {
audio_ssrc: audio_ssrc?.media ?? 0,
video_ssrc: video_ssrc?.media ?? 0,
rtx_ssrc: video_ssrc?.rtx ?? 0,
};
}
public getOutgoingStreamSSRCsForUser(user_id: string): SSRCs {
const outgoingStream = this.outgoingStream;
const audioTrack = outgoingStream?.getTrack(`audio-${user_id}`);
const audio_ssrc = audioTrack?.getSSRCs();
const videoTrack = outgoingStream?.getTrack(`video-${user_id}`);
const video_ssrc = videoTrack?.getSSRCs();
return {
audio_ssrc: audio_ssrc?.media ?? 0,
video_ssrc: video_ssrc?.media ?? 0,
rtx_ssrc: video_ssrc?.rtx ?? 0,
};
}
public publishTrack(type: "audio" | "video", ssrc: SSRCs) {
if (!this.transport) return;
const id = `${type}-${this.user_id}`;
const existingTrack = this.incomingStream?.getTrack(id);
if (existingTrack) {
console.error(`error: attempted to create duplicate track ${id}`);
return;
}
let ssrcs;
if (type === "audio") {
ssrcs = { media: ssrc.audio_ssrc! };
} else {
ssrcs = { media: ssrc.video_ssrc!, rtx: ssrc.rtx_ssrc };
}
const track = this.transport?.createIncomingStreamTrack(
type,
{ id, ssrcs: ssrcs, media: type },
this.incomingStream,
);
//this.channel?.onClientPublishTrack(this, track, ssrcs);
}
public subscribeToTrack(user_id: string, type: "audio" | "video") {
if (!this.transport) return;
const id = `${type}-${user_id}`;
const otherClient = this.channel?.getClientById(user_id);
const incomingStream = otherClient?.incomingStream;
const incomingTrack = incomingStream?.getTrack(id);
if (!incomingTrack) {
console.error(`error subscribing, not track found ${id}`);
return;
}
let ssrcs;
if (type === "audio") {
ssrcs = {
media: otherClient?.getIncomingStreamSSRCs().audio_ssrc!,
};
} else {
ssrcs = {
media: otherClient?.getIncomingStreamSSRCs().video_ssrc!,
rtx: otherClient?.getIncomingStreamSSRCs().rtx_ssrc,
};
}
const outgoingTrack = this.transport?.createOutgoingStreamTrack(
incomingTrack.media,
{ id, ssrcs, media: incomingTrack.media },
this.outgoingStream,
);
outgoingTrack?.attachTo(incomingTrack);
}
}

View File

@ -0,0 +1,100 @@
import { MedoozeSignalingDelegate } from "./MedoozeSignalingDelegate";
import {
IncomingStreamTrack,
SSRCs,
Transport,
} from "@dank074/medooze-media-server";
import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient";
import { StreamInfo } from "semantic-sdp";
export class VoiceChannel {
private _clients: Map<string, MedoozeWebRtcClient>;
private _id: string;
private _sfu: MedoozeSignalingDelegate;
constructor(id: string, sfu: MedoozeSignalingDelegate) {
this._id = id;
this._clients = new Map();
this._sfu = sfu;
}
onClientJoin = (client: MedoozeWebRtcClient) => {
// do shit here
this._clients.set(client.user_id, client);
};
onClientOffer = (client: MedoozeWebRtcClient, transport: Transport) => {
client.transport = transport;
client.transport.on("dtlsstate", (state, self) => {
if (state === "connected") {
client.webrtcConnected = true;
console.log("connected");
}
});
client.incomingStream = transport.createIncomingStream(
new StreamInfo(`in-${client.user_id}`),
);
client.outgoingStream = transport.createOutgoingStream(
new StreamInfo(`out-${client.user_id}`),
);
client.webrtcConnected = true;
// subscribe to all current streams from this channel
// for(const otherClient of this._clients.values()) {
// const incomingStream = otherClient.incomingStream
// if(!incomingStream) continue;
// for(const track of (incomingStream.getTracks())) {
// client.subscribeToTrack(otherClient.user_id, track.media)
// }
// }
};
onClientLeave = (client: MedoozeWebRtcClient) => {
console.log("stopping client");
this._clients.delete(client.user_id);
// stop the client
if (!client.isStopped) {
client.isStopped = true;
for (const otherClient of this.clients.values()) {
//remove outgoing track for this user
otherClient.outgoingStream
?.getTrack(`audio-${client.user_id}`)
?.stop();
otherClient.outgoingStream
?.getTrack(`video-${client.user_id}`)
?.stop();
}
client.incomingStream?.stop();
client.outgoingStream?.stop();
client.transport?.stop();
client.channel = undefined;
client.incomingStream = undefined;
client.outgoingStream = undefined;
client.transport = undefined;
client.websocket = undefined;
}
};
get clients(): Map<string, MedoozeWebRtcClient> {
return this._clients;
}
getClientById = (id: string) => {
return this._clients.get(id);
};
get id(): string {
return this._id;
}
}

View File

@ -22,9 +22,7 @@ import {
VoiceIdentifySchema, VoiceIdentifySchema,
VoiceState, VoiceState,
} from "@spacebar/util"; } from "@spacebar/util";
import { endpoint, getClients, VoiceOPCodes, PublicIP } from "@spacebar/webrtc"; import { mediaServer, VoiceOPCodes } from "@spacebar/webrtc";
import SemanticSDP from "semantic-sdp";
const defaultSDP = require("./sdp.json");
export async function onIdentify(this: WebSocket, data: Payload) { export async function onIdentify(this: WebSocket, data: Payload) {
clearTimeout(this.readyTimeout); clearTimeout(this.readyTimeout);
@ -38,53 +36,33 @@ export async function onIdentify(this: WebSocket, data: Payload) {
this.user_id = user_id; this.user_id = user_id;
this.session_id = session_id; this.session_id = session_id;
const sdp = SemanticSDP.SDPInfo.expand(defaultSDP);
sdp.setDTLS(
SemanticSDP.DTLSInfo.expand({
setup: "actpass",
hash: "sha-256",
fingerprint: endpoint.getDTLSFingerprint(),
}),
);
this.client = { this.client = mediaServer.join(voiceState.channel_id, this.user_id, this);
websocket: this,
out: {
tracks: new Map(),
},
in: {
audio_ssrc: 0,
video_ssrc: 0,
rtx_ssrc: 0,
},
sdp,
channel_id: voiceState.channel_id,
};
const clients = getClients(voiceState.channel_id)!;
clients.add(this.client);
this.on("close", () => { this.on("close", () => {
clients.delete(this.client!); mediaServer.onClientClose(this.client!);
}); });
await Send(this, { await Send(this, {
op: VoiceOPCodes.READY, op: VoiceOPCodes.READY,
d: { d: {
streams: [ streams: streams?.map((x) => ({
// { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false } ...x,
], ssrc: 2,
ssrc: -1, rtx_ssrc: 3,
port: endpoint.getLocalPort(), })),
ssrc: 1,
port: mediaServer.port,
modes: [ modes: [
"aead_aes256_gcm_rtpsize", "aead_aes256_gcm_rtpsize",
"aead_aes256_gcm", "aead_aes256_gcm",
"aead_xchacha20_poly1305_rtpsize",
"xsalsa20_poly1305_lite_rtpsize", "xsalsa20_poly1305_lite_rtpsize",
"xsalsa20_poly1305_lite", "xsalsa20_poly1305_lite",
"xsalsa20_poly1305_suffix", "xsalsa20_poly1305_suffix",
"xsalsa20_poly1305", "xsalsa20_poly1305",
], ],
ip: PublicIP, ip: mediaServer.ip,
experiments: [], experiments: [],
}, },
}); });

View File

@ -18,8 +18,7 @@
import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { SelectProtocolSchema, validateSchema } from "@spacebar/util"; import { SelectProtocolSchema, validateSchema } from "@spacebar/util";
import { PublicIP, VoiceOPCodes, endpoint } from "@spacebar/webrtc"; import { VoiceOPCodes, mediaServer } from "@spacebar/webrtc";
import SemanticSDP, { MediaInfo, SDPInfo } from "semantic-sdp";
export async function onSelectProtocol(this: WebSocket, payload: Payload) { export async function onSelectProtocol(this: WebSocket, payload: Payload) {
if (!this.client) return; if (!this.client) return;
@ -29,31 +28,11 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) {
payload.d, payload.d,
) as SelectProtocolSchema; ) as SelectProtocolSchema;
const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!); const answer = await mediaServer.onOffer(
this.client.sdp!.setICE(offer.getICE()); this.client,
this.client.sdp!.setDTLS(offer.getDTLS()); data.sdp!,
data.codecs ?? [],
const transport = endpoint.createTransport(this.client.sdp!); );
this.client.transport = transport;
transport.setRemoteProperties(this.client.sdp!);
transport.setLocalProperties(this.client.sdp!);
const dtls = transport.getLocalDTLSInfo();
const ice = transport.getLocalICEInfo();
const port = endpoint.getLocalPort();
const fingerprint = dtls.getHash() + " " + dtls.getFingerprint();
const candidates = transport.getLocalCandidates();
const candidate = candidates[0];
const answer =
`m=audio ${port} ICE/SDP` +
`a=fingerprint:${fingerprint}` +
`c=IN IP4 ${PublicIP}` +
`a=rtcp:${port}` +
`a=ice-ufrag:${ice.getUfrag()}` +
`a=ice-pwd:${ice.getPwd()}` +
`a=fingerprint:${fingerprint}` +
`a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host`;
await Send(this, { await Send(this, {
op: VoiceOPCodes.SESSION_DESCRIPTION, op: VoiceOPCodes.SESSION_DESCRIPTION,

View File

@ -17,24 +17,27 @@
*/ */
import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { getClients, VoiceOPCodes } from "../util"; import { mediaServer, VoiceOPCodes } from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727} // {"speaking":1,"delay":5,"ssrc":2805246727}
export async function onSpeaking(this: WebSocket, data: Payload) { export async function onSpeaking(this: WebSocket, data: Payload) {
if (!this.client) return; if (!this.client) return;
getClients(this.client.channel_id).forEach((client) => { mediaServer
if (client === this.client) return; .getClientsForChannel<WebSocket>(this.client.channel_id)
const ssrc = this.client!.out.tracks.get(client.websocket.user_id); .forEach((client) => {
if (client.user_id === this.user_id) return;
Send(client.websocket, { const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id);
op: VoiceOPCodes.SPEAKING,
d: { Send(client.websocket, {
user_id: client.websocket.user_id, op: VoiceOPCodes.SPEAKING,
speaking: data.d.speaking, d: {
ssrc: ssrc?.audio_ssrc || 0, user_id: this.user_id,
}, speaking: data.d.speaking,
ssrc: ssrc.audio_ssrc ?? 0,
},
});
}); });
});
} }

View File

@ -18,134 +18,98 @@
import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; import { validateSchema, VoiceVideoSchema } from "@spacebar/util";
import { channels, getClients, VoiceOPCodes } from "@spacebar/webrtc"; import { mediaServer, VoiceOPCodes, WebRtcClient } from "@spacebar/webrtc";
import { IncomingStreamTrack, SSRCs } from "medooze-media-server";
import SemanticSDP from "semantic-sdp";
export async function onVideo(this: WebSocket, payload: Payload) { export async function onVideo(this: WebSocket, payload: Payload) {
if (!this.client) return; if (!this.client || !this.client.webrtcConnected) return;
const { transport, channel_id } = this.client; const { channel_id } = this.client;
if (!transport) return;
const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
const stream = d.streams?.find((element) => element !== undefined);
await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
const id = "stream" + this.user_id; const ssrcs = this.client.getIncomingStreamSSRCs();
var stream = this.client.in.stream!; const clientsThatNeedUpdate = new Set<WebRtcClient<WebSocket>>();
if (!stream) {
stream = this.client.transport!.createIncomingStream(
// @ts-ignore
SemanticSDP.StreamInfo.expand({
id,
// @ts-ignore
tracks: [],
}),
);
this.client.in.stream = stream;
const interval = setInterval(() => { // check if client has signaled that it will send audio
for (const track of stream.getTracks()) { if (d.audio_ssrc !== 0) {
for (const layer of Object.values(track.getStats())) { // check if we already have incoming media for this ssrcs, if not, publish a new audio track for it
console.log(track.getId(), layer.total); if (ssrcs.audio_ssrc != d.audio_ssrc) {
} console.log(
`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`,
);
this.client.publishTrack("audio", { audio_ssrc: d.audio_ssrc });
}
// now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForChannel<WebSocket>(
channel_id,
)) {
if (client.user_id === this.user_id) continue;
const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id);
if (ssrcs.audio_ssrc != d.audio_ssrc) {
console.log(
`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`,
);
client.subscribeToTrack(this.client.user_id, "audio");
clientsThatNeedUpdate.add(client);
} }
}, 5000); }
}
stream.on("stopped", () => { // check if client has signaled that it will send video
console.log("stream stopped"); if (d.video_ssrc !== 0 && stream?.active) {
clearInterval(interval); // check if we already have incoming media for this ssrcs, if not, publish a new video track for it
}); if (ssrcs.video_ssrc != d.video_ssrc) {
this.on("close", () => { console.log(
transport!.stop(); `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`,
}); );
const out = transport.createOutgoingStream( this.client.publishTrack("video", {
// @ts-ignore video_ssrc: d.video_ssrc,
SemanticSDP.StreamInfo.expand({ rtx_ssrc: d.rtx_ssrc,
id: "out" + this.user_id,
// @ts-ignore
tracks: [],
}),
);
this.client.out.stream = out;
const clients = channels.get(channel_id)!;
clients.forEach((client) => {
if (client.websocket.user_id === this.user_id) return;
if (!client.in.stream) return;
client.in.stream?.getTracks().forEach((track) => {
attachTrack.call(this, track, client.websocket.user_id);
}); });
}); }
// now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForChannel<WebSocket>(
channel_id,
)) {
if (client.user_id === this.user_id) continue;
const ssrcs = client.getOutgoingStreamSSRCsForUser(
this.client.user_id,
);
if (ssrcs.video_ssrc != d.video_ssrc) {
console.log(
`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`,
);
client.subscribeToTrack(this.client.user_id, "video");
clientsThatNeedUpdate.add(client);
}
}
} }
if (d.audio_ssrc) { for (const client of clientsThatNeedUpdate) {
handleSSRC.call(this, "audio", { const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id);
media: d.audio_ssrc,
rtx: d.audio_ssrc + 1, Send(client.websocket, {
}); op: VoiceOPCodes.VIDEO,
} d: {
if (d.video_ssrc && d.rtx_ssrc) { user_id: this.user_id,
handleSSRC.call(this, "video", { audio_ssrc: ssrcs.audio_ssrc ?? 0,
media: d.video_ssrc, video_ssrc: ssrcs.video_ssrc ?? 0,
rtx: d.rtx_ssrc, rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
}); streams: d.streams?.map((x) => ({
} ...x,
} ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
function attachTrack( })),
this: WebSocket, } as VoiceVideoSchema,
track: IncomingStreamTrack,
user_id: string,
) {
if (!this.client) return;
const outTrack = this.client.transport!.createOutgoingStreamTrack(
track.getMedia(),
);
outTrack.attachTo(track);
this.client.out.stream!.addTrack(outTrack);
var ssrcs = this.client.out.tracks.get(user_id)!;
if (!ssrcs)
ssrcs = this.client.out.tracks
.set(user_id, { audio_ssrc: 0, rtx_ssrc: 0, video_ssrc: 0 })
.get(user_id)!;
if (track.getMedia() === "audio") {
ssrcs.audio_ssrc = outTrack.getSSRCs().media!;
} else if (track.getMedia() === "video") {
ssrcs.video_ssrc = outTrack.getSSRCs().media!;
ssrcs.rtx_ssrc = outTrack.getSSRCs().rtx!;
}
Send(this, {
op: VoiceOPCodes.VIDEO,
d: {
user_id: user_id,
...ssrcs,
} as VoiceVideoSchema,
});
}
function handleSSRC(this: WebSocket, type: "audio" | "video", ssrcs: SSRCs) {
if (!this.client) return;
const stream = this.client.in.stream!;
const transport = this.client.transport!;
const id = type + ssrcs.media;
var track = stream.getTrack(id);
if (!track) {
console.log("createIncomingStreamTrack", id);
track = transport.createIncomingStreamTrack(type, { id, ssrcs });
stream.addTrack(track);
const clients = getClients(this.client.channel_id)!;
clients.forEach((client) => {
if (client.websocket.user_id === this.user_id) return;
if (!client.out.stream) return;
attachTrack.call(this, track, client.websocket.user_id);
}); });
} }
} }

View File

@ -34,4 +34,4 @@ export default {
[VoiceOPCodes.VIDEO]: onVideo, [VoiceOPCodes.VIDEO]: onVideo,
[VoiceOPCodes.SPEAKING]: onSpeaking, [VoiceOPCodes.SPEAKING]: onSpeaking,
[VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol, [VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol,
}; } as { [key: number]: OPCodeHandler };

View File

@ -16,62 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { WebSocket } from "@spacebar/gateway"; import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate";
import MediaServer, { import { SignalingDelegate } from "./SignalingDelegate";
IncomingStream,
OutgoingStream,
Transport,
} from "medooze-media-server";
import SemanticSDP from "semantic-sdp";
MediaServer.enableLog(true);
export const PublicIP = process.env.PUBLIC_IP || "127.0.0.1"; export const mediaServer: SignalingDelegate = new MedoozeSignalingDelegate();
try {
const range = process.env.WEBRTC_PORT_RANGE || "4000";
var ports = range.split("-");
const min = Number(ports[0]);
const max = Number(ports[1]);
MediaServer.setPortRange(min, max);
} catch (error) {
console.error(
"Invalid env var: WEBRTC_PORT_RANGE",
process.env.WEBRTC_PORT_RANGE,
error,
);
process.exit(1);
}
export const endpoint = MediaServer.createEndpoint(PublicIP);
export const channels = new Map<string, Set<Client>>();
export interface Client {
transport?: Transport;
websocket: WebSocket;
out: {
stream?: OutgoingStream;
tracks: Map<
string,
{
audio_ssrc: number;
video_ssrc: number;
rtx_ssrc: number;
}
>;
};
in: {
stream?: IncomingStream;
audio_ssrc: number;
video_ssrc: number;
rtx_ssrc: number;
};
sdp: SemanticSDP.SDPInfo;
channel_id: string;
}
export function getClients(channel_id: string) {
if (!channels.has(channel_id)) channels.set(channel_id, new Set());
return channels.get(channel_id)!;
}

View File

@ -0,0 +1,17 @@
import { Codec, WebRtcClient } from "./WebRtcClient";
export interface SignalingDelegate {
start: () => Promise<void>;
stop: () => Promise<void>;
join<T>(channelId: string, userId: string, ws: T): WebRtcClient<T>;
onOffer<T>(
client: WebRtcClient<T>,
offer: string,
codecs: Codec[],
): Promise<string>;
onClientClose<T>(client: WebRtcClient<T>): void;
updateSDP(offer: string): void;
getClientsForChannel<T>(channelId: string): Set<WebRtcClient<T>>;
get ip(): string;
get port(): number;
}

View File

@ -0,0 +1,31 @@
export interface WebRtcClient<T> {
websocket: T;
user_id: string;
channel_id: string;
webrtcConnected: boolean;
getIncomingStreamSSRCs: () => SSRCs;
getOutgoingStreamSSRCsForUser: (user_id: string) => SSRCs;
isProducingAudio: () => boolean;
isProducingVideo: () => boolean;
publishTrack: (type: "audio" | "video", ssrc: SSRCs) => void;
subscribeToTrack: (user_id: string, type: "audio" | "video") => void;
}
export interface SSRCs {
audio_ssrc?: number;
video_ssrc?: number;
rtx_ssrc?: number;
}
export interface RtpHeader {
uri: string;
id: number;
}
export interface Codec {
name: "opus" | "VP8" | "VP9" | "H264";
type: "audio" | "video";
priority: number;
payload_type: number;
rtx_payload_type?: number;
}

View File

@ -18,3 +18,4 @@
export * from "./Constants"; export * from "./Constants";
export * from "./MediaServer"; export * from "./MediaServer";
export * from "./WebRtcClient";

View File

@ -1,5 +1,4 @@
{ {
"exclude": ["./src/webrtc"],
"include": ["./src"], "include": ["./src"],
"compilerOptions": { "compilerOptions": {
/* Visit https://aka.ms/tsconfig to read more about this file */ /* Visit https://aka.ms/tsconfig to read more about this file */
@ -37,7 +36,8 @@
"@spacebar/api*": ["./api"], "@spacebar/api*": ["./api"],
"@spacebar/gateway*": ["./gateway"], "@spacebar/gateway*": ["./gateway"],
"@spacebar/cdn*": ["./cdn"], "@spacebar/cdn*": ["./cdn"],
"@spacebar/util*": ["./util"] "@spacebar/util*": ["./util"],
"@spacebar/webrtc*": ["./webrtc"]
} /* Specify a set of entries that re-map imports to additional lookup locations. */, } /* Specify a set of entries that re-map imports to additional lookup locations. */,
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
// "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */ // "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */