vnrecode: pathlib for all paths

This commit is contained in:
OleSTEEP 2024-10-19 00:25:52 +03:00
parent 4e6fd332c5
commit 1c1e8a9292
5 changed files with 108 additions and 104 deletions

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
from pathlib import Path
import shutil import shutil
import os import os
@ -12,11 +13,11 @@ from .utils import Utils
class Application: class Application:
def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils):
self.__params = params self.__params = params_inst
self.__compress = compress.compress self.__compress = compress_inst.compress
self.__printer = printer self.__printer = printer_inst
self.__utils = utils self.__utils = utils_inst
def run(self): def run(self):
start_time = datetime.now() start_time = datetime.now()
@ -24,21 +25,21 @@ class Application:
source = self.__params.source source = self.__params.source
if os.path.exists(self.__params.dest): if self.__params.dest.exists():
shutil.rmtree(self.__params.dest) shutil.rmtree(self.__params.dest)
self.__printer.info("Creating folders...") self.__printer.info("Creating folders...")
for folder, folders, files in os.walk(source): for folder, folders, files in os.walk(source):
if not os.path.exists(folder.replace(source, self.__params.dest)): output = Path(folder.replace(str(source), str(self.__params.dest)))
os.mkdir(folder.replace(source, self.__params.dest)) if not output.exists():
os.mkdir(output)
self.__printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') self.__printer.info(f'Compressing "{output}" folder...')
output = folder.replace(source, self.__params.dest)
with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: with ThreadPoolExecutor(max_workers=self.__params.workers) as executor:
futures = [ futures = [
executor.submit(self.__compress, folder, file, output) executor.submit(self.__compress, Path(folder, file), Path(output))
for file in files if os.path.isfile(os.path.join(folder, file)) for file in files if Path(folder, file).is_file()
] ]
for future in as_completed(futures): for future in as_completed(futures):
future.result() future.result()

View file

@ -1,6 +1,6 @@
from ffmpeg import FFmpeg, FFmpegError from ffmpeg import FFmpeg, FFmpegError
from pathlib import Path
from PIL import Image from PIL import Image
from os import path
import pillow_avif import pillow_avif
from .printer import Printer from .printer import Printer
@ -11,7 +11,7 @@ from .utils import Utils
class File: class File:
@staticmethod @staticmethod
def get_type(filename: str) -> str: def get_type(filename: Path) -> str:
extensions = { extensions = {
"audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'],
@ -21,7 +21,7 @@ class File:
} }
for file_type in extensions: for file_type in extensions:
if path.splitext(filename)[1] in extensions[file_type]: if filename.suffix in extensions[file_type]:
return file_type return file_type
return "unknown" return "unknown"
@ -43,61 +43,39 @@ class File:
class Compress: class Compress:
def __init__(self, params: Params, printer: Printer, utils: Utils): def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils):
self.__params = params self.__params = params_inst
self.__printer = printer self.__printer = printer_inst
self.__utils = utils self.__utils = utils_inst
def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path:
bit_rate = self.__params.audio_bitrate bit_rate = self.__params.audio_bitrate
prefix = self.__utils.get_hash(file) prefix = self.__utils.get_hash(input_path.name)
out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}') out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
try: try:
(FFmpeg() (FFmpeg()
.input(path.join(in_dir, file)) .input(input_path)
.option("hide_banner") .option("hide_banner")
.output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .output(out_file,{"b:a": bit_rate, "loglevel": "error"})
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) self.__utils.catch_unprocessed(input_path, out_file, e)
self.__printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}") self.__printer.files(input_path, out_file, f"{bit_rate}")
return out_file return out_file
def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: def image(self, input_path: Path, output_dir: Path, extension: str) -> Path:
prefix = self.__utils.get_hash(file)
out_file = path.join(out_dir, f'{prefix}_{path.splitext(file)[0]}.{extension}')
if not self.__params.video_skip:
codec = self.__params.video_codec
crf = self.__params.video_crf
try:
(FFmpeg()
.input(path.join(in_dir, file))
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.__printer.files(file, path.splitext(file)[0], extension, codec)
except FFmpegError as e:
self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e)
else:
self.__utils.copy_unprocessed(path.join(in_dir, file), out_file)
return out_file
def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
quality = self.__params.image_quality quality = self.__params.image_quality
prefix = self.__utils.get_hash(file) prefix = self.__utils.get_hash(input_path.name)
out_file = path.join(out_dir, f"{prefix}_{path.splitext(file)[0]}.{extension}") out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}")
try: try:
image = Image.open(path.join(in_dir, file)) image = Image.open(input_path)
if (extension == "jpg" or extension == "jpeg" or if (extension == "jpg" or extension == "jpeg" or
(extension == "webp" and not self.__params.webp_rgba)): (extension == "webp" and not self.__params.webp_rgba)):
if File.has_transparency(image): if File.has_transparency(image):
self.__printer.warning(f"{file} has transparency. Changing to fallback...") self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...")
extension = self.__params.image_fall_ext out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}")
if File.has_transparency(image): if File.has_transparency(image):
image.convert('RGBA') image.convert('RGBA')
@ -113,39 +91,61 @@ class Compress:
lossless=self.__params.image_lossless, lossless=self.__params.image_lossless,
quality=quality, quality=quality,
minimize_size=True) minimize_size=True)
self.__printer.files(file, path.splitext(file)[0], extension, f"{quality}%") self.__printer.files(input_path, out_file, f"{quality}%")
except Exception as e: except Exception as e:
self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) self.__utils.catch_unprocessed(input_path, out_file, e)
return out_file return out_file
def unknown(self, in_dir: str, file: str, out_dir: str) -> str: def video(self, input_path: Path, output_dir: Path, extension: str) -> Path:
prefix = self.__utils.get_hash(file) prefix = self.__utils.get_hash(input_path.name)
out_file = path.join(out_dir, f"{prefix}_{file}") out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
if self.__params.force_compress: if not self.__params.video_skip:
self.__printer.unknown_file(file) codec = self.__params.video_codec
crf = self.__params.video_crf
try: try:
(FFmpeg() (FFmpeg()
.input(path.join(in_dir, file)) .input(input_path)
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.__printer.files(input_path, out_file, codec)
except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e)
else:
self.__utils.copy_unprocessed(input_path, out_file)
return out_file
def unknown(self, input_path: Path, output_dir: Path) -> Path:
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.name}")
if self.__params.force_compress:
self.__printer.unknown_file(input_path.name)
try:
(FFmpeg()
.input(input_path)
.output(out_file) .output(out_file)
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.__utils.catch_unprocessed(path.join(in_dir, file), out_file, e) self.__utils.catch_unprocessed(input_path, out_file, e)
else: else:
self.__utils.copy_unprocessed(path.join(in_dir, file), out_file) self.__utils.copy_unprocessed(input_path, out_file)
return out_file return out_file
def compress(self, dir_: str, filename: str, output: str): def compress(self, source: Path, output: Path):
match File.get_type(filename): match File.get_type(source):
case "audio": case "audio":
out_file = self.audio(dir_, filename, output, self.__params.audio_ext) out_file = self.audio(source, output, self.__params.audio_ext)
case "image": case "image":
out_file = self.image(dir_, filename, output, self.__params.image_ext) out_file = self.image(source, output, self.__params.image_ext)
case "video": case "video":
out_file = self.video(dir_, filename, output, self.__params.video_ext) out_file = self.video(source, output, self.__params.video_ext)
case "unknown": case "unknown":
out_file = self.unknown(dir_, filename, output) out_file = self.unknown(source, output)
self.__utils.out_rename(out_file, filename) self.__utils.out_rename(out_file, source.name)
self.__printer.bar.update() self.__printer.bar.update()
self.__printer.bar.next() self.__printer.bar.next()

View file

@ -1,8 +1,8 @@
from argparse import ArgumentParser, Namespace from argparse import ArgumentParser, Namespace
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from typing import Self from typing import Self
import tomllib import tomllib
import os
@dataclass @dataclass
class Params: class Params:
@ -28,14 +28,14 @@ class Params:
video_ext: str video_ext: str
video_codec: str video_codec: str
source: str source: Path
dest: str dest: Path
@classmethod @classmethod
def setup(cls) -> Self: def setup(cls) -> Self:
args = cls.get_args() args = cls.get_args()
if args.config is not None: if args.config is not None:
if os.path.isfile(args.config): if Path(args.config).is_file():
with open(args.config, "rb") as cfile: with open(args.config, "rb") as cfile:
config = tomllib.load(cfile) config = tomllib.load(cfile)
else: else:
@ -59,8 +59,8 @@ class Params:
video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip
video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext
video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec
source = args.source source = Path(args.source)
dest = f"{source}_compressed" dest = Path(f"{args.source}_compressed")
return cls( return cls(
copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers,

View file

@ -1,12 +1,12 @@
from progress.bar import IncrementalBar from progress.bar import IncrementalBar
from pathlib import Path
import colorama import colorama
import sys import sys
import os import os
class Printer: class Printer:
def __init__(self, folder): def __init__(self, folder: Path):
file_count = 0 file_count = 0
for folder, folders, file in os.walk(folder): for folder, folders, file in os.walk(folder):
file_count += len(file) file_count += len(file)
@ -36,11 +36,9 @@ class Printer:
def error(self, string: str): def error(self, string: str):
self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m"))
def files(self, source: str, dest: str, dest_ext: str, comment: str): def files(self, source_path: Path, output_path: Path, comment: str):
source_ext = os.path.splitext(source)[1] self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> "
source_name = os.path.splitext(source)[0] f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m"))
self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m")) def unknown_file(self, file: str):
def unknown_file(self, file):
self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)"))

View file

@ -1,12 +1,16 @@
from shutil import copyfile from shutil import copyfile
from pathlib import Path
import hashlib import hashlib
import sys import sys
import os import os
from vnrecode.printer import Printer
from vnrecode.params import Params
class Utils: class Utils:
def __init__(self, params_inst, printer_inst): def __init__(self, params_inst: Params, printer_inst: Printer):
self.__errors = 0 self.__errors = 0
self.__params = params_inst self.__params = params_inst
self.__printer = printer_inst self.__printer = printer_inst
@ -18,12 +22,13 @@ class Utils:
os.system("pause") os.system("pause")
@staticmethod @staticmethod
def get_size(directory: str) -> int: def get_size(directory: Path) -> int:
total_size = 0 total_size = 0
for folder, folders, files in os.walk(directory): for folder, folders, files in os.walk(directory):
for file in files: for file in files:
if not os.path.islink(os.path.join(folder, file)): path = Path(folder, file)
total_size += os.path.getsize(os.path.join(folder, file)) if not path.is_symlink():
total_size += path.stat().st_size
return total_size return total_size
@staticmethod @staticmethod
@ -39,7 +44,7 @@ class Utils:
for folder, folders, files in os.walk(self.__params.dest): for folder, folders, files in os.walk(self.__params.dest):
for file in files: for file in files:
if not os.path.splitext(file)[1].count("(vncopy)"): if not file.count("(vncopy)"):
output_len += 1 output_len += 1
if self.__errors != 0: if self.__errors != 0:
@ -58,20 +63,20 @@ class Utils:
except ZeroDivisionError: except ZeroDivisionError:
self.__printer.warning("Nothing compressed!") self.__printer.warning("Nothing compressed!")
def catch_unprocessed(self, source, output, error): def catch_unprocessed(self, input_path: Path, output_path: Path, error):
self.copy_unprocessed(source, error) self.copy_unprocessed(input_path, output_path)
self.__errors += 1 self.__errors += 1
if not self.__params.hide_errors: if not self.__params.hide_errors:
self.__printer.error(f"File {os.path.split(source)[-1]} can't be processed! Error: {error}") self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}")
def copy_unprocessed(self, source, output): def copy_unprocessed(self, input_path: Path, output_path: Path):
if self.__params.copy_unprocessed: if self.__params.copy_unprocessed:
copyfile(source, output) copyfile(input_path, output_path)
self.__printer.info(f"File {os.path.split(source)[-1]} copied to compressed folder.") self.__printer.info(f"File {input_path.name} copied to compressed folder.")
def catch_duplicates(self, path: str) -> str: def catch_duplicates(self, path: Path) -> Path:
if os.path.exists(path): if path.exists():
new_path = os.path.splitext(path)[0] + "(vncopy)" + os.path.splitext(path)[1] new_path = Path(path.stem + "(vncopy)" + path.suffix)
self.__duplicates.append(new_path) self.__duplicates.append(new_path)
return new_path return new_path
return path return path
@ -79,13 +84,13 @@ class Utils:
def print_duplicates(self): def print_duplicates(self):
for filename in self.__duplicates: for filename in self.__duplicates:
self.__printer.warning( self.__printer.warning(
f'Duplicate file has been found! Check manually this files - "{filename}", ' f'Duplicate file has been found! Check manually this files - "{filename.name}", '
f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' f'"{filename.stem + "(vncopy)" + filename.suffix}"'
) )
def out_rename(self, filename: str, target: str): def out_rename(self, out_path: Path, target: str):
if not self.__params.mimic_mode: if not self.__params.mimic_mode:
dest_name = self.catch_duplicates(os.path.join(os.path.dirname(filename), target)) dest_name = self.catch_duplicates(Path(out_path.parent, target))
os.rename(filename, dest_name) os.rename(out_path, dest_name)
else: else:
os.rename(filename, os.path.join(os.path.dirname(filename), target)) os.rename(out_path, Path(out_path.parent, target))