from typeguard import typechecked

try:
    from common_utils import msg_builder, rabbitmq
except ModuleNotFoundError:
    from common_module.common_utils import msg_builder, rabbitmq
try:
    from common_utils.logger import create_logger
except ModuleNotFoundError:
    from common_module.common_utils.logger import create_logger

LOGGER = create_logger(__name__)

# Keys handed to msg_builder.validate_message for every incoming message.
# Kept as a tuple and copied per call so the validator always receives a
# fresh list, exactly as with the original inline list literal.
_REQUIRED_KEYS = (
    "external_id",
    "application",
    "uid",
    "job_status",
    "process_status",
)

# Queue that receives a job whose job_status is "start".
_FIRST_QUEUE = "queue_module_osd_tvs"

# Module that just reported "completed" -> queue of the module that runs next.
# "osd_ave" has no entry: nothing is queued after it completes.
_NEXT_QUEUE_BY_MODULE = {
    "osd_tvs": "queue_module_mmc_aus",
    "mmc_aus": "queue_module_osd_vcd",
    "osd_vcd": "queue_module_mmc_sir",
    "mmc_sir": "queue_module_mmc_asr",
    "mmc_asr": "queue_module_paf_fir",
    "paf_fir": "queue_module_osd_ava",
    "osd_ava": "queue_module_osd_avs",
    "osd_avs": "queue_module_osd_ave",
}


@typechecked
def run(message_body: dict, worker: rabbitmq.Worker) -> bool:
    """Forward a job message to the queue of the next processing module.

    Routing rules, applied to ``message_body["programme"]``:

    * ``job_status == "start"``: send to ``queue_module_osd_tvs``.
    * otherwise, if ``process_status == "completed"``: look up the queue that
      follows ``module`` in ``_NEXT_QUEUE_BY_MODULE`` and send there.
    * anything else is not forwarded; a warning is logged unless
      ``process_status`` contains ``"working"`` or ``"failed"``.

    Before sending, ``programme["force"]`` is normalised in place to a bool:
    it is True only when the incoming value is ``True`` or a string equal to
    ``"true"`` ignoring case.

    Args:
        message_body: Message dict; must pass ``msg_builder.validate_message``
            and contain a ``"programme"`` mapping.
        worker: RabbitMQ worker used to publish the forwarded message.

    Returns:
        Always ``False``.
        NOTE(review): how the caller interprets this value is not visible
        in this file.
    """
    if not msg_builder.validate_message(message_body, list(_REQUIRED_KEYS)):
        LOGGER.error(f"bad msg: {message_body=}")
        return False

    programme = message_body["programme"]
    uid = programme["uid"]

    # Normalise the optional "force" flag; anything but True / "true" is False.
    force = False
    if "force" in programme:
        force_v = programme["force"]
        LOGGER.debug(f"{type(force_v)=}")
        force = force_v is True or (
            isinstance(force_v, str) and force_v.lower() == "true"
        )
    LOGGER.debug(f"{force=}")

    # Decide the destination queue; an empty string means "do not forward".
    queue_name = ""
    if programme["job_status"] == "start":
        queue_name = _FIRST_QUEUE
    elif programme["process_status"] == "completed":
        # "module" is not among the validated keys, so read it defensively: a
        # missing module is handled like an unknown one (nothing is sent and
        # the warning below is logged) instead of raising KeyError.
        module = programme.get("module")
        queue_name = _NEXT_QUEUE_BY_MODULE.get(module, "")
        LOGGER.debug(
            "module "
            f"{module} "
            f"{programme['process_status']} "
            "response"
        )
    LOGGER.debug(f"{queue_name=}")

    if queue_name != "":
        # send data to analyzer
        s_job_status = programme.get("job_status", "--")
        s_process_status = programme.get("process_status", "--")
        LOGGER.debug(
            f"[TRACE][{uid}][SEND] queue: {queue_name}"
            f" -- job_status: {s_job_status}"
            f" -- process_status: {s_process_status}"
        )
        programme["force"] = force
        worker.send_messages(queue=queue_name, messages=(message_body,))
    elif "process_status" in programme:
        # "working" / "failed" progress reports are expected and not
        # forwarded; any other status reaching this point is unexpected.
        if not (
            "working" in programme["process_status"]
            or "failed" in programme["process_status"]
        ):
            # Logger.warn is a deprecated alias; warning() is the
            # supported spelling.
            LOGGER.warning(
                f"bad msg: {message_body['programme']['process_status']=}"
            )
    return False