import os import subprocess from concurrent import futures from typing import Any, Callable import grpc from grpc import StatusCode from rich.console import Console from mpai_cae_arp.files import File, FileType from mpai_cae_arp.network import arp_pb2_grpc as arp_pb2_grpc from mpai_cae_arp.network.arp_pb2 import ( JobRequest, JobResponse, Contact, InfoResponse, License, ) info = File('config/server.yaml', FileType.YAML).get_content() def try_or_error_response( context, on_success_message: str, on_error_message: str, func: Callable, args, on_success_status: StatusCode = StatusCode.OK, on_error_status: StatusCode = StatusCode.INTERNAL, ) -> tuple[JobResponse, Any]: try: result = func(*args) context.set_code(on_success_status) context.set_details(on_success_message) return JobResponse(status="success", message=on_success_message), result except: context.set_code(on_error_status) context.set_details(on_error_message) return JobResponse(status="error", message=on_error_message), None def error_response(context, status, message): context.set_code(status) context.set_details(message) return JobResponse(status="error", message=message) class VideoAnalyserServicer(arp_pb2_grpc.AIMServicer): def __init__(self, console: Console): self.console = console def getInfo(self, request, context) -> InfoResponse: self.console.log('Received request for AIM info') context.set_code(StatusCode.OK) context.set_details('Success') return InfoResponse( title=info['title'], description=info['description'], version=info['version'], contact=Contact( name=info['contact']['name'], email=info['contact']['email'], ), license=License( name=info['license_info']['name'], url=info['license_info']['url'], ) ) def work(self, request: JobRequest, context): self.console.log('Received request for computation') self.console.log(request) working_dir: str = request.working_dir files_name: str = request.files_name index: int = request.index working_dir = os.path.abspath(working_dir) prog = [ 'bin/video_analyser', '--working-path', working_dir, '--files-name', files_name, '--brands', 'true', '--speed', '7.5' ] process = subprocess.run(prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) console.print(process.stderr) for line in process.stdout: message = line.decode('utf-8').strip() console.log(message) yield JobResponse(status="success", message=message) def serve(console): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) arp_pb2_grpc.add_AIMServicer_to_server(VideoAnalyserServicer(console), server) server.add_insecure_port('[::]:50052') server.start() server.wait_for_termination() if __name__ == '__main__': console = Console() console.print('Server started at localhost:50052 :satellite:') serve(console) # working_dir = os.path.abspath('../data') # prog = [ # 'bin/video_analyser', # '--working-path', working_dir, # '--files-name', 'BERIO100.mov', # '--brands', 'true', # '--speed', '7.5' # ] # process = subprocess.run(prog, check=True)