Problem solved.
Assuming the issue was related to the large file size, I decided to 'batch' the data load for files that failed to load whole.
import os
import glob
import pandas as pds
import warnings
warnings.filterwarnings("ignore", message="iteritems is deprecated")
#spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Glob pattern selecting every DBTZBA csv extract staged in the lakehouse.
directory = r"/lakehouse/default/Files/C4SQLStage/DBTZBA/*.csv"

# Field separator handed to pandas. Separators longer than one character are
# interpreted by pandas as regular expressions and require engine='python'.
# NOTE(review): the literal is empty as written here although the load comment
# talks about a multi-character delimiter - the real value was presumably lost
# when this snippet was pasted; confirm it before running.
CSV_DELIMITER = ''

# Delta table every extract is appended to.
TARGET_TABLE = "DBTZBA"

# Columns identifying the client; they sit either in front of or behind the
# data columns depending on the extract version.
DBTZBA_CLIENT_COLUMNS = ['ClientID', 'C4SqlClientDatabaseName']

# The 52 data columns, in the order they appear in every extract version.
DBTZBA_DATA_COLUMNS = [
    'cMarktNr', 'dtDatum', 'siKassNr', 'lBonNr', 'lPosNr', 'cStornoZle',
    'cStorZleManag', 'cRueckZahlArt', 'cGewArt', 'cPfandArt', 'cNegArt',
    'cStornoArt', 'cPreisInfo', 'cPreisUeb', 'cRabatt', 'cWaagArt',
    'cDatenEing', 'cArtInTrAkt', 'cZwSumme', 'cPresseCode', 'cArtSatz',
    'cManEing', 'cManBedRab', 'cMxNAutoRab', 'cPersRabatt', 'cAutoRabatt',
    'cBonbezug', 'cKombiRab', 'dPluNr', 'dMenge', 'dVKPreis', 'dUmsatz',
    'dSTPreis', 'siAbteilung', 'siRabSatz', 'dRabSumme', 'dArtNr', 'sArtBez',
    'dEKPreis', 'cArtBezLang', 'siHauptabt', 'lRegalnr', 'siFachfolnr',
    'lAktionsnr', 'lWgr', 'siPrzMwSt', 'dRabWertBon', 'dPreisUebWert',
    'dVirtRabWert', 'lRabAktNr', 'lBAAutorNr', 'siBACodeNr',
]

# Extra trailing column present only in the 55-column extract version.
DBTZBA_LAST_UPDATED_COLUMN = 'C4SQL_LastUpdated'

# Common subset (and canonical order) written to the target table.
DBTZBA_COMMON_COLUMNS = DBTZBA_CLIENT_COLUMNS + DBTZBA_DATA_COLUMNS

# Batch label used for rows whose dtDatum is missing, so that they still get
# written during month-level batching instead of being dropped.
UNDATED_BATCH_LABEL = 'undated'


def resolve_dbtzba_columns(frame):
    """Return the column names matching the layout of a raw DBTZBA extract.

    The layout is derived from two facts about ``frame``:

    * the column count: 54 columns is the base layout, 55 columns adds a
      trailing ``C4SQL_LastUpdated`` column;
    * the second field of the first data row: when it starts with ``DW``
      (case-insensitive) the two client columns lead the row, otherwise they
      follow the data columns.

    Raises:
        ValueError: if ``frame`` has no data rows or an unexpected number of
            columns, so the caller can report the file and move on.
    """
    if frame.empty:
        raise ValueError("no data rows to detect the column layout from")

    column_count = frame.shape[1]
    base_count = len(DBTZBA_COMMON_COLUMNS)
    if column_count == base_count:
        trailing = []
    elif column_count == base_count + 1:
        trailing = [DBTZBA_LAST_UPDATED_COLUMN]
    else:
        raise ValueError(
            f"expected {base_count} or {base_count + 1} columns, "
            f"found {column_count}"
        )

    # str() keeps the check from raising when pandas parsed the field as a
    # number or left it as NaN; for string values the result is unchanged.
    clients_first = str(frame.iat[0, 1])[:2].upper() == 'DW'
    if clients_first:
        return DBTZBA_CLIENT_COLUMNS + DBTZBA_DATA_COLUMNS + trailing
    return DBTZBA_DATA_COLUMNS + DBTZBA_CLIENT_COLUMNS + trailing


def iter_month_batches(frame, date_column='dtDatum'):
    """Yield ``(month_label, rows)`` pairs splitting ``frame`` by calendar month.

    Months are labelled ``YYYY-MM`` and yielded in order of first appearance.
    Rows whose date is missing are yielded under ``UNDATED_BATCH_LABEL``.
    The date column is parsed once for the whole frame.
    """
    month_labels = pds.to_datetime(frame[date_column]).dt.strftime('%Y-%m')
    # A missing date formats to NaN, which never compares equal to itself and
    # would therefore select no rows; give those rows a real label instead.
    month_labels = month_labels.fillna(UNDATED_BATCH_LABEL)
    for label in month_labels.unique():
        yield label, frame[month_labels == label]


def append_to_delta_table(frame, table_name=TARGET_TABLE):
    """Append a pandas DataFrame to a Delta table through Spark.

    Relies on a ``spark`` session object being available as a global; it is
    not defined in this script (presumably injected by the notebook runtime).
    """
    # We use spark here as it supports writing to tables
    spark_df = spark.createDataFrame(frame)
    spark_df.write.mode("append").format("delta").saveAsTable(table_name)


# glob.glob already returns the full path of every match, so each result is
# passed to pandas as-is.
for filename in glob.glob(directory):
    # Load each csv file into a new Pandas DataFrame
    # We use Pandas here as it supports multi-character delimiters
    df = pds.read_csv(filename, delimiter=CSV_DELIMITER, engine='python')

    try:
        df.columns = resolve_dbtzba_columns(df)
    except ValueError as layout_error:
        # An unusable file must not stop the remaining files from loading.
        print(f"Skipping file: {filename}, Error: {layout_error}")
        continue

    # Create a new DataFrame using the common subset of columns from the DBTZBA table
    df_common = df.loc[:, DBTZBA_COMMON_COLUMNS]

    try:
        # First attempt: append the whole file in a single write.
        append_to_delta_table(df_common)
    except Exception:
        print(f"Error processing file: {filename}, Trying Month level batching.")
        # Fall back to one write per month represented in dtDatum
        for month_label, month_rows in iter_month_batches(df_common):
            try:
                # DataFrame creation is inside the try as well, so a single
                # bad batch is reported without aborting the other batches.
                append_to_delta_table(month_rows)
            except Exception as e2:
                print(f"Error processing file: {filename}, Error: {e2}")