Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som utför enkel förutsägelseanalys på en öppen Azure-datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.
SparkML och MLlib är centrala Spark-bibliotek som tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter, inklusive verktyg som är lämpliga för:
- Omdöme
- Tillbakagång
- Klustring
- Ämnesmodellering
- Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
- Hypotestestning och beräkning av exempelstatistik
Förstå klassificering och logistisk regression
Klassificering, en populär maskininlärningsuppgift, är processen att sortera indata i kategorier. Det är en klassificeringsalgoritms jobb att ta reda på hur du tilldelar etiketter till indata som du anger. Du kan till exempel tänka på en maskininlärningsalgoritm som accepterar aktieinformation som indata och delar in aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.
Logistisk regression är en algoritm som du kan använda för klassificering. Sparks logistiska regressions-API är användbart för binär klassificering eller klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.
Sammanfattningsvis skapar processen för logistisk regression en logistisk funktion som du kan använda för att förutsäga sannolikheten att en indatavektor hör hemma i en grupp eller en annan.
Exempel på förutsägelseanalys på NYC-taxidata
I det här exemplet använder du Spark för att utföra en förutsägelseanalys av tipsdata för taxiresor från New York. Data är tillgängliga via Azure Open Datasets. Den här delmängden av datamängden innehåller information om gula taxiresor, inklusive information om varje resa, start- och sluttid och platser, kostnaden och andra intressanta attribut.
Viktigt!
Det kan tillkomma ytterligare avgifter för att hämta dessa data från lagringsplatsen.
I följande steg utvecklar du en modell för att förutsäga om en viss resa innehåller ett tips eller inte.
Skapa en Apache Spark-maskininlärningsmodell
Skapa en notebook-fil med hjälp av PySpark-kerneln. Anvisningar finns i Skapa en notebook-fil.
Importera de typer som krävs för det här programmet. Kopiera och klistra in följande kod i en tom cell och tryck sedan på Skift+Retur. Eller kör cellen med hjälp av den blå uppspelningsikonen till vänster om koden.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluatorPå grund av PySpark-kerneln behöver du inte skapa några kontexter explicit. Spark-kontexten skapas automatiskt åt dig när du kör den första kodcellen.
Konstruera indataramen
Eftersom rådata är i Parquet-format kan du använda Spark-kontexten för att hämta filen till minnet som en DataFrame direkt. Även om koden i följande steg använder standardalternativen är det möjligt att framtvinga mappning av datatyper och andra schemaattribut om det behövs.
Kör följande rader för att skapa en Spark DataFrame genom att klistra in koden i en ny cell. Det här steget hämtar data via API:et Öppna datauppsättningar. Om du hämtar alla dessa data genereras cirka 1,5 miljarder rader.
Beroende på storleken på din serverlösa Apache Spark-pool kan rådata vara för stora eller ta för lång tid att arbeta med. Du kan filtrera ned dessa data till något mindre. I följande kodexempel används
start_dateochend_dateför att tillämpa ett filter som returnerar en enskild månad av data.from azureml.opendatasets import NycTlcYellow from datetime import datetime from dateutil import parser end_date = parser.parse('2018-05-08 00:00:00') start_date = parser.parse('2018-05-01 00:00:00') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())Nackdelen med enkel filtrering är att det ur ett statistiskt perspektiv kan introducera bias i data. En annan metod är att använda den inbyggda samplingen i Spark.
Följande kod minskar datamängden till cirka 2 000 rader om den tillämpas efter föregående kod. Du kan använda det här samplingssteget i stället för det enkla filtret eller tillsammans med det enkla filtret.
# To make development easier, faster, and less expensive, downsample for now sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)Nu är det möjligt att titta på data för att se vad som lästes. Det är normalt bättre att granska data med en delmängd i stället för den fullständiga uppsättningen, beroende på datamängdens storlek.
Följande kod erbjuder två sätt att visa data. Det första sättet är grundläggande. Det andra sättet ger en mycket mer omfattande rutnätsupplevelse, tillsammans med möjligheten att visualisera data grafiskt.
#sampled_taxi_df.show(5) display(sampled_taxi_df)Beroende på storleken på den genererade datamängden och ditt behov av att experimentera eller köra anteckningsboken många gånger, kanske du vill cachelagra datamängden lokalt i arbetsytan. Det finns tre sätt att utföra explicit cachelagring:
- Spara DataFrame lokalt som en fil.
- Spara DataFrame som en tillfällig tabell eller vy.
- Spara DataFrame som en permanent tabell.
De två första metoderna ingår i följande kodexempel.
Att skapa en tillfällig tabell eller vy ger olika åtkomstsökvägar till data, men den varar bara under hela Spark-instanssessionen.
sampled_taxi_df.createOrReplaceTempView("nytaxi")
Förbered datan
Data i dess rådataform är ofta inte lämpliga för direkt överföring till en modell. Du måste utföra en serie åtgärder på data för att få dem till ett tillstånd där modellen kan använda dem.
I följande kod utför du fyra åtgärdsklasser:
- Borttagning av avvikande värden eller felaktiga värden genom filtrering.
- Borttagning av kolumner som inte behövs.
- Skapandet av nya kolumner som härleds från rådata för att få modellen att fungera mer effektivt. Den här åtgärden kallas ibland för featurization.
- Märkning. Eftersom du utför binär klassificering (kommer det att finnas ett tips eller inte på en viss resa), finns det ett behov av att konvertera tipsbeloppet till ett 0- eller 1-värde.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Du gör sedan en andra genomgång av datan för att lägga till de slutliga funktionerna.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Skapa en logistisk regressionsmodell
Den sista uppgiften är att konvertera etiketterade data till ett format som kan analyseras genom logistisk regression. Indata till en logistisk regressionsalgoritm måste vara en uppsättning etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.
Därför måste du konvertera de kategoriska kolumnerna till tal. Mer specifikt behöver du konvertera kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer. Det finns flera metoder för att utföra konverteringen. I följande exempel används metoden OneHotEncoder , vilket är vanligt.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Den här åtgärden resulterar i en ny DataFrame med alla kolumner i rätt format för att träna en modell.
Träna en logistisk regressionsmodell
Den första uppgiften är att dela upp datamängden i en träningsuppsättning och en test- eller valideringsuppsättning. Uppdelningen här är godtycklig. Experimentera med olika delningsinställningar för att se om de påverkar modellen.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Nu när det finns två DataFrames är nästa uppgift att skapa modellformeln och köra den mot träningsdataramen. Sedan kan du verifiera mot testningen av DataFrame. Experimentera med olika versioner av modellformeln för att se effekten av olika kombinationer.
Anmärkning
Om du vill spara modellen tilldelar du rollen Storage Blob Data Contributor till Azure SQL Database-serverns resursomfång. Detaljerade steg finns i Tilldela Azure-roller med hjälp av Azure-portalen. Endast medlemmar med ägarbehörighet kan utföra det här steget.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Utdata från den här cellen är:
Area under ROC = 0.9779470729751403
Skapa en visuell representation av förutsägelsen
Nu kan du skapa en slutlig visualisering som hjälper dig att resonera kring resultatet av det här testet. En ROC-kurva är ett sätt att granska resultatet.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Stäng av Spark-instansen
När du har kört programmet stänger du anteckningsboken för att frigöra resurserna genom att stänga fliken. Eller välj Avsluta session från statuspanelen längst ned i anteckningsboken.
Se även
Nästa steg
- Dokumentation om .NET för Apache Spark
- Azure Synapse Analytics
- Officiell dokumentation om Apache Spark
Anmärkning
En del av den officiella Apache Spark-dokumentationen förlitar sig på att använda Spark-konsolen, som inte är tillgänglig på Apache Spark i Azure Synapse Analytics. Använd notebook- eller IntelliJ-upplevelserna i stället.