Compare commits

...

7 commits

7 changed files with 334 additions and 186 deletions

1
.gitignore vendored
View file

@ -1,5 +1,6 @@
/output/
/tests/
/tests_compressed/
/build/
/dist/
/vntools.egg-info/

View file

@ -7,6 +7,10 @@ from .utils import Utils
def init():
"""
This function creates all needed class instances and runs the utility
:return: None
"""
params = Params.setup()
printer = Printer(params.source)
utils = Utils(params, printer)

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import shutil
import os
@ -11,43 +12,48 @@ from .utils import Utils
class Application:
"""
Main class for utility
"""
def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils):
self.params = params
self.compress = compress.compress
self.printer = printer
self.utils = utils
def compress_worker(self, folder: str, file: str, source: str, output: str):
if os.path.isfile(os.path.join(folder, file)):
self.compress(folder, file, source, output)
def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils):
self.__params = params_inst
self.__compress = compress_inst.compress
self.__printer = printer_inst
self.__utils = utils_inst
def run(self):
"""
Method creates a folder in which all the recoded files will be placed,
creates a queue of recoding processes for each file and, when the files are run out in the original folder,
calls functions to display the result
:return: None
"""
start_time = datetime.now()
self.printer.win_ascii_esc()
self.__printer.win_ascii_esc()
source = os.path.abspath(self.params.source)
source = self.__params.source
if os.path.exists(f"{source}_compressed"):
shutil.rmtree(f"{source}_compressed")
if self.__params.dest.exists():
shutil.rmtree(self.__params.dest)
self.printer.info("Creating folders...")
self.__printer.info("Creating folders...")
for folder, folders, files in os.walk(source):
if not os.path.exists(folder.replace(source, f"{source}_compressed")):
os.mkdir(folder.replace(source, f"{source}_compressed"))
output = Path(folder.replace(str(source), str(self.__params.dest)))
if not output.exists():
os.mkdir(output)
self.printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...')
output = folder.replace(source, f"{source}_compressed")
self.__printer.info(f'Compressing "{folder}" folder...')
with ThreadPoolExecutor(max_workers=self.params.workers) as executor:
with ThreadPoolExecutor(max_workers=self.__params.workers) as executor:
futures = [
executor.submit(self.compress, folder, file, source, output)
for file in files if os.path.isfile(os.path.join(folder, file))
executor.submit(self.__compress, Path(folder, file), Path(output))
for file in files if Path(folder, file).is_file()
]
for future in as_completed(futures):
future.result()
self.utils.print_duplicates()
self.utils.get_compression_status(source)
self.utils.sys_pause()
self.__utils.print_duplicates()
self.__utils.get_recode_status()
self.__utils.sys_pause()
print(f"Time taken: {datetime.now() - start_time}")

View file

@ -1,6 +1,6 @@
from ffmpeg import FFmpeg, FFmpegError
from pathlib import Path
from PIL import Image
from os import path
import pillow_avif
from .printer import Printer
@ -9,10 +9,17 @@ from .utils import Utils
class File:
"""
Class contains some methods to work with files
"""
@staticmethod
def get_type(filename: str) -> str:
def get_type(path: Path) -> str:
"""
Method returns filetype string for file
:param path: Path of file to determine type
:return: filetype string: audio, image, video, unknown
"""
extensions = {
"audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'],
"image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'],
@ -21,12 +28,17 @@ class File:
}
for file_type in extensions:
if path.splitext(filename)[1] in extensions[file_type]:
if path.suffix in extensions[file_type]:
return file_type
return "unknown"
@staticmethod
def has_transparency(img: Image) -> bool:
"""
Method checks if image has transparency
:param img: Pillow Image
:return: bool
"""
if img.info.get("transparency", None) is not None:
return True
if img.mode == "P":
@ -43,70 +55,58 @@ class File:
class Compress:
def __init__(self, params: Params, printer: Printer, utils: Utils):
self.params = params
self.printer = printer
self.utils = utils
def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils):
self.__params = params_inst
self.__printer = printer_inst
self.__utils = utils_inst
def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
bit_rate = self.params.audio_bitrate
out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}')
def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path:
"""
Method recodes audio files to another format using ffmpeg utility
:param input_path: Path of the original audio file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new audio file
:return: Path of compressed audio file with md5 hash as prefix
"""
bit_rate = self.__params.audio_bitrate
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
try:
(FFmpeg()
.input(path.join(in_dir, file))
.input(input_path)
.option("hide_banner")
.output(out_file,{"b:a": bit_rate, "loglevel": "error"})
.execute()
)
except FFmpegError as e:
self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file))
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}")
self.__utils.catch_unprocessed(input_path, out_file, e)
self.__printer.files(input_path, out_file, f"{bit_rate}")
return out_file
def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
if not self.params.video_skip:
out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}')
codec = self.params.video_codec
crf = self.params.video_crf
def image(self, input_path: Path, output_dir: Path, extension: str) -> Path:
"""
Method recodes image files to another format using Pillow
:param input_path: Path of the original image file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new image file
:return: Path of compressed image file with md5 hash as prefix
"""
quality = self.__params.image_quality
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}")
try:
(FFmpeg()
.input(path.join(in_dir, file))
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.printer.files(file, path.splitext(file)[0], extension, codec)
except FFmpegError as e:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
return out_file
else:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
return f'{out_dir}/{path.splitext(file)[0]}.{extension}'
def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
quality = self.params.image_quality
out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}")
try:
image = Image.open(path.join(in_dir, file))
image = Image.open(input_path)
if (extension == "jpg" or extension == "jpeg" or
(extension == "webp" and not self.params.webp_rgba)):
(extension == "webp" and not self.__params.webp_rgba)):
if File.has_transparency(image):
self.printer.warning(f"{file} has transparency. Changing to fallback...")
extension = self.params.image_fall_ext
self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...")
out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}")
if File.has_transparency(image):
image.convert('RGBA')
res_downscale = self.params.image_downscale
res_downscale = self.__params.image_downscale
if res_downscale != 1:
width, height = image.size
new_size = (int(width / res_downscale), int(height / res_downscale))
@ -114,50 +114,84 @@ class Compress:
image.save(out_file,
optimize=True,
lossless=self.params.image_lossless,
lossless=self.__params.image_lossless,
quality=quality,
minimize_size=True)
self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%")
self.__printer.files(input_path, out_file, f"{quality}%")
except Exception as e:
self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file))
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
self.__utils.catch_unprocessed(input_path, out_file, e)
return out_file
def unknown(self, in_dir: str, filename: str, out_dir: str) -> str:
if self.params.force_compress:
self.printer.unknown_file(filename)
out_file = self.utils.check_duplicates(in_dir, out_dir, filename)
def video(self, input_path: Path, output_dir: Path, extension: str) -> Path:
"""
Method recodes video files to another format using ffmpeg utility
:param input_path: Path of the original video file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new video file
:return: Path of compressed video file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
if not self.__params.video_skip:
codec = self.__params.video_codec
crf = self.__params.video_crf
try:
(FFmpeg()
.input(path.join(in_dir, filename))
.input(input_path)
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.__printer.files(input_path, out_file, codec)
except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e)
else:
self.__utils.copy_unprocessed(input_path, out_file)
return out_file
def unknown(self, input_path: Path, output_dir: Path) -> Path:
"""
Method recodes files with "unknown" file format using ffmpeg,
in the hope that ffmpeg supports this file type and the default settings for it will reduce its size
:param input_path: Path of the original file
:param output_dir: Path of the output (compression) folder
:return: Path of compressed file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.name}")
if self.__params.force_compress:
self.__printer.unknown_file(input_path.name)
try:
(FFmpeg()
.input(input_path)
.output(out_file)
.execute()
)
except FFmpegError as e:
self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename))
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {filename} can't be processed! Error: {e}")
return out_file
self.__utils.catch_unprocessed(input_path, out_file, e)
else:
self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename))
return path.join(out_dir, filename)
self.__utils.copy_unprocessed(input_path, out_file)
return out_file
def compress(self, _dir: str, filename: str, source: str, output: str):
match File.get_type(filename):
def compress(self, source: Path, output: Path):
"""
This is the core method of this program. The method determines the file type and calls the matching compress function for it
:param source: Path of file to compress
:param output: Path of the output folder
:return: None
"""
match File.get_type(source):
case "audio":
out_file = self.audio(_dir, filename, output, self.params.audio_ext)
out_file = self.audio(source, output, self.__params.audio_ext)
case "image":
out_file = self.image(_dir, filename, output, self.params.image_ext)
out_file = self.image(source, output, self.__params.image_ext)
case "video":
out_file = self.video(_dir, filename, output, self.params.video_ext)
out_file = self.video(source, output, self.__params.video_ext)
case "unknown":
out_file = self.unknown(_dir, filename, output)
out_file = self.unknown(source, output)
if self.params.mimic_mode:
self.utils.mimic_rename(out_file, path.join(_dir, filename), source)
self.printer.bar.update()
self.printer.bar.next()
self.__utils.out_rename(out_file, source)
self.__printer.bar.update()
self.__printer.bar.next()

View file

@ -1,12 +1,16 @@
from argparse import ArgumentParser, Namespace
from dataclasses import dataclass
from pathlib import Path
from typing import Self
import tomllib
import os
@dataclass
class Params:
"""
This dataclass contains all parameters for utility
"""
copy_unprocessed: bool
force_compress: bool
mimic_mode: bool
@ -28,13 +32,18 @@ class Params:
video_ext: str
video_codec: str
source: str
source: Path
dest: Path
@classmethod
def setup(cls) -> Self:
"""
Method initialize all parameters and returns class instance
:return: Params instance
"""
args = cls.get_args()
if args.config is not None:
if os.path.isfile(args.config):
if Path(args.config).is_file():
with open(args.config, "rb") as cfile:
config = tomllib.load(cfile)
else:
@ -58,17 +67,22 @@ class Params:
video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip
video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext
video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec
source = args.source
source = Path(args.source)
dest = Path(f"{args.source}_compressed")
return cls(
copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers,
audio_ext, audio_bitrate,
image_downscale, image_ext, image_fall_ext, image_lossless, image_quality,
video_crf, video_skip, video_ext, video_codec, source
video_crf, video_skip, video_ext, video_codec, source, dest
)
@staticmethod
def get_args() -> Namespace:
"""
Method gets CLI arguments and returns argparse.Namespace instance
:return: argparse.Namespace of CLI args
"""
parser = ArgumentParser(prog="vnrecode",
description="Python utility to compress Visual Novel Resources"
)

View file

@ -1,46 +1,93 @@
from progress.bar import IncrementalBar
from pathlib import Path
import colorama
import sys
import os
import re
class Printer:
"""
Class implements CLI UI for this utility
"""
def __init__(self, folder):
def __init__(self, source: Path):
"""
:param source: Path of original (compressing) folder to count its files for progress bar
"""
file_count = 0
for folder, folders, file in os.walk(folder):
for folder, folders, file in os.walk(source):
file_count += len(file)
self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)')
self.bar.update()
# Fill whole string with spaces for cleaning progress bar
@staticmethod
def clean_str(string: str) -> str:
    """
    Method pads the end of a string with spaces so that it overwrites progress bar garbage in console
    :param string: String to "clean" (may contain ANSI escape sequences)
    :return: String padded with spaces up to the terminal width
    """
    import shutil  # local import: this module does not import shutil at file level

    # ANSI escape sequences occupy no columns on screen, so they are excluded from the visible length
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    visible_len = len(ansi_escape.sub('', string))
    # shutil.get_terminal_size() falls back to a default size instead of raising OSError
    # when stdout is not attached to a terminal (piped or redirected output)
    columns = shutil.get_terminal_size().columns
    # A negative repeat count yields an empty padding, so long strings are returned unchanged
    return string + " " * (columns - visible_len)
@staticmethod
def win_ascii_esc():
    """
    Method initializes colorama when the utility runs on Windows
    :return: None
    """
    if sys.platform != "win32":
        return
    colorama.init()
def bar_print(self, string: str):
    """
    Method writes a string to console and then updates the progress bar
    :param string: String to print
    :return: None
    """
    progress_bar = self.bar
    print(string)
    progress_bar.update()
def info(self, string: str):
    """
    Method prints an info message: the text is decorated, padded by clean_str and sent to bar_print
    :param string: String to print
    :return: None
    """
    decorated = f"\r\033[100m- {string}\033[49m"
    padded = self.clean_str(decorated)
    self.bar_print(padded)
def warning(self, string: str):
    """
    Method prints a warning message: the text is decorated, padded by clean_str and sent to bar_print
    :param string: String to print
    :return: None
    """
    decorated = f"\r\033[93m!\033[0m {string}\033[49m"
    padded = self.clean_str(decorated)
    self.bar_print(padded)
def error(self, string: str):
    """
    Method prints an error message: the text is decorated, padded by clean_str and sent to bar_print
    :param string: String to print
    :return: None
    """
    decorated = f"\r\033[31m\u2715\033[0m {string}\033[49m"
    padded = self.clean_str(decorated)
    self.bar_print(padded)
def files(self, source: str, dest: str, dest_ext: str, comment: str):
source_ext = os.path.splitext(source)[1]
source_name = os.path.splitext(source)[0]
def files(self, source_path: Path, output_path: Path, comment: str):
    """
    Method prints the result of recoding a file with some decorations in the form:
    input file name -> output file name (quality setting)
    The output name is shown as the source stem with the output suffix
    :param source_path: Input file Path
    :param output_path: Output file Path (only its suffix is printed)
    :param comment: Comment about recode quality setting
    :return: None
    """
    stem = source_path.stem
    shown_source = f"\033[0;37m{stem}\033[0m{source_path.suffix}"
    shown_output = f"{stem}\033[0m{output_path.suffix}"
    line = f"\r\033[0;32m\u2713\033[0m {shown_source}\033[0;37m -> {shown_output}\033[0;37m ({comment})\033[0m"
    self.bar_print(self.clean_str(line))
self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m"))
def unknown_file(self, file):
self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)"))
def unknown_file(self, filename: str):
    """
    Method prints a notice that a file of unknown type will be force compressed via ffmpeg
    :param filename: Name of unknown file
    :return: None
    """
    notice = f"\r\u2713 \033[0;33m{filename}\033[0m (File will be force compressed via ffmpeg)"
    self.bar_print(self.clean_str(notice))

View file

@ -1,94 +1,136 @@
from shutil import copyfile
from glob import glob
from pathlib import Path
import hashlib
import sys
import os
import re
import fnmatch
from vnrecode.printer import Printer
from vnrecode.params import Params
class Utils:
"""
Class contains various methods for internal utility use
"""
def __init__(self, params, printer):
self.errors = 0
self.params = params
self.printer = printer
self.duplicates = []
def __init__(self, params_inst: Params, printer_inst: Printer):
self.__errors = 0
self.__params = params_inst
self.__printer = printer_inst
self.__duplicates = {}
@staticmethod
def sys_pause():
    """
    Method runs the "pause" shell command when the utility runs on Windows
    :return: None
    """
    if sys.platform != "win32":
        return
    os.system("pause")
@staticmethod
def get_size(directory: str) -> int:
total_size = 0
for folder, folders, files in os.walk(directory):
for file in files:
if not os.path.islink(os.path.join(folder, file)):
total_size += os.path.getsize(os.path.join(folder, file))
return total_size
def get_hash(filename: str) -> str:
    """
    Method returns first 8 hex chars of md5 hash of the file name string (not of the file contents)
    :param filename: File name to get md5
    :return: 8 chars of md5 hash
    """
    # md5 only builds a short file name prefix here, it is not a security measure;
    # usedforsecurity=False keeps the call usable on FIPS-restricted Python builds
    digest = hashlib.md5(filename.encode(), usedforsecurity=False)
    return digest.hexdigest()[:8]
def get_compression(self, source: str, output: str):
def get_recode_status(self):
"""
Method prints recoding results
:return: None
"""
source_len = 0
output_len = 0
for folder, folders, files in os.walk(self.__params.source):
source_len += len(files)
for folder, folders, files in os.walk(self.__params.dest):
for file in files:
if not file.count("(vncopy)"):
output_len += 1
if self.__errors != 0:
self.__printer.warning("Some files failed to compress!")
if source_len == output_len:
self.__printer.info("Success!")
else:
self.__printer.warning("Original and compressed folders are not identical!")
try:
source = self.get_size(source)
output = self.get_size(output)
source = sum(file.stat().st_size for file in self.__params.source.glob('**/*') if file.is_file())
output = sum(file.stat().st_size for file in self.__params.dest.glob('**/*') if file.is_file())
print(f"\nResult: {source/1024/1024:.2f}MB -> "
f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)")
except ZeroDivisionError:
self.printer.warning("Nothing compressed!")
self.__printer.warning("Nothing compressed!")
def get_compression_status(self, source: str):
source_len = 0
output_len = 0
def catch_unprocessed(self, input_path: Path, output_path: Path, error):
    """
    Method handles a file that could not be recoded: passes it to copy_unprocessed,
    increases the error counter and prints the error unless hide_errors parameter is set
    :param input_path: Path of unprocessed file
    :param output_path: Destination path of unprocessed file
    :param error: Recoding exception
    :return: None
    """
    self.copy_unprocessed(input_path, output_path)
    self.__errors += 1
    if self.__params.hide_errors:
        return
    message = f"File {input_path.name} can't be processed! Error: {error}"
    self.__printer.error(message)
for folder, folders, files in os.walk(source):
source_len += len(files)
def copy_unprocessed(self, input_path: Path, output_path: Path):
    """
    Method copies an unprocessed file to the destination path and prints an info message;
    it does nothing when copy_unprocessed parameter is disabled
    :param input_path: Path of unprocessed file
    :param output_path: Destination path of unprocessed file
    :return: None
    """
    if not self.__params.copy_unprocessed:
        return
    copyfile(input_path, output_path)
    self.__printer.info(f"File {input_path.name} copied to compressed folder.")
for folder, folders, files in os.walk(f'{source}_compressed'):
for file in files:
if not os.path.splitext(file)[1].count("(copy)"):
output_len += 1
if self.errors != 0:
self.printer.warning("Some files failed to compress!")
if source_len == output_len:
self.printer.info("Success!")
else:
self.printer.warning("Original and compressed folders are not identical!")
self.get_compression(source, f"{source}_compressed")
def add_unprocessed_file(self, source: str, output: str):
if self.params.copy_unprocessed:
filename = os.path.split(source)[-1]
copyfile(source, output)
self.printer.info(f"File {filename} copied to compressed folder.")
def check_duplicates(self, source: str, output: str, filename: str) -> str:
re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE)
duplicates = [name for name in os.listdir(source) if re_pattern.match(name)]
if len(duplicates) > 1:
if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates):
self.duplicates.append(filename)
new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]
return os.path.join(output, new_name)
return os.path.join(output, filename)
def catch_duplicates(self, path: Path) -> Path:
    """
    Method returns a free file path: while the given path points to an existing file,
    "(vncopy)" is appended to the file stem and every generated name is remembered
    for the duplicates report
    :param path: Some file Path
    :return: The given path if no file exists there, otherwise folder/filename(vncopy).ext
             (with "(vncopy)" repeated until the name is free)
    """
    # is_file() is already False for a missing path, so no separate exists() check is needed
    while path.is_file():
        # Generated names are grouped under the name without any "(vncopy)" marks
        orig_name = path.name.replace("(vncopy)", "")
        path = Path(path.parent, path.stem + "(vncopy)" + path.suffix)
        copies = self.__duplicates.setdefault(orig_name, [])
        if path.name not in copies:
            copies.append(path.name)
    return path
def print_duplicates(self):
for filename in self.duplicates:
self.printer.warning(
f'Duplicate file has been found! Check manually this files - "{filename}", '
f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"'
"""
Method prints message about all duplicates generated during recode process
:return: None
"""
for filename in self.__duplicates.keys():
self.__printer.warning(
f'Duplicate file has been found! Check manually this files - "{filename}", ' +
', '.join(self.__duplicates[filename])
)
def mimic_rename(self, filename: str, target: str, source: str):
if filename.count("(vncopy)"):
orig_name = filename.replace("(vncopy)", "")
index = self.duplicates.index(os.path.split(orig_name)[-1])
self.duplicates[index] = os.path.split(target)[-1]
target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1]
os.rename(filename, target.replace(source, f"{source}_compressed"))
def out_rename(self, out_path: Path, target: Path):
    """
    Method removes md5 hash prefix from the recoded file name; in mimic mode the file
    also gets the original file extension
    :param out_path: Recoded file Path (hash-prefixed name)
    :param target: Path of the original (source) file
    :return: None
    """
    # No output file is written when recoding failed or was skipped while copying of
    # unprocessed files is disabled; renaming a missing file would raise FileNotFoundError
    if not out_path.exists():
        return
    if self.__params.mimic_mode:
        # Mimic mode: use the full original name, including the source extension
        dest_name = Path(out_path.parent, target.name)
    else:
        # Keep the new extension and let catch_duplicates pick a name that is not taken yet
        dest_name = self.catch_duplicates(Path(out_path.parent, target.stem + out_path.suffix))
    os.rename(out_path, dest_name)