You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

246 lines
7.9 KiB

# Third-party
from wand.image import Image
import filetype
# Built-in
from os import path, walk
from sys import argv, stderr
from shutil import move
from fractions import Fraction
import sqlite3
def decimal_from_rational64u(dms: str, ref):
    """Turn an EXIF rational64u coordinate into signed decimal degrees.

    EXIF represents a point as comma-separated rationals for degrees,
    minutes and seconds. The decimal value is computed as

        degreesNumerator / degreesDenominator
        + minutesNumerator / minutesDenominator / 60
        + secondsNumerator / secondsDenominator / 3600

    and rounded to the 5th decimal place. The reference cardinal
    direction is taken into account: [S]outh and [W]est coordinates
    are returned negative.

    https://en.wikipedia.org/wiki/Geographic_coordinate_conversion
    https://gis.stackexchange.com/questions/136925/how-to-parse-exif-gps-information-to-lat-lng-decimal-numbers

    >>> decimal_from_rational64u("42/1, 18/1, 2914/100", "S")
    -42.30809
    """
    # Fraction() parses each "numerator/denominator" piece; map() keeps the
    # parsing lazy, so a piece is only converted when zip() asks for it.
    components = map(Fraction, dms.split(","))
    # Pair degrees, minutes and seconds with their divisor and add them up.
    total = 0
    for component, divisor in zip(components, (1, 60, 3600)):
        total += component / divisor
    degrees = round(float(total), 5)
    if ref not in ("S", "W"):
        return degrees
    return -degrees
def process_pictures(source: str, dest_shrunk: str, dest_original: str):
    """Process the images found directly inside the `source` directory.

    For every image a resized copy is saved to `dest_shrunk` and the
    original file is moved to `dest_original`. Nested directories and
    non-image files are ignored.

    This is a generator: it yields one dict per processed image, ready for
    database collection, with the keys ResizedImage, OriginalImage,
    DateTimeOriginal, GPSLatitude and GPSLongitude.

    Images whose EXIF data is missing or malformed are still resized and
    moved, but a message is printed and nothing is yielded for them.
    Images whose name is already taken in `dest_original` are skipped
    untouched.
    """
    # We only care about files in the root of the path,
    # ignore any nested directories.
    (root, _, filenames) = next(walk(source, topdown=True), (None, None, []))
    for filename in filenames:
        source_path = path.join(root, filename)
        # skip any non-image files
        if not filetype.image_match(source_path):
            continue
        # shutil.move() raises when a file with the same name already sits in
        # the destination directory. Check before touching anything, so the
        # resized copy of the earlier picture is not overwritten and the rest
        # of the batch still gets processed.
        if path.exists(path.join(dest_original, filename)):
            print(f"Image '{filename}' already exists in '{dest_original}', skipping")
            continue
        # process pictures with wand lib, imagemagick wrapper
        exif = {}
        with Image(filename=source_path) as image:
            # keep only the "exif:" entries, with the prefix cut off
            exif.update(
                (k[len("exif:"):], v)
                for k, v in image.metadata.items()
                if k.startswith("exif:")
            )
            with image.clone() as cloned:
                # adjust an image so that its orientation is suitable for viewing
                # (i.e. top-left orientation) by checking EXIF data
                cloned.auto_orient()
                # strip an image of all profiles and comments
                cloned.strip()  # Q: may damage icc, do we allow that or use smh else?
                # resize the shorter side to be no more than 1000px
                # https://legacy.imagemagick.org/discourse-server/viewtopic.php?p=44329#p44329
                cloned.transform(resize="1000^>")  # Q: what do we want here?
                # save the resized copy to the processed folder
                cloned.save(filename=path.join(dest_shrunk, filename))
        # move the originals out of the working directory
        # Q: do we strip exif from originals?
        move(source_path, dest_original)
        try:
            # Build the record before yielding, so that only errors coming
            # from the EXIF data itself are handled here.
            record = {
                "ResizedImage": path.join(dest_shrunk, filename),
                "OriginalImage": path.join(dest_original, filename),
                "DateTimeOriginal": exif["DateTimeOriginal"],  # Q: normalize it?
                "GPSLatitude": decimal_from_rational64u(
                    exif["GPSLatitude"], exif["GPSLatitudeRef"]
                ),
                "GPSLongitude": decimal_from_rational64u(
                    exif["GPSLongitude"], exif["GPSLongitudeRef"]
                ),
            }
        except (KeyError, ValueError, ZeroDivisionError):
            # KeyError: a required tag is absent.
            # ValueError / ZeroDivisionError: a GPS rational is malformed
            # (e.g. "0/0"), which Fraction() refuses to parse.
            print(f"Image '{filename}' has no valid exif")
            continue
        # return the freshly processed picture info
        yield record
def update_database(pic_info: dict, db_location: str):
    """Append new image information to the existing database
    or create a new one, if it does not exist yet.

    `pic_info` supplies the values for the named placeholders of the
    INSERT statement: ResizedImage, OriginalImage, DateTimeOriginal,
    GPSLatitude and GPSLongitude.

    Errors raised by sqlite3 propagate to the caller; the connection is
    closed either way.
    """
    # make sure the database exists
    check_database(db_location)
    # FIXME[1]: closure it, so we open it only once?
    con = sqlite3.connect(db_location)
    try:
        # Used as a context manager the connection commits on success and
        # rolls back on an exception; it does not close itself.
        with con:
            # insert the new picture into the images table
            con.execute(
                """INSERT INTO images(resizedpath,
                                      origpath,
                                      date,
                                      GPSLatitude,
                                      GPSLongitude)
                   VALUES (:ResizedImage,
                           :OriginalImage,
                           :DateTimeOriginal,
                           :GPSLatitude,
                           :GPSLongitude)
                """,
                pic_info,
            )
    finally:
        # FIXME[1]
        con.close()
def check_database(database_path: str):
    """Check if there is a database at `database_path`.

    Just return if there is. If not, create a new one containing the
    images, sessions and marks tables.

    Errors raised by sqlite3 propagate to the caller; the connection is
    closed either way.
    """
    # db exists?
    if path.exists(database_path):
        return
    # make one
    print("No DB, creating", path.abspath(database_path))
    # the images table plus the rest of the tables, while we're at it
    schema = (
        """CREATE TABLE images
           (
               imgid INTEGER PRIMARY KEY,
               resizedpath TEXT NOT NULL,
               origpath TEXT NOT NULL,
               date TEXT,
               GPSLatitude REAL,
               GPSLongitude REAL
           )""",
        """CREATE TABLE sessions
           (
               sessionid INTEGER PRIMARY KEY,
               cookie TEXT UNIQUE NOT NULL,
               time TEXT,
               description TEXT
           )""",
        """CREATE TABLE marks
           (
               imgid INTEGER,
               sessionid INTEGER,
               mark INTEGER,
               PRIMARY KEY (imgid, sessionid),
               FOREIGN KEY (imgid)
                   REFERENCES images (imgid)
                       ON DELETE NO ACTION
                       ON UPDATE NO ACTION,
               FOREIGN KEY (sessionid)
                   REFERENCES sessions (sessionid)
                       ON DELETE NO ACTION
                       ON UPDATE NO ACTION
           )""",
    )
    con = sqlite3.connect(database_path)
    try:
        # The connection context manager commits on success and rolls back
        # on an exception; it does not close the connection.
        with con:
            for statement in schema:
                con.execute(statement)
    finally:
        # we only use this occasionally with new databases,
        # so don't bother with transfer logic, just close and reopen it later
        con.close()
def run(db_location: str, source: str, dest_shrunk: str, dest_original: str):
    """Core program logic.

    Feed every picture produced by process_pictures() into the database
    and report how many were handled.
    """
    total = 0
    # enumerate() does the counting; `total` stays 0 when nothing is yielded
    records = process_pictures(source, dest_shrunk, dest_original)
    for total, record in enumerate(records, start=1):
        update_database(record, db_location)
    if total:
        print("Pictures processed:", total)
    else:
        print("No pictures processed from", source)
        print("Do we have enough permissions?")
def usage():
    """Print a one-line usage hint to stderr."""
    script_name = argv[0]
    print(f"USAGE: python {script_name} /path/to/images", file=stderr)
def main():
    """Script entry point: check the arguments, load the config and run.

    Exactly one command line argument is expected, the directory with the
    pictures to process. With any other argument count the usage text is
    printed and the process exits with status 1.

    DB_LOCATION, DEST_SHRUNK and DEST_ORIGINAL are read from config.py,
    which is looked up one directory above this file.
    """
    import sys

    if len(argv) != 2:
        usage()
        # sys.exit() instead of the bare exit(): the latter is injected by
        # the site module for interactive use and is not always available.
        sys.exit(1)
    # append root directory to sys.path
    # to allow import globals from ../config.py
    # abspath() comes first: when __file__ is a bare relative filename its
    # dirname is empty, and appending "/.." to it would point at the
    # filesystem root instead of the parent directory.
    script_dir = path.dirname(path.abspath(__file__))
    sys.path.append(path.join(script_dir, path.pardir))
    import config as cfg

    run(
        cfg.DB_LOCATION,
        argv[1],
        path.normcase(cfg.DEST_SHRUNK),
        path.normcase(cfg.DEST_ORIGINAL),
    )
# Run the importer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()