You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
geodata-catalog/backend/src/spreadsheet.py

104 lines
3.8 KiB

from openpyxl import Workbook, load_workbook
from openpyxl.utils.exceptions import InvalidFileException
from fastapi import File, UploadFile
from asyncio import run as asyncio_run
from contextlib import closing, contextmanager
from itertools import chain
from os import remove
from pathlib import Path
from shutil import copyfileobj
from tempfile import NamedTemporaryFile
class DataInUnnamedColumnException(Exception):
    """Raised when a spreadsheet column without a header contains data.

    Every column that holds any data has to be named in the header row.
    """
@contextmanager
def parse(file: UploadFile = File(...)):
    """Yield the parsed contents of an uploaded spreadsheet.

    The upload is copied to a temporary file in ``/tmp`` and opened with
    openpyxl in read-only mode. Only the active worksheet is parsed, so the
    yielded list currently holds a single entry::

        [
            {"header": header_iterator, "data": data_iterator},  # active sheet
        ]

    ``header`` iterates over the non-empty cells of the first row; ``data``
    iterates over the rows from the second row on, limited to the columns
    that have a header, each row being a tuple of cell values.

    ``data`` is read from the workbook on demand, and the workbook is closed
    when the context exits, so consume the iterators inside the ``with``
    block.

    On exit (normal or exceptional) the upload is closed and the temporary
    spreadsheet file is removed.

    Raises:
        DataInUnnamedColumnException: when entering the context, if any cell
            in or after the first column with an empty header holds a value.
    """
    suffix = Path(file.filename).suffix
    # Bound before the ``try`` so the cleanup below never hits an unassigned
    # name when creating the temporary file fails.
    spreadsheet_file = None
    try:
        # TODO: decide if we use subdir in /tmp here, then create it [5]
        with NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp:
            # Record the path before copying: the file is created with
            # delete=False, so it has to be removed even if the copy fails.
            spreadsheet_file = Path(tmp.name)
            copyfileobj(file.file, tmp)

        # Unlike a normal workbook, a read-only workbook will use lazy loading.
        # The workbook must be explicitly closed with the close() method.
        # We use contextlib.closing() to do it for us.
        with closing(load_workbook(spreadsheet_file, read_only=True)) as wb:
            # TODO: Multiple worksheets per Workbook or one? [3]
            # assume one for now (second one have strange formatting)
            ws = wb.active

            # Assume headers are in the top row.
            header = [
                cell
                for row in ws.iter_rows(max_row=1, values_only=True)
                for cell in row
            ]

            # Assume the named columns form one continuous stretch starting
            # at the first column: the first empty header cell ends it.
            try:
                first_unnamed = header.index(None)
            except ValueError:
                # No empty cell in the first row: every column has a header.
                last_column_with_header = len(header)
            else:
                # .index() is 0-based and max_col= is 1-based and inclusive,
                # so the 0-based index of the first unnamed column is also
                # the 1-based number of the last named one.
                last_column_with_header = first_unnamed
                # min_col= is 1-based, hence the +1 to start the scan at the
                # first unnamed column. chain.from_iterable keeps the scan
                # lazy so any() can stop at the first offending cell.
                unnamed_cells = chain.from_iterable(
                    ws.iter_rows(min_col=first_unnamed + 1, values_only=True)
                )
                # Compare against None rather than relying on truthiness so
                # that values such as 0 or False still count as data.
                if any(cell is not None for cell in unnamed_cells):
                    raise DataInUnnamedColumnException(
                        "Data is found in a column with empty header"
                    )

            data = ws.iter_rows(
                min_row=2, max_col=last_column_with_header, values_only=True
            )
            result = [
                {
                    "header": (cell for cell in header if cell is not None),
                    "data": data,
                }
            ]
            # END [3]
            yield result
    finally:
        # Clean up: close the upload and remove the temporary spreadsheet.
        # The temporary file object itself is already closed by its ``with``
        # block above.
        try:
            # NOTE(review): asyncio.run() raises RuntimeError when an event
            # loop is already running in the current thread, so this only
            # works when parse() is used from synchronous code - confirm
            # against callers.
            asyncio_run(file.close())
        finally:
            # Remove the temporary file even if closing the upload failed.
            if spreadsheet_file is not None:
                remove(spreadsheet_file)