Apache Flink Dataset Api ile Temel Veri Analizi
Herkese merhabalar. Bu yazımızda Apache Flink Dataset Api ile örnek veri setleri üzerinden veri analizi yapacağız. Yapacağımız örnek meşhur retail_db veri tabanındaki csv dosyalarını kullanarak tutar bakımından en çok iptal edilen ürünleri bulmaya çalışmak olacak. Çalışmamda kullandığım ortam bilgileri şu şekildedir: Ubuntu, Intellij IDEA (maven projesi), Java8, Scala 2.11, Flink 1.9.2, veri setlerine buradan github projesine buradan erişebilirsiniz.
Apache Flink büyük veri dünyasında akan veri işlemeye odaklı olarak başlayan ve halihazırda bu alanda en iyi proje olmayı başarmış bir projedir. Benim görüşüme göre elbette, yoksa bunun böyle kolaylıkla söylenemeyecek bir iddia olduğunun farkındayım.
Uzun zamandır Spark kullanıyorum. Biraz da Flink’e bakayım ve hadi şu Spark’ta yaptığım basit bir işi Flink ile de yapayım dedim. İş yukarıda bahsettiğim iş. Ancak farkettim ki Spark ile karşılaştırdığımda Flink ile ilgili neredeyse hiç bir kaynak, örnek, eğitim, kitap vs yok. Aldığım hataları gidermekte bile zorlandım, hatta bazıları için Stackoverflow’da soru sorarak yardım istedim. Sonra düşündüm ki öğrendiklerimi insalarla paylaşayım ve hiç olmazsa Flink ile bir dosya nasıl okunur ve analiz edilir bir örneği olsun.
Şunu belirtmeliyim ki dataset api datastream api’ye göre biraz üvey evlat. Çünkü Flink’in asıl iddiası akan veri tarafında. Bunu söylüyorum ki sizi yanlış yönlendirmiş olmayayım.
Kullandığım kütüphaneler
import org.apache.flink.api.scala._ import java.sql.Timestamp import org.apache.flink.api.common.operators.Order
Spark’ta nasıl SparkSession varsa Flink’te de ExecutionEvironment
var. Her işe başlarken bunu oluşturuyoruz.
val env = ExecutionEnvironment.getExecutionEnvironment
İş problemi ( tutar bakımından en çok iptal edilen ürünleri bulma ) bana;
- products, order_items ve orders verisetlerini kullanmam,
- Önce orders ile order_items’ı birleştirmemi, sonra bunlara products’ı eklemem,
- İptal edilen ürünleri filtreleyip ürün adına göre gruplarken tutarları toplamam,
- Sonucu da azalan şekilde sıralamam
gerektiğini söylüyor.
Case Class ile Şema Tanımlama
Önce Products ile başlıyorum. Tabi şema derdi burada da var. Bu derdi Scala kullandığımız için case class ile rahatlıkla hallediyoruz. Yani veriye bakıyorum (linux head olabilir) başlık var mı, neyle ayrılmış, veri türleri ne olabilir inceliyorum ve sonra aşağıdaki veri yapısını oluşturuyorum:
// Define Products schema case class Products(productId: Int, productCategoryId: Int, productName: String, productDescription: String, productPrice: Double, productImage: String)
Flink ile Veri Okuma
Şimdi products.csv’yi okuyalım ve bir göz atalım.
val productsDS = env.readCsvFile[Products]("/home/erkan/datasets/retail_db/products2.csv", fieldDelimiter=",", ignoreFirstLine = true, quoteCharacter = '"') println(productsDS.count()) // 1345 productsDS.first(5).print() /* Products(1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy) Products(2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat) Products(3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat) Products(4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat) Products(5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet) */
Orders için şema tanımlama ve veri okuyup bir göz atma
// Define Orders schema case class Orders(orderId: Int, orderDate: Timestamp, orderCustomerId: Int, orderStatus: String) // Create dataset from file val ordersDS = env.readCsvFile[Orders]("/home/erkan/datasets/retail_db/orders.csv", fieldDelimiter=",", ignoreFirstLine = true) println(ordersDS.count()) // 68883 ordersDS.first(5).print() /** * Orders(51730,2014-06-14 00:00:00.0,2153,COMPLETE) * Orders(51731,2014-06-14 00:00:00.0,8895,PROCESSING) * Orders(51732,2014-06-14 00:00:00.0,10222,PENDING) * Orders(51733,2014-06-14 00:00:00.0,4078,CANCELED) * Orders(51734,2014-06-14 00:00:00.0,6522,PENDING_PAYMENT) */
Order Items için şema tanımlama ve veri okuyup bir göz atma
// Define Order Items Schema case class OrderItems(orderItemName: Int, orderItemOrderId: Int, orderItemProductId: Int, orderItemQuantity: Int, orderItemSubTotal: Double, orderItemProductPrice: Double) val orderItemsDS = env.readCsvFile[OrderItems](filePath = "/home/erkan/datasets/retail_db/order_items.csv", fieldDelimiter = ",", ignoreFirstLine = true) println(orderItemsDS.count()) // 172198 orderItemsDS.first(5).print() /** * OrderItems(1,1,957,1,299.98,299.98) * OrderItems(2,2,1073,1,199.99,199.99) * OrderItems(3,2,502,5,250.0,50.0) * OrderItems(4,2,403,1,129.99,129.99) * OrderItems(5,4,897,2,49.98,24.99) */
Flink Join
Şimdi orders ile order_items’ı birleştirelim. Birleşme anahtaların kod içinde yorumu mevcuttur. where ve equalTo içindeki rakamlar ilgili join anahtarların dataset içindeki sütun indeksleridir.
// where refers left dataset (ordersDS) orderId, equalTo refers right dataset (orderItemsDS) orderItemOrderId val ordersJoinedDS = ordersDS.join(orderItemsDS).where(0).equalTo(1) ordersJoinedDS.first(5).print() /** * (Orders(35211,2014-02-27 00:00:00.0,4849,COMPLETE),OrderItems(87944,35211,365,4,239.96,59.99)) * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87954,35214,403,1,129.99,129.99)) * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87955,35214,1014,3,149.94,49.98)) * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87956,35214,1004,1,399.98,399.98)) * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87957,35214,365,4,239.96,59.99)) */ println(ordersJoinedDS.count()) // 172198
Şimdi iki elemanlı bir tuple’dan oluşan bir Dataset elde ettim. Ancak hala products’ı eklemedik onu da ekleyelim tam olsun.
// Join ordersJoinedDS and products val ordersWithProductsDS = ordersJoinedDS.join(productsDS).where(_._2.orderItemProductId).equalTo(_.productId) println(ordersWithProductsDS.count()) // 172198 ordersWithProductsDS.first(5).print() /* ((Orders(53282,2014-06-25 00:00:00.0,4285,PENDING_PAYMENT),OrderItems(133137,53282,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat)) ((Orders(53364,2014-06-26 00:00:00.0,9001,COMPLETE),OrderItems(133333,53364,37,1,34.99,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat)) ((Orders(53539,2014-06-27 00:00:00.0,1499,PROCESSING),OrderItems(133801,53539,37,5,174.95,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat)) ((Orders(21,2013-07-25 00:00:00.0,2711,PENDING),OrderItems(66,21,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat)) ((Orders(36176,2014-03-04 00:00:00.0,9331,PENDING),OrderItems(90342,36176,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat)) */
Flink Group By Order By
Şimdi son dokunuşu yapıyoruz:
// Find most selling products in terms of total amount ordersWithProductsDS.map(x => (x._2.productName, x._1._1.orderStatus, x._1._2.orderItemSubTotal)) .filter(_._2.contains("CANCELED")) .groupBy(0) .sum(2) .sortPartition(2, Order.DESCENDING).setParallelism(1) .first(10).print() /* (Field & Stream Sportsman 16 Gun Fire Safe,CANCELED,134393.27999999965) (Perfect Fitness Perfect Rip Deck,CANCELED,85785.70000000008) (Nike Men's Free 5.0+ Running Shoe,CANCELED,80691.9300000001) (Diamondback Women's Serene Classic Comfort Bi,CANCELED,80094.6600000001) (Pelican Sunstream 100 Kayak,CANCELED,66196.6899999998) (Nike Men's Dri-FIT Victory Golf Polo,CANCELED,65750.0) (Nike Men's CJ Elite 2 TD Football Cleat,CANCELED,60705.32999999974) (O'Brien Men's Neoprene Life Vest,CANCELED,58126.74000000006) (Under Armour Girls' Toddler Spine Surge Runni,CANCELED,26153.459999999992) (LIJA Women's Eyelet Sleeveless Golf Polo,CANCELED,2145.0) */
Evet bu kadar. Satış tutarı bakımından en çok iptal edilen ürünleri bulduk.
Kapak Görseli: Caleb Martin on Unsplash