import os
from typing import Union

import docker
from typeguard import typechecked

try:
    from common_utils import msg_builder, rabbitmq
except ModuleNotFoundError:
    from common_module.common_utils import msg_builder, rabbitmq

try:
    from common_utils.logger import create_logger
except ModuleNotFoundError:
    from common_module.common_utils.logger import create_logger

# Docker client
docker_client = docker.from_env()

LOGGER = create_logger(__name__)

# Maps the module that just reported "completed" to the queue of the module
# that has to run next. Modules missing from this table have no successor.
_NEXT_QUEUE_BY_MODULE = {
    "osd_tvs": "queue_module_mmc_aus",
    "mmc_aus": "queue_module_osd_vcd",
    "osd_vcd": "queue_module_mmc_sir",
    "mmc_sir": "queue_module_mmc_asr",
    "mmc_asr": "queue_module_paf_fir",
    "paf_fir": "queue_module_osd_ava",
    "osd_ava": "queue_module_osd_avs",
    "osd_avs": "queue_module_osd_ave",
}


@typechecked
def run_container(module: str) -> str:
    """
    Start a detached container for ``module`` and return its ID.

    The image is ``<module>:<TAG>``, where ``TAG`` is read from the process
    environment. The container gets the named in/out volumes, GPU device
    "0" and is attached to the framework network.

    Args:
        module: Image name (without tag) of the module to run.

    Returns:
        The ID of the started container.

    Raises:
        KeyError: If the ``TAG`` environment variable is not set.
    """
    # key: named volume on the host
    # value of key "bind": path inside container
    # "ro": read only
    # "rw": read & write
    # https://stackoverflow.com/a/74524696
    # Docker prepends dir name to named vols to prevent clashes w existing containers!
    # https://forums.docker.com/t/docker-compose-prepends-directory-name-to-named-volumes/32835/2
    # TODO
    # change prepended str if dir is renamed
    vols = {
        "ai-framework_namedin": {"bind": "/in", "mode": "rw"},
        "ai-framework_namedout": {"bind": "/out", "mode": "rw"},
    }

    # solver
    # network: name of the network the container will be connected to at creation time
    # Docker prepends dir name to network!
    # NOTE(review): `envs` is not defined anywhere in the visible part of this
    # module; unless it is provided elsewhere this call raises NameError.
    solver_container = docker_client.containers.run(
        image=f"{module}:{os.environ['TAG']}",
        detach=True,
        auto_remove=False,
        environment=envs,
        volumes=vols,
        device_requests=[
            docker.types.DeviceRequest(device_ids=["0"], capabilities=[["gpu"]])
        ],
        network="ai-framework_aifw_net",
    )

    # The container ID is a string, so the return annotation must be `str`;
    # with `-> int` the @typechecked return check failed after the container
    # had already been started.
    return solver_container.id


@typechecked
def kill_container(solver_container_id: Union[int, str]) -> None:
    """
    Kill every running container whose ID matches ``solver_container_id``.

    Args:
        solver_container_id: Container ID as returned by ``run_container``.
            ``int`` is still accepted for backward compatibility and is
            converted to its string form before filtering.
    """
    # containers.list() only returns running containers by default
    running_containers = docker_client.containers.list(
        filters={"id": str(solver_container_id)}
    )
    for container in running_containers:
        container.kill()


@typechecked
def run(message_body: dict, worker: rabbitmq.Worker) -> bool:
    """
    Route a controller message to the queue of the next module.

    The message is validated first; an invalid message is logged and dropped.
    For a valid message the target queue is chosen from
    ``message_body["programme"]``:

    * ``job_status == "start"``: the first module queue;
    * otherwise, ``process_status == "completed"``: the queue that follows
      ``module`` in ``_NEXT_QUEUE_BY_MODULE`` (none if the module is unknown).

    When a queue was chosen, the normalised ``force`` flag is written back
    into the message and the message is sent through ``worker``. When no
    queue was chosen and ``process_status`` contains neither "working" nor
    "failed", a warning is logged.

    Args:
        message_body: Message holding a "programme" dict.
        worker: Worker used to publish the message.

    Returns:
        Always ``False``.

    Raises:
        KeyError: If "programme", or its "uid"/"job_status" (and, depending
            on the branch, "process_status"/"module") entries are missing.
    """
    if not msg_builder.validate_message(
        message_body,
        ["external_id", "application", "uid", "job_status", "process_status"],
    ):
        LOGGER.error(f"bad msg: {message_body=}")
        return False

    programme = message_body["programme"]
    uid = programme["uid"]

    # "force" counts as set only for the bool True or the string "true"
    # (case-insensitive); anything else, including None, means False.
    force = False
    if "force" in programme:
        force_v = programme["force"]
        LOGGER.debug(f"{type(force_v)=}")
        force = force_v is True or (
            isinstance(force_v, str) and force_v.lower() == "true"
        )
    LOGGER.debug(f"{force=}")

    queue_name = ""
    if programme["job_status"] == "start":
        queue_name = "queue_module_osd_tvs"
    elif programme["process_status"] == "completed":
        # job_status = "working": continue with the successor of `module`
        queue_name = _NEXT_QUEUE_BY_MODULE.get(programme["module"], "")
        LOGGER.debug(
            "module "
            f"{programme['module']} "
            f"{programme['process_status']} "
            "response"
        )
    LOGGER.debug(f"{queue_name=}")

    if queue_name:
        # send data to analyzer
        s_job_status = programme.get("job_status", "--")
        s_process_status = programme.get("process_status", "--")
        LOGGER.debug(
            f"[TRACE][{uid}][SEND] queue: {queue_name}"
            f" -- job_status: {s_job_status}"
            f" -- process_status: {s_process_status}"
        )
        programme["force"] = force
        worker.send_messages(queue=queue_name, messages=(message_body,))
    elif "process_status" in programme:
        process_status = programme["process_status"]
        if "working" not in process_status and "failed" not in process_status:
            LOGGER.warning(
                f"bad msg: {message_body['programme']['process_status']=}"
            )

    return False