"""gRPC server exposing the video analyser executable as an AIM service."""

import os
import subprocess
from concurrent import futures
from typing import Any, Callable

import grpc
from grpc import StatusCode
from rich.console import Console

from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.network import arp_pb2_grpc as arp_pb2_grpc
from mpai_cae_arp.network.arp_pb2 import (
    JobRequest,
    JobResponse,
    Contact,
    InfoResponse,
    License,
)

# Service metadata returned by ``getInfo``; read once at import time from the
# server configuration file (path is relative to the current working directory).
info = File('config/server.yaml', FileType.YAML).get_content()


def try_or_error_response(
    context,
    on_success_message: str,
    on_error_message: str,
    func: Callable,
    args,
    on_success_status: StatusCode = StatusCode.OK,
    on_error_status: StatusCode = StatusCode.INTERNAL,
) -> tuple[JobResponse, Any]:
    """Call ``func(*args)`` and translate the outcome into a ``JobResponse``.

    Args:
        context: gRPC servicer context whose status code and details are set.
        on_success_message: Details/message used when ``func`` returns.
        on_error_message: Details/message used when ``func`` raises.
        func: Callable to execute.
        args: Positional arguments unpacked into ``func``.
        on_success_status: Status code set on the context on success.
        on_error_status: Status code set on the context on failure.

    Returns:
        A ``(response, result)`` pair. On success ``result`` is the return
        value of ``func``; on failure it is ``None`` and the response carries
        the ``"error"`` status.
    """
    try:
        result = func(*args)
    except Exception:
        # Only ordinary errors are converted into an error response;
        # KeyboardInterrupt / SystemExit must keep propagating so the
        # server can still be shut down.
        context.set_code(on_error_status)
        context.set_details(on_error_message)
        return JobResponse(status="error", message=on_error_message), None

    context.set_code(on_success_status)
    context.set_details(on_success_message)
    return JobResponse(status="success", message=on_success_message), result


def error_response(context, status: StatusCode, message: str) -> JobResponse:
    """Set ``status``/``message`` on ``context`` and return an error ``JobResponse``."""
    context.set_code(status)
    context.set_details(message)
    return JobResponse(status="error", message=message)


class VideoAnalyserServicer(arp_pb2_grpc.AIMServicer):
    """AIM servicer that runs the ``bin/video_analyser`` executable."""

    def __init__(self, console: Console):
        """Store the console used for request logging."""
        self.console = console

    def getInfo(self, request, context) -> InfoResponse:
        """Return the service metadata held in the module-level ``info``."""
        self.console.log('Received request for AIM info')

        context.set_code(StatusCode.OK)
        context.set_details('Success')

        return InfoResponse(
            title=info['title'],
            description=info['description'],
            version=info['version'],
            contact=Contact(
                name=info['contact']['name'],
                email=info['contact']['email'],
            ),
            license=License(
                name=info['license_info']['name'],
                url=info['license_info']['url'],
            ),
        )

    def work(self, request: JobRequest, context):
        """Run the video analyser for ``request`` and stream progress.

        Yields a "Process started" response once the executable has been
        launched, then a final response after it has exited: success when the
        exit code is zero, otherwise an error carrying the return code. If the
        executable cannot be launched at all, a single error response is
        yielded instead.
        """
        self.console.log('Received request for computation')
        self.console.log(request)

        working_dir = os.path.abspath(request.working_dir)
        prog = [
            'bin/video_analyser',
            '--working-path', working_dir,
            '--files-name', request.files_name,
            '--brands', 'true',
            '--speed', '7.5',
        ]

        try:
            process = subprocess.Popen(prog)
        except OSError as err:
            # E.g. missing or non-executable binary: report it to the client
            # instead of letting the handler die with an unhandled exception.
            self.console.log('Unable to start process:', err)
            yield error_response(
                context, StatusCode.INTERNAL, 'Unable to start process'
            )
            return

        # Leaving the ``with`` block waits for the child process to exit, so
        # ``returncode`` is populated afterwards.
        with process:
            yield JobResponse(status="success", message="Process started")

        return_code = process.returncode
        self.console.log('Process finished with return code:', return_code)

        if return_code != 0:
            yield error_response(
                context,
                StatusCode.INTERNAL,
                f'Process finished with return code: {return_code}',
            )
            return

        yield JobResponse(status="success", message="Process finished")


def serve(console, address: str = '[::]:50051', max_workers: int = 10):
    """Start the gRPC server and block until it terminates.

    Args:
        console: Console handed to the servicer for logging.
        address: Address the insecure port is bound to.
        max_workers: Size of the thread pool serving requests.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    arp_pb2_grpc.add_AIMServicer_to_server(VideoAnalyserServicer(console), server)
    server.add_insecure_port(address)
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    console = Console()
    console.print('Server started at localhost:50051 :satellite:')
    serve(console)