{
  "name": "FullLoad",
  "properties": {
    "description": "Full reload of one table. Reads the table's row from dbo.CDCTableStatus, builds the column list and CREATE TABLE statement from INFORMATION_SCHEMA.COLUMNS, runs the GenerateCDCData notebook, drops and recreates the destination table, copies the source rows into it through staging, and finally stores the notebook's runOutput as LastCommitVersion in dbo.CDCTableStatus.",
    "activities": [
      {
        "name": "Copy Data From Delta",
        "description": "Drops and recreates [DestinationDBOSchema].[DestinationDBOTable] with the SQL_CREATE statement returned by 'Get Table Metadata' (preCopyScript), then copies the source columns listed in COLUMN_NAME into it via the 'staging' path with the COPY command enabled.",
        "type": "Copy",
        "dependsOn": [
          {
            "activity": "Get Latest Version",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "2:00:00",
          "retry": 1,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "Select @{activity('Get Table Metadata').output.firstRow.COLUMN_NAME} from [@{pipeline().parameters.SourceDBOSchema}].[@{activity('Get Table Metadata').output.firstRow.TABLE_NAME}]",
              "type": "Expression"
            },
            "queryTimeout": "02:00:00",
            "partitionOption": "None"
          },
          "sink": {
            "type": "SqlPoolSink",
            "preCopyScript": {
              "value": "IF OBJECT_ID(N'[@{pipeline().parameters.DestinationDBOSchema}].[@{pipeline().parameters.DestinationDBOTable}]','U') IS NOT NULL\nBEGIN\nDROP TABLE [@{pipeline().parameters.DestinationDBOSchema}].[@{pipeline().parameters.DestinationDBOTable}]\nEND\n\n@{activity('Get Table Metadata').output.firstRow.SQL_CREATE}",
              "type": "Expression"
            },
            "allowCopyCommand": true
          },
          "enableStaging": true,
          "stagingSettings": {
            "linkedServiceName": {
              "referenceName": "GMDataLake",
              "type": "LinkedServiceReference"
            },
            "path": "staging"
          }
        },
        "inputs": [
          {
            "referenceName": "ServerlessQuery",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "SQLPoolTable",
            "type": "DatasetReference",
            "parameters": {
              "Schema": {
                "value": "@pipeline().parameters.DestinationDBOSchema",
                "type": "Expression"
              },
              "Table": {
                "value": "@pipeline().parameters.DestinationDBOTable",
                "type": "Expression"
              }
            }
          }
        ]
      },
      {
        "name": "Get Table Metadata",
        "description": "Reads INFORMATION_SCHEMA.COLUMNS for SourceDBOSchema.SourceDBOTable and returns one row with TABLE_NAME, COLUMN_NAME (bracketed, comma-separated column list) and SQL_CREATE (CREATE TABLE statement for the destination table, hash-distributed on the control table's HashColumn with a clustered columnstore index). The columns $FileName, _SysRowId, LSN, LastProcessedChange_DateTime and DataLakeModified_DateTime are excluded from both.",
        "type": "Lookup",
        "dependsOn": [
          {
            "activity": "Get Control Table Information",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "SELECT TABLE_NAME,\n    STUFF((SELECT ', [' + c.COLUMN_NAME + ']' [text()]\n        FROM INFORMATION_SCHEMA.COLUMNS c\n        WHERE c.TABLE_NAME = t.TABLE_NAME AND c.TABLE_SCHEMA = t.TABLE_SCHEMA\n          AND c.COLUMN_NAME NOT IN ('$FileName', '_SysRowId', 'LSN', 'LastProcessedChange_DateTime', 'DataLakeModified_DateTime')\n        ORDER BY c.ORDINAL_POSITION ASC\n        FOR XML PATH(''), TYPE)\n    .value('.', 'NVARCHAR(MAX)'), 1, 2, ' ') COLUMN_NAME,\n\n    'CREATE TABLE [@{pipeline().parameters.DestinationDBOSchema}].[@{pipeline().parameters.DestinationDBOTable}] ( ' + STUFF((\n        SELECT CHAR(13) + ', [' + c.COLUMN_NAME + '] ' + c.DATA_TYPE + ' ' +\n            CASE\n                WHEN c.DATA_TYPE IN ('varchar', 'char', 'varbinary', 'binary', 'nvarchar', 'nchar')\n                    THEN '(' + CASE WHEN c.CHARACTER_MAXIMUM_LENGTH = -1 OR c.CHARACTER_MAXIMUM_LENGTH IS NULL\n                                    THEN 'MAX'\n                                    ELSE CAST(c.CHARACTER_MAXIMUM_LENGTH AS VARCHAR(5))\n                               END + ')'\n                WHEN c.DATA_TYPE IN ('datetime2', 'time', 'datetimeoffset')\n                    THEN '(' + CAST(c.DATETIME_PRECISION AS VARCHAR(5)) + ')'\n                WHEN c.DATA_TYPE IN ('decimal', 'numeric')\n                    THEN '(' + CAST(c.NUMERIC_PRECISION AS VARCHAR(5)) + ',' + CAST(c.NUMERIC_SCALE AS VARCHAR(5)) + ')'\n                ELSE ''\n            END +\n            CASE WHEN c.IS_NULLABLE = 'YES'\n                 THEN ' NULL'\n                 ELSE ' NOT NULL'\n            END\n        FROM INFORMATION_SCHEMA.COLUMNS c\n        WHERE c.TABLE_NAME = t.TABLE_NAME AND c.TABLE_SCHEMA = t.TABLE_SCHEMA\n          AND c.COLUMN_NAME NOT IN ('$FileName', '_SysRowId', 'LSN', 'LastProcessedChange_DateTime', 'DataLakeModified_DateTime')\n        ORDER BY c.ORDINAL_POSITION ASC\n        FOR XML PATH(''), TYPE)\n    .value('.', 'NVARCHAR(MAX)'), 1, 2, ' ') + '\n)\n WITH \n ( \n DISTRIBUTION = HASH (@{activity('Get Control Table Information').output.firstRow.HashColumn}), \n CLUSTERED COLUMNSTORE INDEX \n ); ' SQL_CREATE\n\nFROM INFORMATION_SCHEMA.COLUMNS t\nWHERE TABLE_NAME = '@{pipeline().parameters.SourceDBOTable}'\nAND TABLE_SCHEMA = '@{pipeline().parameters.SourceDBOSchema}'\nGROUP BY TABLE_NAME, TABLE_SCHEMA",
              "type": "Expression"
            },
            "queryTimeout": "02:00:00",
            "partitionOption": "None"
          },
          "dataset": {
            "referenceName": "ServerlessQuery",
            "type": "DatasetReference"
          },
          "firstRowOnly": true
        }
      },
      {
        "name": "Reset Table Status",
        "description": "Runs after a successful copy. Updates the destination table's row in dbo.CDCTableStatus (the same table 'Get Control Table Information' reads), setting LastCommitVersion to the runOutput of 'Get Latest Version' and LastUpdatedAt to the current time. The trailing SELECT gives the Lookup activity a result set to return.",
        "type": "Lookup",
        "dependsOn": [
          {
            "activity": "Copy Data From Delta",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "SqlPoolSource",
            "sqlReaderQuery": {
              "value": "DECLARE @Date DATETIME;\nSET @Date = GETDATE();\n\nUPDATE dbo.CDCTableStatus \nSET LastCommitVersion = @{activity('Get Latest Version').output.runOutput}, LastUpdatedAt = @Date\nWHERE TableName = '@{pipeline().parameters.DestinationDBOTable}';\n\nSELECT GetDate() as RunDate;",
              "type": "Expression"
            },
            "queryTimeout": "02:00:00"
          },
          "dataset": {
            "referenceName": "SQLPoolQuery",
            "type": "DatasetReference"
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "Get Control Table Information",
        "description": "Reads the enabled dbo.CDCTableStatus row whose TableName equals DestinationDBOTable. Later activities use its Path, ContainerName and HashColumn values.",
        "type": "Lookup",
        "dependsOn": [],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "SqlPoolSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM dbo.CDCTableStatus \nWHERE Enabled = 1 AND TableName = '@{pipeline().parameters.DestinationDBOTable}' \n--AND LastCommitVersion is not null",
              "type": "Expression"
            },
            "queryTimeout": "02:00:00"
          },
          "dataset": {
            "referenceName": "SQLPoolQuery",
            "type": "DatasetReference"
          },
          "firstRowOnly": true
        }
      },
      {
        "name": "Get Latest Version",
        "description": "Runs the /Shared/SynapseCDC/GenerateCDCData notebook against the control row's Path and ContainerName with DeltaCommitVersion 0 and WriteCDCData set to false. Its runOutput is stored as LastCommitVersion by 'Reset Table Status'.",
        "type": "DatabricksNotebook",
        "dependsOn": [
          {
            "activity": "Get Table Metadata",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "notebookPath": "/Shared/SynapseCDC/GenerateCDCData",
          "baseParameters": {
            "DeltaTablePath": {
              "value": "@activity('Get Control Table Information').output.firstRow.Path",
              "type": "Expression"
            },
            "DeltaCommitVersion": "0",
            "ADLSAccountName": "gmadlshared",
            "ADLSContainerName": {
              "value": "@activity('Get Control Table Information').output.firstRow.ContainerName",
              "type": "Expression"
            },
            "SQLServer": "null",
            "SQLDatabase": "null",
            "WriteCDCData": "false",
            "SQLCDCTableName": "null",
            "SQLCDCSchemaName": "null"
          }
        },
        "linkedServiceName": {
          "referenceName": "AzureADFTest",
          "type": "LinkedServiceReference"
        }
      }
    ],
    "parameters": {
      "SourceDBOSchema": {
        "type": "string",
        "defaultValue": "data"
      },
      "DestinationDBOSchema": {
        "type": "string",
        "defaultValue": "dbo"
      },
      "DestinationDBOTable": {
        "type": "string",
        "defaultValue": "Populations"
      },
      "SourceDBOTable": {
        "type": "string",
        "defaultValue": "Populations"
      }
    },
    "folder": {
      "name": "DeltaCDC"
    },
    "annotations": []
  }
}