Share via


Zelfstudie: Een machine learning-app bouwen met Apache Spark MLlib en Azure Synapse Analytics

In dit artikel leert u hoe u Apache Spark MLlib gebruikt om een machine learning-toepassing te maken die eenvoudige voorspellende analyses uitvoert op een open Azure-gegevensset. Spark biedt ingebouwde machine learning-bibliotheken. In dit voorbeeld wordt gebruikgemaakt van classificatie via logistieke regressie.

SparkML en MLlib zijn kernbibliotheken van Spark die veel hulpprogramma's bieden die nuttig zijn voor machine learning-taken, waaronder hulpprogramma's die geschikt zijn voor:

  • Classificatie
  • Regressie
  • Clustervorming
  • Onderwerpmodellering
  • Enkelvoudige waardeontleding (SVD) en hoofdcomponentenanalyse (PCA)
  • Hypothese testen en berekenen van steekproefstatistieken

Inzicht in classificatie en logistieke regressie

Classificatie, een populaire machine learning-taak, is het proces van het sorteren van invoergegevens in categorieën. Het is de taak van een classificatie-algoritme om erachter te komen hoe labels toe te wijzen aan invoergegevens die u opgeeft. U kunt bijvoorbeeld een machine learning-algoritme bedenken dat aandelengegevens als invoer accepteert en het aandeel in twee categorieën verdeelt: aandelen die u moet verkopen en aandelen die u moet behouden.

Logistieke regressie is een algoritme dat u kunt gebruiken voor classificatie. De logistieke regressie-API van Spark is handig voor binaire classificatie of het classificeren van invoergegevens in een van twee groepen. Zie Wikipedia voor meer informatie over logistieke regressie.

Samenvattend produceert het proces van logistieke regressie een logistieke functie die u kunt gebruiken om de waarschijnlijkheid te voorspellen dat een invoervector deel uitmaakt van de ene groep of de andere.

Voorbeeld van voorspellende analyse van taxigegevens in NYC

In dit voorbeeld gebruikt u Spark om een voorspellende analyse uit te voeren op taxirittipgegevens uit New York. De gegevens zijn beschikbaar via Azure Open Datasets. Deze subset van de gegevensset bevat informatie over gele taxiritten, waaronder informatie over elke reis, de begin- en eindtijd en locaties, de kosten en andere interessante kenmerken.

Belangrijk

Er kunnen extra kosten gelden voor het ophalen van deze gegevens uit de opslaglocatie.

In de volgende stappen ontwikkelt u een model om te voorspellen of een bepaalde reis een tip bevat of niet.

Een Machine Learning-model voor Apache Spark maken

  1. Maak een notebook met behulp van de PySpark-kernel. Zie Een notebook maken voor instructies.

  2. Importeer de typen die vereist zijn voor deze toepassing. Kopieer en plak de volgende code in een lege cel en druk op Shift+Enter. Of voer de cel uit met behulp van het blauwe afspeelicoon links van de code.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Vanwege de PySpark-kernel hoeft u niet expliciet contexten te maken. De Spark-context wordt automatisch voor u gemaakt wanneer u de eerste codecel uitvoert.

Het dataframe voor invoer maken

Omdat de onbewerkte gegevens een Parquet-indeling hebben, kunt u de Spark-context gebruiken om het bestand rechtstreeks als een DataFrame in het geheugen op te halen. Hoewel de code in de volgende stappen gebruikmaakt van de standaardopties, is het mogelijk om indien nodig de toewijzing van gegevenstypen en andere schemakenmerken af te dwingen.

  1. Voer de volgende regels uit om een Spark DataFrame te maken door de code in een nieuwe cel te plakken. Met deze stap worden de gegevens opgehaald via de Open Datasets-API. Als u al deze gegevens ophaalt, worden er ongeveer 1,5 miljard rijen gegenereerd.

    Afhankelijk van de grootte van uw serverloze Apache Spark-pool zijn de onbewerkte gegevens mogelijk te groot of duurt het te veel tijd om aan te werken. U kunt deze gegevens filteren op iets kleiners. In het volgende codevoorbeeld worden start_date en end_date gebruikt om een filter toe te passen dat één maand gegevens retourneert.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. Het nadeel van eenvoudig filteren is dat het vanuit een statistisch perspectief vooroordelen kan veroorzaken in de gegevens. Een andere benadering is het gebruik van de steekproeven die zijn ingebouwd in Spark.

    De volgende code vermindert de gegevensset tot ongeveer 2000 rijen, als deze wordt toegepast na de voorgaande code. U kunt deze steekproefstap gebruiken in plaats van het eenvoudige filter of in combinatie met het eenvoudige filter.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. Het is nu mogelijk om de gegevens te bekijken om te zien wat er is gelezen. Het is normaal gesproken beter om gegevens te controleren met een subset in plaats van de volledige set, afhankelijk van de grootte van de gegevensset.

    De volgende code biedt twee manieren om de gegevens weer te geven. De eerste manier is eenvoudig. De tweede manier biedt een veel rijkere rasterervaring, samen met de mogelijkheid om de gegevens grafisch te visualiseren.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. Afhankelijk van de grootte van de gegenereerde gegevensset en uw behoefte om het notebook meerdere keren te experimenteren of uit te voeren, wilt u de gegevensset mogelijk lokaal in de cache opslaan in de werkruimte. Er zijn drie manieren om expliciete caching uit te voeren:

    • Sla het DataFrame lokaal op als een bestand.
    • Sla het DataFrame op als een tijdelijke tabel of weergave.
    • Sla het DataFrame op als een permanente tabel.

De eerste twee van deze benaderingen zijn opgenomen in de volgende codevoorbeelden.

Het maken van een tijdelijke tabel of weergave biedt verschillende toegangspaden naar de gegevens, maar deze duurt alleen voor de duur van de Spark-exemplaarsessie.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

De gegevens voorbereiden

De gegevens in de onbewerkte vorm zijn vaak niet geschikt voor het rechtstreeks doorgeven aan een model. U moet een reeks acties uitvoeren op de gegevens om deze in een status te krijgen waarin het model deze kan gebruiken.

In de volgende code voert u vier klassen bewerkingen uit:

  • Het verwijderen van uitbijters of onjuiste waarden door te filteren.
  • Het verwijderen van kolommen, die niet nodig zijn.
  • Het maken van nieuwe kolommen die zijn afgeleid van de onbewerkte gegevens om het model effectiever te laten werken. Soms wordt deze bewerking featurisatie genoemd.
  • Etikettering. Omdat u binaire classificatie uitvoert (is er een tip of niet op een bepaalde reis), moet u het fooibedrag omzetten in een waarde van 0 of 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Vervolgens maakt u een tweede pass over de gegevens om de uiteindelijke functies toe te voegen.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Een logistiek regressiemodel maken

De laatste taak is het converteren van de gelabelde gegevens naar een indeling die kan worden geanalyseerd via logistieke regressie. De invoer voor een logistiek regressiealgoritme moet een set label-/functievectorparen zijn, waarbij de functievector een vector is van getallen die het invoerpunt vertegenwoordigen.

U moet dus de categorische kolommen converteren naar getallen. U moet specifiek de kolommen trafficTimeBins en weekdayString converteren naar gehele getallen. Er zijn meerdere methoden om de conversie uit te voeren. In het volgende voorbeeld wordt de OneHotEncoder benadering gebruikt. Dit is gebruikelijk.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Deze actie resulteert in een nieuw DataFrame met alle kolommen in de juiste indeling om een model te trainen.

Een logistiek regressiemodel trainen

De eerste taak is het splitsen van de gegevensset in een trainingsset en een test- of validatieset. De splitsing hier is willekeurig. Experimenteer met verschillende splitsinstellingen om te zien of deze van invloed zijn op het model.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nu er twee DataFrames zijn, is de volgende taak om de modelformule te maken en uit te voeren op het trainingsgegevensframe. Vervolgens kunt u valideren op basis van het testdataframe. Experimenteer met verschillende versies van de modelformule om de impact van verschillende combinaties te bekijken.

Opmerking

Als u het model wilt opslaan, wijst u de rol Inzender voor opslagblobgegevens toe aan het resourcebereik van de Azure SQL Database-server. Zie Azure-rollen toewijzen met behulp van de Azure-portalvoor gedetailleerde stappen. Alleen leden met eigenaarsbevoegdheden kunnen deze stap uitvoeren.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

De uitvoer van deze cel is:

Area under ROC = 0.9779470729751403

Een visuele weergave van de voorspelling maken

U kunt nu een definitieve visualisatie maken om u te helpen redeneren over de resultaten van deze test. Een ROC-curve is een manier om het resultaat te controleren.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafiek met de ROC-curve voor logistieke regressie in het tipmodel.

Sluit het Spark-exemplaar af

Nadat u de toepassing hebt uitgevoerd, sluit u het notebook af om de resources vrij te geven door het tabblad te sluiten. Of selecteer Sessie beëindigen in het statusvenster onderaan het notitieblok.

Zie ook

Volgende stappen

Opmerking

Sommige officiële Apache Spark-documentatie is afhankelijk van het gebruik van de Spark-console, die niet beschikbaar is in Apache Spark in Azure Synapse Analytics. Gebruik in plaats daarvan de notebook - of IntelliJ-ervaringen .