Python Pandas Dataframe’i Elasticsearch’e Yazmak
Merhabalar bu yazımızda Python Pandas Dataframe verisini Python kullanarak Elasticsearch’e yazacağız. Şükür ki Python çok gelişmiş ve zengin bir dil. Birileri biz kullanalım diye kütüphane yazmış, biz de bunu kullanarak dataframe’i Elasticsearch’e yazacağız. Bu yazıyı takip etmek için bazı ön koşullarımız olacak.
- Elasticsearch 7.9.0 kurulu ve 9200 portundan erişilebilir durumdadır. Örneğin, benim elasticsearch sunucum aşağıdaki adreste şuan çalışıyor. Yüklü değil ise docker ile hızlı bir şekilde kullanıma hazır hale getirebilirsiniz. Browser’a localhost:9200 girdiğimizde:
- Python3 yüklü ve çalışır durumdadır.
- Kullanacağımız modülün adı elasticsearch. python -m pip install elasticsearch komutu ile bu modülü yüklemiş durumdasınız.
- Kullanacağımız veri setine buradan ulaşabilirsiniz.
- Jupyter kullanabiliyorsunuz.
O halde başlayalım.
Veri seti, klasik perakende satış verisi.
Kütüphanelerimizi indirelim:
# Kütüphaneler import pandas as pd from elasticsearch import Elasticsearch, helpers
Elasticsearh sınıfından nesne yaratalım. Biliyoruz ki bu yazıda ağırlıklı olarak elasticsearch modülünü kullanacağız. Sizin elasticsearch sunucunuz farklı olabilir. Genelde: localhost:9200
# Elasticsearch sunucusuna bağlanan Elasticsearch sınıfından bir nesne yaratalım es = Elasticsearch("localhost:9200")
Veriyi okuyup pandas dataframe olarak tutalım:
df= pd.read_csv("https://github.com/erkansirin78/datasets/raw/master/spark_book_data/retail-data/all/online-retail-dataset.csv") df.head()
InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | |
---|---|---|---|---|---|---|---|---|
0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 12/1/2010 8:26 | 2.55 | 17850.0 | United Kingdom |
1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 12/1/2010 8:26 | 3.39 | 17850.0 | United Kingdom |
2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 12/1/2010 8:26 | 2.75 | 17850.0 | United Kingdom |
3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 12/1/2010 8:26 | 3.39 | 17850.0 | United Kingdom |
4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 12/1/2010 8:26 | 3.39 | 17850.0 | United Kingdom |
Null değerleri düşürelim. Yazarken hata alabiliyor.
df.dropna(inplace=True)
Verimize uygun bir mapping (şema) oluşturalım:
online_retail_index = { "settings": { "index": { "analysis": { "analyzer": { "custom_analyzer": { "type":"custom", "tokenizer":"standard", "filter":[ "lowercase", "custom_edge_ngram","asciifolding" ] } }, "filter": { "custom_edge_ngram": { "type": "edge_ngram", "min_gram":2, "max_gram": 10 } } } } }, "mappings": { "properties": { "InvoiceNo": { "type": "keyword" }, "StockCode": { "type": "keyword" }, "Description": { "type": "text" }, "Quantity": {"type": "integer"}, "InvoiceDate": { "type": "date", "format": "MM/dd/yyyy hh:ss" }, "UnitPrice": {"type": "float"}, "CustomerID": {"type": "keyword"}, "Country": {"type": "keyword"} } } }
Dataframe’i okuyup Elasticsearch’e yazmaya hazır hale getiren bir fonksiyon yazalım. Burada yazma işi olmuyor bulk api ile yazmak için veri yapısı değişiyor.
def dataframe_to_es(df, es_index): for df_idx, line in df.iterrows(): yield { "_index": es_index, "_id":df_idx, "_source" : { "InvoiceNo": line[0], "StockCode": line[1], "Description": line[2], "Quantity": line[3], "InvoiceDate": line[4], "UnitPrice": line[5], "CustomerID": line[6], "Country": line[7] } }
Şimdi fonksiyonu kullanalım. Öncesinde eğer varsa indeksi silelim. Çünkü indeks veri aktarılırken otomatik yaratılacaktır. Bunu biraz da ilişkisel veri tabanındaki DROP TABLE IF EXISTS sql cümleciğine benzetebiliriz. Daha sonra fonksiyonu kullanarak yazma işlemini gerekleştirelim.
# Index varsa sil
try:
es.indices.delete("online_retail_python")
print("online_retail_python index deleted.")
except:
print("No index")
# Indeksi yukarıdaki mappingse göre oluştur.
es.indices.create("online_retail_python", body=online_retail_index)
{'acknowledged': True,
'shards_acknowledged': True,
'index': 'online_retail_python'}
# Bulk api ile indeksle helpers.bulk(es, dataframe_to_es(df, "online_retail_python"), raise_on_error=False)
Çıktı:
(406829, [])
şeklinde olacaktır.
Basit bir arama yapalım:
keyword = "Coffee" res = es.search(index='online_retail_python', body={ "query": { "bool": { "should": [ { "match": { "Description": keyword } } ] } } })
res sözlüğündeki sonuçları görelim.
res['hits']['hits'][:4]
Çıktısı:
[{'_index': 'online_retail_python', '_type': '_doc', '_id': '9103', '_score': 5.77428, '_source': {'InvoiceNo': '537192', 'StockCode': '20748', 'Description': 'KENSINGTON COFFEE SET', 'Quantity': 1, 'InvoiceDate': '12/5/2010 13:42', 'UnitPrice': 12.75, 'CustomerID': 16402.0, 'Country': 'United Kingdom'}}, {'_index': 'online_retail_python', '_type': '_doc', '_id': '14891', '_score': 5.77428, '_source': {'InvoiceNo': '537624', 'StockCode': '20748', 'Description': 'KENSINGTON COFFEE SET', 'Quantity': 1, 'InvoiceDate': '12/7/2010 14:41', 'UnitPrice': 12.75, 'CustomerID': 12748.0, 'Country': 'United Kingdom'}}, {'_index': 'online_retail_python', '_type': '_doc', '_id': '22385', '_score': 5.77428, '_source': {'InvoiceNo': '538167', 'StockCode': '20748', 'Description': 'KENSINGTON COFFEE SET', 'Quantity': 1, 'InvoiceDate': '12/9/2010 18:58', 'UnitPrice': 12.75, 'CustomerID': 14713.0, 'Country': 'United Kingdom'}}, {'_index': 'online_retail_python', '_type': '_doc', '_id': '17532', '_score': 5.77428, '_source': {'InvoiceNo': '537765', 'StockCode': '20748', 'Description': 'KENSINGTON COFFEE SET', 'Quantity': 1, 'InvoiceDate': '12/8/2010 12:08', 'UnitPrice': 12.75, 'CustomerID': 14606.0, 'Country': 'United Kingdom'}}]
İşte bu kadar. Hayırlı olsun. Elasticsearch’ün JSON dünyasında kaybolmadan python yeteneklerinizi kullanarak bir dataframe’i Elasticsearch’e aktardınız.
Başka bir yazıda görüşmek dileğiyle, mutlu analizler…