Pengembangan model di PySpark ML pada dataset dengan tipe data berbeda untuk rusty dummies

Apakah Anda sudah tahu cara bekerja dengan berbagai tipe data di PySpark ML? Tidak? Maka Anda sangat perlu mengunjungi kami.



gambar



Halo! Saya ingin membahas secara detail satu hal yang menarik, tetapi, sayangnya, bukan topik dalam dokumentasi Spark: bagaimana cara melatih model di PySpark ML pada dataset dengan tipe data yang berbeda (string dan angka)? Keinginan untuk menulis artikel ini disebabkan oleh kebutuhan untuk menjelajahi Internet selama beberapa hari untuk mencari artikel yang diperlukan dengan kode tersebut, karena tutorial resmi dari Spark memberikan contoh bekerja tidak hanya dengan tanda-tanda dari satu tipe data, tetapi umumnya dengan satu tanda, tetapi informasi tentang cara bekerja dengan beberapa kolom semakin banyak jenis data yang berbeda, tidak ada. Namun, setelah mempelajari secara rinci kemampuan PySpark untuk bekerja dengan data, saya berhasil menulis kode yang berfungsi dan memahami bagaimana semuanya terjadi, yang ingin saya bagikan dengan Anda. Jadi kecepatan penuh, teman!



Awalnya, mari impor semua perpustakaan yang diperlukan untuk bekerja, dan kemudian kita akan menganalisis kode secara rinci sehingga setiap "teko berkarat" yang menghargai diri sendiri, seperti, omong-omong, saya baru-baru ini, akan memahami semuanya:



#  
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#     
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


Sekarang mari kita buat konteks Spark (lokal) dan sesi Spark dan periksa apakah semuanya berfungsi dengan menampilkannya di layar. Membuat sesi Spark adalah titik awal untuk bekerja dengan kumpulan data di Spark:



#  
sc = SparkContext('local')
spark = SparkSession(sc)
spark






Ada alat untuk bekerja dengan data, sekarang mari kita muat. Artikel ini menggunakan kumpulan data yang diambil dari situs kompetisi machine learning Kaggle:

https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions

yang, setelah diunduh, disimpan di path_csv dalam format .csv dan memiliki opsi berikut:



  • header: jika baris pertama pada file kita adalah header, maka kita beri tanda "true"
  • pembatas: kami memberi tanda yang memisahkan data dari satu baris dengan tanda, seringkali itu adalah "," atau ";"
  • inferSchema: jika "benar", maka PySpark akan secara otomatis mendeteksi jenis setiap kolom, jika tidak Anda harus menulisnya sendiri


#   .csv  path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)


Untuk lebih memahami jenis data yang kita hadapi, mari kita lihat beberapa barisnya:



#   
data.show()




Mari kita lihat juga berapa banyak baris yang kita miliki dalam dataset:

#  
data.select('year').count()






Dan terakhir, mari kita simpulkan jenis data kita, yang, seperti yang kita ingat, kita meminta PySpark untuk menentukan secara otomatis menggunakan opsi ("inferSchema", "true"):



#     
data.printSchema()






Sekarang mari beralih ke kursus utama kita - bekerja dengan beberapa tanda dari tipe data yang berbeda. Spark dapat melatih model pada data yang ditransformasikan, di mana kolom yang diprediksi adalah vektor dan kolom dengan fitur juga merupakan vektor, yang memperumit tugas ... Tapi kami tidak menyerah, dan untuk melatih model di PySpark kami akan menggunakan Pipeline, di mana kami akan meneruskan rencana tindakan tertentu (variabel tahapan):



  1. langkah label_stringIdx: kita mengubah kolom dari dataset nilai yang ingin kita prediksi menjadi string vektor Spark dan menamainya kembali menjadi label dengan parameter handleInvalid = 'keep', yang berarti bahwa kolom prediksi kita mendukung null
  2. step stringIndexer: mengonversi kolom string ke string kategorikal Spark
  3. encoder: ()
  4. assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
  5. gbt: PySpark ML GBTRegressor,


#value -      - 
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#   :    
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #        
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#   : 
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#    - - 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


Mari bagi kumpulan data kita menjadi sampel pelatihan dan pengujian dengan rasio favorit masing-masing 70% hingga 30%, dan mulai melatih model menggunakan pohon penguat regresi gradien (GBTRegressor), yang seharusnya memprediksi vektor label berdasarkan fitur yang sebelumnya digabungkan menjadi satu "fitur" vektor dengan batas iterable maxIter = 10:



#       (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#  (   )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

#   stages    
pipeline = Pipeline(stages=stages)


Dan sekarang kita hanya perlu mengirim komputer sebuah rencana tindakan dan set data pelatihan:



#  
model = pipeline.fit(trainingData)

#     
predictions = model.transform(testData)


Mari simpan model kita sehingga kita selalu bisa kembali menggunakannya tanpa pelatihan ulang:



# 
pipeline.write().overwrite().save('model/gbtregr_model')


Dan jika Anda memutuskan untuk mulai menggunakan model terlatih untuk prediksi lagi, cukup tulis:



#     
load_model = pipeline.read().load('model/gbtregr_model')




Jadi, kita telah melihat bagaimana sebuah alat untuk bekerja dengan data besar dalam bahasa Python, PySpark, bekerja dengan beberapa kolom fitur dari tipe data yang berbeda diimplementasikan.



Sekarang saatnya menerapkan ini ke model Anda ...



All Articles