mkumar_upstech
Regular Visitor

Notebook runs on the first iteration but fails on the second iteration

I have created a notebook that loads data from one lakehouse to another. If a table does not exist in the destination, it creates a new table; if it does exist, it performs a merge operation. To do that, I follow the major steps below.
Step 1: Receive JSON-formatted parameter values from the data pipeline (watermark_id, source_table_name, destination_table_name, and key_columns). See the parameter sample below.

 

watermark_array = """[
    {
        "watermark_id": 7,
        "source_database_name": "lh_bronze",
        "source_table_name": "table_1",
        "source_key_column_name": "KeyColumn",
        "destination_database_name": "lh_silver",
        "destination_table_name": "table_1",
        "destination_key_column_name": "KeyColumn"
    },
    {
        "watermark_id": 8,
        "source_database_name": "lh_bronze",
        "source_table_name": "table_2",
        "source_key_column_name": "KeyColumn",
        "destination_database_name": "lh_silver",
        "destination_table_name": "table_2",
        "destination_key_column_name": "KeyColumn"
    }
]"""

 

Step 2: Convert the parameter into a DataFrame and loop through each table to perform the merge; a sketch of the conversion follows.
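For reference, a minimal sketch of that conversion, assuming the watermark_array string from Step 1 is in scope and spark is the session a Fabric notebook provides by default:

import json

# Parse the JSON parameter into a list of dicts, one per table mapping,
# and build a Spark DataFrame with one row per table to load.
rows = json.loads(watermark_array)
df = spark.createDataFrame(rows)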


Step 3: Execute a function that calls another function to check whether the table exists in the source and the destination.

 

 

for row in df.collect():
    watermark_id = row['watermark_id']
    source_database_name = row['source_database_name']
    source_table_name = row['source_table_name']
    source_key_column_name = row['source_key_column_name']
    destination_database_name = row['destination_database_name']
    destination_table_name = row['destination_table_name']
    destination_key_column_name = row['destination_key_column_name']

    print(watermark_id,
          source_database_name,
          source_table_name,
          source_key_column_name,
          destination_database_name,
          destination_table_name,
          destination_key_column_name)

    run_data_load(source_lakehouse=source_database_name,
                  source_tablename=source_table_name,
                  source_table_key_columns=source_key_column_name,
                  destination_lakehouse=destination_database_name,
                  destination_tablename=destination_table_name,
                  destination_key_columns=destination_key_column_name)

 

 

Step 4: Inside run_data_load we call the function below.

 

 

def table_exists(lakehouse_name, table_name):
    # List all tables in the lakehouse and check for a name match.
    tables = spark.catalog.listTables(lakehouse_name)
    return any(table.name == table_name for table in tables)
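As a side note, on Spark 3.3 and later the catalog exposes a built-in existence check, so a sketch like the one below could replace the listing entirely (hedged, since it depends on the runtime version):

def table_exists(lakehouse_name, table_name):
    # spark.catalog.tableExists(tableName, dbName) returns a bool directly
    # and avoids materializing the full table list (Spark 3.3+).
    return spark.catalog.tableExists(table_name, lakehouse_name)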

 

 

 

Step 5: When it hits the line spark.catalog.listTables(lakehouse_name), it fails with the error below while processing the second table. However, if I re-run it, it fails on the first table itself; if I restart the session, it runs for the first table and then fails on the next one.

 

[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^

---------------------------------------------------------------------------
ParseException                            Traceback (most recent call last)
Cell In[53], line 18
      8 destination_key_column_name = [row][0]['destination_key_column_name']
     10 print(watermark_id,
     11         source_database_name,
     12         source_table_name, 
   (...)
     15         destination_table_name, 
     16         destination_key_column_name )
---> 18 run_data_load(source_lakehouse = source_database_name, 
     19                 source_tablename = source_table_name, 
     20                 source_table_key_columns = source_key_column_name, 
     21                 destination_lakehouse = destination_database_name,
     22                 destination_tablename = destination_table_name,
     23                 destination_key_columns  = destination_key_column_name)
     26 # print(watermark_id,source_database_name,source_table_name, source_key_column_name, destination_database_name , destination_table_name, destination_key_column_name )

Cell In[47], line 4, in run_data_load(source_lakehouse, source_tablename, source_table_key_columns, destination_lakehouse, destination_tablename, destination_key_columns)
      2 print(1)
      3 source_normalized_data = normaloize_json_master(source_lakehouse, source_tablename)
----> 4 destination_column_list = get_column_list(destination_lakehouse,destination_tablename)
      5 try:
      6     if source_normalized_data != None:

Cell In[44], line 3, in get_column_list(lakehouse_name, table_name)
      1 def get_column_list(lakehouse_name, table_name):
----> 3     if table_exists(lakehouse_name, table_name): 
      4         print("dd", lakehouse_name, table_name)
      5         spark.catalog.setCurrentDatabase(lakehouse_name)

Cell In[38], line 3, in table_exists(lakehouse_name, table_name)
      1 def table_exists(lakehouse_name, table_name): 
      2     print("te1")
----> 3     tables = spark.catalog.listTables(lakehouse_name) 
      4     print("te2")
      5     table_exists = any(table.name == table_name for table in tables)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/catalog.py:309, in Catalog.listTables(self, dbName)
    307 if dbName is None:
    308     dbName = self.currentDatabase()
--> 309 iter = self._jcatalog.listTables(dbName).toLocalIterator()
    310 tables = []
    311 while iter.hasNext():

File ~/cluster-env/trident_env/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^
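One observable detail: the == SQL == block in the error is empty, so whatever string reached the SQL parser was empty. A guard in table_exists (a diagnostic sketch, not a fix) would make that visible:

def table_exists(lakehouse_name, table_name):
    # Fail fast with a readable message if an empty name is about to be
    # handed to the catalog (and from there to the SQL parser).
    if not lakehouse_name:
        raise ValueError("lakehouse_name is empty")
    tables = spark.catalog.listTables(lakehouse_name)
    return any(table.name == table_name for table in tables)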

 


I'm not sure what the issue could be. Is there a best practice I can use to avoid it?

1 ACCEPTED SOLUTION
mkumar_upstech
Regular Visitor

Hello @v-gchenna-msft, thanks for your continuous follow-up. I do have a solution: the default Fabric environment runs Spark 3.4 and Delta 2.4, which is unstable at the moment, so we created a new environment with Spark 3.3 and Delta 2.2 and it works like a charm.


REPLIES

v-gchenna-msft
Community Support

Hi @mkumar_upstech,

Thanks for using Fabric Community.
As part of debugging, can you please try the code below?

Present code:

def get_column_list(lakehouse_name, table_name):
    if table_exists(lakehouse_name, table_name):

 
Please update it to this and try again:

def get_column_list(lakehouse_name, table_name):
    if any(table.name == table_name for table in spark.catalog.listTables(lakehouse_name)):
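
For completeness, a full version of that suggestion might look like the sketch below; the body after the if is inferred from the traceback (setCurrentDatabase followed by a column listing), not taken from the original post:

def get_column_list(lakehouse_name, table_name):
    # Inline the existence check so no intermediate helper touches the catalog.
    if any(t.name == table_name for t in spark.catalog.listTables(lakehouse_name)):
        spark.catalog.setCurrentDatabase(lakehouse_name)
        # Return the column names of the destination table.
        return [c.name for c in spark.catalog.listColumns(table_name)]
    return None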


Let me know if you have further queries.

Hi @mkumar_upstech,

We haven’t heard from you since the last response and wanted to check whether you have a resolution yet.
If you do, please share it with the community, as it can be helpful to others.
Otherwise, we will follow up with more details and try to help.

Hello @mkumar_upstech,

We haven’t heard from you since the last response and wanted to check whether you have a resolution yet.
If you do, please share it with the community, as it can be helpful to others.
If you have any questions about the current thread, please let us know and we will try our best to help you.
If you have a question about a different issue, we request that you open a new thread.
