MySQL Upsert İçin Alternatif Bir Yaklaşım
İş ihtiyacını karşılayacak şekilde temiz, doğru, güvenilir, istenen formatta, gıcır gıcır veri tabanında bekleyen bir veriyi kim sevmez ki 🙂 Mesela Mysql’de.
Elbette herkes sever ancak bunu sağlayacak olan sağlam işleyen veri akış hatlarıdır (data pipelines). Veri akış hatları bilgi sistemi kullanan küçük veya büyük her örgüt için olmazsa olmaz bir zorunluluk haline geldi. Artık veri akış hattı ve bunun sağlıklı bir şekilde işlemesi insan vücudundaki kan dolaşımı kadar önemli. Bu akışlar genellikle bir yerden doğup, bir yerde de son bulmaktadır. Sonlanan yerlerin başında da veri tabanları gelmektedir. Bunlardan birisi de MySQL veri tabanıdır.
MySQL, ilişkisel veri tabanlarında belki de en yaygın kullanılanlarından birisi. Lisans maliyetinin olmayışı onun bu popülerliğinde payı var. Benzer şekilde diğer popüler veri tabanı ise PostgreSQL’dir. PostgreSQL ile bir çok ortak özelliği olmasına rağmen bazı özellikler MySQL’de yok. Bunlardan birisi de upsert operasyonuna imkan veren MERGE’dir. MySQL kullanıyorsanız ve upsert ihtiyacınız varsa MERGE alternatifi bir yöntem bulmalısınız. Ben bu konuyu araştırdığımda ihtiyacımı tam olarak görecek bir yöntem bulamadım. Karşıma hep INSERT ON DUPLICATE KEY UPDATE çıktı. Bu şu anlama geliyor: yeni kayıtları gir, mevcut kayıtları güncelle. İlişkisel veri tabanlarının çalışma kurallarına göre tabloda mevcut bir primary key’i tekrar insert yapamazsınız, hata alırsınız. Bu yöntem hata yerine tekrar girilmeye çalışılan primary key’e ait satırı güncelliyor. Ancak bu yöntemin şöyle bir eksikliği var benim için; yeni gelen kayıtların haricinde tabloda olan eski kayıtları (non-matching rows) silmek veya onları da güncellemek (örneğin tarih null, işaret koyma) istediğimde buna imkan vermiyor. Dolayısıyla MERGE özelliği olmayınca ve INSERT ON DUPLICATE KEY UPDATE ihtiyacımı tam karşılamayınca ben de python-pandas ve sqlalchemy ile kendime göre bir yöntem buldum. Bu yöntemde işin pis tarafını pandas ile özellikle de python set ile halledip sırasıyla mysql’de insert, update ve delete yapıyorum.
Böyle bir giriş yaptıktan sonra örnek olarak ne yapacağız ondan bahsedelim. Senaryomuz şöyle: Günlük olarak yeni gelen müşteriler var ve elimde bu müşteriler için kullandığım bir mysql tablosu var. Bu tabloyu sürekli güncellemem gerekiyor. Bununla ilgili belirlenmiş gereksinimler aşağıdadır.
- Gereksinim-1 Insert: Günlük yeni gelen müşteri kayıtlarından mevcut tabloda olmayanlar varsa yeni kayıt olarak ekle
- Gereksinim-2 Update: Günlük yeni gelen müşteri kayıtlarından mevcut tabloda olanlar varsa tüm sütunları güncelle
- Gereksinim-3 Delete: Günlük yeni gelen müşteri kayıtlarından mevcut tabloda hiç yoksa mevcut tablodan sil veya o gün güncellenmediğine dair bir işaret koy.
Yukarıdaki gereksinimleri küme ile gösterirsek:
Ortam bilgileri:
Python: 3.8
İşletim Sistemi: Ubuntu Desktop 20.04 LTS
Veri tabanı: Mysql 8
python paketi: sqlalchemy
IDE: Jupyter Lab
Hazırlık:
Terminali açalım, mysql shell’e bağlanalım, kendimize bir veri tabanı ve kullanıcı yaratalım, ardından da bu kullanıcıya ait yetkileri tanımlayalım.
mysql> CREATE DATABASE vbo CHARACTER SET utf8; mysql> use vbo; mysql> CREATE USER 'vbo_user'@'192.168.206.140' IDENTIFIED BY 'VBo*kmfd8(06'; mysql> GRANT ALL PRIVILEGES ON `vbo`. * TO 'vbo_user'@'192.168.206.140';
Şimdi bir IDE açalım (Jupyter Lab) ve tablomuzda hali hazırda bulunan kayıtları python sqlalchemy paketini kullanarak girelim.
import sqlalchemy with engine.connect() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS staff (id int primary key, name_surname varchar(255), section varchar(50)); """) try: query="INSERT INTO `vbo`.`staff` (`id` ,`name_surname` ,`section`) VALUES(%s,%s,%s)" my_data=[(1416, "Tuncay Avcı", "9. Section"), (1417, "İsmail İçtüzer", "9. Section"), (1506, "Mustafa Aslan", "11. Section"), (1527, "Buket Durmuş", "11. Section"), (1525, "Saadet Yılmaz", "11. Section")] id=conn.execute(query, my_data) print("Rows Added = ", id.rowcount) except: print("Database error ")
Output: Rows Added = 5
Günlük olarak yeni gelen kayıtları da pandas dataframe olarak alalım.
new_customers = pd.DataFrame(data= [(1416, "Tuncay Avcı", "11. Section"), (1417, "İsmail İçtüzer", "11. Section"), (1508, "Mustafa Bankur", "11. Section"), (1522, "Meliha Kaya", "11. Section"), (1525, "Saadet Yılmaz", "17. Section")], columns=['id' ,'name_surname' ,'section'] ) new_customers.head()
Az önce MySQL veri tabanına yazdığımız var olan müşterileri de pandas ile okuyup başka bir dataframe yapalım.
existing_customers = pd.read_sql('staff', engine) existing_customers.head()
Bu aşamada elimizde mevcut müşteriler ve yeni günlük müşteriler var. Bu yenilerden bazıları zaten var olabilir. Bu nedenle yukarıdaki 3 iş gereksinimini karşılamak için 3 farklı dataframe oluşturacağız.
Gereksinim-1: Yeni girilecek kayıtlar
Tamamen yeni müşteriler.
insert_ids = set(new_customers.id.to_list()).difference(set(existing_customers.id.to_list())) print(insert_ids) {1522, 1508} insert_df = new_customers[ new_customers['id'].isin(insert_ids) ] insert_df.head()
Yukarıdaki gördüğümüz 2 kayıt mysql staff tablosuna yeni kayıt olarak eklenecek.
Gereksinim-2: Güncellenecek kayıtlar
update_ids = set(existing_customers.id.to_list()).intersection(set(new_customers.id.to_list())) print(update_ids) {1416, 1417, 1525} update_df = existing_customers[ existing_customers['id'].isin(update_ids) ] update_df.head()
Gereksinim-3: Silinecek Kayıtlar
Son olarak da staff tablosundan silinecek kayıtları dataframe yapalım.
delete_ids = set(existing_customers.id.to_list()).difference(set(new_customers.id.to_list())) print(delete_ids) {1506, 1527} delete_df = existing_customers[ existing_customers['id'].isin(delete_ids) ] delete_df.head()
Elde ettiğimiz bu 3 dataframe’i geçici tablolar olarak mysql veri tabanına yazalım.
insert_df.to_sql('tmp_insert_table', engine, if_exists='replace', index=False) update_df.to_sql('tmp_update_table', engine, if_exists='replace', index=False) delete_df.to_sql('tmp_delete_table', engine, if_exists='replace', index=False)
Artık hazırız ve bir transaction olarak kendi upsert operasyonumuzu yapabiliriz.
engine.execution_options(autocommit=False) with engine.connect() as conn: with conn.begin(): # 1. Insert conn.execute( "INSERT INTO staff SELECT * FROM tmp_insert_table ;" ) # 2. Update conn.execute( """ UPDATE staff s INNER JOIN tmp_update_table u ON s.id = u.id SET s.name_surname = u.name_surname, s.section = u.section WHERE s.id = u.id; """ ) # 3. Delete conn.execute(""" DELETE FROM staff s WHERE s.id IN (SELECT id from tmp_delete_table); """)
Yukarıda 3 gereksinimi de yerine getirmiş olduk. Buradaki with conn.begin()
bloğunun güzel bir yönü aynı zamanda bir transaction başlatıyor olmasıdır. Yani operasyon yarıda bir yerde bir hata alırsa otomatik rollback olacak ve veri tabanındaki hiç bir kayıt değişmeyecektir.
Sonucu kontrol edelim:
final_df = pd.read_sql_table('staff', engine) final_df.head()
Sonuç:
Yukarıda Tablo-6’da gördüğümüz gibi yeni kayıtlar eklendi, güncellenecek olanlar güncellendi (örneğin Saadet Yılmaz 17. Section). Dilerseniz bu üç tane temp tablolarını tekrar silebilirsiniz. Bu yöntemde üç tane ilave tablo yaratılmasını eleştirebilirsiniz ancak günlük gelen veri seti ve mevcut tablonuzdaki veri çok büyük değil ise bence ihmal edilebilir. Bu yöntem belki en iyi yöntem olmayabilir ancak MERGE özelliği olmayan MySQL için güzel bir alternatif upsert olduğunu düşünüyorum, üstelik çalışıyor ve gereksinimleri karşılıyor.
Buna benzer bir örneği Spark ve Delta Lake ile yapmıştık. O yazıya buradan ulaşabilirsiniz.
Başka bir yazıda görüşmek dileğiyle hoşçakalın.