Apicurio Schema Registry ve Kafka ile Veri Akışı Yönetimi
Büyük veri sistemlerinde veri güvenliğini sağlamak ve veri formatını belirli bir yapıda tutmak, veri akışını yönetmede oldukça önemli bir rol oynar. Apicurio Schema Registry, veri yapısının standartlaşmasını sağlayarak Kafka gibi gerçek zamanlı veri hatlarında ve akış uygulamalarında veri doğruluğunu güvence altına alır. Kafka ise yüksek hacimli veri işlemlerini hızlıca gerçekleştiren bir mesajlaşma altyapısı sunar.
Bu yazıda, Apicurio Registry ile Kafka’nın birlikte nasıl çalıştığını, veri akışını nasıl sağladığımızı ve PostgreSQL gibi bir veritabanında bu verileri nasıl sakladığımızı uygulamalı bir şekilde ele alacağım. Bir Kafka veri akışı kurarak ve Apicurio Schema Registry ile veri şemalarını yöneterek büyük ölçekli veri işlemleri için güvenli ve düzenli bir yapı sağlayacağız. PostgreSQL kullanarak veri tabanı düzenlemesi yapacak, Docker ile ortam kurulumunu tamamlayacağız ve bir müşteri verisi akışı senaryosu oluşturacağız.
Gereksinimler
- Docker kurulu bir bilgisayar ve internet
Proje Ortamının Kurulması
İlk olarak, sanal bir Python ortamı oluşturarak gerekli bağımlılıkları yükleyip kurulum adımlarına başlıyorum. Bu ortam, bağımlılık yönetimini kolaylaştırarak projenin sorunsuz çalışmasını sağlayacak.
1. Sanal Ortam Oluşturulması ve Gereksinimlerin Yüklenmesi
Öncelikle kendimize yeni bir proje dizini ve ortam oluşturalım. Ardından proje için gerekli olan kütüphaneleri yükleyelim.
python3 -m venv schema_project
source schema_project/bin/activate
requirements.txt:
confluent_kafka psycopg2-binary requests avro-python3 pandas sqlmodel
pip install -r requirements.txt
2. Docker Ayarları ve Docker-Compose Yapılandırması
Docker kullanarak PostgreSQL, Kafka ve Apicurio Registry gibi hizmetlerin hepsini tek bir ortamda başlatıyoruz. Bu yapılandırma, farklı hizmetlerin uyum içinde çalışmasını sağlıyor.
Docker başlatalım:
sudo systemctl start docker
Kafka Dockerfile, start-kafka.sh, server.properties
Dockerfile:
Dockerfile
, gerekli bağımlılıkları kurarak ve Kafka’yı indirip yapılandırarak Kafka sunucusunu Docker üzerinde çalışmaya hazır hale getirir.
FROM openjdk:17.0.2-jdk-buster RUN apt update RUN apt install -y curl RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y git RUN apt install -y libpq-dev ENV KAFKA_VERSION 3.8.0 ENV SCALA_VERSION 2.12 RUN mkdir /tmp/kafka && \ curl "https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \ -o /tmp/kafka/kafka.tgz && \ mkdir /kafka && cd /kafka && \ tar -xvzf /tmp/kafka/kafka.tgz --strip 1 RUN mkdir -p /data/kafka COPY start-kafka.sh /usr/bin RUN chmod +x /usr/bin/start-kafka.sh CMD ["start-kafka.sh"]
start-kafka.sh:
start-kafka.sh
Kafka’nın belirtilen cluster-id
ile başlatılmasını ve ardından server.properties
ayarlarıyla Kafka sunucusunun çalıştırılmasını sağlar.
#!/bin/bash /kafka/bin/kafka-storage.sh format --config /kafka/config/server.properties --cluster-id 'EP6hyiddQNW5FPrAvR9kWw' --ignore-formatted /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
server.properties: Kafka’nın temel yapılandırma dosyasıdır.
############################# Server Basics ############################# process.roles=broker,controller node.id=1 controller.quorum.voters=1@kafka:9093 ############################# Socket Server Settings ############################# listeners=PLAINTEXT://:9091,CONTROLLER://:9093,EXTERNAL://:9092 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://kafka:9091,EXTERNAL://localhost:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/data/kafka num.partitions=3 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ################################ Custom Propertied Added by Erkan ######################### controlled.shutdown.enable=true delete.topic.enable=true
Dosyayı oluşturmak için komut:
touch docker-compose.yaml
Docker Compose oluşturulması
networks: my_network: driver: bridge services: kafka: container_name: kafka image: kafka:3.8.0 build: context: ./kafka ports: - "9092:9092" networks: - my_network volumes: - ./kafka/config/server.properties:/kafka/config/server.properties - ./data/kafka/:/data/kafka/ postgresql: image: 'postgres:15-bullseye' environment: POSTGRES_USER: myuser POSTGRES_PASSWORD: mypassword POSTGRES_DB: mydatabase ports: - '5432:5432' networks: - my_network apicurio-registry-kafkasql: image: apicurio/apicurio-registry-kafkasql:2.6.5.Final environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9091 REGISTRY_FEATURES_GROUPS_ENABLED: "true" ports: - "8080:8080" depends_on: - kafka networks: - my_network apicurio-registry-ui: image: apicurio/apicurio-studio-ui:0.2.62.Final-announce ports: - "8081:8081" depends_on: - apicurio-registry-kafkasql networks: - my_network
docker-compose dosyasındaki her servisin işlevi ve yapılandırmasından aşağıda kısaca açıklamak istiyorum.
Kurulacak Servisler ve Temel İşlevleri
- Kafka:
- Kafka, Apicurio Schema Registry ile veri ve şema yönetimi için bir mesajlaşma platformu olarak çalışır.
- 9092 portu üzerinden dış dünyaya açılır ve my_network isimli özel bir ağda çalışır.
- server.properties dosyasındaki konfigürasyonlar, Kafka’nın yapılandırmasını özelleştirir. Ayrıca, verileri kalıcı hale getirmek için bir volumes ayarına sahiptir.
- PostgreSQL:
- PostgreSQL, verilerin depolanmasını sağlamak için bir ilişkisel veritabanı olarak çalışır.
- 5432 portundan erişilebilir ve varsayılan kullanıcı, parola ve veritabanı bilgileri environment bölümünde belirtilmiştir.
- Apicurio Registry gibi uygulamalara, yapılandırılmış veritabanı üzerinden veri yönetimi sağlar.
- Apicurio Registry (apicurio-registry-kafkasql):
- Apicurio Schema Registry’nin ana bileşenidir ve şemaları Kafka ile yönetmek için kullanılır.
KAFKA_BOOTSTRAP_SERVERS
ortam değişkeni ile Kafka sunucusuna bağlanır.REGISTRY_FEATURES_GROUPS_ENABLED
ayarı ise şema gruplarını etkinleştirir.- 8080 portunda çalışır ve Kafka servisine bağımlıdır, bu yüzden
depends_on
ifadesi ile Kafka servisi başlatıldıktan sonra aktif hale gelir.
- Apicurio Registry UI (apicurio-registry-ui):
- Apicurio’nun kullanıcı arayüzünü sağlar, kullanıcıların şemaları görsel olarak yönetmesine olanak tanır.
- 8081 portu üzerinden erişilebilirdir.
- apicurio-registry-kafkasql servisine bağlı olarak çalışır ve kullanıcıların şema yönetimi işlemlerini daha kolay bir arayüzle gerçekleştirmesini sağlar.
Özetledocker-compose.yaml
yukarıdaki dört servisin bir köprü ağı (bridge network) içinde sorunsuz bir şekilde iletişim kurmasını sağlar.
Konteynerlarımızı başlatalım:
docker-compose up --build -d
Tüm konteynerlerin çalışıp çalışmadığını doğrulamak için docker ps komutunu kullanıyoruz.
(schema_project) (base) [train@trainvm schema_registry]$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES d13cb6f6ac39 apicurio/apicurio-studio-ui:0.2.62.Final-announce "/usr/local/s2i/run" 27 seconds ago Up 25 seconds 8080/tcp, 8443/tcp, 8778/tcp, 9779/tcp, 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp schema_registry-apicurio-registry-ui-1 6fd76315c36f apicurio/apicurio-registry-kafkasql:2.6.5.Final "/opt/jboss/containe…" 27 seconds ago Up 25 seconds 8443/tcp, 8778/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp, 9779/tcp schema_registry-apicurio-registry-kafkasql-1 66f478f6b4b5 kafka:3.8.0 "start-kafka.sh" 27 seconds ago Up 26 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka 54bcb258c159 postgres:15-bullseye "docker-entrypoint.s…" 27 seconds ago Up 26 seconds 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp schema_registry-postgresql-1
Schema, Producer ve Consumer Oluşturulması
1. Schema
schema_registration.py
dosyası, bir Avro şemasını Apicurio Registry’ye kaydetmeyi sağlar. Böylece belirli bir veri yapısının tanımı Apicurio’da depolanarak veri akışı ve doğrulaması yapılabilir.
Dosyayı oluşturmak için komut:
touch schema_registration.py
Dosya içeriği:
import requests import json # Avro Şeması Tanımı schema = { "type": "record", "name": "Customer", "fields": [ {"name": "RowNumber", "type": "int"}, {"name": "CustomerId", "type": "int"}, {"name": "Surname", "type": "string"}, {"name": "CreditScore", "type": "int"}, {"name": "Geography", "type": "string"}, {"name": "Gender", "type": "string"}, {"name": "Age", "type": "int"}, {"name": "Tenure", "type": "int"}, {"name": "Balance", "type": "float"}, {"name": "NumOfProducts", "type": "int"}, {"name": "HasCrCard", "type": "int"}, {"name": "IsActiveMember", "type": "int"}, {"name": "EstimatedSalary", "type": "float"}, {"name": "Exited", "type": "int"} ] } # Şemayı Apicurio Registry'ye kaydet url = "http://localhost:8080/apis/registry/v2/groups/default/artifacts" headers = {"Content-Type": "application/json"} response = requests.post(url, headers=headers, data=json.dumps(schema)) if response.status_code == 200 or response.status_code == 201: print("Şema başarıyla kaydedildi.") else: print("Şema kaydedilemedi:", response.status_code, response.text)
Çalıştırmak için:
python schema_registration.py
Beklenen sonuç:
'Şema başarıyla kaydedildi.'
Ardından http://localhost:8080/ adresine giderek oluşan şemayı görebilirsiniz. Producer ve Consumer içinde ihtiyacımız olacak schema id’sini bu arayüzden görüyoruz.
2. Python Producer
producer.py
Apicurio Registry’de kayıtlı Customer
adlı Avro şemasını kullanarak, Churn_Modelling.csv
veri setindeki müşteri bilgilerini churn-topic
adlı Kafka konusuna Avro formatında gönderir. Veriler iki saniyelik aralıklarla Kafka’ya gönderilir.
Dosyayı oluşturmak için komut:
touch producer.py
Dosya içeriği:
import requests import pandas as pd from confluent_kafka import Producer from avro.schema import Parse from avro.io import DatumWriter, BinaryEncoder from io import BytesIO import json import time # Apicurio Registry ve Kafka bağlantı ayarları REGISTRY_URL = 'http://localhost:8080/apis/registry/v2/groups/default/artifacts/33e4eff9-b9d6-47f4-9814-c1383d043625' """Şema id REGISTRY_URL'nin sonuna yazılacak! örn: 33e4eff9-b9d6-47f4-9814-c1383d043625""" KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092' producer_config = { 'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS, 'client.id': 'my-producer', 'linger.ms': 10, 'acks': 'all' } # Producer'ı oluşturma producer = Producer(producer_config) # Veri setini yükleme df = pd.read_csv("https://raw.githubusercontent.com/erkansirin78/datasets/refs/heads/master/Churn_Modelling.csv") # Apicurio Registry'den şema alma response = requests.get(REGISTRY_URL) if response.status_code == 200: schema_content = response.json() if schema_content: print("Şema başarıyla alındı.") schema = Parse(json.dumps(schema_content)) # JSON string'e dönüştürdük else: print("Şema içeriği bulunamadı. Yanıt:", response.json()) exit(1) else: print(f"Şema alınamadı: {response.status_code}, {response.text}") exit(1) # Veriyi Avro formatında Kafka’ya hazırlayan fonksiyon def send_avro_message(row): writer = DatumWriter(schema) bytes_writer = BytesIO() encoder = BinaryEncoder(bytes_writer) writer.write(row, encoder) return bytes_writer.getvalue() # Mesajları Kafka’ya Avro formatında gönderme for _, row in df.iterrows(): avro_message = send_avro_message(row.to_dict()) time.sleep(2) producer.produce("churn-topic", avro_message) print(f"Sent message: {row.to_dict()}") producer.flush()
Özetleyecek olursak burada kullandığımız send_avro_message fonksiyonu, aldığı veriyi Avro şemasına göre serileştirerek (bayt dizisine dönüştürerek) Kafka’ya gönderilmeye uygun hale getirir. Bu sayede veriler Kafka’da yapılandırılmış bir şekilde saklanabilir ve Avro’nun sağladığı avantajlardan (schema evolution, sıkıştırma vb.) yararlanılabilir.
Çalıştırmak için: (Yeni sekmede açın, Env. aktif edin, dosya konumuna gelerek çalıştırın. )
python producer.py
Beklenen çıktı:
3. Python Consumer
consumer.py
, Apicurio Registry’deki Customer
Avro şemasını kullanarak churn-topic
adlı Kafka konusundan gelen mesajları okur. Her mesaj Avro formatında çözülür ve PostgreSQL’de ChurnTable
tablosuna kaydedilir. Yeni kayıtlar veritabanına eklenirken, mevcut kayıtlar göz ardı edilir.
Dosyayı oluşturmak için komut:
touch consumer.py
Dosya içeriği:
from sqlmodel import Field, SQLModel, create_engine, Session import requests from confluent_kafka import Consumer from avro.io import DatumReader, BinaryDecoder from io import BytesIO from avro.schema import parse import json # Apicurio Registry URL ve Şema Bilgisi REGISTRY_URL = "http://localhost:8080/apis/registry/v2/groups/default" schema_id = "33e4eff9-b9d6-47f4-9814-c1383d043625" # Şema çekme ve ayrıştırma schema_response = requests.get(f"{REGISTRY_URL}/artifacts/{schema_id}") schema = parse(json.dumps(schema_response.json())) # JSON'u string formatına dönüştürdük # SQLModel Tablo Tanımı class ChurnTable(SQLModel, table=True): RowNumber: int = Field(primary_key=True) CustomerId: int Surname: str CreditScore: int Geography: str Gender: str Age: int Tenure: int Balance: float NumOfProducts: int HasCrCard: int IsActiveMember: int EstimatedSalary: float Exited: int # PostgreSQL bağlantısı DATABASE_URL = "postgresql://myuser:mypassword@localhost:5432/mydatabase" engine = create_engine(DATABASE_URL) # Tabloyu oluştur SQLModel.metadata.create_all(engine) # Kafka Consumer ayarları consumer_config = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group', 'auto.offset.reset': 'earliest' } consumer = Consumer(consumer_config) consumer.subscribe(['churn-topic']) # Mesajları tüketme ve PostgreSQL'e ekleme with Session(engine) as session: while True: message = consumer.poll(1.0) if message is None: continue if message.error(): print(f"Consumer error: {message.error()}") continue # Avro verisini çözümleme avro_bytes = BytesIO(message.value()) decoder = BinaryDecoder(avro_bytes) reader = DatumReader(schema) record = reader.read(decoder) # SQLModel nesnesi oluştur ve veritabanına ekle # Mevcut kaydı kontrol et existing_record = session.get(ChurnTable, record['RowNumber']) if existing_record is None: # Kayıt yoksa ekle churn_record = ChurnTable( RowNumber=record['RowNumber'], CustomerId=record['CustomerId'], Surname=record['Surname'], CreditScore=record['CreditScore'], Geography=record['Geography'], Gender=record['Gender'], Age=record['Age'], Tenure=record['Tenure'], Balance=record['Balance'], NumOfProducts=record['NumOfProducts'], HasCrCard=record['HasCrCard'], IsActiveMember=record['IsActiveMember'], EstimatedSalary=record['EstimatedSalary'], Exited=record['Exited'] ) session.add(churn_record) session.commit() else: print(f"Record with RowNumber {record['RowNumber']} already exists. Skipping.") # Eklenen kaydı ekrana yazdır print(f"Consumed Record: {record}") # PostgreSQL ve Kafka Consumer bağlantılarını kapatma consumer.close()
Çalıştırmak için: (Yeni sekmede açın, Env aktif edin, dosya konumuna gelerek çalıştırın. )
python consumer.py
Beklenen çıktı:
POSTGRESQL Konteynerine Bağlanma
Verilerin doğru şekilde işlendiğini ve kaydedildiğini görmek için PostgreSQL veritabanına bağlanıyoruz. Ayrı bir sekme açalım, environmentı aktif edelim ve proje dizininde olduğumuzdan emin olalım.
Konteynere bağlanmak için komutu kullanalım:
docker exec -it schema_registry-postgresql-1 bash
Bu komut ile root’a bağlandık şimdi oluşturduğumuz kullanıcı ile mydatabase’e erişelim.
psql -U myuser -d mydatabase
ChurnTable
tablosunda verilerin başarılı bir şekilde kaydedildiğini doğrulamak için örnek sorgular çalıştırıyoruz:
mydatabase=# \dt List of relations Schema | Name | Type | Owner --------+------------+-------+-------- public | churntable | table | myuser (1 row)
mydatabase=# select * from churntable limit(5); mydatabase=# RowNumber | CustomerId | Surname | CreditScore | Geography | Gender | Age | Tenure | Balance | NumOfProducts | HasCrCard | IsActiveMember | EstimatedSalary | Exited -----------+------------+----------+-------------+-----------+--------+-----+--------+----------------+---------------+-----------+----------------+-----------------+-------- 3 | 15619304 | Onio | 502 | France | Female | 42 | 8 | 159660.796875 | 3 | 1 | 0 | 113931.5703125 | 1 1 | 15634602 | Hargrave | 619 | France | Female | 42 | 2 | 0 | 1 | 1 | 1 | 101348.8828125 | 1 2 | 15647311 | Hill | 608 | Spain | Female | 41 | 1 | 83807.859375 | 1 | 0 | 1 | 112542.578125 | 0 4 | 15701354 | Boni | 699 | France | Female | 39 | 1 | 0 | 2 | 0 | 0 | 93826.6328125 | 0 5 | 15737888 | Mitchell | 850 | Spain | Female | 43 | 2 | 125510.8203125 | 1 | 1 | 1 | 79084.1015625 | 0 (5 rows)
Makalemizin sonuna geldik. Apicurio Registry ile şema yönetimi, Kafka ve PostgreSQL ile veri akışı ve Docker ile ortam yapılandırmasının birleşimi, gerçek dünya uygulamaları için sağlam bir çözüm ortaya koyuyor. Bu yazıda, güvenli ve yapılandırılmış bir veri akış sisteminin nasıl kurulabileceğine dair kapsamlı bir örnek sunmaya çalıştım. Umarım veri yolculuğunuzda sizlere yardımcı olur.
Sevgiler,
Görsel Tasarım: Veri Akış Örneği – Merve Öztiryaki