GregMarbais
Advocate V

Incremental Refresh with Deleting Duplicate Records before Appending Staging Data

Hey all, I'd appreciate some help because I'm stuck on how to delete duplicate records from a table before appending the records from my staging table.

Scenario: I have a table in an on-prem data source. I'm able to copy data using a Data Pipeline (this was a fantastic recent update). I can pass a query that gets the latest records based on an updated_at datetime column and lands those records in a staging table. The issue is that records can be updated later: an order that was created can be updated to include a shipping date when it actually ships out the door. So I need a way to delete the matching records in the production table before appending the staging data.
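
For context, the incremental pull query is nothing fancy; it's roughly like this (table and column names here are just illustrative, and the cutoff value comes from the last load):

  -- Illustrative only: pull rows changed since the last successful load
  SELECT *
  FROM dbo.orders
  WHERE updated_at > '2024-05-01T00:00:00'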

 

Here's what I've got: (This is a simplified version - I'm actually iterating over a handful of tables but the steps are the same)

  1. A table that I've been doing full refreshes on (I'm calling this the "production table") but want to move to incremental refresh (it's a large table but not unwieldy - the big issue is just the amount of time it takes to do a full refresh).
  2. A DataflowGen2 that grabs the max updated_at datetime for the table and puts it in a lakehouse table.
  3. A Data Pipeline
    1. Looks up the last updated_at datetime for the table
    2. A Copy Data activity that uses the latest updated_at datetime in the query to our on-prem data source and puts the data in a staging table in the lakehouse
  4. <This is what I'm missing> A notebook to delete the rows in the production table with matching IDs in the staging table.

So I know the brute-force way is to build a notebook that loads the production table with a left anti join against the staging table, loads the staging table, appends the staging data, and then overwrites the production table in the lakehouse (see the sketch below). It seems like a very compute-heavy approach, and it's relatively time consuming, which means less frequent refreshes. But it does work.
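
As a rough Spark SQL sketch of that brute-force pattern (simplified names, and it assumes the production and staging tables share the same schema), the notebook boils down to something like:

  -- Keep only the production rows that have no match in staging (left anti join),
  -- materialized into a scratch table so the overwrite doesn't read the table it rewrites
  CREATE OR REPLACE TABLE production_table_rebuild AS
  SELECT p.*
  FROM production_table AS p
  LEFT ANTI JOIN staging_table AS s
    ON p.id = s.id;

  -- Rebuild production: surviving rows plus the fresh staging rows
  INSERT OVERWRITE production_table
  SELECT * FROM production_table_rebuild
  UNION ALL
  SELECT * FROM staging_table;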

 

So I'm thinking there has to be a better way that I'm just not thinking about. I've tried:

  1. Running a DELETE SQL command using a structure I found, but it won't work since it generates an error about compound statements not being supported:
      DELETE FROM production_table AS t1
        WHERE EXISTS (SELECT id FROM staging_table AS t2 WHERE t1.id = t2.id)

  2. I tried iterating over the staging table and calling the DELETE command for each specific ID:
      DELETE FROM production_table AS t1
        WHERE t1.id = 'id_passed_from_staging_table_iteration'
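
For reference, what I'd want that missing notebook step to do is basically this Spark SQL MERGE against the Lakehouse Delta tables (an untested sketch with simplified names), after which the staging rows could be appended as usual:

  -- Delete every production row whose id also appears in the staging table
  MERGE INTO production_table AS t
  USING (SELECT DISTINCT id FROM staging_table) AS s
    ON t.id = s.id
  WHEN MATCHED THEN DELETE;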

Prior to choosing a Dataflow Gen2 to build the table with the latest updated_at date, I did try building a SQL script to create a table, but that generated an error. I saw the structure in a YouTube video. I could create a schema but couldn't create a table (I did talk to tech support, and they said table creation isn't supported in the Lakehouse since we're accessing Parquet files via a SQL endpoint).

 

Most of the incremental refresh documentation doesn't cover deleting duplicate rows from the production table before appending the staging table, though those docs were very helpful in getting this far.

 

So I'm at the point where I have a functional solution, but I feel like it's unnecessarily resource intensive, and I'm hoping to tap into the community to see if there's a better way. I'm a citizen developer, so not super technical, but I've done a lot with Power BI and have already built out a lot in Fabric. This one has me stumped, and I figure it's either because I haven't learned it yet or because I'm looking for a solution that doesn't exist.

 

Please let me know if anyone has figured this out or has any ideas or if I'm just nuts and looking to do the impossible.

1 ACCEPTED SOLUTION
GregMarbais
Advocate V

Hello everyone, thanks for the ideas on this. I've been able to successfully build out an incremental refresh structure without using a notebook. Some notes on my use case:

  1. Existing rows in my tables can be updated so I’ll need to delete the old data before appending the new data when pulling in incremental data
  2. I can’t risk using a notebook for this. I'm using notebooks to push data to an API and can't risk a conflict where the API fails because I'm already running too many notebooks for incremental refreshes.
  3. Our data sources are mostly on-prem.
  4. We've subscribed to Fabric capacity (F64)
  5. I'm a citizen developer and we don't have many data analytics folks who have experience with Data Factory so I'm going to rely more on the citizen dev tools where possible - although learning Data Factory wasn't as hard as I thought it was going to be when they first launched Fabric into Public Preview.
  6. I'm following the medallion model/architecture though I use names that my users who aren't familiar with bronze/silver/gold would understand - specifically Raw/Cleaned/Refined.

 

So here are some details on how I've set this up. I'll post additional details as needed.

What I set up:

  1. Staging Lakehouse - this is needed to copy data from an external source into Fabric (I tried going directly to a Data Warehouse but you can’t – the data needs to be staged).
  2. Data Warehouse for Raw Data - Only data warehouses support deleting rows in a table using a script, so you can’t use a Lakehouse for incremental refresh (unless you use a notebook).
  3. Data Warehouse for Cleaned Data – For the tables that get cleaned.
  4. Data Warehouse for Refined Data - For the tables that are refined.
  5. Lakehouse - I'm not sure what to call this, but its purpose is really just to be a repository of links to the tables in the data warehouses. It gives me a layer of abstraction: when someone using a lakehouse in another workspace needs access to this data, I link to this Lakehouse, and if I ever change a table - like if a table needs to be cleaned that didn't before - I can just update the link in this Lakehouse and I don't need to remember to update any other lakehouses that my users are building against.

Dataflow Gen2: Admin Table – I start with a Dataflow Gen2 that builds a manual set of tables I iterate over for pulling in data. It includes these tables and the columns listed:

  • Tables to import raw data: Table_Name, Date_Column, Unique_Identifier_Column, Dataflow_Needed*
  • Tables to clean: Table_Name, Date_Column, Unique_Identifier_Column
  • Tables to refine: Source_Table_Name, Source_Data_Warehouse, Schema, Refined_Table_Name, Date_Column, Unique_Identifier_Column

*This is 1 if the table needs to use a dataflow to copy into the data warehouse instead of a straight copy in Data Factory. I’ve found that long text fields that exceed varchar(8000) need to be copied using a dataflow or custom scripting. Since I’m trying to stay low-code, I’m going with the Dataflow Gen2 for that. But you could add a column to pull in a script for that specific table if preferred.

 

A data pipeline manages everything. Here’s what’s in it:

  1. Lookup Activity: Looks up the table in the staging lakehouse with the list of tables to import
  2. ForEach Activity: Iterates over the rows in the list of tables to import
    1. Script: Grab the max date (value in Date_Column) for the table (value in Table_Name)
    2. Copy data Activity: Copies the data from the on-prem data source after the max date for the table into the staging lakehouse. I have this add “staging__” to the front of the table name.
    3. Script: Delete matching IDs (from the Unique_Identifier_Column) for the table in the Data Warehouse for Raw Data that match the staging table in the staging lakehouse. (Sketches of the T-SQL for these Script activities are shown after this list.)
    4. If Condition: Checks to see if the table requires a Dataflow to copy
      1. If the table does NOT need a dataflow
        1. Script: Insert data from the staging table in the staging lakehouse into the table in the Data Warehouse for Raw Data
      2. If the table does need a dataflow, nothing happens – that’s covered in step 3 below
  3. Dataflow Gen2: Appends staging data from the staging lakehouse to the tables in the Data Warehouse for Raw Data – this doesn’t do any processing of the data
  4. Lookup Activity: Looks up the table in the staging lakehouse with the list of tables that need to be cleaned
  5. ForEach Activity: Iterates over the rows in the list of tables to clean
    1. Script: Grab the max date (value in Date_Column) for the table (value in Table_Name)
    2. Copy data Activity: Copies the data from the table in the Data Warehouse for Raw Data after the max date for the table into the staging lakehouse. I have this add “cleanstaging__” to the front of the table name.
    3. Script: Delete matching IDs (from the Unique_Identifier_Column) for the table in the Data Warehouse for Cleaned Data that match the staging table in the staging lakehouse.
  6. Dataflow Gen2: Cleans and appends data from the staging lakehouse (from the version starting with “cleanstaging__”) to the tables in the Data Warehouse for Cleaned Data – this dataflow does take steps to clean the tables before appending them
  7. Lookup Activity: Looks up the table in the staging lakehouse with the list of tables that need to be refined
  8. ForEach Activity: Iterates over the rows in the list of tables to refine
    1. Script: Grab the max date (value in Date_Column) for the table (value in Refined_Table_Name)
    2. Copy data Activity: Copies the data from the table in the Data Warehouse for Cleaned Data after the max date for the table into the staging lakehouse. I have this add “refinestaging__” to the front of the table name.
    3. Script: Delete matching IDs (from the Unique_Identifier_Column) for the table in the Data Warehouse for Refined Data that match the staging table in the staging lakehouse.
  9. Dataflow Gen2: Refines and appends data from the staging lakehouse (from the version starting with “refinestaging__”) to the tables in the Data Warehouse for Refined Data – this dataflow does take steps to refine the tables before appending them
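
To give a feel for the Script activities, here are simplified T-SQL sketches of the max-date, delete, and insert steps. The warehouse, schema, and table names are placeholders; in the real pipeline they're built from the ForEach item (for example @{item().Table_Name}), and this assumes the warehouses and the staging lakehouse live in the same workspace so the cross-database names resolve.

  -- Step 2.1: grab the high-water mark for the current table from the raw warehouse
  SELECT MAX(updated_at) AS max_updated_at
  FROM RawDataWarehouse.dbo.orders;

  -- Step 2.3: delete rows that are about to be re-loaded from the staging table
  DELETE FROM RawDataWarehouse.dbo.orders
  WHERE id IN (SELECT id FROM StagingLakehouse.dbo.staging__orders);

  -- Step 2.4.1: append the staged rows (only for tables that don't need a dataflow;
  -- assumes the column lists line up)
  INSERT INTO RawDataWarehouse.dbo.orders
  SELECT * FROM StagingLakehouse.dbo.staging__orders;

The clean and refine loops use the same three statements, just pointed at the Cleaned and Refined warehouses and the “cleanstaging__”/“refinestaging__” tables.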

 

There are many nuances, so let me know if you have questions. The pattern is pretty simple and basically repeated for each step in the medallion architecture. It would be easy to simplify cleaning and refining the data, and most of the Dataflow Gen2 steps could be replaced by SQL scripts if needed/preferred.

 

Notebooks are easier than all of this, but the limit on the number of notebooks running at one time could cause conflicts with other scheduled notebooks/pipelines (that contain notebooks) or even with users who are running ad hoc notebooks.


REPLIES

Element115
Power Participant

Another idea... why not do an UPDATE of the records in a Notebook instead of a DELETE?

 

Also, you don't need to store the updated_at in a LH. From within a pipeline, using a Script activity, you can query the LH table, grab the latest date, and compare it to the latest on-prem date (or you can add an integer index column to both the on-prem and the LH tables, grab the MAX index from the LH table, and compare that to the index value from the on-prem table).

 

Either way, you save this value in a pipeline variable and then use it in a SQL query in the Copy data settings to pull only the on-prem data that hasn't been persisted in the LH yet.
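
Roughly like this, with made-up names (the Script activity result gets stored in a pipeline variable, which the Copy data source query then references through dynamic content):

  -- Script activity against the LH SQL endpoint: latest value already persisted
  SELECT MAX(updated_at) AS last_loaded
  FROM dbo.production_table;

  -- Copy data source query, entered as dynamic content; @{variables('last_loaded')}
  -- is replaced with the pipeline variable's value at run time
  SELECT *
  FROM dbo.source_table
  WHERE updated_at > '@{variables('last_loaded')}'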

 

I am in the middle of testing this myself.  

I've not tried an update statement, I'll explore that!

Hi @GregMarbais ,

We haven’t heard from you since the last response and we're just checking back to see if you have a resolution yet.
If you have a resolution, please share it with the community, as it can be helpful to others.
Otherwise, we will respond with more details and try to help.

@v-gchenna-msft I think I have a solution, but I'm testing it to make sure it does everything I need. The trick (I think) is not using a lakehouse but instead using a Data Warehouse (without a lakehouse). I'm working through it now and will post it as soon as I have a definitive answer. I'm very hopeful!


Element115
Power Participant

Unfortunately, I think you hit the nail on the head, meaning your only option is a Notebook using PySpark (unless the Notebook route also allows T-SQL DELETE statements). I say "only option" because a DFg2 doesn't allow modifying a LH, nor does a Script activity in a pipeline. Best of luck.
