Dalam artikel ini, konsultan terkemuka lini bisnis Solusi Data Besar di Neoflex, menjelaskan secara mendetail opsi untuk membangun etalase struktur variabel menggunakan Apache Spark.
Sebagai bagian dari proyek analisis data, tugas membangun mart berdasarkan data yang terstruktur secara longgar sering kali muncul.
Biasanya ini adalah log, atau tanggapan dari berbagai sistem, disimpan sebagai JSON atau XML. Data diunggah ke Hadoop, lalu Anda perlu membuat etalase dari mereka. Kami dapat mengatur akses ke etalase yang dibuat, misalnya, melalui Impala.
Dalam kasus ini, tata letak etalase target sebelumnya tidak diketahui. Selain itu, skema tidak dapat dibuat sebelumnya, karena bergantung pada data, dan kita berurusan dengan data yang terstruktur sangat lemah ini.
Misalnya, hari ini jawaban berikut dicatat:
{source: "app1", error_code: ""}
dan besok jawaban berikut datang dari sistem yang sama:
{source: "app1", error_code: "error", description: "Network error"}
Akibatnya, satu bidang lagi harus ditambahkan ke etalase - deskripsi, dan tidak ada yang tahu apakah itu akan datang atau tidak.
Tugas membuat mart pada data semacam itu cukup standar, dan Spark memiliki sejumlah alat untuk ini. Baik JSON dan XML didukung untuk penguraian data mentah, dan dukungan schemaEvolution disediakan untuk skema yang sebelumnya tidak dikenal.
Sekilas, solusinya terlihat sederhana. Kita perlu mengambil folder dengan JSON dan membacanya menjadi dataframe. Spark akan membuat skema dan mengubah data bersarang menjadi struktur. Maka semuanya perlu disimpan dalam parket, yang juga didukung di Impala, dengan mendaftarkan etalase di metastore Hive.
Segalanya tampak sederhana.
Namun, tidak jelas dari contoh singkat dalam dokumentasi apa yang harus dilakukan dengan sejumlah masalah dalam praktiknya.
Dokumentasi menjelaskan pendekatan bukan untuk membuat etalase, tetapi untuk membaca JSON atau XML ke dalam kerangka data.
Yaitu, hanya diberikan cara membaca dan mengurai JSON:
df = spark.read.json(path...)
Ini cukup untuk membuat data tersedia untuk Spark.
Dalam praktiknya, skenario ini jauh lebih rumit daripada hanya membaca file JSON dari folder dan membuat kerangka data. Situasinya terlihat seperti ini: sudah ada showcase tertentu, data baru datang setiap hari, mereka perlu ditambahkan ke showcase, tidak lupa bahwa skemanya mungkin berbeda.
Skema umum untuk membangun etalase adalah sebagai berikut:
Langkah 1. Data dimuat ke Hadoop, diikuti dengan pemuatan ulang harian dan ditambahkan ke partisi baru. Ternyata folder dengan data awal dipartisi menurut hari.
Langkah 2.Selama booting awal, folder ini dibaca dan diurai oleh Spark. Kerangka data yang dihasilkan disimpan dalam format yang tersedia untuk analisis, misalnya, dalam parket, yang kemudian dapat diimpor ke Impala. Ini membuat showcase target dengan semua data yang telah terakumulasi hingga saat ini.
Langkah 3. Download dibuat yang akan memperbarui etalase setiap hari.
Muncul pertanyaan tentang pemuatan tambahan, kebutuhan untuk mempartisi etalase, dan pertanyaan tentang mendukung skema umum etalase.
Mari beri contoh. Katakanlah langkah pertama membangun penyimpanan diimplementasikan, dan ekspor file JSON ke folder dikonfigurasi.
Tidak masalah untuk membuat kerangka data dari mereka, lalu menyimpannya sebagai etalase. Ini adalah langkah pertama yang dapat Anda temukan dengan mudah di dokumentasi Spark:
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
Segalanya tampak baik-baik saja.
Kami membaca dan mengurai JSON, lalu kami menyimpan dataframe sebagai parket, mendaftarkannya dengan Hive dengan cara apa pun yang nyaman:
df.write.format(βparquetβ).option('path','<External Table Path>').saveAsTable('<Table Name>')
Kami mendapatkan etalase.
Tapi, keesokan harinya, data baru dari sumber itu ditambahkan. Kami memiliki folder dengan JSON, dan etalase dibuat berdasarkan folder ini. Setelah memuat potongan data berikutnya dari sumber, data mart akan kehabisan data selama satu hari.
Solusi logisnya adalah dengan mempartisi etalase berdasarkan hari, yang akan memungkinkan penambahan partisi baru setiap hari berikutnya. Mekanisme untuk ini juga terkenal, Spark memungkinkan Anda menulis partisi secara terpisah.
Pertama, kami melakukan pemuatan inisialisasi, menyimpan data seperti yang dijelaskan di atas, hanya menambahkan partisi. Tindakan ini disebut inisialisasi etalase dan dilakukan hanya sekali:
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
Keesokan harinya, kami hanya memuat partisi baru:
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
Yang tersisa hanyalah mendaftar ulang dengan Hive untuk memperbarui skema.
Namun, di sinilah masalah muncul.
Masalah pertama. Cepat atau lambat, parket yang dihasilkan tidak dapat dibaca. Ini ada hubungannya dengan perbedaan pendekatan parket dan JSON pada bidang kosong.
Mari pertimbangkan situasi yang khas. Misalnya, JSON tiba kemarin:
1: {"a": {"b": 1}},
dan hari ini JSON yang sama terlihat seperti ini:
2: {"a": null}
Katakanlah kita memiliki dua partisi berbeda dengan masing-masing satu baris.
Saat kita membaca seluruh data mentah, Spark akan dapat menentukan jenisnya, dan memahami bahwa "a" adalah bidang berjenis "struktur", dengan bidang bertingkat "b" berjenis INT. Tetapi, jika setiap partisi disimpan secara terpisah, maka parket dengan skema partisi yang tidak kompatibel diperoleh:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
Situasi ini diketahui dengan baik, oleh karena itu, sebuah opsi telah ditambahkan secara khusus - saat mengurai data awal, hapus bidang kosong:
df = spark.read.json("...", dropFieldIfAllNull=True)
Dalam hal ini, parket akan terdiri dari sekat-sekat yang bisa dibaca bersama.
Meskipun mereka yang telah melakukan ini dalam praktiknya akan tertawa getir. Mengapa? Karena dua situasi lagi kemungkinan besar akan muncul. Atau tiga. Atau empat. Yang pertama, yang hampir pasti akan muncul, adalah tipe numerik akan terlihat berbeda di file JSON yang berbeda. Misalnya, {intField: 1} dan {intField: 1.1}. Jika bidang seperti itu ditemukan di satu bagian, gabungan skema akan membaca semuanya dengan benar, mengarah ke jenis yang paling akurat. Tetapi jika berbeda, maka yang satu akan memiliki intField: int, dan intField: double lainnya.
Ada bendera berikut untuk menangani situasi ini:
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
Sekarang kita memiliki folder tempat partisi berada, yang dapat dibaca menjadi satu kerangka data dan parket yang valid untuk seluruh etalase. Iya? Tidak.
Ingatlah bahwa kami mendaftarkan tabel di Hive. Sarang tidak membedakan huruf besar-kecil di nama bidang, sedangkan parket membedakan huruf besar-kecil. Oleh karena itu, partisi dengan skema: field1: int dan Field1: int sama untuk Hive, tetapi tidak untuk Spark. Ingatlah untuk menggunakan huruf kecil pada nama field.
Setelah itu, semuanya tampak baik-baik saja.
Namun, tidak semuanya sesederhana itu. Masalah kedua yang juga terkenal muncul. Karena setiap partisi baru disimpan secara terpisah, file layanan Spark akan berada di folder partisi, misalnya, bendera keberhasilan operasi _SUCCESS. Ini akan menimbulkan kesalahan saat mencoba parket. Untuk menghindarinya, Anda perlu mengatur konfigurasi dengan menonaktifkan Spark dari menambahkan file layanan ke folder:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
Tampaknya sekarang setiap hari partisi parket baru ditambahkan ke folder etalase target, tempat data yang diuraikan untuk hari itu disimpan. Kami telah berhati-hati sebelumnya bahwa tidak ada partisi dengan konflik tipe data.
Tapi, sebelum kita ada masalah ketiga. Sekarang skema umum tidak diketahui, apalagi, di Hive, tabel dengan skema yang salah, karena setiap partisi baru, kemungkinan besar, memperkenalkan distorsi dalam skema.
Anda perlu mendaftarkan ulang tabel. Ini dapat dilakukan dengan mudah: baca lagi parket etalase, ambil skema dan buat DDL berdasarkan itu, yang akan mendaftarkan ulang folder di Hive sebagai tabel eksternal, memperbarui skema etalase toko target.
Kami dihadapkan pada masalah keempat. Pertama kali kami mendaftarkan tabel, kami mengandalkan Spark. Sekarang kami melakukannya sendiri, dan Anda perlu ingat bahwa bidang parket bisa dimulai dengan karakter yang tidak valid untuk Hive. Misalnya, Spark mengeluarkan baris yang tidak dapat diurai ke dalam bidang "corrupt_record". Bidang seperti itu tidak dapat didaftarkan dengan Hive tanpa melarikan diri.
Mengetahui hal ini, kami mendapatkan skema:
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
Kode ("_corrupt_record", "` _corrupt_record` ") +" "+ f [1] .replace (": "," `:"). Ganti ("<", "<` "). Ganti (", " , ",` "). replace (" array <`", "array <") membuat DDL aman, yaitu, sebagai ganti:
create table tname (_field1 string, 1field string)
Dengan nama kolom seperti "_field1, 1field", DDL aman dibuat di mana nama kolom diloloskan: buat tabel `tname` (string` _field1`, string `1field`).
Muncul pertanyaan: bagaimana cara mendapatkan dataframe dengan skema lengkap dengan benar (dalam kode pf)? Bagaimana cara mendapatkan pf ini? Ini masalah kelima. Baca ulang skema semua partisi dari folder dengan file parket dari etalase target? Ini adalah metode yang paling aman, tetapi yang paling sulit.
Skema sudah ada di Hive. Anda bisa mendapatkan skema baru dengan menggabungkan skema seluruh tabel dan partisi baru. Jadi, Anda perlu mengambil skema tabel dari Hive dan menggabungkannya dengan skema partisi baru. Ini dapat dilakukan dengan membaca metadata uji dari Hive, menyimpannya ke folder sementara, dan membaca kedua partisi dengan Spark sekaligus.
Pada dasarnya, Anda memiliki semua yang Anda butuhkan: skema tabel asli di Hive dan partisi baru. Kami juga punya datanya. Yang tersisa hanyalah mendapatkan skema baru yang menggabungkan skema etalase dan bidang baru dari partisi yang dibuat:
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
Selanjutnya, kami membuat DDL untuk mendaftarkan tabel, seperti pada cuplikan sebelumnya.
Jika seluruh rantai bekerja dengan benar, yaitu - ada pemuatan awal, dan di Hive ada tabel yang dibuat dengan benar, maka kita mendapatkan skema tabel yang diperbarui.
Dan masalah terakhir adalah Anda tidak bisa begitu saja menambahkan partisi ke tabel Hive, karena akan rusak. Anda perlu memaksa Hive untuk memperbaiki struktur partisi:
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
Tugas sederhana membaca JSON dan membuat etalase darinya diterjemahkan ke dalam mengatasi sejumlah kesulitan implisit, yang mana Anda harus mencari solusinya secara terpisah. Meskipun solusi ini sederhana, namun perlu waktu lama untuk menemukannya.
Untuk melaksanakan pembangunan etalase, saya harus:
- Tambahkan partisi ke etalase, hapus file layanan
- Tangani bidang kosong di data asli yang telah diketik oleh Spark
- Transmisikan tipe sederhana ke string
- Ubah nama bidang menjadi huruf kecil
- Pisahkan dump data dan pendaftaran tabel di Hive (pembuatan DDL)
- Ingatlah untuk keluar dari nama bidang yang mungkin tidak kompatibel dengan Hive
- Belajar untuk memperbarui pendaftaran tabel di Hive
Kesimpulannya, kami mencatat bahwa keputusan untuk membangun etalase menyembunyikan banyak jebakan. Oleh karena itu, jika timbul kesulitan dalam implementasi, lebih baik menghubungi mitra berpengalaman dengan keahlian yang sukses.
Terima kasih telah membaca artikel ini, semoga informasinya bermanfaat.