Dela via


Använda anpassade aktiviteter i en Azure Data Factory- eller Azure Synapse Analytics-pipeline

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Tips

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

Det finns två typer av aktiviteter som du kan använda i en Azure Data Factory- eller Synapse-pipeline.

Om du vill flytta data till/från ett datalager som tjänsten inte stöder, eller transformera/bearbeta data på ett sätt som inte stöds av tjänsten, kan du skapa en anpassad aktivitet med din egen dataflytt eller transformeringslogik och använda aktiviteten i en pipeline. Den anpassade aktiviteten kör din anpassade kodlogik på en Azure Batch-pool med virtuella datorer.

Kommentar

Vi rekommenderar att du använder Azure Az PowerShell-modulen för att interagera med Azure. Se Installera Azure PowerShell för att komma igång. Information om hur du migrerar till Az PowerShell-modulen finns i artikeln om att migrera Azure PowerShell från AzureRM till Az.

Se följande artiklar om du är nybörjare på Azure Batch-tjänsten:

Viktigt!

När du skapar en ny Azure Batch-pool måste "VirtualMachineConfiguration" användas och INTE "CloudServiceConfiguration".

Lägga till anpassade aktiviteter i en pipeline med användargränssnittet

Utför följande steg för att använda en anpassad aktivitet i en pipeline:

  1. Sök efter Anpassad i panelen Pipelineaktiviteter och dra en Anpassad aktivitet till pipelinearbetsytan.

  2. Välj den nya anpassade aktiviteten på arbetsytan om den inte redan är markerad.

  3. Välj fliken Azure Batch för att välja eller skapa en ny länkad Azure Batch-tjänst som ska köra den anpassade aktiviteten.

    Visar användargränssnittet för en anpassad aktivitet.

  4. Välj fliken Inställningar och ange ett kommando som ska köras på Azure Batch och valfri avancerad information.

    Visar användargränssnittet för fliken Inställningar för en anpassad aktivitet.

Länkad Azure Batch-tjänst

Följande JSON definierar en Azure Batch-länkad exempeltjänst. Mer information finns i Beräkningsmiljöer som stöds

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

Mer information om länkad Azure Batch-tjänst finns i artikeln Compute linked services (Beräkningslänkade tjänster ).

Anpassad aktivitet

Följande JSON-kodfragment definierar en pipeline med en enkel anpassad aktivitet. Aktivitetsdefinitionen har en referens till den länkade Azure Batch-tjänsten.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

I det här exemplet är helloworld.exe ett anpassat program som lagras i mappen customactv2/helloworld för Azure Storage-kontot som används i resourceLinkedService. Den anpassade aktiviteten skickar det här anpassade programmet som ska köras i Azure Batch. Du kan ersätta kommandot med valfritt föredraget program som kan köras på målåtgärdssystemet för Azure Batch-poolnoderna.

I följande tabell beskrivs namn och beskrivningar av egenskaper som är specifika för den här aktiviteten.

Fastighet Beskrivning Obligatoriskt
namn Namnet på aktiviteten i pipelinen Ja
beskrivning Text som beskriver vad aktiviteten gör. Nej
typ För Anpassad aktivitet är aktivitetstypen Anpassad. Ja
länkadtjänstnamn Länkad tjänst till Azure Batch. Mer information om den här länkade tjänsten finns i artikeln Compute linked services (Beräkningslänkade tjänster ). Ja
kommando Kommando för det anpassade program som ska köras. Om programmet redan är tillgängligt på Azure Batch-poolnoden kan resourceLinkedService och folderPath utelämnas. Du kan till exempel ange kommandot som cmd /c dir, som stöds internt av noden Windows Batch-pool. Ja
resurslänkad tjänst Länkad Azure Storage-tjänst till lagringskontot där det anpassade programmet lagras Nej*
folderPath Sökväg till mappen för det anpassade programmet och alla dess beroenden

Om du har beroenden som lagras i undermappar, dvs. i en hierarkisk mappstruktur under folderPath , plattas mappstrukturen för närvarande ut när filerna kopieras till Azure Batch. Alla filer kopieras alltså till en enda mapp utan undermappar. Du kan kringgå det här beteendet genom att komprimera filerna, kopiera den komprimerade filen och sedan packa upp den med anpassad kod på önskad plats.
Nej*
referensobjekt En matris med befintliga länkade tjänster och datauppsättningar. De länkade tjänsterna och datauppsättningarna skickas till det anpassade programmet i JSON-format så att din anpassade kod kan referera till tjänstens resurser Nej
utökadeEgenskaper Användardefinierade egenskaper som kan skickas till det anpassade programmet i JSON-format så att din anpassade kod kan referera till ytterligare egenskaper Nej
lagringstidIDagar Kvarhållningstiden för filer som skickas in för anpassad aktivitet. Standardvärdet är 30 dagar. Nej

* Egenskaperna resourceLinkedService och folderPath måste antingen båda anges eller båda utelämnas.

Anteckning

Om du skickar länkade tjänster som referenceObjects i anpassad aktivitet är det en bra säkerhetspraxis att skicka en Azure Key Vault-aktiverad länkad tjänst (eftersom den inte innehåller några säkra strängar) och hämta autentiseringsuppgifterna med hemligt namn direkt från Key Vault från koden. Du hittar ett exempel här som refererar till den AKV-aktiverade länkade tjänsten, hämtar autentiseringsuppgifterna från Key Vault och sedan kommer åt lagringen i koden.

Kommentar

För närvarande stöds endast Azure Blob Storage för resourceLinkedService i anpassade aktiviteter, och det är den enda länkade tjänsten som skapas som standard utan möjlighet att välja andra anslutningar som ADLS Gen2.

Behörigheter för anpassad aktivitet

Den anpassade aktiviteten anger det automatiska Azure Batch-användarkontot till Icke-administratörsåtkomst med uppgiftsomfång (standardspecifikationen för automatisk användare). Du kan inte ändra behörighetsnivån för det automatiska användarkontot. Mer information finns i Kör uppgifter under användarkonton i Batch | Automatiska användarkonton.

Utföra kommandon

Du kan köra ett kommando direkt med anpassad aktivitet. I följande exempel körs kommandot "echo hello world" på målnoderna för Azure Batch-pooler och skriver ut utdata till stdout.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Överföra objekt och egenskaper

Det här exemplet visar hur du kan använda referenceObjects och extendedProperties för att skicka objekt och användardefinierade egenskaper från tjänsten till ditt anpassade program.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

När aktiviteten körs lagras referenceObjects och extendedProperties i följande filer som distribueras till samma körningsmapp i SampleApp.exe:

  • activity.json

    Lagrar utökade egenskaper och egenskaper för den anpassade aktiviteten.

  • linkedServices.json

    Lagrar en matris med länkade tjänster som definierats i egenskapen referenceObjects.

  • datasets.json

    Lagrar en matris med datauppsättningar som definierats i egenskapen referenceObjects.

Följande exempelkod visar hur SampleApp.exe kan komma åt nödvändig information från JSON-filer:

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Hämta körningsutdata

Du kan starta en pipelinekörning med följande PowerShell-kommando:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

När pipelinen är igång kan du kontrollera exekveringsutdata med hjälp av följande kommandon:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

Stdout och stderr för ditt anpassade program sparas i containern adfjobs i den länkade Azure Storage-tjänsten du definierade när du skapade Azure Batch Linked Service, med en GUID för uppgiften. Du kan få den detaljerade sökvägen från Activity Run-utdata med hjälp av följande kodsnutt:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

Om du vill använda innehållet i stdout.txt i underordnade aktiviteter kan du hämta sökvägen till stdout.txt-filen i uttrycket "@activity('MyCustomActivity').output.outputs[0]".

Viktigt!

  • Activity.json, linkedServices.json och datasets.json lagras i körningsmappen för Batch-aktiviteten. I det här exemplet lagras activity.json, linkedServices.json och datasets.json i https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/ sökvägen. Om det behövs måste du rensa dem separat.
  • För länkade tjänster som använder lokalt installerad integrationskörning krypteras känslig information som nycklar eller lösenord av integrationskörningen med egen värd för att säkerställa att autentiseringsuppgifterna finns kvar i den kunddefinierade privata nätverksmiljön. Vissa känsliga fält kan saknas när du refererar till din anpassade programkod på det här sättet. Använd SecureString i extendedProperties i stället för att använda länkad tjänstreferens om det behövs.

Skicka utdata till en annan aktivitet

Du kan skicka anpassade värden från koden i en anpassad aktivitet tillbaka till tjänsten. Du kan göra det genom att skriva in dem i outputs.json från ditt program. Tjänsten kopierar innehållet i outputs.json och lägger till det som värdet på egenskapen customOutput i aktivitetsutdata. (Storleksgränsen är 2 MB.) Om du vill använda innehållet outputs.json i i underordnade aktiviteter kan du hämta värdet med uttrycket @activity('<MyCustomActivity>').output.customOutput.

Hämta SecureString-utdata

Känsliga egenskapsvärden som är avsedda som typen SecureString, som visas i några av exemplen i den här artikeln, maskeras på fliken Övervakning i användargränssnittet. Vid faktisk pipelinekörning serialiseras dock en SecureString-egenskap som JSON i activity.json-filen som klartext. Till exempel:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

Den här serialiseringen är inte säker och är inte avsedd att vara säker. Avsikten är ett tips till tjänsten om att maskera värdet på fliken Övervakning.

Om du vill komma åt egenskaper av typen SecureString från en anpassad aktivitet läser activity.json du filen, som placeras i samma mapp som din .EXE, deserialiserar JSON och kommer sedan åt JSON-egenskapen (extendedProperties => [propertyName] => värde).

Automatisk skalning av Azure Batch

Du kan också skapa en Azure Batch-pool med funktionen autoskalning . Du kan till exempel skapa en Azure-batchpool med 0 dedikerade virtuella datorer och en autoskalningsformel baserat på antalet väntande aktiviteter.

Exempelformeln här uppnår följande beteende: När poolen initialt skapas, börjar den med en virtuell dator. $PendingTasks metrik definierar antalet uppgifter i körande + aktivt (i kö) tillstånd. Formeln hittar det genomsnittliga antalet väntande aktiviteter under de senaste 180 sekunderna och anger TargetDedicated i enlighet med detta. Det säkerställer att TargetDedicated aldrig går längre än 25 virtuella datorer. När nya uppgifter skickas in växer poolen automatiskt och när uppgifterna slutförs blir virtuella datorer tillgängliga en i taget och den automatiska skalningen minskar antalet virtuella datorer. startingNumberOfVMs och maxNumberofVMs kan justeras efter dina behov.

Autoskalningsformel:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Se Skala beräkningsnoder automatiskt i en Azure Batch-pool för mer information.

Om poolen använder standardvärdet autoScaleEvaluationInterval kan Det ta 15–30 minuter för Batch-tjänsten att förbereda den virtuella datorn innan den anpassade aktiviteten körs. Om poolen använder ett annat autoScaleEvaluationInterval kan Batch-tjänsten beräkna autoScaleEvaluationInterval + 10 minuter.

Se följande artiklar som förklarar hur du transformerar data på andra sätt: