{ "name": "Incremental", "properties": { "activities": [ { "name": "Get last Commit Version", "type": "Lookup", "dependsOn": [], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "SqlPoolSource", "sqlReaderQuery": { "value": "SELECT * FROM dbo.CDCTableStatus WHERE Enabled = 1 AND TableName = '@{pipeline().parameters.DestinationDBOTable}' AND LastCommitVersion is not null", "type": "Expression" }, "queryTimeout": "02:00:00" }, "dataset": { "referenceName": "SQLPoolQuery", "type": "DatasetReference" }, "firstRowOnly": true } }, { "name": "GenerateCDCData", "type": "DatabricksNotebook", "dependsOn": [ { "activity": "Get last Commit Version", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "notebookPath": "/Shared/SynapseCDC/GenerateCDCData", "baseParameters": { "DeltaTablePath": { "value": "@activity('Get last Commit Version').output.firstRow.Path", "type": "Expression" }, "DeltaCommitVersion": { "value": "@string(activity('Get last Commit Version').output.firstRow.LastCommitVersion)", "type": "Expression" }, "ADLSAccountName": "gmadlshared", "ADLSContainerName": { "value": "@activity('Get last Commit Version').output.firstRow.ContainerName", "type": "Expression" }, "SQLServer": "gmsyntest01.sql.azuresynapse.net", "SQLDatabase": "TestDedicatedPool", "SQLCDCTableName": { "value": "@{pipeline().parameters.SourceCDCTable}", "type": "Expression" }, "WriteCDCData": "true", "SQLCDCSchemaName": { "value": "@{pipeline().parameters.SourceCDCSchema}", "type": "Expression" } } }, "linkedServiceName": { "referenceName": "AzureADFTest", "type": "LinkedServiceReference" } }, { "name": "Update Table Status Version", "type": "Lookup", "dependsOn": [ { "activity": "Do Merge", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "SqlPoolSource", "sqlReaderQuery": { "value": "UPDATE CDCTableStatus\nSET LastCommitVersion = @{int(activity('GenerateCDCData').output.runOutput)}\nWHERE TableName = '@{pipeline().parameters.DestinationDBOTable}';\n\nSELECT 0 as ReturnSTatus;", "type": "Expression" }, "queryTimeout": "02:00:00" }, "dataset": { "referenceName": "SQLPoolQuery", "type": "DatasetReference" } } }, { "name": "Do Merge", "type": "Script", "dependsOn": [ { "activity": "GenerateCDCData", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "linkedServiceName": { "referenceName": "gmsyntest01-WorkspaceDefaultSqlServer", "type": "LinkedServiceReference", "parameters": { "DBName": "TestDedicatedPool" } }, "typeProperties": { "scripts": [ { "type": "NonQuery", "text": { "value": "DECLARE @TargetTableName nvarchar(512) = '@{pipeline().parameters.DestinationDBOTable}'\nDECLARE @TargetSchema nvarchar(512) = '@{pipeline().parameters.DestinationDBOSchema}'\nDECLARE @CDCTableName nvarchar(512) = '@{pipeline().parameters.DestinationDBOTable}'\nDECLARE @CDCSchema nvarchar(512) = '@{pipeline().parameters.SourceCDCSchema}'\n\ndeclare @UpdateColumns VARCHAR(MAX);\ndeclare @Columns VARCHAR(MAX);\nSelect @UpdateColumns = string_agg(CAST('T.' + x.COLUMN_NAME + ' = S.' + x.COLUMN_NAME AS VARCHAR(MAX)),', '),\n @Columns = string_agg(CAST(x.COLUMN_NAME as VARCHAR(MAX)), ', ')\n from INFORMATION_SCHEMA.COLUMNS x \nWHERE TABLE_NAME = @TargetTableName\nand TABLE_SCHEMA = @TargetSchema\n\nselect @UpdateColumns, @Columns\n\nDeclare @MergeStatement nvarchar(max);\n\nset @MergeStatement \n= ' MERGE ' +@TargetSchema +'.' + @TargetTableName + ' T USING '+ @CDCSchema +'.'+ @CDCTableName + ' S' + \n' ON T.@{activity('Get last Commit Version').output.firstRow.UniqueColumn} = S.@{activity('Get last Commit Version').output.firstRow.UniqueColumn}' +\n' WHEN MATCHED and S._change_type = ''update_postimage''' +\n' THEN UPDATE SET ' +\n @UpdateColumns +\n' WHEN NOT MATCHED BY TARGET THEN INSERT (' + \n@Columns +\n')\tValues (' +\n@Columns + \n')' +\n' WHEN MATCHED and S._change_type like ''%delete%''' +\n' THEN DELETE;';\n--' THEN UPDATE SET T.DeletedDateTime = GETDATE();'; \n\nExecute sp_executesql @MergeStatement;\n ", "type": "Expression" } } ] } } ], "parameters": { "SourceCDCSchema": { "type": "string", "defaultValue": "cgf" }, "SourceCDCTable": { "type": "string", "defaultValue": "Populations" }, "DestinationDBOSchema": { "type": "string", "defaultValue": "dbo" }, "DestinationDBOTable": { "type": "string", "defaultValue": "Populations" } }, "folder": { "name": "DeltaCDC" }, "annotations": [] } }