Commit e2f2e18f authored by Matteo's avatar Matteo
Browse files

refactor script with mpai-cae-arp io

parent 92fed87d
clean: clean-build clean-pyc clean-test
clean-build:
rm -fr build/
rm -fr dist/
rm -fr .eggs/
find . -name '*.egg-info' -exec rm -fr {} +
find . -name '*.egg' -exec rm -fr {} +
clean-pyc:
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
clean-test:
rm -fr .tox/
rm -f .coverage
rm -fr htmlcov/
rm -fr .pytest_cache
\ No newline at end of file
...@@ -21,10 +21,12 @@ import json ...@@ -21,10 +21,12 @@ import json
import os import os
import shutil import shutil
import sys import sys
import yaml
from argparse import ArgumentParser, RawTextHelpFormatter from argparse import ArgumentParser, RawTextHelpFormatter
from moviepy.editor import VideoFileClip, AudioFileClip from moviepy.editor import VideoFileClip, AudioFileClip
from mpai_cae_arp.files import File, FileType
from mpai_cae_arp.io import Color, Style, pprint
__author__ = "Nadir Dalla Pozza" __author__ = "Nadir Dalla Pozza"
__copyright__ = "Copyright 2022, Audio Innova S.r.l." __copyright__ = "Copyright 2022, Audio Innova S.r.l."
__credits__ = ["Niccolò Pretto", "Nadir Dalla Pozza", "Sergio Canazza"] __credits__ = ["Niccolò Pretto", "Nadir Dalla Pozza", "Sergio Canazza"]
...@@ -35,22 +37,6 @@ __email__ = "nadir.dallapozza@unipd.it" ...@@ -35,22 +37,6 @@ __email__ = "nadir.dallapozza@unipd.it"
__status__ = "Production" __status__ = "Production"
class CC:
"""
Variables for customizing console colors
"""
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARK_CYAN = '\033[36m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def get_arguments() -> tuple[str, str]: def get_arguments() -> tuple[str, str]:
""" """
Method to obtain arguments from config/args.yaml file or command line. Method to obtain arguments from config/args.yaml file or command line.
...@@ -87,15 +73,15 @@ def get_arguments() -> tuple[str, str]: ...@@ -87,15 +73,15 @@ def get_arguments() -> tuple[str, str]:
# Read from configuration file # Read from configuration file
config = object config = object
try: try:
config = yaml.safe_load(open('./config/args.yaml', 'r')) config = File('./config/args.yaml', FileType.YAML).get_content()
if 'WORKING_PATH' not in config: if 'WORKING_PATH' not in config:
print(CC.RED + 'WORKING_PATH key not found in config/args.yaml!' + CC.END) pprint('WORKING_PATH key not found in config/args.yaml!', color=Color.RED)
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
if 'FILES_NAME' not in config: if 'FILES_NAME' not in config:
print(CC.RED + 'FILES_NAME key not found in config/args.yaml!' + CC.END) pprint('FILES_NAME key not found in config/args.yaml!', color=Color.RED)
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
except FileNotFoundError: except FileNotFoundError:
print(CC.RED + 'config/args.yaml file not found!' + CC.END) pprint('config/args.yaml file not found!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
working_path = config['WORKING_PATH'] working_path = config['WORKING_PATH']
files_name = config['FILES_NAME'] files_name = config['FILES_NAME']
...@@ -110,17 +96,17 @@ def check_input(working_path: str, files_name: str) -> str: ...@@ -110,17 +96,17 @@ def check_input(working_path: str, files_name: str) -> str:
:return: str representing the path where the files to be processed during the current execution are stored. :return: str representing the path where the files to be processed during the current execution are stored.
""" """
if not os.path.exists(working_path): if not os.path.exists(working_path):
print(CC.RED + 'The specified WORKING_PATH is non-existent!' + CC.END) pprint('The specified WORKING_PATH is non-existent!', color=Color.RED)
quit(os.EX_CONFIG) quit(os.EX_CONFIG)
# Check for temp directory existence # Check for temp directory existence
temp_path = os.path.join(working_path, 'temp') temp_path = os.path.join(working_path, 'temp')
if not os.path.exists(temp_path): if not os.path.exists(temp_path):
print(CC.RED + 'WORKING_PATH structure is not conformant!' + CC.END) pprint('WORKING_PATH structure is not conformant!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
# Check for input directory existence # Check for input directory existence
temp_path = os.path.join(temp_path, files_name) temp_path = os.path.join(temp_path, files_name)
if not os.path.exists(temp_path): if not os.path.exists(temp_path):
print(CC.RED + 'The specified FILES_NAME has no corresponding files!' + CC.END) pprint('The specified FILES_NAME has no corresponding files!', color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
return temp_path return temp_path
...@@ -137,7 +123,7 @@ def make_dir(dir_path: str) -> bool: ...@@ -137,7 +123,7 @@ def make_dir(dir_path: str) -> bool:
print("Directory '% s' created" % dir_path) print("Directory '% s' created" % dir_path)
return True return True
else: else:
print((CC.PURPLE + "Directory '% s' already exists!" + CC.END) % dir_path) pprint(f"Directory '{dir_path}' already exists!", color=Color.YELLOW)
overwrite = input('Do you want to overwrite it? [y/n]: ') overwrite = input('Do you want to overwrite it? [y/n]: ')
if overwrite.casefold() == 'y': if overwrite.casefold() == 'y':
# Overwrite directory # Overwrite directory
...@@ -146,7 +132,7 @@ def make_dir(dir_path: str) -> bool: ...@@ -146,7 +132,7 @@ def make_dir(dir_path: str) -> bool:
print('%s overwritten' % dir_path) print('%s overwritten' % dir_path)
return True return True
elif overwrite.casefold() != 'n': elif overwrite.casefold() != 'n':
print(CC.RED + 'Unknown command, exiting' + CC.END) pprint('Unknown command, exiting', color=Color.RED)
quit(os.EX_USAGE) quit(os.EX_USAGE)
return False return False
...@@ -156,8 +142,8 @@ def main(): ...@@ -156,8 +142,8 @@ def main():
Main execution method. Main execution method.
:return: exit codes corresponding to the execution status :return: exit codes corresponding to the execution status
""" """
print(CC.BOLD + "\nWelcome to ARP Packager!" + CC.END) pprint('\nWelcome to ARP Packager!', styles=[Style.BOLD])
print("You are using Python version: " + sys.version) print(f"You are using Python version: {sys.version}")
# Get the input from config/args.yaml or command line # Get the input from config/args.yaml or command line
working_path, files_name = get_arguments() working_path, files_name = get_arguments()
...@@ -165,7 +151,7 @@ def main(): ...@@ -165,7 +151,7 @@ def main():
temp_path = check_input(working_path, files_name) temp_path = check_input(working_path, files_name)
# Access Copy Files # Access Copy Files
print('\n' + CC.BOLD + 'Creation of Access Copy Files...' + CC.END) pprint('\nCreation of Access Copy Files...', styles=[Style.BOLD])
# By default, AccessCopyFiles directory is created in the output path... # By default, AccessCopyFiles directory is created in the output path...
acf_dir = 'AccessCopyFiles/' + files_name + '/' acf_dir = 'AccessCopyFiles/' + files_name + '/'
...@@ -177,11 +163,11 @@ def main(): ...@@ -177,11 +163,11 @@ def main():
# Copy RestoredAudioFiles # Copy RestoredAudioFiles
raf_path = os.path.join(temp_path, 'RestoredAudioFiles') raf_path = os.path.join(temp_path, 'RestoredAudioFiles')
if not os.path.exists(raf_path): if not os.path.exists(raf_path):
print((CC.RED + "Restored Audio Files directory '% s' not found!" + CC.END) % raf_path) pprint(f"Restored Audio Files directory '{raf_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
restored_audio_files = os.listdir(raf_path) restored_audio_files = os.listdir(raf_path)
if len(restored_audio_files) == 1: if len(restored_audio_files) == 1:
print((CC.YELLOW + "Restored Audio Files directory '% s' is empty" + CC.END) % raf_path) pprint(f"Restored Audio Files directory '{raf_path}' is empty!", color=Color.YELLOW)
shutil.copytree(raf_path, os.path.join(acf_path, 'RestoredAudioFiles')) shutil.copytree(raf_path, os.path.join(acf_path, 'RestoredAudioFiles'))
print("Restored Audio Files copied") print("Restored Audio Files copied")
...@@ -190,18 +176,18 @@ def main(): ...@@ -190,18 +176,18 @@ def main():
try: try:
shutil.copy2(el_path, acf_path) shutil.copy2(el_path, acf_path)
except FileNotFoundError: except FileNotFoundError:
print((CC.RED + "Editing List file '% s' not found!" + CC.END) % el_path) pprint(f"Editing List file '{el_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
print("Editing List copied") print("Editing List copied")
# Create Irregularity Images archive # Create Irregularity Images archive
ii_path = os.path.join(temp_path, 'IrregularityImages') ii_path = os.path.join(temp_path, 'IrregularityImages')
if not os.path.exists(ii_path): if not os.path.exists(ii_path):
print((CC.RED + "Irregularity Images directory '% s' not found!" + CC.END) % ii_path) pprint(f"Irregularity Images directory '{ii_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
irregularity_images = os.listdir(ii_path) irregularity_images = os.listdir(ii_path)
if len(irregularity_images) == 1: if len(irregularity_images) == 1:
print((CC.YELLOW + "Irregularity Images directory '% s' is empty" + CC.END) % ii_path) pprint(f"Irregularity Images directory '{ii_path}' is empty!", color=Color.YELLOW)
shutil.make_archive(acf_path + 'IrregularityImages', 'zip', temp_path, 'IrregularityImages') shutil.make_archive(acf_path + 'IrregularityImages', 'zip', temp_path, 'IrregularityImages')
print("Irregularity Images archive created") print("Irregularity Images archive created")
...@@ -210,15 +196,15 @@ def main(): ...@@ -210,15 +196,15 @@ def main():
try: try:
shutil.copy2(if_path, os.path.join(acf_path, 'IrregularityFile.json')) shutil.copy2(if_path, os.path.join(acf_path, 'IrregularityFile.json'))
except FileNotFoundError: except FileNotFoundError:
print((CC.RED + "Irregularity File file '% s' not found!" + CC.END) % if_path) pprint(f"Irregularity File file '{if_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
print("Irregularity File copied") print("Irregularity File copied")
# End Access Copy Files # End Access Copy Files
print(CC.GREEN + CC.BOLD + "Success!" + CC.END) pprint('Success!', color=Color.GREEN, styles=[Style.BOLD])
# Preservation Master Files # Preservation Master Files
print('\n' + CC.BOLD + 'Creation of Preservation Master Files...' + CC.END) pprint('\nCreation of Preservation Master Files...', styles=[Style.BOLD])
# By default, PreservationMasterFiles directory is created in the output path... # By default, PreservationMasterFiles directory is created in the output path...
pmf_dir = 'PreservationMasterFiles/' + files_name + '/' pmf_dir = 'PreservationMasterFiles/' + files_name + '/'
...@@ -234,7 +220,7 @@ def main(): ...@@ -234,7 +220,7 @@ def main():
try: try:
shutil.copy2(paf_path, pmf_path + 'PreservationAudioFile.wav') shutil.copy2(paf_path, pmf_path + 'PreservationAudioFile.wav')
except FileNotFoundError: except FileNotFoundError:
print((CC.RED + "Preservation Audio File file '% s' not found!" + CC.END) % paf_path) pprint(f"Preservation Audio File file '{paf_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
print("Preservation Audio File copied") print("Preservation Audio File copied")
...@@ -258,7 +244,7 @@ def main(): ...@@ -258,7 +244,7 @@ def main():
video.write_videofile(pmf_path + 'PreservationAudioVisualFile.mov', bitrate='3000k', codec='mpeg4') video.write_videofile(pmf_path + 'PreservationAudioVisualFile.mov', bitrate='3000k', codec='mpeg4')
print("Preservation Audio-Visual File created") print("Preservation Audio-Visual File created")
except OSError: except OSError:
print((CC.RED + "Preservation Audio-Visual File file '% s' not found!" + CC.END) % pvf_path) pprint(f"Preservation Audio-Visual File file '{pvf_path}' not found!", color=Color.RED)
quit(os.EX_NOINPUT) quit(os.EX_NOINPUT)
# Create Irregularity Images archive (already checked) # Create Irregularity Images archive (already checked)
...@@ -273,7 +259,7 @@ def main(): ...@@ -273,7 +259,7 @@ def main():
print("Irregularity File copied") print("Irregularity File copied")
# End Preservation Master Files # End Preservation Master Files
print(CC.GREEN + CC.BOLD + "Success!" + CC.END + '\n') pprint('Success!\n', color=Color.GREEN, styles=[Style.BOLD])
else: else:
print("\nExit") print("\nExit")
......
...@@ -10,7 +10,7 @@ app = FastAPI(**info) ...@@ -10,7 +10,7 @@ app = FastAPI(**info)
@app.get("/") @app.get("/")
def list_all_entrypoints() -> list[str]: async def list_all_entrypoints() -> list[str]:
""" """
Get the list of available main routes. Get the list of available main routes.
""" """
...@@ -18,7 +18,7 @@ def list_all_entrypoints() -> list[str]: ...@@ -18,7 +18,7 @@ def list_all_entrypoints() -> list[str]:
@app.get("/description") @app.get("/description")
def get_server_info() -> Info: async def get_server_info() -> Info:
""" """
Retrieve all the informations about the software running on the server. Retrieve all the informations about the software running on the server.
""" """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment