Metadata Driven Pipelines for Dynamic Full and Incremental Processing in Azure SQL

Developing ETLs/ELTs can be a complex process when you add in business logic, large amounts of data, and the high number of tables that need to be moved from source to target. This is especially true in analytical workloads involving Azure SQL when there is a need to either fully reload a table or incrementally update it. To handle the logic for incrementally updating or fully reloading a table in Azure SQL (or Azure Synapse), we will need to create the following assets:

  • Metadata table in Azure SQL 
    • This will contain the configurations needed to load each table end to end
  • Metadata driven pipelines
    • Parent and child pipeline templates that will orchestrate and execute the ETL/ELT end to end
  • Custom SQL logic for incremental processing
    • Dynamic SQL to perform the delete and insert based on criteria the user provides in the metadata table

*This article uses Azure SQL DB as the source and sink databases. However, Azure SQL MI, on-prem SQL Server, and Synapse Dedicated SQL Pools (along with Synapse Pipelines) are also compatible with this solution. As a source, you can use databases like MySQL, Oracle, and others; you will just need to adjust the query syntax and connections to match the desired source.

 

Scenario

There is a need to load SQL tables from a SQL Server source on a daily frequency or multiple times a day. The requirements are to land the data first in ADLS Gen 2, and then finally load the tables into Azure SQL DB with the correct processing (incremental or full) while using a dynamic pipeline strategy to limit the number of objects used. 

Metadata Table

The first piece required for our dynamic ETL is a metadata table (sometimes called a "config" table) in the destination SQL environment. This table contains all of the information that needs to be passed into the ADF pipelines to determine the source query, the ADLS Gen 2 storage location, and the processing, staging, and other metadata critical to performing the ETL. An example metadata table definition and sample rows are below.

 

Metadata table definition

 

 

 

CREATE TABLE [meta].[ADLS_Metadata](
	[FileName] [varchar](100) NULL,               -- Name of the parquet file landed in ADLS Gen 2
	[StorageAccount] [varchar](100) NULL,         -- ADLS Gen 2 storage account
	[StorageContainer] [varchar](100) NULL,       -- ADLS Gen 2 container
	[ContainerDirectoryPath] [varchar](100) NULL, -- Directory path within the container
	[LoadType] [varchar](25) NULL,                -- 'Full' or 'Incremental'
	[LoadIndicator] [varchar](25) NULL,           -- Frequency/use indicator used as a filter, e.g. 'Daily'
	[SourceSchema] [varchar](25) NULL,            -- Source schema to extract from
	[SourceTable] [varchar](100) NULL,            -- Source table to extract from
	[StagingSchema] [varchar](25) NULL,           -- Staging schema (incremental loads)
	[StagingTable] [varchar](100) NULL,           -- Staging table (incremental loads)
	[TargetSchema] [varchar](25) NULL,            -- Target schema in Azure SQL DB
	[TargetTable] [varchar](100) NULL,            -- Target table in Azure SQL DB
	[ColumnKey] [varchar](500) NULL,              -- Comma-separated key column(s) used by the incremental delete
	[WaterfallColumn] [varchar](100) NULL,        -- Date column used to identify changed rows
	[TableColumns] [varchar](1000) NULL           -- Optional column list (not used in this walkthrough)
) ON [PRIMARY]
GO

 

 

 

 

Sample output of metadata table

Marc_Bushong_0-1694526868112.png

 

The ETL will be facilitated entirely from this metadata table. Any tables that are not included in this table will not be executed by the ETL pipelines. To add new tables or workloads, simply insert them into the metadata table and they will be picked up the next time the pipeline is triggered, without needing to alter the ADF pipelines. Whether the data is going to the same storage container or a different one, different databases, etc., the metadata table lets you dictate the where, what, and how of your ETL from one central location. This is a simple metadata table example, but you can make it as robust as you like by adding test flags, different load frequency indicators, and more.
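
Registering a table is just an insert. For example (the storage account, container, and directory values are illustrative placeholders; the table and key values mirror the 'SalesOrderHeader' example used later in this article, and 'ModifiedDate' is an assumed waterfall column):

INSERT INTO [meta].[ADLS_Metadata]
	(FileName, StorageAccount, StorageContainer, ContainerDirectoryPath, LoadType, LoadIndicator,
	 SourceSchema, SourceTable, StagingSchema, StagingTable, TargetSchema, TargetTable,
	 ColumnKey, WaterfallColumn, TableColumns)
VALUES
	('SalesOrderHeader.parquet',                -- parquet file name landed in ADLS Gen 2
	 'mydatalakeaccount',                       -- illustrative storage account
	 'raw',                                     -- illustrative container
	 'adventureworks/salesorderheader',         -- illustrative directory path
	 'Incremental',                             -- 'Full' or 'Incremental'
	 'Daily',                                   -- load indicator used as a filter
	 'SalesLT', 'SalesOrderHeader',             -- source schema/table
	 'stage', 'STAGE_SalesOrderHeader',         -- staging schema/table
	 'salesLT', 'SalesOrderHeader',             -- target schema/table
	 'SalesOrderID, SalesOrderNumber, rowguid', -- key column(s) for the incremental delete
	 'ModifiedDate',                            -- waterfall (change tracking) date column
	 NULL);                                     -- optional column list, not used in this walkthrough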

 

Metadata Driven Pipelines

Now that the metadata table is constructed, it is time to build the dynamic ADF pipelines to orchestrate and execute the ETL.

 

Here are the ADF objects needed to execute the ETL for 'N' number of tables; they will be shown step by step below. It is important to note the power of dynamic metadata driven pipelines: in this scenario they execute and facilitate an enterprise-level ETL with only 3 pipelines, 2-3 linked services, and 2-3 datasets.

 

  • Linked Services:
    • Source SQL Server*
      • Authenticated with system-assigned managed identity.
    • Sink SQL Server*
      • Authenticated with system-assigned managed identity.
    • ADLS Gen 2 Storage
      • Authenticated with system-assigned managed identity.
    • *If the source and sink SQL Servers are the same service with the same authentication and integration runtime, then you only need one linked service (e.g., both are Azure SQL DBs with the same authentication). However, if the authentication differs or they are different services (Azure SQL DB vs. Azure SQL MI), then create one dynamic linked service for each.
  • Datasets:
    • Source SQL Server Dataset*
    • Sink SQL Server Dataset*
    • ADLS Gen 2 Storage Dataset
    • *One dataset per linked service. Separate datasets may not be needed if you have one dynamic linked service.
  • Pipelines:
    • Main Orchestration Pipeline
    • Full Load Processing Pipeline
    • Incremental Processing Pipeline

 

Linked Service Creation

In this scenario, the source and sink SQL environments are both Azure SQL DB with the same authentication, so only one linked service will be created, with parameters to handle the dynamic use. Feel free to use your own naming conventions for the objects and parameters; just be sure they are generic and descriptive (e.g., avoid names like "parameter1" or "linkedService1").

 

The generic name of the linked service will be "AzureSQLDB". The fully qualified domain (server) name and database name are referenced from parameters created on the linked service so that this connection information can be passed in at runtime from the pipelines. Default values are available and will be used if no values are passed through the pipeline.

Marc_Bushong_0-1694528416809.png

 

Marc_Bushong_1-1694528489402.png
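
For reference, the dynamic content on the linked service simply references those linked service parameters (a sketch; the exact field names may differ slightly in your environment):

Fully qualified domain name:  @{linkedService().serverName}
Database name:                @{linkedService().databaseName}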

 

The same pattern is used for the ADLS Gen 2 linked service. It uses the generic name "ADLSGen2" and only a storage account parameter. No path is specified here, so the linked service can be reused for all containers and paths that share the same authentication method and integration runtime. The path and file will be optional parameters of the dataset that references this linked service.

Marc_Bushong_2-1694528621889.png

 

Dataset Creation

The datasets will be created using the linked services built above. Each dataset needs the parameters used by its linked service, plus additional dataset-specific parameters. The parameter names align with the metadata table column names for ease of use.

 

For the SQL dataset, the parameters "serverName" and "databaseName" come from the linked service. Adding the parameters "schemaName" and "tableName" gives the ability to query or use any table in a server or database through that linked service. Create the parameters first on the "Parameters" tab and then use 'Add dynamic content' to reference the parameters that were just created. These parameters will be exposed/prompted when the dataset is referenced in the pipelines created later in this article.

Marc_Bushong_4-1694529144169.png

 

Marc_Bushong_3-1694529127620.png
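
As a quick sketch, the dataset's dynamic content is just a set of references back to the dataset parameters, which in turn feed the linked service parameters (names match the parameters described above):

serverName (linked service property):    @dataset().serverName
databaseName (linked service property):  @dataset().databaseName
Schema:                                  @dataset().schemaName
Table:                                   @dataset().tableName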

 

For ADLS Gen 2 storage, there will be a dataset for each file type and compression. In this scenario, the data will be stored as Parquet files with snappy compression. The same concept as above is used for the parameters: the parameter "storageAccountName" is created on the dataset for the linked service, and the dataset-specific parameters that identify all possible containers, paths, and files are "storageContainer", "containerDirectoryPath", and "fileName".

Marc_Bushong_6-1694529512670.png

Marc_Bushong_5-1694529293988.png
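
The Parquet dataset follows the same pattern; a sketch of its dynamic content using the parameter names above:

storageAccountName (linked service property):  @dataset().storageAccountName
File path:  @dataset().storageContainer / @dataset().containerDirectoryPath / @dataset().fileName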

 

Pipeline and ETL Creation

With the metadata table, linked services, and datasets created, it is time to build out the metadata driven pipelines. The walkthrough below is split into the 3 pipelines: the main orchestration (parent) pipeline, the full processing (child) pipeline, and the incremental processing (child) pipeline. These pipelines are organized into folders for ease of access. The folders are virtual and offer no functionality other than organization in the UI: "Orchestration" houses the main parent pipeline, and "ETL" contains the child pipelines that perform the processing.

 

Marc_Bushong_7-1694533020004.png

 

 

Main Orchestration pipeline:

The main orchestration pipeline in this example is called "adventureWorks_Main". This pipeline will have a trigger associated with it and will control the execution of the whole ETL. This is the design of the pipeline in the UI, and each activity will be described.

Marc_Bushong_8-1694533107116.png

 

On the parent pipeline, it is critical to define pipeline parameters to allow this process to be dynamic. These pipeline parameters are used throughout the activities and passed to the child pipelines. They will look familiar, as they are used by the dynamic datasets and linked services and for querying the metadata table.

  • sourceServerName
    • Source server connection, passed through the parameterized dataset to the parameterized linked service
  • sourceDatabaseName
    • Source database connection, passed through the parameterized dataset to the parameterized linked service
  • targetServerName
    • Target server connection, passed through the parameterized dataset to the parameterized linked service. Typically where your metadata table lives as well.
  • targetDatabaseName
    • Target database connection, passed through the parameterized dataset to the parameterized linked service. Typically where your metadata table lives as well.
  • loadIndicator
    • A frequency/use indicator used as a filter. 'Daily' is an example signifying the table is loaded daily. You can also place 'Test' or some other value to control which tables, frequencies, or uses are executed.
      • Ex. 'Testing Only', 'Monthly', 'Hourly'. The frequency would correspond with a trigger frequency as well.
  • waterfallLookbackDays
    • The number of days to look back when incrementally processing. Used only to find the changed rows in incremental data sources, and requires a reliable date column that tracks inserts and updates.
      • Ex. rows that have been updated within the last 120 days (-120)
    • There are many ways to incrementally process, and the waterfall column approach is just the one used in this example. You would just need to adjust the parameters, syntax, and dynamic script to fit your criteria.

 

Marc_Bushong_0-1694533958416.png

 

The full reload path and the incremental load path have the same activities and pattern; however, they differ in 2 ways:

  1. The lookup query, specifically the WHERE clause
  2. The parameters passed to the 'Execute Pipeline' activity within the 'ForEach' loop

 

Full Reload Pattern/Queries:

Use a lookup to extract the rows that match the SQL query criteria -> pass that result set to the ForEach loop and iterate over each table to perform the loading in the full load processing pipeline.

 

Marc_Bushong_1-1694534507622.png

 

Lookup Activity - Full Reload:

Inside the lookup activity - "Full Reload - Lookup Metadata".

Marc_Bushong_2-1694534575495.png

The parameters defined on the dataset appear as properties within the activity that uses the dataset. The lookup activity queries the metadata table, which lives in the target server, using the pipeline parameters. The "schemaName" and "tableName" parameters are not needed since the lookup activity is performing a query, so placing an "x" value allows the pipeline to validate even though these parameters are not used.

 

Dynamic content for "serverName" parameter

 

 

 

@pipeline().parameters.targetServerName

 

 

 

 

Dynamic content for the query. The pipeline parameter "loadIndicator" is used in the SQL query, along with a hard-coded filter for 'Full' load types.

 

 

 

 

SELECT LoadType, TargetTable 
FROM 
	meta.ADLS_Metadata
WHERE 
	LoadIndicator = '@{pipeline().parameters.loadIndicator}'
	AND LoadType = 'Full'

 

 

 

 

Sample query output:

Marc_Bushong_0-1694540828098.png

 

For Each Loop - Full Reload:

The next step is to iterate through the output in the ForEach loop. Use the settings tab to define the items from the full reload lookup.

Marc_Bushong_1-1694540968692.png

 

 

 

 

@activity('Full Reload - Lookup Metadata').output.value

 

 

 

Inside the ForEach loop, an execute pipeline activity is used to call the full load processing pipeline (child). The child pipeline has parameters that must be supplied when it is executed. These are the pipeline parameters (from the parent pipeline) EXCEPT for a new parameter called 'targetTable'. The 'targetTable' value comes from the item being iterated from the lookup activity output, as shown below.

Marc_Bushong_2-1694541123913.png
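
For reference, the 'targetTable' parameter on the execute pipeline activity is set from the current ForEach item (a sketch, assuming the item exposes the columns returned by the lookup), while the remaining parameters are passed straight through from the parent:

targetTable:    @item().TargetTable
loadIndicator:  @pipeline().parameters.loadIndicator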

 

Full Reload - Processing Pipeline (child)

The processing pipeline, called "adventureWorks_FullLoad", is executed from the parent pipeline with the pipeline parameters being passed from parent to child. Because this pipeline is called within a ForEach loop, each table being iterated will have its own execution of this pipeline. The overall purpose and design of this pipeline is:

  1. Extract source data
  2. Load source data to ADLS Gen 2 storage
  3. Full reload of data from ADLS Gen 2 to Azure SQL DB

Marc_Bushong_0-1694543592322.png

 

Lookup Activity:

This uses the same dataset and the same pipeline parameter configuration as the parent pipeline lookup activity, with the only difference being the query that is passed through.

Marc_Bushong_1-1694543732906.png

 

Query used in dynamic content. This returns all the columns associated with the row, and there should only be one row returned. If there are multiple rows due to the addition of different testing scenarios/frequencies, refine the filtering logic for the appropriate context.

 

 

 

SELECT * 
FROM 
	meta.ADLS_Metadata
WHERE 
	TargetTable = '@{pipeline().parameters.targetTable}'
	AND LoadIndicator = '@{pipeline().parameters.loadIndicator}'

 

 

 

 

Copy data to ADLS Gen 2:

The metadata gathered from the lookup activity is used to extract the source table data and load it to the location in ADLS Gen 2 specified by the metadata.

 

In the 'Source' - The pipeline parameters for the source server and source database are used for "serverName" and "databaseName". Then the output from the lookup provides the "schemaName" and "tableName". This time, a table is used instead of a query. 

Marc_Bushong_2-1694543961746.png

 

Example of using the activity output in a parameter.

 

 

 

 

@activity('Full Load - Lookup Metadata').output.firstRow.SourceSchema

 

 

 

 

In the 'Sink' - all the parameters are populated from the lookup activity reading from the metadata table. This creates a path for each file. Each file will be overwritten with every execution. If you wish to retain historical copies of the loads, you can add an archive step to move the files from this location and/or add custom logic for date suffixes in the expression builder (see the example below).

Marc_Bushong_3-1694544255454.png
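
As one hypothetical example of that custom logic, the 'fileName' dataset parameter could be date-stamped in the expression builder instead of overwriting the same file on each run:

@concat(formatDateTime(utcnow(), 'yyyyMMdd'), '_', activity('Full Load - Lookup Metadata').output.firstRow.FileName)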

 

Example of storage container with loaded files

Marc_Bushong_4-1694544312246.png

 

Copy Data to Azure SQL DB:

Once the data has been landed into ADLS Gen 2 as parquet files, it is time to load the files into the Azure SQL DB using another Copy activity.

 

In the 'Source' - this will be the same configuration as the 'Sink' of the previous copy activity, using the output from the lookup activity, which contains the metadata table result.

Marc_Bushong_5-1694544477766.png

 

In the 'Sink' - The dataset parameters will be populated with the pipeline parameters for the "targetServerName" and "targetDatabaseName". The "schemaName" and "tableName" are populated from the lookup activity output.

 

There is a 'Pre-copy script' that truncates the table if it already exists. If the table does not exist, the script will not attempt to truncate it, and selecting the 'Table option' of 'Auto create table' will handle any new tables. The write behavior will be 'Insert' since this is a full reload.

 

If schema drift is present, one solution is to replace the truncate with a drop so the table is recreated on each execution (a sketch of that variant follows the pre-copy script below). That approach has other risks that need to be considered.

Marc_Bushong_6-1694544553543.png

 

Pre-copy script using pipeline parameters and lookup activity output parameters

 

 

 

IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '@{activity('Full Load - Lookup Metadata').output.firstRow.TargetSchema}' AND TABLE_NAME =  '@{activity('Full Load - Lookup Metadata').output.firstRow.TargetTable}' )
BEGIN 
TRUNCATE TABLE [@{activity('Full Load - Lookup Metadata').output.firstRow.TargetSchema}].[@{activity('Full Load - Lookup Metadata').output.firstRow.TargetTable}] 
END
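
If you opt for the drop approach mentioned above, a sketch of the alternative pre-copy script (relying on 'Auto create table' to rebuild the table during the load) would be:

DROP TABLE IF EXISTS [@{activity('Full Load - Lookup Metadata').output.firstRow.TargetSchema}].[@{activity('Full Load - Lookup Metadata').output.firstRow.TargetTable}]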

 

 

 

Incremental Load Pattern/Queries:

The incremental processing load is very similar to the full reload processing method, with the differences being the filtering in the lookup activities, additional parameters, and a different method of performing the load inside the processing pipeline.

Marc_Bushong_0-1694545211875.png

 

Lookup Activity - Incremental Load

This lookup activity uses the exact same dataset configuration as the full reload version, with the minor change that 'Incremental' is hard coded as the WHERE clause 'LoadType' filter instead of 'Full'. The parameters still use the pipeline parameters to connect to the metadata table.

Marc_Bushong_1-1694545350243.png

 

 

 

 

SELECT LoadType, TargetTable 
FROM 
	meta.ADLS_Metadata
WHERE 
	LoadIndicator = '@{pipeline().parameters.loadIndicator}'
	AND LoadType = 'Incremental'

 

 

 

For Each Loop - Incremental Load

The ForEach loop will use the output from the incremental load lookup activity in its 'Items' setting.

Marc_Bushong_2-1694545463770.png

 

 

 

 

@activity('Incremental Reload - Lookup Metadata').output.value

 

 

 

Inside the ForEach loop, there is an execute pipeline activity which calls the incremental load processing pipeline (child). There is one extra parameter that was not used in the full reload processing pipeline execution - 'waterfallLookbackDays' (a pipeline parameter).

Marc_Bushong_3-1694545668177.png

 

Incremental Reload - Processing Pipeline (child)

The processing pipeline, called "adventureWorks_IncrementalLoad", is executed from the parent pipeline with the pipeline parameters being passed from parent to child. Because this pipeline is called within a ForEach loop, each table being iterated will have its own execution of this pipeline. The overall purpose and design of this pipeline is (it differs slightly from the full reload):

  1. Extract source data
  2. Load source data to ADLS Gen 2 storage
  3. Full reload of data from ADLS Gen 2 to a staging table in Azure SQL DB
  4. Dynamic delete and insert from staging table into production table within Azure SQL DB

Marc_Bushong_4-1694545815060.png

 

Lookup Activity:

This uses the same query and dataset configuration as the full reload version.

Marc_Bushong_0-1694547065974.png

 

 

 

SELECT * 
FROM 
	meta.ADLS_Metadata
WHERE 
	TargetTable = '@{pipeline().parameters.targetTable}'
	AND LoadIndicator = '@{pipeline().parameters.loadIndicator}'

 

 

 

Copy Data to ADLS Gen 2:

This activity is where the main difference between the full reload and the incremental load starts.

 

In the 'Source' - a query is used instead of the table. This query utilizes the 'WaterfallColumn' value from the metadata table lookup and the 'waterfallLookbackDays' pipeline parameter to filter the results to only that time period. This allows the query to be built dynamically for each table and return a subset of the source table, regardless of which source or table is being processed.

 

Marc_Bushong_1-1694547423241.png

Query using dynamic content

 

 

@concat(
    'SELECT * 
     FROM ', '[', activity('Incremental Load - Lookup Metadata').output.firstRow.SourceSchema, '].[', activity('Incremental Load - Lookup Metadata').output.firstRow.SourceTable, '] ',
     'WHERE ', 'CONVERT(DATE, ', activity('Incremental Load - Lookup Metadata').output.firstRow.WaterfallColumn, ') >= DATEADD(DAY,', pipeline().parameters.waterfallLookbackDays, ', GETDATE())'  )
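
For example, with SourceSchema = 'SalesLT', SourceTable = 'SalesOrderHeader', WaterfallColumn = 'ModifiedDate', and waterfallLookbackDays = -120 (illustrative values), the expression resolves to a query along these lines:

SELECT * 
FROM [SalesLT].[SalesOrderHeader] 
WHERE CONVERT(DATE, ModifiedDate) >= DATEADD(DAY, -120, GETDATE())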

 

 

 

If the source is something other than SQL Server, you can adjust the dynamic query to match the syntax of the source environment, like Oracle, MySQL, etc.

 

In the 'Sink' - the configuration will be the same as the full reload. The dataset parameters come from the lookup activity output on the same dataset.

Marc_Bushong_2-1694547529098.png

 

 

 

@activity('Incremental Load - Lookup Metadata').output.firstRow.StorageAccount

 

 

 

Copy Data from ADLS Gen 2 to Stage Table in Azure SQL DB:

The next step is to load the data from ADLS Gen 2 into a staging table to prepare for the incremental processing. The staging table allows temporary data to be stored and the full compute power of the Azure SQL DB to be leveraged, while maintaining more control over the processing.

 

In both 'Source' and 'Sink' - the configurations are the same as the full reload version, with the only difference being that the parameters point to the staging table referenced in the metadata table instead of the final version of the table. This table is in a different schema and has a 'STAGE_' prefix on the table name. In the 'Sink', the process of truncating the table pre-copy, fully loading, and/or auto-creating tables that don't exist is the same.

Marc_Bushong_0-1694548141988.png

 

 

 

 

@activity('Incremental Load - Lookup Metadata').output.firstRow.FileName

 

 

 

Marc_Bushong_0-1695748896710.png

 

Dataset parameter example for 'tableName' pointing to the 'StagingTable'

 

 

 

@activity('Incremental Load - Lookup Metadata').output.firstRow.StagingTable

 

 

 

Pre-copy script - referencing the staging tables

 

 

IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '@{activity('Incremental Load - Lookup Metadata').output.firstRow.StagingSchema}' AND TABLE_NAME =  '@{activity('Incremental Load - Lookup Metadata').output.firstRow.StagingTable}' )
BEGIN
TRUNCATE TABLE [@{activity('Incremental Load - Lookup Metadata').output.firstRow.StagingSchema}].[@{activity('Incremental Load - Lookup Metadata').output.firstRow.StagingTable}]
END

 

 

 

Dynamic Delete and Insert Script

This step is what actually performs the incremental processing of the tables. It deletes from the production table the rows whose keys exist in the staging table (these rows have changed and need to be either updated or inserted) and then inserts the staging table rows into the production table. The script is provided and explained below.

 

This is a 'Script' activity with the linked service parameters pointing to the location of the target tables and metadata table. There is also an input script parameter, 'FileName', that uses the 'FileName' value from the lookup activity. The option 'NonQuery' is selected since this script performs DML statements and does not return a result set. If a result set were being returned, 'Query' would be selected.

Marc_Bushong_0-1694549282692.png

FileName parameter dynamic content

 

 

 

@activity('Incremental Load - Lookup Metadata').output.firstRow.FileName

 

 

 

Dynamic Delete and Insert SQL Script

 

 

--DECLARE @FileName VARCHAR (500) -- Manual runs inside the procedure will toggle this for troubleshooting
DECLARE @TargetTable VARCHAR (500)
DECLARE @StagingTable VARCHAR (500)
DECLARE @WhereClause VARCHAR(MAX) 
DECLARE @StagingSchema VARCHAR (50)
DECLARE @TargetSchema VARCHAR (50)
DECLARE @FullStagingTableName VARCHAR (500)
DECLARE @FullTargetTableName VARCHAR (500)
DECLARE @TargetTableColumnList NVARCHAR(MAX)
DECLARE @DeleteStatementSQL NVARCHAR (MAX)
DECLARE @InsertStatementSQL NVARCHAR (MAX)
DECLARE @StatisticsUpdateSQL NVARCHAR (MAX)


--SET @FileName = 'SalesOrderHeader.parquet' -- Manual runs inside the procedure will toggle this for troubleshooting
SET @TargetTable = (SELECT TargetTable FROM meta.ADLS_Metadata WHERE FileName = @FileName)
SET @TargetSchema = (SELECT TargetSchema FROM meta.ADLS_Metadata WHERE FileName = @FileName)
SET @StagingTable = (SELECT StagingTable FROM meta.ADLS_Metadata WHERE FileName = @FileName)
SET @StagingSchema = (SELECT StagingSchema FROM meta.ADLS_Metadata WHERE FileName = @FileName)
SET @FullStagingTableName = CONCAT(@StagingSchema, '.', @StagingTable)
SET @FullTargetTableName = CONCAT(@TargetSchema, '.', @TargetTable)
SET @TargetTableColumnList = (	SELECT 
									ColumnList = STRING_AGG('[' + col.NAME + ']', ',' )
								FROM
									sys.tables tab
										LEFT JOIN 
									sys.schemas sch
										ON tab.schema_id = sch.schema_id
										LEFT JOIN 
									sys.columns col
										ON tab.object_id = col.object_id
								WHERE 
									sch.name = @TargetSchema
									AND tab.name = @TargetTable
									AND col.is_identity = 0
							)
 ;

WITH PrimaryKeyList AS (
						SELECT 
							ColumnKey = RTRIM(LTRIM(Value)),
							RowNumber = ROW_NUMBER () OVER (ORDER BY value ASC)

						FROM
							meta.ADLS_Metadata
								CROSS APPLY 
							STRING_SPLIT( ColumnKey, ',')
						WHERE 
							FileName = @FileName
						)
 
 /******* Section for single primary key OR Keys that do not need to be concated to be uniquely identified *********************/
        SELECT
            @WhereClause =   STRING_AGG(CASE 
                                            WHEN E.ColumnKey IS NOT NULL THEN CONCAT( Beg.ColumnKey,' IN (SELECT ', Beg.ColumnKey, ' FROM ', @FullStagingTableName, ') AND')
                                            ELSE CONCAT( Beg.ColumnKey,' IN (SELECT ', Beg.ColumnKey, ' FROM ', @FullStagingTableName, ')' )
                                        END, ' ')
        FROM 
            PrimaryKeyList Beg
                LEFT JOIN
            PrimaryKeyList E
                ON Beg.Rownumber = E.Rownumber - 1 
                ;
/***************************************************************************************************************************************/

/************************* Section used to concat a composite key and create the unique identifier during the load process if it does not exist in the source tables *******************
SELECT
    @WhereClause = CONCAT(	'CONCAT(', 
							STRING_AGG(CASE 
											WHEN E.ColumnKey IS NOT NULL THEN  Beg.ColumnKey
											ELSE CONCAT(Beg.ColumnKey, ') ')
										END, ', '
										),
							'IN (SELECT CONCAT(', 
							STRING_AGG(CASE 
											WHEN E.ColumnKey IS NOT NULL THEN  Beg.ColumnKey
											ELSE CONCAT(Beg.ColumnKey, ') ')
										END, ', '
										),
							'FROM ', @FullStagingTableName, ')'
						)
FROM 
    PrimaryKeyList Beg
        LEFT JOIN
    PrimaryKeyList E
        ON Beg.Rownumber = E.Rownumber - 1 
        ;
 
 *********************************************************************************************************************************************************/
 
SELECT
    @DeleteStatementSQL = CONCAT('DELETE FROM ', @FullTargetTableName, ' WHERE ', @WhereClause) ;
 
SELECT 
    @InsertStatementSQL = CONCAT('INSERT INTO ', @FullTargetTableName, ' (', @TargetTableColumnList, ') ', ' SELECT ', @TargetTableColumnList, ' FROM ', @FullStagingTableName)
 
--SELECT 
--	@StatisticsUpdateSQL = CONCAT('UPDATE STATISTICS ', @FullTargetTableName) 

--PRINT @DeleteStatementSQL
--PRINT @InsertStatementSQL
--PRINT @StatisticsUpdateSQL
 
EXECUTE sp_executesql @DeleteStatementSQL ; 

EXECUTE sp_executesql @InsertStatementSQL ;

--EXECUTE sp_executesql @StatisticsUpdateSQL ; Used in Dedicated SQL Pool to update statistics once tables have been loaded

 

 

 

See examples of the different steps of the script below for the table 'SalesOrderHeader'.

 

Metadata table results for the staging table, target table, and the primary key columns for the target table 'SalesOrderHeader'. Notice that this table has multiple key columns that together uniquely identify a record. The script handles both multi-column and single-column keys, as shown later.

Marc_Bushong_0-1694549779525.png

 

First, the variables are built. One important variable is @TargetTableColumnList, which compiles a comma-separated list of the target table columns from the system tables. You will not need to maintain the column list manually, since the script compiles it from the system tables and excludes identity columns, as these are not updated or inserted. If identity columns do need to be loaded, logic can be added to turn identity insert on and off in the script (see the sketch below).

Marc_Bushong_1-1694550060577.png
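
If identity columns ever do need to be loaded, a sketch of that extension would be to remove the col.is_identity = 0 filter (so the identity column is included in @TargetTableColumnList) and wrap the insert with IDENTITY_INSERT, for example:

-- Illustrative only: build the insert with IDENTITY_INSERT toggled on and off in the same batch
SET @InsertStatementSQL = CONCAT('SET IDENTITY_INSERT ', @FullTargetTableName, ' ON; ',
                                 'INSERT INTO ', @FullTargetTableName, ' (', @TargetTableColumnList, ') ',
                                 'SELECT ', @TargetTableColumnList, ' FROM ', @FullStagingTableName, '; ',
                                 'SET IDENTITY_INSERT ', @FullTargetTableName, ' OFF;');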

 

The next step is to build the WHERE clause of our delete statement. This is done by taking the column keys and splitting them out into separate predicates. Executing the code down through the @WhereClause creation produces this output.

 

 

 

@WhereClause = rowguid IN (SELECT rowguid FROM stage.STAGE_SalesOrderHeader) AND SalesOrderNumber IN (SELECT SalesOrderNumber FROM stage.STAGE_SalesOrderHeader) AND SalesOrderID IN (SELECT SalesOrderID FROM stage.STAGE_SalesOrderHeader)

 

 

 

There is a section commented out for handling composite keys that cannot be uniquely identified with each column key in its own predicate. In that scenario, the key values are concatenated. The commented composite key section produces the result below for the same table/key combination.

 

 

 

@WhereClause = CONCAT(rowguid, SalesOrderNumber, SalesOrderID) IN (SELECT CONCAT(rowguid, SalesOrderNumber, SalesOrderID) FROM stage.STAGE_SalesOrderHeader)

 

 

 

Next, the delete and insert statements are created using the dynamic SQL in the script and previous steps. Here are the outputs from our example. 

 

Delete statement

 

 

DELETE FROM salesLT.SalesOrderHeader WHERE rowguid IN (SELECT rowguid FROM stage.STAGE_SalesOrderHeader) AND SalesOrderNumber IN (SELECT SalesOrderNumber FROM stage.STAGE_SalesOrderHeader) AND SalesOrderID IN (SELECT SalesOrderID FROM stage.STAGE_SalesOrderHeader)

 

 

Insert statement

 

 

INSERT INTO salesLT.SalesOrderHeader ([SalesOrderID],[RevisionNumber],[OrderDate],[DueDate],[ShipDate],[Status],[OnlineOrderFlag],[SalesOrderNumber],[PurchaseOrderNumber],[AccountNumber],[CustomerID],[ShipToAddressID],[BillToAddressID],[ShipMethod],[CreditCardApprovalCode],[SubTotal],[TaxAmt],[Freight],[TotalDue],[Comment],[rowguid],[ModifiedDate])  SELECT [SalesOrderID],[RevisionNumber],[OrderDate],[DueDate],[ShipDate],[Status],[OnlineOrderFlag],[SalesOrderNumber],[PurchaseOrderNumber],[AccountNumber],[CustomerID],[ShipToAddressID],[BillToAddressID],[ShipMethod],[CreditCardApprovalCode],[SubTotal],[TaxAmt],[Freight],[TotalDue],[Comment],[rowguid],[ModifiedDate] FROM stage.STAGE_SalesOrderHeader

 

 

 

Finally, those statements are passed into sp_executesql to be executed. 

Summary

The templates and scripts above allow you to build a dynamic, metadata driven ETL process at enterprise scale, with as few as 3 pipelines facilitating 'N' number of tables. This metadata driven approach is highly flexible and scalable, which allows you to build upon this solution and tailor it to your exact needs. Even if your requirements or change tracking logic are more complex than waterfall columns or composite keys, you can still add that logic into this process to handle your ETL needs.
