Apicurio Schema Registry ve Kafka ile Veri Akışı Yönetimi

Büyük veri sistemlerinde veri güvenliğini sağlamak ve veri formatını belirli bir yapıda tutmak, veri akışını yönetmede oldukça önemli bir rol oynar. Apicurio Schema Registry, veri yapısının standartlaşmasını sağlayarak Kafka gibi gerçek zamanlı veri hatlarında ve akış uygulamalarında veri doğruluğunu güvence altına alır. Kafka ise yüksek hacimli veri işlemlerini hızlıca gerçekleştiren bir mesajlaşma altyapısı sunar.

Veri Akış Örneği

Bu yazıda, Apicurio Registry ile Kafka’nın birlikte nasıl çalıştığını, veri akışını nasıl sağladığımızı ve PostgreSQL gibi bir veritabanında bu verileri nasıl sakladığımızı uygulamalı bir şekilde ele alacağım. Bir Kafka veri akışı kurarak ve Apicurio Schema Registry ile veri şemalarını yöneterek büyük ölçekli veri işlemleri için güvenli ve düzenli bir yapı sağlayacağız. PostgreSQL kullanarak veri tabanı düzenlemesi yapacak, Docker ile ortam kurulumunu tamamlayacağız ve bir müşteri verisi akışı senaryosu oluşturacağız.

Gereksinimler

  • Docker kurulu bir bilgisayar ve internet

Proje Ortamının Kurulması

İlk olarak, sanal bir Python ortamı oluşturarak gerekli bağımlılıkları yükleyip kurulum adımlarına başlıyorum. Bu ortam, bağımlılık yönetimini kolaylaştırarak projenin sorunsuz çalışmasını sağlayacak.

1. Sanal Ortam Oluşturulması ve Gereksinimlerin Yüklenmesi

Öncelikle kendimize yeni bir proje dizini ve ortam oluşturalım. Ardından proje için gerekli olan kütüphaneleri yükleyelim.

python3 -m venv schema_project

source schema_project/bin/activate

requirements.txt:

confluent_kafka
psycopg2-binary
requests
avro-python3
pandas
sqlmodel

pip install -r requirements.txt

2. Docker Ayarları ve Docker-Compose Yapılandırması

Docker kullanarak PostgreSQL, Kafka ve Apicurio Registry gibi hizmetlerin hepsini tek bir ortamda başlatıyoruz. Bu yapılandırma, farklı hizmetlerin uyum içinde çalışmasını sağlıyor.

Docker başlatalım:

sudo systemctl start docker

Kafka Dockerfile, start-kafka.sh, server.properties

Dockerfile:

Dockerfile, gerekli bağımlılıkları kurarak ve Kafka’yı indirip yapılandırarak Kafka sunucusunu Docker üzerinde çalışmaya hazır hale getirir.

FROM openjdk:17.0.2-jdk-buster

RUN apt update
RUN apt install -y curl
RUN apt install -y python3 
RUN apt install -y python3-pip 
RUN apt install -y git 
RUN apt install -y libpq-dev

ENV KAFKA_VERSION 3.8.0
ENV SCALA_VERSION 2.12

RUN  mkdir /tmp/kafka && \
    curl "https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
    -o /tmp/kafka/kafka.tgz && \
    mkdir /kafka && cd /kafka && \
    tar -xvzf /tmp/kafka/kafka.tgz --strip 1

RUN mkdir -p /data/kafka

COPY start-kafka.sh  /usr/bin

RUN chmod +x  /usr/bin/start-kafka.sh

CMD ["start-kafka.sh"]

start-kafka.sh:

start-kafka.sh  Kafka’nın belirtilen cluster-id ile başlatılmasını ve ardından server.properties ayarlarıyla Kafka sunucusunun çalıştırılmasını sağlar.

#!/bin/bash

/kafka/bin/kafka-storage.sh format --config /kafka/config/server.properties --cluster-id 'EP6hyiddQNW5FPrAvR9kWw' --ignore-formatted

/kafka/bin/kafka-server-start.sh /kafka/config/server.properties

server.properties: Kafka’nın temel yapılandırma dosyasıdır.

############################# Server Basics #############################
process.roles=broker,controller

node.id=1

controller.quorum.voters=1@kafka:9093

############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9091,CONTROLLER://:9093,EXTERNAL://:9092

inter.broker.listener.name=PLAINTEXT

advertised.listeners=PLAINTEXT://kafka:9091,EXTERNAL://localhost:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


############################# Log Basics #############################
log.dirs=/data/kafka

num.partitions=3

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

################################  Custom Propertied Added by Erkan #########################
controlled.shutdown.enable=true
delete.topic.enable=true

Dosyayı oluşturmak için komut:

touch docker-compose.yaml

Docker Compose oluşturulması

networks:
  my_network:
    driver: bridge

services:
  kafka:
    container_name: kafka
    image: kafka:3.8.0
    build: 
      context: ./kafka
    ports:
        - "9092:9092"
    networks:
      - my_network
    volumes:
    - ./kafka/config/server.properties:/kafka/config/server.properties
    - ./data/kafka/:/data/kafka/  

  postgresql:
    image: 'postgres:15-bullseye'
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: mydatabase
    ports:
      - '5432:5432'
    networks:
      - my_network

  apicurio-registry-kafkasql:
    image: apicurio/apicurio-registry-kafkasql:2.6.5.Final
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9091
      REGISTRY_FEATURES_GROUPS_ENABLED: "true"
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    networks:
      - my_network

  apicurio-registry-ui:
    image: apicurio/apicurio-studio-ui:0.2.62.Final-announce
    ports:
      - "8081:8081"
    depends_on:
      - apicurio-registry-kafkasql
    networks:
      - my_network

docker-compose dosyasındaki her servisin işlevi ve yapılandırmasından aşağıda kısaca açıklamak istiyorum.

Kurulacak Servisler ve Temel İşlevleri

  1. Kafka:
    • Kafka, Apicurio Schema Registry ile veri ve şema yönetimi için bir mesajlaşma platformu olarak çalışır.
    • 9092 portu üzerinden dış dünyaya açılır ve my_network isimli özel bir ağda çalışır.
    • server.properties dosyasındaki konfigürasyonlar, Kafka’nın yapılandırmasını özelleştirir. Ayrıca, verileri kalıcı hale getirmek için bir volumes ayarına sahiptir.
  2. PostgreSQL:
    • PostgreSQL, verilerin depolanmasını sağlamak için bir ilişkisel veritabanı olarak çalışır.
    • 5432 portundan erişilebilir ve varsayılan kullanıcı, parola ve veritabanı bilgileri environment bölümünde belirtilmiştir.
    • Apicurio Registry gibi uygulamalara, yapılandırılmış veritabanı üzerinden veri yönetimi sağlar.
  3. Apicurio Registry (apicurio-registry-kafkasql):
    • Apicurio Schema Registry’nin ana bileşenidir ve şemaları Kafka ile yönetmek için kullanılır.
    • KAFKA_BOOTSTRAP_SERVERS ortam değişkeni ile Kafka sunucusuna bağlanır. REGISTRY_FEATURES_GROUPS_ENABLED ayarı ise şema gruplarını etkinleştirir.
    • 8080 portunda çalışır ve Kafka servisine bağımlıdır, bu yüzden depends_on ifadesi ile Kafka servisi başlatıldıktan sonra aktif hale gelir.
  4. Apicurio Registry UI (apicurio-registry-ui):
    • Apicurio’nun kullanıcı arayüzünü sağlar, kullanıcıların şemaları görsel olarak yönetmesine olanak tanır.
    • 8081 portu üzerinden erişilebilirdir.
    • apicurio-registry-kafkasql servisine bağlı olarak çalışır ve kullanıcıların şema yönetimi işlemlerini daha kolay bir arayüzle gerçekleştirmesini sağlar.

Özetledocker-compose.yaml yukarıdaki dört servisin bir köprü ağı (bridge network) içinde sorunsuz bir şekilde iletişim kurmasını sağlar.

Konteynerlarımızı başlatalım:

docker-compose up --build -d

Tüm konteynerlerin çalışıp çalışmadığını doğrulamak için docker ps komutunu kullanıyoruz.

(schema_project) (base) [train@trainvm schema_registry]$ docker ps
CONTAINER ID   IMAGE                                               COMMAND                  CREATED          STATUS          PORTS                                                                               NAMES
d13cb6f6ac39   apicurio/apicurio-studio-ui:0.2.62.Final-announce   "/usr/local/s2i/run"     27 seconds ago   Up 25 seconds   8080/tcp, 8443/tcp, 8778/tcp, 9779/tcp, 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp   schema_registry-apicurio-registry-ui-1
6fd76315c36f   apicurio/apicurio-registry-kafkasql:2.6.5.Final     "/opt/jboss/containe…"   27 seconds ago   Up 25 seconds   8443/tcp, 8778/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp, 9779/tcp             schema_registry-apicurio-registry-kafkasql-1
66f478f6b4b5   kafka:3.8.0                                         "start-kafka.sh"         27 seconds ago   Up 26 seconds   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                           kafka
54bcb258c159   postgres:15-bullseye                                "docker-entrypoint.s…"   27 seconds ago   Up 26 seconds   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp                                           schema_registry-postgresql-1

Schema, Producer ve Consumer Oluşturulması

1. Schema

schema_registration.py dosyası, bir Avro şemasını Apicurio Registry’ye kaydetmeyi sağlar. Böylece belirli bir veri yapısının tanımı Apicurio’da depolanarak veri akışı ve doğrulaması yapılabilir.

Dosyayı oluşturmak için komut:

touch schema_registration.py

Dosya içeriği:

import requests
import json

# Avro Şeması Tanımı
schema = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "RowNumber", "type": "int"},
        {"name": "CustomerId", "type": "int"},
        {"name": "Surname", "type": "string"},
        {"name": "CreditScore", "type": "int"},
        {"name": "Geography", "type": "string"},
        {"name": "Gender", "type": "string"},
        {"name": "Age", "type": "int"},
        {"name": "Tenure", "type": "int"},
        {"name": "Balance", "type": "float"},
        {"name": "NumOfProducts", "type": "int"},
        {"name": "HasCrCard", "type": "int"},
        {"name": "IsActiveMember", "type": "int"},
        {"name": "EstimatedSalary", "type": "float"},
        {"name": "Exited", "type": "int"}
    ]
}

# Şemayı Apicurio Registry'ye kaydet
url = "http://localhost:8080/apis/registry/v2/groups/default/artifacts"
headers = {"Content-Type": "application/json"}

response = requests.post(url, headers=headers, data=json.dumps(schema))

if response.status_code == 200 or response.status_code == 201:
    print("Şema başarıyla kaydedildi.")
else:
    print("Şema kaydedilemedi:", response.status_code, response.text)

Çalıştırmak için:

python schema_registration.py

Beklenen sonuç:

'Şema başarıyla kaydedildi.'

Ardından http://localhost:8080/ adresine giderek oluşan şemayı görebilirsiniz. Producer ve Consumer içinde ihtiyacımız olacak schema id’sini bu arayüzden görüyoruz.

Customer adlı avro formatında oluşturduğumuz şema.
Customer şemasına ait ID yi buradan alıyoruz.

2. Python Producer

producer.py  Apicurio Registry’de kayıtlı Customer adlı Avro şemasını kullanarak, Churn_Modelling.csv veri setindeki müşteri bilgilerini churn-topic adlı Kafka konusuna Avro formatında gönderir. Veriler iki saniyelik aralıklarla Kafka’ya gönderilir.

Dosyayı oluşturmak için komut:

touch producer.py

Dosya içeriği:

import requests
import pandas as pd
from confluent_kafka import Producer
from avro.schema import Parse
from avro.io import DatumWriter, BinaryEncoder
from io import BytesIO
import json
import time

# Apicurio Registry ve Kafka bağlantı ayarları 
REGISTRY_URL = 'http://localhost:8080/apis/registry/v2/groups/default/artifacts/33e4eff9-b9d6-47f4-9814-c1383d043625'
"""Şema id REGISTRY_URL'nin sonuna yazılacak! örn: 33e4eff9-b9d6-47f4-9814-c1383d043625"""
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

producer_config = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'client.id': 'my-producer',
    'linger.ms': 10,
    'acks': 'all'
}

# Producer'ı oluşturma
producer = Producer(producer_config)

# Veri setini yükleme
df = pd.read_csv("https://raw.githubusercontent.com/erkansirin78/datasets/refs/heads/master/Churn_Modelling.csv")

# Apicurio Registry'den şema alma
response = requests.get(REGISTRY_URL)
if response.status_code == 200:
    schema_content = response.json()
    if schema_content:
        print("Şema başarıyla alındı.")
        schema = Parse(json.dumps(schema_content))  # JSON string'e dönüştürdük
    else:
        print("Şema içeriği bulunamadı. Yanıt:", response.json())
        exit(1)
else:
    print(f"Şema alınamadı: {response.status_code}, {response.text}")
    exit(1)

# Veriyi Avro formatında Kafka’ya hazırlayan fonksiyon
def send_avro_message(row):
    writer = DatumWriter(schema)
    bytes_writer = BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(row, encoder)
    return bytes_writer.getvalue()

# Mesajları Kafka’ya Avro formatında gönderme
for _, row in df.iterrows():
    avro_message = send_avro_message(row.to_dict())
    time.sleep(2)
    producer.produce("churn-topic", avro_message)
    print(f"Sent message: {row.to_dict()}")
    producer.flush()

Özetleyecek olursak burada kullandığımız send_avro_message fonksiyonu, aldığı veriyi Avro şemasına göre serileştirerek (bayt dizisine dönüştürerek) Kafka’ya gönderilmeye uygun hale getirir. Bu sayede veriler Kafka’da yapılandırılmış bir şekilde saklanabilir ve Avro’nun sağladığı avantajlardan (schema evolution, sıkıştırma vb.) yararlanılabilir.

Çalıştırmak için: (Yeni sekmede açın, Env. aktif edin, dosya konumuna gelerek çalıştırın. )

python producer.py

Beklenen çıktı:

3. Python Consumer

consumer.py , Apicurio Registry’deki Customer Avro şemasını kullanarak churn-topic adlı Kafka konusundan gelen mesajları okur. Her mesaj Avro formatında çözülür ve PostgreSQL’de ChurnTable tablosuna kaydedilir. Yeni kayıtlar veritabanına eklenirken, mevcut kayıtlar göz ardı edilir.

Dosyayı oluşturmak için komut:

touch consumer.py

Dosya içeriği:

from sqlmodel import Field, SQLModel, create_engine, Session
import requests
from confluent_kafka import Consumer
from avro.io import DatumReader, BinaryDecoder
from io import BytesIO
from avro.schema import parse
import json

# Apicurio Registry URL ve Şema Bilgisi
REGISTRY_URL = "http://localhost:8080/apis/registry/v2/groups/default"
schema_id = "33e4eff9-b9d6-47f4-9814-c1383d043625" 

# Şema çekme ve ayrıştırma
schema_response = requests.get(f"{REGISTRY_URL}/artifacts/{schema_id}")
schema = parse(json.dumps(schema_response.json()))  # JSON'u string formatına dönüştürdük

# SQLModel Tablo Tanımı
class ChurnTable(SQLModel, table=True):
    RowNumber: int = Field(primary_key=True)
    CustomerId: int
    Surname: str
    CreditScore: int
    Geography: str
    Gender: str
    Age: int
    Tenure: int
    Balance: float
    NumOfProducts: int
    HasCrCard: int
    IsActiveMember: int
    EstimatedSalary: float
    Exited: int

# PostgreSQL bağlantısı
DATABASE_URL = "postgresql://myuser:mypassword@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)

# Tabloyu oluştur
SQLModel.metadata.create_all(engine)

# Kafka Consumer ayarları
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)
consumer.subscribe(['churn-topic'])

# Mesajları tüketme ve PostgreSQL'e ekleme
with Session(engine) as session:
    while True:
        message = consumer.poll(1.0)
        if message is None:
            continue
        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        # Avro verisini çözümleme
        avro_bytes = BytesIO(message.value())
        decoder = BinaryDecoder(avro_bytes)
        reader = DatumReader(schema)
        record = reader.read(decoder)

        # SQLModel nesnesi oluştur ve veritabanına ekle

        # Mevcut kaydı kontrol et
        existing_record = session.get(ChurnTable, record['RowNumber'])
        if existing_record is None:  # Kayıt yoksa ekle
            churn_record = ChurnTable(
                RowNumber=record['RowNumber'],
                CustomerId=record['CustomerId'],
                Surname=record['Surname'],
                CreditScore=record['CreditScore'],
                Geography=record['Geography'],
                Gender=record['Gender'],
                Age=record['Age'],
                Tenure=record['Tenure'],
                Balance=record['Balance'],
                NumOfProducts=record['NumOfProducts'],
                HasCrCard=record['HasCrCard'],
                IsActiveMember=record['IsActiveMember'],
                EstimatedSalary=record['EstimatedSalary'],
                Exited=record['Exited']
            )
            session.add(churn_record)
            session.commit()
        else:
            print(f"Record with RowNumber {record['RowNumber']} already exists. Skipping.")

        # Eklenen kaydı ekrana yazdır
        print(f"Consumed Record: {record}")
        
# PostgreSQL ve Kafka Consumer bağlantılarını kapatma
consumer.close()

Çalıştırmak için: (Yeni sekmede açın, Env aktif edin, dosya konumuna gelerek çalıştırın. )

python consumer.py

Beklenen çıktı:

POSTGRESQL Konteynerine Bağlanma

Verilerin doğru şekilde işlendiğini ve kaydedildiğini görmek için PostgreSQL veritabanına bağlanıyoruz. Ayrı bir sekme açalım, environmentı aktif edelim ve proje dizininde olduğumuzdan emin olalım.

Konteynere bağlanmak için komutu kullanalım:

docker exec -it schema_registry-postgresql-1 bash

Bu komut ile root’a bağlandık şimdi oluşturduğumuz kullanıcı ile mydatabase’e erişelim.

psql -U myuser -d mydatabase

ChurnTable tablosunda verilerin başarılı bir şekilde kaydedildiğini doğrulamak için örnek sorgular çalıştırıyoruz:

mydatabase=# \dt
          List of relations
 Schema |    Name    | Type  | Owner
--------+------------+-------+--------
 public | churntable | table | myuser
(1 row)
mydatabase=# select * from churntable limit(5);

mydatabase=# 

 RowNumber | CustomerId | Surname  | CreditScore | Geography | Gender | Age | Tenure |    Balance     | NumOfProducts | HasCrCard | IsActiveMember | EstimatedSalary | Exited
-----------+------------+----------+-------------+-----------+--------+-----+--------+----------------+---------------+-----------+----------------+-----------------+--------
         3 |   15619304 | Onio     |         502 | France    | Female |  42 |      8 |  159660.796875 |             3 |         1 |              0 |  113931.5703125 |      1
         1 |   15634602 | Hargrave |         619 | France    | Female |  42 |      2 |              0 |             1 |         1 |              1 |  101348.8828125 |      1
         2 |   15647311 | Hill     |         608 | Spain     | Female |  41 |      1 |   83807.859375 |             1 |         0 |              1 |   112542.578125 |      0
         4 |   15701354 | Boni     |         699 | France    | Female |  39 |      1 |              0 |             2 |         0 |              0 |   93826.6328125 |      0
         5 |   15737888 | Mitchell |         850 | Spain     | Female |  43 |      2 | 125510.8203125 |             1 |         1 |              1 |   79084.1015625 |      0
(5 rows)

Makalemizin sonuna geldik. Apicurio Registry ile şema yönetimi, Kafka ve PostgreSQL ile veri akışı ve Docker ile ortam yapılandırmasının birleşimi, gerçek dünya uygulamaları için sağlam bir çözüm ortaya koyuyor. Bu yazıda, güvenli ve yapılandırılmış bir veri akış sisteminin nasıl kurulabileceğine dair kapsamlı bir örnek sunmaya çalıştım. Umarım veri yolculuğunuzda sizlere yardımcı olur.

Sevgiler,

Görsel Tasarım: Veri Akış Örneği – Merve Öztiryaki

Yazar Hakkında
Toplam 1 yazı
Merve Öztiryaki
Merve Öztiryaki
Yıllardır hem Türkiye'de hem de uluslararası arenada veri bilimi ve mühendisliği alanında çalışıyor, sürekli kendimi geliştiriyorum. Veri analitiği, makine öğrenmesi ve büyük veri teknolojilerindeki birikimimle, iş süreçlerini daha verimli hale getiren yenilikçi çözümler üretiyorum. Python, Apache Spark ve Airflow gibi güçlü araçlarla, AWS ve GCP gibi bulut platformlarını kullanarak büyük veri projelerine yaratıcı yaklaşımlar getiriyorum. Elde ettiğim her başarı, veri mühendisliği tutkumun bir parçası olurken, edindiğim bilgileri başkalarıyla paylaşarak onların gelişimine katkı sağlamaktan da büyük bir mutluluk duyuyorum.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara