import os
import re
import subprocess

from dotenv import load_dotenv

from dom5game import Dom5game

load_dotenv()
# Path to the game server executable, read from the SERVERPATH environment
# variable (None when the variable is not set).
SERVER_PATH = os.getenv("SERVERPATH")

# Patterns for the output of `<server> --listnations`, compiled once.
_ERA_HEADER_RE = re.compile(r"-+ Era (\d+) -+")
_NATION_ENTRY_RE = re.compile(r"(\d+)\s+(.*?),\s+(.*)")


def create_server(name, port, channel, era=1, **server_options):
    """Create the on-disk game record and launch the server process.

    Args:
        name: Game name; also used as the directory name under ``games/``.
        port: TCP port the server should listen on.
        channel: Passed through to ``Dom5game`` unchanged.
        era: Era forwarded to ``server_command_builder`` (it accepts 1-3).
        **server_options: Extra keyword arguments forwarded verbatim to
            ``server_command_builder`` (for example ``mapfile=...``).

    Returns:
        ``"SUCCESS"`` once the server process has been spawned,
        ``"ERROR_PORT_IN_USE"`` if something already listens on ``port``,
        an ``"ERROR_*"`` code from ``server_command_builder`` when the options
        are invalid, or ``"EXCEPTION OCCURED: <message>"`` if spawning the
        process failed.

    Raises:
        FileExistsError: If ``games/<name>`` already exists.
    """
    if is_port_in_use(port):
        return "ERROR_PORT_IN_USE"

    # Validate the options and build the command before touching the
    # filesystem, so a bad configuration does not leave a half-created game
    # directory behind. The builder reports problems as plain strings.
    command = server_command_builder(name, port, era, **server_options)
    if isinstance(command, str):
        return command

    # makedirs also creates the parent "games" directory on first use.
    os.makedirs("games/" + name)
    Dom5game(name, channel, [], 0, {}).to_json()

    try:
        # The server keeps running in the background; its handle is not
        # tracked and its output is discarded.
        subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception as exc:
        # Boundary handler: callers expect a status string, not an exception.
        return "EXCEPTION OCCURED: " + str(exc)
    return "SUCCESS"


def get_nations():
    """Return the nations listed by the server executable, grouped by era.

    Runs ``SERVER_PATH --listnations`` and parses its standard output. A line
    matching ``--- Era N ---`` starts a new group; each following line of the
    form ``<id> <text>, <text>`` is recorded under that group. Entry lines that
    appear before the first era header are ignored.

    Returns:
        A dict shaped like ``{"Era N": {nation_id: {"name": (first, second)}}}``
        where ``first`` is the text before the first comma and ``second`` the
        text after it.

    Raises:
        subprocess.CalledProcessError: If the executable exits with a non-zero
            status.
    """
    # TODO mod support
    listing = subprocess.check_output([SERVER_PATH, "--listnations"]).decode()

    nations = {}
    current_era = None
    for raw_line in listing.splitlines():
        line = raw_line.strip()

        era_match = _ERA_HEADER_RE.match(line)
        if era_match:
            current_era = f"Era {era_match.group(1)}"
            nations[current_era] = {}
            continue

        entry_match = _NATION_ENTRY_RE.match(line)
        if entry_match and current_era:
            nation_id = int(entry_match.group(1))
            # Stored as a 2-tuple of the two comma-separated text fields.
            name = (entry_match.group(2), entry_match.group(3))
            nations[current_era][nation_id] = {"name": name}
    return nations


# This is terrible.
def server_command_builder(
    name: str,
    port: int,
    era: int,
    closed_slots: list[int] = [],
    ai_slots: list[tuple[int, int]] = [],
    client_start: bool = True,
    teams: list[tuple[int, int, int]] = [],
    clustered_start: bool = False,
    team_game: bool = False,
    mapfile: str = "",
    random_map: int = 0,
    research_difficulty: int = 2,
    random_start_research: bool = True,
    hof_size: int = 10,
    global_slots: int = 5,
    inde_strength: int = 5,
    magic_sites: int = 40,
    event_rarity: int = 1,
    richness: int = 100,
    resources: int = 100,
    recruitment: int = 100,
    supplies: int = 100,
    masterpass: str = "",
    start_prov: int = 1,
    renaming: bool = True,
    score_graphs: bool = False,
    no_nation_info: bool = False,
    no_cheat_det: bool = False,
    no_artifact_rest: bool = False,
    story_events: int = 1,
    new_ai_lvl: int = 2,
    no_new_ai: bool = False,
    conq_all: bool = False,
    thrones: tuple[int, int, int] = (1, 1, 1),
    required_apoints: int = 0,
    cataclysm: int = 0,
):
    """Validate game options and build the argv list for the server process.

    Every element of the returned list is a separate, whitespace-free string
    argument, so the list can be passed directly to ``subprocess.Popen``
    without a shell.

    Args:
        name: Game name; also selects ``games/<name>/turnstats.html`` (under
            the current working directory) as the status page path.
        port: Port number passed after ``--port``.
        era: Must be 1, 2 or 3; passed after ``--era``.
        closed_slots: Must not share any value with the second element of any
            ``ai_slots`` entry.
        ai_slots: Pairs whose second elements must be unique.
        client_start: When False, ``--noclientstart`` is added.
        teams: Triples emitted as ``--team a b c`` when ``team_game`` is True.
        clustered_start: When True, ``--clustered`` is added.
        team_game: Enables emission of the ``teams`` entries.
        mapfile / random_map: Exactly one must be set; ``random_map`` must be
            one of 0, 10, 15 or 20.
        cataclysm: When positive, ``--cataclysm <value>`` is added.
        conq_all: When True, ``--conqall`` is added.
        thrones / required_apoints: When ``required_apoints`` is 0 it defaults
            to ``thrones[0] + 2 * thrones[1] + 3 * thrones[2] - 1``.

        The remaining parameters are accepted but are not yet translated into
        command-line switches.

    Returns:
        The argv list on success, otherwise one of the error codes
        ``"ERROR_MAP"``, ``"ERROR_ERA"``, ``"ERROR_AIS"`` or
        ``"ERROR_CLOSED_AIS"``.
    """
    # NOTE: the list defaults in the signature are only read, never mutated,
    # so sharing them between calls is harmless.

    # Exactly one map source (random map size or map file) must be chosen.
    uses_random_map = random_map != 0
    uses_map_file = mapfile != ""
    if uses_random_map == uses_map_file or random_map not in {0, 10, 15, 20}:
        return "ERROR_MAP"

    if not 0 < era < 4:
        return "ERROR_ERA"

    ai_indexes = [slot[1] for slot in ai_slots]
    if len(set(ai_indexes)) != len(ai_indexes):
        return "ERROR_AIS"
    if not set(closed_slots).isdisjoint(ai_indexes):
        return "ERROR_CLOSED_AIS"

    if required_apoints == 0:
        # TODO: computed default is not emitted on the command line yet.
        required_apoints = thrones[0] + (2 * thrones[1]) + (3 * thrones[2]) - 1

    # Without a shell each list element reaches the server as one argv entry,
    # so switches must carry no padding and every value must be its own string.
    command = [
        SERVER_PATH,
        "-TS",
        name,
        "--port",
        str(port),
        "--statuspage",
        os.path.join(os.getcwd(), "games", name, "turnstats.html"),
        "--era",
        str(era),
    ]

    if team_game:
        for team in teams:
            # TODO Check valid inputs
            command.extend(["--team", str(team[0]), str(team[1]), str(team[2])])
    if clustered_start:
        command.append("--clustered")
    if not client_start:
        command.append("--noclientstart")
    if cataclysm > 0:
        command.extend(["--cataclysm", str(cataclysm)])
    if conq_all:
        command.append("--conqall")
    return command


def is_port_in_use(port: int) -> bool:
    """Return True if a TCP connection to ``localhost:port`` succeeds."""
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        # connect_ex returns 0 on success instead of raising.
        return probe.connect_ex(("localhost", port)) == 0