From df0636c51369b1f005d8f0c2acc74f85ba7c9256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20N=C3=A9grier?= Date: Fri, 18 Sep 2020 15:51:15 +0200 Subject: [PATCH] Migrating user position messages to protobuf --- back/src/Controller/IoSocketController.ts | 32 ++++++++++---- back/src/Model/Websocket/ExSocketInterface.ts | 5 ++- back/src/Model/Websocket/MessageUserMoved.ts | 6 --- back/src/Model/Websocket/ProtobufUtils.ts | 35 +++++++++++++++ benchmark/socketio-load-test.yaml | 18 ++------ benchmark/socketioLoadTest.js | 33 ++++++++++++++ front/src/Connection.ts | 43 ++++++++++++++++--- front/src/Network/ProtobufClientUtils.ts | 34 +++++++++++++++ front/src/Phaser/Game/GameScene.ts | 17 +++++++- messages/messages.proto | 25 ++++++++--- 10 files changed, 202 insertions(+), 46 deletions(-) delete mode 100644 back/src/Model/Websocket/MessageUserMoved.ts create mode 100644 back/src/Model/Websocket/ProtobufUtils.ts create mode 100644 front/src/Network/ProtobufClientUtils.ts diff --git a/back/src/Controller/IoSocketController.ts b/back/src/Controller/IoSocketController.ts index a2732fb8..f3eab483 100644 --- a/back/src/Controller/IoSocketController.ts +++ b/back/src/Controller/IoSocketController.ts @@ -10,7 +10,6 @@ import {Group} from "../Model/Group"; import {User} from "../Model/User"; import {isSetPlayerDetailsMessage,} from "../Model/Websocket/SetPlayerDetailsMessage"; import {MessageUserJoined} from "../Model/Websocket/MessageUserJoined"; -import {MessageUserMoved} from "../Model/Websocket/MessageUserMoved"; import si from "systeminformation"; import {Gauge} from "prom-client"; import {TokenInterface} from "../Controller/AuthenticateController"; @@ -23,9 +22,17 @@ import {uuid} from 'uuidv4'; import {isViewport} from "../Model/Websocket/ViewportMessage"; import {GroupUpdateInterface} from "_Model/Websocket/GroupUpdateInterface"; import {Movable} from "../Model/Movable"; -import {PositionMessage, SetPlayerDetailsMessage} from "../../../messages/generated/messages_pb"; +import { + PositionMessage, + SetPlayerDetailsMessage, + SubMessage, + UserMovedMessage, + BatchMessage +} from "../../../messages/generated/messages_pb"; import {UserMovesMessage} from "../../../messages/generated/messages_pb"; import Direction = PositionMessage.Direction; +import {ProtobufUtils} from "../Model/Websocket/ProtobufUtils"; +import toPositionMessage = ProtobufUtils.toPositionMessage; enum SocketIoEvent { CONNECTION = "connection", @@ -48,13 +55,13 @@ enum SocketIoEvent { BATCH = "batch", } -function emitInBatch(socket: ExSocketInterface, event: string | symbol, payload: unknown): void { - socket.batchedMessages.push({ event, payload}); +function emitInBatch(socket: ExSocketInterface, event: string, payload: SubMessage): void { + socket.batchedMessages.addPayload(payload); if (socket.batchTimeout === null) { socket.batchTimeout = setTimeout(() => { - socket.emit(SocketIoEvent.BATCH, socket.batchedMessages); - socket.batchedMessages = []; + socket.binary(true).emit(SocketIoEvent.BATCH, socket.batchedMessages.serializeBinary().buffer); + socket.batchedMessages = new BatchMessage(); socket.batchTimeout = null; }, 100); } @@ -159,9 +166,9 @@ export class IoSocketController { ioConnection() { this.Io.on(SocketIoEvent.CONNECTION, (socket: Socket) => { const client : ExSocketInterface = socket as ExSocketInterface; - client.batchedMessages = []; + client.batchedMessages = new BatchMessage(); client.batchTimeout = null; - client.emitInBatch = (event: string | symbol, payload: unknown): void => { + client.emitInBatch = (event: string, payload: SubMessage): void => { emitInBatch(client, event, payload); } this.sockets.set(client.userId, client); @@ -538,7 +545,14 @@ export class IoSocketController { if (thing instanceof User) { const clientUser = this.searchClientByIdOrFail(thing.id); - clientListener.emitInBatch(SocketIoEvent.USER_MOVED, new MessageUserMoved(clientUser.userId, clientUser.position)); + const userMovedMessage = new UserMovedMessage(); + userMovedMessage.setUserid(clientUser.userId); + userMovedMessage.setPosition(toPositionMessage(clientUser.position)); + + const subMessage = new SubMessage(); + subMessage.setUsermovedmessage(userMovedMessage); + + clientListener.emitInBatch(SocketIoEvent.USER_MOVED, subMessage); //console.log("Sending USER_MOVED event"); } else if (thing instanceof Group) { clientListener.emit(SocketIoEvent.GROUP_CREATE_UPDATE, { diff --git a/back/src/Model/Websocket/ExSocketInterface.ts b/back/src/Model/Websocket/ExSocketInterface.ts index 648bbe21..d7edf554 100644 --- a/back/src/Model/Websocket/ExSocketInterface.ts +++ b/back/src/Model/Websocket/ExSocketInterface.ts @@ -3,6 +3,7 @@ import {PointInterface} from "./PointInterface"; import {Identificable} from "./Identificable"; import {TokenInterface} from "../../Controller/AuthenticateController"; import {ViewportInterface} from "_Model/Websocket/ViewportMessage"; +import {BatchMessage, SubMessage} from "../../../../messages/generated/messages_pb"; export interface ExSocketInterface extends Socket, Identificable { token: string; @@ -18,7 +19,7 @@ export interface ExSocketInterface extends Socket, Identificable { /** * Pushes an event that will be sent in the next batch of events */ - emitInBatch: (event: string | symbol, payload: unknown) => void; - batchedMessages: Array<{ event: string | symbol, payload: unknown }>; + emitInBatch: (event: string, payload: SubMessage) => void; + batchedMessages: BatchMessage; batchTimeout: NodeJS.Timeout|null; } diff --git a/back/src/Model/Websocket/MessageUserMoved.ts b/back/src/Model/Websocket/MessageUserMoved.ts deleted file mode 100644 index e08be81b..00000000 --- a/back/src/Model/Websocket/MessageUserMoved.ts +++ /dev/null @@ -1,6 +0,0 @@ -import {PointInterface} from "./PointInterface"; - -export class MessageUserMoved { - constructor(public userId: number, public position: PointInterface) { - } -} diff --git a/back/src/Model/Websocket/ProtobufUtils.ts b/back/src/Model/Websocket/ProtobufUtils.ts new file mode 100644 index 00000000..ff73b7c0 --- /dev/null +++ b/back/src/Model/Websocket/ProtobufUtils.ts @@ -0,0 +1,35 @@ +import {PointInterface} from "./PointInterface"; +import {PositionMessage} from "../../../../messages/generated/messages_pb"; +import {ExSocketInterface} from "_Model/Websocket/ExSocketInterface"; + +export namespace ProtobufUtils { + import Direction = PositionMessage.Direction; + + export function toPositionMessage(point: PointInterface): PositionMessage { + let direction: PositionMessage.DirectionMap[keyof PositionMessage.DirectionMap]; + switch (point.direction) { + case 'up': + direction = Direction.UP; + break; + case 'down': + direction = Direction.DOWN; + break; + case 'left': + direction = Direction.LEFT; + break; + case 'right': + direction = Direction.RIGHT; + break; + default: + throw new Error('unexpected direction'); + } + + const position = new PositionMessage(); + position.setX(point.x); + position.setY(point.y); + position.setMoving(point.moving); + position.setDirection(direction); + + return position; + } +} diff --git a/benchmark/socketio-load-test.yaml b/benchmark/socketio-load-test.yaml index df2f580b..81c7e55e 100644 --- a/benchmark/socketio-load-test.yaml +++ b/benchmark/socketio-load-test.yaml @@ -6,7 +6,7 @@ config: token: "test" phases: - duration: 20 - arrivalRate: 2 + arrivalRate: 3 processor: "./socketioLoadTest.js" scenarios: - name: "Connects and moves player for 20 seconds" @@ -22,7 +22,7 @@ scenarios: - emit: channel: "join-room" data: - roomId: 'global__api.workadventure.localhost/map/files/Floor0/floor0' + roomId: 'global__maps.workadventure.localhost/Floor0/floor0' position: x: 783 y: 170 @@ -35,20 +35,10 @@ scenarios: bottom: 200 - think: 1 - loop: - - function: "setYRandom" + - function: "setUserMovesMessage" - emit: channel: "user-position" - data: - position: - x: "{{ x }}" - y: "{{ y }}" - direction: 'down' - moving: false - viewport: - left: "{{ left }}" - top: "{{ top }}" - right: "{{ right }}" - bottom: "{{ bottom }}" + data: "{{ message }}" - think: 0.2 count: 100 - think: 10 diff --git a/benchmark/socketioLoadTest.js b/benchmark/socketioLoadTest.js index f898d7b9..3f01bab6 100644 --- a/benchmark/socketioLoadTest.js +++ b/benchmark/socketioLoadTest.js @@ -1,5 +1,8 @@ 'use strict'; +require("../messages/generated/messages_pb"); +//import {PositionMessage, UserMovesMessage, ViewportMessage} from "../messages/generated/messages_pb"; + module.exports = { setYRandom }; @@ -18,3 +21,33 @@ function setYRandom(context, events, done) { context.vars.bottom = context.vars.y + 200; return done(); } + +function setUserMovesMessage(context, events, done) { + if (context.angle === undefined) { + context.angle = Math.random() * Math.PI * 2; + } + context.angle += 0.05; + + const x = Math.floor(320 + 1472/2 * (1 + Math.sin(context.angle))); + const y = Math.floor(200 + 1090/2 * (1 + Math.cos(context.angle))); + + const positionMessage = new PositionMessage(); + positionMessage.setX(x); + positionMessage.setY(y); + positionMessage.setDirection(PositionMessage.Direction.UP); + positionMessage.setMoving(false); + + const viewportMessage = new ViewportMessage(); + viewportMessage.setTop(y - 200); + viewportMessage.setBottom(y + 200); + viewportMessage.setLeft(x - 320); + viewportMessage.setRight(x + 320); + + const userMovesMessage = new UserMovesMessage(); + userMovesMessage.setPosition(positionMessage); + userMovesMessage.setViewport(viewportMessage); + + context.vars.message = userMovesMessage.serializeBinary().buffer; + console.log(context.vars.message); + return done(); +} diff --git a/front/src/Connection.ts b/front/src/Connection.ts index b6d4c6ee..78f107fe 100644 --- a/front/src/Connection.ts +++ b/front/src/Connection.ts @@ -2,8 +2,9 @@ import Axios from "axios"; import {API_URL} from "./Enum/EnvironmentVariable"; import {MessageUI} from "./Logger/MessageUI"; import { + BatchMessage, PositionMessage, - SetPlayerDetailsMessage, + SetPlayerDetailsMessage, UserMovedMessage, UserMovesMessage, ViewportMessage } from "../../messages/generated/messages_pb" @@ -132,6 +133,7 @@ export interface RoomJoinedMessageInterface { export class Connection implements Connection { private readonly socket: Socket; private userId: number|null = null; + private batchCallbacks: Map = new Map(); private constructor(token: string) { @@ -149,11 +151,25 @@ export class Connection implements Connection { /** * Messages inside batched messages are extracted and sent to listeners directly. */ - this.socket.on(EventMessage.BATCH, (batchedMessages: BatchedMessageInterface[]) => { - for (const message of batchedMessages) { - const listeners = this.socket.listeners(message.event); + this.socket.on(EventMessage.BATCH, (batchedMessagesBinary: ArrayBuffer) => { + const batchMessage = BatchMessage.deserializeBinary(new Uint8Array(batchedMessagesBinary as ArrayBuffer)); + + for (const message of batchMessage.getPayloadList()) { + let event: string; + let payload; + if (message.hasUsermovedmessage()) { + event = EventMessage.USER_MOVED; + payload = message.getUsermovedmessage(); + } else { + throw new Error('Unexpected batch message type'); + } + + const listeners = this.batchCallbacks.get(event); + if (listeners === undefined) { + continue; + } for (const listener of listeners) { - listener(message.payload); + listener(payload); } } }) @@ -263,8 +279,21 @@ export class Connection implements Connection { this.socket.on(EventMessage.JOIN_ROOM, callback); } - public onUserMoved(callback: (message: MessageUserMovedInterface) => void): void { - this.socket.on(EventMessage.USER_MOVED, callback); + public onUserMoved(callback: (message: UserMovedMessage) => void): void { + this.onBatchMessage(EventMessage.USER_MOVED, callback); + //this.socket.on(EventMessage.USER_MOVED, callback); + } + + /** + * Registers a listener on a message that is part of a batch + */ + private onBatchMessage(eventName: string, callback: Function): void { + let callbacks = this.batchCallbacks.get(eventName); + if (callbacks === undefined) { + callbacks = new Array(); + this.batchCallbacks.set(eventName, callbacks); + } + callbacks.push(callback); } public onUserLeft(callback: (userId: number) => void): void { diff --git a/front/src/Network/ProtobufClientUtils.ts b/front/src/Network/ProtobufClientUtils.ts new file mode 100644 index 00000000..311ba80d --- /dev/null +++ b/front/src/Network/ProtobufClientUtils.ts @@ -0,0 +1,34 @@ +import {PositionMessage} from "../../../messages/generated/messages_pb"; +import {PointInterface} from "../Connection"; + +export namespace ProtobufClientUtils { + import Direction = PositionMessage.Direction; + + export function toPointInterface(position: PositionMessage): PointInterface { + let direction: string; + switch (position.getDirection()) { + case Direction.UP: + direction = 'up'; + break; + case Direction.DOWN: + direction = 'down'; + break; + case Direction.LEFT: + direction = 'left'; + break; + case Direction.RIGHT: + direction = 'right'; + break; + default: + throw new Error("Unexpected direction"); + } + + // sending to all clients in room except sender + return { + x: position.getX(), + y: position.getY(), + direction, + moving: position.getMoving(), + }; + } +} diff --git a/front/src/Phaser/Game/GameScene.ts b/front/src/Phaser/Game/GameScene.ts index 7695294e..bf73b31d 100644 --- a/front/src/Phaser/Game/GameScene.ts +++ b/front/src/Phaser/Game/GameScene.ts @@ -40,6 +40,9 @@ import {FourOFourSceneName} from "../Reconnecting/FourOFourScene"; import {ItemFactoryInterface} from "../Items/ItemFactoryInterface"; import {ActionableItem} from "../Items/ActionableItem"; import {UserInputManager} from "../UserInput/UserInputManager"; +import {UserMovedMessage} from "../../../../messages/generated/messages_pb"; +import {ProtobufClientUtils} from "../../Network/ProtobufClientUtils"; +import toPointInterface = ProtobufClientUtils.toPointInterface; export enum Textures { @@ -213,8 +216,18 @@ export class GameScene extends Phaser.Scene implements CenterListener { this.addPlayer(userMessage); }); - connection.onUserMoved((message: MessageUserMovedInterface) => { - this.updatePlayerPosition(message); + connection.onUserMoved((message: UserMovedMessage) => { + const position = message.getPosition(); + if (position === undefined) { + throw new Error('Position missing from UserMovedMessage'); + } + + const messageUserMoved: MessageUserMovedInterface = { + userId: message.getUserid(), + position: toPointInterface(position) + } + + this.updatePlayerPosition(messageUserMoved); }); connection.onUserLeft((userId: number) => { diff --git a/messages/messages.proto b/messages/messages.proto index cc0449e9..7d8cb0a8 100644 --- a/messages/messages.proto +++ b/messages/messages.proto @@ -1,11 +1,6 @@ syntax = "proto3"; -/*********** CLIENT TO SERVER MESSAGES *************/ - -message SetPlayerDetailsMessage { - string name = 1; - repeated string characterLayers = 2; -} +/*********** PARTIAL MESSAGES **************/ message PositionMessage { int32 x = 1; @@ -27,6 +22,13 @@ message ViewportMessage { int32 bottom = 4; } +/*********** CLIENT TO SERVER MESSAGES *************/ + +message SetPlayerDetailsMessage { + string name = 1; + repeated string characterLayers = 2; +} + message UserMovesMessage { PositionMessage position = 1; ViewportMessage viewport = 2; @@ -39,3 +41,14 @@ message UserMovedMessage { int32 userId = 1; PositionMessage position = 2; } + +message SubMessage { + oneof message { + UserMovedMessage userMovedMessage = 1; + } +} + +message BatchMessage { + string event = 1; + repeated SubMessage payload = 2; +}