Python Kafka: Veri Yazma ve Okuma
Merhabalar, veri ile uğraşan hemen hemen herkes Kafka’ya veri yazıp Kafka’dan veri okumadıysa bile Kafka’nın adını mutlaka duymuştur. Bu yazımızda Python Kafka ile Kafka’ya mesaj gönderip Kafka’dan mesaj okuyacağız. İlk önce elle yazdığımız basit bir metni, daha sonrada pandas dataframe’i mesaj olarak gönderip okuyacağız.
1.Giriş
Apache Kafka son dönemlerin en popüler kendi tabirleri ile “A distributed streaming platform” u. Ben Kafka’yı Türkçe olarak şöyle tanımlıyorum; “büyük veri prensiplerine uygun olarak çalışan (dağıtık, yatay ölçeklenebilir ve hataya karşı dayanıklı) bir veri akış platformu”. Kafka, veri merkezlerinde birbirleri arasında veri alış-verişi yapan sunucu/sistemler arasında mesajların geçici olarak depolandığı yayınla/abone ol (publish/subscribe) mantığı ile çalışan bir platform. Bir haber sitesini Kafka olarak düşünür isek yazarlar yazılarını burada yayınlıyor (publish) okuyucular ise abone oluyorlar (subscribe). Buradaki aboneliği hesap açarak siteye abone olmak şeklinde değil de bir yazarın günlük yazılarına gelip bakma şeklinde düşünmek lazım.
Uygulamada kullanılan ortam bilgileri:
- Windows 10 İşletim Sistemi (64 bit)
- Python 3.6.8
- Jupyter Notebook
- Apache Kafka
Kafka kurulumu için en kolay ve pratik çözüm Docker kullanmak. Docker ile Kafka kurulumları için bu yazımızdan yararlanabilirsiniz. Uygulamaya geçmeden önce Kafka terminolojisine biraz göz atalım.
2. Kafka Temel Kavramlar
Mesaj: Kafka’da işlem gören en küçük parçadır. Bunun anlayacağımız karşılığı bir csv dosyasındaki bir satır veya ilişkisel veri tabanı tablosundaki bir kayıt/satırdır.
Topic: Yayıncı ile tüketicinin buluştuğu konu adıdır. İlişkisel veri tabanlarındaki tablo adı olarak düşünebiliriz.
Broker: Kafka platformunu oluşturan her bir sunucuya verilen addır. Kodlarda bootstrap-server veya broker-list şeklinde karşımıza çıkabilir.
Producer: Kafka’ya mesaj gönderen sınıf/nesne/uygulama.
Consumer: Kafka’dan mesaj okuyan sınıf/nesne/uygulama.
Consumer group: Bir Kafka topiğinde bulunan mesajları müşterek olarak tüketen Consumer’lardır. Mesajlar gruptaki tüketiciler tarafından paylaşılır.
Partition: Performans ve hataya karşı dayanıklılık gibi sebeplerle bir topic birden fazla parçadan oluşabilir. Gönderilen mesajlar bu parçalara dağıtılır.
Offset: Her bir parçada bulunan mesajın parça içindeki sıra numarasıdır. Temel amacı, tüketicilerin mesajları mükerrer okumamasını sağlamaktır. Kafka, parça içinde mesajların geliş sırasını korumayı garanti ederken topic bazında mesajların geliş sırasını garanti edemez.
Bu temel kavramlara değindikten sonra şimdi uygulamaya geçebiliriz.
3. Kurulum
Kurulum için çok fazla hazırlığa gerek yok. Giriş kısmındaki ortam hazır ise bir Python paketi yüklemek yeterlidir.
python -m pip install kafka-python --user python -m pip install pandas --user
İkinci olarak Kafka üzerinde bir topic yaratmamız gerekiyor. Bunun için Kafka kurduğumuz yerde:
kafka-topics.sh --bootstrap-server 192.168.206.70:9092 --topic deneme --partitions 1 --replication-factor 1
komutu ile deneme adında bir topic oluşturabiliriz. Ip numarasının olduğu yeri siz kendi ortamınıza göre ayarlarsınız. Bu çoğu zaman localhost olur. Bazı kafka versiyonlarında bootstrap-server yerine zookeeper kullanmak gerekebilir.
4. Python ile Kafka’ya Mesaj Gönderme (KafkaProducer)
Uygulama esnasında kodların içine açıklama yazdığım için kod aralarında tekrar açıklama yapmayacağım. Anlaşılmayan noktalar varsa lütfen yorum kısmında belirtin, ilave açıklama yaparım.
# Import libs from kafka import KafkaProducer
# Create a producer object # Ip can be different in your case producer = KafkaProducer(bootstrap_servers=['192.168.206.70:9092'])
# Basit bir mesajın oluşturulması ve gönderilmesi # Asynchronous varsayılan # b string'i byte'a çevirmek için # deneme topic adı producer.send('deneme', b'python producer ile gonderildi') producer.flush()
Yukarıdaki mesajı kafka-console-consumer ile görebilirsiniz. Nasılı için bu yazıdan yardım alabilirsiniz.
Şimdi mesaj ile beraber anahtar (key) de gönderelim.
producer.send('deneme', key=b'key-1', value=b'mesaj-1') producer.flush()
5. Python ile Kafka’dan Mesaj Okumak(KafkaConsumer)
# Import libs from kafka import KafkaConsumer
# Create consumer object # Consume earliest messages, # If there is no new message in 10 secondsstop consuming. consumer = KafkaConsumer('deneme', # topic group_id='my-group', # consume earliest available messages, for latest 'latest' auto_offset_reset='earliest', # don't commit offsets enable_auto_commit=False, # stop iteration if no message after 10 secs consumer_timeout_ms=10000, # kafka servers and port bootstrap_servers=['192.168.206.70:9092'])
# List avaiable topics consumer.topics()
# subscribe a topic consumer.subscribe("deneme")
# With for loop print messages in deneme topic for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8')))
# Expected output deneme:0:39: key=None value=python producer ile gonderildi
6. Pandas Dataframe Satırlarını Kafka’ya Mesaj Olarak Göndermek
Yukarıda elimizle yazdığımız bir metni mesaj olarak Kafka’ya gönderdik. Ancak gerçek hayatta Python ile veri manipülasyonunu pandas paketi ile yaparız. Peki pandas dataframe satırlarını mesaj olarak gönderebilir miyiz? Çünkü ancak bu halde Python ile Kafka’ya birşeyler göndermek anlamlı olacak. Elbette gönderebiliriz. Gerekli hazırlıkları yaptıktan sonra neden olmasın. Aşağıda csv dosyasından okuyarak elde ettiğimiz bir dataframe’i her bir satır bir mesaj olacak şekilde Kafka’ya göndereceğiz.
# Read csv file and quick overview import pandas as pd simple_data = pd.read_csv("D:/Datasets/simple_data.csv") simple_data.head()
sirano | isim | yas | meslek | sehir | aylik_gelir | |
---|---|---|---|---|---|---|
0 | 1 | Cemal | 35 | Isci | Ankara | 3500 |
1 | 2 | Ceyda | 42 | Memur | Kayseri | 4200 |
2 | 3 | Timur | 30 | Müzisyen | Istanbul | 9000 |
3 | 4 | Burcu | 29 | Pazarlamaci | Ankara | 4200 |
4 | 5 | Yasemin | 23 | NaN | Bursa | 4800 |
# Null check simple_data.isnull().sum() # Output: sirano 0 isim 0 yas 0 meslek 2 sehir 0 aylik_gelir 0 dtype: int64
# Fill null values simple_data.fillna("Bilinmiyor", inplace=True)
# Concat columns into one with seperated , # They will be sent to Kafka as a message simple_data['kafka_message'] = simple_data['sirano'].astype(str) + "," + \ simple_data['isim'] + "," + simple_data['yas'].astype(str) + "," + \ simple_data['meslek'] + "," + simple_data['sehir'] + "," + \ simple_data['aylik_gelir'].astype(str)
# see what happend simple_data.head()
sirano | isim | yas | meslek | sehir | aylik_gelir | kafka_message | |
---|---|---|---|---|---|---|---|
0 | 1 | Cemal | 35 | Isci | Ankara | 3500 | 1,Cemal,35,Isci,Ankara,3500 |
1 | 2 | Ceyda | 42 | Memur | Kayseri | 4200 | 2,Ceyda,42,Memur,Kayseri,4200 |
2 | 3 | Timur | 30 | Müzisyen | Istanbul | 9000 | 3,Timur,30,Müzisyen,Istanbul,9000 |
3 | 4 | Burcu | 29 | Pazarlamaci | Ankara | 4200 | 4,Burcu,29,Pazarlamaci,Ankara,4200 |
4 | 5 | Yasemin | 23 | Bilinmiyor | Bursa | 4800 | 5,Yasemin,23,Bilinmiyor,Bursa,4800 |
Kafka’ya göndereceğimiz mesajı tek sütun( kafka_message) içinde topladık. Şimdi dataframe iterrows() metodu ile her bir satırı dolaşıp mesaj gönderelim. Burada her ne kadar sirano mesaj içinde olsa da ben Kafka’ya onu key olarak da göndereceğim.
for index, row in simple_data.iterrows(): producer.send('deneme', key=str(row[0]).encode(), value=row[-1].encode())
row[0] sirano sütununa karşılık gelirken row[-1] kafka_message sütununa karşılık geliyor. encode() metotları ise bildiğimiz string veri türünü byte türüne çevirmek için. Çünkü Kafka veriyi byte halinde saklıyor. Yazarken serialization, okurken de deserialization yapmamız gerekiyor. Bu durum sadece Python’a özgü değil, Java ile de bu işi yapmak zorundayız. Şimdi Consumer tarafından bir bakalım mesajlarımız gitmiş mi?
for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key.decode('utf-8'), message.value.decode('utf-8')))
deneme:0:40: key=1 value=1,Cemal,35,Isci,Ankara,3500 deneme:0:41: key=2 value=2,Ceyda,42,Memur,Kayseri,4200 deneme:0:42: key=3 value=3,Timur,30,Müzisyen,Istanbul,9000 deneme:0:43: key=4 value=4,Burcu,29,Pazarlamaci,Ankara,4200 deneme:0:44: key=5 value=5,Yasemin,23,Bilinmiyor,Bursa,4800 deneme:0:45: key=6 value=6,Ali,33,Memur,Ankara,4250 deneme:0:46: key=7 value=7,Dilek,29,Pazarlamaci,Istanbul,7300 deneme:0:47: key=8 value=8,Murat,31,Müzisyen,Istanbul,12000 deneme:0:48: key=9 value=9,Ahmet,33,Doktor,Ankara,18000 deneme:0:49: key=10 value=10,Muhittin,46,Berber,Istanbul,12000 deneme:0:50: key=11 value=11,Hicaziye,47,Tuhafiyeci,Ankara,4800 deneme:0:51: key=12 value=12,Harun,43,Tornacı,Ankara,4200 deneme:0:52: key=13 value=13,Hakkı,33,Memur,Çorum,3750 deneme:0:53: key=14 value=14,Gülizar,37,Doktor,İzmir,14250 deneme:0:54: key=15 value=15,Şehmuz,41,Bilinmiyor,Ankara,8700 deneme:0:55: key=16 value=16,Gençay,46,Berber,Ankara,8800 deneme:0:56: key=16 value=16,Gençay,46,Berber,Ankara,8800
Harika! Gayet başarılı bir şekilde her satır mesaj olarak gitmiş. Ben burada yazmayacağım, ancak isterseniz mesajları tekrar okuyup pandas dataframe elde edebilirsiniz. Daha fazla seçenek ve bilgi için kullandığımız paketin sayfasını buradan inceleyebilirsiniz.
Bu yazımızda Python ile Kafka’ya hem basit bir metni hem de pandas dataframe’i mesaj gönderdik ve gönderdiğimiz mesajları okuduk. Başka bir yazıda görüşmek dileğiyle hoşçakalın…