"""Shrink pictures, archive the originals and record them in SQLite.

Every image found directly inside the source directory is resized with
ImageMagick (through wand), the untouched original is moved to an archive
directory, and the EXIF capture date and GPS position are stored in the
``images`` table of an SQLite database.
"""
# Built-in
import sqlite3
import sys
from contextlib import closing
from fractions import Fraction
from os import path, walk
from shutil import move
from sys import argv, stderr

# Third-party
import filetype
from wand.image import Image

# Divisors that turn (degrees, minutes, seconds) into decimal degrees.
_DMS_DIVISORS = (1, 60, 3600)

# Prefix wand puts in front of EXIF entries in ``Image.metadata``.
_EXIF_PREFIX = "exif:"

_INSERT_IMAGE_SQL = """
    INSERT INTO images(resizedpath, origpath, date, GPSLatitude, GPSLongitude)
    VALUES (:ResizedImage, :OriginalImage, :DateTimeOriginal,
            :GPSLatitude, :GPSLongitude)
"""

# The whole schema is created inside one explicit transaction so that a
# failure half-way through cannot leave a database with only some tables.
_SCHEMA_SQL = """
    BEGIN;
    CREATE TABLE images (
        imgid INTEGER PRIMARY KEY,
        resizedpath TEXT NOT NULL,
        origpath TEXT NOT NULL,
        date TEXT,
        GPSLatitude REAL,
        GPSLongitude REAL
    );
    CREATE TABLE sessions (
        sessionid INTEGER PRIMARY KEY,
        cookie TEXT UNIQUE NOT NULL,
        time TEXT,
        description TEXT
    );
    CREATE TABLE marks (
        imgid INTEGER,
        sessionid INTEGER,
        mark INTEGER,
        PRIMARY KEY (imgid, sessionid),
        FOREIGN KEY (imgid) REFERENCES images (imgid)
            ON DELETE NO ACTION ON UPDATE NO ACTION,
        FOREIGN KEY (sessionid) REFERENCES sessions (sessionid)
            ON DELETE NO ACTION ON UPDATE NO ACTION
    );
    COMMIT;
"""


def decimal_from_rational64u(dms: str, ref: str) -> float:
    """Convert an EXIF rational64u coordinate to signed decimal degrees.

    EXIF stores degrees, minutes and seconds as three comma separated
    fractions.  The result is

        degreesNumerator / degreesDenominator
        + minutesNumerator / minutesDenominator / 60
        + secondsNumerator / secondsDenominator / 3600

    rounded to the 5th decimal place.  [S]outh and [W]est references make
    the coordinate negative.

    https://en.wikipedia.org/wiki/Geographic_coordinate_conversion
    https://gis.stackexchange.com/questions/136925/how-to-parse-exif-gps-information-to-lat-lng-decimal-numbers

    Args:
        dms: Coordinate such as ``"42/1, 18/1, 2914/100"``.
        ref: Cardinal direction reference, e.g. ``"N"``, ``"S"``, ``"E"``
            or ``"W"``.

    Returns:
        The coordinate in decimal degrees.

    Raises:
        ValueError: If a component is not a valid fraction.
        ZeroDivisionError: If a component has a zero denominator.

    >>> decimal_from_rational64u("42/1, 18/1, 2914/100", "S")
    -42.30809
    """
    components = (Fraction(part) for part in dms.split(","))
    # Exact rational arithmetic first; convert to float only once at the end.
    degrees = sum(
        value / divisor for value, divisor in zip(components, _DMS_DIVISORS)
    )
    decimal = round(float(degrees), 5)
    return -decimal if ref in ("S", "W") else decimal


def process_pictures(source: str, dest_shrunk: str, dest_original: str):
    """Resize the images found directly in ``source`` and archive them.

    Only files in the root of ``source`` are considered; nested directories
    are ignored, and so are files that ``filetype`` does not recognise as
    images.  For every image a resized copy is written to ``dest_shrunk``
    and the original is moved to ``dest_original``.

    A picture whose name is already taken in either destination is left
    untouched in ``source`` and reported on stderr, so nothing that was
    processed earlier is overwritten.

    NOTE(review): a picture without usable EXIF data is still resized and
    moved, it just is not yielded (and therefore not recorded by the
    caller).  That matches the previous behaviour; confirm it is intended.

    Args:
        source: Directory holding the pictures to process.
        dest_shrunk: Directory receiving the resized copies.
        dest_original: Directory receiving the original files.

    Yields:
        One dict per picture with valid EXIF data, with the keys
        ``ResizedImage``, ``OriginalImage``, ``DateTimeOriginal``,
        ``GPSLatitude`` and ``GPSLongitude``.
    """
    # Take only the first step of the walk: the files in the root of source.
    root, _, filenames = next(walk(source, topdown=True), (None, None, []))

    for filename in filenames:
        source_path = path.join(root, filename)

        # skip any non-image files
        if not filetype.image_match(source_path):
            continue

        resized_path = path.join(dest_shrunk, filename)
        original_path = path.join(dest_original, filename)

        # Refuse to clobber earlier results: saving would silently replace
        # the resized copy and moving would fail after the work was done.
        if path.exists(resized_path) or path.exists(original_path):
            print(
                f"Image '{filename}' already exists in the destination, "
                "skipping",
                file=stderr,
            )
            continue

        # process pictures with wand lib, imagemagick wrapper
        with Image(filename=source_path) as image:
            # Collect the EXIF entries before they are stripped from the copy.
            exif = {
                key[len(_EXIF_PREFIX):]: value
                for key, value in image.metadata.items()
                if key.startswith(_EXIF_PREFIX)
            }

            with image.clone() as cloned:
                # adjust an image so that its orientation is suitable for
                # viewing (i.e. top-left orientation) by checking EXIF data
                cloned.auto_orient()
                # strip an image of all profiles and comments
                cloned.strip()  # Q: may damage icc, do we allow that?
                # resize the shorter side to be no more than 1000px
                # https://legacy.imagemagick.org/discourse-server/viewtopic.php?p=44329#p44329
                cloned.transform(resize="1000^>")  # Q: what do we want here?
                cloned.save(filename=resized_path)

        # Move the original out of the working directory.  The full target
        # path is given so that a missing destination directory raises
        # instead of renaming the picture to the directory's name.
        # Q: do we strip exif from originals?
        move(source_path, original_path)

        # Build the record outside of the yield so that only problems with
        # the EXIF data are reported as such.
        try:
            record = {
                "ResizedImage": resized_path,
                "OriginalImage": original_path,
                "DateTimeOriginal": exif["DateTimeOriginal"],  # Q: normalize?
                "GPSLatitude": decimal_from_rational64u(
                    exif["GPSLatitude"], exif["GPSLatitudeRef"]
                ),
                "GPSLongitude": decimal_from_rational64u(
                    exif["GPSLongitude"], exif["GPSLongitudeRef"]
                ),
            }
        except (KeyError, ValueError, ZeroDivisionError):
            # Missing tags or malformed rationals (e.g. "0/0").
            print(f"Image '{filename}' has no valid exif")
            continue

        yield record


def update_database(pic_info: dict, db_location: str) -> None:
    """Insert one processed picture into the ``images`` table.

    The database is created first if it does not exist yet.

    Args:
        pic_info: Mapping with the keys ``ResizedImage``, ``OriginalImage``,
            ``DateTimeOriginal``, ``GPSLatitude`` and ``GPSLongitude``.
        db_location: Path of the SQLite database file.
    """
    # make sure the database exists
    check_database(db_location)

    # FIXME[1]: the connection is opened once per picture; share it?
    with closing(sqlite3.connect(db_location)) as con:
        # The connection context commits on success and rolls back on
        # error; closing() makes sure the connection is released either way.
        with con:
            con.execute(_INSERT_IMAGE_SQL, pic_info)


def check_database(database_path: str) -> None:
    """Create the database at ``database_path`` unless the file exists.

    An existing file is left alone without inspecting its content.  A new
    database gets the ``images``, ``sessions`` and ``marks`` tables, created
    in a single transaction.

    Args:
        database_path: Path of the SQLite database file.
    """
    if path.exists(database_path):
        return

    print("No DB, creating", path.abspath(database_path))

    # We only get here occasionally, with new databases, so don't bother
    # handing the connection over: close it and let the caller reopen it.
    with closing(sqlite3.connect(database_path)) as con:
        con.executescript(_SCHEMA_SQL)


def run(
    db_location: str, source: str, dest_shrunk: str, dest_original: str
) -> None:
    """Process every picture in ``source`` and record it in the database.

    Prints how many pictures were recorded, or a hint when there were none.

    Args:
        db_location: Path of the SQLite database file.
        source: Directory holding the pictures to process.
        dest_shrunk: Directory receiving the resized copies.
        dest_original: Directory receiving the original files.
    """
    pics_processed = 0

    # process each pic and add it to the database
    for pic in process_pictures(source, dest_shrunk, dest_original):
        update_database(pic, db_location)
        pics_processed += 1

    if pics_processed:
        print("Pictures processed:", pics_processed)
    else:
        print("No pictures processed from", source)
        print("Do we have enough permissions?")


def usage() -> None:
    """Print a brief usage explanation to stderr."""
    print(f"USAGE: \npython {argv[0]} /path/to/images", file=stderr)


def main() -> None:
    """Command line entry point: expects the picture directory as argument."""
    if len(argv) != 2:
        usage()
        # sys.exit rather than the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to be available.
        sys.exit(1)

    # Append the parent of this file's directory to sys.path to allow
    # importing the globals from ../config.py.  The path is made absolute
    # first: with a bare relative __file__, dirname() is "" and appending
    # "/.." to it would point at the filesystem root.
    project_root = path.join(
        path.dirname(path.abspath(__file__)), path.pardir
    )
    sys.path.append(project_root)
    import config as cfg

    run(
        cfg.DB_LOCATION,
        argv[1],
        path.normcase(cfg.DEST_SHRUNK),
        path.normcase(cfg.DEST_ORIGINAL),
    )


if __name__ == "__main__":
    main()