AndrewWestran
Helper I

Fabric Notebook - Message Maxsize

Hi 

 

I am facing the following issue when loading data into a table using a PySpark notebook:

 

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12182:0 was 182487782 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

 

Code as follows:

 

import os
import glob
import pandas as pds

directory = r"/lakehouse/default/Files/C4SQLStage/DBTZBA/*.csv"

for filename in glob.glob(directory):
    f = os.path.join(directory, filename)

    # Load each csv file into a new Pandas DataFrame
    # We use Pandas here as it supports multi-character delimiters
    df = pds.read_csv(f, delimiter='', engine='python')

    # Get column details for checking the number of columns
    col = df.columns

    # For 54 column table versions, set the column names
    if len(col) != 55:
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName']
    # For 55 column table versions, set the column names
    else:
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','C4SQL_LastUpdated']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName','C4SQL_LastUpdated']

    # Create a new spark DataFrame using the common subset of columns from the DBTZBA table
    # We use spark here as it supports writing to tables
    df_new = spark.createDataFrame(df.loc[:, ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']])
   
    # Append the csv data to the new table
    df_new.write.mode("append").format("delta").saveAsTable("DBTZBA")
 
The error is raised on the last line of the code (the saveAsTable call).

The failing file is the first in the queue that is in the region of 1 GB in size; all previous files are under 400 MB.
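
For reference, the setting named in the error, spark.rpc.message.maxSize, is expressed in MiB (the 134217728 bytes in the message is the default of 128 MiB) and is read when the Spark session is created, so it cannot simply be changed with spark.conf.set() on a session that is already running. The sketch below only illustrates how it would be supplied in a plain PySpark setup; in a managed Fabric notebook the equivalent change would normally belong in the environment or session Spark configuration, and the value 256 is an arbitrary example rather than a recommendation.

from pyspark.sql import SparkSession

# Illustrative only: spark.rpc.message.maxSize (in MiB) has to be in place before
# the session exists; getOrCreate() will not apply it to an already-running session.
spark = (
    SparkSession.builder
        .appName("DBTZBA-load")                      # hypothetical app name
        .config("spark.rpc.message.maxSize", "256")  # example value; the default is 128
        .getOrCreate()
)

# Arrow-based conversion of pandas DataFrames is a runtime SQL setting and can be
# enabled on a live session; it changes how createDataFrame() ships the pandas data
# to the executors. (The accepted solution below carries this line commented out.)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")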
1 ACCEPTED SOLUTION
AndrewWestran
Helper I

Problem solved.

Assuming the issue was related to the large file size, I decided to 'batch' the data load for files that failed to load in a single pass.

Updated Code:

 

import os
import glob
import pandas as pds
import warnings

warnings.filterwarnings("ignore", message="iteritems is deprecated")

#spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

directory = r"/lakehouse/default/Files/C4SQLStage/DBTZBA/*.csv"

for filename in glob.glob(directory):
    f = os.path.join(directory, filename)

    # Load each csv file into a new Pandas DataFrame
    # We use Pandas here as it supports multi-character delimiters
    df = pds.read_csv(f, delimiter='', engine='python')

    # Get column details for checking the number of columns
    col = df.columns

    # For 54 column table versions, set the column names
    if len(col) != 55:
        #print(f"{filename}, Columns: {len(col)}, DBName: {df.iat[0, 1]},  Date Last Updated: {df.iat[0, 53]}")
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName']
    # For 55 column table versions, set the column names
    else:
        #print(f"{filename}, Columns: {len(col)}, DBName: {df.iat[0, 1]},  Date Last Updated: {df.iat[0, 54]}")
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','C4SQL_LastUpdated']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName','C4SQL_LastUpdated']

    # Create a new DataFrame using the common subset of columns from the DBTZBA table
    df_common = df.loc[:, ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']]
    try:
        # Set up a spark DataFrame here to handle the write (append) to the new table
        spark_df = spark.createDataFrame(df_common)
        spark_df.write.mode("append").format("delta").saveAsTable("DBTZBA")
    except Exception as e:
        print(f"Error processing file: {filename}, Trying Month level batching.")

        # Iterate through the range of months represented in dtDatum values
        for unqValue in pds.to_datetime(df_common['dtDatum']).dt.strftime('%Y-%m').unique():

            # Filter the DBTZBA DataFrame for the selected month
            spark_df = spark.createDataFrame(df_common[pds.to_datetime(df_common['dtDatum']).dt.strftime('%Y-%m') == unqValue])

            # Append the csv data to the new table
            # We use spark here as it supports writing to tables
            try:
                spark_df.write.mode("append").format("delta").saveAsTable("DBTZBA")
            except Exception as e2:
                print(f"Error processing file: {filename}, Error: {e2}")


2 REPLIES 2
AndrewWestran
Helper I

Full error message:

 

Error: An error occurred while calling o4931.saveAsTable. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:283) at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:456) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:391) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:355) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:221) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:218) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:337) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:252) at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1$$anon$2.insert(DeltaTableV2.scala:279) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.writeWithV1(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.run(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:108) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:104) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:661) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:595) at sun.reflect.GeneratedMethodAccessor252.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) at org.apache.spark.sql.execution.OptimizeWriteExchangeExec.doExecute(OptimizeWriteExchangeExec.scala:65) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.doExecute(DeltaInvariantCheckerExec.scala:72) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218) ... 61 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 4083:0 was 182487775 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) Error processing file: /lakehouse/default/Files/C4SQLStage/DBTZBA/KW10972.dbo.DBTZBA.csv, Error: An error occurred while calling o4958.saveAsTable. : org.apache.spark.SparkException: Job aborted. 
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:283) at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:456) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:391) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:355) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:221) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:218) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:337) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:252) at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1$$anon$2.insert(DeltaTableV2.scala:279) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.writeWithV1(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.run(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:108) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:104) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:661) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:595) at sun.reflect.GeneratedMethodAccessor252.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) at org.apache.spark.sql.execution.OptimizeWriteExchangeExec.doExecute(OptimizeWriteExchangeExec.scala:65) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.doExecute(DeltaInvariantCheckerExec.scala:72) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218) ... 61 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 4155:0 was 144432865 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
