Hello everyone!
In a recent article, we described how we built our Data Platform. Today I want to dive deeper into the "guts" of the platform and, along the way, tell you how we solved one of the problems that arose as the variety of integrated data sources kept growing.
That is, if we go back to the last diagram from the article above (I have deliberately duplicated it here for the dear reader's convenience), today we will talk in more detail about the implementation of the "right-hand side" of that diagram: the part that sits after Apache NiFi.
As a reminder: our company runs more than 350 relational databases. Naturally, not all of them are "unique": many are essentially copies of the same system installed in every store of the retail chain, but a fair "zoo of diversity" remains. So there is no getting by without some kind of framework that simplifies and speeds up the integration of sources into the Data Platform.
The general scheme for delivering data from the sources into the Greenplum ODS layer using the framework we developed is shown below:
Data from the sources lands in Kafka as AVRO messages, is picked up by Apache NiFi and written to S3 as parquet files.
The "heart" of the framework is a set of Spark jobs:
- Compaction: deduplicates the data (drops exact "duplicates") and merges many small files into a few large ones; in essence, a distinct() plus a coalesce(). The result is written back to S3, where the parsing step picks it up "as is";
- Parsing: parses the documents and extracts the required attributes. The result is written to S3 as (gzip-compressed) CSV files;
- Load: loads those CSV files into the ODS layer: an external table over S3 via the PXF S3 connector, then a plain pgsql insert into the target ODS table in Greenplum.
The whole pipeline is orchestrated by Airflow.
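To make the Load step concrete, here is a sketch of the kind of SQL it can issue, generated from Python. All table, server and bucket names are hypothetical; the real framework derives them from its configuration:

```python
# Sketch: build the SQL for the load step -- a readable external table over
# the CSV files in S3 (Greenplum PXF, s3:text profile), then an INSERT into
# the target ODS table. Every name here is made up for illustration.

def build_load_sql(schema: str, table: str, columns: str, s3_path: str) -> str:
    ext_table = f"ext.{table}"
    return (
        f"CREATE READABLE EXTERNAL TABLE {ext_table} ({columns})\n"
        f"LOCATION ('pxf://{s3_path}?PROFILE=s3:text&SERVER=s3srv')\n"
        "FORMAT 'CSV';\n"
        f"INSERT INTO {schema}.{table} SELECT * FROM {ext_table};"
    )

sql = build_load_sql("ods", "orders", "order_id bigint, amount numeric",
                     "dp-bucket/stg/orders")
print(sql)
```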
Airflow DAGs are generated automatically. The Parsing step is driven by metadata, so integrating a new source boils down to:
- creating the target tables in the Greenplum ODS layer;
- adding YAML files to Git that describe:
- the source as a whole (connection details, schedule, S3 paths, email addresses for notifications, and so on);
- every entity loaded into ODS (its attributes, keys, and target ODS table). Everything else the framework does on its own.
Then a source appeared that did not fit this picture: document-oriented data arriving as JSON. Specifically MongoDB, whose changes are shipped to Kafka by the MongoDB Kafka source connector. The framework had to be adapted. Unlike the other sources, these JSON documents land in S3 "as is": Apache NiFi writes them out verbatim instead of converting them to parquet.
Let's start with Compaction. For "ordinary" parquet sources, the step boils down to this:
df = spark.read.format(in_format) \
.options(**in_options) \
.load(path) \
.distinct()
new_df = df.coalesce(div)
new_df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
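The snippet leaves the origin of `div` (the target partition count for coalesce()) out of frame. One plausible way to choose it, and purely an assumption on my part, is from the total input size and a desired output file size:

```python
# Sketch: choose the number of partitions for coalesce() so output files
# land near a target size. The 128 MiB default is an assumption, not a
# value from the article.

def partitions_for(total_bytes: int, target_bytes: int = 128 * 1024 * 1024) -> int:
    # ceiling division, but never fewer than one partition
    return max(1, -(-total_bytes // target_bytes))

print(partitions_for(1_000_000_000))  # ~1 GB of input -> 8 partitions
```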
With JSON it is not that simple: Spark would infer the schema from the data itself, and there is no mergeSchema to fall back on as with parquet (more on that below, in the Parsing step). Luckily, at the Compaction stage we do not need the structure of the data at all; all we have to do is deduplicate rows and compact files in S3. So the idea is this:
read the JSON files into a DataFrame of raw strings, one row per JSON document.
An example makes it clearer. Suppose we have this file:
file1:
{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
Each line here is a separate JSON document: one line = one document. That is exactly how Apache NiFi writes them to S3 (newline-delimited JSON), so we can count on this layout.
We read the file like this:
# read the JSON as CSV with a separator that never occurs in the data
df = spark.read \
.format("csv") \
.option("sep", "\a") \
.load("file1.json")
# schema of the resulting DataFrame
df.printSchema()
root
|-- _c0: string (nullable = true)
# and its contents
df.show()
+--------------------+
| _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
That is, we read the JSON as CSV with a separator that is guaranteed never to occur in the data; we chose the Bell character (\a). The result is a DataFrame with a single string column, and distinct() and coalesce() apply to it exactly as before. So compared to parquet sources, only the read options differ:
# for parquet sources
in_format = "parquet"
in_options = {}
# for JSON sources
in_format = "csv"
in_options = {"sep": "\a"}
The resulting DataFrame is written back to S3 in the usual way:
df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
# for JSON sources
out_format = "text"
out_options = {"compression": "gzip"}
# for parquet sources
out_format = in_format
out_options = {"compression": "snappy"}
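The whole trick can be mimicked in plain Python, which makes it clear why no knowledge of the JSON structure is needed: each document is just an opaque line of text. A toy illustration with invented file contents:

```python
import gzip
import os
import tempfile

# Newline-delimited JSON: two distinct documents plus one exact duplicate.
lines = [
    '{"productId": 1, "productName": "ProductName 1"}',
    '{"productId": 2, "price": 10.01}',
    '{"productId": 1, "productName": "ProductName 1"}',  # duplicate
]

# "distinct()" on opaque strings: byte-for-byte deduplication.
unique = sorted(set(lines))

# "coalesce(1)" plus writing as gzip text: one compacted, compressed file.
path = os.path.join(tempfile.mkdtemp(), "part-00000.txt.gz")
with gzip.open(path, "wt") as f:
    f.write("\n".join(unique))

with gzip.open(path, "rt") as f:
    print(len(f.read().splitlines()))  # 2 documents survive
```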
Now, Parsing. Its job: parse the documents, extract the attributes we need, and write the result out (as described above, as CSV). Here the schema problem can no longer be dodged: when reading JSON, Spark infers the schema from the data itself, and unlike parquet there is no mergeSchema to help. So if, say, a field "field_1" happens to be absent from every document in the current batch, Spark will build the DataFrame without that column, and the Parsing step will silently produce output without it.
Another example. Suppose that in one run we received file1, and in the next run file2:
file1 (same as before):
{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
file2:
{"productId": 3, "productName": "ProductName 3", "dimensions": {"length": 10, "width": 12, "height": 12.5, "package": [10, 20.5, 30]}}
We read each file with Spark into its own DataFrame:
df = spark.read \
.format("json") \
.option("multiline", "false") \
.load(path)
df.printSchema()
df.show()
The output for the first file:
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- width: long (nullable = true)
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
+--------------+-----+---------+-------------+--------------+
| dimensions|price|productId| productName| tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
And for the second:
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- package: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- width: long (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
+--------------------+---------+-------------+
| dimensions|productId| productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...| 3|ProductName 3|
+--------------------+---------+-------------+
As you can see, Spark inferred a different schema from each file. If a field occurs in at least one document of a file, the column appears in the DataFrame, with null for the documents that lack it (see price and productName for the first file).
But if a field does not occur in the file at all, the column simply never appears. Say we expected at least this set of fields (as inferred from the first file):
root
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
Then for file2, which contains no "price" field in any document, Spark builds a DataFrame without the "price" column, and that expectation breaks. With parquet sources this problem does not arise: NiFi produces the parquet files from an AVRO schema, so every declared field is always present in them.
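The effect is easy to reproduce even without Spark: treat the inferred "schema" of a file as the union of keys seen across its documents, which is how sampling-based inference behaves. The data repeats the example above, flattened to top-level keys for brevity:

```python
import json

file1 = [
    '{"productId": 1, "productName": "ProductName 1"}',
    '{"productId": 2, "price": 10.01}',
]
file2 = [
    '{"productId": 3, "productName": "ProductName 3"}',
]

def inferred_keys(lines):
    # a field enters the inferred schema if ANY document in the file has it
    keys = set()
    for line in lines:
        keys.update(json.loads(line))
    return keys

print(sorted(inferred_keys(file1)))  # ['price', 'productId', 'productName']
print(sorted(inferred_keys(file2)))  # ['productId', 'productName'] -- no 'price'
```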
The solution is obvious, and it is what we built into the framework: read the JSON with an explicitly specified schema instead of letting Spark infer one.
Then every field from the schema that is missing in the actual JSON documents still shows up in the DataFrame, as a column filled with null:
df = spark.read \
.format("json") \
.option("multiline","false") \
.schema(df_schema) \
.load(path)
That leaves the question of where to take the schema from. Writing it out by hand in the YAML config would work, but the data comes to us through Kafka, and the source teams already register their schemas in Kafka Schema Registry as JSON Schema (yes, Schema Registry handles not only AVRO but JSON Schema as well).
So the plan looked simple:
- fetch the schema from Kafka Schema Registry;
- convert it to pyspark.sql.types.StructType, something like:
# 1. fetch the schema from Kafka Schema Registry over its REST API
# 2. turn it into a Spark schema:
df_schema = StructType.fromJson(schema)
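Step 1, fetching the schema, really is a single GET against the Schema Registry REST API. A sketch using only the standard library; the registry URL and subject name are made up, and the HTTP opener is injectable so the example runs offline against a canned response:

```python
import io
import json
from urllib.request import urlopen

def fetch_latest_schema(base_url, subject, opener=urlopen):
    # Confluent Schema Registry: GET /subjects/<subject>/versions/latest
    # returns a JSON body whose "schema" field holds the schema as a string.
    with opener(f"{base_url}/subjects/{subject}/versions/latest") as resp:
        payload = json.load(resp)
    return json.loads(payload["schema"])

# Offline demo: a fake opener standing in for a live registry.
def fake_opener(url):
    body = {
        "subject": "orders-value", "version": 1, "schemaType": "JSON",
        "schema": '{"type": "object", "properties": {"productId": {"type": "integer"}}}',
    }
    return io.BytesIO(json.dumps(body).encode())

schema = fetch_latest_schema("http://registry:8081", "orders-value", fake_opener)
print(schema["properties"]["productId"]["type"])  # integer
```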
Get the JSON Schema, convert it, done... but it was not to be: StructType.fromJson expects not a JSON Schema at all, but Spark's own JSON representation of a schema. For our example it looks like this:
df.schema.json()
{
"fields":
[
{
"metadata": {},
"name": "dimensions",
"nullable": true,
"type":
{
"fields":
[
{"metadata":{},"name":"height","nullable":true,"type":"double"},
{"metadata":{},"name":"length","nullable":true,"type":"long"},
{"metadata":{},"name":"width","nullable":true,"type":"long"}
],
"type": "struct"
}
},
{
"metadata": {},
"name": "price",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "productId",
"nullable": true,
"type": "long"
},
{
"metadata": {},
"name": "productName",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "tags",
"nullable": true,
"type":
{
"containsNull": true,
"elementType": "string",
"type": "array"
}
}
],
"type": "struct"
}
As you can see, this format has very little in common with JSON Schema.
"Surely someone has already written a converter from JSON Schema to a Spark schema," I thought, and went looking. The closest thing I found was a package that builds a DataFrame schema from a JSON Schema:
https://github.com/zalando-incubator/spark-json-schema, but, alas, it is written in Scala, while our framework lives in pySpark...
Its core is the SchemaConverter class, and it does the right kind of job, but pulling a Scala library into our pySpark codebase looked like a "crutch". So we decided to write our own converter.
A fair question: why put up with raw JSON at all? Everywhere else in the DataPlatform the path is standard: NiFi reads Kafka and converts the messages to parquet "on the fly" using their AVRO schema, then writes to S3. The trouble is the shape of these documents.
Here is a real schema of one of them; judge the scale for yourself :)
root
|-- taskId: string (nullable = true)
|-- extOrderId: string (nullable = true)
|-- taskStatus: string (nullable = true)
|-- taskControlStatus: string (nullable = true)
|-- documentVersion: long (nullable = true)
|-- buId: long (nullable = true)
|-- storeId: long (nullable = true)
|-- priority: string (nullable = true)
|-- created: struct (nullable = true)
| |-- createdBy: string (nullable = true)
| |-- created: string (nullable = true)
|-- lastUpdateInformation: struct (nullable = true)
| |-- updatedBy: string (nullable = true)
| |-- updated: string (nullable = true)
|-- customerId: string (nullable = true)
|-- employeeId: string (nullable = true)
|-- pointOfGiveAway: struct (nullable = true)
| |-- selected: string (nullable = true)
| |-- available: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- dateOfGiveAway: string (nullable = true)
|-- dateOfGiveAwayEnd: string (nullable = true)
|-- pickingDeadline: string (nullable = true)
|-- storageLocation: string (nullable = true)
|-- currentStorageLocations: array (nullable = true)
| |-- element: string (containsNull = true)
|-- customerType: string (nullable = true)
|-- comment: string (nullable = true)
|-- totalAmount: double (nullable = true)
|-- currency: string (nullable = true)
|-- stockDecrease: boolean (nullable = true)
|-- offline: boolean (nullable = true)
|-- trackId: string (nullable = true)
|-- transportationType: string (nullable = true)
|-- stockRebook: boolean (nullable = true)
|-- notificationStatus: string (nullable = true)
|-- lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lineId: string (nullable = true)
| | |-- extOrderLineId: string (nullable = true)
| | |-- productId: string (nullable = true)
| | |-- lineStatus: string (nullable = true)
| | |-- lineControlStatus: string (nullable = true)
| | |-- orderedQuantity: double (nullable = true)
| | |-- confirmedQuantity: double (nullable = true)
| | |-- assignedQuantity: double (nullable = true)
| | |-- pickedQuantity: double (nullable = true)
| | |-- controlledQuantity: double (nullable = true)
| | |-- allowedForGiveAwayQuantity: double (nullable = true)
| | |-- givenAwayQuantity: double (nullable = true)
| | |-- returnedQuantity: double (nullable = true)
| | |-- sellingScheme: string (nullable = true)
| | |-- stockSource: string (nullable = true)
| | |-- productPrice: double (nullable = true)
| | |-- lineAmount: double (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- markingFlag: string (nullable = true)
| | |-- operations: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- operationId: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | | | |-- reason: string (nullable = true)
| | | | |-- quantity: double (nullable = true)
| | | | |-- dmCodes: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- timeStamp: string (nullable = true)
| | | | |-- updatedBy: string (nullable = true)
| | |-- source: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- type: string (nullable = true)
| | | | |-- items: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- assignedQuantity: double (nullable = true)
|-- linkedObjects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- objectType: string (nullable = true)
| | |-- objectId: string (nullable = true)
| | |-- objectStatus: string (nullable = true)
| | |-- objectLines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- objectLineId: string (nullable = true)
| | | | |-- taskLineId: string (nullable = true)
Why not describe such a structure once as an AVRO schema and take the usual parquet route? Because the source teams already maintain and evolve these schemas as JSON Schema in Kafka Schema Registry, and will keep doing so; it was easier to take them "as they are".
This is how SparkJsonSchemaConverter was born: our own converter that supports definitions, refs (including external ones) and oneOf, and turns an "arbitrary" JSON Schema into a pyspark.sql.types.StructType.
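The published converter is the real thing; the sketch below only illustrates the core idea on a tiny subset (primitive types, objects and arrays; no definitions, refs or oneOf): walk the JSON Schema and emit Spark's schema JSON, the format shown earlier, which StructType.fromJson does understand. It is plain dict manipulation, so it runs without pyspark:

```python
# Minimal sketch: JSON Schema -> Spark schema JSON. Covers only primitives,
# objects and arrays; the real SparkJsonSchemaConverter also handles
# definitions, refs and oneOf.

PRIMITIVES = {"string": "string", "integer": "long",
              "number": "double", "boolean": "boolean"}

def to_spark_type(js):
    kind = js["type"]
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    if kind == "object":
        return {"type": "struct",
                "fields": [{"name": name, "type": to_spark_type(sub),
                            "nullable": True, "metadata": {}}
                           for name, sub in sorted(js["properties"].items())]}
    if kind == "array":
        return {"type": "array", "containsNull": True,
                "elementType": to_spark_type(js["items"])}
    raise ValueError(f"unsupported JSON Schema type: {kind}")

json_schema = {"type": "object", "properties": {
    "productId": {"type": "integer"},
    "tags": {"type": "array", "items": {"type": "string"}},
}}
spark_schema = to_spark_type(json_schema)
print(spark_schema["fields"][0])  # the productId field, typed as long
```

In pyspark the result would then be fed to StructType.fromJson(spark_schema) and passed to .schema() when reading.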
We actively use Open Source ourselves, so it felt only right to give something back: we published the converter as Open Source too. If you face the same task, take it, use it, and contribute!
With SparkJsonSchemaConverter in place, the Parsing step reads the "compacted" data from S3 differently depending on the source type: JSON documents with an explicit schema obtained from the JSON Schema, parquet as before:
# for JSON sources: read with the schema converted from JSON Schema
df = spark.read.format(in_format)\
    .option("multiline", "false")\
    .schema(json_schema) \
    .load(path)
# for parquet sources:
df = spark.read.format(in_format)\
    .load(path)
From this point on, the DataFrames are processed identically, and the result is written out as CSV files for loading into Greenplum.
That is how our framework, and with it the Data Platform, learned to take in JSON document sources. Some results:
four JSON document sources are already integrated!
And there is more to tell about what goes on "under the hood" of the framework; we will keep opening its "black boxes" in the articles to come.