Compare commits

..

No commits in common. "bc84703b73bf1d0e6f6def3368bbce59170ef8c9" and "d8e55bac9a8b9cacba3d96e7bad1ff8ed6f44a46" have entirely different histories.

7 changed files with 185 additions and 333 deletions

1
.gitignore vendored
View file

@ -1,6 +1,5 @@
/output/ /output/
/tests/ /tests/
/tests_compressed/
/build/ /build/
/dist/ /dist/
/vntools.egg-info/ /vntools.egg-info/

View file

@ -7,10 +7,6 @@ from .utils import Utils
def init(): def init():
"""
This function creates all needed class instances and run utility
:return: None
"""
params = Params.setup() params = Params.setup()
printer = Printer(params.source) printer = Printer(params.source)
utils = Utils(params, printer) utils = Utils(params, printer)

View file

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
from pathlib import Path
import shutil import shutil
import os import os
@ -12,48 +11,43 @@ from .utils import Utils
class Application: class Application:
"""
Main class for utility
"""
def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils): def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils):
self.__params = params_inst self.params = params
self.__compress = compress_inst.compress self.compress = compress.compress
self.__printer = printer_inst self.printer = printer
self.__utils = utils_inst self.utils = utils
def compress_worker(self, folder: str, file: str, source: str, output: str):
if os.path.isfile(os.path.join(folder, file)):
self.compress(folder, file, source, output)
def run(self): def run(self):
"""
Method creates a folder in which all the recoded files will be placed,
creates a queue of recoding processes for each file and, when the files are run out in the original folder,
calls functions to display the result
:return: None
"""
start_time = datetime.now() start_time = datetime.now()
self.__printer.win_ascii_esc() self.printer.win_ascii_esc()
source = self.__params.source source = os.path.abspath(self.params.source)
if self.__params.dest.exists(): if os.path.exists(f"{source}_compressed"):
shutil.rmtree(self.__params.dest) shutil.rmtree(f"{source}_compressed")
self.__printer.info("Creating folders...") self.printer.info("Creating folders...")
for folder, folders, files in os.walk(source): for folder, folders, files in os.walk(source):
output = Path(folder.replace(str(source), str(self.__params.dest))) if not os.path.exists(folder.replace(source, f"{source}_compressed")):
if not output.exists(): os.mkdir(folder.replace(source, f"{source}_compressed"))
os.mkdir(output)
self.__printer.info(f'Compressing "{folder}" folder...') self.printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...')
output = folder.replace(source, f"{source}_compressed")
with ThreadPoolExecutor(max_workers=self.__params.workers) as executor: with ThreadPoolExecutor(max_workers=self.params.workers) as executor:
futures = [ futures = [
executor.submit(self.__compress, Path(folder, file), Path(output)) executor.submit(self.compress, folder, file, source, output)
for file in files if Path(folder, file).is_file() for file in files if os.path.isfile(os.path.join(folder, file))
] ]
for future in as_completed(futures): for future in as_completed(futures):
future.result() future.result()
self.__utils.print_duplicates() self.utils.print_duplicates()
self.__utils.get_recode_status() self.utils.get_compression_status(source)
self.__utils.sys_pause() self.utils.sys_pause()
print(f"Time taken: {datetime.now() - start_time}") print(f"Time taken: {datetime.now() - start_time}")

View file

@ -1,6 +1,6 @@
from ffmpeg import FFmpeg, FFmpegError from ffmpeg import FFmpeg, FFmpegError
from pathlib import Path
from PIL import Image from PIL import Image
from os import path
import pillow_avif import pillow_avif
from .printer import Printer from .printer import Printer
@ -9,17 +9,10 @@ from .utils import Utils
class File: class File:
"""
Class contains some methods to work with files
"""
@staticmethod @staticmethod
def get_type(path: Path) -> str: def get_type(filename: str) -> str:
"""
Method returns filetype string for file
:param path: Path of file to determine type
:return: filetype string: audio, image, video, unknown
"""
extensions = { extensions = {
"audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'],
"image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'],
@ -28,17 +21,12 @@ class File:
} }
for file_type in extensions: for file_type in extensions:
if path.suffix in extensions[file_type]: if path.splitext(filename)[1] in extensions[file_type]:
return file_type return file_type
return "unknown" return "unknown"
@staticmethod @staticmethod
def has_transparency(img: Image) -> bool: def has_transparency(img: Image) -> bool:
"""
Method checks if image has transparency
:param img: Pillow Image
:return: bool
"""
if img.info.get("transparency", None) is not None: if img.info.get("transparency", None) is not None:
return True return True
if img.mode == "P": if img.mode == "P":
@ -55,58 +43,70 @@ class File:
class Compress: class Compress:
def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils): def __init__(self, params: Params, printer: Printer, utils: Utils):
self.__params = params_inst self.params = params
self.__printer = printer_inst self.printer = printer
self.__utils = utils_inst self.utils = utils
def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path: def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
""" bit_rate = self.params.audio_bitrate
Method recodes audio files to another format using ffmpeg utility out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}')
:param input_path: Path of the original audio file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new audio file
:return: Path of compressed audio file with md5 hash as prefix
"""
bit_rate = self.__params.audio_bitrate
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
try: try:
(FFmpeg() (FFmpeg()
.input(input_path) .input(path.join(in_dir, file))
.option("hide_banner") .option("hide_banner")
.output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .output(out_file,{"b:a": bit_rate, "loglevel": "error"})
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e) self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file))
self.__printer.files(input_path, out_file, f"{bit_rate}") self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}")
return out_file return out_file
def image(self, input_path: Path, output_dir: Path, extension: str) -> Path: def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
""" if not self.params.video_skip:
Method recodes image files to another format using Pillow out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}')
:param input_path: Path of the original image file codec = self.params.video_codec
:param output_dir: Path of the output (compression) folder crf = self.params.video_crf
:param extension: Extension of the new image file
:return: Path of compressed image file with md5 hash as prefix try:
""" (FFmpeg()
quality = self.__params.image_quality .input(path.join(in_dir, file))
prefix = self.__utils.get_hash(input_path.name) .option("hide_banner")
out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}") .option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.printer.files(file, path.splitext(file)[0], extension, codec)
except FFmpegError as e:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
return out_file
else:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
return f'{out_dir}/{path.splitext(file)[0]}.{extension}'
def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
quality = self.params.image_quality
out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}")
try: try:
image = Image.open(input_path) image = Image.open(path.join(in_dir, file))
if (extension == "jpg" or extension == "jpeg" or if (extension == "jpg" or extension == "jpeg" or
(extension == "webp" and not self.__params.webp_rgba)): (extension == "webp" and not self.params.webp_rgba)):
if File.has_transparency(image): if File.has_transparency(image):
self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...") self.printer.warning(f"{file} has transparency. Changing to fallback...")
out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}") extension = self.params.image_fall_ext
if File.has_transparency(image): if File.has_transparency(image):
image.convert('RGBA') image.convert('RGBA')
res_downscale = self.__params.image_downscale res_downscale = self.params.image_downscale
if res_downscale != 1: if res_downscale != 1:
width, height = image.size width, height = image.size
new_size = (int(width / res_downscale), int(height / res_downscale)) new_size = (int(width / res_downscale), int(height / res_downscale))
@ -114,84 +114,50 @@ class Compress:
image.save(out_file, image.save(out_file,
optimize=True, optimize=True,
lossless=self.__params.image_lossless, lossless=self.params.image_lossless,
quality=quality, quality=quality,
minimize_size=True) minimize_size=True)
self.__printer.files(input_path, out_file, f"{quality}%") self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%")
except Exception as e: except Exception as e:
self.__utils.catch_unprocessed(input_path, out_file, e) self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file))
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
return out_file return out_file
def video(self, input_path: Path, output_dir: Path, extension: str) -> Path: def unknown(self, in_dir: str, filename: str, out_dir: str) -> str:
""" if self.params.force_compress:
Method recodes video files to another format using ffmpeg utility self.printer.unknown_file(filename)
:param input_path: Path of the original video file out_file = self.utils.check_duplicates(in_dir, out_dir, filename)
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new video file
:return: Path of compressed video file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
if not self.__params.video_skip:
codec = self.__params.video_codec
crf = self.__params.video_crf
try: try:
(FFmpeg() (FFmpeg()
.input(input_path) .input(path.join(in_dir, filename))
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.__printer.files(input_path, out_file, codec)
except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e)
else:
self.__utils.copy_unprocessed(input_path, out_file)
return out_file
def unknown(self, input_path: Path, output_dir: Path) -> Path:
"""
Method recodes files with "unknown" file format using ffmpeg,
in the hope that ffmpeg supports this file type and the default settings for it will reduce its size
:param input_path: Path of the original file
:param output_dir: Path of the output (compression) folder
:return: Path of compressed file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.name}")
if self.__params.force_compress:
self.__printer.unknown_file(input_path.name)
try:
(FFmpeg()
.input(input_path)
.output(out_file) .output(out_file)
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e) self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename))
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {filename} can't be processed! Error: {e}")
return out_file
else: else:
self.__utils.copy_unprocessed(input_path, out_file) self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename))
return out_file return path.join(out_dir, filename)
def compress(self, source: Path, output: Path): def compress(self, _dir: str, filename: str, source: str, output: str):
""" match File.get_type(filename):
It the core method for this program. Method determines file type and call compress function for it
:param source: Path of file to compress
:param output: Path of output file
:return: None
"""
match File.get_type(source):
case "audio": case "audio":
out_file = self.audio(source, output, self.__params.audio_ext) out_file = self.audio(_dir, filename, output, self.params.audio_ext)
case "image": case "image":
out_file = self.image(source, output, self.__params.image_ext) out_file = self.image(_dir, filename, output, self.params.image_ext)
case "video": case "video":
out_file = self.video(source, output, self.__params.video_ext) out_file = self.video(_dir, filename, output, self.params.video_ext)
case "unknown": case "unknown":
out_file = self.unknown(source, output) out_file = self.unknown(_dir, filename, output)
self.__utils.out_rename(out_file, source) if self.params.mimic_mode:
self.__printer.bar.update() self.utils.mimic_rename(out_file, path.join(_dir, filename), source)
self.__printer.bar.next()
self.printer.bar.update()
self.printer.bar.next()

View file

@ -1,16 +1,12 @@
from argparse import ArgumentParser, Namespace from argparse import ArgumentParser, Namespace
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from typing import Self from typing import Self
import tomllib import tomllib
import os
@dataclass @dataclass
class Params: class Params:
"""
This dataclass contains all parameters for utility
"""
copy_unprocessed: bool copy_unprocessed: bool
force_compress: bool force_compress: bool
mimic_mode: bool mimic_mode: bool
@ -32,18 +28,13 @@ class Params:
video_ext: str video_ext: str
video_codec: str video_codec: str
source: Path source: str
dest: Path
@classmethod @classmethod
def setup(cls) -> Self: def setup(cls) -> Self:
"""
Method initialize all parameters and returns class instance
:return: Params instance
"""
args = cls.get_args() args = cls.get_args()
if args.config is not None: if args.config is not None:
if Path(args.config).is_file(): if os.path.isfile(args.config):
with open(args.config, "rb") as cfile: with open(args.config, "rb") as cfile:
config = tomllib.load(cfile) config = tomllib.load(cfile)
else: else:
@ -67,22 +58,17 @@ class Params:
video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip
video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext
video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec
source = Path(args.source) source = args.source
dest = Path(f"{args.source}_compressed")
return cls( return cls(
copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers,
audio_ext, audio_bitrate, audio_ext, audio_bitrate,
image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, image_downscale, image_ext, image_fall_ext, image_lossless, image_quality,
video_crf, video_skip, video_ext, video_codec, source, dest video_crf, video_skip, video_ext, video_codec, source
) )
@staticmethod @staticmethod
def get_args() -> Namespace: def get_args() -> Namespace:
"""
Method gets CLI arguments and returns argparse.Namespace instance
:return: argparse.Namespace of CLI args
"""
parser = ArgumentParser(prog="vnrecode", parser = ArgumentParser(prog="vnrecode",
description="Python utility to compress Visual Novel Resources" description="Python utility to compress Visual Novel Resources"
) )

View file

@ -1,93 +1,46 @@
from progress.bar import IncrementalBar from progress.bar import IncrementalBar
from pathlib import Path
import colorama import colorama
import sys import sys
import os import os
import re
class Printer: class Printer:
"""
Class implements CLI UI for this utility
"""
def __init__(self, source: Path): def __init__(self, folder):
"""
:param source: Path of original (compressing) folder to count its files for progress bar
"""
file_count = 0 file_count = 0
for folder, folders, file in os.walk(source): for folder, folders, file in os.walk(folder):
file_count += len(file) file_count += len(file)
self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)') self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)')
self.bar.update() self.bar.update()
# Fill whole string with spaces for cleaning progress bar
@staticmethod @staticmethod
def clean_str(string: str) -> str: def clean_str(string: str) -> str:
""" return string + " " * (os.get_terminal_size().columns - len(string))
Method fills end of string with spaces to remove progress bar garbage from console
:param string: String to "clean"
:return: "Clean" string
"""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return string + " " * (os.get_terminal_size().columns - len(ansi_escape.sub('', string)))
@staticmethod @staticmethod
def win_ascii_esc(): def win_ascii_esc():
"""
Method setups colorama for cmd
:return: None
"""
if sys.platform == "win32": if sys.platform == "win32":
colorama.init() colorama.init()
def bar_print(self, string: str): def bar_print(self, string: str):
"""
Method prints some string in console and updates progress bar
:param string: String to print
:return: None
"""
print(string) print(string)
self.bar.update() self.bar.update()
def info(self, string: str): def info(self, string: str):
"""
Method prints string with decor for info messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m"))
def warning(self, string: str): def warning(self, string: str):
"""
Method prints string with decor for warning messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m"))
def error(self, string: str): def error(self, string: str):
"""
Method prints string with decor for error messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m"))
def files(self, source_path: Path, output_path: Path, comment: str): def files(self, source: str, dest: str, dest_ext: str, comment: str):
""" source_ext = os.path.splitext(source)[1]
Method prints the result of recoding a file with some decorations in the form: source_name = os.path.splitext(source)[0]
input file name -> output file name (quality setting)
:param source_path: Input file Path
:param output_path: Output file Path
:param comment: Comment about recode quality setting
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> "
f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m"))
def unknown_file(self, filename: str): self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m"))
"""
Method prints the result of recoding unknown file def unknown_file(self, file):
:param filename: Name of unknown file self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)"))
:return:
"""
self.bar_print(self.clean_str(f"\r\u2713 \033[0;33m{filename}\033[0m (File will be force compressed via ffmpeg)"))

View file

@ -1,136 +1,94 @@
from shutil import copyfile from shutil import copyfile
from pathlib import Path from glob import glob
import hashlib
import sys import sys
import os import os
import re
from vnrecode.printer import Printer import fnmatch
from vnrecode.params import Params
class Utils: class Utils:
"""
Class contains various methods for internal utility use
"""
def __init__(self, params_inst: Params, printer_inst: Printer): def __init__(self, params, printer):
self.__errors = 0 self.errors = 0
self.__params = params_inst self.params = params
self.__printer = printer_inst self.printer = printer
self.__duplicates = {} self.duplicates = []
@staticmethod @staticmethod
def sys_pause(): def sys_pause():
"""
Method calls pause for Windows cmd shell
:return: None
"""
if sys.platform == "win32": if sys.platform == "win32":
os.system("pause") os.system("pause")
@staticmethod @staticmethod
def get_hash(filename: str) -> str: def get_size(directory: str) -> int:
""" total_size = 0
Method returns 8 chars of md5 hash for filename for folder, folders, files in os.walk(directory):
:param filename: File name to get md5
:return: 8 chars of md5 hash
"""
return hashlib.md5(filename.encode()).hexdigest()[:8]
def get_recode_status(self):
"""
Method prints recoding results
:return: None
"""
source_len = 0
output_len = 0
for folder, folders, files in os.walk(self.__params.source):
source_len += len(files)
for folder, folders, files in os.walk(self.__params.dest):
for file in files: for file in files:
if not file.count("(vncopy)"): if not os.path.islink(os.path.join(folder, file)):
output_len += 1 total_size += os.path.getsize(os.path.join(folder, file))
return total_size
if self.__errors != 0: def get_compression(self, source: str, output: str):
self.__printer.warning("Some files failed to compress!")
if source_len == output_len:
self.__printer.info("Success!")
else:
self.__printer.warning("Original and compressed folders are not identical!")
try: try:
source = sum(file.stat().st_size for file in self.__params.source.glob('**/*') if file.is_file()) source = self.get_size(source)
output = sum(file.stat().st_size for file in self.__params.dest.glob('**/*') if file.is_file()) output = self.get_size(output)
print(f"\nResult: {source/1024/1024:.2f}MB -> " print(f"\nResult: {source/1024/1024:.2f}MB -> "
f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)")
except ZeroDivisionError: except ZeroDivisionError:
self.__printer.warning("Nothing compressed!") self.printer.warning("Nothing compressed!")
def catch_unprocessed(self, input_path: Path, output_path: Path, error): def get_compression_status(self, source: str):
""" source_len = 0
Method processes files that have not been recoded due to an error and prints error to console output_len = 0
if hide_errors parameter is False
:param input_path: Path of unprocessed file
:param output_path: Destination path of unprocessed file
:param error: Recoding exception
:return: None
"""
self.copy_unprocessed(input_path, output_path)
self.__errors += 1
if not self.__params.hide_errors:
self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}")
def copy_unprocessed(self, input_path: Path, output_path: Path): for folder, folders, files in os.walk(source):
""" source_len += len(files)
Method copies an unprocessed file from the source folder to the destination folder
:param input_path: Path of unprocessed file
:param output_path: Destination path of unprocessed file
:return: None
"""
if self.__params.copy_unprocessed:
copyfile(input_path, output_path)
self.__printer.info(f"File {input_path.name} copied to compressed folder.")
def catch_duplicates(self, path: Path) -> Path: for folder, folders, files in os.walk(f'{source}_compressed'):
""" for file in files:
Method checks if file path exists and returns folder/filename(vncopy).ext path if not os.path.splitext(file)[1].count("(copy)"):
if duplicate founded output_len += 1
:param path: Some file Path
:return: Duplicate path name with (vncopy) on end if self.errors != 0:
""" self.printer.warning("Some files failed to compress!")
if path.is_file() and path.exists():
orig_name = path.name.replace("(vncopy)", "") if source_len == output_len:
new_path = Path(path.parent, path.stem + "(vncopy)" + path.suffix) self.printer.info("Success!")
try: self.__duplicates[orig_name] else:
except KeyError: self.__duplicates[orig_name] = [] self.printer.warning("Original and compressed folders are not identical!")
if not new_path.name in self.__duplicates[orig_name]: self.get_compression(source, f"{source}_compressed")
self.__duplicates[orig_name].append(new_path.name)
return self.catch_duplicates(new_path) def add_unprocessed_file(self, source: str, output: str):
return path if self.params.copy_unprocessed:
filename = os.path.split(source)[-1]
copyfile(source, output)
self.printer.info(f"File {filename} copied to compressed folder.")
def check_duplicates(self, source: str, output: str, filename: str) -> str:
re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE)
duplicates = [name for name in os.listdir(source) if re_pattern.match(name)]
if len(duplicates) > 1:
if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates):
self.duplicates.append(filename)
new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]
return os.path.join(output, new_name)
return os.path.join(output, filename)
def print_duplicates(self): def print_duplicates(self):
""" for filename in self.duplicates:
Method prints message about all duplicates generated during recode process self.printer.warning(
:return: None f'Duplicate file has been found! Check manually this files - "{filename}", '
""" f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"'
for filename in self.__duplicates.keys():
self.__printer.warning(
f'Duplicate file has been found! Check manually this files - "{filename}", ' +
', '.join(self.__duplicates[filename])
) )
def out_rename(self, out_path: Path, target: Path): def mimic_rename(self, filename: str, target: str, source: str):
""" if filename.count("(vncopy)"):
Method removes md5 hash from file name and changes file extension in dependence of mimic mode orig_name = filename.replace("(vncopy)", "")
:param out_path: Recoded file Path index = self.duplicates.index(os.path.split(orig_name)[-1])
:param target: Target filename self.duplicates[index] = os.path.split(target)[-1]
:return: None target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1]
"""
if not self.__params.mimic_mode: os.rename(filename, target.replace(source, f"{source}_compressed"))
dest_name = self.catch_duplicates(Path(out_path.parent, target.stem+out_path.suffix))
os.rename(out_path, dest_name)
else:
os.rename(out_path, Path(out_path.parent, target.name))