Last active
December 3, 2023 04:05
-
-
Save yrvsyh/c10b54bc014646e4ab90d9060ac0bd6a to your computer and use it in GitHub Desktop.
Archive photos and videos by date
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import re | |
import sys | |
import math | |
import errno | |
import shutil | |
import signal | |
import hashlib | |
import fnmatch | |
import exifread | |
import subprocess | |
from datetime import datetime, timezone, timedelta | |
from rich import progress | |
def _monkey_patch_exifread():
    """Make ``HEICExifFinder.get_parser`` return ``None`` instead of raising.

    The original method is kept and called first; only a ``NoParser``
    exception raised by it is translated into a ``None`` result.
    """
    from exifread import HEICExifFinder
    from exifread.heic import NoParser

    original_get_parser = HEICExifFinder.get_parser

    def _get_parser(self, box):
        try:
            parser = original_get_parser(self, box)
        except NoParser:
            parser = None
        return parser

    HEICExifFinder.get_parser = _get_parser


# The patch is applied only for this exact exifread release.
if exifread.__version__ == '3.0.0':
    _monkey_patch_exifread()
def convert_size(size_bytes):
    """Format a byte count as a human-readable string using 1024-based units.

    Args:
        size_bytes: Non-negative number of bytes (int or float).

    Returns:
        ``"0B"`` for zero, otherwise ``"<value> <unit>"`` where the value is
        rounded to two decimals, e.g. ``"1.5 KB"``.

    Raises:
        ValueError: If ``size_bytes`` is negative.
    """
    if size_bytes == 0:
        return "0B"
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # Pick the unit by comparing against exact integer thresholds rather than
    # a floating-point logarithm: this keeps sub-byte values in "B" and clamps
    # anything beyond the table to the largest unit instead of failing.
    i = 0
    last = len(size_name) - 1
    while i < last and size_bytes >= 1024 ** (i + 1):
        i += 1
    s = round(size_bytes / 1024 ** i, 2)
    return "%s %s" % (s, size_name[i])
# File extensions (lower-case, leading dot) recognised as still images.
photo_ext = [
    '.jpg',
    '.jpeg',
    '.png',
    '.gif',
    '.heic',
    '.nef',
]
# File extensions recognised as videos.
video_ext = [
    '.mp4',
    '.mov',
    '.hevc',
]
# Extensions a video may have when it is the companion clip of a photo.
live_ext = [
    '.mov',
]
def get_media_type(path):
    """Classify ``path`` by its extension (case-insensitive).

    Returns:
        ``'photo'`` if the extension is listed in ``photo_ext``, ``'video'``
        if it is listed in ``video_ext``, otherwise ``None``.
    """
    suffix = os.path.splitext(path)[1].lower()
    if suffix in photo_ext:
        return 'photo'
    if suffix in video_ext:
        return 'video'
    return None
def get_media_time(path):
    """Read the capture time embedded in a photo or video file.

    Photos are read with ``exifread`` (``EXIF DateTimeOriginal`` first, then
    ``Image DateTime``). Videos are queried through the external ``ffprobe``
    program for the ``creation_time`` tag of the first video stream.

    Args:
        path: Path of the media file.

    Returns:
        A naive ``datetime``, or ``None`` when the file is not a recognised
        media type, carries no usable timestamp, or the timestamp cannot be
        read. Failures are reported on stdout and never raised, except for
        errors opening a photo file.
    """
    media_time = None
    media_type = get_media_type(path)
    if media_type == 'photo':
        with open(path, 'rb') as f:
            try:
                exif = exifread.process_file(f)
                media_time_str = exif.get('EXIF DateTimeOriginal')
                if media_time_str is None:
                    media_time_str = exif.get('Image DateTime')
                if media_time_str is not None:
                    media_time = datetime.strptime(str(media_time_str), '%Y:%m:%d %H:%M:%S')
            except Exception as e:
                # Best effort: a broken EXIF block must not abort the run.
                print(f'\033[31m[Exif ] {path} {e}\033[0m')
    elif media_type == 'video':
        # Pass an argument list (no shell) so paths containing spaces, quotes
        # or shell metacharacters reach ffprobe unchanged.
        command = [
            'ffprobe', '-v', 'quiet',
            '-select_streams', 'v:0',
            '-show_entries', 'stream_tags=creation_time',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            path,
        ]
        try:
            result = subprocess.run(command, stdout=subprocess.PIPE)
        except OSError as e:
            # ffprobe missing or not executable: treat as "no metadata".
            print(f'\033[31m[Probe] {path} {e}\033[0m')
            return None
        output = result.stdout.decode('utf-8', errors='replace').strip()
        if output:
            stamp = output.splitlines()[0].strip()
            # NOTE(review): ffprobe is expected to print an ISO-8601 UTC time
            # ending in "Z"; fromisoformat() only accepts that suffix from
            # Python 3.11 on, so spell the offset out explicitly.
            if stamp.endswith(('Z', 'z')):
                stamp = stamp[:-1] + '+00:00'
            try:
                parsed = datetime.fromisoformat(stamp)
                # Convert to UTC+8 wall-clock time and drop the tzinfo so the
                # result is naive like the EXIF-derived values.
                media_time = parsed.astimezone(timezone(timedelta(hours=8))).replace(tzinfo=None)
            except (ValueError, OverflowError, OSError) as e:
                print(f'\033[31m[Probe] {path} {e}\033[0m')
    return media_time
def get_modify_time(path):
    """Return the file's modification time as a naive local ``datetime``.

    The microsecond component is zeroed.
    """
    mtime = os.path.getmtime(path)
    moment = datetime.fromtimestamp(mtime)
    return moment.replace(microsecond=0)
def get_file_size(path):
    """Return the size of the file at ``path`` in bytes."""
    size = os.path.getsize(path)
    return size
def get_file_hash(path, block_size=256*128, hr=False):
    """Compute the MD5 digest of a file, reading it in fixed-size blocks.

    Args:
        path: File to hash.
        block_size: Number of bytes read per iteration.
        hr: When true, return the hexadecimal string form; otherwise return
            the raw digest bytes.
    """
    hasher = hashlib.md5()
    with open(path, 'rb') as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest() if hr else hasher.digest()
def check_file_dup(file1, file2, use_hash = False):
    """Decide whether two files are duplicates.

    Files of different size are never duplicates. Files of equal size are
    duplicates outright unless ``use_hash`` is true, in which case their
    hashes must also be equal.
    """
    if get_file_size(file1) != get_file_size(file2):
        return False
    if not use_hash:
        return True
    return get_file_hash(file1) == get_file_hash(file2)
def link_or_copy(src, dst):
    """Hard-link ``src`` to ``dst``, falling back to a copy if linking fails.

    Nothing is done when ``dst`` already exists. Progress and errors are
    printed to stdout; errors are shown in red.

    Args:
        src: Existing source file.
        dst: Destination path; its parent directory must already exist.

    Returns:
        ``True`` if the link or the copy succeeded, ``False`` if ``dst``
        already existed or both attempts failed.
    """
    target_name = os.path.basename(dst)
    if os.path.exists(dst):
        print(f'\033[31m[Skip] {src} <-> {target_name}\033[0m')
    else:
        try:
            os.link(src, dst)
            print('[Link]', src, '-->', target_name)
            return True
        except Exception as e:
            # A cross-device link (EXDEV) is an expected reason to fall back
            # to copying, so only other failures are reported.
            if not isinstance(e, IOError) or e.errno != errno.EXDEV:
                # Reset sequence is ESC "[0m"; the former "\033\0m" emitted a
                # NUL byte and left the terminal colour switched to red.
                print(f'\033[31m[ErrL] {e}\033[0m')
            try:
                print('[Copy ]', src, '-->', target_name)
                shutil.copy2(src, dst)
                return True
            except Exception as e:
                print(f'\033[31m[ErrC] {e}\033[0m')
    return False
class MediaFile(object):
    """One source file together with the metadata needed to archive it."""

    # Maps a photo path to the path of its companion video; filled by
    # check_live_photo() and shared by every instance.
    live_photo_map = {}

    @staticmethod
    def _hour_key(path):
        """Return the file's time (embedded, else mtime) as ``YYYYmmddHH``."""
        moment = get_media_time(path)
        if moment is None:
            moment = get_modify_time(path)
        return moment.strftime('%Y%m%d%H')

    @classmethod
    def check_live_photo(cls, all_file_path):
        """Record photo/video pairs that share a base name and capture hour.

        For every photo in ``all_file_path`` a sibling file with the same
        stem and one of the ``live_ext`` extensions (lower or upper case) is
        looked up on disk. The pair is stored in ``live_photo_map`` when both
        files fall into the same hour.
        """
        suffix_candidates = live_ext + [item.upper() for item in live_ext]
        for photo_path in all_file_path:
            stem, photo_suffix = os.path.splitext(photo_path)
            if photo_suffix.lower() not in photo_ext:
                continue
            for suffix in suffix_candidates:
                candidate = stem + suffix
                if not os.path.exists(candidate):
                    continue
                photo_key = cls._hour_key(photo_path)
                video_key = cls._hour_key(candidate)
                # NOTE(review): the search stops only on a matching pair; the
                # original indentation was lost, confirm this placement.
                if photo_key == video_key:
                    cls.live_photo_map[photo_path] = candidate
                    break

    def __init__(self, path):
        """Collect name, type, timestamps and size for ``path``."""
        self.path = path
        self.basename = os.path.basename(path)
        self.name, self.ext = os.path.splitext(self.basename)
        self.media_type = get_media_type(path)
        self.live_video = MediaFile.live_photo_map.get(self.path, None)
        self.media_time = get_media_time(path)
        self.modify_time = get_modify_time(path)
        self.size = get_file_size(path)
        # Content hashing is switched off; with a hash set, archive() uses a
        # hash-based target name instead of a numbered one.
        self.hash = None
        # self.hash = get_file_hash(path, hr=True) if self.media_type == 'photo' else None

    def _link_or_copy(self, target_dir, target_name=None):
        """Place this file in ``target_dir``, keeping its name by default."""
        if target_name is None:
            target_name = self.basename
        link_or_copy(self.path, os.path.join(target_dir, target_name))

    def archive(self, target_dir, other_dir):
        """File this item under ``target_dir/YYYY-MM`` with a timestamp name.

        Files of unknown type go to ``other_dir`` under their original name.
        Media files are named ``YYYYmmdd_HHMMSS`` plus a two-digit counter;
        the first free counter is used, and the file is skipped as a
        duplicate when an existing candidate passes ``check_file_dup``. A
        companion video, if any, is stored next to the photo under the same
        stem.
        """
        if self.media_type is None:
            self._link_or_copy(other_dir)
            return

        date_time = self.media_time
        if date_time is None:
            date_time = self.modify_time
        month_dir = os.path.join(target_dir, date_time.strftime('%Y-%m'))
        os.makedirs(month_dir, exist_ok=True)

        stamp = date_time.strftime("%Y%m%d_%H%M%S")
        suffix = self.ext.lower()
        if self.hash is not None:
            target_name = f'{stamp}00_{self.hash}{suffix}'
        else:
            # NOTE(review): the duplicate check is taken to run per candidate
            # inside this loop; the original indentation was lost, confirm.
            for idx in range(100):
                target_name = f'{stamp}{idx:02d}{suffix}'
                candidate = os.path.join(month_dir, target_name)
                if not os.path.exists(candidate):
                    break
                if check_file_dup(self.path, candidate, use_hash=False):
                    print(f'\033[33m[Dupl] {self.path} <-> {target_name}\033[0m')
                    return
        self._link_or_copy(month_dir, target_name)

        if self.live_video is None:
            return
        video_suffix = os.path.splitext(self.live_video)[1].lower()
        target_name = os.path.splitext(target_name)[0] + video_suffix
        print(f'\033[32m[Live] {self.path} <-> {target_name}\033[0m')
        link_or_copy(self.live_video, os.path.join(month_dir, target_name))
def walk_dir(root, excludes):
    """Yield the path of every file below ``root``, depth-first.

    Args:
        root: Directory to scan.
        excludes: Regular expression; any entry whose joined path matches it
            from the start is skipped (a matching directory is not entered).
    """
    for entry in os.listdir(root):
        full_path = os.path.join(root, entry)
        if re.match(excludes, full_path):
            continue
        if not os.path.isdir(full_path):
            yield full_path
            continue
        for nested in walk_dir(full_path, excludes):
            yield nested
# Set to True by handle_exit(); process_media() reads it to stop early.
need_exit = False


def handle_exit(signal,frame):
    """Signal handler: request a stop by setting the ``need_exit`` flag.

    Both arguments are ignored. Note that the first parameter shadows the
    ``signal`` module inside this function.
    """
    global need_exit
    need_exit = True


# Register the handler for SIGINT when the module is loaded.
signal.signal(signal.SIGINT, handle_exit)
def process_media(source_dir, target_dir, other_dir, excludes):
    """Archive every file found under ``source_dir`` and show a progress bar.

    Args:
        source_dir: Directory scanned recursively for input files.
        target_dir: Root directory that receives the archived media.
        other_dir: Directory that receives files of unknown type.
        excludes: Regular expression of paths to skip while scanning.

    Companion videos recorded in ``MediaFile.live_photo_map`` are not
    archived on their own; they are handled together with their photo. The
    loop stops after the current file once ``need_exit`` has been set.
    """
    all_files = list(walk_dir(source_dir, excludes))
    MediaFile.check_live_photo(all_files)
    # The map is complete once check_live_photo() returns, so snapshot its
    # values into a set: membership tests are then O(1) per file instead of
    # a linear scan over all pairs.
    live_videos = set(MediaFile.live_photo_map.values())
    with progress.Progress(
        progress.TextColumn("[bold blue]{task.description}"),
        progress.BarColumn(bar_width=None),
        progress.TextColumn("[bold green]{task.completed}/{task.total}"),
        progress.TaskProgressColumn(),
        progress.TimeRemainingColumn(),
    ) as prog:
        task = prog.add_task("Processing...", total=len(all_files))
        for path in all_files:
            if path not in live_videos:
                MediaFile(path).archive(target_dir, other_dir)
            # Skipped companion videos still count towards the total.
            prog.update(task, advance=1)
            if need_exit:
                break
if __name__ == '__main__':
    # Usage: script SOURCE_DIR TARGET_DIR OTHER_DIR
    if len(sys.argv) != 4:
        print(f'usage: {sys.argv[0]} SOURCE_DIR TARGET_DIR OTHER_DIR', file=sys.stderr)
        sys.exit(-1)
    source_dir = sys.argv[1]
    if not os.path.exists(source_dir):
        print(f'source directory does not exist: {source_dir}', file=sys.stderr)
        sys.exit(-1)
    target_dir = sys.argv[2]
    other_dir = sys.argv[3]
    # OTHER_DIR is resolved relative to TARGET_DIR (an absolute OTHER_DIR is
    # used as given, per os.path.join).
    other_dir = os.path.join(target_dir, other_dir)
    os.makedirs(other_dir, exist_ok=True)
    # Glob patterns of paths that must not be scanned, including the target
    # tree itself; they are combined into a single regular expression.
    excludes = ['./.venv/*', '*/@eaDir/*', f'{target_dir}/*']
    excludes = r'|'.join([fnmatch.translate(x) for x in excludes]) or r'$.'
    process_media(source_dir, target_dir, other_dir, excludes)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment