Merge pull request #1612 from thecodingmachine/fix-admin-sockets
Creating only one WS connection to pusher from admin
This commit is contained in:
commit
7b94f8b8ca
@ -52,7 +52,7 @@
|
|||||||
"openid-client": "^4.7.4",
|
"openid-client": "^4.7.4",
|
||||||
"prom-client": "^12.0.0",
|
"prom-client": "^12.0.0",
|
||||||
"query-string": "^6.13.3",
|
"query-string": "^6.13.3",
|
||||||
"uWebSockets.js": "uNetworking/uWebSockets.js#v18.5.0",
|
"uWebSockets.js": "uNetworking/uWebSockets.js#v20.4.0",
|
||||||
"uuidv4": "^6.0.7"
|
"uuidv4": "^6.0.7"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
@ -71,8 +71,8 @@
|
|||||||
"jasmine": "^3.5.0",
|
"jasmine": "^3.5.0",
|
||||||
"lint-staged": "^11.0.0",
|
"lint-staged": "^11.0.0",
|
||||||
"prettier": "^2.3.1",
|
"prettier": "^2.3.1",
|
||||||
"ts-node-dev": "^1.0.0-pre.44",
|
"ts-node-dev": "^1.1.8",
|
||||||
"typescript": "^3.8.3"
|
"typescript": "^4.5.2"
|
||||||
},
|
},
|
||||||
"lint-staged": {
|
"lint-staged": {
|
||||||
"*.ts": [
|
"*.ts": [
|
||||||
|
@ -44,7 +44,7 @@ export class AdminController extends BaseController {
|
|||||||
const roomId: string = body.roomId;
|
const roomId: string = body.roomId;
|
||||||
|
|
||||||
await apiClientRepository.getClient(roomId).then((roomClient) => {
|
await apiClientRepository.getClient(roomId).then((roomClient) => {
|
||||||
return new Promise((res, rej) => {
|
return new Promise<void>((res, rej) => {
|
||||||
const roomMessage = new RefreshRoomPromptMessage();
|
const roomMessage = new RefreshRoomPromptMessage();
|
||||||
roomMessage.setRoomid(roomId);
|
roomMessage.setRoomid(roomId);
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ export class AdminController extends BaseController {
|
|||||||
await Promise.all(
|
await Promise.all(
|
||||||
targets.map((roomId) => {
|
targets.map((roomId) => {
|
||||||
return apiClientRepository.getClient(roomId).then((roomClient) => {
|
return apiClientRepository.getClient(roomId).then((roomClient) => {
|
||||||
return new Promise((res, rej) => {
|
return new Promise<void>((res, rej) => {
|
||||||
if (type === "message") {
|
if (type === "message") {
|
||||||
const roomMessage = new AdminRoomMessage();
|
const roomMessage = new AdminRoomMessage();
|
||||||
roomMessage.setMessage(text);
|
roomMessage.setMessage(text);
|
||||||
|
9
pusher/src/Controller/InvalidTokenError.ts
Normal file
9
pusher/src/Controller/InvalidTokenError.ts
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
/**
|
||||||
|
* Errors related to variable handling.
|
||||||
|
*/
|
||||||
|
export class InvalidTokenError extends Error {
|
||||||
|
constructor(message: string) {
|
||||||
|
super(message);
|
||||||
|
Object.setPrototypeOf(this, InvalidTokenError.prototype);
|
||||||
|
}
|
||||||
|
}
|
@ -22,14 +22,17 @@ import {
|
|||||||
import { UserMovesMessage } from "../Messages/generated/messages_pb";
|
import { UserMovesMessage } from "../Messages/generated/messages_pb";
|
||||||
import { TemplatedApp } from "uWebSockets.js";
|
import { TemplatedApp } from "uWebSockets.js";
|
||||||
import { parse } from "query-string";
|
import { parse } from "query-string";
|
||||||
import { jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager";
|
import { AdminSocketTokenData, jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager";
|
||||||
import { adminApi, FetchMemberDataByUuidResponse } from "../Services/AdminApi";
|
import { adminApi, FetchMemberDataByUuidResponse } from "../Services/AdminApi";
|
||||||
import { SocketManager, socketManager } from "../Services/SocketManager";
|
import { SocketManager, socketManager } from "../Services/SocketManager";
|
||||||
import { emitInBatch } from "../Services/IoSocketHelpers";
|
import { emitInBatch } from "../Services/IoSocketHelpers";
|
||||||
import { ADMIN_SOCKETS_TOKEN, ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable";
|
import { ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable";
|
||||||
import { Zone } from "_Model/Zone";
|
import { Zone } from "_Model/Zone";
|
||||||
import { ExAdminSocketInterface } from "_Model/Websocket/ExAdminSocketInterface";
|
import { ExAdminSocketInterface } from "_Model/Websocket/ExAdminSocketInterface";
|
||||||
import { CharacterTexture } from "../Services/AdminApi/CharacterTexture";
|
import { CharacterTexture } from "../Services/AdminApi/CharacterTexture";
|
||||||
|
import { isAdminMessageInterface } from "../Model/Websocket/Admin/AdminMessages";
|
||||||
|
import Axios from "axios";
|
||||||
|
import { InvalidTokenError } from "../Controller/InvalidTokenError";
|
||||||
|
|
||||||
export class IoSocketController {
|
export class IoSocketController {
|
||||||
private nextUserId: number = 1;
|
private nextUserId: number = 1;
|
||||||
@ -42,59 +45,108 @@ export class IoSocketController {
|
|||||||
adminRoomSocket() {
|
adminRoomSocket() {
|
||||||
this.app.ws("/admin/rooms", {
|
this.app.ws("/admin/rooms", {
|
||||||
upgrade: (res, req, context) => {
|
upgrade: (res, req, context) => {
|
||||||
const query = parse(req.getQuery());
|
|
||||||
const websocketKey = req.getHeader("sec-websocket-key");
|
const websocketKey = req.getHeader("sec-websocket-key");
|
||||||
const websocketProtocol = req.getHeader("sec-websocket-protocol");
|
const websocketProtocol = req.getHeader("sec-websocket-protocol");
|
||||||
const websocketExtensions = req.getHeader("sec-websocket-extensions");
|
const websocketExtensions = req.getHeader("sec-websocket-extensions");
|
||||||
const token = query.token;
|
|
||||||
let authorizedRoomIds: string[];
|
|
||||||
try {
|
|
||||||
const data = jwtTokenManager.verifyAdminSocketToken(token as string);
|
|
||||||
authorizedRoomIds = data.authorizedRoomIds;
|
|
||||||
} catch (e) {
|
|
||||||
console.error("Admin access refused for token: " + token);
|
|
||||||
res.writeStatus("401 Unauthorized").end("Incorrect token");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const roomId = query.roomId;
|
|
||||||
if (typeof roomId !== "string" || !authorizedRoomIds.includes(roomId)) {
|
|
||||||
console.error("Invalid room id");
|
|
||||||
res.writeStatus("403 Bad Request").end("Invalid room id");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
res.upgrade({ roomId }, websocketKey, websocketProtocol, websocketExtensions, context);
|
res.upgrade({}, websocketKey, websocketProtocol, websocketExtensions, context);
|
||||||
},
|
},
|
||||||
open: (ws) => {
|
open: (ws) => {
|
||||||
console.log("Admin socket connect for room: " + ws.roomId);
|
console.log("Admin socket connect to client on " + Buffer.from(ws.getRemoteAddressAsText()).toString());
|
||||||
ws.disconnecting = false;
|
ws.disconnecting = false;
|
||||||
|
|
||||||
socketManager.handleAdminRoom(ws as ExAdminSocketInterface, ws.roomId as string);
|
|
||||||
},
|
},
|
||||||
message: (ws, arrayBuffer, isBinary): void => {
|
message: (ws, arrayBuffer, isBinary): void => {
|
||||||
try {
|
try {
|
||||||
//TODO refactor message type and data
|
const message = JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer)));
|
||||||
const message: { event: string; message: { type: string; message: unknown; userUuid: string } } =
|
|
||||||
JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer)));
|
|
||||||
|
|
||||||
if (message.event === "user-message") {
|
if (!isAdminMessageInterface(message)) {
|
||||||
const messageToEmit = message.message as { message: string; type: string; userUuid: string };
|
console.error("Invalid message received.", message);
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "Error",
|
||||||
|
data: {
|
||||||
|
message: "Invalid message received! The connection has been closed.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
ws.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const token = message.jwt;
|
||||||
|
|
||||||
|
let data: AdminSocketTokenData;
|
||||||
|
|
||||||
|
try {
|
||||||
|
data = jwtTokenManager.verifyAdminSocketToken(token);
|
||||||
|
} catch (e) {
|
||||||
|
console.error("Admin socket access refused for token: " + token, e);
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "Error",
|
||||||
|
data: {
|
||||||
|
message: "Admin socket access refused! The connection has been closed.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
ws.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const authorizedRoomIds = data.authorizedRoomIds;
|
||||||
|
|
||||||
|
if (message.event === "listen") {
|
||||||
|
const notAuthorizedRoom = message.roomIds.filter(
|
||||||
|
(roomId) => !authorizedRoomIds.includes(roomId)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (notAuthorizedRoom.length > 0) {
|
||||||
|
const errorMessage = `Admin socket refused for client on ${Buffer.from(
|
||||||
|
ws.getRemoteAddressAsText()
|
||||||
|
).toString()} listening of : \n${JSON.stringify(notAuthorizedRoom)}`;
|
||||||
|
console.error();
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "Error",
|
||||||
|
data: {
|
||||||
|
message: errorMessage,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
ws.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const roomId of message.roomIds) {
|
||||||
|
socketManager
|
||||||
|
.handleAdminRoom(ws as ExAdminSocketInterface, roomId)
|
||||||
|
.catch((e) => console.error(e));
|
||||||
|
}
|
||||||
|
} else if (message.event === "user-message") {
|
||||||
|
const messageToEmit = message.message;
|
||||||
|
// Get roomIds of the world where we want broadcast the message
|
||||||
|
const roomIds = authorizedRoomIds.filter(
|
||||||
|
(authorizeRoomId) => authorizeRoomId.split("/")[5] === message.world
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const roomId of roomIds) {
|
||||||
if (messageToEmit.type === "banned") {
|
if (messageToEmit.type === "banned") {
|
||||||
socketManager.emitBan(
|
socketManager
|
||||||
|
.emitBan(messageToEmit.userUuid, messageToEmit.message, messageToEmit.type, roomId)
|
||||||
|
.catch((error) => console.error(error));
|
||||||
|
} else if (messageToEmit.type === "ban") {
|
||||||
|
socketManager
|
||||||
|
.emitSendUserMessage(
|
||||||
messageToEmit.userUuid,
|
messageToEmit.userUuid,
|
||||||
messageToEmit.message,
|
messageToEmit.message,
|
||||||
messageToEmit.type,
|
messageToEmit.type,
|
||||||
ws.roomId as string
|
roomId
|
||||||
);
|
)
|
||||||
|
.catch((error) => console.error(error));
|
||||||
}
|
}
|
||||||
if (messageToEmit.type === "ban") {
|
|
||||||
socketManager.emitSendUserMessage(
|
|
||||||
messageToEmit.userUuid,
|
|
||||||
messageToEmit.message,
|
|
||||||
messageToEmit.type,
|
|
||||||
ws.roomId as string
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
const tmp: never = message.event;
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
@ -202,6 +254,7 @@ export class IoSocketController {
|
|||||||
try {
|
try {
|
||||||
userData = await adminApi.fetchMemberDataByUuid(userIdentifier, roomId, IPAddress);
|
userData = await adminApi.fetchMemberDataByUuid(userIdentifier, roomId, IPAddress);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
if (Axios.isAxiosError(err)) {
|
||||||
if (err?.response?.status == 404) {
|
if (err?.response?.status == 404) {
|
||||||
// If we get an HTTP 404, the token is invalid. Let's perform an anonymous login!
|
// If we get an HTTP 404, the token is invalid. Let's perform an anonymous login!
|
||||||
|
|
||||||
@ -224,6 +277,7 @@ export class IoSocketController {
|
|||||||
websocketExtensions,
|
websocketExtensions,
|
||||||
context
|
context
|
||||||
);
|
);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
@ -302,17 +356,31 @@ export class IoSocketController {
|
|||||||
context
|
context
|
||||||
);
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
if (e instanceof Error) {
|
||||||
res.upgrade(
|
res.upgrade(
|
||||||
{
|
{
|
||||||
rejected: true,
|
rejected: true,
|
||||||
reason: e.reason || null,
|
reason: e instanceof InvalidTokenError ? tokenInvalidException : null,
|
||||||
message: e.message ? e.message : "500 Internal Server Error",
|
message: e.message,
|
||||||
},
|
},
|
||||||
websocketKey,
|
websocketKey,
|
||||||
websocketProtocol,
|
websocketProtocol,
|
||||||
websocketExtensions,
|
websocketExtensions,
|
||||||
context
|
context
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
res.upgrade(
|
||||||
|
{
|
||||||
|
rejected: true,
|
||||||
|
reason: null,
|
||||||
|
message: "500 Internal Server Error",
|
||||||
|
},
|
||||||
|
websocketKey,
|
||||||
|
websocketProtocol,
|
||||||
|
websocketExtensions,
|
||||||
|
context
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})();
|
})();
|
||||||
},
|
},
|
||||||
|
30
pusher/src/Model/Websocket/Admin/AdminMessages.ts
Normal file
30
pusher/src/Model/Websocket/Admin/AdminMessages.ts
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
import * as tg from "generic-type-guard";
|
||||||
|
|
||||||
|
export const isBanBannedAdminMessageInterface = new tg.IsInterface()
|
||||||
|
.withProperties({
|
||||||
|
type: tg.isSingletonStringUnion("ban", "banned"),
|
||||||
|
message: tg.isString,
|
||||||
|
userUuid: tg.isString,
|
||||||
|
})
|
||||||
|
.get();
|
||||||
|
|
||||||
|
export const isUserMessageAdminMessageInterface = new tg.IsInterface()
|
||||||
|
.withProperties({
|
||||||
|
event: tg.isSingletonString("user-message"),
|
||||||
|
message: isBanBannedAdminMessageInterface,
|
||||||
|
world: tg.isString,
|
||||||
|
jwt: tg.isString,
|
||||||
|
})
|
||||||
|
.get();
|
||||||
|
|
||||||
|
export const isListenRoomsMessageInterface = new tg.IsInterface()
|
||||||
|
.withProperties({
|
||||||
|
event: tg.isSingletonString("listen"),
|
||||||
|
roomIds: tg.isArray(tg.isString),
|
||||||
|
jwt: tg.isString,
|
||||||
|
})
|
||||||
|
.get();
|
||||||
|
|
||||||
|
export const isAdminMessageInterface = tg.isUnion(isUserMessageAdminMessageInterface, isListenRoomsMessageInterface);
|
||||||
|
|
||||||
|
export type AdminMessageInterface = tg.GuardedType<typeof isAdminMessageInterface>;
|
@ -3,6 +3,7 @@ import { uuid } from "uuidv4";
|
|||||||
import Jwt, { verify } from "jsonwebtoken";
|
import Jwt, { verify } from "jsonwebtoken";
|
||||||
import { TokenInterface } from "../Controller/AuthenticateController";
|
import { TokenInterface } from "../Controller/AuthenticateController";
|
||||||
import { adminApi, AdminBannedData } from "../Services/AdminApi";
|
import { adminApi, AdminBannedData } from "../Services/AdminApi";
|
||||||
|
import { InvalidTokenError } from "../Controller/InvalidTokenError";
|
||||||
|
|
||||||
export interface AuthTokenData {
|
export interface AuthTokenData {
|
||||||
identifier: string; //will be a email if logged in or an uuid if anonymous
|
identifier: string; //will be a email if logged in or an uuid if anonymous
|
||||||
@ -26,7 +27,12 @@ class JWTTokenManager {
|
|||||||
try {
|
try {
|
||||||
return Jwt.verify(token, SECRET_KEY, { ignoreExpiration }) as AuthTokenData;
|
return Jwt.verify(token, SECRET_KEY, { ignoreExpiration }) as AuthTokenData;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw { reason: tokenInvalidException, message: e.message };
|
if (e instanceof Error) {
|
||||||
|
// FIXME: we are loosing the stacktrace here.
|
||||||
|
throw new InvalidTokenError(e.message);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -132,6 +132,12 @@ export class SocketManager implements ZoneEventListener {
|
|||||||
const message = new AdminPusherToBackMessage();
|
const message = new AdminPusherToBackMessage();
|
||||||
message.setSubscribetoroom(roomId);
|
message.setSubscribetoroom(roomId);
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`Admin socket handle room ${roomId} connections for a client on ${Buffer.from(
|
||||||
|
client.getRemoteAddressAsText()
|
||||||
|
).toString()}`
|
||||||
|
);
|
||||||
|
|
||||||
adminRoomStream.write(message);
|
adminRoomStream.write(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2270,7 +2270,7 @@ tree-kill@^1.2.2:
|
|||||||
resolved "https://registry.yarnpkg.com/tree-kill/-/tree-kill-1.2.2.tgz#4ca09a9092c88b73a7cdc5e8a01b507b0790a0cc"
|
resolved "https://registry.yarnpkg.com/tree-kill/-/tree-kill-1.2.2.tgz#4ca09a9092c88b73a7cdc5e8a01b507b0790a0cc"
|
||||||
integrity sha512-L0Orpi8qGpRG//Nd+H90vFB+3iHnue1zSSGmNOOCh1GLJ7rUKVwV2HvijphGQS2UmhUZewS9VgvxYIdgr+fG1A==
|
integrity sha512-L0Orpi8qGpRG//Nd+H90vFB+3iHnue1zSSGmNOOCh1GLJ7rUKVwV2HvijphGQS2UmhUZewS9VgvxYIdgr+fG1A==
|
||||||
|
|
||||||
ts-node-dev@^1.0.0-pre.44:
|
ts-node-dev@^1.1.8:
|
||||||
version "1.1.8"
|
version "1.1.8"
|
||||||
resolved "https://registry.yarnpkg.com/ts-node-dev/-/ts-node-dev-1.1.8.tgz#95520d8ab9d45fffa854d6668e2f8f9286241066"
|
resolved "https://registry.yarnpkg.com/ts-node-dev/-/ts-node-dev-1.1.8.tgz#95520d8ab9d45fffa854d6668e2f8f9286241066"
|
||||||
integrity sha512-Q/m3vEwzYwLZKmV6/0VlFxcZzVV/xcgOt+Tx/VjaaRHyiBcFlV0541yrT09QjzzCxlDZ34OzKjrFAynlmtflEg==
|
integrity sha512-Q/m3vEwzYwLZKmV6/0VlFxcZzVV/xcgOt+Tx/VjaaRHyiBcFlV0541yrT09QjzzCxlDZ34OzKjrFAynlmtflEg==
|
||||||
@ -2337,14 +2337,14 @@ type-fest@^0.8.1:
|
|||||||
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d"
|
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d"
|
||||||
integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA==
|
integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA==
|
||||||
|
|
||||||
typescript@^3.8.3:
|
typescript@^4.5.2:
|
||||||
version "3.9.10"
|
version "4.5.2"
|
||||||
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.9.10.tgz#70f3910ac7a51ed6bef79da7800690b19bf778b8"
|
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.5.2.tgz#8ac1fba9f52256fdb06fb89e4122fa6a346c2998"
|
||||||
integrity sha512-w6fIxVE/H1PkLKcCPsFqKE7Kv7QUwhU8qQY2MueZXWx5cPZdwFupLgKK3vntcK98BtNHZtAF4LA/yl2a7k8R6Q==
|
integrity sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==
|
||||||
|
|
||||||
uWebSockets.js@uNetworking/uWebSockets.js#v18.5.0:
|
uWebSockets.js@uNetworking/uWebSockets.js#v20.4.0:
|
||||||
version "18.5.0"
|
version "20.4.0"
|
||||||
resolved "https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/9b1605d2db82981cafe69dbe356e10ce412f5805"
|
resolved "https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/65f39bdff763be3883e6cf18e433dd4fec155845"
|
||||||
|
|
||||||
uri-js@^4.2.2:
|
uri-js@^4.2.2:
|
||||||
version "4.4.1"
|
version "4.4.1"
|
||||||
|
Loading…
Reference in New Issue
Block a user