Compare commits

...

7 commits

7 changed files with 334 additions and 186 deletions

1
.gitignore vendored
View file

@ -1,5 +1,6 @@
/output/ /output/
/tests/ /tests/
/tests_compressed/
/build/ /build/
/dist/ /dist/
/vntools.egg-info/ /vntools.egg-info/

View file

@ -7,6 +7,10 @@ from .utils import Utils
def init(): def init():
"""
This function creates all needed class instances and run utility
:return: None
"""
params = Params.setup() params = Params.setup()
printer = Printer(params.source) printer = Printer(params.source)
utils = Utils(params, printer) utils = Utils(params, printer)

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
from pathlib import Path
import shutil import shutil
import os import os
@ -11,43 +12,48 @@ from .utils import Utils
class Application: class Application:
"""
Main class for utility
"""
def __init__(self, params: Params, compress: Compress, printer: Printer, utils: Utils): def __init__(self, params_inst: Params, compress_inst: Compress, printer_inst: Printer, utils_inst: Utils):
self.params = params self.__params = params_inst
self.compress = compress.compress self.__compress = compress_inst.compress
self.printer = printer self.__printer = printer_inst
self.utils = utils self.__utils = utils_inst
def compress_worker(self, folder: str, file: str, source: str, output: str):
if os.path.isfile(os.path.join(folder, file)):
self.compress(folder, file, source, output)
def run(self): def run(self):
"""
Method creates a folder in which all the recoded files will be placed,
creates a queue of recoding processes for each file and, when the files are run out in the original folder,
calls functions to display the result
:return: None
"""
start_time = datetime.now() start_time = datetime.now()
self.printer.win_ascii_esc() self.__printer.win_ascii_esc()
source = os.path.abspath(self.params.source) source = self.__params.source
if os.path.exists(f"{source}_compressed"): if self.__params.dest.exists():
shutil.rmtree(f"{source}_compressed") shutil.rmtree(self.__params.dest)
self.printer.info("Creating folders...") self.__printer.info("Creating folders...")
for folder, folders, files in os.walk(source): for folder, folders, files in os.walk(source):
if not os.path.exists(folder.replace(source, f"{source}_compressed")): output = Path(folder.replace(str(source), str(self.__params.dest)))
os.mkdir(folder.replace(source, f"{source}_compressed")) if not output.exists():
os.mkdir(output)
self.printer.info(f'Compressing "{folder.replace(source, os.path.split(source)[-1])}" folder...') self.__printer.info(f'Compressing "{folder}" folder...')
output = folder.replace(source, f"{source}_compressed")
with ThreadPoolExecutor(max_workers=self.params.workers) as executor: with ThreadPoolExecutor(max_workers=self.__params.workers) as executor:
futures = [ futures = [
executor.submit(self.compress, folder, file, source, output) executor.submit(self.__compress, Path(folder, file), Path(output))
for file in files if os.path.isfile(os.path.join(folder, file)) for file in files if Path(folder, file).is_file()
] ]
for future in as_completed(futures): for future in as_completed(futures):
future.result() future.result()
self.utils.print_duplicates() self.__utils.print_duplicates()
self.utils.get_compression_status(source) self.__utils.get_recode_status()
self.utils.sys_pause() self.__utils.sys_pause()
print(f"Time taken: {datetime.now() - start_time}") print(f"Time taken: {datetime.now() - start_time}")

View file

@ -1,6 +1,6 @@
from ffmpeg import FFmpeg, FFmpegError from ffmpeg import FFmpeg, FFmpegError
from pathlib import Path
from PIL import Image from PIL import Image
from os import path
import pillow_avif import pillow_avif
from .printer import Printer from .printer import Printer
@ -9,10 +9,17 @@ from .utils import Utils
class File: class File:
"""
Class contains some methods to work with files
"""
@staticmethod @staticmethod
def get_type(filename: str) -> str: def get_type(path: Path) -> str:
"""
Method returns filetype string for file
:param path: Path of file to determine type
:return: filetype string: audio, image, video, unknown
"""
extensions = { extensions = {
"audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'], "audio": ['.aac', '.flac', '.m4a', '.mp3', '.ogg', '.opus', '.raw', '.wav', '.wma'],
"image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'], "image": ['.apng', '.avif', '.bmp', '.tga', '.tiff', '.dds', '.svg', '.webp', '.jpg', '.jpeg', '.png'],
@ -21,12 +28,17 @@ class File:
} }
for file_type in extensions: for file_type in extensions:
if path.splitext(filename)[1] in extensions[file_type]: if path.suffix in extensions[file_type]:
return file_type return file_type
return "unknown" return "unknown"
@staticmethod @staticmethod
def has_transparency(img: Image) -> bool: def has_transparency(img: Image) -> bool:
"""
Method checks if image has transparency
:param img: Pillow Image
:return: bool
"""
if img.info.get("transparency", None) is not None: if img.info.get("transparency", None) is not None:
return True return True
if img.mode == "P": if img.mode == "P":
@ -43,70 +55,58 @@ class File:
class Compress: class Compress:
def __init__(self, params: Params, printer: Printer, utils: Utils): def __init__(self, params_inst: Params, printer_inst: Printer, utils_inst: Utils):
self.params = params self.__params = params_inst
self.printer = printer self.__printer = printer_inst
self.utils = utils self.__utils = utils_inst
def audio(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: def audio(self, input_path: Path, output_dir: Path, extension: str) -> Path:
bit_rate = self.params.audio_bitrate """
out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') Method recodes audio files to another format using ffmpeg utility
:param input_path: Path of the original audio file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new audio file
:return: Path of compressed audio file with md5 hash as prefix
"""
bit_rate = self.__params.audio_bitrate
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
try: try:
(FFmpeg() (FFmpeg()
.input(path.join(in_dir, file)) .input(input_path)
.option("hide_banner") .option("hide_banner")
.output(out_file,{"b:a": bit_rate, "loglevel": "error"}) .output(out_file,{"b:a": bit_rate, "loglevel": "error"})
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.__utils.catch_unprocessed(input_path, out_file, e)
self.utils.errors += 1 self.__printer.files(input_path, out_file, f"{bit_rate}")
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
self.printer.files(file, path.splitext(file)[0], extension, f"{bit_rate}")
return out_file return out_file
def video(self, in_dir: str, file: str, out_dir: str, extension: str) -> str: def image(self, input_path: Path, output_dir: Path, extension: str) -> Path:
if not self.params.video_skip: """
out_file = self.utils.check_duplicates(in_dir, out_dir, f'{path.splitext(file)[0]}.{extension}') Method recodes image files to another format using Pillow
codec = self.params.video_codec :param input_path: Path of the original image file
crf = self.params.video_crf :param output_dir: Path of the output (compression) folder
:param extension: Extension of the new image file
:return: Path of compressed image file with md5 hash as prefix
"""
quality = self.__params.image_quality
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{extension}")
try: try:
(FFmpeg() image = Image.open(input_path)
.input(path.join(in_dir, file))
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.printer.files(file, path.splitext(file)[0], extension, codec)
except FFmpegError as e:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
return out_file
else:
self.utils.add_unprocessed_file(f'{in_dir}/{file}', f'{out_dir}/{file}')
return f'{out_dir}/{path.splitext(file)[0]}.{extension}'
def image(self, in_dir: str, file: str, out_dir: str, extension: str) -> str:
quality = self.params.image_quality
out_file = self.utils.check_duplicates(in_dir, out_dir, f"{path.splitext(file)[0]}.{extension}")
try:
image = Image.open(path.join(in_dir, file))
if (extension == "jpg" or extension == "jpeg" or if (extension == "jpg" or extension == "jpeg" or
(extension == "webp" and not self.params.webp_rgba)): (extension == "webp" and not self.__params.webp_rgba)):
if File.has_transparency(image): if File.has_transparency(image):
self.printer.warning(f"{file} has transparency. Changing to fallback...") self.__printer.warning(f"{input_path.name} has transparency. Changing to fallback...")
extension = self.params.image_fall_ext out_file = Path(output_dir, f"{prefix}_{input_path.stem}.{self.__params.image_fall_ext}")
if File.has_transparency(image): if File.has_transparency(image):
image.convert('RGBA') image.convert('RGBA')
res_downscale = self.params.image_downscale res_downscale = self.__params.image_downscale
if res_downscale != 1: if res_downscale != 1:
width, height = image.size width, height = image.size
new_size = (int(width / res_downscale), int(height / res_downscale)) new_size = (int(width / res_downscale), int(height / res_downscale))
@ -114,50 +114,84 @@ class Compress:
image.save(out_file, image.save(out_file,
optimize=True, optimize=True,
lossless=self.params.image_lossless, lossless=self.__params.image_lossless,
quality=quality, quality=quality,
minimize_size=True) minimize_size=True)
self.printer.files(file, path.splitext(file)[0], extension, f"{quality}%") self.__printer.files(input_path, out_file, f"{quality}%")
except Exception as e: except Exception as e:
self.utils.add_unprocessed_file(path.join(in_dir, file), path.join(out_dir, file)) self.__utils.catch_unprocessed(input_path, out_file, e)
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {file} can't be processed! Error: {e}")
return out_file return out_file
def unknown(self, in_dir: str, filename: str, out_dir: str) -> str: def video(self, input_path: Path, output_dir: Path, extension: str) -> Path:
if self.params.force_compress: """
self.printer.unknown_file(filename) Method recodes video files to another format using ffmpeg utility
out_file = self.utils.check_duplicates(in_dir, out_dir, filename) :param input_path: Path of the original video file
:param output_dir: Path of the output (compression) folder
:param extension: Extension of the new video file
:return: Path of compressed video file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f'{prefix}_{input_path.stem}.{extension}')
if not self.__params.video_skip:
codec = self.__params.video_codec
crf = self.__params.video_crf
try: try:
(FFmpeg() (FFmpeg()
.input(path.join(in_dir, filename)) .input(input_path)
.option("hide_banner")
.option("hwaccel", "auto")
.output(out_file,{"codec:v": codec, "v:b": 0, "loglevel": "error"}, crf=crf)
.execute()
)
self.__printer.files(input_path, out_file, codec)
except FFmpegError as e:
self.__utils.catch_unprocessed(input_path, out_file, e)
else:
self.__utils.copy_unprocessed(input_path, out_file)
return out_file
def unknown(self, input_path: Path, output_dir: Path) -> Path:
"""
Method recodes files with "unknown" file format using ffmpeg,
in the hope that ffmpeg supports this file type and the default settings for it will reduce its size
:param input_path: Path of the original file
:param output_dir: Path of the output (compression) folder
:return: Path of compressed file with md5 hash as prefix
"""
prefix = self.__utils.get_hash(input_path.name)
out_file = Path(output_dir, f"{prefix}_{input_path.name}")
if self.__params.force_compress:
self.__printer.unknown_file(input_path.name)
try:
(FFmpeg()
.input(input_path)
.output(out_file) .output(out_file)
.execute() .execute()
) )
except FFmpegError as e: except FFmpegError as e:
self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) self.__utils.catch_unprocessed(input_path, out_file, e)
self.utils.errors += 1
if not self.params.hide_errors:
self.printer.error(f"File {filename} can't be processed! Error: {e}")
return out_file
else: else:
self.utils.add_unprocessed_file(path.join(in_dir, filename), path.join(out_dir, filename)) self.__utils.copy_unprocessed(input_path, out_file)
return path.join(out_dir, filename) return out_file
def compress(self, _dir: str, filename: str, source: str, output: str): def compress(self, source: Path, output: Path):
match File.get_type(filename): """
It the core method for this program. Method determines file type and call compress function for it
:param source: Path of file to compress
:param output: Path of output file
:return: None
"""
match File.get_type(source):
case "audio": case "audio":
out_file = self.audio(_dir, filename, output, self.params.audio_ext) out_file = self.audio(source, output, self.__params.audio_ext)
case "image": case "image":
out_file = self.image(_dir, filename, output, self.params.image_ext) out_file = self.image(source, output, self.__params.image_ext)
case "video": case "video":
out_file = self.video(_dir, filename, output, self.params.video_ext) out_file = self.video(source, output, self.__params.video_ext)
case "unknown": case "unknown":
out_file = self.unknown(_dir, filename, output) out_file = self.unknown(source, output)
if self.params.mimic_mode: self.__utils.out_rename(out_file, source)
self.utils.mimic_rename(out_file, path.join(_dir, filename), source) self.__printer.bar.update()
self.__printer.bar.next()
self.printer.bar.update()
self.printer.bar.next()

View file

@ -1,12 +1,16 @@
from argparse import ArgumentParser, Namespace from argparse import ArgumentParser, Namespace
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from typing import Self from typing import Self
import tomllib import tomllib
import os
@dataclass @dataclass
class Params: class Params:
"""
This dataclass contains all parameters for utility
"""
copy_unprocessed: bool copy_unprocessed: bool
force_compress: bool force_compress: bool
mimic_mode: bool mimic_mode: bool
@ -28,13 +32,18 @@ class Params:
video_ext: str video_ext: str
video_codec: str video_codec: str
source: str source: Path
dest: Path
@classmethod @classmethod
def setup(cls) -> Self: def setup(cls) -> Self:
"""
Method initialize all parameters and returns class instance
:return: Params instance
"""
args = cls.get_args() args = cls.get_args()
if args.config is not None: if args.config is not None:
if os.path.isfile(args.config): if Path(args.config).is_file():
with open(args.config, "rb") as cfile: with open(args.config, "rb") as cfile:
config = tomllib.load(cfile) config = tomllib.load(cfile)
else: else:
@ -58,17 +67,22 @@ class Params:
video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip video_skip = config["VIDEO"]["SkipVideo"] if args.config else args.v_skip
video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext video_ext = config["VIDEO"]["Extension"] if args.config else args.v_ext
video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec video_codec = config["VIDEO"]["Codec"] if args.config else args.v_codec
source = args.source source = Path(args.source)
dest = Path(f"{args.source}_compressed")
return cls( return cls(
copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers, copy_unprocessed, force_compress, mimic_mode, hide_errors, webp_rgba, workers,
audio_ext, audio_bitrate, audio_ext, audio_bitrate,
image_downscale, image_ext, image_fall_ext, image_lossless, image_quality, image_downscale, image_ext, image_fall_ext, image_lossless, image_quality,
video_crf, video_skip, video_ext, video_codec, source video_crf, video_skip, video_ext, video_codec, source, dest
) )
@staticmethod @staticmethod
def get_args() -> Namespace: def get_args() -> Namespace:
"""
Method gets CLI arguments and returns argparse.Namespace instance
:return: argparse.Namespace of CLI args
"""
parser = ArgumentParser(prog="vnrecode", parser = ArgumentParser(prog="vnrecode",
description="Python utility to compress Visual Novel Resources" description="Python utility to compress Visual Novel Resources"
) )

View file

@ -1,46 +1,93 @@
from progress.bar import IncrementalBar from progress.bar import IncrementalBar
from pathlib import Path
import colorama import colorama
import sys import sys
import os import os
import re
class Printer: class Printer:
"""
Class implements CLI UI for this utility
"""
def __init__(self, folder): def __init__(self, source: Path):
"""
:param source: Path of original (compressing) folder to count its files for progress bar
"""
file_count = 0 file_count = 0
for folder, folders, file in os.walk(folder): for folder, folders, file in os.walk(source):
file_count += len(file) file_count += len(file)
self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)') self.bar = IncrementalBar('Compressing', max=file_count, suffix='[%(index)d/%(max)d] (%(percent).1f%%)')
self.bar.update() self.bar.update()
# Fill whole string with spaces for cleaning progress bar
@staticmethod @staticmethod
def clean_str(string: str) -> str: def clean_str(string: str) -> str:
return string + " " * (os.get_terminal_size().columns - len(string)) """
Method fills end of string with spaces to remove progress bar garbage from console
:param string: String to "clean"
:return: "Clean" string
"""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return string + " " * (os.get_terminal_size().columns - len(ansi_escape.sub('', string)))
@staticmethod @staticmethod
def win_ascii_esc(): def win_ascii_esc():
"""
Method setups colorama for cmd
:return: None
"""
if sys.platform == "win32": if sys.platform == "win32":
colorama.init() colorama.init()
def bar_print(self, string: str): def bar_print(self, string: str):
"""
Method prints some string in console and updates progress bar
:param string: String to print
:return: None
"""
print(string) print(string)
self.bar.update() self.bar.update()
def info(self, string: str): def info(self, string: str):
"""
Method prints string with decor for info messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[100m- {string}\033[49m"))
def warning(self, string: str): def warning(self, string: str):
"""
Method prints string with decor for warning messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[93m!\033[0m {string}\033[49m"))
def error(self, string: str): def error(self, string: str):
"""
Method prints string with decor for error messages
:param string: String to print
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m")) self.bar_print(self.clean_str(f"\r\033[31m\u2715\033[0m {string}\033[49m"))
def files(self, source: str, dest: str, dest_ext: str, comment: str): def files(self, source_path: Path, output_path: Path, comment: str):
source_ext = os.path.splitext(source)[1] """
source_name = os.path.splitext(source)[0] Method prints the result of recoding a file with some decorations in the form:
input file name -> output file name (quality setting)
:param source_path: Input file Path
:param output_path: Output file Path
:param comment: Comment about recode quality setting
:return: None
"""
self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_path.stem}\033[0m{source_path.suffix}\033[0;37m -> "
f"{source_path.stem}\033[0m{output_path.suffix}\033[0;37m ({comment})\033[0m"))
self.bar_print(self.clean_str(f"\r\033[0;32m\u2713\033[0m \033[0;37m{source_name}\033[0m{source_ext}\033[0;37m -> {dest}\033[0m.{dest_ext}\033[0;37m ({comment})\033[0m")) def unknown_file(self, filename: str):
"""
def unknown_file(self, file): Method prints the result of recoding unknown file
self.bar_print(self.clean_str(f"\r* \033[0;33m{file}\033[0m (File will be force compressed via ffmpeg)")) :param filename: Name of unknown file
:return:
"""
self.bar_print(self.clean_str(f"\r\u2713 \033[0;33m{filename}\033[0m (File will be force compressed via ffmpeg)"))

View file

@ -1,94 +1,136 @@
from shutil import copyfile from shutil import copyfile
from glob import glob from pathlib import Path
import hashlib
import sys import sys
import os import os
import re
import fnmatch from vnrecode.printer import Printer
from vnrecode.params import Params
class Utils: class Utils:
"""
Class contains various methods for internal utility use
"""
def __init__(self, params, printer): def __init__(self, params_inst: Params, printer_inst: Printer):
self.errors = 0 self.__errors = 0
self.params = params self.__params = params_inst
self.printer = printer self.__printer = printer_inst
self.duplicates = [] self.__duplicates = {}
@staticmethod @staticmethod
def sys_pause(): def sys_pause():
"""
Method calls pause for Windows cmd shell
:return: None
"""
if sys.platform == "win32": if sys.platform == "win32":
os.system("pause") os.system("pause")
@staticmethod @staticmethod
def get_size(directory: str) -> int: def get_hash(filename: str) -> str:
total_size = 0 """
for folder, folders, files in os.walk(directory): Method returns 8 chars of md5 hash for filename
for file in files: :param filename: File name to get md5
if not os.path.islink(os.path.join(folder, file)): :return: 8 chars of md5 hash
total_size += os.path.getsize(os.path.join(folder, file)) """
return total_size return hashlib.md5(filename.encode()).hexdigest()[:8]
def get_compression(self, source: str, output: str): def get_recode_status(self):
"""
Method prints recoding results
:return: None
"""
source_len = 0
output_len = 0
for folder, folders, files in os.walk(self.__params.source):
source_len += len(files)
for folder, folders, files in os.walk(self.__params.dest):
for file in files:
if not file.count("(vncopy)"):
output_len += 1
if self.__errors != 0:
self.__printer.warning("Some files failed to compress!")
if source_len == output_len:
self.__printer.info("Success!")
else:
self.__printer.warning("Original and compressed folders are not identical!")
try: try:
source = self.get_size(source) source = sum(file.stat().st_size for file in self.__params.source.glob('**/*') if file.is_file())
output = self.get_size(output) output = sum(file.stat().st_size for file in self.__params.dest.glob('**/*') if file.is_file())
print(f"\nResult: {source/1024/1024:.2f}MB -> " print(f"\nResult: {source/1024/1024:.2f}MB -> "
f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)") f"{output/1024/1024:.2f}MB ({(output - source)/1024/1024:.2f}MB)")
except ZeroDivisionError: except ZeroDivisionError:
self.printer.warning("Nothing compressed!") self.__printer.warning("Nothing compressed!")
def get_compression_status(self, source: str): def catch_unprocessed(self, input_path: Path, output_path: Path, error):
source_len = 0 """
output_len = 0 Method processes files that have not been recoded due to an error and prints error to console
if hide_errors parameter is False
:param input_path: Path of unprocessed file
:param output_path: Destination path of unprocessed file
:param error: Recoding exception
:return: None
"""
self.copy_unprocessed(input_path, output_path)
self.__errors += 1
if not self.__params.hide_errors:
self.__printer.error(f"File {input_path.name} can't be processed! Error: {error}")
for folder, folders, files in os.walk(source): def copy_unprocessed(self, input_path: Path, output_path: Path):
source_len += len(files) """
Method copies an unprocessed file from the source folder to the destination folder
:param input_path: Path of unprocessed file
:param output_path: Destination path of unprocessed file
:return: None
"""
if self.__params.copy_unprocessed:
copyfile(input_path, output_path)
self.__printer.info(f"File {input_path.name} copied to compressed folder.")
for folder, folders, files in os.walk(f'{source}_compressed'): def catch_duplicates(self, path: Path) -> Path:
for file in files: """
if not os.path.splitext(file)[1].count("(copy)"): Method checks if file path exists and returns folder/filename(vncopy).ext path
output_len += 1 if duplicate founded
:param path: Some file Path
if self.errors != 0: :return: Duplicate path name with (vncopy) on end
self.printer.warning("Some files failed to compress!") """
if path.is_file() and path.exists():
if source_len == output_len: orig_name = path.name.replace("(vncopy)", "")
self.printer.info("Success!") new_path = Path(path.parent, path.stem + "(vncopy)" + path.suffix)
else: try: self.__duplicates[orig_name]
self.printer.warning("Original and compressed folders are not identical!") except KeyError: self.__duplicates[orig_name] = []
self.get_compression(source, f"{source}_compressed") if not new_path.name in self.__duplicates[orig_name]:
self.__duplicates[orig_name].append(new_path.name)
def add_unprocessed_file(self, source: str, output: str): return self.catch_duplicates(new_path)
if self.params.copy_unprocessed: return path
filename = os.path.split(source)[-1]
copyfile(source, output)
self.printer.info(f"File {filename} copied to compressed folder.")
def check_duplicates(self, source: str, output: str, filename: str) -> str:
re_pattern = re.compile(os.path.splitext(filename)[0]+r".[a-zA-Z0-9]+$", re.IGNORECASE)
duplicates = [name for name in os.listdir(source) if re_pattern.match(name)]
if len(duplicates) > 1:
if filename.lower() not in (duplicate.lower() for duplicate in self.duplicates):
self.duplicates.append(filename)
new_name = os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]
return os.path.join(output, new_name)
return os.path.join(output, filename)
def print_duplicates(self): def print_duplicates(self):
for filename in self.duplicates: """
self.printer.warning( Method prints message about all duplicates generated during recode process
f'Duplicate file has been found! Check manually this files - "{filename}", ' :return: None
f'"{os.path.splitext(filename)[0] + "(vncopy)" + os.path.splitext(filename)[1]}"' """
for filename in self.__duplicates.keys():
self.__printer.warning(
f'Duplicate file has been found! Check manually this files - "{filename}", ' +
', '.join(self.__duplicates[filename])
) )
def mimic_rename(self, filename: str, target: str, source: str): def out_rename(self, out_path: Path, target: Path):
if filename.count("(vncopy)"): """
orig_name = filename.replace("(vncopy)", "") Method removes md5 hash from file name and changes file extension in dependence of mimic mode
index = self.duplicates.index(os.path.split(orig_name)[-1]) :param out_path: Recoded file Path
self.duplicates[index] = os.path.split(target)[-1] :param target: Target filename
target = os.path.splitext(target)[0] + "(vncopy)" + os.path.splitext(target)[1] :return: None
"""
os.rename(filename, target.replace(source, f"{source}_compressed")) if not self.__params.mimic_mode:
dest_name = self.catch_duplicates(Path(out_path.parent, target.stem+out_path.suffix))
os.rename(out_path, dest_name)
else:
os.rename(out_path, Path(out_path.parent, target.name))