MGDC for SharePoint FAQ: How do I process Deltas?

This is a follow-up to the blog post about delta datasets. If you haven’t read it yet, take a look at MGDC for SharePoint FAQ: How can I use Delta State Datasets?

 

Our team received some follow-up questions on this, so I thought it would make sense to write a little more to clarify things.

 

First of all, based on some conversations with Copilot, the basic SQL code for merging a delta would be something like this:

 

 

 

-- Start a transaction
BEGIN TRANSACTION;

-- Assuming the Users table has a primary key constraint on user_id
-- and the UserChanges table uses the same user_id values to identify changed rows
-- First, delete the users that have operation = 'Deleted' in UserChanges

DELETE FROM Users
WHERE user_id IN
    (SELECT user_id 
     FROM UserChanges 
     WHERE operation = 'Deleted');

-- Next, update the users that have operation = 'Updated' in UserChanges

UPDATE U
SET user_name = UC.user_name,
    user_age = UC.user_age
FROM Users U
JOIN UserChanges UC ON U.user_id = UC.user_id
WHERE UC.operation = 'Updated';

-- Finally, insert the users that have operation = 'Created' in UserChanges

INSERT INTO Users (user_id, user_name, user_age)
SELECT user_id, user_name, user_age 
    FROM UserChanges 
    WHERE operation = 'Created';

-- Commit the transaction
COMMIT TRANSACTION;

 

 

 

Note that the column names used (shown here as user_id, user_name and user_age) need to be updated for each dataset, but the structure will be the same.

 

I also asked Copilot to translate this SQL code to PySpark, and it suggested the code below (with a few minor manual touches):

 

 

 

# Import SparkSession and functions

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create SparkSession

spark = SparkSession.builder.appName("Delta dataset").getOrCreate()

# Assuming the Users and UserChanges tables are already loaded as DataFrames

users = spark.table("Users")
user_changes = spark.table("UserChanges")

# First, delete the users that have operation = 'Deleted' in UserChanges

users = users.join(user_changes.filter(user_changes.operation == "Deleted"), "user_id", "left_anti")

# Next, update the users that have operation = 'Updated' in UserChanges

users = users.join(user_changes.filter(user_changes.operation == "Updated"), "user_id", "left_outer") \
             .select(F.coalesce(user_changes.user_name, users.user_name).alias("user_name"),
                     F.coalesce(user_changes.user_age, users.user_age).alias("user_age"),
                     users.user_id)

# Finally, insert the users that have operation = 'Created' in UserChanges

users = users.union(user_changes.filter(user_changes.operation == "Created")
                    .select("user_name", "user_age", "user_id"))
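
As noted earlier, only the key column and the data column names change from one dataset to the next, so it may be convenient to wrap the three steps in a small helper. Here is a minimal sketch of that idea; the apply_delta function and its parameters are made up for illustration, and it assumes the full snapshot has just the key column plus the data columns, and that the delta carries an operation column as in the example above:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def apply_delta(full: DataFrame, delta: DataFrame, key_col: str, data_cols: list) -> DataFrame:
    # Remove rows flagged as deleted
    result = full.join(delta.filter(F.col("operation") == "Deleted"), key_col, "left_anti")

    # Overwrite the data columns for rows flagged as updated
    updated = delta.filter(F.col("operation") == "Updated").select(key_col, *data_cols)
    result = result.alias("f").join(updated.alias("d"), key_col, "left_outer") \
                   .select(F.col(key_col),
                           *[F.coalesce(F.col("d." + c), F.col("f." + c)).alias(c) for c in data_cols])

    # Append rows flagged as created
    created = delta.filter(F.col("operation") == "Created").select(key_col, *data_cols)
    return result.unionByName(created)

# For the example above, this would be called as:
# users = apply_delta(users, user_changes, "user_id", ["user_name", "user_age"])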

 

 

 

After that, there’s the question of how to run this in Azure Data Factory or Azure Synapse.

I would suggest going with Azure Synapse. You could get some inspiration from the template we published at https://go.microsoft.com/fwlink/?linkid=2207816, which includes examples of how to get the data and run a notebook to produce a new dataset.

 

Another good resource is the guide on how to transform data by running a Synapse notebook, available at https://learn.microsoft.com/en-us/azure/data-factory/transform-data-synapse-notebook.

 

The most notable part missing from the code above is how to read the data from ADLS Gen2. There is an article specifically on how to bring data into and out of ADLS Gen2 from a Synapse Spark pool using linked services at https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/tutorial-spark-pool-filesystem-spec.
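
To give an idea of what that looks like in a notebook, here is a minimal sketch of the read and write side, assuming the datasets have already been delivered to an ADLS Gen2 container as JSON files and that the Synapse workspace has access to the storage account. The storage account, container, and folder names below are placeholders, and spark is the SparkSession from the earlier example (or the one built into the Synapse notebook):

# Placeholder location; replace with your own storage account, container, and folders
adls_base = "abfss://mgdc@yourstorageaccount.dfs.core.windows.net"

# Full snapshot built previously (the "Users" table in the example above)
users = spark.read.json(adls_base + "/full/users/")

# Latest delta pulled by MGDC (the "UserChanges" table in the example above)
user_changes = spark.read.json(adls_base + "/delta/userchanges/")

# ... apply the delete / update / insert steps shown earlier ...

# Write the merged snapshot back so the next delta can be applied on top of it
users.write.mode("overwrite").json(adls_base + "/full/users_merged/")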

 

That's it! For more general information about MGDC for SharePoint, visit the main blog at Links about SharePoint on MGDC.
