Compare commits
	
		
			7 commits
		
	
	
		
			d8e55bac9a
			...
			bc84703b73
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| bc84703b73 | |||
| a75314d2ad | |||
| 407ab98000 | |||
| df20bd3636 | |||
| 1c1e8a9292 | |||
| 4e6fd332c5 | |||
| 9bb3cdcccb | 
					 7 changed files with 334 additions and 186 deletions
				
			
		
							
								
								
									
										1
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							|  | @ -1,5 +1,6 @@ | ||||||
| /output/ | /output/ | ||||||
| /tests/ | /tests/ | ||||||
|  | /tests_compressed/ | ||||||
| /build/ | /build/ | ||||||
| /dist/ | /dist/ | ||||||
| /vntools.egg-info/ | /vntools.egg-info/ | ||||||
|  |  | ||||||
|  | @ -7,6 +7,10 @@ from .utils import Utils | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def init(): | def init(): | ||||||
|  |     """ | ||||||
|  |     This function creates all needed class instances and run utility | ||||||
|  |     :return: None | ||||||
|  |     """ | ||||||
|     params = Params.setup() |     params = Params.setup() | ||||||
|     printer = Printer(params.source) |     printer = Printer(params.source) | ||||||
|     utils = Utils(params, printer) |     utils = Utils(params, printer) | ||||||
|  |  | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| #!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||||
| from concurrent.futures import ThreadPoolExecutor, as_completed | from concurrent.futures import ThreadPoolExecutor, as_completed | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
|  | from pathlib import Path | ||||||
| import shutil | import shutil | ||||||
| import os | import os | ||||||
| 
 | 
 | ||||||
|  | @ -11,43 +12,48 @@ from .utils import Utils | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Application: | class Application: | ||||||
|  |     """ | ||||||
|  |     Main class for utility | ||||||
|  |     """ | ||||||
| 
 | 
 | ||||||
|     def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): |     def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils): | ||||||
|         self.params = params |         self.__params = params_inst | ||||||
|         self.compress = compress.compress |         self.__compress = compress_inst.compress | ||||||
|         self.printer = printer |         self.__printer = printer_inst | ||||||
|         self.utils = utils |         self.__utils = utils_inst | ||||||
| 
 |  | ||||||
|     def compress_worker(self, folder: str, file: str, source: str, output: str): |  | ||||||
|         if os.path.isfile(os.path.join(folder, file)): |  | ||||||
|             self.compress(folder, file, source, output) |  | ||||||
| 
 | 
 | ||||||
|     def run(self): |     def run(self): | ||||||
|  |         """ | ||||||
|  |         Method creates a folder in which all the recoded files will be placed, | ||||||
|  |         creates a queue of recoding processes for each file and, when the files are run out in the original folder, | ||||||
|  |         calls functions to display the result | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         start_time = datetime.now() |         start_time = datetime.now() | ||||||
|         self.printer.win_ascii_esc() |         self.__printer.win_ascii_esc() | ||||||
| 
 | 
 | ||||||
|         source = os.path.abspath(self.params.source) |         source = self.__params.source | ||||||
| 
 | 
 | ||||||
|         if os.path.exists(f"{source}_compressed"): |         if self.__params.dest.exists(): | ||||||
|             shutil.rmtree(f"{source}_compressed") |             shutil.rmtree(self.__params.dest) | ||||||
| 
 | 
 | ||||||
|         self.printer.info("Creating folders...") |         self.__printer.info("Creating folders...") | ||||||
|         for folder, folders, files in os.walk(source): |         for folder, folders, files in os.walk(source): | ||||||
|             if not os.path.exists(folder.replace(source, f"{source}_compressed")): |             output = Path(folder.replace(str(source), str(self.__params.dest))) | ||||||
|                 os.mkdir(folder.replace(source, f"{source}_compressed")) |             if not output.exists(): | ||||||
|  |                 os.mkdir(output) | ||||||
| 
 | 
 | ||||||
|             self.printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') |             self.__printer.info(f'Compressing "{folder}" folder...') | ||||||
|             output = folder.replace(source, f"{source}_compressed") |  | ||||||
| 
 | 
 | ||||||
|             with ThreadPoolExecutor(max_workers=self.params.workers) as executor: |             with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: | ||||||
|                 futures = [ |                 futures = [ | ||||||
|                     executor.submit(self.compress, folder, file, source, output) |                     executor.submit(self.__compress, Path(folder, file), Path(output)) | ||||||
|                     for file in files if os.path.isfile(os.path.join(folder, file)) |                     for file in files if Path(folder, file).is_file() | ||||||
|                 ] |                 ] | ||||||
|                 for future in as_completed(futures): |                 for future in as_completed(futures): | ||||||
|                     future.result() |                     future.result() | ||||||
| 
 | 
 | ||||||
|         self.utils.print_duplicates() |         self.__utils.print_duplicates() | ||||||
|         self.utils.get_compression_status(source) |         self.__utils.get_recode_status() | ||||||
|         self.utils.sys_pause() |         self.__utils.sys_pause() | ||||||
|         print(f"Time taken: {datetime.now() - start_time}") |         print(f"Time taken: {datetime.now() - start_time}") | ||||||
|  | @ -1,6 +1,6 @@ | ||||||
| from ffmpeg import FFmpeg, FFmpegError | from ffmpeg import FFmpeg, FFmpegError | ||||||
|  | from pathlib import Path | ||||||
| from PIL import Image | from PIL import Image | ||||||
| from os import path |  | ||||||
| import pillow_avif | import pillow_avif | ||||||
| 
 | 
 | ||||||
| from .printer import Printer | from .printer import Printer | ||||||
|  | @ -9,10 +9,17 @@ from .utils import Utils | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class File: | class File: | ||||||
|  |     """ | ||||||
|  |     Class contains some methods to work with files | ||||||
|  |     """ | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def get_type(filename: str) -> str: |     def get_type(path: Path) -> str: | ||||||
| 
 |         """ | ||||||
|  |         Method returns filetype string for file | ||||||
|  |         :param path: Path of file to determine type | ||||||
|  |         :return: filetype string: audio, image, video, unknown | ||||||
|  |         """ | ||||||
|         extensions = { |         extensions = { | ||||||
|             "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], |             "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], | ||||||
|             "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], |             "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], | ||||||
|  | @ -21,12 +28,17 @@ class File: | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         for file_type in extensions: |         for file_type in extensions: | ||||||
|             if path.splitext(filename)[1] in extensions[file_type]: |             if path.suffix in extensions[file_type]: | ||||||
|                 return file_type |                 return file_type | ||||||
|         return "unknown" |         return "unknown" | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def has_transparency(img: Image) -> bool: |     def has_transparency(img: Image) -> bool: | ||||||
|  |         """ | ||||||
|  |         Method checks if image has transparency | ||||||
|  |         :param img: Pillow Image | ||||||
|  |         :return: bool | ||||||
|  |         """ | ||||||
|         if img.info.get("transparency", None) is not None: |         if img.info.get("transparency", None) is not None: | ||||||
|             return True |             return True | ||||||
|         if img.mode == "P": |         if img.mode == "P": | ||||||
|  | @ -43,70 +55,58 @@ class File: | ||||||
| 
 | 
 | ||||||
| class Compress: | class Compress: | ||||||
| 
 | 
 | ||||||
|     def __init__(self, params: Params, printer: Printer, utils: Utils): |     def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils): | ||||||
|         self.params = params |         self.__params = params_inst | ||||||
|         self.printer = printer |         self.__printer = printer_inst | ||||||
|         self.utils = utils |         self.__utils = utils_inst | ||||||
| 
 | 
 | ||||||
|     def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: |     def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path: | ||||||
|         bit_rate = self.params.audio_bitrate |         """ | ||||||
|         out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') |         Method recodes audio files to another format using ffmpeg utility | ||||||
|  |         :param input_path: Path of the original audio file | ||||||
|  |         :param output_dir: Path of the output (compression) folder | ||||||
|  |         :param extension: Extension of the new audio file | ||||||
|  |         :return: Path of compressed audio file with md5 hash as prefix | ||||||
|  |         """ | ||||||
|  |         bit_rate = self.__params.audio_bitrate | ||||||
|  |         prefix = self.__utils.get_hash(input_path.name) | ||||||
|  |         out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') | ||||||
|         try: |         try: | ||||||
|             (FFmpeg() |             (FFmpeg() | ||||||
|              .input(path.join(in_dir, file)) |              .input(input_path) | ||||||
|              .option("hide_banner") |              .option("hide_banner") | ||||||
|              .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) |              .output(out_file,{"b:a": bit_rate, "loglevel": "error"}) | ||||||
|              .execute() |              .execute() | ||||||
|              ) |              ) | ||||||
|         except FFmpegError as e: |         except FFmpegError as e: | ||||||
|             self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) |             self.__utils.catch_unprocessed(input_path, out_file, e) | ||||||
|             self.utils.errors += 1 |         self.__printer.files(input_path, out_file, f"{bit_rate}") | ||||||
|             if not self.params.hide_errors: |  | ||||||
|                 self.printer.error(f"File {file} can't be processed! Error: {e}") |  | ||||||
|         self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") |  | ||||||
|         return out_file |         return out_file | ||||||
| 
 | 
 | ||||||
|     def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: |     def image(self, input_path: Path, output_dir: Path, extension: str) -> Path: | ||||||
|         if not self.params.video_skip: |         """ | ||||||
|             out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') |         Method recodes image files to another format using Pillow | ||||||
|             codec = self.params.video_codec |         :param input_path: Path of the original image file | ||||||
|             crf = self.params.video_crf |         :param output_dir: Path of the output (compression) folder | ||||||
| 
 |         :param extension: Extension of the new image file | ||||||
|  |         :return: Path of compressed image file with md5 hash as prefix | ||||||
|  |         """ | ||||||
|  |         quality = self.__params.image_quality | ||||||
|  |         prefix = self.__utils.get_hash(input_path.name) | ||||||
|  |         out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}") | ||||||
|         try: |         try: | ||||||
|                 (FFmpeg() |             image = Image.open(input_path) | ||||||
|                  .input(path.join(in_dir, file)) |  | ||||||
|                  .option("hide_banner") |  | ||||||
|                  .option("hwaccel", "auto") |  | ||||||
|                  .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) |  | ||||||
|                  .execute() |  | ||||||
|                  ) |  | ||||||
|                 self.printer.files(file, path.splitext(file)[0], extension, codec) |  | ||||||
|             except FFmpegError as e: |  | ||||||
|                 self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') |  | ||||||
|                 self.utils.errors += 1 |  | ||||||
|                 if not self.params.hide_errors: |  | ||||||
|                     self.printer.error(f"File {file} can't be processed! Error: {e}") |  | ||||||
|             return out_file |  | ||||||
|         else: |  | ||||||
|             self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}') |  | ||||||
|             return f'{out_dir}/{path.splitext(file)[0]}.{extension}' |  | ||||||
| 
 |  | ||||||
|     def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: |  | ||||||
|         quality = self.params.image_quality |  | ||||||
|         out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}") |  | ||||||
|         try: |  | ||||||
|             image = Image.open(path.join(in_dir, file)) |  | ||||||
| 
 | 
 | ||||||
|             if (extension == "jpg" or extension == "jpeg" or |             if (extension == "jpg" or extension == "jpeg" or | ||||||
|                     (extension == "webp" and not self.params.webp_rgba)): |                     (extension == "webp" and not self.__params.webp_rgba)): | ||||||
|                 if File.has_transparency(image): |                 if File.has_transparency(image): | ||||||
|                     self.printer.warning(f"{file} has transparency. Changing to fallback...") |                     self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...") | ||||||
|                     extension = self.params.image_fall_ext |                     out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}") | ||||||
| 
 | 
 | ||||||
|             if File.has_transparency(image): |             if File.has_transparency(image): | ||||||
|                 image.convert('RGBA') |                 image.convert('RGBA') | ||||||
| 
 | 
 | ||||||
|             res_downscale = self.params.image_downscale |             res_downscale = self.__params.image_downscale | ||||||
|             if res_downscale != 1: |             if res_downscale != 1: | ||||||
|                 width, height = image.size |                 width, height = image.size | ||||||
|                 new_size = (int(width / res_downscale), int(height / res_downscale)) |                 new_size = (int(width / res_downscale), int(height / res_downscale)) | ||||||
|  | @ -114,50 +114,84 @@ class Compress: | ||||||
| 
 | 
 | ||||||
|             image.save(out_file, |             image.save(out_file, | ||||||
|                        optimize=True, |                        optimize=True, | ||||||
|                        lossless=self.params.image_lossless, |                        lossless=self.__params.image_lossless, | ||||||
|                        quality=quality, |                        quality=quality, | ||||||
|                        minimize_size=True) |                        minimize_size=True) | ||||||
|             self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") |             self.__printer.files(input_path, out_file, f"{quality}%") | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) |             self.__utils.catch_unprocessed(input_path, out_file, e) | ||||||
|             self.utils.errors += 1 |  | ||||||
|             if not self.params.hide_errors: |  | ||||||
|                 self.printer.error(f"File {file} can't be processed! Error: {e}") |  | ||||||
|         return out_file |         return out_file | ||||||
| 
 | 
 | ||||||
|     def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: |     def video(self, input_path: Path, output_dir: Path, extension: str) -> Path: | ||||||
|         if self.params.force_compress: |         """ | ||||||
|             self.printer.unknown_file(filename) |         Method recodes video files to another format using ffmpeg utility | ||||||
|             out_file = self.utils.check_duplicates(in_dir, out_dir, filename) |         :param input_path: Path of the original video file | ||||||
|  |         :param output_dir: Path of the output (compression) folder | ||||||
|  |         :param extension: Extension of the new video file | ||||||
|  |         :return: Path of compressed video file with md5 hash as prefix | ||||||
|  |         """ | ||||||
|  |         prefix = self.__utils.get_hash(input_path.name) | ||||||
|  |         out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}') | ||||||
|  |         if not self.__params.video_skip: | ||||||
|  |             codec = self.__params.video_codec | ||||||
|  |             crf = self.__params.video_crf | ||||||
|  | 
 | ||||||
|             try: |             try: | ||||||
|                 (FFmpeg() |                 (FFmpeg() | ||||||
|                  .input(path.join(in_dir, filename)) |                  .input(input_path) | ||||||
|  |                  .option("hide_banner") | ||||||
|  |                  .option("hwaccel", "auto") | ||||||
|  |                  .output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf) | ||||||
|  |                  .execute() | ||||||
|  |                  ) | ||||||
|  |                 self.__printer.files(input_path, out_file, codec) | ||||||
|  |             except FFmpegError as e: | ||||||
|  |                 self.__utils.catch_unprocessed(input_path, out_file, e) | ||||||
|  |         else: | ||||||
|  |             self.__utils.copy_unprocessed(input_path, out_file) | ||||||
|  |         return out_file | ||||||
|  | 
 | ||||||
|  |     def unknown(self, input_path: Path, output_dir: Path) -> Path: | ||||||
|  |         """ | ||||||
|  |         Method recodes files with "unknown" file format using ffmpeg, | ||||||
|  |         in the hope that ffmpeg supports this file type and the default settings for it will reduce its size | ||||||
|  |         :param input_path: Path of the original file | ||||||
|  |         :param output_dir: Path of the output (compression) folder | ||||||
|  |         :return: Path of compressed file with md5 hash as prefix | ||||||
|  |         """ | ||||||
|  |         prefix = self.__utils.get_hash(input_path.name) | ||||||
|  |         out_file = Path(output_dir, f"{prefix}_{input_path.name}") | ||||||
|  |         if self.__params.force_compress: | ||||||
|  |             self.__printer.unknown_file(input_path.name) | ||||||
|  |             try: | ||||||
|  |                 (FFmpeg() | ||||||
|  |                  .input(input_path) | ||||||
|                  .output(out_file) |                  .output(out_file) | ||||||
|                  .execute() |                  .execute() | ||||||
|                  ) |                  ) | ||||||
|             except FFmpegError as e: |             except FFmpegError as e: | ||||||
|                 self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) |                 self.__utils.catch_unprocessed(input_path, out_file, e) | ||||||
|                 self.utils.errors += 1 |  | ||||||
|                 if not self.params.hide_errors: |  | ||||||
|                     self.printer.error(f"File {filename} can't be processed! Error: {e}") |  | ||||||
|             return out_file |  | ||||||
|         else: |         else: | ||||||
|             self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) |             self.__utils.copy_unprocessed(input_path, out_file) | ||||||
|             return path.join(out_dir, filename) |         return out_file | ||||||
| 
 | 
 | ||||||
|     def compress(self, _dir: str, filename: str, source: str, output: str): |     def compress(self, source: Path, output: Path): | ||||||
|         match File.get_type(filename): |         """ | ||||||
|  |         It the core method for this program. Method determines file type and call compress function for it | ||||||
|  |         :param source: Path of file to compress | ||||||
|  |         :param output: Path of output file | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|  |         match File.get_type(source): | ||||||
|             case "audio": |             case "audio": | ||||||
|                 out_file = self.audio(_dir, filename, output, self.params.audio_ext) |                 out_file = self.audio(source, output, self.__params.audio_ext) | ||||||
|             case "image": |             case "image": | ||||||
|                 out_file = self.image(_dir, filename, output, self.params.image_ext) |                 out_file = self.image(source, output, self.__params.image_ext) | ||||||
|             case "video": |             case "video": | ||||||
|                 out_file = self.video(_dir, filename, output, self.params.video_ext) |                 out_file = self.video(source, output, self.__params.video_ext) | ||||||
|             case "unknown": |             case "unknown": | ||||||
|                 out_file = self.unknown(_dir, filename, output) |                 out_file = self.unknown(source, output) | ||||||
| 
 | 
 | ||||||
|         if self.params.mimic_mode: |         self.__utils.out_rename(out_file, source) | ||||||
|             self.utils.mimic_rename(out_file, path.join(_dir, filename), source) |         self.__printer.bar.update() | ||||||
| 
 |         self.__printer.bar.next() | ||||||
|         self.printer.bar.update() |  | ||||||
|         self.printer.bar.next() |  | ||||||
|  |  | ||||||
|  | @ -1,12 +1,16 @@ | ||||||
| from argparse import ArgumentParser, Namespace | from argparse import ArgumentParser, Namespace | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
|  | from pathlib import Path | ||||||
| from typing import Self | from typing import Self | ||||||
| import tomllib | import tomllib | ||||||
| import os |  | ||||||
| 
 | 
 | ||||||
| @dataclass | @dataclass | ||||||
| class Params: | class Params: | ||||||
| 
 | 
 | ||||||
|  |     """ | ||||||
|  |     This dataclass contains all parameters for utility | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|     copy_unprocessed: bool |     copy_unprocessed: bool | ||||||
|     force_compress: bool |     force_compress: bool | ||||||
|     mimic_mode: bool |     mimic_mode: bool | ||||||
|  | @ -28,13 +32,18 @@ class Params: | ||||||
|     video_ext: str |     video_ext: str | ||||||
|     video_codec: str |     video_codec: str | ||||||
| 
 | 
 | ||||||
|     source: str |     source: Path | ||||||
|  |     dest: Path | ||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     def setup(cls) -> Self: |     def setup(cls) -> Self: | ||||||
|  |         """ | ||||||
|  |         Method initialize all parameters and returns class instance | ||||||
|  |         :return: Params instance | ||||||
|  |         """ | ||||||
|         args = cls.get_args() |         args = cls.get_args() | ||||||
|         if args.config is not None: |         if args.config is not None: | ||||||
|             if os.path.isfile(args.config): |             if Path(args.config).is_file(): | ||||||
|                 with open(args.config, "rb") as cfile: |                 with open(args.config, "rb") as cfile: | ||||||
|                     config = tomllib.load(cfile) |                     config = tomllib.load(cfile) | ||||||
|             else: |             else: | ||||||
|  | @ -58,17 +67,22 @@ class Params: | ||||||
|         video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip |         video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip | ||||||
|         video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext |         video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext | ||||||
|         video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec |         video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec | ||||||
|         source = args.source |         source = Path(args.source) | ||||||
|  |         dest = Path(f"{args.source}_compressed") | ||||||
| 
 | 
 | ||||||
|         return cls( |         return cls( | ||||||
|             copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, |             copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, | ||||||
|             audio_ext, audio_bitrate, |             audio_ext, audio_bitrate, | ||||||
|             image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, |             image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, | ||||||
|             video_crf, video_skip, video_ext, video_codec, source |             video_crf, video_skip, video_ext, video_codec, source, dest | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def get_args() -> Namespace: |     def get_args() -> Namespace: | ||||||
|  |         """ | ||||||
|  |         Method gets CLI arguments and returns argparse.Namespace instance | ||||||
|  |         :return: argparse.Namespace of CLI args | ||||||
|  |         """ | ||||||
|         parser = ArgumentParser(prog="vnrecode", |         parser = ArgumentParser(prog="vnrecode", | ||||||
|                                 description="Python utility to compress Visual Novel Resources" |                                 description="Python utility to compress Visual Novel Resources" | ||||||
|                                 ) |                                 ) | ||||||
|  |  | ||||||
|  | @ -1,46 +1,93 @@ | ||||||
| from progress.bar import IncrementalBar | from progress.bar import IncrementalBar | ||||||
|  | from pathlib import Path | ||||||
| import colorama | import colorama | ||||||
| import sys | import sys | ||||||
| import os | import os | ||||||
| 
 | import re | ||||||
| 
 | 
 | ||||||
| class Printer: | class Printer: | ||||||
|  |     """ | ||||||
|  |     Class implements CLI UI for this utility | ||||||
|  |     """ | ||||||
| 
 | 
 | ||||||
|     def __init__(self, folder): |     def __init__(self, source: Path): | ||||||
|  |         """ | ||||||
|  |         :param source: Path of original (compressing) folder to count its files for progress bar | ||||||
|  |         """ | ||||||
|         file_count = 0 |         file_count = 0 | ||||||
|         for folder, folders, file in os.walk(folder): |         for folder, folders, file in os.walk(source): | ||||||
|             file_count += len(file) |             file_count += len(file) | ||||||
|         self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)') |         self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)') | ||||||
|         self.bar.update() |         self.bar.update() | ||||||
| 
 | 
 | ||||||
|     # Fill whole string with spaces for cleaning progress bar |  | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def clean_str(string: str) -> str: |     def clean_str(string: str) -> str: | ||||||
|         return string + " " * (os.get_terminal_size().columns - len(string)) |         """ | ||||||
|  |         Method fills end of string with spaces to remove progress bar garbage from console | ||||||
|  |         :param string: String to "clean" | ||||||
|  |         :return: "Clean" string | ||||||
|  |         """ | ||||||
|  |         ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') | ||||||
|  |         return string + " " * (os.get_terminal_size().columns - len(ansi_escape.sub('', string))) | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def win_ascii_esc(): |     def win_ascii_esc(): | ||||||
|  |         """ | ||||||
|  |         Method setups colorama for cmd | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         if sys.platform == "win32": |         if sys.platform == "win32": | ||||||
|             colorama.init() |             colorama.init() | ||||||
| 
 | 
 | ||||||
|     def bar_print(self, string: str): |     def bar_print(self, string: str): | ||||||
|  |         """ | ||||||
|  |         Method prints some string in console and updates progress bar | ||||||
|  |         :param string: String to print | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         print(string) |         print(string) | ||||||
|         self.bar.update() |         self.bar.update() | ||||||
| 
 | 
 | ||||||
|     def info(self, string: str): |     def info(self, string: str): | ||||||
|  |         """ | ||||||
|  |         Method prints string with decor for info messages | ||||||
|  |         :param string: String to print | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) |         self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) | ||||||
| 
 | 
 | ||||||
|     def warning(self, string: str): |     def warning(self, string: str): | ||||||
|  |         """ | ||||||
|  |         Method prints string with decor for warning messages | ||||||
|  |         :param string: String to print | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) |         self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) | ||||||
| 
 | 
 | ||||||
|     def error(self, string: str): |     def error(self, string: str): | ||||||
|  |         """ | ||||||
|  |         Method prints string with decor for error messages | ||||||
|  |         :param string: String to print | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) |         self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) | ||||||
| 
 | 
 | ||||||
|     def files(self, source: str, dest: str, dest_ext: str, comment: str): |     def files(self, source_path: Path, output_path: Path, comment: str): | ||||||
|         source_ext = os.path.splitext(source)[1] |         """ | ||||||
|         source_name = os.path.splitext(source)[0] |         Method prints the result of recoding a file with some decorations in the form: | ||||||
|  |         input file name -> output file name (quality setting) | ||||||
|  |         :param source_path: Input file Path | ||||||
|  |         :param output_path: Output file Path | ||||||
|  |         :param comment: Comment about recode quality setting | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|  |         self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> " | ||||||
|  |                                       f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m")) | ||||||
| 
 | 
 | ||||||
|         self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m")) |     def unknown_file(self, filename: str): | ||||||
| 
 |         """ | ||||||
|     def unknown_file(self, file): |         Method prints the result of recoding unknown file | ||||||
|         self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) |         :param filename: Name of unknown file | ||||||
|  |         :return: | ||||||
|  |         """ | ||||||
|  |         self.bar_print(self.clean_str(f"\r\u2713 \033[0;33m{filename}\033[0m (File will be force compressed via ffmpeg)")) | ||||||
|  |  | ||||||
|  | @ -1,94 +1,136 @@ | ||||||
| from shutil import copyfile | from shutil import copyfile | ||||||
| from glob import glob | from pathlib import Path | ||||||
|  | import hashlib | ||||||
| import sys | import sys | ||||||
| import os | import os | ||||||
| import re |  | ||||||
| 
 | 
 | ||||||
| import fnmatch | from vnrecode.printer import Printer | ||||||
|  | from vnrecode.params import Params | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Utils: | class Utils: | ||||||
|  |     """ | ||||||
|  |     Class contains various methods for internal utility use | ||||||
|  |     """ | ||||||
| 
 | 
 | ||||||
|     def __init__(self, params, printer): |     def __init__(self, params_inst: Params, printer_inst: Printer): | ||||||
|         self.errors = 0 |         self.__errors = 0 | ||||||
|         self.params = params |         self.__params = params_inst | ||||||
|         self.printer = printer |         self.__printer = printer_inst | ||||||
|         self.duplicates = [] |         self.__duplicates = {} | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def sys_pause(): |     def sys_pause(): | ||||||
|  |         """ | ||||||
|  |         Method calls pause for Windows cmd shell | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|         if sys.platform == "win32": |         if sys.platform == "win32": | ||||||
|             os.system("pause") |             os.system("pause") | ||||||
| 
 | 
 | ||||||
|     @staticmethod |     @staticmethod | ||||||
|     def get_size(directory: str) -> int: |     def get_hash(filename: str) -> str: | ||||||
|         total_size = 0 |         """ | ||||||
|         for folder, folders, files in os.walk(directory): |         Method returns 8 chars of md5 hash for filename | ||||||
|             for file in files: |         :param filename: File name to get md5 | ||||||
|                 if not os.path.islink(os.path.join(folder, file)): |         :return: 8 chars of md5 hash | ||||||
|                     total_size += os.path.getsize(os.path.join(folder, file)) |         """ | ||||||
|         return total_size |         return hashlib.md5(filename.encode()).hexdigest()[:8] | ||||||
| 
 | 
 | ||||||
|     def get_compression(self, source: str, output: str): |     def get_recode_status(self): | ||||||
|  |         """ | ||||||
|  |         Method prints recoding results | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|  |         source_len = 0 | ||||||
|  |         output_len = 0 | ||||||
|  | 
 | ||||||
|  |         for folder, folders, files in os.walk(self.__params.source): | ||||||
|  |             source_len += len(files) | ||||||
|  | 
 | ||||||
|  |         for folder, folders, files in os.walk(self.__params.dest): | ||||||
|  |             for file in files: | ||||||
|  |                 if not file.count("(vncopy)"): | ||||||
|  |                     output_len += 1 | ||||||
|  | 
 | ||||||
|  |         if self.__errors != 0: | ||||||
|  |             self.__printer.warning("Some files failed to compress!") | ||||||
|  | 
 | ||||||
|  |         if source_len == output_len: | ||||||
|  |             self.__printer.info("Success!") | ||||||
|  |         else: | ||||||
|  |             self.__printer.warning("Original and compressed folders are not identical!") | ||||||
|         try: |         try: | ||||||
|             source = self.get_size(source) |             source = sum(file.stat().st_size for file in self.__params.source.glob('**/*') if file.is_file()) | ||||||
|             output = self.get_size(output) |             output = sum(file.stat().st_size for file in self.__params.dest.glob('**/*') if file.is_file()) | ||||||
| 
 | 
 | ||||||
|             print(f"\nResult: {source/1024/1024:.2f}MB -> " |             print(f"\nResult: {source/1024/1024:.2f}MB -> " | ||||||
|                   f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") |                   f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") | ||||||
|         except ZeroDivisionError: |         except ZeroDivisionError: | ||||||
|             self.printer.warning("Nothing compressed!") |             self.__printer.warning("Nothing compressed!") | ||||||
| 
 | 
 | ||||||
|     def get_compression_status(self, source: str): |     def catch_unprocessed(self, input_path: Path, output_path: Path, error): | ||||||
|         source_len = 0 |         """ | ||||||
|         output_len = 0 |         Method processes files that have not been recoded due to an error and prints error to console | ||||||
|  |         if hide_errors parameter is False | ||||||
|  |         :param input_path: Path of unprocessed file | ||||||
|  |         :param output_path: Destination path of unprocessed file | ||||||
|  |         :param error: Recoding exception | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|  |         self.copy_unprocessed(input_path, output_path) | ||||||
|  |         self.__errors += 1 | ||||||
|  |         if not self.__params.hide_errors: | ||||||
|  |             self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}") | ||||||
| 
 | 
 | ||||||
|         for folder, folders, files in os.walk(source): |     def copy_unprocessed(self, input_path: Path, output_path: Path): | ||||||
|             source_len += len(files) |         """ | ||||||
|  |         Method copies an unprocessed file from the source folder to the destination folder | ||||||
|  |         :param input_path: Path of unprocessed file | ||||||
|  |         :param output_path: Destination path of unprocessed file | ||||||
|  |         :return: None | ||||||
|  |         """ | ||||||
|  |         if self.__params.copy_unprocessed: | ||||||
|  |             copyfile(input_path, output_path) | ||||||
|  |             self.__printer.info(f"File {input_path.name} copied to compressed folder.") | ||||||
| 
 | 
 | ||||||
|         for folder, folders, files in os.walk(f'{source}_compressed'): |     def catch_duplicates(self, path: Path) -> Path: | ||||||
|             for file in files: |         """ | ||||||
|                 if not os.path.splitext(file)[1].count("(copy)"): |         Method checks if file path exists and returns folder/filename(vncopy).ext path | ||||||
|                     output_len += 1 |         if duplicate founded | ||||||
| 
 |         :param path: Some file Path | ||||||
|         if self.errors != 0: |         :return: Duplicate path name with (vncopy) on end | ||||||
|             self.printer.warning("Some files failed to compress!") |         """ | ||||||
| 
 |         if path.is_file() and path.exists(): | ||||||
|         if source_len == output_len: |             orig_name = path.name.replace("(vncopy)", "") | ||||||
|             self.printer.info("Success!") |             new_path = Path(path.parent, path.stem + "(vncopy)" + path.suffix) | ||||||
|         else: |             try: self.__duplicates[orig_name] | ||||||
|             self.printer.warning("Original and compressed folders are not identical!") |             except KeyError: self.__duplicates[orig_name] = [] | ||||||
|         self.get_compression(source, f"{source}_compressed") |             if not new_path.name in self.__duplicates[orig_name]: | ||||||
| 
 |                 self.__duplicates[orig_name].append(new_path.name) | ||||||
|     def add_unprocessed_file(self, source: str, output: str): |             return self.catch_duplicates(new_path) | ||||||
|         if self.params.copy_unprocessed: |         return path | ||||||
|             filename = os.path.split(source)[-1] |  | ||||||
|             copyfile(source, output) |  | ||||||
|             self.printer.info(f"File {filename} copied to compressed folder.") |  | ||||||
| 
 |  | ||||||
|     def check_duplicates(self, source: str, output: str, filename: str) -> str: |  | ||||||
|         re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE) |  | ||||||
|         duplicates = [name for name in os.listdir(source) if re_pattern.match(name)] |  | ||||||
| 
 |  | ||||||
|         if len(duplicates) > 1: |  | ||||||
|             if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates): |  | ||||||
|                 self.duplicates.append(filename) |  | ||||||
|                 new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1] |  | ||||||
|                 return os.path.join(output, new_name) |  | ||||||
|         return os.path.join(output, filename) |  | ||||||
| 
 | 
 | ||||||
|     def print_duplicates(self): |     def print_duplicates(self): | ||||||
|         for filename in self.duplicates: |         """ | ||||||
|             self.printer.warning( |         Method prints message about all duplicates generated during recode process | ||||||
|                 f'Duplicate file has been found! Check manually this files - "{filename}", ' |         :return: None | ||||||
|                 f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' |         """ | ||||||
|  |         for filename in self.__duplicates.keys(): | ||||||
|  |             self.__printer.warning( | ||||||
|  |                 f'Duplicate file has been found! Check manually this files - "{filename}", ' + | ||||||
|  |                 ', '.join(self.__duplicates[filename]) | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|     def mimic_rename(self, filename: str, target: str, source: str): |     def out_rename(self, out_path: Path, target: Path): | ||||||
|         if filename.count("(vncopy)"): |         """ | ||||||
|             orig_name = filename.replace("(vncopy)", "") |         Method removes md5 hash from file name and changes file extension in dependence of mimic mode | ||||||
|             index = self.duplicates.index(os.path.split(orig_name)[-1]) |         :param out_path: Recoded file Path | ||||||
|             self.duplicates[index] = os.path.split(target)[-1] |         :param target: Target filename | ||||||
|             target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] |         :return: None | ||||||
| 
 |         """ | ||||||
|         os.rename(filename, target.replace(source, f"{source}_compressed")) |         if not self.__params.mimic_mode: | ||||||
|  |             dest_name = self.catch_duplicates(Path(out_path.parent, target.stem+out_path.suffix)) | ||||||
|  |             os.rename(out_path, dest_name) | ||||||
|  |         else: | ||||||
|  |             os.rename(out_path, Path(out_path.parent, target.name)) | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue