Spark ML Custom Transformer Yazma
Bu yazımızda Spark ML Custom Transformer yazacağız. Spark’ın makine öğrenimi kütüphanesi (MLlib), veri bilimcilere ve makine öğrenimi mühendislerine zengin bir araç seti sunuyor. Model geliştirme çalışmalarında bilhassa özellik mühendisliği (feature engineering) aşamasında çok yoğun veri ön hazırlık süreçleri oluyor. Bu çalışmalarda PySpark’ın pyspark.ml.feature
kütüphanesi, makine öğrenimi modelleriniz için ham verileri anlamlı ve kullanışlı özelliklere (feature) dönüştürmenize yardımcı olan bir dizi dönüştürücü (transformer) sınıfı içeriyor. Bu dönüştürücüler, verilerinizi temizleme, dönüştürme, seçme ve model eğitimine hazır hale getirme gibi önemli adımları gerçekleştirmenizi sağlıyor ve Spark ML Pipeline içinde rahatlıkla kullanabiliyorsunuz. Bazı temel dönüştürücüler aşağıdadır.
Temel Özellikler ve Dönüştürücüler
Kategorik Veri Dönüştürme:
StringIndexer
: Kategorik değişkenleri sayısal indekslere dönüştürür.OneHotEncoder
: Kategorik değişkenleri one-hot kodlamaya dönüştürür.IndexToString
: Sayısal indeksleri orijinal kategorik değerlere geri dönüştürür.
Metin Verisi Dönüştürme:
Tokenizer
: Metinleri kelimelere ayırır.StopWordsRemover
: Metinlerden anlamsız kelimeleri (stop words) kaldırır.CountVectorizer
: Metinleri kelime sayım vektörlerine dönüştürür.TFIDF
: Metinleri TF-IDF (Term Frequency-Inverse Document Frequency) vektörlerine dönüştürür.Word2Vec
: Kelimeleri sürekli vektör uzayına gömer (word embedding).
Sayısal Veri Dönüştürme:
VectorAssembler
: Farklı sütunları bir araya getirerek bir özellik vektörü oluşturur.StandardScaler
: Veriyi standartlaştırır (ortalaması 0, standart sapması 1 olacak şekilde).MinMaxScaler
: Veriyi belirli bir aralığa ölçeklendirir.Bucketizer
: Sürekli değişkenleri kategorik aralıklara (bucket) dönüştürür.PCA
: Temel bileşen analizi (PCA) uygular.
Özellik Seçimi:
VectorSlicer
: Özellik vektöründen belirli indekslere sahip özellikleri seçer.ChiSqSelector
: Ki-kare testi kullanarak kategorik özelliklerin önemini ölçer ve en önemlilerini seçer.
Ancak spark ml kütüphanesi dışında kendiniz özgün dönüşümler yaptığınızda bunu Pipeline’a ekleyemiyorsunuz. Dolayısısyla Pipeline model kullanarak ham veriyi doğrudan tahminleme için kullanmak mümkün olamıyor. Çünkü model eğitiminde ham veri hangi aşamalardan geçmiş ise tahmin için de aynı aşamalardan geçerek modele girmelidir. Bu sorunu çözmek için ml.feature kütüphanesi haricinde yaptığınız dönüşümleri Transformer türünden bir sınıf ile yazarsanız bunları da Pipeline’a ekleme şansınız olur. Bu size ham veriyi model geliştirme sürecindeki dönüşümleri otomatik olarak yaparak tahminleme yapma imkanı sağlar. Gerçekten büyük rahatlık.
Şimdi örnek olarak bir Transformer yazacağız. Bu Transformer date türünden bir sunu girdi olarak alacak ve ondan yıl, ay, gün ve tatil günü olup olmadığına dair 4 sütun ekleyecek.
Data
data = [('2013-01-01',1, 1, 13),('2013-01-02', 1, 1, 11), ('2013-01-03', 1, 1, 14), ('2013-01-04', 1, 1, 10), ('2013-01-05', 1, 1, 10) ] cols = ['date', 'store', 'item', 'label'] df = spark.createDataFrame(data, cols).withColumn('date', F.to_date('date', 'yyy-MM-dd'))
Gerekli kütüphaneleri dahil edelim
import holidays from pyspark.sql.types import * from pyspark.ml import Transformer from pyspark.sql.functions import lit, udf from pyspark.ml.param.shared import HasInputCols, HasOutputCol
Custom Transformer
class AddDateFeaturesTransformer(Transformer, HasInputCols, HasOutputCol): def __init__(self, inputCol=None, outputCols=None, country_code=None): super(AddDateFeaturesTransformer, self).__init__() self.inputCol = inputCol self.outputCols = outputCols self.country_code = country_code def is_holiday(self, date_str: date, country_code: str='TR'): date_str = str(date_str) country_holidays = holidays.CountryHoliday(country_code) date_obj = date.fromisoformat(date_str) if date_obj in country_holidays: return 1 else: return 0 def _transform(self, df): is_holiday = udf(self.is_holiday, IntegerType()) df = df.withColumn(self.outputCols[0], F.year(self.inputCol)) \ .withColumn(self.outputCols[1], F.month(self.inputCol)) \ .withColumn(self.outputCols[2], F.dayofweek(self.inputCol)) \ .withColumn(self.outputCols[3], is_holiday(self.inputCol)) return df
Şimdi bu sınıftan bir nesne yaratalım.
sct = AddDateFeaturesTransformer(inputCol='date', outputCols=['y','m','d','is_holiday'], country_code='TR')
Bu transformer ile dataframe üzerinde dönüşüm işlemleri yapalım va sonuçları yazdıralım.
sct.transform(df).show(5)
- Çıktı
+----------+-----+----+-----+----+---+---+----------+ | date|store|item|label| y| m| d|is_holiday| +----------+-----+----+-----+----+---+---+----------+ |2013-01-01| 1| 1| 13|2013| 1| 3| 1| |2013-01-02| 1| 1| 11|2013| 1| 4| 0| |2013-01-03| 1| 1| 14|2013| 1| 5| 0| |2013-01-04| 1| 1| 13|2013| 1| 6| 0| |2013-01-05| 1| 1| 10|2013| 1| 7| 0| +----------+-----+----+-----+----+---+---+----------+ only showing top 5 rows
Gördüğümüz gibi özgün dönüştürücümüz (AddDateFeaturesTransformer) başarıyla çalıştı.
Kodların hepsinin bulunduğu notebook burada.
Spark yeteneklerinizi geliştirmek için VBO Data Engineering Bootcamp‘e katılmayı düşünebilirsiniz.
Başka bir yazıda görüşene dek hoşçakalın.
Kaynaklar
Kapak Görseli: Photo by The Ride Academy on Unsplash