diff --git a/vnrecode/application.py b/vnrecode/application.py index 239bb05..3b6843e 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime +from pathlib import Path import shutil import os @@ -12,11 +13,11 @@ from .utils import Utils class Application: - def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): - self.__params = params - self.__compress = compress.compress - self.__printer = printer - self.__utils = utils + def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils): + self.__params = params_inst + self.__compress = compress_inst.compress + self.__printer = printer_inst + self.__utils = utils_inst def run(self): start_time = datetime.now() @@ -24,21 +25,21 @@ class Application: source = self.__params.source - if os.path.exists(self.__params.dest): + if self.__params.dest.exists(): shutil.rmtree(self.__params.dest) self.__printer.info("Creating folders...") for folder, folders, files in os.walk(source): - if not os.path.exists(folder.replace(source, self.__params.dest)): - os.mkdir(folder.replace(source, self.__params.dest)) + output = Path(folder.replace(str(source), str(self.__params.dest))) + if not output.exists(): + os.mkdir(output) - self.__printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') - output = folder.replace(source, self.__params.dest) + self.__printer.info(f'Compressing "{output}" folder...') with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: futures = [ - executor.submit(self.__compress, folder, file, output) - for file in files if os.path.isfile(os.path.join(folder, file)) + executor.submit(self.__compress, Path(folder, file), Path(output)) + for file in files if Path(folder, file).is_file() ] for future in as_completed(futures): future.result() diff --git a/vnrecode/compress.py b/vnrecode/compress.py index d1a2919..7b06988 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -1,6 +1,6 @@ from ffmpeg import FFmpeg, FFmpegError +from pathlib import Path from PIL import Image -from os import path import pillow_avif from .printer import Printer @@ -11,7 +11,7 @@ from .utils import Utils class File: @staticmethod - def get_type(filename: str) -> str: + def get_type(filename: Path) -> str: extensions = { "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], @@ -21,7 +21,7 @@ class File: } for file_type in extensions: - if path.splitext(filename)[1] in extensions[file_type]: + if filename.suffix in extensions[file_type]: return file_type return "unknown" @@ -43,61 +43,39 @@ class File: class Compress: - def __init__(self, params: Params, printer: Printer, utils: Utils): - self.__params = params - self.__printer = printer - self.__utils = utils + def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils): + self.__params = params_inst + self.__printer = printer_inst + self.__utils = utils_inst - def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path: bit_rate = self.__params.audio_bitrate - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(input_path) .option("hide_banner") .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .execute() ) except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) - self.__printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") + self.__utils.catch_unprocessed(input_path, out_file, e) + self.__printer.files(input_path, out_file, f"{bit_rate}") return out_file - def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') - if not self.__params.video_skip: - codec = self.__params.video_codec - crf = self.__params.video_crf - - try: - (FFmpeg() - .input(path.join(in_dir, file)) - .option("hide_banner") - .option("hwaccel", "auto") - .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) - .execute() - ) - self.__printer.files(file, path.splitext(file)[0], extension, codec) - except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) - else: - self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) - return out_file - - def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + def image(self, input_path: Path, output_dir: Path, extension: str) -> Path: quality = self.__params.image_quality - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f"{prefix}_{path.splitext(file)[0]}.{extension}") + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}") try: - image = Image.open(path.join(in_dir, file)) + image = Image.open(input_path) if (extension == "jpg" or extension == "jpeg" or (extension == "webp" and not self.__params.webp_rgba)): if File.has_transparency(image): - self.__printer.warning(f"{file} has transparency. Changing to fallback...") - extension = self.__params.image_fall_ext + self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...") + out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}") if File.has_transparency(image): image.convert('RGBA') @@ -113,39 +91,61 @@ class Compress: lossless=self.__params.image_lossless, quality=quality, minimize_size=True) - self.__printer.files(file, path.splitext(file)[0], extension, f"{quality}%") + self.__printer.files(input_path, out_file, f"{quality}%") except Exception as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) + self.__utils.catch_unprocessed(input_path, out_file, e) return out_file - def unknown(self, in_dir: str, file: str, out_dir: str) -> str: - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f"{prefix}_{file}") - if self.__params.force_compress: - self.__printer.unknown_file(file) + def video(self, input_path: Path, output_dir: Path, extension: str) -> Path: + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') + if not self.__params.video_skip: + codec = self.__params.video_codec + crf = self.__params.video_crf + try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(input_path) + .option("hide_banner") + .option("hwaccel", "auto") + .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) + .execute() + ) + self.__printer.files(input_path, out_file, codec) + except FFmpegError as e: + self.__utils.catch_unprocessed(input_path, out_file, e) + else: + self.__utils.copy_unprocessed(input_path, out_file) + return out_file + + def unknown(self, input_path: Path, output_dir: Path) -> Path: + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f"{prefix}_{input_path.name}") + if self.__params.force_compress: + self.__printer.unknown_file(input_path.name) + try: + (FFmpeg() + .input(input_path) .output(out_file) .execute() ) except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) + self.__utils.catch_unprocessed(input_path, out_file, e) else: - self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) + self.__utils.copy_unprocessed(input_path, out_file) return out_file - def compress(self, dir_: str, filename: str, output: str): - match File.get_type(filename): + def compress(self, source: Path, output: Path): + match File.get_type(source): case "audio": - out_file = self.audio(dir_, filename, output, self.__params.audio_ext) + out_file = self.audio(source, output, self.__params.audio_ext) case "image": - out_file = self.image(dir_, filename, output, self.__params.image_ext) + out_file = self.image(source, output, self.__params.image_ext) case "video": - out_file = self.video(dir_, filename, output, self.__params.video_ext) + out_file = self.video(source, output, self.__params.video_ext) case "unknown": - out_file = self.unknown(dir_, filename, output) + out_file = self.unknown(source, output) - self.__utils.out_rename(out_file, filename) + self.__utils.out_rename(out_file, source.name) self.__printer.bar.update() self.__printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index ad4ec59..d4db410 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -1,8 +1,8 @@ from argparse import ArgumentParser, Namespace from dataclasses import dataclass +from pathlib import Path from typing import Self import tomllib -import os @dataclass class Params: @@ -28,14 +28,14 @@ class Params: video_ext: str video_codec: str - source: str - dest: str + source: Path + dest: Path @classmethod def setup(cls) -> Self: args = cls.get_args() if args.config is not None: - if os.path.isfile(args.config): + if Path(args.config).is_file(): with open(args.config, "rb") as cfile: config = tomllib.load(cfile) else: @@ -59,8 +59,8 @@ class Params: video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec - source = args.source - dest = f"{source}_compressed" + source = Path(args.source) + dest = Path(f"{args.source}_compressed") return cls( copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 30aa496..4de4f3c 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -1,12 +1,12 @@ from progress.bar import IncrementalBar +from pathlib import Path import colorama import sys import os - class Printer: - def __init__(self, folder): + def __init__(self, folder: Path): file_count = 0 for folder, folders, file in os.walk(folder): file_count += len(file) @@ -36,11 +36,9 @@ class Printer: def error(self, string: str): self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) - def files(self, source: str, dest: str, dest_ext: str, comment: str): - source_ext = os.path.splitext(source)[1] - source_name = os.path.splitext(source)[0] + def files(self, source_path: Path, output_path: Path, comment: str): + self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> " + f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m")) - self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m")) - - def unknown_file(self, file): + def unknown_file(self, file: str): self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) diff --git a/vnrecode/utils.py b/vnrecode/utils.py index bf4e198..2b52f30 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,12 +1,16 @@ from shutil import copyfile +from pathlib import Path import hashlib import sys import os +from vnrecode.printer import Printer +from vnrecode.params import Params + class Utils: - def __init__(self, params_inst, printer_inst): + def __init__(self, params_inst: Params, printer_inst: Printer): self.__errors = 0 self.__params = params_inst self.__printer = printer_inst @@ -18,12 +22,13 @@ class Utils: os.system("pause") @staticmethod - def get_size(directory: str) -> int: + def get_size(directory: Path) -> int: total_size = 0 for folder, folders, files in os.walk(directory): for file in files: - if not os.path.islink(os.path.join(folder, file)): - total_size += os.path.getsize(os.path.join(folder, file)) + path = Path(folder, file) + if not path.is_symlink(): + total_size += path.stat().st_size return total_size @staticmethod @@ -39,7 +44,7 @@ class Utils: for folder, folders, files in os.walk(self.__params.dest): for file in files: - if not os.path.splitext(file)[1].count("(vncopy)"): + if not file.count("(vncopy)"): output_len += 1 if self.__errors != 0: @@ -58,20 +63,20 @@ class Utils: except ZeroDivisionError: self.__printer.warning("Nothing compressed!") - def catch_unprocessed(self, source, output, error): - self.copy_unprocessed(source, error) + def catch_unprocessed(self, input_path: Path, output_path: Path, error): + self.copy_unprocessed(input_path, output_path) self.__errors += 1 if not self.__params.hide_errors: - self.__printer.error(f"File {os.path.split(source)[-1]} can't be processed! Error: {error}") + self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}") - def copy_unprocessed(self, source, output): + def copy_unprocessed(self, input_path: Path, output_path: Path): if self.__params.copy_unprocessed: - copyfile(source, output) - self.__printer.info(f"File {os.path.split(source)[-1]} copied to compressed folder.") + copyfile(input_path, output_path) + self.__printer.info(f"File {input_path.name} copied to compressed folder.") - def catch_duplicates(self, path: str) -> str: - if os.path.exists(path): - new_path = os.path.splitext(path)[0] + "(vncopy)" + os.path.splitext(path)[1] + def catch_duplicates(self, path: Path) -> Path: + if path.exists(): + new_path = Path(path.stem + "(vncopy)" + path.suffix) self.__duplicates.append(new_path) return new_path return path @@ -79,13 +84,13 @@ class Utils: def print_duplicates(self): for filename in self.__duplicates: self.__printer.warning( - f'Duplicate file has been found! Check manually this files - "{filename}", ' - f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' + f'Duplicate file has been found! Check manually this files - "{filename.name}", ' + f'"{filename.stem + "(vncopy)" + filename.suffix}"' ) - def out_rename(self, filename: str, target: str): + def out_rename(self, out_path: Path, target: str): if not self.__params.mimic_mode: - dest_name = self.catch_duplicates(os.path.join(os.path.dirname(filename), target)) - os.rename(filename, dest_name) + dest_name = self.catch_duplicates(Path(out_path.parent, target)) + os.rename(out_path, dest_name) else: - os.rename(filename, os.path.join(os.path.dirname(filename), target)) + os.rename(out_path, Path(out_path.parent, target))