Share via


Optimalisatie van gegevensverwerking voor Apache Spark

In dit artikel wordt beschreven hoe u de configuratie van uw Apache Spark-cluster optimaliseert voor de beste prestaties in Azure HDInsight.

Overzicht

Als u trage processen hebt bij een join of shuffle, is de oorzaak waarschijnlijk data skew. Datascheefheid is asymmetrie in de gegevens van uw taak. Een maptaak kan bijvoorbeeld 20 seconden duren. Het uitvoeren van een taak waarbij de gegevens worden samengevoegd of in willekeurige volgorde worden uitgevoerd, duurt echter uren. Als u gegevensscheefheid wilt oplossen, moet u de gehele sleutel salten, of een geïsoleerde salt gebruiken voor slechts een subset van sleutels. Als u een geïsoleerde salt gebruikt, dient u verder te filteren om uw gezouten sleutelsubset in map joins te isoleren. Een andere optie is om een bucketkolom toe te voegen en eerst vooraf in buckets samen te voegen.

Een andere factor die langzame joins veroorzaakt, kan het jointype zijn. Spark maakt standaard gebruik van het jointype SortMerge. Dit type join is het meest geschikt voor grote gegevenssets. Maar is anders rekenkundig duur omdat deze eerst de linker- en rechterkant van gegevens moet sorteren voordat ze worden samengevoegd.

Een Broadcast-join is het meest geschikt voor kleinere gegevenssets, of waarbij één zijde van de join veel kleiner is dan de andere zijde. Bij dit type samenvoeging wordt één zijde naar alle uitvoerders uitgezonden, waardoor er in het algemeen meer geheugen nodig is voor uitzendingen.

U kunt het jointype in uw configuratie wijzigen door spark.sql.autoBroadcastJoinThreshold in te stellen, of u kunt een join-hint instellen met behulp van de DataFrame API’s (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Als u tabellen in buckets gebruikt, hebt u een derde jointype, de Merge-join. Met een correct vooraf gepartitioneerde en vooraf gesorteerde gegevensset wordt de dure sorteerfase in een SortMerge-join overgeslagen.

De volgorde van joins doet ertoe, met name bij meer complexe query’s. Begin met de meest selectieve joins. Verplaats ook joins die het aantal rijen na aggregaties vergroten, indien mogelijk.

Als u parallelle uitvoering voor Cartesian-joins wilt beheren, kunt u geneste structuren en vensterbewerking toevoegen, en misschien een of meer stappen overslaan in uw Spark-taak.

Taakuitvoering optimaliseren

  • Sla gegevens zo nodig in de cache op, bijvoorbeeld als u ze twee keer gebruikt.
  • Zend variabelen uit naar alle uitvoerders. De variabelen worden slechts eenmaal geserialiseerd, wat resulteert in snellere zoekacties.
  • Gebruik de threadpool in het stuurprogramma, wat resulteert in een snellere bewerking voor veel taken.

Controleer regelmatig uw lopende taken op prestatieproblemen. Als u meer inzicht nodig hebt in bepaalde problemen, kunt u een van de volgende hulpprogramma's voor prestatieprofilering overwegen:

Het belangrijkste aspect voor Spark 2.x-queryprestaties is de Tungsten-engine, die afhankelijk is van codegeneratie in de volledige fase. In sommige gevallen is codegeneratie in de volledige fase mogelijk uitgeschakeld. Als u bijvoorbeeld een niet-veranderlijk type (string) in de aggregatie-expressie gebruikt, wordt SortAggregate weergegeven in plaats van HashAggregate. Probeer bijvoorbeeld voor betere prestaties het volgende uit, en schakel vervolgens codegeneratie in:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Volgende stappen