Apache Flink ile Kafka’dan Mesaj Okuma (Scala)
Herkese merhaba. Apache Flink gerçek zamanlı veri işleme konusunda oldukça başarılı ve popüler bir araç. Böyle olmasına rağmen ilginç bir şekilde Flink öğrenmek için yeterli kaynak ve güncel örnekler bulmak zor. Mesela benzer bir araç olan Apache Spark ile tonlarca kitap, kurs, makale ve örnek varken Flink’te bunun onda biri bile yok. Daha önce Apache Flink ile ilgili bir kaç yazı yazmıştım. Şimdi ise Kafka’dan Flink ile nasıl mesaj okuruz ile ilgili oldukça basit bir örnek yapacağız. Kullanacağımız dil Scala olacaktır.
Ortam Bilgileri
Operating System: CentOS7
IDE: IntelliJ IDEA 2022.2.3 (Community Edition)
Proje Türü: Maven
Maven: 3.8.6
Java: Java-11
Scala: 2.12.12
Flink: 1.16.0
Kafka: 3.2.0
Ön Gereksinimler
- Flink kurulmuş ve çalışıyor ve versiyon 1.16.0. (Dikkat: Eski versiyonlarda Kafka source farklı olabilir.)
- IntelliJ Scala plugin eklenmiş.
- Maven projesi oluşturulmuş
- Şu komutu kullanarak ve sorulan sorulara cevap vererek maven scala projesi oluşturabilirsiniz.
mvn archetype:generate -DarchetypeGroupId=net.alchim31.maven -DarchetypeArtifactId=scala-archetype-simple
- localhost:9092‘den erişilebilir bir Kafka mevcut ve çalışıyor.
pom.xml Bağımlılıklar
Tamamı yazının sonunda paylaşılmıştır. Burada ilgili bağımlıklar mevcuttur.
... ... <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency> ... ...
Kütüphaneleri indirelim
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala._
Flink StreamExecutionEnvironment
Her Flink uygulaması bir environment oluşturmakla başlar burada StreamExecutionEnvironment oluşturuyoruz.
val env = StreamExecutionEnvironment.getExecutionEnvironment
Kafka Kaynağı
Eski sürümler için farklı örnekler görebilirsiniz.
val kafkaSource = KafkaSource.builder() .setBootstrapServers("localhost:9092") .setTopics("flink-example") .setGroupId("flink-consumer-group") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build()
Kafkadan Gelen Mesajlar
StreamExecutionEnvironment (env)’a az önce tanımladığımız KafkaSource’u (localhost:9092’deki Kafka’nın “flink-example” topiği) göstererek mesajları okumaya (consume etmeye) başlıyoruz ve sonucu lines
isminde DataStream[String]
türünde bir değişkene atıyoruz.
val lines = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
Mesajları Konsola yazdırma
Okuduğumuz mesajlara hiç bir şey yapmadan olduğu gibi konsola yazdıracağız. Dilerseniz siz burada artık dönüşüm, temizleme ve filtreleme gibi operasyonları yapabilirsiniz.
lines.print()
Stream Uygulamasını Başlatma
Aşağıdaki kod stream akışını başlatmak için. Bunu kullanmadığınızda stream başlamayacaktır. Kodu çalıştırmadan bekleyelim ve Kafka tarafına geçip topic oluşturalım.
env.execute("Read from Kafka")
Kafka Topic Oluşturma
kafka/bin/kafka-topics.sh --bootstrap-server kafka2:9092 --create --topic flink-example --partitions 3 --replication-factor 1
Flink Uygulamasını Çalıştırma
IntelliJ’de kodların içinden sağ tıklayıp run dediğinizde Flink çalışacak ve Kafka’dan mesaj consume etmeye başlayacaktır.
Kafka’ya Mesaj Göndermek
Kafka ile hazır gelen kafka-console-producer.sh işimizi fazlasıyla görecektir.
kafka/bin/kafka-console-producer.sh --bootstrap-server kafka2:9092 --topic flink-example
> işaretini gördükten sonra mesaj göndermeye başlayabilirsiniz. Örnek olarak ben aşağıdaki mesajları gönderdim:
hi flink
IntelliJ konsolundan tüketilen mesajları görebilirsiniz.
Kodların Tamamı
package myPackage import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala._ import java.util.Properties /* kafka/bin/kafka-topics.sh --bootstrap-server kafka2:9092 --create --topic flink-example --partitions 3 --replication-factor 3 */ object ReadFromKafka extends App{ val env = StreamExecutionEnvironment.getExecutionEnvironment val kafkaSource = KafkaSource.builder() .setBootstrapServers("localhost:9092") .setTopics("flink-example") .setGroupId("flink-consumer-group") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build() val lines = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source") lines.print() env.execute("Read from Kafka") }
pom.xml Tamamı
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>local.vbo</groupId> <artifactId>flink-examples</artifactId> <version>0.0.1</version> <name>${project.artifactId}</name> <description>My wonderfull scala app</description> <inceptionYear>2018</inceptionYear> <licenses> <license> <name>My License</name> <url>http://....</url> <distribution>repo</distribution> </license> </licenses> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.12.6</scala.version> <scala.compat.version>2.12</scala.compat.version> <spec2.version>4.2.0</spec2.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.json4s/json4s-native --> <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-native_2.12</artifactId> <version>4.0.4</version> </dependency> <!-- Test --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.compat.version}</artifactId> <version>3.0.5</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs2</groupId> <artifactId>specs2-core_${scala.compat.version}</artifactId> <version>${spec2.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs2</groupId> <artifactId>specs2-junit_${scala.compat.version}</artifactId> <version>${spec2.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <!-- see http://davidb.github.com/scala-maven-plugin --> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.21.0</version> <configuration> <!-- Tests will be run with scalatest-maven-plugin instead --> <skipTests>true</skipTests> </configuration> </plugin> <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> <version>2.0.0</version> <configuration> <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> <junitxml>.</junitxml> <filereports>TestSuiteReport.txt</filereports> <!-- Comma separated list of JUnit test class names to execute --> <jUnitClasses>samples.AppTest</jUnitClasses> </configuration> <executions> <execution> <id>test</id> <goals> <goal>test</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Çok basit haliyle ve Scala dilinde Flink ile Kafka’dan mesaj okuma için bir örnek vermeye çalıştık. Umarım faydalı olmuştur.
Başka bir yazıda daha görüşmek dileğiyle. Esen kalın…
Görsel kapak: Photo by Shane Young on Unsplash