diff --git a/pyproject.toml b/pyproject.toml index 4d8aa85..f2c379f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,5 +27,6 @@ dependencies = [ "pillow-avif-plugin==1.4.3", "python-ffmpeg==2.0.12", "progress==1.6", - "colorama==0.4.6" + "colorama==0.4.6", + "argparse~=1.4.0" ] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c3c571c..16bd1e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ Pillow==10.3.0 pillow-avif-plugin==1.4.3 python-ffmpeg==2.0.12 progress==1.6 -colorama==0.4.6 \ No newline at end of file +colorama==0.4.6 +argparse~=1.4.0 \ No newline at end of file diff --git a/vnrecode/application.py b/vnrecode/application.py index d32a3cf..912c526 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -4,17 +4,22 @@ from datetime import datetime import shutil import os +from .compress import Compress +from .printer import Printer +from .params import Params +from .utils import Utils + class Application: - def __init__(self, params, compress, printer, utils): + def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): self.params = params self.compress = compress.compress self.printer = printer self.utils = utils - def compress_worker(self, folder, file, source, output): - if os.path.isfile(f'{folder}/{file}'): + def compress_worker(self, folder: str, file: str, source: str, output: str): + if os.path.isfile(os.path.join(folder, file)): self.compress(folder, file, source, output) def run(self): @@ -37,11 +42,12 @@ class Application: with ThreadPoolExecutor(max_workers=self.params.workers) as executor: futures = [ executor.submit(self.compress, folder, file, source, output) - for file in files if os.path.isfile(f'{folder}/{file}') + for file in files if os.path.isfile(os.path.join(folder, file)) ] for future in as_completed(futures): future.result() + self.utils.print_duplicates() self.utils.get_compression_status(source) self.utils.sys_pause() print(f"Time taken: {datetime.now() - start_time}") \ No newline at end of file diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 7dbc6fc..18fb20e 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -1,26 +1,29 @@ from ffmpeg import FFmpeg, FFmpegError from PIL import Image +from os import path import pillow_avif -import os + +from .printer import Printer +from .params import Params +from .utils import Utils class File: @staticmethod - def get_type(filename): - audio_ext = ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'] - image_ext = ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'] - video_ext = ['.3gp' '.amv', '.avi', '.m2t', '.m4v', '.mkv', '.mov', '.mp4', '.m4v', '.mpeg', '.mpv', - '.webm', '.ogv'] + def get_type(filename: str) -> str: - if os.path.splitext(filename)[1] in audio_ext: - return "audio" - elif os.path.splitext(filename)[1] in image_ext: - return "image" - elif os.path.splitext(filename)[1] in video_ext: - return "video" - else: - return "unknown" + extensions = { + "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], + "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], + "video": ['.3gp' '.amv', '.avi', '.m2t', '.m4v', '.mkv', '.mov', '.mp4', '.m4v', '.mpeg', '.mpv', + '.webm', '.ogv'] + } + + for file_type in extensions: + if path.splitext(filename)[1] in extensions[file_type]: + return file_type + return "unknown" @staticmethod def has_transparency(img: Image) -> bool: @@ -40,59 +43,59 @@ class File: class Compress: - def __init__(self, params, printer, utils): + def __init__(self, params: Params, printer: Printer, utils: Utils): self.params = params self.printer = printer self.utils = utils - def audio(self, folder, file, target_folder, extension): - bitrate = self.params.audio_bitrate + def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + bit_rate = self.params.audio_bitrate + out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') try: (FFmpeg() - .input(f'{folder}/{file}') + .input(path.join(in_dir, file)) .option("hide_banner") - .output(self.utils.check_duplicates(f'{target_folder}/{os.path.splitext(file)[0]}.{extension}'), - {"b:a": bitrate, "loglevel": "error"}) + .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - self.printer.files(file, os.path.splitext(file)[0], extension, f"{bitrate}") - return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' + self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") + return out_file - - def video(self, folder, file, target_folder, extension): + def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: if not self.params.video_skip: + out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') codec = self.params.video_codec crf = self.params.video_crf try: (FFmpeg() - .input(f'{folder}/{file}') + .input(path.join(in_dir, file)) .option("hide_banner") .option("hwaccel", "auto") - .output(self.utils.check_duplicates(f'{target_folder}/{os.path.splitext(file)[0]}.{extension}'), - {"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) + .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) .execute() ) - self.printer.files(file, os.path.splitext(file)[0], extension, codec) + self.printer.files(file, path.splitext(file)[0], extension, codec) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") + return out_file else: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') - return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' + self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + return f'{out_dir}/{path.splitext(file)[0]}.{extension}' - - def image(self, folder, file, target_folder, extension): + def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: quality = self.params.image_quality + out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") try: - image = Image.open(f'{folder}/{file}') + image = Image.open(path.join(in_dir, file)) if (extension == "jpg" or extension == "jpeg" or (extension == "webp" and not self.params.webp_rgba)): @@ -109,40 +112,40 @@ class Compress: new_size = (int(width / res_downscale), int(height / res_downscale)) image = image.resize(new_size) - image.save(self.utils.check_duplicates(f"{target_folder}/{os.path.splitext(file)[0]}.{extension}"), + image.save(out_file, optimize=True, lossless=self.params.image_lossless, quality=quality, minimize_size=True) - self.printer.files(file, os.path.splitext(file)[0], extension, f"{quality}%") + self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") except Exception as e: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' + return out_file - - def unknown(self, folder, file, target_folder): + def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: if self.params.force_compress: - self.printer.unknown_file(file) + self.printer.unknown_file(filename) + out_file = self.utils.check_duplicates(in_dir, out_dir, filename) try: (FFmpeg() - .input(f'{folder}/{file}') - .output(self.utils.check_duplicates(f'{target_folder}/{file}')) + .input(path.join(in_dir, filename)) + .output(out_file) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) self.utils.errors += 1 if not self.params.hide_errors: - self.printer.error(f"File {file} can't be processed! Error: {e}") + self.printer.error(f"File {filename} can't be processed! Error: {e}") + return out_file else: - self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') - return f'{target_folder}/{file}' + self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) + return path.join(out_dir, filename) - - def compress(self, _dir, filename, source, output): + def compress(self, _dir: str, filename: str, source: str, output: str): match File.get_type(filename): case "audio": out_file = self.audio(_dir, filename, output, self.params.audio_ext) @@ -154,10 +157,7 @@ class Compress: out_file = self.unknown(_dir, filename, output) if self.params.mimic_mode: - try: - os.rename(out_file, f'{_dir}/{filename}'.replace(source, f"{source}_compressed")) - except FileNotFoundError: - self.printer.warning(f"File {out_file} failed to copy to out dir") + self.utils.mimic_rename(out_file, path.join(_dir, filename), source) self.printer.bar.update() self.printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index 789cac5..ea15691 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from dataclasses import dataclass from typing import Self import tomllib @@ -32,30 +32,7 @@ class Params: @classmethod def setup(cls) -> Self: - parser = ArgumentParser(prog="vnrecode", - description="Python utility to compress Visual Novel Resources" - ) - parser.add_argument("source", help="SourceDir") - parser.add_argument("-c", "--config", help="ConfigFile") - parser.add_argument("-u", type=bool, help="CopyUnprocessed", default=True) - parser.add_argument("-f", "--force", type=bool, help="ForceCompress", default=False) - parser.add_argument("-m", "--mimic", type=bool, help="MimicMode", default=True) - parser.add_argument("-s", "--silent", type=bool, help="HideErrors", default=True) - parser.add_argument("--webprgba", type=bool, help="WebpRGBA", default=True) - parser.add_argument("-j", "--jobs", type=int, help="Workers", default=16) - parser.add_argument("-ae", "--aext", help="Audio Extension", default="opus") - parser.add_argument("-ab", "--abit", help="Audio Bitrate", default="128k") - parser.add_argument("-id", "--idown", type=int, help="Image Downscale", default=1) - parser.add_argument("-ie", "--iext", help="Image Extension", default="avif") - parser.add_argument("-ife", "--ifallext", help="Image Fallback Extension", default="webp") - parser.add_argument("-il", "--ilossless", type=bool, help="Image Lossless", default=True) - parser.add_argument("-iq", "--iquality", type=int, help="Image Quality", default=100) - parser.add_argument("--vcrf", help="Video CRF", type=int, default=27) - parser.add_argument("-vs", "--vskip", help="Video Skip", default=False) - parser.add_argument("-ve", "--vext", help="Video Extension", default="webm") - parser.add_argument("-vc", "--vcodec", help="Video Codec", default="libvpx-vp9") - args = parser.parse_args() - + args = cls.get_args() if args.config is not None: if os.path.isfile(args.config): with open(args.config, "rb") as cfile: @@ -64,23 +41,23 @@ class Params: print("Failed to find config. Check `vnrecode -h` to more info") exit(255) - copy_unprocessed = config["FFMPEG"]["CopyUnprocessed"] if args.config else args.u + copy_unprocessed = config["FFMPEG"]["CopyUnprocessed"] if args.config else args.unproc force_compress = config["FFMPEG"]["ForceCompress"] if args.config else args.force mimic_mode = config["FFMPEG"]["MimicMode"] if args.config else args.mimic - hide_errors = config["FFMPEG"]["HideErrors"] if args.config else args.silent + hide_errors = config["FFMPEG"]["HideErrors"] if args.config else args.show_errors workers = config["FFMPEG"]["Workers"] if args.config else args.jobs - webp_rgba = config["FFMPEG"]["WebpRGBA"] if args.config else args.webprgba - audio_ext = config["AUDIO"]["Extension"] if args.config else args.aext - audio_bitrate = config["AUDIO"]["BitRate"] if args.config else args.abit - image_downscale = config["IMAGE"]["ResDownScale"] if args.config else args.idown - image_ext = config["IMAGE"]["Extension"] if args.config else args.iext - image_fall_ext = config["IMAGE"]["FallBackExtension"] if args.config else args.ifallext - image_lossless = config["IMAGE"]["Lossless"] if args.config else args.ilossless - image_quality = config["IMAGE"]["Quality"] if args.config else args.iquality - video_crf = config["VIDEO"]["CRF"] if args.config else args.vcrf - video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.vskip - video_ext = config["VIDEO"]["Extension"] if args.config else args.vext - video_codec = config["VIDEO"]["Codec"] if args.config else args.vcodec + webp_rgba = config["FFMPEG"]["WebpRGBA"] if args.config else args.webp_rgba + audio_ext = config["AUDIO"]["Extension"] if args.config else args.a_ext + audio_bitrate = config["AUDIO"]["BitRate"] if args.config else args.a_bit + image_downscale = config["IMAGE"]["ResDownScale"] if args.config else args.i_down + image_ext = config["IMAGE"]["Extension"] if args.config else args.i_ext + image_fall_ext = config["IMAGE"]["FallBackExtension"] if args.config else args.i_fallext + image_lossless = config["IMAGE"]["Lossless"] if args.config else args.i_lossless + image_quality = config["IMAGE"]["Quality"] if args.config else args.i_quality + video_crf = config["VIDEO"]["CRF"] if args.config else args.v_crf + video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip + video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext + video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec source = args.source return cls( @@ -89,3 +66,30 @@ class Params: image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, video_crf, video_skip, video_ext, video_codec, source ) + + @staticmethod + def get_args() -> Namespace: + parser = ArgumentParser(prog="vnrecode", + description="Python utility to compress Visual Novel Resources" + ) + parser.add_argument("source", help="Directory with game files to recode") + parser.add_argument("-c", dest='config', help="Utility config file") + parser.add_argument("-nu", dest='unproc', action='store_false', help="Don't copy unprocessed") + parser.add_argument("-f", "--force", action='store_true', help="Try to recode unknown files") + parser.add_argument("-nm", "--no-mimic", dest='mimic', action='store_false', help="Disable mimic mode") + parser.add_argument("-v", "--show_errors", action='store_false', help="Show recode errors") + parser.add_argument("--webp-rgb", dest='webp_rgba', action='store_false', help="Recode .webp without alpha channel") + parser.add_argument("-j", "--jobs", type=int, help="Number of threads", default=16) + parser.add_argument("-ae", dest="a_ext", help="Audio extension", default="opus") + parser.add_argument("-ab", dest="a_bit", help="Audio bit rate", default="128k") + parser.add_argument("-id", dest="i_down", type=int, help="Image resolution downscale multiplier", default=1) + parser.add_argument("-ie", dest="i_ext", help="Image extension", default="avif") + parser.add_argument("-ife", dest="i_fallext", help="Image fallback extension", default="webp") + parser.add_argument("-il", dest='i_lossless', action='store_false', help="Image losing compression mode") + parser.add_argument("-iq", dest="i_quality", type=int, help="Image quality", default=100) + parser.add_argument("--v_crf", help="Video CRF number", type=int, default=27) + parser.add_argument("-vs", dest="v_skip", action='store_true', help="Skip video recoding") + parser.add_argument("-ve", dest="v_ext", help="Video extension", default="webm") + parser.add_argument("-vc", dest="v_codec", help="Video codec name", default="libvpx-vp9") + args = parser.parse_args() + return args \ No newline at end of file diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 6080caf..30aa496 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -15,7 +15,7 @@ class Printer: # Fill whole string with spaces for cleaning progress bar @staticmethod - def clean_str(string): + def clean_str(string: str) -> str: return string + " " * (os.get_terminal_size().columns - len(string)) @staticmethod @@ -23,20 +23,20 @@ class Printer: if sys.platform == "win32": colorama.init() - def bar_print(self, string): + def bar_print(self, string: str): print(string) self.bar.update() - def info(self, string): + def info(self, string: str): self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) - def warning(self, string): + def warning(self, string: str): self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) - def error(self, string): + def error(self, string: str): self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) - def files(self, source, dest, dest_ext, comment): + def files(self, source: str, dest: str, dest_ext: str, comment: str): source_ext = os.path.splitext(source)[1] source_name = os.path.splitext(source)[0] diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 040da09..3410eae 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,6 +1,11 @@ from shutil import copyfile +from glob import glob import sys import os +import re + +import fnmatch + class Utils: @@ -8,6 +13,7 @@ class Utils: self.errors = 0 self.params = params self.printer = printer + self.duplicates = [] @staticmethod def sys_pause(): @@ -15,15 +21,15 @@ class Utils: os.system("pause") @staticmethod - def get_size(directory): + def get_size(directory: str) -> int: total_size = 0 for folder, folders, files in os.walk(directory): for file in files: - if not os.path.islink(f"{folder}/{file}"): - total_size += os.path.getsize(f"{folder}/{file}") + if not os.path.islink(os.path.join(folder, file)): + total_size += os.path.getsize(os.path.join(folder, file)) return total_size - def get_compression(self, source, output): + def get_compression(self, source: str, output: str): try: source = self.get_size(source) output = self.get_size(output) @@ -33,39 +39,56 @@ class Utils: except ZeroDivisionError: self.printer.warning("Nothing compressed!") - def get_compression_status(self, orig_folder): - orig_folder_len = 0 - comp_folder_len = 0 + def get_compression_status(self, source: str): + source_len = 0 + output_len = 0 - for folder, folders, files in os.walk(orig_folder): - orig_folder_len += len(files) + for folder, folders, files in os.walk(source): + source_len += len(files) - for folder, folders, files in os.walk(f'{orig_folder}_compressed'): + for folder, folders, files in os.walk(f'{source}_compressed'): for file in files: if not os.path.splitext(file)[1].count("(copy)"): - comp_folder_len += 1 + output_len += 1 if self.errors != 0: self.printer.warning("Some files failed to compress!") - if orig_folder_len == comp_folder_len: + if source_len == output_len: self.printer.info("Success!") - self.get_compression(orig_folder, f"{orig_folder}_compressed") else: self.printer.warning("Original and compressed folders are not identical!") - self.get_compression(orig_folder, f"{orig_folder}_compressed") + self.get_compression(source, f"{source}_compressed") - def add_unprocessed_file(self, orig_folder, new_folder): + def add_unprocessed_file(self, source: str, output: str): if self.params.copy_unprocessed: - filename = orig_folder.split("/").pop() - copyfile(orig_folder, new_folder) + filename = os.path.split(source)[-1] + copyfile(source, output) self.printer.info(f"File {filename} copied to compressed folder.") - def check_duplicates(self, new_folder): - filename = new_folder.split().pop() - if os.path.exists(new_folder): + def check_duplicates(self, source: str, output: str, filename: str) -> str: + re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE) + duplicates = [name for name in os.listdir(source) if re_pattern.match(name)] + + if len(duplicates) > 1: + if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates): + self.duplicates.append(filename) + new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] + return os.path.join(output, new_name) + return os.path.join(output, filename) + + def print_duplicates(self): + for filename in self.duplicates: self.printer.warning( f'Duplicate file has been found! Check manually this files - "{filename}", ' - f'"{os.path.splitext(filename)[0] + "(copy)" + os.path.splitext(filename)[1]}"') - return os.path.splitext(new_folder)[0] + "(copy)" + os.path.splitext(new_folder)[1] - return new_folder + f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' + ) + + def mimic_rename(self, filename: str, target: str, source: str): + if filename.count("(vncopy)"): + orig_name = filename.replace("(vncopy)", "") + index = self.duplicates.index(os.path.split(orig_name)[-1]) + self.duplicates[index] = os.path.split(target)[-1] + target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] + + os.rename(filename, target.replace(source, f"{source}_compressed")) \ No newline at end of file