diff --git a/pyproject.toml b/pyproject.toml index f2c379f..4d8aa85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,5 @@ dependencies = [ "pillow-avif-plugin==1.4.3", "python-ffmpeg==2.0.12", "progress==1.6", - "colorama==0.4.6", - "argparse~=1.4.0" + "colorama==0.4.6" ] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 16bd1e6..c3c571c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,4 @@ Pillow==10.3.0 pillow-avif-plugin==1.4.3 python-ffmpeg==2.0.12 progress==1.6 -colorama==0.4.6 -argparse~=1.4.0 \ No newline at end of file +colorama==0.4.6 \ No newline at end of file diff --git a/vnrecode/application.py b/vnrecode/application.py index 912c526..d32a3cf 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -4,22 +4,17 @@ from datetime import datetime import shutil import os -from .compress import Compress -from .printer import Printer -from .params import Params -from .utils import Utils - class Application: - def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): + def __init__(self, params, compress, printer, utils): self.params = params self.compress = compress.compress self.printer = printer self.utils = utils - def compress_worker(self, folder: str, file: str, source: str, output: str): - if os.path.isfile(os.path.join(folder, file)): + def compress_worker(self, folder, file, source, output): + if os.path.isfile(f'{folder}/{file}'): self.compress(folder, file, source, output) def run(self): @@ -42,12 +37,11 @@ class Application: with ThreadPoolExecutor(max_workers=self.params.workers) as executor: futures = [ executor.submit(self.compress, folder, file, source, output) - for file in files if os.path.isfile(os.path.join(folder, file)) + for file in files if os.path.isfile(f'{folder}/{file}') ] for future in as_completed(futures): future.result() - self.utils.print_duplicates() self.utils.get_compression_status(source) self.utils.sys_pause() print(f"Time taken: {datetime.now() - start_time}") \ No newline at end of file diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 18fb20e..7dbc6fc 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -1,29 +1,26 @@ from ffmpeg import FFmpeg, FFmpegError from PIL import Image -from os import path import pillow_avif - -from .printer import Printer -from .params import Params -from .utils import Utils +import os class File: @staticmethod - def get_type(filename: str) -> str: + def get_type(filename): + audio_ext = ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'] + image_ext = ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'] + video_ext = ['.3gp' '.amv', '.avi', '.m2t', '.m4v', '.mkv', '.mov', '.mp4', '.m4v', '.mpeg', '.mpv', + '.webm', '.ogv'] - extensions = { - "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], - "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], - "video": ['.3gp' '.amv', '.avi', '.m2t', '.m4v', '.mkv', '.mov', '.mp4', '.m4v', '.mpeg', '.mpv', - '.webm', '.ogv'] - } - - for file_type in extensions: - if path.splitext(filename)[1] in extensions[file_type]: - return file_type - return "unknown" + if os.path.splitext(filename)[1] in audio_ext: + return "audio" + elif os.path.splitext(filename)[1] in image_ext: + return "image" + elif os.path.splitext(filename)[1] in video_ext: + return "video" + else: + return "unknown" @staticmethod def has_transparency(img: Image) -> bool: @@ -43,59 +40,59 @@ class File: class Compress: - def __init__(self, params: Params, printer: Printer, utils: Utils): + def __init__(self, params, printer, utils): self.params = params self.printer = printer self.utils = utils - def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - bit_rate = self.params.audio_bitrate - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') + def audio(self, folder, file, target_folder, extension): + bitrate = self.params.audio_bitrate try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(f'{folder}/{file}') .option("hide_banner") - .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) + .output(self.utils.check_duplicates(f'{target_folder}/{os.path.splitext(file)[0]}.{extension}'), + {"b:a": bitrate, "loglevel": "error"}) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") - return out_file + self.printer.files(file, os.path.splitext(file)[0], extension, f"{bitrate}") + return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' - def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + + def video(self, folder, file, target_folder, extension): if not self.params.video_skip: - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') codec = self.params.video_codec crf = self.params.video_crf try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(f'{folder}/{file}') .option("hide_banner") .option("hwaccel", "auto") - .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) + .output(self.utils.check_duplicates(f'{target_folder}/{os.path.splitext(file)[0]}.{extension}'), + {"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) .execute() ) - self.printer.files(file, path.splitext(file)[0], extension, codec) + self.printer.files(file, os.path.splitext(file)[0], extension, codec) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - return out_file else: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') - return f'{out_dir}/{path.splitext(file)[0]}.{extension}' + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' - def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + + def image(self, folder, file, target_folder, extension): quality = self.params.image_quality - out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") try: - image = Image.open(path.join(in_dir, file)) + image = Image.open(f'{folder}/{file}') if (extension == "jpg" or extension == "jpeg" or (extension == "webp" and not self.params.webp_rgba)): @@ -112,40 +109,40 @@ class Compress: new_size = (int(width / res_downscale), int(height / res_downscale)) image = image.resize(new_size) - image.save(out_file, + image.save(self.utils.check_duplicates(f"{target_folder}/{os.path.splitext(file)[0]}.{extension}"), optimize=True, lossless=self.params.image_lossless, quality=quality, minimize_size=True) - self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") + self.printer.files(file, os.path.splitext(file)[0], extension, f"{quality}%") except Exception as e: - self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') self.utils.errors += 1 if not self.params.hide_errors: self.printer.error(f"File {file} can't be processed! Error: {e}") - return out_file + return f'{target_folder}/{os.path.splitext(file)[0]}.{extension}' - def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: + + def unknown(self, folder, file, target_folder): if self.params.force_compress: - self.printer.unknown_file(filename) - out_file = self.utils.check_duplicates(in_dir, out_dir, filename) + self.printer.unknown_file(file) try: (FFmpeg() - .input(path.join(in_dir, filename)) - .output(out_file) + .input(f'{folder}/{file}') + .output(self.utils.check_duplicates(f'{target_folder}/{file}')) .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') self.utils.errors += 1 if not self.params.hide_errors: - self.printer.error(f"File {filename} can't be processed! Error: {e}") - return out_file + self.printer.error(f"File {file} can't be processed! Error: {e}") else: - self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) - return path.join(out_dir, filename) + self.utils.add_unprocessed_file(f'{folder}/{file}', f'{target_folder}/{file}') + return f'{target_folder}/{file}' - def compress(self, _dir: str, filename: str, source: str, output: str): + + def compress(self, _dir, filename, source, output): match File.get_type(filename): case "audio": out_file = self.audio(_dir, filename, output, self.params.audio_ext) @@ -157,7 +154,10 @@ class Compress: out_file = self.unknown(_dir, filename, output) if self.params.mimic_mode: - self.utils.mimic_rename(out_file, path.join(_dir, filename), source) + try: + os.rename(out_file, f'{_dir}/{filename}'.replace(source, f"{source}_compressed")) + except FileNotFoundError: + self.printer.warning(f"File {out_file} failed to copy to out dir") self.printer.bar.update() self.printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index ea15691..789cac5 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser, Namespace +from argparse import ArgumentParser from dataclasses import dataclass from typing import Self import tomllib @@ -32,7 +32,30 @@ class Params: @classmethod def setup(cls) -> Self: - args = cls.get_args() + parser = ArgumentParser(prog="vnrecode", + description="Python utility to compress Visual Novel Resources" + ) + parser.add_argument("source", help="SourceDir") + parser.add_argument("-c", "--config", help="ConfigFile") + parser.add_argument("-u", type=bool, help="CopyUnprocessed", default=True) + parser.add_argument("-f", "--force", type=bool, help="ForceCompress", default=False) + parser.add_argument("-m", "--mimic", type=bool, help="MimicMode", default=True) + parser.add_argument("-s", "--silent", type=bool, help="HideErrors", default=True) + parser.add_argument("--webprgba", type=bool, help="WebpRGBA", default=True) + parser.add_argument("-j", "--jobs", type=int, help="Workers", default=16) + parser.add_argument("-ae", "--aext", help="Audio Extension", default="opus") + parser.add_argument("-ab", "--abit", help="Audio Bitrate", default="128k") + parser.add_argument("-id", "--idown", type=int, help="Image Downscale", default=1) + parser.add_argument("-ie", "--iext", help="Image Extension", default="avif") + parser.add_argument("-ife", "--ifallext", help="Image Fallback Extension", default="webp") + parser.add_argument("-il", "--ilossless", type=bool, help="Image Lossless", default=True) + parser.add_argument("-iq", "--iquality", type=int, help="Image Quality", default=100) + parser.add_argument("--vcrf", help="Video CRF", type=int, default=27) + parser.add_argument("-vs", "--vskip", help="Video Skip", default=False) + parser.add_argument("-ve", "--vext", help="Video Extension", default="webm") + parser.add_argument("-vc", "--vcodec", help="Video Codec", default="libvpx-vp9") + args = parser.parse_args() + if args.config is not None: if os.path.isfile(args.config): with open(args.config, "rb") as cfile: @@ -41,23 +64,23 @@ class Params: print("Failed to find config. Check `vnrecode -h` to more info") exit(255) - copy_unprocessed = config["FFMPEG"]["CopyUnprocessed"] if args.config else args.unproc + copy_unprocessed = config["FFMPEG"]["CopyUnprocessed"] if args.config else args.u force_compress = config["FFMPEG"]["ForceCompress"] if args.config else args.force mimic_mode = config["FFMPEG"]["MimicMode"] if args.config else args.mimic - hide_errors = config["FFMPEG"]["HideErrors"] if args.config else args.show_errors + hide_errors = config["FFMPEG"]["HideErrors"] if args.config else args.silent workers = config["FFMPEG"]["Workers"] if args.config else args.jobs - webp_rgba = config["FFMPEG"]["WebpRGBA"] if args.config else args.webp_rgba - audio_ext = config["AUDIO"]["Extension"] if args.config else args.a_ext - audio_bitrate = config["AUDIO"]["BitRate"] if args.config else args.a_bit - image_downscale = config["IMAGE"]["ResDownScale"] if args.config else args.i_down - image_ext = config["IMAGE"]["Extension"] if args.config else args.i_ext - image_fall_ext = config["IMAGE"]["FallBackExtension"] if args.config else args.i_fallext - image_lossless = config["IMAGE"]["Lossless"] if args.config else args.i_lossless - image_quality = config["IMAGE"]["Quality"] if args.config else args.i_quality - video_crf = config["VIDEO"]["CRF"] if args.config else args.v_crf - video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip - video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext - video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec + webp_rgba = config["FFMPEG"]["WebpRGBA"] if args.config else args.webprgba + audio_ext = config["AUDIO"]["Extension"] if args.config else args.aext + audio_bitrate = config["AUDIO"]["BitRate"] if args.config else args.abit + image_downscale = config["IMAGE"]["ResDownScale"] if args.config else args.idown + image_ext = config["IMAGE"]["Extension"] if args.config else args.iext + image_fall_ext = config["IMAGE"]["FallBackExtension"] if args.config else args.ifallext + image_lossless = config["IMAGE"]["Lossless"] if args.config else args.ilossless + image_quality = config["IMAGE"]["Quality"] if args.config else args.iquality + video_crf = config["VIDEO"]["CRF"] if args.config else args.vcrf + video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.vskip + video_ext = config["VIDEO"]["Extension"] if args.config else args.vext + video_codec = config["VIDEO"]["Codec"] if args.config else args.vcodec source = args.source return cls( @@ -66,30 +89,3 @@ class Params: image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, video_crf, video_skip, video_ext, video_codec, source ) - - @staticmethod - def get_args() -> Namespace: - parser = ArgumentParser(prog="vnrecode", - description="Python utility to compress Visual Novel Resources" - ) - parser.add_argument("source", help="Directory with game files to recode") - parser.add_argument("-c", dest='config', help="Utility config file") - parser.add_argument("-nu", dest='unproc', action='store_false', help="Don't copy unprocessed") - parser.add_argument("-f", "--force", action='store_true', help="Try to recode unknown files") - parser.add_argument("-nm", "--no-mimic", dest='mimic', action='store_false', help="Disable mimic mode") - parser.add_argument("-v", "--show_errors", action='store_false', help="Show recode errors") - parser.add_argument("--webp-rgb", dest='webp_rgba', action='store_false', help="Recode .webp without alpha channel") - parser.add_argument("-j", "--jobs", type=int, help="Number of threads", default=16) - parser.add_argument("-ae", dest="a_ext", help="Audio extension", default="opus") - parser.add_argument("-ab", dest="a_bit", help="Audio bit rate", default="128k") - parser.add_argument("-id", dest="i_down", type=int, help="Image resolution downscale multiplier", default=1) - parser.add_argument("-ie", dest="i_ext", help="Image extension", default="avif") - parser.add_argument("-ife", dest="i_fallext", help="Image fallback extension", default="webp") - parser.add_argument("-il", dest='i_lossless', action='store_false', help="Image losing compression mode") - parser.add_argument("-iq", dest="i_quality", type=int, help="Image quality", default=100) - parser.add_argument("--v_crf", help="Video CRF number", type=int, default=27) - parser.add_argument("-vs", dest="v_skip", action='store_true', help="Skip video recoding") - parser.add_argument("-ve", dest="v_ext", help="Video extension", default="webm") - parser.add_argument("-vc", dest="v_codec", help="Video codec name", default="libvpx-vp9") - args = parser.parse_args() - return args \ No newline at end of file diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 30aa496..6080caf 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -15,7 +15,7 @@ class Printer: # Fill whole string with spaces for cleaning progress bar @staticmethod - def clean_str(string: str) -> str: + def clean_str(string): return string + " " * (os.get_terminal_size().columns - len(string)) @staticmethod @@ -23,20 +23,20 @@ class Printer: if sys.platform == "win32": colorama.init() - def bar_print(self, string: str): + def bar_print(self, string): print(string) self.bar.update() - def info(self, string: str): + def info(self, string): self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) - def warning(self, string: str): + def warning(self, string): self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) - def error(self, string: str): + def error(self, string): self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) - def files(self, source: str, dest: str, dest_ext: str, comment: str): + def files(self, source, dest, dest_ext, comment): source_ext = os.path.splitext(source)[1] source_name = os.path.splitext(source)[0] diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 3410eae..040da09 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,11 +1,6 @@ from shutil import copyfile -from glob import glob import sys import os -import re - -import fnmatch - class Utils: @@ -13,7 +8,6 @@ class Utils: self.errors = 0 self.params = params self.printer = printer - self.duplicates = [] @staticmethod def sys_pause(): @@ -21,15 +15,15 @@ class Utils: os.system("pause") @staticmethod - def get_size(directory: str) -> int: + def get_size(directory): total_size = 0 for folder, folders, files in os.walk(directory): for file in files: - if not os.path.islink(os.path.join(folder, file)): - total_size += os.path.getsize(os.path.join(folder, file)) + if not os.path.islink(f"{folder}/{file}"): + total_size += os.path.getsize(f"{folder}/{file}") return total_size - def get_compression(self, source: str, output: str): + def get_compression(self, source, output): try: source = self.get_size(source) output = self.get_size(output) @@ -39,56 +33,39 @@ class Utils: except ZeroDivisionError: self.printer.warning("Nothing compressed!") - def get_compression_status(self, source: str): - source_len = 0 - output_len = 0 + def get_compression_status(self, orig_folder): + orig_folder_len = 0 + comp_folder_len = 0 - for folder, folders, files in os.walk(source): - source_len += len(files) + for folder, folders, files in os.walk(orig_folder): + orig_folder_len += len(files) - for folder, folders, files in os.walk(f'{source}_compressed'): + for folder, folders, files in os.walk(f'{orig_folder}_compressed'): for file in files: if not os.path.splitext(file)[1].count("(copy)"): - output_len += 1 + comp_folder_len += 1 if self.errors != 0: self.printer.warning("Some files failed to compress!") - if source_len == output_len: + if orig_folder_len == comp_folder_len: self.printer.info("Success!") + self.get_compression(orig_folder, f"{orig_folder}_compressed") else: self.printer.warning("Original and compressed folders are not identical!") - self.get_compression(source, f"{source}_compressed") + self.get_compression(orig_folder, f"{orig_folder}_compressed") - def add_unprocessed_file(self, source: str, output: str): + def add_unprocessed_file(self, orig_folder, new_folder): if self.params.copy_unprocessed: - filename = os.path.split(source)[-1] - copyfile(source, output) + filename = orig_folder.split("/").pop() + copyfile(orig_folder, new_folder) self.printer.info(f"File {filename} copied to compressed folder.") - def check_duplicates(self, source: str, output: str, filename: str) -> str: - re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE) - duplicates = [name for name in os.listdir(source) if re_pattern.match(name)] - - if len(duplicates) > 1: - if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates): - self.duplicates.append(filename) - new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] - return os.path.join(output, new_name) - return os.path.join(output, filename) - - def print_duplicates(self): - for filename in self.duplicates: + def check_duplicates(self, new_folder): + filename = new_folder.split().pop() + if os.path.exists(new_folder): self.printer.warning( f'Duplicate file has been found! Check manually this files - "{filename}", ' - f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' - ) - - def mimic_rename(self, filename: str, target: str, source: str): - if filename.count("(vncopy)"): - orig_name = filename.replace("(vncopy)", "") - index = self.duplicates.index(os.path.split(orig_name)[-1]) - self.duplicates[index] = os.path.split(target)[-1] - target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] - - os.rename(filename, target.replace(source, f"{source}_compressed")) \ No newline at end of file + f'"{os.path.splitext(filename)[0] + "(copy)" + os.path.splitext(filename)[1]}"') + return os.path.splitext(new_folder)[0] + "(copy)" + os.path.splitext(new_folder)[1] + return new_folder