Skip to content

Instantly share code, notes, and snippets.

@yrvsyh
Last active December 3, 2023 04:05
Show Gist options
  • Save yrvsyh/c10b54bc014646e4ab90d9060ac0bd6a to your computer and use it in GitHub Desktop.
Save yrvsyh/c10b54bc014646e4ab90d9060ac0bd6a to your computer and use it in GitHub Desktop.
archive photo and video by date
#!/usr/bin/env python3
import os
import re
import sys
import math
import errno
import shutil
import signal
import hashlib
import fnmatch
import exifread
import subprocess
from datetime import datetime, timezone, timedelta
from rich import progress
def _monkey_patch_exifread():
    """Wrap ``HEICExifFinder.get_parser`` so a missing parser yields None.

    The replacement delegates to the original method and converts a
    ``NoParser`` exception into a ``None`` return value.
    """
    from exifread import HEICExifFinder
    from exifread.heic import NoParser

    original_get_parser = HEICExifFinder.get_parser

    def _tolerant_get_parser(self, box):
        try:
            parser = original_get_parser(self, box)
        except NoParser:
            parser = None
        return parser

    HEICExifFinder.get_parser = _tolerant_get_parser


# The patch is applied only to exifread 3.0.0.
if exifread.__version__ == '3.0.0':
    _monkey_patch_exifread()
def convert_size(size_bytes):
    """Format a byte count as a human-readable string using 1024-based units.

    Args:
        size_bytes: Number of bytes. Negative values are formatted by
            magnitude and keep their sign.

    Returns:
        ``"0B"`` for zero, otherwise ``"<value> <unit>"`` where the value is
        rounded to two decimals and the unit is one of B, KB, ... YB.
        Values beyond the largest unit are expressed in YB.
    """
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    last = len(size_name) - 1
    # Pick the unit by repeated division rather than a floating-point
    # logarithm: the index can never go negative (values below 1) or run past
    # the end of the unit table (values of 1024**9 and above).
    i = 0
    scaled = abs(size_bytes)
    while scaled >= 1024 and i < last:
        scaled /= 1024
        i += 1
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])
# File extensions (lower-case, leading dot included) classified as photos.
photo_ext = [
    '.jpg',
    '.jpeg',
    '.png',
    '.gif',
    '.heic',
    '.nef',
]

# File extensions classified as videos.
video_ext = [
    '.mp4',
    '.mov',
    '.hevc',
]

# Extensions probed when looking for the video that accompanies a photo.
live_ext = [
    '.mov',
]
def get_media_type(path):
    """Classify a path by its extension, ignoring case.

    Returns:
        'photo' if the extension is in ``photo_ext``, 'video' if it is in
        ``video_ext``, otherwise None.
    """
    suffix = os.path.splitext(path)[1].lower()
    if suffix in photo_ext:
        return 'photo'
    if suffix in video_ext:
        return 'video'
    return None
def get_media_time(path):
    """Return the capture time stored inside a media file, or None.

    Photos are read with exifread: 'EXIF DateTimeOriginal' is preferred and
    'Image DateTime' is the fallback. Videos are probed with the external
    ``ffprobe`` program for the ``creation_time`` tag of the first video
    stream; that timestamp is converted to UTC+8 and returned without tzinfo.

    Args:
        path: Path of the file to inspect.

    Returns:
        A naive ``datetime``, or None when the file is neither a photo nor a
        video, carries no timestamp, or the timestamp cannot be read.
    """
    media_time = None
    media_type = get_media_type(path)
    if media_type == 'photo':
        with open(path, 'rb') as f:
            try:
                exif = exifread.process_file(f)
                media_time_str = exif.get('EXIF DateTimeOriginal')
                if media_time_str is None:
                    media_time_str = exif.get('Image DateTime')
                if media_time_str is not None:
                    media_time = datetime.strptime(str(media_time_str), '%Y:%m:%d %H:%M:%S')
            except Exception as e:
                # Best effort: report the problem and let the caller fall
                # back to another timestamp.
                print(f'\033[31m[Exif ] {path} {e}\033[0m')
    elif media_type == 'video':
        # Use an argument list (no shell) so that paths containing spaces,
        # quotes or shell metacharacters reach ffprobe unmodified.
        command = [
            'ffprobe', '-v', 'quiet',
            '-select_streams', 'v:0',
            '-show_entries', 'stream_tags=creation_time',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            path,
        ]
        try:
            result = subprocess.run(command, stdout=subprocess.PIPE)
        except OSError as e:
            # Raised when ffprobe cannot be started (e.g. it is not installed).
            print(f'\033[31m[Probe] {path} {e}\033[0m')
            return None
        # split() drops the trailing newline (LF or CRLF) and yields nothing
        # for blank output; only the first reported value is used.
        fields = result.stdout.decode('utf-8', errors='replace').split()
        if fields:
            media_time_str = fields[0]
            # datetime.fromisoformat() only accepts a "Z" suffix from Python
            # 3.11 on, so spell the UTC offset out explicitly.
            if media_time_str[-1] in 'Zz':
                media_time_str = media_time_str[:-1] + '+00:00'
            try:
                parsed = datetime.fromisoformat(media_time_str)
            except ValueError as e:
                print(f'\033[31m[Probe] {path} {e}\033[0m')
            else:
                # NOTE(review): a timestamp without an offset is interpreted
                # by astimezone() as local time - confirm this is intended.
                media_time = parsed.astimezone(timezone(timedelta(hours=8))).replace(tzinfo=None)
    return media_time
def get_modify_time(path):
    """Return the file's modification time as a datetime with microseconds zeroed."""
    stat_result = os.stat(path)
    modified = datetime.fromtimestamp(stat_result.st_mtime)
    return modified.replace(microsecond=0)
def get_file_size(path):
    """Return the size of the file at ``path`` in bytes, as reported by os.stat."""
    stat_result = os.stat(path)
    return stat_result.st_size
def get_file_hash(path, block_size=256*128, hr=False):
    """Compute the MD5 digest of a file, reading it in fixed-size chunks.

    Args:
        path: File to hash.
        block_size: Number of bytes read per chunk.
        hr: When true, return the hexadecimal string form of the digest;
            otherwise return the raw digest bytes.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as stream:
        while True:
            chunk = stream.read(block_size)
            if chunk == b'':
                break
            digest.update(chunk)
    return digest.hexdigest() if hr else digest.digest()
def check_file_dup(file1, file2, use_hash = False):
    """Report whether two files are considered duplicates.

    Files of different size are never duplicates. Files of equal size are
    duplicates outright unless ``use_hash`` is true, in which case their
    file hashes must match as well.
    """
    if get_file_size(file1) != get_file_size(file2):
        return False
    if not use_hash:
        return True
    return get_file_hash(file1) == get_file_hash(file2)
def link_or_copy(src, dst):
    """Hard-link ``src`` to ``dst``, falling back to a copy if linking fails.

    Nothing is done when ``dst`` already exists. Progress and errors are
    printed to stdout; errors are wrapped in ANSI red.

    Args:
        src: Existing file to archive.
        dst: Destination path to create.

    Returns:
        True when ``dst`` was created by a link or a copy, False when ``dst``
        already existed or both attempts failed.
    """
    target_name = os.path.basename(dst)
    if os.path.exists(dst):
        print(f'\033[31m[Skip] {src} <-> {target_name}\033[0m')
        return False
    try:
        os.link(src, dst)
    except Exception as e:
        # A cross-device link (EXDEV) is the expected reason to copy instead,
        # so only other failures are reported before falling back.
        if not isinstance(e, IOError) or e.errno != errno.EXDEV:
            print(f'\033[31m[ErrL] {e}\033[0m')
    else:
        print('[Link]', src, '-->', target_name)
        return True
    try:
        print('[Copy ]', src, '-->', target_name)
        shutil.copy2(src, dst)
        return True
    except Exception as e:
        print(f'\033[31m[ErrC] {e}\033[0m')
    return False
class MediaFile(object):
    """One file from the source tree together with the data used to archive it."""

    # Maps a photo path to the path of its accompanying video. Populated by
    # check_live_photo() and consulted in __init__().
    live_photo_map = {}

    @classmethod
    def check_live_photo(cls, all_file_path):
        """Record which photos have an accompanying video next to them.

        For every path whose extension is in ``photo_ext``, a file with the
        same stem and one of the ``live_ext`` extensions (as listed, then
        upper-cased) is looked up. The pair is stored in ``live_photo_map``
        when both timestamps format to the same '%Y%m%d%H' string; each
        timestamp is the media time, or the modification time when the media
        time is unavailable.
        """
        for path in all_file_path:
            stem, suffix = os.path.splitext(path)
            if suffix.lower() not in photo_ext:
                continue
            live_video_path = None
            for video_suffix in live_ext + [item.upper() for item in live_ext]:
                live_video_path = stem + video_suffix
                if not os.path.exists(live_video_path):
                    continue
                photo_time = get_media_time(path)
                if photo_time is None:
                    photo_time = get_modify_time(path)
                photo_time = photo_time.strftime('%Y%m%d%H')
                video_time = get_media_time(live_video_path)
                if video_time is None:
                    video_time = get_modify_time(live_video_path)
                video_time = video_time.strftime('%Y%m%d%H')
                if photo_time == video_time:
                    cls.live_photo_map[path] = live_video_path
                    break

    def __init__(self, path):
        """Collect name parts, media type, timestamps and size for ``path``."""
        self.path = path
        self.basename = os.path.basename(path)
        self.name, self.ext = os.path.splitext(self.basename)
        self.media_type = get_media_type(path)
        # Path of the accompanying video, or None when none was recorded.
        self.live_video = MediaFile.live_photo_map.get(self.path, None)
        self.media_time = get_media_time(path)
        self.modify_time = get_modify_time(path)
        self.size = get_file_size(path)
        # Hash-based naming is switched off: archive() only embeds the hash
        # in the target name when this attribute is not None.
        self.hash = None

    def _link_or_copy(self, target_dir, target_name=None):
        """Link or copy this file into ``target_dir``.

        ``target_name`` defaults to the file's own basename.
        """
        if target_name is None:
            target_name = self.basename
        link_or_copy(self.path, os.path.join(target_dir, target_name))

    def archive(self, target_dir, other_dir):
        """Place this file in the archive.

        Files without a media type go to ``other_dir`` under their original
        name. Photos and videos go to ``target_dir/<YYYY-MM>`` and are named
        after their timestamp (media time, else modification time) plus a
        two-digit counter. A file whose candidate name is already taken by a
        same-sized file is reported as a duplicate and skipped. An
        accompanying video is stored under the same name with its own
        lower-cased extension.
        """
        if self.media_type is None:
            self._link_or_copy(other_dir)
            return

        date_time = self.modify_time if self.media_time is None else self.media_time
        target_dir = os.path.join(target_dir, date_time.strftime('%Y-%m'))
        os.makedirs(target_dir, exist_ok=True)

        if self.hash is not None:
            target_name = f'{date_time.strftime("%Y%m%d_%H%M%S")}00_{self.hash}{self.ext.lower()}'
        else:
            # Try counters 00..99 until a free name turns up.
            for idx in range(100):
                target_name = f'{date_time.strftime("%Y%m%d_%H%M%S")}{idx:02d}{self.ext.lower()}'
                dst = os.path.join(target_dir, target_name)
                if not os.path.exists(dst):
                    break
                if os.path.exists(dst) and check_file_dup(self.path, dst, use_hash=False):
                    print(f'\033[33m[Dupl] {self.path} <-> {target_name}\033[0m')
                    return

        self._link_or_copy(target_dir, target_name)

        if self.live_video is None:
            return
        video_suffix = os.path.splitext(os.path.basename(self.live_video))[1]
        target_name = os.path.splitext(target_name)[0] + video_suffix.lower()
        print(f'\033[32m[Live] {self.path} <-> {target_name}\033[0m')
        link_or_copy(self.live_video, os.path.join(target_dir, target_name))
def walk_dir(root, excludes):
    """Yield the paths of all non-excluded files below ``root``, depth first.

    Args:
        root: Directory to list.
        excludes: Regular expression applied with ``re.match`` to each joined
            path; matching entries (files or directories) are skipped.
    """
    for entry in os.listdir(root):
        candidate = os.path.join(root, entry)
        if re.match(excludes, candidate):
            continue
        if not os.path.isdir(candidate):
            yield candidate
            continue
        for nested in walk_dir(candidate, excludes):
            yield nested
# Raised by the SIGINT handler below; process_media() checks it after each
# file and stops its loop once it is True.
need_exit = False


def handle_exit(signal, frame):
    """Signal handler that requests a stop by setting ``need_exit``."""
    global need_exit
    need_exit = True


# Route SIGINT (Ctrl-C) to handle_exit.
signal.signal(signal.SIGINT, handle_exit)
def process_media(source_dir, target_dir, other_dir, excludes):
    """Archive every file found under ``source_dir`` while showing progress.

    Args:
        source_dir: Directory tree to scan.
        target_dir: Archive root passed to ``MediaFile.archive``.
        other_dir: Destination passed to ``MediaFile.archive`` for files that
            are neither photos nor videos.
        excludes: Regular expression of paths to skip, passed to ``walk_dir``.

    The loop stops after the current file once ``need_exit`` has been set.
    """
    all_files = list(walk_dir(source_dir, excludes))
    MediaFile.check_live_photo(all_files)
    # Videos paired with a photo are archived together with that photo, so
    # they are skipped here. The map is not modified during the loop; build
    # the lookup set once instead of scanning dict values for every file.
    live_videos = set(MediaFile.live_photo_map.values())
    with progress.Progress(
        progress.TextColumn("[bold blue]{task.description}"),
        progress.BarColumn(bar_width=None),
        progress.TextColumn("[bold green]{task.completed}/{task.total}"),
        progress.TaskProgressColumn(),
        progress.TimeRemainingColumn(),
    ) as prog:
        task = prog.add_task("Processing...", total=len(all_files))
        for path in all_files:
            if path not in live_videos:
                MediaFile(path).archive(target_dir, other_dir)
            prog.update(task, advance=1)
            # need_exit is only read here, so no ``global`` statement is needed.
            if need_exit:
                break
if __name__ == '__main__':
    # Usage: <script> SOURCE_DIR TARGET_DIR OTHER_DIR
    # OTHER_DIR is created inside TARGET_DIR.
    if len(sys.argv) != 4:
        print(f'usage: {sys.argv[0]} SOURCE_DIR TARGET_DIR OTHER_DIR', file=sys.stderr)
        # sys.exit() rather than the site-provided exit() helper, which is
        # meant for interactive use and is absent under ``python -S``.
        sys.exit(-1)
    source_dir = sys.argv[1]
    if not os.path.exists(source_dir):
        print(f'source directory does not exist: {source_dir}', file=sys.stderr)
        sys.exit(-1)
    target_dir = sys.argv[2]
    other_dir = sys.argv[3]
    other_dir = os.path.join(target_dir, other_dir)
    os.makedirs(other_dir, exist_ok=True)
    # Glob patterns for paths to skip, including everything under the target
    # directory; they are translated to regexes and OR-ed together.
    excludes = ['./.venv/*', '*/@eaDir/*', f'{target_dir}/*']
    # r'$.' can never match; it is the fallback when no pattern is present.
    excludes = r'|'.join([fnmatch.translate(x) for x in excludes]) or r'$.'
    process_media(source_dir, target_dir, other_dir, excludes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment