You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
photovoter_backend/util/import_photos.py

132 lines
4.2 KiB

# Third-party
from wand.image import Image
import filetype
# Built-in
from os import path, walk
from sys import argv, stderr
from shutil import move
import sqlite3
# update database residing here
DB_LOCATION = "../../testbox/photovoter.dblite"
# place compressed images here (needs to exist)
DEST_STRUNK = "../../testbox/image/"
# move originals here (needs to exist)
DEST_ORIGINAL = "../../testbox/original/"
def usage():
"""Brief usage explanation"""
print("USAGE: ./{name} /path/to/images".format(name=argv[0]), file=stderr)
def process_pictures():
"""Process images from the base directory in the first command line argument.
Place the resized copies to DEST_STRUNK and
move the originals to DEST_ORIGINAL.
Return a dict for each image processed for database collection.
"""
# walk every pic
# We only care about files in the root of the path
# Ignore any nested directories
(root, _, filenames) = next(walk(argv[1], topdown=True), (None, None, []))
for filename in filenames:
# FIXME:what if picture with the same name already exists?
# skip any non-image files
if not filetype.image_match(path.join(root, filename)):
continue
# process pictures with wand lib, imagemagick wrapper
exif = {}
with Image(filename=path.join(root, filename)) as image:
exif.update(
(k[5:], v) for k, v in image.metadata.items() if k.startswith("exif:")
)
with image.clone() as cloned:
cloned.strip() # Q: may damage icc, do we allow that or use smh else?
cloned.transform(resize="50%") # Q: what do we want here?
# move them to the processed folder
cloned.save(filename=path.join(DEST_STRUNK, filename))
# move the originals out of the working directory
# Q: do we strip exif from originals?
move(path.join(root, filename), DEST_ORIGINAL)
# return the freshly processed picture info
yield {
"ResizedImage": path.join(DEST_STRUNK, filename),
"OriginalImage": path.join(DEST_ORIGINAL, filename),
"DateTimeOriginal": exif["DateTimeOriginal"], # Q: normalize it?
"GPSLatitude": exif["GPSLatitude"],
"GPSLatitudeRef": exif["GPSLatitudeRef"],
"GPSLongitude": exif["GPSLongitude"],
"GPSLongitudeRef": exif["GPSLongitudeRef"],
}
def update_database():
"""Append new image information to the existing database
or create a new one, if it does not exist yet
"""
# make sure the database exists
check_database(DB_LOCATION)
# insert new pictures to the image table
pass
def check_database(database_path: str):
"""Check if there is a database at DB_LOCATION.
Just return if there is. If not, create a new one.
"""
# db exists?
if path.exists(database_path):
return
# make one
else:
print("No DB, creating", database_path)
con = sqlite3.connect(database_path)
cur = con.cursor()
# Create table
cur.execute(
"""CREATE TABLE images
(
imgid INTEGER PRIMARY KEY,
resizedpath TEXT NOT NULL,
origpath TEXT NOT NULL,
date TEXT,
GPSLatitude TEXT,
GPSLatitudeRef TEXT,
GPSLongitude TEXT,
GPSLongitudeRef TEXT
)"""
)
con.commit()
# we only use this occasionaly with the new databases,
# so don't bother with transfer logic, just close and reopen it later
con.close()
def main():
if len(argv) != 2:
usage()
exit(1)
pics_processed = 0
# process each pic and add it to the database
for pic in process_pictures():
update_database()
print(pic) # just print into strout for now
pics_processed += 1
if pics_processed == 0:
print("No more pictures processed from", argv[1])
print("Do we have enough permissions?")
else:
print("Pictures processed:", pics_processed)
if __name__ == "__main__":
main()