a lot of changes

This commit is contained in:
dank074 2025-04-16 13:40:42 -05:00
parent 96e457323b
commit 77b8d45543
35 changed files with 540 additions and 178 deletions

View File

@ -684108,6 +684108,11 @@
"type": "object", "type": "object",
"properties": { "properties": {
"type": { "type": {
"enum": [
"audio",
"screen",
"video"
],
"type": "string" "type": "string"
}, },
"rid": { "rid": {
@ -694118,6 +694123,7 @@
"type": { "type": {
"enum": [ "enum": [
"audio", "audio",
"screen",
"video" "video"
], ],
"type": "string" "type": "string"
@ -749017,42 +749023,6 @@
"items": { "items": {
"type": "string" "type": "string"
} }
},
"message_reference": {
"type": "object",
"properties": {
"message_id": {
"type": "string"
},
"channel_id": {
"type": "string"
},
"guild_id": {
"type": "string"
},
"fail_if_not_exists": {
"type": "boolean"
}
},
"additionalProperties": false,
"required": [
"message_id"
]
},
"sticker_ids": {
"type": "array",
"items": {
"type": "string"
}
},
"nonce": {
"type": "string"
},
"enforce_nonce": {
"type": "boolean"
},
"poll": {
"$ref": "#/definitions/PollCreationSchema"
} }
}, },
"additionalProperties": false, "additionalProperties": false,

View File

@ -124,7 +124,7 @@
"optionalDependencies": { "optionalDependencies": {
"@yukikaze-bot/erlpack": "^1.0.1", "@yukikaze-bot/erlpack": "^1.0.1",
"jimp": "^1.6.0", "jimp": "^1.6.0",
"@dank074/medooze-media-server": "1.156.3", "@dank074/medooze-media-server": "1.156.4",
"semantic-sdp": "^3.31.1", "semantic-sdp": "^3.31.1",
"mysql": "^2.18.1", "mysql": "^2.18.1",
"nodemailer-mailgun-transport": "^2.1.5", "nodemailer-mailgun-transport": "^2.1.5",

View File

@ -1,5 +1,17 @@
import { Payload, WebSocket } from "@spacebar/gateway"; import {
import { Config, emitEvent, Region, VoiceState } from "@spacebar/util"; genVoiceToken,
Payload,
WebSocket,
generateStreamKey,
} from "@spacebar/gateway";
import {
Config,
emitEvent,
Region,
Snowflake,
Stream,
StreamSession,
} from "@spacebar/util";
interface StreamCreateSchema { interface StreamCreateSchema {
type: "guild" | "call"; type: "guild" | "call";
@ -11,37 +23,65 @@ interface StreamCreateSchema {
export async function onStreamCreate(this: WebSocket, data: Payload) { export async function onStreamCreate(this: WebSocket, data: Payload) {
const body = data.d as StreamCreateSchema; const body = data.d as StreamCreateSchema;
// first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection // TODO: first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
if (!this.voiceWs || !this.voiceWs.webrtcConnected) return; if (body.channel_id.trim().length === 0) return;
// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild
// TODO: create a new entry in db (StreamState?) containing the token for authenticating user in stream gateway IDENTIFY
// TODO: actually apply preferred_region from the event payload // TODO: actually apply preferred_region from the event payload
const regions = Config.get().regions; const regions = Config.get().regions;
const guildRegion = regions.available.filter( const guildRegion = regions.available.filter(
(r) => r.id === regions.default, (r) => r.id === regions.default,
)[0]; )[0];
const streamKey = `${body.type}${body.type === "guild" ? ":" + body.guild_id : ""}:${body.channel_id}:${this.user_id}`; // create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY
const stream = Stream.create({
id: Snowflake.generate(),
owner_id: this.user_id,
channel_id: body.channel_id,
endpoint: guildRegion.endpoint,
});
await stream.save();
const token = genVoiceToken();
const streamSession = StreamSession.create({
stream_id: stream.id,
user_id: this.user_id,
session_id: this.session_id,
token,
});
await streamSession.save();
const streamKey = generateStreamKey(
body.type,
body.guild_id,
body.channel_id,
this.user_id,
);
await emitEvent({ await emitEvent({
event: "STREAM_CREATE", event: "STREAM_CREATE",
data: { data: {
stream_key: streamKey, stream_key: streamKey,
rtc_server_id: "lol", // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
viewer_ids: [],
region: guildRegion.name,
paused: false,
}, },
guild_id: body.guild_id, guild_id: body.guild_id,
//user_id: this.user_id, channel_id: body.channel_id,
}); });
await emitEvent({ await emitEvent({
event: "STREAM_SERVER_UPDATE", event: "STREAM_SERVER_UPDATE",
data: { data: {
token: "TEST", token: streamSession.token,
stream_key: streamKey, stream_key: streamKey,
endpoint: guildRegion.endpoint, guild_id: null, // not sure why its always null
endpoint: stream.endpoint,
}, },
user_id: this.user_id, user_id: this.user_id,
}); });

View File

@ -1,4 +1,5 @@
import { Payload, WebSocket } from "@spacebar/gateway"; import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import { emitEvent, Stream } from "@spacebar/util";
interface StreamDeleteSchema { interface StreamDeleteSchema {
stream_key: string; stream_key: string;
@ -7,23 +8,28 @@ interface StreamDeleteSchema {
export async function onStreamDelete(this: WebSocket, data: Payload) { export async function onStreamDelete(this: WebSocket, data: Payload) {
const body = data.d as StreamDeleteSchema; const body = data.d as StreamDeleteSchema;
const splitStreamKey = body.stream_key.split(":"); const { userId, channelId, guildId, type } = parseStreamKey(
if (splitStreamKey.length < 3) { body.stream_key,
return this.close(4000, "Invalid stream key"); );
}
const type = splitStreamKey.shift()!; if (this.user_id !== userId) {
let guild_id: string;
if (type === "guild") {
guild_id = splitStreamKey.shift()!;
}
const channel_id = splitStreamKey.shift()!;
const user_id = splitStreamKey.shift()!;
if (this.user_id !== user_id) {
return this.close(4000, "Cannot delete stream for another user"); return this.close(4000, "Cannot delete stream for another user");
} }
// TODO: actually delete stream const stream = await Stream.findOne({
where: { channel_id: channelId, owner_id: userId },
});
if (!stream) return this.close(4000, "Invalid stream key");
await stream.remove();
await emitEvent({
event: "STREAM_DELETE",
data: {
stream_key: body.stream_key,
},
guild_id: guildId,
channel_id: channelId,
});
} }

View File

@ -1,4 +1,10 @@
import { Payload, WebSocket } from "@spacebar/gateway"; import {
genVoiceToken,
parseStreamKey,
Payload,
WebSocket,
} from "@spacebar/gateway";
import { Config, emitEvent, Stream, StreamSession } from "@spacebar/util";
interface StreamWatchSchema { interface StreamWatchSchema {
stream_key: string; stream_key: string;
@ -7,17 +13,60 @@ interface StreamWatchSchema {
export async function onStreamWatch(this: WebSocket, data: Payload) { export async function onStreamWatch(this: WebSocket, data: Payload) {
const body = data.d as StreamWatchSchema; const body = data.d as StreamWatchSchema;
const splitStreamKey = body.stream_key.split(":"); // TODO: apply perms: check if user is allowed to watch
if (splitStreamKey.length < 3) { try {
const { type, channelId, guildId, userId } = parseStreamKey(
body.stream_key,
);
const stream = await Stream.findOneOrFail({
where: { channel_id: channelId, owner_id: userId },
});
const streamSession = StreamSession.create({
stream_id: stream.id,
user_id: this.user_id,
session_id: this.session_id,
token: genVoiceToken(),
});
await streamSession.save();
const regions = Config.get().regions;
const guildRegion = regions.available.find(
(r) => r.endpoint === stream.endpoint,
);
if (!guildRegion) return this.close(4000, "Unknown region");
const viewers = await StreamSession.find({
where: { stream_id: stream.id },
});
await emitEvent({
event: "STREAM_CREATE",
data: {
stream_key: body.stream_key,
rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
viewer_ids: viewers.map((v) => v.user_id),
region: guildRegion.name,
paused: false,
},
guild_id: guildId,
channel_id: channelId,
});
await emitEvent({
event: "STREAM_SERVER_UPDATE",
data: {
token: streamSession.token,
stream_key: body.stream_key,
guild_id: null, // not sure why its always null
endpoint: stream.endpoint,
},
user_id: this.user_id,
});
} catch (e) {
return this.close(4000, "Invalid stream key"); return this.close(4000, "Invalid stream key");
} }
const type = splitStreamKey.shift()!;
let guild_id: string;
if (type === "guild") {
guild_id = splitStreamKey.shift()!;
}
const channel_id = splitStreamKey.shift()!;
const user_id = splitStreamKey.shift()!;
} }

View File

@ -42,6 +42,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
const isNew = body.channel_id === null && body.guild_id === null; const isNew = body.channel_id === null && body.guild_id === null;
let isChanged = false; let isChanged = false;
let prevState;
let voiceState: VoiceState; let voiceState: VoiceState;
try { try {
voiceState = await VoiceState.findOneOrFail({ voiceState = await VoiceState.findOneOrFail({
@ -60,6 +62,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
//If a user change voice channel between guild we should send a left event first //If a user change voice channel between guild we should send a left event first
if ( if (
voiceState.guild_id &&
voiceState.guild_id !== body.guild_id && voiceState.guild_id !== body.guild_id &&
voiceState.session_id === this.session_id voiceState.session_id === this.session_id
) { ) {
@ -71,7 +74,8 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
} }
//The event send by Discord's client on channel leave has both guild_id and channel_id as null //The event send by Discord's client on channel leave has both guild_id and channel_id as null
if (body.guild_id === null) body.guild_id = voiceState.guild_id; //if (body.guild_id === null) body.guild_id = voiceState.guild_id;
prevState = { ...voiceState };
voiceState.assign(body); voiceState.assign(body);
} catch (error) { } catch (error) {
voiceState = VoiceState.create({ voiceState = VoiceState.create({
@ -83,34 +87,46 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
}); });
} }
// 'Fix' for this one voice state error. TODO: Find out why this is sent // if user left voice channel, send an update to previous channel/guild to let other people know that the user left
// It seems to be sent on client load, if (
// so maybe its trying to find which server you were connected to before disconnecting, if any? voiceState.session_id === this.session_id &&
if (body.guild_id == null) { body.guild_id == null &&
return; body.channel_id == null &&
(prevState?.guild_id || prevState?.channel_id)
) {
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: { ...voiceState, channel_id: null, guild_id: null },
guild_id: prevState?.guild_id,
channel_id: prevState?.channel_id,
});
} }
//TODO the member should only have these properties: hoisted_role, deaf, joined_at, mute, roles, user //TODO the member should only have these properties: hoisted_role, deaf, joined_at, mute, roles, user
//TODO the member.user should only have these properties: avatar, discriminator, id, username //TODO the member.user should only have these properties: avatar, discriminator, id, username
//TODO this may fail //TODO this may fail
voiceState.member = await Member.findOneOrFail({ if (body.guild_id) {
where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, voiceState.member = await Member.findOneOrFail({
relations: ["user", "roles"], where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
}); relations: ["user", "roles"],
});
}
//If the session changed we generate a new token //If the session changed we generate a new token
if (voiceState.session_id !== this.session_id) if (voiceState.session_id !== this.session_id)
voiceState.token = genVoiceToken(); voiceState.token = genVoiceToken();
voiceState.session_id = this.session_id; voiceState.session_id = this.session_id;
const { id, ...newObj } = voiceState; const { id, member, ...newObj } = voiceState;
await Promise.all([ await Promise.all([
voiceState.save(), voiceState.save(),
emitEvent({ emitEvent({
event: "VOICE_STATE_UPDATE", event: "VOICE_STATE_UPDATE",
data: newObj, data: { ...newObj, member: member?.toPublicMember() },
guild_id: voiceState.guild_id, guild_id: voiceState.guild_id,
channel_id: voiceState.channel_id,
user_id: voiceState.user_id,
} as VoiceStateUpdateEvent), } as VoiceStateUpdateEvent),
]); ]);
@ -137,8 +153,10 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
token: voiceState.token, token: voiceState.token,
guild_id: voiceState.guild_id, guild_id: voiceState.guild_id,
endpoint: guildRegion.endpoint, endpoint: guildRegion.endpoint,
channel_id: voiceState.guild_id
? undefined
: voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null
}, },
guild_id: voiceState.guild_id,
user_id: voiceState.user_id, user_id: voiceState.user_id,
} as VoiceServerUpdateEvent); } as VoiceServerUpdateEvent);
} }

View File

@ -16,8 +16,6 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { VoiceOPCodes } from "@spacebar/webrtc";
export enum OPCODES { export enum OPCODES {
Dispatch = 0, Dispatch = 0,
Heartbeat = 1, Heartbeat = 1,
@ -63,7 +61,7 @@ export enum CLOSECODES {
} }
export interface Payload { export interface Payload {
op: OPCODES | VoiceOPCodes; op: OPCODES;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
d?: any; d?: any;
s?: number; s?: number;

43
src/gateway/util/Utils.ts Normal file
View File

@ -0,0 +1,43 @@
export function parseStreamKey(streamKey: string): {
type: "guild" | "call";
channelId: string;
guildId?: string;
userId: string;
} {
const streamKeyArray = streamKey.split(":");
const type = streamKeyArray.shift();
if (type !== "guild" && type !== "call") {
throw new Error(`Invalid stream key type: ${type}`);
}
if (
(type === "guild" && streamKeyArray.length < 3) ||
(type === "call" && streamKeyArray.length < 2)
)
throw new Error(`Invalid stream key: ${streamKey}`); // invalid stream key
let guildId: string | undefined;
if (type === "guild") {
guildId = streamKeyArray.shift();
}
const channelId = streamKeyArray.shift();
const userId = streamKeyArray.shift();
if (!channelId || !userId) {
throw new Error(`Invalid stream key: ${streamKey}`);
}
return { type, channelId, guildId, userId };
}
export function generateStreamKey(
type: "guild" | "call",
guildId: string | undefined,
channelId: string,
userId: string,
): string {
const streamKey = `${type}${type === "guild" ? `:${guildId}` : ""}:${channelId}:${userId}`;
return streamKey;
}

View File

@ -20,7 +20,6 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util";
import WS from "ws"; import WS from "ws";
import { Deflate, Inflate } from "fast-zlib"; import { Deflate, Inflate } from "fast-zlib";
import { Capabilities } from "./Capabilities"; import { Capabilities } from "./Capabilities";
import { WebRtcClient } from "@spacebar/webrtc";
export interface WebSocket extends WS { export interface WebSocket extends WS {
version: number; version: number;
@ -42,7 +41,5 @@ export interface WebSocket extends WS {
member_events: Record<string, () => unknown>; member_events: Record<string, () => unknown>;
listen_options: ListenEventOpts; listen_options: ListenEventOpts;
capabilities?: Capabilities; capabilities?: Capabilities;
voiceWs?: WebRtcClient<WebSocket>;
streamWs?: WebRtcClient<WebSocket>;
large_threshold: number; large_threshold: number;
} }

View File

@ -22,3 +22,4 @@ export * from "./SessionUtils";
export * from "./Heartbeat"; export * from "./Heartbeat";
export * from "./WebSocket"; export * from "./WebSocket";
export * from "./Capabilities"; export * from "./Capabilities";
export * from "./Utils";

View File

@ -0,0 +1,42 @@
import {
Column,
Entity,
JoinColumn,
ManyToOne,
OneToMany,
RelationId,
} from "typeorm";
import { BaseClass } from "./BaseClass";
import { dbEngine } from "../util/Database";
import { User } from "./User";
import { Channel } from "./Channel";
import { StreamSession } from "./StreamSession";
@Entity({
name: "streams",
engine: dbEngine,
})
export class Stream extends BaseClass {
@Column()
@RelationId((stream: Stream) => stream.owner)
owner_id: string;
@JoinColumn({ name: "owner_id" })
@ManyToOne(() => User, {
onDelete: "CASCADE",
})
owner: User;
@Column()
@RelationId((stream: Stream) => stream.channel)
channel_id: string;
@JoinColumn({ name: "channel_id" })
@ManyToOne(() => Channel, {
onDelete: "CASCADE",
})
channel: Channel;
@Column()
endpoint: string;
}

View File

@ -0,0 +1,48 @@
import {
Column,
Entity,
JoinColumn,
ManyToOne,
OneToMany,
RelationId,
} from "typeorm";
import { BaseClass } from "./BaseClass";
import { dbEngine } from "../util/Database";
import { User } from "./User";
import { Stream } from "./Stream";
@Entity({
name: "stream_sessions",
engine: dbEngine,
})
export class StreamSession extends BaseClass {
@Column()
@RelationId((session: StreamSession) => session.stream)
stream_id: string;
@JoinColumn({ name: "stream_id" })
@ManyToOne(() => Stream, {
onDelete: "CASCADE",
})
stream: Stream;
@Column()
@RelationId((session: StreamSession) => session.user)
user_id: string;
@JoinColumn({ name: "user_id" })
@ManyToOne(() => User, {
onDelete: "CASCADE",
})
user: User;
@Column({ nullable: true })
token: string;
// this is for gateway session
@Column()
session_id: string;
@Column({ default: false })
used: boolean;
}

View File

@ -47,6 +47,8 @@ export * from "./SecurityKey";
export * from "./Session"; export * from "./Session";
export * from "./Sticker"; export * from "./Sticker";
export * from "./StickerPack"; export * from "./StickerPack";
export * from "./Stream";
export * from "./StreamSession";
export * from "./Team"; export * from "./Team";
export * from "./TeamMember"; export * from "./TeamMember";
export * from "./Template"; export * from "./Template";

View File

@ -440,8 +440,9 @@ export interface VoiceServerUpdateEvent extends Event {
event: "VOICE_SERVER_UPDATE"; event: "VOICE_SERVER_UPDATE";
data: { data: {
token: string; token: string;
guild_id: string; guild_id: string | null;
endpoint: string; endpoint: string;
channel_id?: string;
}; };
} }
@ -700,6 +701,7 @@ export type EVENT =
| "VOICE_SERVER_UPDATE" | "VOICE_SERVER_UPDATE"
| "STREAM_CREATE" | "STREAM_CREATE"
| "STREAM_SERVER_UPDATE" | "STREAM_SERVER_UPDATE"
| "STREAM_DELETE"
| "APPLICATION_COMMAND_CREATE" | "APPLICATION_COMMAND_CREATE"
| "APPLICATION_COMMAND_UPDATE" | "APPLICATION_COMMAND_UPDATE"
| "APPLICATION_COMMAND_DELETE" | "APPLICATION_COMMAND_DELETE"

View File

@ -23,7 +23,7 @@ export interface VoiceIdentifySchema {
token: string; token: string;
video?: boolean; video?: boolean;
streams?: { streams?: {
type: string; type: "video" | "audio" | "screen";
rid: string; rid: string;
quality: number; quality: number;
}[]; }[];

View File

@ -22,7 +22,7 @@ export interface VoiceVideoSchema {
rtx_ssrc?: number; rtx_ssrc?: number;
user_id?: string; user_id?: string;
streams?: { streams?: {
type: "video" | "audio"; type: "video" | "audio" | "screen";
rid: string; rid: string;
ssrc: number; ssrc: number;
active: boolean; active: boolean;

View File

@ -23,9 +23,9 @@ import { EVENT, Event } from "../interfaces";
export const events = new EventEmitter(); export const events = new EventEmitter();
export async function emitEvent(payload: Omit<Event, "created_at">) { export async function emitEvent(payload: Omit<Event, "created_at">) {
const id = (payload.channel_id || const id = (payload.guild_id ||
payload.user_id || payload.channel_id ||
payload.guild_id) as string; payload.user_id) as string;
if (!id) return console.error("event doesn't contain any id", payload); if (!id) return console.error("event doesn't contain any id", payload);
if (RabbitMQ.connection) { if (RabbitMQ.connection) {

View File

@ -22,6 +22,7 @@ import http from "http";
import ws from "ws"; import ws from "ws";
import { Connection } from "./events/Connection"; import { Connection } from "./events/Connection";
import { mediaServer } from "./util/MediaServer"; import { mediaServer } from "./util/MediaServer";
import { green, yellow } from "picocolors";
dotenv.config(); dotenv.config();
export class Server { export class Server {
@ -70,16 +71,22 @@ export class Server {
await initDatabase(); await initDatabase();
await Config.init(); await Config.init();
await initEvent(); await initEvent();
// if we failed to load webrtc library
if (!mediaServer) {
console.log(`[WebRTC] ${yellow("WEBRTC disabled")}`);
return Promise.resolve();
}
await mediaServer.start(); await mediaServer.start();
if (!this.server.listening) { if (!this.server.listening) {
this.server.listen(this.port); this.server.listen(this.port);
console.log(`[WebRTC] online on 0.0.0.0:${this.port}`); console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`);
} }
} }
async stop() { async stop() {
closeDatabase(); closeDatabase();
this.server.close(); this.server.close();
mediaServer.stop(); mediaServer?.stop();
} }
} }

View File

@ -16,11 +16,11 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { CLOSECODES, Send, setHeartbeat, WebSocket } from "@spacebar/gateway"; import { CLOSECODES, setHeartbeat } from "@spacebar/gateway";
import { IncomingMessage } from "http"; import { IncomingMessage } from "http";
import { URL } from "url"; import { URL } from "url";
import WS from "ws"; import WS from "ws";
import { VoiceOPCodes } from "../util"; import { VoiceOPCodes, WebRtcWebSocket, Send } from "../util";
import { onClose } from "./Close"; import { onClose } from "./Close";
import { onMessage } from "./Message"; import { onMessage } from "./Message";
@ -30,7 +30,7 @@ import { onMessage } from "./Message";
export async function Connection( export async function Connection(
this: WS.Server, this: WS.Server,
socket: WebSocket, socket: WebRtcWebSocket,
request: IncomingMessage, request: IncomingMessage,
) { ) {
try { try {

View File

@ -16,10 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { CLOSECODES, Payload, WebSocket } from "@spacebar/gateway"; import { CLOSECODES } from "@spacebar/gateway";
import { Tuple } from "lambert-server"; import { Tuple } from "lambert-server";
import OPCodeHandlers from "../opcodes"; import OPCodeHandlers from "../opcodes";
import { VoiceOPCodes } from "../util"; import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util";
const PayloadSchema = { const PayloadSchema = {
op: Number, op: Number,
@ -28,9 +28,9 @@ const PayloadSchema = {
$t: String, $t: String,
}; };
export async function onMessage(this: WebSocket, buffer: Buffer) { export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) {
try { try {
const data: Payload = JSON.parse(buffer.toString()); const data: VoicePayload = JSON.parse(buffer.toString());
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id)
return this.close(CLOSECODES.Not_authenticated); return this.close(CLOSECODES.Not_authenticated);

View File

@ -1,13 +1,7 @@
import { CodecInfo, MediaInfo, SDPInfo } from "semantic-sdp"; import { CodecInfo, MediaInfo, SDPInfo } from "semantic-sdp";
import { SignalingDelegate } from "../util/SignalingDelegate"; import { SignalingDelegate } from "../util/SignalingDelegate";
import { Codec, WebRtcClient } from "../util/WebRtcClient"; import { Codec, WebRtcClient } from "../util/WebRtcClient";
import { import { MediaServer, Endpoint } from "@dank074/medooze-media-server";
MediaServer,
IncomingStream,
OutgoingStream,
Transport,
Endpoint,
} from "@dank074/medooze-media-server";
import { VoiceRoom } from "./VoiceRoom"; import { VoiceRoom } from "./VoiceRoom";
import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient"; import { MedoozeWebRtcClient } from "./MedoozeWebRtcClient";
@ -48,8 +42,26 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
rtcServerId: string, rtcServerId: string,
userId: string, userId: string,
ws: any, ws: any,
type: "guild-voice" | "dm-voice" | "stream",
): WebRtcClient<any> { ): WebRtcClient<any> {
const existingClient = this.getClientForUserId(userId); // make sure user isn't already in a room of the same type
// user can be in two simultanous rooms of different type though (can be in a voice channel and watching a stream for example)
const rooms = this.rooms
.values()
.filter((room) =>
type === "stream"
? room.type === "stream"
: room.type === "dm-voice" || room.type === "guild-voice",
);
let existingClient;
for (const room of rooms) {
let result = room.getClientById(userId);
if (result) {
existingClient = result;
break;
}
}
if (existingClient) { if (existingClient) {
console.log("client already connected, disconnect.."); console.log("client already connected, disconnect..");
@ -58,7 +70,7 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
if (!this._rooms.has(rtcServerId)) { if (!this._rooms.has(rtcServerId)) {
console.debug("no channel created, creating one..."); console.debug("no channel created, creating one...");
this.createChannel(rtcServerId); this.createRoom(rtcServerId, type);
} }
const room = this._rooms.get(rtcServerId)!; const room = this._rooms.get(rtcServerId)!;
@ -208,8 +220,11 @@ export class MedoozeSignalingDelegate implements SignalingDelegate {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
public createChannel(rtcServerId: string): void { public createRoom(
this._rooms.set(rtcServerId, new VoiceRoom(rtcServerId, this)); rtcServerId: string,
type: "guild-voice" | "dm-voice" | "stream",
): void {
this._rooms.set(rtcServerId, new VoiceRoom(rtcServerId, type, this));
} }
public disposeRoom(rtcServerId: string): void { public disposeRoom(rtcServerId: string): void {

View File

@ -11,10 +11,15 @@ export class VoiceRoom {
private _clients: Map<string, MedoozeWebRtcClient>; private _clients: Map<string, MedoozeWebRtcClient>;
private _id: string; private _id: string;
private _sfu: MedoozeSignalingDelegate; private _sfu: MedoozeSignalingDelegate;
private _type: "guild-voice" | "dm-voice" | "stream";
constructor(id: string, sfu: MedoozeSignalingDelegate) { constructor(
id: string,
type: "guild-voice" | "dm-voice" | "stream",
sfu: MedoozeSignalingDelegate,
) {
this._id = id; this._id = id;
this._type = type;
this._clients = new Map(); this._clients = new Map();
this._sfu = sfu; this._sfu = sfu;
} }
@ -98,6 +103,10 @@ export class VoiceRoom {
return this._id; return this._id;
} }
get type(): "guild-voice" | "dm-voice" | "stream" {
return this._type;
}
public dispose(): void { public dispose(): void {
const clients = this._clients.values(); const clients = this._clients.values();
for (const client of clients) { for (const client of clients) {

View File

@ -16,10 +16,12 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util";
import { VoiceOPCodes } from "../util";
export async function onBackendVersion(this: WebSocket, data: Payload) { export async function onBackendVersion(
this: WebRtcWebSocket,
data: VoicePayload,
) {
await Send(this, { await Send(this, {
op: VoiceOPCodes.VOICE_BACKEND_VERSION, op: VoiceOPCodes.VOICE_BACKEND_VERSION,
d: { voice: "0.8.43", rtc_worker: "0.3.26" }, d: { voice: "0.8.43", rtc_worker: "0.3.26" },

View File

@ -16,16 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { import { CLOSECODES, setHeartbeat } from "@spacebar/gateway";
CLOSECODES, import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util";
Payload,
Send,
setHeartbeat,
WebSocket,
} from "@spacebar/gateway";
import { VoiceOPCodes } from "../util";
export async function onHeartbeat(this: WebSocket, data: Payload) { export async function onHeartbeat(this: WebRtcWebSocket, data: VoicePayload) {
setHeartbeat(this); setHeartbeat(this);
if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error); if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error);

View File

@ -16,31 +16,79 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { CLOSECODES, Payload, Send, WebSocket } from "@spacebar/gateway"; import { CLOSECODES } from "@spacebar/gateway";
import { import {
StreamSession,
validateSchema, validateSchema,
VoiceIdentifySchema, VoiceIdentifySchema,
VoiceState, VoiceState,
} from "@spacebar/util"; } from "@spacebar/util";
import { mediaServer, VoiceOPCodes } from "@spacebar/webrtc"; import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
} from "@spacebar/webrtc";
export async function onIdentify(this: WebSocket, data: Payload) { export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
clearTimeout(this.readyTimeout); clearTimeout(this.readyTimeout);
const { server_id, user_id, session_id, token, streams, video } = const { server_id, user_id, session_id, token, streams, video } =
validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema; validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema;
const voiceState = await VoiceState.findOne({ // server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel
where: { guild_id: server_id, user_id, token, session_id }, // not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db
// luckily we will only have to determine this once
let type: "guild-voice" | "dm-voice" | "stream";
let authenticated = false;
// first check if its a guild voice connection or DM voice call
let voiceState = await VoiceState.findOne({
where: [
{ guild_id: server_id, user_id, token, session_id },
{ channel_id: server_id, user_id, token, session_id },
],
}); });
if (!voiceState) return this.close(CLOSECODES.Authentication_failed);
if (voiceState) {
type = voiceState.guild_id === server_id ? "guild-voice" : "dm-voice";
authenticated = true;
} else {
// if its not a guild/dm voice connection, check if it is a go live stream
const streamSession = await StreamSession.findOne({
where: {
stream_id: server_id,
user_id,
token,
session_id,
used: false,
},
});
if (streamSession) {
type = "stream";
authenticated = true;
streamSession.used = true;
await streamSession.save();
this.once("close", async () => {
await streamSession.remove();
});
}
}
// if it doesnt match any then not valid token
if (!authenticated) return this.close(CLOSECODES.Authentication_failed);
this.user_id = user_id; this.user_id = user_id;
this.session_id = session_id; this.session_id = session_id;
this.voiceWs = mediaServer.join(voiceState.channel_id, this.user_id, this); this.type = type!;
this.webRtcClient = mediaServer.join(server_id, this.user_id, this, type!);
this.on("close", () => { this.on("close", () => {
mediaServer.onClientClose(this.voiceWs!); // ice-lite media server relies on this to know when the peer went away
mediaServer.onClientClose(this.webRtcClient!);
}); });
await Send(this, { await Send(this, {

View File

@ -15,13 +15,20 @@
You should have received a copy of the GNU Affero General Public License You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { SelectProtocolSchema, validateSchema } from "@spacebar/util"; import { SelectProtocolSchema, validateSchema } from "@spacebar/util";
import { VoiceOPCodes, mediaServer } from "@spacebar/webrtc"; import {
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
mediaServer,
Send,
} from "@spacebar/webrtc";
export async function onSelectProtocol(this: WebSocket, payload: Payload) { export async function onSelectProtocol(
if (!this.voiceWs) return; this: WebRtcWebSocket,
payload: VoicePayload,
) {
if (!this.webRtcClient) return;
const data = validateSchema( const data = validateSchema(
"SelectProtocolSchema", "SelectProtocolSchema",
@ -29,7 +36,7 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) {
) as SelectProtocolSchema; ) as SelectProtocolSchema;
const answer = await mediaServer.onOffer( const answer = await mediaServer.onOffer(
this.voiceWs, this.webRtcClient,
data.sdp!, data.sdp!,
data.codecs ?? [], data.codecs ?? [],
); );

View File

@ -16,16 +16,23 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload, Send, WebSocket } from "@spacebar/gateway"; import {
import { mediaServer, VoiceOPCodes } from "../util"; mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
} from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727} // {"speaking":1,"delay":5,"ssrc":2805246727}
export async function onSpeaking(this: WebSocket, data: Payload) { export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) {
if (!this.voiceWs) return; if (!this.webRtcClient) return;
mediaServer mediaServer
.getClientsForRtcServer<WebSocket>(this.voiceWs.rtc_server_id) .getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.rtc_server_id,
)
.forEach((client) => { .forEach((client) => {
if (client.user_id === this.user_id) return; if (client.user_id === this.user_id) return;

View File

@ -15,14 +15,20 @@
You should have received a copy of the GNU Affero General Public License You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; import { validateSchema, VoiceVideoSchema } from "@spacebar/util";
import { mediaServer, VoiceOPCodes, WebRtcClient } from "@spacebar/webrtc"; import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcClient,
WebRtcWebSocket,
Send,
} from "@spacebar/webrtc";
export async function onVideo(this: WebSocket, payload: Payload) { export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
if (!this.voiceWs || !this.voiceWs.webrtcConnected) return; if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return;
const { rtc_server_id: channel_id } = this.voiceWs;
const { rtc_server_id } = this.webRtcClient;
const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
@ -30,9 +36,9 @@ export async function onVideo(this: WebSocket, payload: Payload) {
await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
const ssrcs = this.voiceWs.getIncomingStreamSSRCs(); const ssrcs = this.webRtcClient.getIncomingStreamSSRCs();
const clientsThatNeedUpdate = new Set<WebRtcClient<WebSocket>>(); const clientsThatNeedUpdate = new Set<WebRtcClient<WebRtcWebSocket>>();
// check if client has signaled that it will send audio // check if client has signaled that it will send audio
if (d.audio_ssrc !== 0) { if (d.audio_ssrc !== 0) {
@ -41,12 +47,14 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log( console.log(
`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`,
); );
this.voiceWs.publishTrack("audio", { audio_ssrc: d.audio_ssrc }); this.webRtcClient.publishTrack("audio", {
audio_ssrc: d.audio_ssrc,
});
} }
// now check that all clients have outgoing media for this ssrcs // now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForRtcServer<WebSocket>( for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
channel_id, rtc_server_id,
)) { )) {
if (client.user_id === this.user_id) continue; if (client.user_id === this.user_id) continue;
@ -55,7 +63,7 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log( console.log(
`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`,
); );
client.subscribeToTrack(this.voiceWs.user_id, "audio"); client.subscribeToTrack(this.webRtcClient.user_id, "audio");
clientsThatNeedUpdate.add(client); clientsThatNeedUpdate.add(client);
} }
@ -68,26 +76,26 @@ export async function onVideo(this: WebSocket, payload: Payload) {
console.log( console.log(
`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`,
); );
this.voiceWs.publishTrack("video", { this.webRtcClient.publishTrack("video", {
video_ssrc: d.video_ssrc, video_ssrc: d.video_ssrc,
rtx_ssrc: d.rtx_ssrc, rtx_ssrc: d.rtx_ssrc,
}); });
} }
// now check that all clients have outgoing media for this ssrcs // now check that all clients have outgoing media for this ssrcs
for (const client of mediaServer.getClientsForRtcServer<WebSocket>( for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
channel_id, rtc_server_id,
)) { )) {
if (client.user_id === this.user_id) continue; if (client.user_id === this.user_id) continue;
const ssrcs = client.getOutgoingStreamSSRCsForUser( const ssrcs = client.getOutgoingStreamSSRCsForUser(
this.voiceWs.user_id, this.webRtcClient.user_id,
); );
if (ssrcs.video_ssrc != d.video_ssrc) { if (ssrcs.video_ssrc != d.video_ssrc) {
console.log( console.log(
`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`,
); );
client.subscribeToTrack(this.voiceWs.user_id, "video"); client.subscribeToTrack(this.webRtcClient.user_id, "video");
clientsThatNeedUpdate.add(client); clientsThatNeedUpdate.add(client);
} }

View File

@ -16,8 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload, WebSocket } from "@spacebar/gateway"; import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util";
import { VoiceOPCodes } from "../util";
import { onBackendVersion } from "./BackendVersion"; import { onBackendVersion } from "./BackendVersion";
import { onHeartbeat } from "./Heartbeat"; import { onHeartbeat } from "./Heartbeat";
import { onIdentify } from "./Identify"; import { onIdentify } from "./Identify";
@ -25,7 +24,7 @@ import { onSelectProtocol } from "./SelectProtocol";
import { onSpeaking } from "./Speaking"; import { onSpeaking } from "./Speaking";
import { onVideo } from "./Video"; import { onVideo } from "./Video";
export type OPCodeHandler = (this: WebSocket, data: Payload) => any; export type OPCodeHandler = (this: WebRtcWebSocket, data: VoicePayload) => any;
export default { export default {
[VoiceOPCodes.HEARTBEAT]: onHeartbeat, [VoiceOPCodes.HEARTBEAT]: onHeartbeat,

View File

@ -16,6 +16,8 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import { Payload } from "@spacebar/gateway";
export enum VoiceStatus { export enum VoiceStatus {
CONNECTED = 0, CONNECTED = 0,
CONNECTING = 1, CONNECTING = 1,
@ -42,3 +44,5 @@ export enum VoiceOPCodes {
VOICE_BACKEND_VERSION = 16, VOICE_BACKEND_VERSION = 16,
CHANNEL_OPTIONS_UPDATE = 17, CHANNEL_OPTIONS_UPDATE = 17,
} }
export type VoicePayload = Omit<Payload, "op"> & { op: VoiceOPCodes };

View File

@ -18,6 +18,7 @@
//import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate"; //import { MedoozeSignalingDelegate } from "../medooze/MedoozeSignalingDelegate";
import { SignalingDelegate } from "./SignalingDelegate"; import { SignalingDelegate } from "./SignalingDelegate";
import { green, red } from "picocolors";
export let mediaServer: SignalingDelegate; export let mediaServer: SignalingDelegate;
@ -27,9 +28,13 @@ export let mediaServer: SignalingDelegate;
mediaServer = new ( mediaServer = new (
await import("../medooze/MedoozeSignalingDelegate") await import("../medooze/MedoozeSignalingDelegate")
).MedoozeSignalingDelegate(); ).MedoozeSignalingDelegate();
console.log(
`[WebRTC] ${green("Succesfully loaded MedoozeSignalingDelegate")}`,
);
} catch (e) { } catch (e) {
console.error("Failed to import MedoozeSignalingDelegate", e); console.log(
// Fallback to a different implementation or handle the error `[WebRTC] ${red("Failed to import MedoozeSignalingDelegate")}`,
// For example, you could set mediaServer to null or throw an error );
} }
})(); })();

27
src/webrtc/util/Send.ts Normal file
View File

@ -0,0 +1,27 @@
import { JSONReplacer } from "@spacebar/util";
import { VoicePayload } from "./Constants";
import { WebRtcWebSocket } from "./WebRtcWebSocket";
export function Send(socket: WebRtcWebSocket, data: VoicePayload) {
if (process.env.WRTC_WS_VERBOSE)
console.log(`[WebRTC] Outgoing message: ${JSON.stringify(data)}`);
let buffer: Buffer | string;
// TODO: encode circular object
if (socket.encoding === "json") buffer = JSON.stringify(data, JSONReplacer);
else return;
return new Promise((res, rej) => {
if (socket.readyState !== 1) {
// return rej("socket not open");
socket.close();
return;
}
socket.send(buffer, (err) => {
if (err) return rej(err);
return res(null);
});
});
}

View File

@ -3,7 +3,12 @@ import { Codec, WebRtcClient } from "./WebRtcClient";
export interface SignalingDelegate { export interface SignalingDelegate {
start: () => Promise<void>; start: () => Promise<void>;
stop: () => Promise<void>; stop: () => Promise<void>;
join<T>(rtcServerId: string, userId: string, ws: T): WebRtcClient<T>; join<T>(
rtcServerId: string,
userId: string,
ws: T,
type: "guild-voice" | "dm-voice" | "stream",
): WebRtcClient<T>;
onOffer<T>( onOffer<T>(
client: WebRtcClient<T>, client: WebRtcClient<T>,
offer: string, offer: string,

View File

@ -0,0 +1,7 @@
import { WebSocket } from "@spacebar/gateway";
import { WebRtcClient } from "./WebRtcClient";
export interface WebRtcWebSocket extends WebSocket {
type: "guild-voice" | "dm-voice" | "stream";
webRtcClient?: WebRtcClient<WebRtcWebSocket>;
}

View File

@ -19,3 +19,5 @@
export * from "./Constants"; export * from "./Constants";
export * from "./MediaServer"; export * from "./MediaServer";
export * from "./WebRtcClient"; export * from "./WebRtcClient";
export * from "./WebRtcWebSocket";
export * from "./Send";