diff --git a/Factorio/mod_updater.py b/Factorio/mod_updater.py new file mode 100644 index 0000000..fba1450 --- /dev/null +++ b/Factorio/mod_updater.py @@ -0,0 +1,733 @@ +#!/usr/bin/python3 +""" +This module provides a simple method to manage updating and installing mods +on a given factorio server. + +It is currently not intended to be imported and instead should be executed +directly as a python script. +""" +import argparse +from collections import OrderedDict +from datetime import datetime +from enum import Enum, auto +import glob +import hashlib +import json +import os +import re +import shutil +import subprocess +import sys + +# External URL processing library +# http://docs.python-requests.org/en/master/user/quickstart/ +import requests + + +def _validate_hash(checksum: str, target: str, bsize: int = 65536) -> bool: + """ + Checks to see if the file specified by target matches the provided sha1 + checksum. + + Keyword Arguments: + checksum -- sha1 digest to be matched + target -- path to the file which must be validated + """ + hasher = hashlib.sha1() + + with open(target, "rb") as target_fp: + block = target_fp.read(bsize) + while len(block) > 0: + hasher.update(block) + block = target_fp.read(bsize) + + return hasher.hexdigest() == checksum + + +def _version_match(installed: str, mod: str): + """Checks if factorio versions are compatible.""" + if installed.startswith("1.") and mod == "0.18": + return True + return installed == mod + + +class ModUpdater: + """ + Internal class managing the current version and state of the mods on this + server. + """ + + MOD_VERSION_PATTERN = r"\d+[.]\d+[.]\d+" + MOD_FILE_PATTERN = "^(.*)_({version})[.]zip$".format(version=MOD_VERSION_PATTERN) + + class Mode(Enum): + """Possible execution modes""" + + LIST = auto() + UPDATE = auto() + + def __init__( + self, + settings_path: str, + data_path: str, + mod_path: str, + fact_path: str, + creds: hash, + title_mode: bool, + ): + """ + Initialize the updater class with all mandatory and optional arguments. + + Keyword arguments: + settings_path -- absolute path to the server-settings.json file + mod_path -- absolute path to the factorio mod directory + fact_ver -- local factorio version + """ + self.mod_server_url = "https://mods.factorio.com" + self.mod_path = mod_path + self.timestamp = datetime.utcnow() + self.title_mode = title_mode + + # Get the credentials to download mods + if settings_path is not None: + self.settings = self._parse_settings(settings_path) + else: + self.settings = {} + if data_path is not None: + self.data = self._parse_settings(data_path) + else: + self.data = {} + + # Parse username and token + if "username" in creds and creds["username"] is not None: + self.username = creds["username"] + elif "username" in self.settings: + self.username = self.settings["username"] + elif "service-username" in self.data: + self.username = self.data["service-username"] + else: + self.token = None + + if "token" in creds and creds["token"] is not None: + self.token = creds["token"] + elif "token" in self.settings: + self.token = self.settings["token"] + elif "service-token" in self.data: + self.token = self.data["service-token"] + else: + self.token = None + + # Ensure username and token were specified + if self.username is None or self.username == "": + errmsg = ( + "error: username not specified in " + + "server-settings.json, player-data.json, or cli!" + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + + if self.token is None or self.token == "": + errmsg = ( + "error: token not specified in " + + "server-settings.json, player-data.json, or cli!" + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + + # Begin processing + self._determine_version(fact_path) + self._parse_mod_list() + self._retrieve_metadata() + self._determine_max_name_lengths() + if self.title_mode: + self.mods = OrderedDict( + sorted(self.mods.items(), key=lambda mod: mod[1]["title"]) + ) + else: + self.mods = OrderedDict(sorted(self.mods.items())) + + def _determine_version(self, fact_path: str): + """Determine the local factorio version""" + if not os.path.exists(fact_path): + errmsg = "error: factorio binary '{fpath_path}' does not exist!" + print(errmsg, file=sys.stderr) + sys.exit(1) + + try: + output = subprocess.check_output( + [fact_path, "--version"], universal_newlines=True + ) + ver_re = re.compile(r"Version: (\d+)[.](\d+)[.](\d+) .*\n", re.RegexFlag.M) + match = ver_re.match(output) + if match: + version = {} + version["major"] = match.group(1) + version["minor"] = match.group(2) + version["patch"] = match.group(3) + version["release"] = "{}.{}".format(version["major"], version["minor"]) + self.fact_version = version + else: + errmsg = "Unable to parse version from:\n{output}".format(output=output) + print(errmsg, file=sys.stderr) + sys.exit("1") + + except subprocess.CalledProcessError as error: + errmsg = ("error: failed to run '{fpath} --version': " "{errstr}").format( + fpath=fact_path, errstr=error.stderr + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + + print( + "Factorio Release: {release}\n".format(release=self.fact_version["release"]) + ) + + @staticmethod + def _parse_settings(config_path: str): + """Process the specified server-settings.json or player-data.json file.""" + try: + with open(config_path, "r") as config_fp: + return json.load(config_fp) + except IOError as error: + errmsg = ("error: failed to open file '{fname}': " "{errstr}").format( + fname=config_path, errstr=error.strerror + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + except json.JSONDecodeError as error: + errmsg = ("error: failed to parse json file '{fname}': " "{errstr}").format( + fname=config_path, errstr=error.msg + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + + def _retrieve_metadata(self): + """ + Pull the latest metadata for each mod from the factorio server + See https://wiki.factorio.com/Mod_portal_API for details + """ + print("Retrieving metadata", end="") + for mod, data in self.mods.items(): + self._retrieve_mod_metadata(mod) + print(".", end="", flush=True) + print("complete!\n") + + # Add missing dependencies to the overall list + while True: + missing_mods = [] + for mod, data in self.mods.items(): + if "missing_deps" in data: + missing_mods.extend(data["missing_deps"]) + + unique_missing = set(missing_mods) + for mod in self.mods.keys(): + if mod in unique_missing: + unique_missing.remove(mod) + if len(unique_missing) == 0: + break + for mod in unique_missing: + entry = {} + entry["enabled"] = True + entry["installed"] = False + self.mods[mod] = entry + self._retrieve_mod_metadata(mod) + print("Info: adding missing dependency {dep}".format(dep=mod)) + + for mod, data in self.mods.items(): + if "metadata" not in data: + warnmsg = ( + "Warning: Unable to retrieve metadata for" + " {mod}, skipped!".format(mod=mod) + ) + print(warnmsg) + + def _retrieve_mod_metadata(self, mod: str): + """ + Attempts to retrieve the metadata for the target mod. If found, the + data object is updated with the 'metadata' key and the 'latest' keys. + """ + data = self.mods[mod] + mod_url = self.mod_server_url + "/api/mods/" + mod + "/full" + with requests.get(mod_url) as req: + if req.status_code == 200: + data["metadata"] = req.json() + + if "metadata" in data: + # Find the latest release for this version of Factorio + matching_releases = [] + for rel in data["metadata"]["releases"]: + rel_ver = rel["info_json"]["factorio_version"] + if _version_match(installed=self.fact_version["release"], mod=rel_ver): + matching_releases.append(rel) + + if len(matching_releases) > 0: + data["latest"] = matching_releases[-1] + + # Add title key + data["title"] = data["metadata"]["title"] + + # Mark whether it's deprecated + data["deprecated"] = data["metadata"].get("deprecated", False) + else: + data["title"] = mod + + # Assume not deprecated if we can't find it + data["deprecated"] = False + + if "latest" in data: + self._resolve_dependencies(mod) + + def _resolve_dependencies(self, mod: str): + """ + Processes the dependency list for this mod and returns an array + listing those which are not currently enabled. Note that this skips + exclusions and optional dependencies. (! and ?) + """ + data = self.mods[mod] + if "latest" in data: + data["missing_deps"] = [] + data["dependencies"] = {} + dependencies = data["latest"]["info_json"]["dependencies"] + # Preparation for future explicit version matching + dep_pattern = re.compile(r"^([\w -]+) ([<=>][=])? (\d+[.]\d+[.]\d+)$") + for dep_entry in dependencies: + match = dep_pattern.fullmatch(dep_entry) + if match: + dep = {} + dep_name = match.group(1) + if dep_name == "base": + continue + dep["argument"] = match.group(2) + dep["version"] = match.group(3) + data["dependencies"][match.group(1)] = dep + + for dep_name in data["dependencies"].keys(): + if dep_name not in self.mods: + data["missing_deps"].append(dep_name) + + def _parse_mod_list(self): + """Process the mod-list.json within mod_path.""" + mod_list_path = os.path.join(self.mod_path, "mod-list.json") + try: + settings_fp = open(mod_list_path, "r") + mod_json = json.load(settings_fp) + self.mods = {} + if "mods" in mod_json: + for mod in mod_json["mods"]: + entry = {} + entry["enabled"] = mod["enabled"] + self.mods[mod["name"]] = entry + else: + print( + "Invalid mod-list.json file \ + '{path}'!".format( + path=mod_list_path + ), + file=sys.stderr, + ) + sys.exit(1) + + # Remove the 'base' mod as it's not relevant to this process + if "base" in self.mods: + del self.mods["base"] + except IOError as error: + errmsg = ("error: failed to open file '{fname}': " "{errstr}").format( + fname=mod_list_path, errstr=error.strerror + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + except json.JSONDecodeError as error: + errmsg = ("error: failed to parse json file '{fname}': " "{errstr}").format( + fname=mod_list_path, errstr=error.msg + ) + print(errmsg, file=sys.stderr) + sys.exit(1) + + # Collect the installed state & versions + self.mod_files = glob.glob("{mod_path}/*.zip".format(mod_path=self.mod_path)) + installed_mods = {} + mod_pattern = re.compile(self.MOD_FILE_PATTERN) + for entry in self.mod_files: + basename = os.path.basename(entry) + match = mod_pattern.fullmatch(basename) + if match: + installed_mods[match.group(1)] = match.group(2) + + for mod, data in self.mods.items(): + if mod in installed_mods: + data["installed"] = True + data["version"] = installed_mods[mod] + else: + data["installed"] = False + + def _update_mod_list(self): + """ + Generates an updated mod-list.json file which takes into account any + newly added dependencies. + """ + # Build the simplified object for json output + mod_list_output = {} + mod_list_output["mods"] = [] + for mod, data in self.mods.items(): + mod_entry = {} + mod_entry["name"] = mod + mod_entry["enabled"] = data["enabled"] + mod_list_output["mods"].append(mod_entry) + + # Rename the old mod-list file with a timestamp + mod_list_path = os.path.join(self.mod_path, "mod-list.json") + mod_list_backup_path = os.path.join( + self.mod_path, + "mod-list.{timestamp}.json".format( + timestamp=self.timestamp.strftime("%Y-%m-%d_%H%M.%S") + ), + ) + try: + os.rename(src=mod_list_path, dst=mod_list_backup_path) + except IOError as error: + errmsg = ( + "error: failed to rename file '{s}' to '{d}': " "{errstr}" + ).format(s=mod_list_path, d=mod_list_backup_path, errstr=error.strerror) + print(errmsg, file=sys.stderr) + sys.exit(1) + + # Store the current mod list + try: + mod_list_fp = open(mod_list_path, "w") + mod_list_fp.write(json.dumps(mod_list_output, indent=2, sort_keys=True)) + except IOError as error: + errmsg = ( + "error: failed to store updated mod list file '{s}': " "{errstr}" + ).format(s=mod_list_path, errstr=error.strerror) + print(errmsg, file=sys.stderr) + sys.exit(1) + + def _determine_max_name_lengths(self): + """Returns the length of the longest mod name""" + max_mod_len = 0 + max_cver_len = 0 + max_lver_len = 0 + for mod, data in self.mods.items(): + mod_len = len(data["title"]) if self.title_mode else len(mod) + max_mod_len = mod_len if mod_len > max_mod_len else max_mod_len + cver_len = len(data["version"]) if data["installed"] else len("Version") + max_cver_len = cver_len if cver_len > max_cver_len else max_cver_len + lver_len = ( + len(data["latest"]["version"]) if "latest" in data else len("Version") + ) + max_lver_len = lver_len if lver_len > max_lver_len else max_lver_len + + self.max_mod_len = max_mod_len + self.max_cver_len = max_cver_len + self.max_lver_len = max_lver_len + self.max_ver_len = max_lver_len if max_lver_len > max_cver_len else max_cver_len + + def list(self): + """Lists the mods installed on this server.""" + # Find the longest mod name + + print( + "{:<{width}}\tenabled\tinstalled\tcurrent_v\tlatest_v".format( + "mod_name", width=self.max_mod_len + ) + ) + for mod, data in self.mods.items(): + print( + "{:<{width}}\t{enbld}\t{inst}\t\t{cver}\t\t{lver}".format( + mod, + enbld=str(data["enabled"]), + inst=str(data["installed"]), + cver=data["version"] if data["installed"] else "N/A", + lver=data["latest"]["version"] if "latest" in data else "N/A", + width=self.max_mod_len, + ) + ) + + def override_credentials(self, username: str, token: str): + """Replaces the values provided in server-settings.json or player-data.json""" + if username is not None: + self.username = username + if token is not None: + self.token = token + + def _print_mod_message( + self, mod: str, version: str, action: str, result: str, message: str, data: hash + ): + """ + Prints a mod status message using the provided parameters. + """ + if data is not None: + title = data["title"] if self.title_mode else mod + else: + title = mod + + output_string = ( + "{title:<{mwidth}}\t{version:<{vwidth}}" + "\t{action:<10}\t{result:<10}\t{message}" + ).format( + title=title, + version=version, + action=action, + result=result, + message=message, + vwidth=self.max_ver_len, + mwidth=self.max_mod_len, + ) + print(output_string) + + def update(self): + """ + Updates all mods currently installed on this server to the latest + release + """ + self._print_mod_message("Mod", "Version", "Action", "Result", "Message", None) + + for mod, data in self.mods.items(): + version = data["version"] if data["installed"] else "N/A" + if "metadata" not in data: + self._print_mod_message( + mod=mod, + version=version, + action="Skip", + result="N/A", + message="Missing metadata, skipping update!", + data=data, + ) + continue + if "latest" not in data: + message = ( + "No release found for factorio '{version}', skipping update!" + ).format(version=self.fact_version["release"]) + self._print_mod_message( + mod=mod, + version=version, + action="Skip", + result="N/A", + message=message, + data=data, + ) + continue + + self._prune_old_releases(mod) + self._download_latest_release(mod) + + # Update the mod list file + self._update_mod_list() + + def _prune_old_releases(self, mod: str): + """ + Deletes any locally installed versions older than the latest release. + + Keyword Arguments: + mod -- name of the target to update + """ + data = self.mods[mod] + latest_version = data["latest"]["version"] + + # Declare the patterns + mod_pattern = re.compile( + "^{mod}_({ver})[.]zip$".format(mod=mod, ver=self.MOD_VERSION_PATTERN) + ) + version_pattern = re.compile( + "^{mod}_{ver}[.]zip$".format(mod=mod, ver=latest_version) + ) + + # Build the parse list + basenames = [os.path.basename(x) for x in self.mod_files] + inst_rels = [x for x in basenames if mod_pattern.fullmatch(x)] + for rel in inst_rels: + if version_pattern.fullmatch(rel): + continue + + match = mod_pattern.fullmatch(rel) + if match: + rel_ver = match.group(1) + else: + rel_ver = "TBD" + + rel_path = os.path.join(self.mod_path, rel) + try: + os.remove(rel_path) + result = "Success" + message = "" + except OSError as error: + message = ("error: failed to remove '{fname}': " "{errstr}").format( + fname=rel_path, errstr=error.strerror + ) + result = "Failure" + + self._print_mod_message( + mod=mod, + version=rel_ver, + action="Remove", + result=result, + message=message, + data=data, + ) + + def _download_latest_release(self, mod: str): + """ + Retrieves the latest version of the specified mod compatible with the + factorio release present on this server. + + Keyword Arguments: + mod -- name of the target to update + """ + data = self.mods[mod] + latest = data["latest"] + target = os.path.join(self.mod_path, latest["file_name"]) + + validate = download = False + + v_cur = data["version"] if "version" in data else "N/A" + v_new = latest["version"] + if data["installed"]: + if v_new == v_cur: + validate = True + else: + message = "Updating from '{v_cur}'".format(v_cur=v_cur) + download = True + else: + message = "Downloading initial release '{v_new}'".format(v_new=v_new) + download = True + + if validate: + if _validate_hash(latest["sha1"], target): + result = "Success" + message = "Deprecated mod" if data["deprecated"] else "" + else: + result = "Failure" + download = True + message = "Validation failed, downloading again" + self._print_mod_message( + mod=mod, + version=v_cur, + action="Validate", + result=result, + message=message, + data=data, + ) + + if download: + creds = {"username": self.username, "token": self.token} + dl_url = self.mod_server_url + latest["download_url"] + with requests.get(dl_url, params=creds, stream=True) as req: + if req.status_code == 200: + with open(target, "wb") as target_file: + shutil.copyfileobj(req.raw, target_file) + target_file.flush() + if _validate_hash(latest["sha1"], target): + result = "Success" + else: + result = "Failure" + message = "Download did not match checksum!" + elif req.status_code == 403: + message = ( + "Failed to download, credentials not accepted. " + + "Check your username/token" + ) + result = "Failure" + else: + message = "Unable to retrieve, status code: " + str(req.status_code) + result = "Failure" + + self._print_mod_message( + mod=mod, + version=v_new, + action="Download", + result=result, + message=message, + data=data, + ) + + +if __name__ == "__main__": + DESC_TEXT = "Updates mods for a target factorio installation" + PARSER = argparse.ArgumentParser(description=DESC_TEXT) + # Username + PARSER.add_argument( + "-u", + "--username", + dest="username", + help="factorio.com username overriding server-settings.json/player-data.json", + ) + # Token + PARSER.add_argument( + "-t", + "--token", + dest="token", + help="factorio.com API token overriding server-settings.json/player-data.json", + ) + # Title format + PARSER.add_argument( + "--print-titles", + dest="title_mode", + default=False, + action="store_true", + help="When true, print the mod title instead of the api name", + ) + # Server Settings + PARSER.add_argument( + "-s", + "--server-settings", + dest="settings_path", + required=False, + help=( + "Absolute path to the server-settings.json file " + + "(overrides player-data.json)" + ), + ) + # Player Data + PARSER.add_argument( + "-d", + "--player-data", + dest="data_path", + required=False, + help="Absolute path to the player-data.json file", + ) + # Factorio mod directory + PARSER.add_argument( + "-m", + "--mod-directory", + dest="mod_path", + required=True, + help="Absolute path to the mod directory", + ) + # Factorio binary absolute path + PARSER.add_argument( + "--fact-path", + dest="fact_path", + required=True, + help="Absolute path to the factorio binary", + ) + # Possible Execution modes + MODE_GROUP = PARSER.add_mutually_exclusive_group(required=True) + MODE_GROUP.add_argument( + "--list", + dest="mode", + action="store_const", + const=ModUpdater.Mode.LIST, + help="List the currently installed mods with versions", + ) + MODE_GROUP.add_argument( + "--update", + dest="mode", + action="store_const", + const=ModUpdater.Mode.UPDATE, + help="Update all mods to their latest release", + ) + + ARGS = PARSER.parse_args() + UPDATER = ModUpdater( + settings_path=ARGS.settings_path, + data_path=ARGS.data_path, + mod_path=ARGS.mod_path, + fact_path=ARGS.fact_path, + creds={"username": ARGS.username, "token": ARGS.token}, + title_mode=ARGS.title_mode, + ) + + if ARGS.mode == ModUpdater.Mode.LIST: + UPDATER.list() + elif ARGS.mode == ModUpdater.Mode.UPDATE: + UPDATER.update()