一,引言
在实际的项目中,Azure Data Factory 中的 Data Flow 并不能彻底帮我们完成一系列复杂逻辑计算, 比如我们需要针对数据集的每一行数据进行判断计算,Data Flow 就显得有些吃力。别怕,Azure Data Factory 提供了调用 Azure Function 的组件,有了代码的加持,更复杂的问题都能迎刃而解!!那么就开始今天的表演吧
--------------------我是分割线--------------------
二,正文
1,准备 Azure Function
打开 Azure Portal ,点击 "Create a resource" 快速创建 Azure Function

以下就是刚创建好的 Azure Function,Operating System 选择 "Windows",Runtime 选择:"node js"

添加 名字为 “Http_skip_holiday” 的 Function

Function Code:
- 1 const intercept = require("azure-function-log-intercept");
- 2
- 3 module.exports = async function (context, req) {
- 4 context.log('JavaScript HTTP trigger function processed a request.');
- 5 intercept(context);
- 6 let lo_date = (req.query.lo_date || (req.body && req.body.lo_date));
- 7 let skipday = (req.query.skipday || (req.body && req.body.skipday));
- 8 context.log("req.body:"+req.body);
- 9 context.log("lo_date:"+req.body.lo_date);
- 10 context.log("req.body:"+req.body.skipday);
- 11 //server Info
- 12
- 13 // Holiday Handling
- 14 let holidayArray = ['2023-01-01','2023-01-06','2023-01-07','2023-01-13','2023-01-14','2023-01-21','2023-01-27','2023-01-28'];
- 15 context.log("holidayArray.length: ", holidayArray.length);
- 16
- 17 let due_dateObj= calculate_dueDate(context,lo_date,holidayArray,skipday)
- 18 context.log("due_dateObj.Step: ", due_dateObj.Step);
- 19 context.res = {
- 20 status: 200, /* Defaults to 200 */
- 21 body: due_dateObj
- 22 };
- 23 }
- 24
/**
 * Compute a due date: move `num` days forward from `lodate`, then keep
 * stepping forward while the candidate day is a holiday or a Sunday.
 *
 * @param {object}   context      Azure Functions context (used for logging).
 * @param {string}   lodate       Start date, ideally "yyyy-MM-dd".
 * @param {string[]} holidayArray Holidays as "yyyy-MM-dd" strings
 *                                (expected to include Saturdays).
 * @param {number|string} num     Initial number of days to skip.
 * @returns {{DueDate: string, Step: number}} Due date ("yyyy-MM-dd") and
 *          the total number of days actually stepped.
 */
function calculate_dueDate(context, lodate, holidayArray, num) {
    "use strict";

    // Format a Date as "yyyy-MM-dd". JavaScript's Date#toString IGNORES
    // arguments — the original `dueDate.toString("yyyy-MM-dd")` was a
    // .NET-ism that returned the full verbose date string.
    function formatDate(d) {
        const y = d.getFullYear();
        const m = String(d.getMonth() + 1).padStart(2, '0');
        const dd = String(d.getDate()).padStart(2, '0');
        return y + '-' + m + '-' + dd;
    }

    // Parse "yyyy-MM-dd" as a LOCAL date: `new Date('2023-01-05')` parses
    // as UTC midnight, which shifts to the previous day in negative-offset
    // time zones. Other formats fall back to the Date constructor.
    let lo_date;
    const isoMatch = /^(\d{4})-(\d{2})-(\d{2})$/.exec(String(lodate));
    if (isoMatch) {
        lo_date = new Date(Number(isoMatch[1]), Number(isoMatch[2]) - 1, Number(isoMatch[3]));
    } else {
        lo_date = new Date(lodate);
    }

    const Year = lo_date.getFullYear();
    const Month = lo_date.getMonth();
    const day = lo_date.getDate();

    let step = Number(num);
    let isWorkDay = false;
    do {
        // The Date constructor normalizes day-of-month overflow, so
        // day + step rolls cleanly into the next month/year.
        const currentDate = new Date(Year, Month, day + step);

        // BUG FIX: the original tested `currentDate.toDateString() in
        // holidayArray`. The `in` operator checks array *indices*, never
        // values — and toDateString() yields "Fri Jan 06 2023", which could
        // never equal "2023-01-06" anyway — so holidays were never skipped.
        // getDay() < 1 means Sunday (0); Saturdays are expected to be
        // listed explicitly in holidayArray.
        if (holidayArray.includes(formatDate(currentDate)) || currentDate.getDay() < 1) {
            step++;
        } else {
            isWorkDay = true;
        }
    } while (!isWorkDay);

    const dueDate = new Date(Year, Month, day + step);
    const DueDateObj = {
        DueDate: formatDate(dueDate),
        Step: step
    };
    context.log("dueDate:" + DueDateObj.DueDate);
    return DueDateObj;
}
开启 Function 后,我们使用 Postman 进行测试
注意:1)打开 Function 的 Filesystem Logs

2)如果 Function 的访问级别(Authorization level)不是 "Anonymous",那么就得在调用 Function 的 Url 后面加上验证身份的 Code


Postman 进行结果测试

2,Data Factory 中配置调用 Function
1)使用 LookUp 查询需要更新的数据集
2)利用 Foreach 循环遍历数据集,并根据每一条数据的 "inputdate","skipday" 作为参数调用 Azure Function

Foreach 的数据集合:

- @activity('Lookup_Data').output.value
Function 的 Body 参数配置

- @concat('{"lo_date":"',item().inputdate,'","skipday":',item().skipday,'}')
pipeline code
- {
- "name": "test_pipeline",
- "properties": {
- "activities": [
- {
- "name": "Lookup_Data",
- "type": "Lookup",
- "dependsOn": [],
- "policy": {
- "timeout": "0.12:00:00",
- "retry": 0,
- "retryIntervalInSeconds": 30,
- "secureOutput": false,
- "secureInput": false
- },
- "userProperties": [],
- "typeProperties": {
- "source": {
- "type": "DelimitedTextSource",
- "storeSettings": {
- "type": "AzureBlobStorageReadSettings",
- "recursive": true,
- "wildcardFolderPath": "AAA",
- "wildcardFileName": {
- "value": "@concat('User_*.csv')",
- "type": "Expression"
- },
- "enablePartitionDiscovery": false
- },
- "formatSettings": {
- "type": "DelimitedTextReadSettings"
- }
- },
- "dataset": {
- "referenceName": "AZURE_BLOB_CSV",
- "type": "DatasetReference",
- "parameters": {
- "ContainerName": "test",
- "DirectoryPath": "AAA",
- "FileName": {
- "value": "@concat('User_*.csv')",
- "type": "Expression"
- }
- }
- },
- "firstRowOnly": false
- }
- },
- {
- "name": "ForEach UPDATE Date",
- "type": "ForEach",
- "dependsOn": [
- {
- "activity": "Lookup_Data",
- "dependencyConditions": [
- "Succeeded"
- ]
- }
- ],
- "userProperties": [],
- "typeProperties": {
- "items": {
- "value": "@activity('Lookup_Data').output.value",
- "type": "Expression"
- },
- "activities": [
- {
- "name": "Azure_Function_SkipHoliday",
- "type": "AzureFunctionActivity",
- "dependsOn": [],
- "policy": {
- "timeout": "0.12:00:00",
- "retry": 0,
- "retryIntervalInSeconds": 30,
- "secureOutput": false,
- "secureInput": false
- },
- "userProperties": [],
- "typeProperties": {
- "functionName": "Http_skip_holiday",
- "method": "POST",
- "body": {
- "value": "@concat('{\"lo_date\":\"',item().inputdate,'\",\"skipday\":',item().skipday,'}')",
- "type": "Expression"
- }
- },
- "linkedServiceName": {
- "referenceName": "AzureFunction_LinkService",
- "type": "LinkedServiceReference"
- }
- }
- ]
- }
- }
- ],
- "annotations": []
- }
- }
点击 Debug 进行调试

三,结尾
Azure Data Factory(ADF)是 Azure 中的托管数据集成服务,允许我们迭代地构建、编排和监视 ETL 工作流程。Azure Functions 现在已与 ADF 集成,允许我们在数据工厂管道中将 Azure Function 作为一个步骤来运行。大家多多练习!!!
作者:Allen
版权:转载请在文章明显位置注明作者及出处。如发现错误,欢迎批评指正。