import asyncio
import subprocess
import threading
from typing import Annotated, Optional

from fastapi import FastAPI, Query, Response, status

from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.types.schema import Info

# Server metadata, loaded once at import time and forwarded verbatim as
# keyword arguments to the FastAPI constructor.
info = File("config/server.yaml", FileType.YAML).get_content()
app = FastAPI(**info)

# Each request rewrites the shared config/args.yaml and then launches the
# packager script (which presumably reads that file -- confirm against
# src/packager.py). The lock keeps "write config, run script" atomic so that
# overlapping requests cannot clobber each other's parameters.
_PACKAGER_LOCK = threading.Lock()


@app.get("/")
def list_all_entrypoints() -> list[str]:
    """
    Get the list of available main routes.
    """
    # "package" matches the route registered below; the previous value
    # ("restore") did not correspond to any route defined in this module.
    return ["package", "description", "docs"]


@app.get("/description")
def get_server_info() -> Info:
    """
    Retrieve all the information about the software running on the server.
    """
    return info


def _run_packager(
    files_name: str, working_path: Optional[str]
) -> subprocess.CompletedProcess:
    """
    Store the request parameters in config/args.yaml and run src/packager.py.

    This function blocks until the script exits, so it must be called from a
    worker thread rather than directly on the event loop.

    Parameters
    ----------
    files_name : str
        Value written to the ``FilesName`` key of the configuration.
    working_path : Optional[str]
        Value written to the ``WorkingPath`` key; when ``None`` the value
        already present in the configuration file is left untouched.

    Returns
    -------
    subprocess.CompletedProcess
        The finished process, with ``stdout`` and ``stderr`` captured as bytes.
    """
    with _PACKAGER_LOCK:
        config_file = File("config/args.yaml", FileType.YAML)

        # Read the current configuration and overlay the query parameters.
        config = config_file.get_content()
        config["FilesName"] = files_name
        if working_path is not None:
            config["WorkingPath"] = working_path
        config_file.write_content(config)

        return subprocess.run(
            ["python", "src/packager.py"],
            capture_output=True,
        )


@app.get("/package", status_code=200)
async def predict(
    response: Response,
    files_name: Annotated[str, Query(
        description="Name of the audio/video file to be analyzed without the extension.",
        example="filename"
    )],
    working_path: Annotated[Optional[str], Query(
        description="Path to the directory containing the preservation files.",
        examples={
            "Absolute path": {"value": "/home/user/preservation"},
            "Relative path": {"value": "preservation"}
        }
    )] = None
):
    """
    Run the packager script with the given parameters and report the outcome.

    The query parameters are written to config/args.yaml before the script is
    started. The blocking work runs in a worker thread so that other requests
    keep being served while the packager is running.

    Returns a success message when the script exits with code 0. Otherwise the
    response status is set to 412 and the exit code, stdout and stderr of the
    script are returned under the ``error`` key.
    """
    cprocess = await asyncio.to_thread(_run_packager, files_name, working_path)

    if cprocess.returncode == 0:
        return {"message": "Package created successfully"}

    response.status_code = status.HTTP_412_PRECONDITION_FAILED
    return {"error": {
        "returncode": cprocess.returncode,
        # errors="replace" keeps the diagnostic readable even if the script
        # emitted bytes that are not valid UTF-8, instead of raising here.
        "stdout": cprocess.stdout.decode("utf-8", errors="replace"),
        "stderr": cprocess.stderr.decode("utf-8", errors="replace"),
    }}