server.py 4.03 KB
Newer Older
Matteo's avatar
update  
Matteo committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import os
from concurrent import futures
from typing import Any, Callable
import grpc
from grpc import StatusCode
from rich.console import Console

from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.types.irregularity import IrregularityFile, Source
from mpai_cae_arp.network import arp_pb2_grpc as arp_pb2_grpc
from mpai_cae_arp.network.arp_pb2 import (
    JobRequest,
    JobResponse,
    Contact,
    InfoResponse,
    License,
)

# Service metadata, loaded once when this module is imported.
# getInfo() reads the keys 'title', 'description', 'version',
# 'contact' (name, email) and 'license_info' (name, url) from it.
# NOTE(review): the path is relative — presumably resolved against the
# process working directory; confirm against the File implementation.
info = File('config/server.yaml', FileType.YAML).get_content()


def try_or_error_response(
    context,
    on_success_message: str,
    on_error_message: str,
    func: Callable,
    args,
    on_success_status: StatusCode = StatusCode.OK,
    on_error_status: StatusCode = StatusCode.INTERNAL,
) -> tuple[JobResponse, Any]:
    """Call ``func(*args)`` and report the outcome on ``context``.

    Args:
        context: Object exposing ``set_code`` and ``set_details`` (the
            servicer context in this file).
        on_success_message: Details/message used when ``func`` returns.
        on_error_message: Details/message used when ``func`` raises.
        func: Callable to invoke.
        args: Iterable of positional arguments, unpacked into ``func``.
        on_success_status: Status code set when ``func`` returns.
        on_error_status: Status code set when ``func`` raises.

    Returns:
        A ``(response, result)`` pair. On success the response has status
        ``"success"`` and ``result`` is whatever ``func`` returned; on
        failure the response has status ``"error"`` and ``result`` is
        ``None``.
    """
    try:
        # Keep the guarded region down to the one call that may fail.
        result = func(*args)
    except Exception:
        # Only ordinary errors become an error response. KeyboardInterrupt,
        # SystemExit and GeneratorExit derive from BaseException and must
        # keep propagating so shutdown and stream cancellation still work.
        context.set_code(on_error_status)
        context.set_details(on_error_message)
        return JobResponse(status="error", message=on_error_message), None

    context.set_code(on_success_status)
    context.set_details(on_success_message)
    return JobResponse(status="success", message=on_success_message), result


def error_response(context, status, message):
    """Mark the call as failed on ``context`` and build the matching reply.

    ``status`` is set as the code and ``message`` as the details on
    ``context``; the returned JobResponse has status ``"error"`` and
    carries the same ``message``.
    """
    context.set_code(status)
    context.set_details(message)

    failure = JobResponse(status="error", message=message)
    return failure


class VideoAnalyserServicer(arp_pb2_grpc.AIMServicer):
    """Servicer implementing the ``getInfo`` and ``work`` calls."""

    def __init__(self, console: Console):
        """Keep the console used to log incoming requests."""
        self.console = console

    def getInfo(self, request, context) -> InfoResponse:
        """Return the service description held in the module-level ``info``.

        Sets ``StatusCode.OK`` with details ``'Success'`` on ``context`` and
        builds the reply from the ``title``, ``description``, ``version``,
        ``contact`` and ``license_info`` entries of ``info``. ``request`` is
        not inspected.
        """
        self.console.log('Received request for AIM info')

        context.set_code(StatusCode.OK)
        context.set_details('Success')

        return InfoResponse(
            title=info['title'],
            description=info['description'],
            version=info['version'],
            contact=Contact(
                name=info['contact']['name'],
                email=info['contact']['email'],
            ),
            license=License(
                name=info['license_info']['name'],
                url=info['license_info']['url'],
            )
        )

    def work(self, request: JobRequest, context):
        """Process a job, yielding one JobResponse per step.

        The steps are: create ``<working_dir>/temp/<files_name>``, save
        irregularity file 1, save irregularity file 2. A failing step yields
        a response with status ``"error"`` and sets the error code on
        ``context``; the following steps are still attempted.

        Args:
            request: Job description; ``working_dir``, ``files_name`` and
                ``index`` are read from it.
            context: Servicer context receiving status codes and details.

        Yields:
            JobResponse: the outcome of each step, in order.
        """
        self.console.log('Received request for computation')
        self.console.log(request)

        working_dir: str = request.working_dir
        files_name: str = request.files_name
        # NOTE(review): `index`, `video_src` and `video_irreg_1` are not used
        # by any code in this method; presumably they belong to the analysis
        # step that is missing below. Kept so that step can be restored.
        index: int = request.index

        video_src = os.path.join(working_dir, "PreservationAudioVisualFile", f"{files_name}.mov")

        temp_dir = os.path.join(working_dir, "temp", files_name)
        audio_irreg_1 = os.path.join(temp_dir, "AudioAnalyser_IrregularityFileOutput_1.json")
        video_irreg_1 = os.path.join(temp_dir, "VideoAnalyser_IrregularityFileOutput_1.json")
        video_irreg_2 = os.path.join(temp_dir, "VideoAnalyser_IrregularityFileOutput_2.json")

        response, _ = try_or_error_response(
            context=context,
            func=os.makedirs,
            args=[temp_dir],
            on_success_message="Folders created successfully",
            on_error_message="Unable to create temporary directory, output path already exists",
            on_error_status=StatusCode.ALREADY_EXISTS,
        )
        yield response

        # NOTE(review): `irreg1` and `irreg2` are not defined anywhere in this
        # file, so both saves below currently fail with NameError and are
        # reported to the client as errors. The step that computes them has
        # to be added before this method can succeed.
        # NOTE(review): file 1 is written to the AudioAnalyser output path;
        # confirm this is intended for the video analyser.
        try:
            File(audio_irreg_1, FileType.JSON).write_content(irreg1.to_json())
        except Exception as exc:
            # Log the cause: the client only receives the generic message.
            self.console.log("Failed to save irregularity file 1:", exc)
            response = error_response(context, StatusCode.INTERNAL, "Failed to save irregularity file 1")
        else:
            context.set_code(StatusCode.OK)
            response = JobResponse(status="success", message="Irregularity file 1 saved to disk")
        # The yield stays outside the try block so that closing the stream
        # (GeneratorExit raised at the yield) is never mistaken for a save
        # failure and answered with another yield.
        yield response

        # The original wrote to `audio_irreg_2`, a name that is never defined;
        # `video_irreg_2` is the only second-output path built above.
        # NOTE(review): confirm this is the intended destination.
        try:
            File(video_irreg_2, FileType.JSON).write_content(irreg2.to_json())
        except Exception as exc:
            self.console.log("Failed to save irregularity file 2:", exc)
            response = error_response(context, StatusCode.INTERNAL, "Failed to save irregularity file 2")
        else:
            context.set_code(StatusCode.OK)
            response = JobResponse(status="success", message="Irregularity file 2 created")
        yield response


def serve(console, address: str = '[::]:50051', max_workers: int = 10):
    """Start the gRPC server and wait until it terminates.

    Args:
        console: Console handed to ``VideoAnalyserServicer`` for logging.
        address: Address passed to ``add_insecure_port``. Defaults to the
            previously hard-coded ``'[::]:50051'``.
        max_workers: Size of the thread pool given to ``grpc.server``.
            Defaults to the previously hard-coded ``10``.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    arp_pb2_grpc.add_AIMServicer_to_server(VideoAnalyserServicer(console), server)
    server.add_insecure_port(address)
    server.start()
    server.wait_for_termination()


# Script entry point: build the console and run the server.
if __name__ == '__main__':
    console = Console()
    # Printed before serve() is called, because serve() only returns after
    # server.wait_for_termination(); the server is not listening yet here.
    console.print('Server started at localhost:50051 :satellite:')
    serve(console)