from asyncio import run as asyncio_run
from contextlib import closing, contextmanager
from itertools import chain
from os import remove
from pathlib import Path
from shutil import copyfileobj
from tempfile import NamedTemporaryFile

from fastapi import File, UploadFile
from openpyxl import Workbook, load_workbook
from openpyxl.utils.exceptions import InvalidFileException


class DataInUnnamedColumnException(Exception):
    """All the columns containing any data have to be named."""


def _has_value(cell):
    """Return True when *cell* holds data, i.e. anything but None or ""."""
    # Plain truthiness would treat 0, 0.0 and False as "empty" and let real
    # data in an unnamed column slip through unnoticed.
    return cell is not None and cell != ""


def _sheet_iterators(ws):
    """Build the ``{"header": ..., "data": ...}`` pair for one worksheet.

    Raises:
        DataInUnnamedColumnException: a column whose header cell is empty
            contains data (the header row itself is included in the check).
    """
    # assume headers are in the top row
    header = [
        cell
        for row in ws.iter_rows(max_row=1, values_only=True)
        for cell in row
    ]

    # assume the named stretch is continuous: everything from the first
    # empty header cell onwards must be free of data
    try:
        # list.index is 0-based, so this is both the index of the first
        # unnamed column and the 1-based number of the last named column
        named_columns = header.index(None)
    except ValueError:
        # every cell in the first row is filled, so all columns have headers
        named_columns = len(header)
    else:
        # min_col= is 1-based, hence +1 to start at the first unnamed column;
        # flatten the rows into a single stream of cell values
        unnamed_cells = chain.from_iterable(
            ws.iter_rows(min_col=named_columns + 1, values_only=True)
        )
        if any(_has_value(cell) for cell in unnamed_cells):
            raise DataInUnnamedColumnException(
                "Data is found in a column with empty header"
            )

    # max_col= is 1-based and inclusive, so it stops at the last named column
    data = ws.iter_rows(min_row=2, max_col=named_columns, values_only=True)
    return {
        "header": (cell for cell in header if cell is not None),
        "data": data,
    }


@contextmanager
def parse(file: UploadFile = File(...)):
    """Yield a list with a pair of iterators for each parsed sheet.

    The upload is copied to a temporary file, opened as a read-only
    workbook and exposed as::

        [
            {"header": header_iterator, "data": data_iterator},  # sheet1
            # etc
        ]

    Only the active worksheet is parsed for now. The iterators read lazily
    from the workbook, so they must be consumed inside the ``with`` block.
    On exit the workbook and the upload are closed and the temporary file
    is removed.

    Raises:
        DataInUnnamedColumnException: data was found in a column that has
            no header.
    """
    # the upload may come without a filename; fall back to no suffix
    suffix = Path(file.filename or "").suffix
    # stays None until the temporary file exists, so cleanup can tell
    # whether there is anything to remove
    spreadsheet_file = None
    try:
        # TODO: decide if we use subdir in /tmp here, then create it [5]
        with NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp:
            # remember the path before copying so that a failed copy
            # still gets its temporary file removed
            spreadsheet_file = Path(tmp.name)
            copyfileobj(file.file, tmp)

        # Unlike a normal workbook, a read-only workbook will use lazy loading.
        # The workbook must be explicitly closed with the close() method.
        # we use contextlib.closing() to do it for us
        with closing(load_workbook(spreadsheet_file, read_only=True)) as wb:
            # TODO: Multiple worksheets per Workbook or one? [3]
            # assume one for now (second one has strange formatting)
            yield [_sheet_iterators(wb.active)]
            # END [3]
    finally:
        # Close the underlying file object directly: running the async
        # UploadFile.close() through asyncio.run() fails when an event loop
        # is already running in this thread.
        try:
            file.file.close()
        finally:
            if spreadsheet_file is not None:
                remove(spreadsheet_file)