From 9bb3cdcccbbab5a97160d4ab247f549d441ad6e9 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Fri, 18 Oct 2024 20:50:40 +0300 Subject: [PATCH 1/7] vnrecode: make it private! --- vnrecode/application.py | 40 ++++++++--------- vnrecode/compress.py | 98 ++++++++++++++++++++--------------------- vnrecode/params.py | 4 +- vnrecode/utils.py | 80 ++++++++++++++++----------------- 4 files changed, 107 insertions(+), 115 deletions(-) diff --git a/vnrecode/application.py b/vnrecode/application.py index 912c526..239bb05 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -13,41 +13,37 @@ from .utils import Utils class Application: def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): - self.params = params - self.compress = compress.compress - self.printer = printer - self.utils = utils - - def compress_worker(self, folder: str, file: str, source: str, output: str): - if os.path.isfile(os.path.join(folder, file)): - self.compress(folder, file, source, output) + self.__params = params + self.__compress = compress.compress + self.__printer = printer + self.__utils = utils def run(self): start_time = datetime.now() - self.printer.win_ascii_esc() + self.__printer.win_ascii_esc() - source = os.path.abspath(self.params.source) + source = self.__params.source - if os.path.exists(f"{source}_compressed"): - shutil.rmtree(f"{source}_compressed") + if os.path.exists(self.__params.dest): + shutil.rmtree(self.__params.dest) - self.printer.info("Creating folders...") + self.__printer.info("Creating folders...") for folder, folders, files in os.walk(source): - if not os.path.exists(folder.replace(source, f"{source}_compressed")): - os.mkdir(folder.replace(source, f"{source}_compressed")) + if not os.path.exists(folder.replace(source, self.__params.dest)): + os.mkdir(folder.replace(source, self.__params.dest)) - self.printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') - output = folder.replace(source, f"{source}_compressed") + self.__printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') + output = folder.replace(source, self.__params.dest) - with ThreadPoolExecutor(max_workers=self.params.workers) as executor: + with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: futures = [ - executor.submit(self.compress, folder, file, source, output) + executor.submit(self.__compress, folder, file, output) for file in files if os.path.isfile(os.path.join(folder, file)) ] for future in as_completed(futures): future.result() - self.utils.print_duplicates() - self.utils.get_compression_status(source) - self.utils.sys_pause() + self.__utils.print_duplicates() + self.__utils.get_compression_status() + self.__utils.sys_pause() print(f"Time taken: {datetime.now() - start_time}") \ No newline at end of file diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 18fb20e..c484243 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -44,13 +44,13 @@ class File: class Compress: def __init__(self, params: Params, printer: Printer, utils: Utils): - self.params = params - self.printer = printer - self.utils = utils + self.__params = params + self.__printer = printer + self.__utils = utils def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - bit_rate = self.params.audio_bitrate - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') + bit_rate = self.__params.audio_bitrate + out_file = self.__utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') try: (FFmpeg() .input(path.join(in_dir, file)) @@ -59,18 +59,18 @@ class Compress: .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) - self.utils.errors += 1 - if not self.params.hide_errors: - self.printer.error(f"File {file} can't be processed! Error: {e}") - self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") + self.__utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) + self.__utils.errors += 1 + if not self.__params.hide_errors: + self.__printer.error(f"File {file} can't be processed! Error: {e}") + self.__printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") return out_file def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - if not self.params.video_skip: - out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') - codec = self.params.video_codec - crf = self.params.video_crf + if not self.__params.video_skip: + out_file = self.__utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') + codec = self.__params.video_codec + crf = self.__params.video_crf try: (FFmpeg() @@ -80,33 +80,33 @@ class Compress: .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) .execute() ) - self.printer.files(file, path.splitext(file)[0], extension, codec) + self.__printer.files(file, path.splitext(file)[0], extension, codec) except FFmpegError as e: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') - self.utils.errors += 1 - if not self.params.hide_errors: - self.printer.error(f"File {file} can't be processed! Error: {e}") + self.__utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + self.__utils.errors += 1 + if not self.__params.hide_errors: + self.__printer.error(f"File {file} can't be processed! Error: {e}") return out_file else: - self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') + self.__utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') return f'{out_dir}/{path.splitext(file)[0]}.{extension}' def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - quality = self.params.image_quality - out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") + quality = self.__params.image_quality + out_file = self.__utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") try: image = Image.open(path.join(in_dir, file)) if (extension == "jpg" or extension == "jpeg" or - (extension == "webp" and not self.params.webp_rgba)): + (extension == "webp" and not self.__params.webp_rgba)): if File.has_transparency(image): - self.printer.warning(f"{file} has transparency. Changing to fallback...") - extension = self.params.image_fall_ext + self.__printer.warning(f"{file} has transparency. Changing to fallback...") + extension = self.__params.image_fall_ext if File.has_transparency(image): image.convert('RGBA') - res_downscale = self.params.image_downscale + res_downscale = self.__params.image_downscale if res_downscale != 1: width, height = image.size new_size = (int(width / res_downscale), int(height / res_downscale)) @@ -114,21 +114,21 @@ class Compress: image.save(out_file, optimize=True, - lossless=self.params.image_lossless, + lossless=self.__params.image_lossless, quality=quality, minimize_size=True) - self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") + self.__printer.files(file, path.splitext(file)[0], extension, f"{quality}%") except Exception as e: - self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) - self.utils.errors += 1 - if not self.params.hide_errors: - self.printer.error(f"File {file} can't be processed! Error: {e}") + self.__utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) + self.__utils.errors += 1 + if not self.__params.hide_errors: + self.__printer.error(f"File {file} can't be processed! Error: {e}") return out_file def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: - if self.params.force_compress: - self.printer.unknown_file(filename) - out_file = self.utils.check_duplicates(in_dir, out_dir, filename) + if self.__params.force_compress: + self.__printer.unknown_file(filename) + out_file = self.__utils.check_duplicates(in_dir, out_dir, filename) try: (FFmpeg() .input(path.join(in_dir, filename)) @@ -136,28 +136,28 @@ class Compress: .execute() ) except FFmpegError as e: - self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) - self.utils.errors += 1 - if not self.params.hide_errors: - self.printer.error(f"File {filename} can't be processed! Error: {e}") + self.__utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) + self.__utils.errors += 1 + if not self.__params.hide_errors: + self.__printer.error(f"File {filename} can't be processed! Error: {e}") return out_file else: - self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) + self.__utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) return path.join(out_dir, filename) - def compress(self, _dir: str, filename: str, source: str, output: str): + def compress(self, dir_: str, filename: str, output: str): match File.get_type(filename): case "audio": - out_file = self.audio(_dir, filename, output, self.params.audio_ext) + out_file = self.audio(dir_, filename, output, self.__params.audio_ext) case "image": - out_file = self.image(_dir, filename, output, self.params.image_ext) + out_file = self.image(dir_, filename, output, self.__params.image_ext) case "video": - out_file = self.video(_dir, filename, output, self.params.video_ext) + out_file = self.video(dir_, filename, output, self.__params.video_ext) case "unknown": - out_file = self.unknown(_dir, filename, output) + out_file = self.unknown(dir_, filename, output) - if self.params.mimic_mode: - self.utils.mimic_rename(out_file, path.join(_dir, filename), source) + if self.__params.mimic_mode: + self.__utils.mimic_rename(out_file, path.join(dir_, filename)) - self.printer.bar.update() - self.printer.bar.next() + self.__printer.bar.update() + self.__printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index ea15691..ad4ec59 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -29,6 +29,7 @@ class Params: video_codec: str source: str + dest: str @classmethod def setup(cls) -> Self: @@ -59,12 +60,13 @@ class Params: video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec source = args.source + dest = f"{source}_compressed" return cls( copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, audio_ext, audio_bitrate, image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, - video_crf, video_skip, video_ext, video_codec, source + video_crf, video_skip, video_ext, video_codec, source, dest ) @staticmethod diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 3410eae..a75e4c3 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,19 +1,16 @@ from shutil import copyfile -from glob import glob import sys import os import re -import fnmatch - class Utils: - def __init__(self, params, printer): - self.errors = 0 - self.params = params - self.printer = printer - self.duplicates = [] + def __init__(self, params_inst, printer_inst): + self.__errors = 0 + self.__params = params_inst + self.__printer = printer_inst + self.__duplicates = [] @staticmethod def sys_pause(): @@ -29,66 +26,63 @@ class Utils: total_size += os.path.getsize(os.path.join(folder, file)) return total_size - def get_compression(self, source: str, output: str): + def get_compression_status(self): + source_len = 0 + output_len = 0 + + for folder, folders, files in os.walk(self.__params.source): + source_len += len(files) + + for folder, folders, files in os.walk(self.__params.dest): + for file in files: + if not os.path.splitext(file)[1].count("(vncopy)"): + output_len += 1 + + if self.__errors != 0: + self.__printer.warning("Some files failed to compress!") + + if source_len == output_len: + self.__printer.info("Success!") + else: + self.__printer.warning("Original and compressed folders are not identical!") try: - source = self.get_size(source) - output = self.get_size(output) + source = self.get_size(self.__params.source) + output = self.get_size(self.__params.dest) print(f"\nResult: {source/1024/1024:.2f}MB -> " f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") except ZeroDivisionError: - self.printer.warning("Nothing compressed!") - - def get_compression_status(self, source: str): - source_len = 0 - output_len = 0 - - for folder, folders, files in os.walk(source): - source_len += len(files) - - for folder, folders, files in os.walk(f'{source}_compressed'): - for file in files: - if not os.path.splitext(file)[1].count("(copy)"): - output_len += 1 - - if self.errors != 0: - self.printer.warning("Some files failed to compress!") - - if source_len == output_len: - self.printer.info("Success!") - else: - self.printer.warning("Original and compressed folders are not identical!") - self.get_compression(source, f"{source}_compressed") + self.__printer.warning("Nothing compressed!") def add_unprocessed_file(self, source: str, output: str): - if self.params.copy_unprocessed: + if self.__params.copy_unprocessed: filename = os.path.split(source)[-1] copyfile(source, output) - self.printer.info(f"File {filename} copied to compressed folder.") + self.__printer.info(f"File {filename} copied to compressed folder.") def check_duplicates(self, source: str, output: str, filename: str) -> str: re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE) duplicates = [name for name in os.listdir(source) if re_pattern.match(name)] if len(duplicates) > 1: - if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates): - self.duplicates.append(filename) + if filename.lower() not in (duplicate.lower() for duplicate in self.__duplicates): + self.__duplicates.append(filename) new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] return os.path.join(output, new_name) return os.path.join(output, filename) def print_duplicates(self): - for filename in self.duplicates: - self.printer.warning( + for filename in self.__duplicates: + self.__printer.warning( f'Duplicate file has been found! Check manually this files - "{filename}", ' f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' ) - def mimic_rename(self, filename: str, target: str, source: str): + def mimic_rename(self, filename: str, target: str): if filename.count("(vncopy)"): orig_name = filename.replace("(vncopy)", "") - index = self.duplicates.index(os.path.split(orig_name)[-1]) - self.duplicates[index] = os.path.split(target)[-1] + index = self.__duplicates.index(os.path.split(orig_name)[-1]) + self.__duplicates[index] = os.path.split(target)[-1] target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] - os.rename(filename, target.replace(source, f"{source}_compressed")) \ No newline at end of file + os.rename(filename, target.replace(self.__params.source, self.__params.dest)) \ No newline at end of file From 4e6fd332c54917ef9bc6a839f3c689191bcbaeba Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Fri, 18 Oct 2024 22:54:37 +0300 Subject: [PATCH 2/7] vnrecode: rewrite duplicates processing --- .gitignore | 1 + vnrecode/compress.py | 52 +++++++++++++++++--------------------------- vnrecode/utils.py | 47 ++++++++++++++++++++------------------- 3 files changed, 46 insertions(+), 54 deletions(-) diff --git a/.gitignore b/.gitignore index e6aa831..e3947bb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /output/ /tests/ +/tests_compressed/ /build/ /dist/ /vntools.egg-info/ diff --git a/vnrecode/compress.py b/vnrecode/compress.py index c484243..d1a2919 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -50,7 +50,8 @@ class Compress: def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: bit_rate = self.__params.audio_bitrate - out_file = self.__utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') + prefix = self.__utils.get_hash(file) + out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') try: (FFmpeg() .input(path.join(in_dir, file)) @@ -59,16 +60,14 @@ class Compress: .execute() ) except FFmpegError as e: - self.__utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) - self.__utils.errors += 1 - if not self.__params.hide_errors: - self.__printer.error(f"File {file} can't be processed! Error: {e}") + self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) self.__printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") return out_file def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + prefix = self.__utils.get_hash(file) + out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') if not self.__params.video_skip: - out_file = self.__utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') codec = self.__params.video_codec crf = self.__params.video_crf @@ -82,18 +81,15 @@ class Compress: ) self.__printer.files(file, path.splitext(file)[0], extension, codec) except FFmpegError as e: - self.__utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') - self.__utils.errors += 1 - if not self.__params.hide_errors: - self.__printer.error(f"File {file} can't be processed! Error: {e}") - return out_file + self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) else: - self.__utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') - return f'{out_dir}/{path.splitext(file)[0]}.{extension}' + self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) + return out_file def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: quality = self.__params.image_quality - out_file = self.__utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") + prefix = self.__utils.get_hash(file) + out_file = path.join(out_dir, f"{prefix}_{path.splitext(file)[0]}.{extension}") try: image = Image.open(path.join(in_dir, file)) @@ -119,31 +115,25 @@ class Compress: minimize_size=True) self.__printer.files(file, path.splitext(file)[0], extension, f"{quality}%") except Exception as e: - self.__utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) - self.__utils.errors += 1 - if not self.__params.hide_errors: - self.__printer.error(f"File {file} can't be processed! Error: {e}") + self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) return out_file - def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: + def unknown(self, in_dir: str, file: str, out_dir: str) -> str: + prefix = self.__utils.get_hash(file) + out_file = path.join(out_dir, f"{prefix}_{file}") if self.__params.force_compress: - self.__printer.unknown_file(filename) - out_file = self.__utils.check_duplicates(in_dir, out_dir, filename) + self.__printer.unknown_file(file) try: (FFmpeg() - .input(path.join(in_dir, filename)) + .input(path.join(in_dir, file)) .output(out_file) .execute() ) except FFmpegError as e: - self.__utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) - self.__utils.errors += 1 - if not self.__params.hide_errors: - self.__printer.error(f"File {filename} can't be processed! Error: {e}") - return out_file + self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) else: - self.__utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) - return path.join(out_dir, filename) + self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) + return out_file def compress(self, dir_: str, filename: str, output: str): match File.get_type(filename): @@ -156,8 +146,6 @@ class Compress: case "unknown": out_file = self.unknown(dir_, filename, output) - if self.__params.mimic_mode: - self.__utils.mimic_rename(out_file, path.join(dir_, filename)) - + self.__utils.out_rename(out_file, filename) self.__printer.bar.update() self.__printer.bar.next() diff --git a/vnrecode/utils.py b/vnrecode/utils.py index a75e4c3..bf4e198 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,7 +1,7 @@ from shutil import copyfile +import hashlib import sys import os -import re class Utils: @@ -26,6 +26,10 @@ class Utils: total_size += os.path.getsize(os.path.join(folder, file)) return total_size + @staticmethod + def get_hash(filename: str) -> str: + return hashlib.md5(filename.encode()).hexdigest()[:8] + def get_compression_status(self): source_len = 0 output_len = 0 @@ -54,22 +58,23 @@ class Utils: except ZeroDivisionError: self.__printer.warning("Nothing compressed!") - def add_unprocessed_file(self, source: str, output: str): + def catch_unprocessed(self, source, output, error): + self.copy_unprocessed(source, error) + self.__errors += 1 + if not self.__params.hide_errors: + self.__printer.error(f"File {os.path.split(source)[-1]} can't be processed! Error: {error}") + + def copy_unprocessed(self, source, output): if self.__params.copy_unprocessed: - filename = os.path.split(source)[-1] copyfile(source, output) - self.__printer.info(f"File {filename} copied to compressed folder.") + self.__printer.info(f"File {os.path.split(source)[-1]} copied to compressed folder.") - def check_duplicates(self, source: str, output: str, filename: str) -> str: - re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE) - duplicates = [name for name in os.listdir(source) if re_pattern.match(name)] - - if len(duplicates) > 1: - if filename.lower() not in (duplicate.lower() for duplicate in self.__duplicates): - self.__duplicates.append(filename) - new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] - return os.path.join(output, new_name) - return os.path.join(output, filename) + def catch_duplicates(self, path: str) -> str: + if os.path.exists(path): + new_path = os.path.splitext(path)[0] + "(vncopy)" + os.path.splitext(path)[1] + self.__duplicates.append(new_path) + return new_path + return path def print_duplicates(self): for filename in self.__duplicates: @@ -78,11 +83,9 @@ class Utils: f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' ) - def mimic_rename(self, filename: str, target: str): - if filename.count("(vncopy)"): - orig_name = filename.replace("(vncopy)", "") - index = self.__duplicates.index(os.path.split(orig_name)[-1]) - self.__duplicates[index] = os.path.split(target)[-1] - target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] - - os.rename(filename, target.replace(self.__params.source, self.__params.dest)) \ No newline at end of file + def out_rename(self, filename: str, target: str): + if not self.__params.mimic_mode: + dest_name = self.catch_duplicates(os.path.join(os.path.dirname(filename), target)) + os.rename(filename, dest_name) + else: + os.rename(filename, os.path.join(os.path.dirname(filename), target)) From 1c1e8a92921146f7f243e6ba932d09ceee6b5b89 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Sat, 19 Oct 2024 00:25:52 +0300 Subject: [PATCH 3/7] vnrecode: pathlib for all paths --- vnrecode/application.py | 25 ++++----- vnrecode/compress.py | 116 ++++++++++++++++++++-------------------- vnrecode/params.py | 12 ++--- vnrecode/printer.py | 14 +++-- vnrecode/utils.py | 45 +++++++++------- 5 files changed, 108 insertions(+), 104 deletions(-) diff --git a/vnrecode/application.py b/vnrecode/application.py index 239bb05..3b6843e 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime +from pathlib import Path import shutil import os @@ -12,11 +13,11 @@ from .utils import Utils class Application: - def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): - self.__params = params - self.__compress = compress.compress - self.__printer = printer - self.__utils = utils + def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils): + self.__params = params_inst + self.__compress = compress_inst.compress + self.__printer = printer_inst + self.__utils = utils_inst def run(self): start_time = datetime.now() @@ -24,21 +25,21 @@ class Application: source = self.__params.source - if os.path.exists(self.__params.dest): + if self.__params.dest.exists(): shutil.rmtree(self.__params.dest) self.__printer.info("Creating folders...") for folder, folders, files in os.walk(source): - if not os.path.exists(folder.replace(source, self.__params.dest)): - os.mkdir(folder.replace(source, self.__params.dest)) + output = Path(folder.replace(str(source), str(self.__params.dest))) + if not output.exists(): + os.mkdir(output) - self.__printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') - output = folder.replace(source, self.__params.dest) + self.__printer.info(f'Compressing "{output}" folder...') with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: futures = [ - executor.submit(self.__compress, folder, file, output) - for file in files if os.path.isfile(os.path.join(folder, file)) + executor.submit(self.__compress, Path(folder, file), Path(output)) + for file in files if Path(folder, file).is_file() ] for future in as_completed(futures): future.result() diff --git a/vnrecode/compress.py b/vnrecode/compress.py index d1a2919..7b06988 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -1,6 +1,6 @@ from ffmpeg import FFmpeg, FFmpegError +from pathlib import Path from PIL import Image -from os import path import pillow_avif from .printer import Printer @@ -11,7 +11,7 @@ from .utils import Utils class File: @staticmethod - def get_type(filename: str) -> str: + def get_type(filename: Path) -> str: extensions = { "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], @@ -21,7 +21,7 @@ class File: } for file_type in extensions: - if path.splitext(filename)[1] in extensions[file_type]: + if filename.suffix in extensions[file_type]: return file_type return "unknown" @@ -43,61 +43,39 @@ class File: class Compress: - def __init__(self, params: Params, printer: Printer, utils: Utils): - self.__params = params - self.__printer = printer - self.__utils = utils + def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils): + self.__params = params_inst + self.__printer = printer_inst + self.__utils = utils_inst - def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path: bit_rate = self.__params.audio_bitrate - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(input_path) .option("hide_banner") .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .execute() ) except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) - self.__printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") + self.__utils.catch_unprocessed(input_path, out_file, e) + self.__printer.files(input_path, out_file, f"{bit_rate}") return out_file - def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') - if not self.__params.video_skip: - codec = self.__params.video_codec - crf = self.__params.video_crf - - try: - (FFmpeg() - .input(path.join(in_dir, file)) - .option("hide_banner") - .option("hwaccel", "auto") - .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) - .execute() - ) - self.__printer.files(file, path.splitext(file)[0], extension, codec) - except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) - else: - self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) - return out_file - - def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: + def image(self, input_path: Path, output_dir: Path, extension: str) -> Path: quality = self.__params.image_quality - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f"{prefix}_{path.splitext(file)[0]}.{extension}") + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}") try: - image = Image.open(path.join(in_dir, file)) + image = Image.open(input_path) if (extension == "jpg" or extension == "jpeg" or (extension == "webp" and not self.__params.webp_rgba)): if File.has_transparency(image): - self.__printer.warning(f"{file} has transparency. Changing to fallback...") - extension = self.__params.image_fall_ext + self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...") + out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}") if File.has_transparency(image): image.convert('RGBA') @@ -113,39 +91,61 @@ class Compress: lossless=self.__params.image_lossless, quality=quality, minimize_size=True) - self.__printer.files(file, path.splitext(file)[0], extension, f"{quality}%") + self.__printer.files(input_path, out_file, f"{quality}%") except Exception as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) + self.__utils.catch_unprocessed(input_path, out_file, e) return out_file - def unknown(self, in_dir: str, file: str, out_dir: str) -> str: - prefix = self.__utils.get_hash(file) - out_file = path.join(out_dir, f"{prefix}_{file}") - if self.__params.force_compress: - self.__printer.unknown_file(file) + def video(self, input_path: Path, output_dir: Path, extension: str) -> Path: + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') + if not self.__params.video_skip: + codec = self.__params.video_codec + crf = self.__params.video_crf + try: (FFmpeg() - .input(path.join(in_dir, file)) + .input(input_path) + .option("hide_banner") + .option("hwaccel", "auto") + .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) + .execute() + ) + self.__printer.files(input_path, out_file, codec) + except FFmpegError as e: + self.__utils.catch_unprocessed(input_path, out_file, e) + else: + self.__utils.copy_unprocessed(input_path, out_file) + return out_file + + def unknown(self, input_path: Path, output_dir: Path) -> Path: + prefix = self.__utils.get_hash(input_path.name) + out_file = Path(output_dir, f"{prefix}_{input_path.name}") + if self.__params.force_compress: + self.__printer.unknown_file(input_path.name) + try: + (FFmpeg() + .input(input_path) .output(out_file) .execute() ) except FFmpegError as e: - self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) + self.__utils.catch_unprocessed(input_path, out_file, e) else: - self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) + self.__utils.copy_unprocessed(input_path, out_file) return out_file - def compress(self, dir_: str, filename: str, output: str): - match File.get_type(filename): + def compress(self, source: Path, output: Path): + match File.get_type(source): case "audio": - out_file = self.audio(dir_, filename, output, self.__params.audio_ext) + out_file = self.audio(source, output, self.__params.audio_ext) case "image": - out_file = self.image(dir_, filename, output, self.__params.image_ext) + out_file = self.image(source, output, self.__params.image_ext) case "video": - out_file = self.video(dir_, filename, output, self.__params.video_ext) + out_file = self.video(source, output, self.__params.video_ext) case "unknown": - out_file = self.unknown(dir_, filename, output) + out_file = self.unknown(source, output) - self.__utils.out_rename(out_file, filename) + self.__utils.out_rename(out_file, source.name) self.__printer.bar.update() self.__printer.bar.next() diff --git a/vnrecode/params.py b/vnrecode/params.py index ad4ec59..d4db410 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -1,8 +1,8 @@ from argparse import ArgumentParser, Namespace from dataclasses import dataclass +from pathlib import Path from typing import Self import tomllib -import os @dataclass class Params: @@ -28,14 +28,14 @@ class Params: video_ext: str video_codec: str - source: str - dest: str + source: Path + dest: Path @classmethod def setup(cls) -> Self: args = cls.get_args() if args.config is not None: - if os.path.isfile(args.config): + if Path(args.config).is_file(): with open(args.config, "rb") as cfile: config = tomllib.load(cfile) else: @@ -59,8 +59,8 @@ class Params: video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec - source = args.source - dest = f"{source}_compressed" + source = Path(args.source) + dest = Path(f"{args.source}_compressed") return cls( copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 30aa496..4de4f3c 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -1,12 +1,12 @@ from progress.bar import IncrementalBar +from pathlib import Path import colorama import sys import os - class Printer: - def __init__(self, folder): + def __init__(self, folder: Path): file_count = 0 for folder, folders, file in os.walk(folder): file_count += len(file) @@ -36,11 +36,9 @@ class Printer: def error(self, string: str): self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) - def files(self, source: str, dest: str, dest_ext: str, comment: str): - source_ext = os.path.splitext(source)[1] - source_name = os.path.splitext(source)[0] + def files(self, source_path: Path, output_path: Path, comment: str): + self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> " + f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m")) - self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m")) - - def unknown_file(self, file): + def unknown_file(self, file: str): self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) diff --git a/vnrecode/utils.py b/vnrecode/utils.py index bf4e198..2b52f30 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -1,12 +1,16 @@ from shutil import copyfile +from pathlib import Path import hashlib import sys import os +from vnrecode.printer import Printer +from vnrecode.params import Params + class Utils: - def __init__(self, params_inst, printer_inst): + def __init__(self, params_inst: Params, printer_inst: Printer): self.__errors = 0 self.__params = params_inst self.__printer = printer_inst @@ -18,12 +22,13 @@ class Utils: os.system("pause") @staticmethod - def get_size(directory: str) -> int: + def get_size(directory: Path) -> int: total_size = 0 for folder, folders, files in os.walk(directory): for file in files: - if not os.path.islink(os.path.join(folder, file)): - total_size += os.path.getsize(os.path.join(folder, file)) + path = Path(folder, file) + if not path.is_symlink(): + total_size += path.stat().st_size return total_size @staticmethod @@ -39,7 +44,7 @@ class Utils: for folder, folders, files in os.walk(self.__params.dest): for file in files: - if not os.path.splitext(file)[1].count("(vncopy)"): + if not file.count("(vncopy)"): output_len += 1 if self.__errors != 0: @@ -58,20 +63,20 @@ class Utils: except ZeroDivisionError: self.__printer.warning("Nothing compressed!") - def catch_unprocessed(self, source, output, error): - self.copy_unprocessed(source, error) + def catch_unprocessed(self, input_path: Path, output_path: Path, error): + self.copy_unprocessed(input_path, output_path) self.__errors += 1 if not self.__params.hide_errors: - self.__printer.error(f"File {os.path.split(source)[-1]} can't be processed! Error: {error}") + self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}") - def copy_unprocessed(self, source, output): + def copy_unprocessed(self, input_path: Path, output_path: Path): if self.__params.copy_unprocessed: - copyfile(source, output) - self.__printer.info(f"File {os.path.split(source)[-1]} copied to compressed folder.") + copyfile(input_path, output_path) + self.__printer.info(f"File {input_path.name} copied to compressed folder.") - def catch_duplicates(self, path: str) -> str: - if os.path.exists(path): - new_path = os.path.splitext(path)[0] + "(vncopy)" + os.path.splitext(path)[1] + def catch_duplicates(self, path: Path) -> Path: + if path.exists(): + new_path = Path(path.stem + "(vncopy)" + path.suffix) self.__duplicates.append(new_path) return new_path return path @@ -79,13 +84,13 @@ class Utils: def print_duplicates(self): for filename in self.__duplicates: self.__printer.warning( - f'Duplicate file has been found! Check manually this files - "{filename}", ' - f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' + f'Duplicate file has been found! Check manually this files - "{filename.name}", ' + f'"{filename.stem + "(vncopy)" + filename.suffix}"' ) - def out_rename(self, filename: str, target: str): + def out_rename(self, out_path: Path, target: str): if not self.__params.mimic_mode: - dest_name = self.catch_duplicates(os.path.join(os.path.dirname(filename), target)) - os.rename(filename, dest_name) + dest_name = self.catch_duplicates(Path(out_path.parent, target)) + os.rename(out_path, dest_name) else: - os.rename(filename, os.path.join(os.path.dirname(filename), target)) + os.rename(out_path, Path(out_path.parent, target)) From df20bd363685700213942e5f5cebf02776b154a0 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Sat, 19 Oct 2024 01:45:35 +0300 Subject: [PATCH 4/7] vnrecode: docstrings --- vnrecode/__main__.py | 4 +++ vnrecode/application.py | 11 ++++++- vnrecode/compress.py | 52 +++++++++++++++++++++++++++++++-- vnrecode/params.py | 12 ++++++++ vnrecode/printer.py | 57 ++++++++++++++++++++++++++++++++---- vnrecode/utils.py | 64 ++++++++++++++++++++++++++++++++--------- 6 files changed, 177 insertions(+), 23 deletions(-) diff --git a/vnrecode/__main__.py b/vnrecode/__main__.py index faf1839..fe45306 100644 --- a/vnrecode/__main__.py +++ b/vnrecode/__main__.py @@ -7,6 +7,10 @@ from .utils import Utils def init(): + """ + This function creates all needed class instances and run utility + :return: None + """ params = Params.setup() printer = Printer(params.source) utils = Utils(params, printer) diff --git a/vnrecode/application.py b/vnrecode/application.py index 3b6843e..c43102b 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -12,6 +12,9 @@ from .utils import Utils class Application: + """ + Main class for utility + """ def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils): self.__params = params_inst @@ -20,6 +23,12 @@ class Application: self.__utils = utils_inst def run(self): + """ + Method creates a folder in which all the recoded files will be placed, + creates a queue of recoding processes for each file and, when the files are run out in the original folder, + calls functions to display the result + :return: None + """ start_time = datetime.now() self.__printer.win_ascii_esc() @@ -45,6 +54,6 @@ class Application: future.result() self.__utils.print_duplicates() - self.__utils.get_compression_status() + self.__utils.get_recode_status() self.__utils.sys_pause() print(f"Time taken: {datetime.now() - start_time}") \ No newline at end of file diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 7b06988..4050e4c 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -9,10 +9,17 @@ from .utils import Utils class File: + """ + Class contains some methods to work with files + """ @staticmethod - def get_type(filename: Path) -> str: - + def get_type(path: Path) -> str: + """ + Method returns filetype string for file + :param path: Path of file to determine type + :return: filetype string: audio, image, video, unknown + """ extensions = { "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], @@ -21,12 +28,17 @@ class File: } for file_type in extensions: - if filename.suffix in extensions[file_type]: + if path.suffix in extensions[file_type]: return file_type return "unknown" @staticmethod def has_transparency(img: Image) -> bool: + """ + Method checks if image has transparency + :param img: Pillow Image + :return: bool + """ if img.info.get("transparency", None) is not None: return True if img.mode == "P": @@ -49,6 +61,13 @@ class Compress: self.__utils = utils_inst def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path: + """ + Method recodes audio files to another format using ffmpeg utility + :param input_path: Path of the original audio file + :param output_dir: Path of the output (compression) folder + :param extension: Extension of the new audio file + :return: Path of compressed audio file with md5 hash as prefix + """ bit_rate = self.__params.audio_bitrate prefix = self.__utils.get_hash(input_path.name) out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') @@ -65,6 +84,13 @@ class Compress: return out_file def image(self, input_path: Path, output_dir: Path, extension: str) -> Path: + """ + Method recodes image files to another format using Pillow + :param input_path: Path of the original image file + :param output_dir: Path of the output (compression) folder + :param extension: Extension of the new image file + :return: Path of compressed image file with md5 hash as prefix + """ quality = self.__params.image_quality prefix = self.__utils.get_hash(input_path.name) out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}") @@ -97,6 +123,13 @@ class Compress: return out_file def video(self, input_path: Path, output_dir: Path, extension: str) -> Path: + """ + Method recodes video files to another format using ffmpeg utility + :param input_path: Path of the original video file + :param output_dir: Path of the output (compression) folder + :param extension: Extension of the new video file + :return: Path of compressed video file with md5 hash as prefix + """ prefix = self.__utils.get_hash(input_path.name) out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') if not self.__params.video_skip: @@ -119,6 +152,13 @@ class Compress: return out_file def unknown(self, input_path: Path, output_dir: Path) -> Path: + """ + Method recodes files with "unknown" file format using ffmpeg, + in the hope that ffmpeg supports this file type and the default settings for it will reduce its size + :param input_path: Path of the original file + :param output_dir: Path of the output (compression) folder + :return: Path of compressed file with md5 hash as prefix + """ prefix = self.__utils.get_hash(input_path.name) out_file = Path(output_dir, f"{prefix}_{input_path.name}") if self.__params.force_compress: @@ -136,6 +176,12 @@ class Compress: return out_file def compress(self, source: Path, output: Path): + """ + It the core method for this program. Method determines file type and call compress function for it + :param source: Path of file to compress + :param output: Path of output file + :return: None + """ match File.get_type(source): case "audio": out_file = self.audio(source, output, self.__params.audio_ext) diff --git a/vnrecode/params.py b/vnrecode/params.py index d4db410..d0388f2 100644 --- a/vnrecode/params.py +++ b/vnrecode/params.py @@ -7,6 +7,10 @@ import tomllib @dataclass class Params: + """ + This dataclass contains all parameters for utility + """ + copy_unprocessed: bool force_compress: bool mimic_mode: bool @@ -33,6 +37,10 @@ class Params: @classmethod def setup(cls) -> Self: + """ + Method initialize all parameters and returns class instance + :return: Params instance + """ args = cls.get_args() if args.config is not None: if Path(args.config).is_file(): @@ -71,6 +79,10 @@ class Params: @staticmethod def get_args() -> Namespace: + """ + Method gets CLI arguments and returns argparse.Namespace instance + :return: argparse.Namespace of CLI args + """ parser = ArgumentParser(prog="vnrecode", description="Python utility to compress Visual Novel Resources" ) diff --git a/vnrecode/printer.py b/vnrecode/printer.py index 4de4f3c..a6046f9 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -5,40 +5,87 @@ import sys import os class Printer: + """ + Class implements CLI UI for this utility + """ - def __init__(self, folder: Path): + def __init__(self, source: Path): + """ + :param source: Path of original (compressing) folder to count its files for progress bar + """ file_count = 0 - for folder, folders, file in os.walk(folder): + for folder, folders, file in os.walk(source): file_count += len(file) self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)') self.bar.update() - # Fill whole string with spaces for cleaning progress bar @staticmethod def clean_str(string: str) -> str: + """ + Method fills end of string with spaces to remove progress bar garbage from console + :param string: String to "clean" + :return: "Clean" string + """ return string + " " * (os.get_terminal_size().columns - len(string)) @staticmethod def win_ascii_esc(): + """ + Method setups colorama for cmd + :return: None + """ if sys.platform == "win32": colorama.init() def bar_print(self, string: str): + """ + Method prints some string in console and updates progress bar + :param string: String to print + :return: None + """ print(string) self.bar.update() def info(self, string: str): + """ + Method prints string with decor for info messages + :param string: String to print + :return: None + """ self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) def warning(self, string: str): + """ + Method prints string with decor for warning messages + :param string: String to print + :return: None + """ self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) def error(self, string: str): + """ + Method prints string with decor for error messages + :param string: String to print + :return: None + """ self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) def files(self, source_path: Path, output_path: Path, comment: str): + """ + Method prints the result of recoding a file with some decorations in the form: + input file name -> output file name (quality setting) + :param source_path: Input file Path + :param output_path: Output file Path + :param comment: Comment about recode quality setting + :return: None + """ self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> " f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m")) - def unknown_file(self, file: str): - self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) + def unknown_file(self, filename: str): + """ + Method prints the result of recoding unknown file + :param filename: Name of unknown file + :return: + """ + self.bar_print(self.clean_str(f"\r\u2713 \033[0;33m{filename}\033[0m (File will be force compressed via ffmpeg)")) diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 2b52f30..7da06d3 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -9,6 +9,9 @@ from vnrecode.params import Params class Utils: + """ + Class contains various methods for internal utility use + """ def __init__(self, params_inst: Params, printer_inst: Printer): self.__errors = 0 @@ -18,24 +21,27 @@ class Utils: @staticmethod def sys_pause(): + """ + Method calls pause for Windows cmd shell + :return: None + """ if sys.platform == "win32": os.system("pause") - @staticmethod - def get_size(directory: Path) -> int: - total_size = 0 - for folder, folders, files in os.walk(directory): - for file in files: - path = Path(folder, file) - if not path.is_symlink(): - total_size += path.stat().st_size - return total_size - @staticmethod def get_hash(filename: str) -> str: + """ + Method returns 8 chars of md5 hash for filename + :param filename: File name to get md5 + :return: 8 chars of md5 hash + """ return hashlib.md5(filename.encode()).hexdigest()[:8] - def get_compression_status(self): + def get_recode_status(self): + """ + Method prints recoding results + :return: None + """ source_len = 0 output_len = 0 @@ -55,8 +61,8 @@ class Utils: else: self.__printer.warning("Original and compressed folders are not identical!") try: - source = self.get_size(self.__params.source) - output = self.get_size(self.__params.dest) + source = sum(file.stat().st_size for file in self.__params.source.glob('**/*') if file.is_file()) + output = sum(file.stat().st_size for file in self.__params.dest.glob('**/*') if file.is_file()) print(f"\nResult: {source/1024/1024:.2f}MB -> " f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") @@ -64,24 +70,48 @@ class Utils: self.__printer.warning("Nothing compressed!") def catch_unprocessed(self, input_path: Path, output_path: Path, error): + """ + Method processes files that have not been recoded due to an error and prints error to console + if hide_errors parameter is False + :param input_path: Path of unprocessed file + :param output_path: Destination path of unprocessed file + :param error: Recoding exception + :return: None + """ self.copy_unprocessed(input_path, output_path) self.__errors += 1 if not self.__params.hide_errors: self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}") def copy_unprocessed(self, input_path: Path, output_path: Path): + """ + Method copies an unprocessed file from the source folder to the destination folder + :param input_path: Path of unprocessed file + :param output_path: Destination path of unprocessed file + :return: None + """ if self.__params.copy_unprocessed: copyfile(input_path, output_path) self.__printer.info(f"File {input_path.name} copied to compressed folder.") def catch_duplicates(self, path: Path) -> Path: - if path.exists(): + """ + Method checks if file path exists and returns folder/filename(vncopy).ext path + if duplicate founded + :param path: Some file Path + :return: Duplicate path name with (vncopy) on end + """ + if path.is_file() and path.exists(): new_path = Path(path.stem + "(vncopy)" + path.suffix) self.__duplicates.append(new_path) return new_path return path def print_duplicates(self): + """ + Method prints message about all duplicates generated during recode process + :return: None + """ for filename in self.__duplicates: self.__printer.warning( f'Duplicate file has been found! Check manually this files - "{filename.name}", ' @@ -89,6 +119,12 @@ class Utils: ) def out_rename(self, out_path: Path, target: str): + """ + Method removes md5 hash from file name and changes file extension in dependence of mimic mode + :param out_path: Recoded file Path + :param target: Target filename + :return: None + """ if not self.__params.mimic_mode: dest_name = self.catch_duplicates(Path(out_path.parent, target)) os.rename(out_path, dest_name) From 407ab98000df8b18a9d1bf2593a588e1c9210196 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Sat, 19 Oct 2024 02:20:58 +0300 Subject: [PATCH 5/7] vnrecode: duplicates check for more than two files --- vnrecode/compress.py | 2 +- vnrecode/utils.py | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/vnrecode/compress.py b/vnrecode/compress.py index 4050e4c..b1c2567 100644 --- a/vnrecode/compress.py +++ b/vnrecode/compress.py @@ -192,6 +192,6 @@ class Compress: case "unknown": out_file = self.unknown(source, output) - self.__utils.out_rename(out_file, source.name) + self.__utils.out_rename(out_file, source) self.__printer.bar.update() self.__printer.bar.next() diff --git a/vnrecode/utils.py b/vnrecode/utils.py index 7da06d3..c30abe3 100644 --- a/vnrecode/utils.py +++ b/vnrecode/utils.py @@ -17,7 +17,7 @@ class Utils: self.__errors = 0 self.__params = params_inst self.__printer = printer_inst - self.__duplicates = [] + self.__duplicates = {} @staticmethod def sys_pause(): @@ -102,9 +102,13 @@ class Utils: :return: Duplicate path name with (vncopy) on end """ if path.is_file() and path.exists(): - new_path = Path(path.stem + "(vncopy)" + path.suffix) - self.__duplicates.append(new_path) - return new_path + orig_name = path.name.replace("(vncopy)", "") + new_path = Path(path.parent, path.stem + "(vncopy)" + path.suffix) + try: self.__duplicates[orig_name] + except KeyError: self.__duplicates[orig_name] = [] + if not new_path.name in self.__duplicates[orig_name]: + self.__duplicates[orig_name].append(new_path.name) + return self.catch_duplicates(new_path) return path def print_duplicates(self): @@ -112,13 +116,13 @@ class Utils: Method prints message about all duplicates generated during recode process :return: None """ - for filename in self.__duplicates: + for filename in self.__duplicates.keys(): self.__printer.warning( - f'Duplicate file has been found! Check manually this files - "{filename.name}", ' - f'"{filename.stem + "(vncopy)" + filename.suffix}"' + f'Duplicate file has been found! Check manually this files - "{filename}", ' + + ', '.join(self.__duplicates[filename]) ) - def out_rename(self, out_path: Path, target: str): + def out_rename(self, out_path: Path, target: Path): """ Method removes md5 hash from file name and changes file extension in dependence of mimic mode :param out_path: Recoded file Path @@ -126,7 +130,7 @@ class Utils: :return: None """ if not self.__params.mimic_mode: - dest_name = self.catch_duplicates(Path(out_path.parent, target)) + dest_name = self.catch_duplicates(Path(out_path.parent, target.stem+out_path.suffix)) os.rename(out_path, dest_name) else: - os.rename(out_path, Path(out_path.parent, target)) + os.rename(out_path, Path(out_path.parent, target.name)) From a75314d2ad3878af69cd90d45decb1360353ab75 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Sat, 19 Oct 2024 02:44:51 +0300 Subject: [PATCH 6/7] vnrecode: ignore ansi escapes for string cleaning --- vnrecode/printer.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/vnrecode/printer.py b/vnrecode/printer.py index a6046f9..19650aa 100644 --- a/vnrecode/printer.py +++ b/vnrecode/printer.py @@ -3,6 +3,7 @@ from pathlib import Path import colorama import sys import os +import re class Printer: """ @@ -26,7 +27,8 @@ class Printer: :param string: String to "clean" :return: "Clean" string """ - return string + " " * (os.get_terminal_size().columns - len(string)) + ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + return string + " " * (os.get_terminal_size().columns - len(ansi_escape.sub('', string))) @staticmethod def win_ascii_esc(): @@ -56,18 +58,18 @@ class Printer: def warning(self, string: str): """ - Method prints string with decor for warning messages - :param string: String to print - :return: None - """ + Method prints string with decor for warning messages + :param string: String to print + :return: None + """ self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) def error(self, string: str): """ - Method prints string with decor for error messages - :param string: String to print - :return: None - """ + Method prints string with decor for error messages + :param string: String to print + :return: None + """ self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) def files(self, source_path: Path, output_path: Path, comment: str): From bc84703b73bf1d0e6f6def3368bbce59170ef8c9 Mon Sep 17 00:00:00 2001 From: OleSTEEP Date: Sat, 19 Oct 2024 02:51:57 +0300 Subject: [PATCH 7/7] vnrecode: fix typo in input folder name --- vnrecode/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vnrecode/application.py b/vnrecode/application.py index c43102b..2b19681 100755 --- a/vnrecode/application.py +++ b/vnrecode/application.py @@ -43,7 +43,7 @@ class Application: if not output.exists(): os.mkdir(output) - self.__printer.info(f'Compressing "{output}" folder...') + self.__printer.info(f'Compressing "{folder}" folder...') with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: futures = [