diff --git a/vnrecode/application.py b/vnrecode/application.py index c7cb7cd..68fa45f 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -4,17 +4,22 @@ from datetime import datetime import shutil import os +from .compress import Compress +from .printer import Printer +from .params import Params +from .utils import Utils + class Application: - def __init__(self, params, compress, printer, utils): + def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): self.params = params self.compress = compress.compress self.printer = printer self.utils = utils - def compress_worker(self, folder, file, source, output): - if os.path.isfile(f'{folder}/{file}'): + def compress_worker(self, folder: str, file: str, source: str, output: str): + if os.path.isfile(os.path.join(folder, file)): self.compress(folder, file, source, output) def run(self): diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 249061e..18fb20e 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -1,13 +1,17 @@ from ffmpeg import FFmpeg, FFmpegError from PIL import Image +from os import path import pillow_avif -import os + +from .printer import Printer +from .params import Params +from .utils import Utils class File: @staticmethod - def get_type(filename): + def get_type(filename: str) -> str: extensions = { "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], @@ -17,7 +21,7 @@ class File: } for file_type in extensions: - if os.path.splitext(filename)[1] in extensions[file_type]: + if path.splitext(filename)[1] in extensions[file_type]: return file_type return "unknown" @@ -39,45 +43,44 @@ class File: class Compress: - def __init__(self, params, printer, utils): + def __init__(self, params: Params, printer: Printer, utils: Utils): self.params = params self.printer = printer self.utils = utils - def audio(self, in_dir, file, out_dir, extension): + def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: bit_rate = self.params.audio_bitrate - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{os.path.splitext(file)[0]}.{extension}') + out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') try: (FFmpeg() - .input(f'{in_dir}/{file}') + .input(path.join(in_dir, file)) .option("hide_banner") .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - self.printer.files(file, os.path.splitext(file)[0], extension, f"{bit_rate}") + self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") return out_file - - def video(self, in_dir, file, out_dir, extension): + def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: if not self.params.video_skip: - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{os.path.splitext(file)[0]}.{extension}') + out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') codec = self.params.video_codec crf = self.params.video_crf try: (FFmpeg() - .input(f'{in_dir}/{file}') + .input(path.join(in_dir, file)) .option("hide_banner") .option("hwaccel", "auto") .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) .execute() ) - self.printer.files(file, os.path.splitext(file)[0], extension, codec) + self.printer.files(file, path.splitext(file)[0], extension, codec) except FFmpegError as e: self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') self.utils.errors += 1 @@ -86,14 +89,13 @@ class Compress: return out_file else: self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') - return f'{out_dir}/{os.path.splitext(file)[0]}.{extension}' + return f'{out_dir}/{path.splitext(file)[0]}.{extension}' - - def image(self, in_dir, file, out_dir, extension): + def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: quality = self.params.image_quality - out_file = self.utils.check_duplicates(in_dir, out_dir, f"{os.path.splitext(file)[0]}.{extension}") + out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") try: - image = Image.open(f'{in_dir}/{file}') + image = Image.open(path.join(in_dir, file)) if (extension == "jpg" or extension == "jpeg" or (extension == "webp" and not self.params.webp_rgba)): @@ -115,36 +117,35 @@ class Compress: lossless=self.params.image_lossless, quality=quality, minimize_size=True) - self.printer.files(file, os.path.splitext(file)[0], extension, f"{quality}%") + self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") except Exception as e: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") return out_file - - def unknown(self, in_dir, filename, out_dir): + def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: if self.params.force_compress: self.printer.unknown_file(filename) out_file = self.utils.check_duplicates(in_dir, out_dir, filename) try: (FFmpeg() - .input(f'{in_dir}/{filename}') + .input(path.join(in_dir, filename)) .output(out_file) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{in_dir}/{filename}', f'{out_dir}/{filename}') + self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {filename} can't be processed! Error: {e}") return out_file else: - self.utils.add_unprocessed_file(f'{in_dir}/{filename}', f'{out_dir}/{filename}') - return f'{out_dir}/{filename}' + self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) + return path.join(out_dir, filename) - def compress(self, _dir, filename, source, output): + def compress(self, _dir: str, filename: str, source: str, output: str): match File.get_type(filename): case "audio": out_file = self.audio(_dir, filename, output, self.params.audio_ext) @@ -156,7 +157,7 @@ class Compress: out_file = self.unknown(_dir, filename, output) if self.params.mimic_mode: - self.utils.mimic_rename(out_file, f'{_dir}/{filename}', source) + self.utils.mimic_rename(out_file, path.join(_dir, filename), source) self.printer.bar.update() self.printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index 89fe28f..ea15691 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from dataclasses import dataclass from typing import Self import tomllib @@ -68,7 +68,7 @@ class Params: ) @staticmethod - def get_args(): + def get_args() -> Namespace: parser = ArgumentParser(prog="vnrecode", description="Python utility to compress Visual Novel Resources" ) diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 6080caf..30aa496 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -15,7 +15,7 @@ class Printer: # Fill whole string with spaces for cleaning progress bar @staticmethod - def clean_str(string): + def clean_str(string: str) -> str: return string + " " * (os.get_terminal_size().columns - len(string)) @staticmethod @@ -23,20 +23,20 @@ class Printer: if sys.platform == "win32": colorama.init() - def bar_print(self, string): + def bar_print(self, string: str): print(string) self.bar.update() - def info(self, string): + def info(self, string: str): self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) - def warning(self, string): + def warning(self, string: str): self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) - def error(self, string): + def error(self, string: str): self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) - def files(self, source, dest, dest_ext, comment): + def files(self, source: str, dest: str, dest_ext: str, comment: str): source_ext = os.path.splitext(source)[1] source_name = os.path.splitext(source)[0] diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 8acd4dc..ec5c2c5 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -17,15 +17,15 @@ class Utils: os.system("pause") @staticmethod - def get_size(directory): + def get_size(directory: str) -> int: total_size = 0 for folder, folders, files in os.walk(directory): for file in files: - if not os.path.islink(f"{folder}/{file}"): - total_size += os.path.getsize(f"{folder}/{file}") + if not os.path.islink(os.path.join(folder, file)): + total_size += os.path.getsize(os.path.join(folder, file)) return total_size - def get_compression(self, source, output): + def get_compression(self, source: str, output: str): try: source = self.get_size(source) output = self.get_size(output) @@ -35,50 +35,50 @@ class Utils: except ZeroDivisionError: self.printer.warning("Nothing compressed!") - def get_compression_status(self, orig_folder): - orig_folder_len = 0 - comp_folder_len = 0 + def get_compression_status(self, source: str): + source_len = 0 + output_len = 0 - for folder, folders, files in os.walk(orig_folder): - orig_folder_len += len(files) + for folder, folders, files in os.walk(source): + source_len += len(files) - for folder, folders, files in os.walk(f'{orig_folder}_compressed'): + for folder, folders, files in os.walk(f'{source}_compressed'): for file in files: if not os.path.splitext(file)[1].count("(copy)"): - comp_folder_len += 1 + output_len += 1 if self.errors != 0: self.printer.warning("Some files failed to compress!") - if orig_folder_len == comp_folder_len: + if source_len == output_len: self.printer.info("Success!") - self.get_compression(orig_folder, f"{orig_folder}_compressed") else: self.printer.warning("Original and compressed folders are not identical!") - self.get_compression(orig_folder, f"{orig_folder}_compressed") + self.get_compression(source, f"{source}_compressed") - def add_unprocessed_file(self, orig_folder, new_folder): + def add_unprocessed_file(self, source: str, output: str): if self.params.copy_unprocessed: - filename = orig_folder.split("/").pop() - copyfile(orig_folder, new_folder) + filename = os.path.split(source)[-1] + copyfile(source, output) self.printer.info(f"File {filename} copied to compressed folder.") - def check_duplicates(self, in_dir, out_dir, filename): - duplicates = glob(f"{in_dir}/{os.path.splitext(filename)[0]}.*") + def check_duplicates(self, source: str, output: str, filename: str) -> str: + duplicates = glob(os.path.join(source, os.path.splitext(filename)[0]+".*")) if len(duplicates) > 1: if filename not in self.duplicates: self.duplicates.append(filename) new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] - return f"{out_dir}/{new_name}" - return f"{out_dir}/{filename}" + return os.path.join(output, new_name) + return os.path.join(output, filename) def print_duplicates(self): for filename in self.duplicates: self.printer.warning( f'Duplicate file has been found! Check manually this files - "{filename}", ' - f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"') + f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' + ) - def mimic_rename(self, filename, target, source): + def mimic_rename(self, filename: str, target: str, source: str): if filename.count("(vncopy)"): orig_name = filename.replace("(vncopy)", "") index = self.duplicates.index(os.path.split(orig_name)[-1])