From f140a1642ebfde198946ad6760c1003c1cb9a8c3 Mon Sep 17 00:00:00 2001 From: Rasmus Steinke Date: Sat, 11 Aug 2012 02:40:34 +0200 Subject: scripts --- bin/fs-uae-netplay-server.py | 725 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 725 insertions(+) create mode 100755 bin/fs-uae-netplay-server.py (limited to 'bin/fs-uae-netplay-server.py') diff --git a/bin/fs-uae-netplay-server.py b/bin/fs-uae-netplay-server.py new file mode 100755 index 0000000..7c9086a --- /dev/null +++ b/bin/fs-uae-netplay-server.py @@ -0,0 +1,725 @@ +""" +FS-UAE Netplay Game Server +Copyright (C) 2012 Frode Solheim + +This library is free software; you can redistribute it and/or modify it +under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or (at +your option) any later version. + +This library is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this library; if not, write to the Free Software Foundation, +Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +""" +from __future__ import print_function + +import sys +import time +from collections import deque +import socket +import traceback +import threading +import random +from hashlib import sha1 + +SERVER_PROTOCOL_VERSION = 1 +MAX_PLAYERS = 6 + +if sys.version > '3': + PYTHON3 = True +else: + PYTHON3 = False + +if PYTHON3: + def int_to_bytes(number): + return bytes([(number & 0xff000000) >> 24, (number & 0x00ff0000) >> 16, + (number & 0x0000ff00) >> 8, (number & 0x000000ff)]) + def bytes_to_int(m): + return m[0] << 24 | m[1] << 16 | m[2] << 8 | m[3] + def byte_ord(v): + #print("byte_ord", v) + try: + return v[0] + except TypeError: + return v + def byte(v): + return bytes([v]) + server_protocol_version = byte(SERVER_PROTOCOL_VERSION) + +else: + def int_to_bytes(number): + return chr((number & 0xff000000) >> 24) + \ + chr((number & 0x00ff0000) >> 16) + \ + chr((number & 0x0000ff00) >> 8) + \ + chr((number & 0x000000ff)) + def bytes_to_int(m): + return ord(m[0]) << 24 | ord(m[1]) << 16 | ord(m[2]) << 8 | ord(m[3]) + def byte_ord(v): + return ord(v) + def byte(v): + return v + server_protocol_version = chr(SERVER_PROTOCOL_VERSION) + +max_drift = 25 +num_clients = 2 +port = 25100 +host = "0.0.0.0" +game = None +game_password = 0 +launch_timeout = 0 + +def create_game_password(value): + # for python 2 + 3 compatibility + #if not isinstance(value, unicode): + value = value.encode("UTF-8") + #print(repr(value)) + h = sha1() + h.update(b"FSNP") + val = b"" + for v in value: + if byte_ord(v) < 128: + val += byte(v) + #print("update:", repr(val)) + h.update(val) + return bytes_to_int(h.digest()[:4]) + +for arg in sys.argv: + if arg.startswith("--"): + parts = arg[2:].split("=", 1) + if len(parts) == 2: + key, value = parts + key = key.lower() + if key == "port": + port = int(value) + elif key == "players": + num_clients = int(value) + elif key == "password": + #game_password = crc32(value) & 0xffffffff + game_password = create_game_password(value) + print("game password (numeric) is", game_password) + elif key == "launch-timeout": + launch_timeout = int(value) + + +MESSAGE_READY = 0 +MESSAGE_MEM_CHECK = 5 +MESSAGE_RND_CHECK = 6 +MESSAGE_PING = 7 +MESSAGE_PLAYERS = 8 +MESSAGE_PLAYER_TAG_0 = 9 +MESSAGE_PLAYER_TAG_1 = 10 +MESSAGE_PLAYER_TAG_2 = 11 +MESSAGE_PLAYER_TAG_3 = 12 +MESSAGE_PLAYER_TAG_4 = 13 +MESSAGE_PLAYER_TAG_5 = 14 + +MESSAGE_PLAYER_PING = 15 +MESSAGE_PLAYER_LAG = 16 +MESSAGE_SET_PLAYER_TAG = 17 +MESSAGE_PROTOCOL_VERSION = 18 +MESSAGE_EMULATION_VERSION = 19 +MESSAGE_ERROR = 20 +MESSAGE_TEXT = 21 +MESSAGE_SESSION_KEY = 22 + +#MESSAGE_MEM_CHECK = 5 +#MESSAGE_RND_CHECK = 6 +#MESSAGE_PING = 7 + +MESSAGE_MEMCHECK_MASK = (0x80000000 | (MESSAGE_MEM_CHECK << 24)) +MESSAGE_RNDCHECK_MASK = (0x80000000 | (MESSAGE_RND_CHECK << 24)) + +ERROR_PROTOCOL_MISMATCH = 1 +ERROR_WRONG_PASSWORD = 2 +ERROR_CANNOT_RESUME = 3 +ERROR_GAME_ALREADY_STARTED = 4 +ERROR_PLAYER_NUMBER = 5 +ERROR_EMULATOR_MISMATCH = 6 +ERROR_CLIENT_ERROR = 7 +ERROR_MEMORY_DESYNC = 8 +ERROR_RANDOM_DESYNC = 9 +ERROR_SESSION_KEY = 10 +ERROR_GAME_STOPPED = 99 + +def create_ext_message(ext, data): + return 0x80000000 | ext << 24 | (data & 0xffffff) + +class Client: + + def __init__(self, socket, address): + #self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket = socket + self.messages = [] + self.lock = threading.Lock() + self.address = address + self.ready = 0 + self.tag = b"PLY" + self.player = 0 + self.playing = False + self.frame = 0 + self.frame_times = [0.0 for x in range(100)] + self.lag = 0.0 + self.out_seq = 0 + self.memcheck = [(0, 0) for x in range(100)] + self.rndcheck = [(0, 0) for x in range(100)] + self.ping_sent_at = 0 + self.pings = deque([0 for x in range(10)]) + self.pings_sum = 0 + self.pings_avg = 0 + #self.protocol_version = 0 + self.emulator_version = b"" + self.session_key = 0 + + + self.temp_a = 0 + self.temp_b = 0 + self.temp_c = 0 + + threading.Thread(target=self.__thread_function).start() + + def send_error_message(self, error_num): + print(self, "error", error_num) + message = 0x80000000 | (MESSAGE_ERROR) << 24 | error_num + self.send_message(message) + + def send_message(self, message): + with self.lock: + #if message == 0x87000000: + # #print("queueing %08x" % (message,)) + # #traceback.print_stack() + # self.temp_c += 1 + self.__send_data(int_to_bytes(message)) + + def __send_data(self, data): + #if data[0] == '\x87': + # #print("queueing ping") + # #traceback.print_stack() + # self.temp_c += 1 + self.socket.sendall(data) + + def queue_message(self, message): + with self.lock: + #if message == 0x87000000: + # #print("queueing %08x" % (message,)) + # #traceback.print_stack() + # self.temp_c += 1 + #print("queueing %08x" % (message,)) + #if message & 0x20000000: + # traceback.print_stack() + self.messages.append(int_to_bytes(message)) + if len(self.messages) == 100: + self.__send_queued_messages() + + def queue_bytes(self, message): + with self.lock: + self.messages.append(message) + if len(self.messages) == 100: + self.__send_queued_messages() + + def send_queued_messages(self): + with self.lock: + self.__send_queued_messages() + + def __send_queued_messages(self): + data = b"".join(self.messages) + self.messages[:] = [] + #print("sending", repr(data)) + self.__send_data(data) + + def initialize_client(self): + print("initialize", self) + def read_bytes(num): + data = b"" + for i in range(num): + data = data + self.socket.recv(1) + if not len(data) == num: + raise Exception("did not get {0} bytes".format(num)) + return data + # check header message + data = read_bytes(4) + if data == b"PING": + # connection check only + self.__send_data(b"PONG") + return False + if data != b"FSNP": + print(data) + raise Exception("did not get expected FSNP header") + # check protocol version + data = self.socket.recv(1) + #print(repr(data), repr(server_protocol_version)) + if data != server_protocol_version: + print("protocol mismatch") + self.send_error_message(ERROR_PROTOCOL_MISMATCH) + return False + # check net play password + password = bytes_to_int(read_bytes(4)) + if password != game_password: + print("wrong password") + self.send_error_message(ERROR_WRONG_PASSWORD) + return False + # read emulator version + self.emulator_version = read_bytes(8) + # read player number and session key, session key is checked to + # make sure another client cannot hijack this player slot + self.session_key = bytes_to_int(b"\0" + read_bytes(3)) + self.player = byte_ord(self.socket.recv(1)) + self.tag = read_bytes(3) + # get package sequence number + self.resume_from_packet = bytes_to_int(read_bytes(4)) + + error = game.add_client(self) + if error: + self.send_error_message(error) + return False + + message = create_ext_message(MESSAGE_SESSION_KEY, self.session_key) + self.queue_message(message) + data = (self.player << 8) | num_clients + message = create_ext_message(MESSAGE_PLAYERS, data) + self.queue_message(message) + + game.send_player_tags(self) + self.send_queued_messages() + print("initialize done for", self) + return True + + def __thread_function(self): + try: + try: + if not self.initialize_client(): + print("initialize failed for", self) + return + self.receive_loop() + except Exception: + traceback.print_exc() + game.stop = True + finally: + try: + self.socket.close() + except Exception: + pass + + + def receive_loop(self): + data = b"" + count = 0 + while not game.stop: + data = data + self.socket.recv(4 - count) + count = len(data) + if count == 4: + self.on_message(data) + count = 0 + data = b"" + + def send_ping(self): + with self.lock: + #print("ping?", self.ping_sent_at) + if self.ping_sent_at == 0: + self.temp_a += 1 + self.ping_sent_at = time.time() + message = int_to_bytes(0x80000000 | (7 << 24)) + self.__send_data(message) + assert self.ping_sent_at > 0 + + def on_ping(self): + # may not need lock here + with self.lock: + self.temp_b += 1 + if self.temp_a != self.temp_b: + print(self.temp_a, self.temp_b, self.temp_c) + assert self.ping_sent_at > 0 + t = time.time() + new = (t - self.ping_sent_at) / 1.0 + #print(t, "-", self.ping_sent_at, "=", new) + old = self.pings.popleft() + self.pings.append(new) + #print(self.pings) + self.pings_sum = self.pings_sum - old + new + self.pings_avg = self.pings_sum / len(self.pings) + self.ping_sent_at = 0 + + def on_message(self, m): + message = bytes_to_int(m) + #with game.lock: + # if not self.playing: + # print(self, "is no longer playing/valid, ignoring message") + + if message & 0x80000000: + # ext message + command = (message & 0x7f000000) >> 24 + data = message & 0x00ffffff; + if command == MESSAGE_MEM_CHECK: + self.memcheck[self.frame % 100] = (data, self.frame) + elif command == MESSAGE_RND_CHECK: + self.rndcheck[self.frame % 100] = (data, self.frame) + elif command == MESSAGE_PING: + #print("{0:x}".format(message)) + self.on_ping() + elif command == MESSAGE_TEXT: + print("received text command") + remaining = data + text = b"" + while not game.stop: + part = self.socket.recv(remaining) + count = len(part) + text += part + remaining = remaining - count + if remaining == 0: + game.add_text_message(self, text) + break + + elif message & (1 << 30): + frame = message & 0x3fffffff + #print("received frame", frame) + if frame != self.frame + 1: + print("error, expected frame", self.frame + 1, "got", frame) + #print("received frame", frame) + self.frame = frame + t = time.time() + self.frame_times[self.frame % 100] = t + game_t = game.frame_times[self.frame % 100] + self.lag = t - game_t + + elif message & (1 << 29): + game.add_input_event(self, message & 0x00ffffff) + + else: + print("warning: unknown command received %x" % (message,)) + + def __str__(self): + return "".format(self.player, self.tag, + self.address) + +def create_session_key(): + return random.randint(0, 2**24 - 1) + +class Game: + + def __init__(self, num_clients): + self.started = False + self.frame = 0 + self.time = 0 + self.clients = [] + self.num_clients = 0 + self.frame_times = [0.0 for x in range(100)] + self.lock = threading.Lock() + self.stop = False + self.session_keys = [0 for x in range(MAX_PLAYERS)] + self.emulator_version = b"" + self.verified_frame = -1 + + def __start(self): + if len(self.clients) != num_clients: + printf("error - cannot start until all players have connected") + return + print("{0} clients connected, starting game".format(num_clients)) + self.started = True + threading.Thread(target=self.__thread_function).start() + + def add_client(self, client): + with self.lock: + if client.player == 0xff: + if client.resume_from_packet != 0: + return ERROR_CLIENT_ERROR + if self.started: + return ERROR_GAME_ALREADY_STARTED + client.player = len(self.clients) + if client.player == 0: + self.emulator_version = client.emulator_version + else: + if self.emulator_version != client.emulator_version: + return ERROR_EMULATOR_MISMATCH + client.session_key = create_session_key() + self.session_keys[client.player] = client.session_key + self.clients.append(client) + client.playing = True + if not self.started: + if len(self.clients) == num_clients: + self.__start() + else: + if client.player >= len(self.clients): + return ERROR_PLAYER_NUMBER + if self.session_keys[client.player] != client.session_key: + return ERROR_SESSION_KEY + old_client = self.clients[client.player] + # FIXME: must transfer settings for resuming to work + self.clients[client.player] = client + client.playing = True + + if client.resume_from_packet > 0: + # cannot resume yet... + print("cannot resume at packet", resume_from_packet) + return ERROR_CANNOT_RESUME + return 0 + # FIXME: start the game..? + + def __thread_function(self): + try: + self.__game_loop() + except Exception: + traceback.print_exc() + self.stop = True + + def __send_player_tags(self, send_to_client): + for i, client in enumerate(game.clients): + data = bytes_to_int(b"\0" + client.tag) + message = create_ext_message(MESSAGE_PLAYER_TAG_0 + i, data) + send_to_client.queue_message(message) + + def send_player_tags(self, client): + with game.lock: + self.__send_player_tags(client) + + def __game_loop(self): + with self.lock: + for client in self.clients: + self.__send_player_tags(client) + print("game loop running") + self.time = time.time() + while True: + if self.stop: + print("stopping game loop") + # try to send error message to all players + with self.lock: + for client in self.clients: + try: + client.send_error_message(ERROR_GAME_STOPPED) + except Exception: + traceback.print_exc() + return + self.__game_loop_iteration() + + def __game_loop_iteration(self): + # FIXME: higher precision sleep? + target_time = self.time + 0.02 + t2 = time.time() + diff = target_time - t2 + sleep = diff - 0.001 + if sleep > 0.0: + #print(sleep) + time.sleep(sleep) + self.time = target_time + while time.time() < target_time: + # busy-loop until target time + pass + + with self.lock: + if self.frame % 100 == 0: + self.__send_status() + self.frame += 1 + self.frame_times[self.frame % 100] = time.time() + message = (1 << 30) | self.frame + self.__send_to_clients(message, True) + if self.frame % 10 == 0: + for client in self.clients: + client.send_ping() + if self.frame % 200 == 0: + self.__print_status() + + self.__check_game() + + def __check_game(self): + oldest_frame, frames = self.__check_frame_status() + while oldest_frame > self.verified_frame: + self.check_synchronization(self.verified_frame + 1) + self.verified_frame += 1 + diff = self.frame - oldest_frame + if diff <= max_drift: + # clients have not drifted too far + return + first = True + count = 0; + while diff > 0 and not self.stop: + if first: + first = False + print("---", self.frame, "acked", frames) + elif count % 100 == 0: + print(" ", self.frame, "acked", frames) + time.sleep(0.02) + oldest_frame, frames = self.__check_frame_status() + diff = self.frame - oldest_frame + count += 1 + self.time = time.time() - 0.02 + + def __check_frame_status(self): + oldest_frame = self.frame + frames = [] + with self.lock: + for client in self.clients: + af = client.frame + if af < oldest_frame: + oldest_frame = af + frames.append(af) + return oldest_frame, frames + + def __print_status(self): + for i, client in enumerate(self.clients): + print("{0} f {1:6d} p avg: {2:3d} {3:3d}".format(i, + client.frame, int(client.pings_avg * 1000), + int(client.lag * 1000))) + + def __send_status(self): + for i, client in enumerate(self.clients): + v = int(client.lag * 1000) & 0x0000ffff + message = 0x80000000 | MESSAGE_PLAYER_LAG << 24 | i << 16 | v + self.__send_to_clients(message) + v = int(client.pings_avg * 1000) & 0x0000ffff + message = 0x80000000 | MESSAGE_PLAYER_PING << 24 | i << 16 | v + self.__send_to_clients(message) + + def add_input_event(self, client, input_event): + if not self.started: + # game was not started - ignoring input event + print("game not started, ignoring input event {0:08x}".format( + input_event)) + return + with self.lock: + if not client.playing: + print("client", client, "is no longer valid, dropping msg") + return + # for now, just broadcast out again to all clients + message = (1 << 29) | input_event + self.__send_to_clients(message) + + def add_text_message(self, from_client, text): + print("add text message") + with self.lock: + for client in self.clients: + #if from_client == client: + # continue + message = 0x80000000 | MESSAGE_TEXT << 24 \ + | from_client.player << 16 | len(text) + message = int_to_bytes(message) + text + print("send", repr(message), "to", client) + client.queue_bytes(message) + + def send_to_clients(self, message, force_send=False): + # using a lock here to ensure that send_to_clients can + # be called from multiple threads, but all clients will + # still get messages in expected order + with self.lock: + self.__send_to_clients(message, force_send) + + def __send_to_clients(self, message, force_send=False): + for client in self.clients: + #print("send", message, "to", client) + client.queue_message(message) + if force_send: + client.send_queued_messages() + + def check_synchronization(self, check_frame): + # FIXME: MOVE TO GAME CLASS + # FIXME: ONLY RUN ONCE PER FRAME, not once per frame * clients + #if self.frame == 0: + # return + with game.lock: + #print("received memcheck", data) + #check_frame = self.frame - 1 + if check_frame < 0: + return + index = check_frame % 100 + mem_check_data = [] + rnd_check_data = [] + for client in game.clients: + if client.frame <= check_frame: + # cannot check yet + return + #print(client, client.frame, client.memcheck) + mem_check_data.append((client, client.memcheck[index])) + rnd_check_data.append((client, client.rndcheck[index])) + check = mem_check_data[0][1] + for client, data in mem_check_data: + if check != data: + print("memcheck failed for frame", check_frame) + for c, d in mem_check_data: + print("* {0:08x} f {1:05d} {2}".format(d[0], d[1], c)) + for c in game.clients: + c.send_error_message(ERROR_MEMORY_DESYNC) + c.send_queued_messages() + raise Exception("mem check failed") + check = rnd_check_data[0][1] + for client, data in rnd_check_data: + if check != data: + print("rndcheck failed for frame", check_frame) + for c, d in rnd_check_data: + print("* {0:08x} f {1:05d} {2}".format(d[0], d[1], c)) + for c in game.clients: + c.send_error_message(ERROR_RANDOM_DESYNC) + c.send_queued_messages() + raise Exception("rnd check failed") + +address_map = { + +} + +def accept_client(server_socket): + server_socket.settimeout(1) + client_socket, client_address = server_socket.accept() + #client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDLOWAT, 4) + client_socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + client_socket.settimeout(None) + client = Client(client_socket, client_address) + #client.player = len(game.clients) + 1 + print("Client connected", client) + #game.add_client(client) + #client.start() + #if game.can_start(): + # game.start() + +#stop_accepting = False + +def accept_thread(server_socket): + while not game.stop: + try: + accept_client(server_socket) + except socket.timeout: + pass + except Exception: + traceback.print_exc() + time.sleep(1.0) + +def _run_server(): + global game + game = Game(num_clients) + address = (host, port) + server_socket = socket.socket() + server_socket.bind(address) + server_socket.listen(4) + if address[0] != "0.0.0.0": + print("server listening on", address[0], "port", address[1]) + else: + print("server listening on port", address[1]) + print("want", num_clients, "client(s)") + if num_clients > MAX_PLAYERS: + print("ERROR: max clients are", MAX_PLAYERS) + threading.Thread(target=accept_thread, args=(server_socket,)).start() + t1 = time.time() + while not game.started: + time.sleep(0.1) + if launch_timeout: + t2 = time.time() + if t2 - t1 > launch_timeout: + print("game not started yet, aborting") + game.stop = True + return + #sys.exit() + print("game started") + while not game.stop: + time.sleep(0.1) + +def run_server(): + try: + _run_server() + except Exception: + traceback.print_exc() + except KeyboardInterrupt: + traceback.print_exc() + game.stop = True + +if __name__ == "__main__": + run_server() -- cgit v1.2.3-24-g4f1b