Python Kafka: Veri Yazma ve Okuma

Merhabalar, veri ile uğraşan hemen hemen herkes Kafka’ya veri yazıp Kafka’dan veri okumadıysa bile Kafka’nın adını mutlaka duymuştur. Bu yazımızda Python Kafka ile Kafka’ya mesaj gönderip Kafka’dan mesaj okuyacağız. İlk önce elle yazdığımız basit bir metni, daha sonrada pandas dataframe’i mesaj olarak gönderip okuyacağız.

1.Giriş

Apache Kafka son dönemlerin en popüler kendi tabirleri ile “A distributed streaming platform” u. Ben Kafka’yı Türkçe olarak şöyle tanımlıyorum; “büyük veri prensiplerine uygun olarak çalışan (dağıtık, yatay ölçeklenebilir ve hataya karşı dayanıklı) bir veri akış platformu”. Kafka, veri merkezlerinde birbirleri arasında veri alış-verişi yapan sunucu/sistemler arasında mesajların geçici olarak depolandığı yayınla/abone ol (publish/subscribe) mantığı ile çalışan bir platform. Bir haber sitesini Kafka olarak düşünür isek yazarlar yazılarını burada yayınlıyor (publish) okuyucular ise abone oluyorlar (subscribe). Buradaki aboneliği hesap açarak siteye abone olmak şeklinde değil de bir yazarın günlük yazılarına gelip bakma şeklinde düşünmek lazım.

Uygulamada kullanılan ortam bilgileri:

  • Windows 10 İşletim Sistemi (64 bit)
  • Python 3.6.8
  • Jupyter Notebook
  • Apache Kafka

Kafka kurulumu için en kolay ve pratik çözüm Docker kullanmak. Docker ile Kafka kurulumları için bu yazımızdan yararlanabilirsiniz. Uygulamaya geçmeden önce Kafka terminolojisine biraz göz atalım.

2. Kafka Temel Kavramlar

Mesaj: Kafka’da işlem gören en küçük parçadır. Bunun anlayacağımız karşılığı bir csv dosyasındaki bir satır veya ilişkisel veri tabanı tablosundaki bir kayıt/satırdır.

Topic: Yayıncı ile tüketicinin buluştuğu konu adıdır. İlişkisel veri tabanlarındaki tablo adı olarak düşünebiliriz.

Broker: Kafka platformunu oluşturan her bir sunucuya verilen addır. Kodlarda bootstrap-server veya broker-list şeklinde karşımıza çıkabilir.

Producer: Kafka’ya mesaj gönderen sınıf/nesne/uygulama.

Consumer: Kafka’dan mesaj okuyan sınıf/nesne/uygulama.

Consumer group: Bir Kafka topiğinde bulunan mesajları müşterek olarak tüketen Consumer’lardır. Mesajlar gruptaki tüketiciler tarafından paylaşılır.

Partition: Performans ve hataya karşı dayanıklılık gibi sebeplerle bir topic birden fazla parçadan oluşabilir. Gönderilen mesajlar bu parçalara dağıtılır.

Offset: Her bir parçada bulunan mesajın parça içindeki sıra numarasıdır. Temel amacı, tüketicilerin mesajları mükerrer okumamasını sağlamaktır. Kafka, parça içinde mesajların geliş sırasını korumayı garanti ederken topic bazında mesajların geliş sırasını garanti edemez.

Bu temel kavramlara değindikten sonra şimdi uygulamaya geçebiliriz.

3. Kurulum

Kurulum için çok fazla hazırlığa gerek yok. Giriş kısmındaki ortam hazır ise bir Python paketi yüklemek yeterlidir.

python -m pip install kafka-python --user
python -m pip install pandas --user

İkinci olarak Kafka üzerinde bir topic yaratmamız gerekiyor. Bunun için Kafka kurduğumuz yerde:

 kafka-topics.sh --bootstrap-server 192.168.206.70:9092 --topic deneme --partitions 1 --replication-factor 1

komutu ile deneme adında bir topic oluşturabiliriz. Ip numarasının olduğu yeri siz kendi ortamınıza göre ayarlarsınız. Bu çoğu zaman localhost olur. Bazı kafka versiyonlarında bootstrap-server yerine zookeeper kullanmak gerekebilir.

4. Python ile Kafka’ya Mesaj Gönderme (KafkaProducer)

Uygulama esnasında kodların içine açıklama yazdığım için kod aralarında tekrar açıklama yapmayacağım. Anlaşılmayan noktalar varsa lütfen yorum kısmında belirtin, ilave açıklama yaparım.

# Import libs
from kafka import KafkaProducer
# Create a producer object
# Ip can be different in your case
producer = KafkaProducer(bootstrap_servers=['192.168.206.70:9092'])
# Basit bir mesajın oluşturulması ve gönderilmesi
# Asynchronous varsayılan
# b string'i byte'a çevirmek için
# deneme topic adı
producer.send('deneme', b'python producer ile gonderildi')
producer.flush()

Yukarıdaki mesajı kafka-console-consumer ile görebilirsiniz. Nasılı için bu yazıdan yardım alabilirsiniz.

Şimdi mesaj ile beraber anahtar (key) de gönderelim.

producer.send('deneme', key=b'key-1', value=b'mesaj-1')
producer.flush()

5. Python ile Kafka’dan Mesaj Okumak(KafkaConsumer)

# Import libs
from kafka import KafkaConsumer
# Create consumer object
# Consume earliest messages, 
# If there is no new message in 10 secondsstop consuming.
consumer = KafkaConsumer('deneme', # topic
                        group_id='my-group',
                         # consume earliest available messages, for latest 'latest'
                         auto_offset_reset='earliest', 
                         # don't commit offsets
                         enable_auto_commit=False,
                         # stop iteration if no message after 10 secs
                         consumer_timeout_ms=10000,
                         # kafka servers and port
                         bootstrap_servers=['192.168.206.70:9092'])
# List avaiable topics
consumer.topics()
# subscribe a topic
consumer.subscribe("deneme")
# With for loop print messages in deneme topic
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value.decode('utf-8')))
# Expected output
deneme:0:39: key=None value=python producer ile gonderildi

6. Pandas Dataframe Satırlarını Kafka’ya Mesaj Olarak Göndermek

Yukarıda elimizle yazdığımız bir metni mesaj olarak Kafka’ya gönderdik. Ancak gerçek hayatta Python ile veri manipülasyonunu pandas paketi ile yaparız. Peki pandas dataframe satırlarını mesaj olarak gönderebilir miyiz? Çünkü ancak bu halde Python ile Kafka’ya birşeyler göndermek anlamlı olacak. Elbette gönderebiliriz. Gerekli hazırlıkları yaptıktan sonra neden olmasın. Aşağıda csv dosyasından okuyarak elde ettiğimiz bir dataframe’i her bir satır bir mesaj olacak şekilde Kafka’ya göndereceğiz.

# Read csv file and quick overview
import pandas as pd
simple_data = pd.read_csv("D:/Datasets/simple_data.csv")
simple_data.head()
 siranoisimyasmesleksehiraylik_gelir
01Cemal35IsciAnkara3500
12Ceyda42MemurKayseri4200
23Timur30MüzisyenIstanbul9000
34Burcu29PazarlamaciAnkara4200
45Yasemin23NaNBursa4800
# Null check
simple_data.isnull().sum()

# Output:
sirano         0
isim           0
yas            0
meslek         2
sehir          0
aylik_gelir    0
dtype: int64
# Fill null values
simple_data.fillna("Bilinmiyor", inplace=True)
# Concat columns into one with seperated ,
# They will be sent to Kafka as a message
simple_data['kafka_message'] = simple_data['sirano'].astype(str) + "," + \
simple_data['isim'] + "," + simple_data['yas'].astype(str) + "," + \
simple_data['meslek'] + "," + simple_data['sehir'] + "," + \
simple_data['aylik_gelir'].astype(str)
# see what happend
simple_data.head()
 siranoisimyasmesleksehiraylik_gelirkafka_message
01Cemal35IsciAnkara35001,Cemal,35,Isci,Ankara,3500
12Ceyda42MemurKayseri42002,Ceyda,42,Memur,Kayseri,4200
23Timur30MüzisyenIstanbul90003,Timur,30,Müzisyen,Istanbul,9000
34Burcu29PazarlamaciAnkara42004,Burcu,29,Pazarlamaci,Ankara,4200
45Yasemin23BilinmiyorBursa48005,Yasemin,23,Bilinmiyor,Bursa,4800
Tablo-1: simple_data.csv

Kafka’ya göndereceğimiz mesajı tek sütun( kafka_message) içinde topladık. Şimdi dataframe iterrows() metodu ile her bir satırı dolaşıp mesaj gönderelim. Burada her ne kadar sirano mesaj içinde olsa da ben Kafka’ya onu key olarak da göndereceğim.

  for index, row in simple_data.iterrows():
    producer.send('deneme', key=str(row[0]).encode(), value=row[-1].encode())

row[0] sirano sütununa karşılık gelirken row[-1] kafka_message sütununa karşılık geliyor. encode() metotları ise bildiğimiz string veri türünü byte türüne çevirmek için. Çünkü Kafka veriyi byte halinde saklıyor. Yazarken serialization, okurken de deserialization yapmamız gerekiyor. Bu durum sadece Python’a özgü değil, Java ile de bu işi yapmak zorundayız. Şimdi Consumer tarafından bir bakalım mesajlarımız gitmiş mi?

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, 
                                          message.key.decode('utf-8'),
                                          message.value.decode('utf-8')))
deneme:0:40: key=1 value=1,Cemal,35,Isci,Ankara,3500
deneme:0:41: key=2 value=2,Ceyda,42,Memur,Kayseri,4200
deneme:0:42: key=3 value=3,Timur,30,Müzisyen,Istanbul,9000
deneme:0:43: key=4 value=4,Burcu,29,Pazarlamaci,Ankara,4200
deneme:0:44: key=5 value=5,Yasemin,23,Bilinmiyor,Bursa,4800
deneme:0:45: key=6 value=6,Ali,33,Memur,Ankara,4250
deneme:0:46: key=7 value=7,Dilek,29,Pazarlamaci,Istanbul,7300
deneme:0:47: key=8 value=8,Murat,31,Müzisyen,Istanbul,12000
deneme:0:48: key=9 value=9,Ahmet,33,Doktor,Ankara,18000
deneme:0:49: key=10 value=10,Muhittin,46,Berber,Istanbul,12000
deneme:0:50: key=11 value=11,Hicaziye,47,Tuhafiyeci,Ankara,4800
deneme:0:51: key=12 value=12,Harun,43,Tornacı,Ankara,4200
deneme:0:52: key=13 value=13,Hakkı,33,Memur,Çorum,3750
deneme:0:53: key=14 value=14,Gülizar,37,Doktor,İzmir,14250
deneme:0:54: key=15 value=15,Şehmuz,41,Bilinmiyor,Ankara,8700
deneme:0:55: key=16 value=16,Gençay,46,Berber,Ankara,8800
deneme:0:56: key=16 value=16,Gençay,46,Berber,Ankara,8800

Harika! Gayet başarılı bir şekilde her satır mesaj olarak gitmiş. Ben burada yazmayacağım, ancak isterseniz mesajları tekrar okuyup pandas dataframe elde edebilirsiniz. Daha fazla seçenek ve bilgi için kullandığımız paketin sayfasını buradan inceleyebilirsiniz.

Bu yazımızda Python ile Kafka’ya hem basit bir metni hem de pandas dataframe’i mesaj gönderdik ve gönderdiğimiz mesajları okuduk. Başka bir yazıda görüşmek dileğiyle hoşçakalın…

Yazar Hakkında
Toplam 180 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara