What should we do ... loading JSON into the Data Platform

Hello everyone!





In a recent article, we discussed how we built our Data Platform. Today I want to dive deeper into the "guts" of our platform and, along the way, tell you how we solved one of the problems that arose as the variety of integrated data sources kept growing.





That is, going back to the last diagram from the article above (I've deliberately duplicated it for the reader's convenience), today we will talk in more depth about the implementation of the "right side" of the diagram, the part that sits after Apache NiFi.





Diagram from our previous article.

As a reminder, our company has more than 350 relational databases. Naturally, not all of them are "unique" – many are essentially copies of the same system installed in every store of the retail chain – but it is still a "zoo of diversity". So we could not do without a framework that simplifies and speeds up integrating sources into the Data Platform.





The general scheme for delivering data from a source to the Greenplum ODS layer using the framework we developed is shown below:





General scheme of data delivery to the Greenplum ODS layer

  1. Data from the source Kafka topics, serialized as AVRO, is read by Apache NiFi and written as parquet files to S3.





  2. The files are then processed by two "service" Spark jobs:





    1. Compaction – merging many small files into larger ones (dropping duplicates along the way), using distinct() and coalesce(). The result is written back to S3, prepared for parsing and free of the "small files problem";





    2. Parsing – extracting the required attributes from the raw data. The result is written as gzip-compressed CSV files to S3.





  3. Loading – the CSV files are loaded into the ODS layer: an external table is created over S3 via the PXF S3 connector, and the data is inserted with pgsql into the target ODS tables in Greenplum.





  4. The whole process is orchestrated by Airflow.
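Step 3 can be sketched roughly as follows. This is only an illustration of a PXF readable external table over S3; the bucket path, server name, table and column names are hypothetical, not the real platform's objects:

```python
def pxf_external_table_ddl(table: str, columns: dict, s3_path: str) -> str:
    """Build DDL for a Greenplum readable external table over S3 via PXF.

    `columns` maps column names to Greenplum types; all identifiers here
    are hypothetical examples.
    """
    cols = ",\n    ".join(f"{name} {ctype}" for name, ctype in columns.items())
    return (
        f"CREATE READABLE EXTERNAL TABLE ext_{table} (\n    {cols}\n)\n"
        f"LOCATION ('pxf://{s3_path}?PROFILE=s3:csv&SERVER=s3srv')\n"
        f"FORMAT 'CSV' (DELIMITER ',');"
    )

ddl = pxf_external_table_ddl(
    "products",
    {"product_id": "bigint", "product_name": "text", "price": "numeric"},
    "my-bucket/ods/products",
)
print(ddl)
```

The actual load then selects from this external table into the target ODS table.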





The Airflow DAGs for each source are generated automatically. The trickiest part is generating the Parsing stage. To do so, the framework relies on:





  • the metadata of the target ODS tables in Greenplum;





  • YAML configuration files stored in Git:





    • general source settings (for example: schedule, connection credentials, S3 bucket, notification email, etc.);





    • per-entity ODS settings (the attribute list and types, the target ODS table). Among other things, these configs drive DAG generation;
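Such a config might look roughly like the sketch below; every key and value here is a hypothetical illustration of the idea, not the platform's actual format:

```yaml
# Hypothetical per-source settings (illustration only)
source:
  name: products_service
  schedule: "0 */4 * * *"
  s3_bucket: dp-landing
  notify_email: data-team@example.com

# Hypothetical per-entity ODS settings (illustration only)
entities:
  - name: products
    ods_table: ods.products
    fields:
      - {name: productId, type: bigint}
      - {name: productName, type: text}
      - {name: price, type: numeric}
```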





So far, so good — but then a new task arrived: loading JSON data. The source here is MongoDB, with the MongoDB Kafka source connector publishing documents to Kafka. The framework had to be extended to support it. Unlike the relational sources, the JSON lands in S3 "as is", rather than as parquet produced by Apache NiFi.





Let's start with Compaction. For parquet files the "merge" logic is trivial:





# read the input files, dropping duplicate rows
df = spark.read.format(in_format) \
               .options(**in_options) \
               .load(path) \
               .distinct()

# merge into `div` larger files and write back
new_df = df.coalesce(div)
new_df.write.mode("overwrite") \
            .format(out_format) \
            .options(**out_options) \
            .save(path)



For JSON files, though, this won't work out of the box: documents in different files differ in structure, so reading them with the "json" format would require Spark's mergeSchema, and even then the inferred schema would depend on the batch — writing the parsed DataFrame back would not reproduce the original documents. We had to treat JSON as "plain text".





First, let's figure out how to read the raw files from S3. The key requirement:





reading the JSON files into a DataFrame must leave the documents intact, so that writing them back reproduces the original JSON.





Let's look at an example. Suppose we have a file:





file1:





{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
      
      



Note the layout of these JSON files: 1 line = 1 document (newline-delimited JSON). That is, a file may not contain pretty-printed multi-line JSON, only single-line JSON documents. This is exactly how the JSON files arrive in S3 (bypassing Apache NiFi).
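The one-document-per-line convention means each line parses independently; a minimal plain-Python illustration (the sample documents mirror file1 above):

```python
import json

# Two newline-delimited JSON documents (same shape as file1 above)
ndjson = (
    '{"productId": 1, "productName": "ProductName 1"}\n'
    '{"productId": 2, "price": 10.01}\n'
)

# Each non-empty line is a complete, self-contained JSON document
docs = [json.loads(line) for line in ndjson.splitlines() if line]
print([d["productId"] for d in docs])  # -> [1, 2]
```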





The solution we found was to read such files as CSV:





# read the file as CSV
df = spark.read \
          .format("csv") \
          .option("sep", "\a") \
          .load("file1.json")

# the resulting DataFrame schema
df.printSchema()

root
 |-- _c0: string (nullable = true)

# the contents
df.show()

+--------------------+
|                 _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
      
      



This reads the JSON as CSV with a field separator that is guaranteed never to appear in the data — we chose the Bell character (\a). Each document lands intact in a single string column of the DataFrame, on which distinct() and coalesce() work exactly as before. The compaction code stays the same; only the input parameters differ:





# for parquet
in_format = "parquet"
in_options = {}

# for JSON
in_format = "csv"
in_options = {"sep": "\a"}
      
      



Writing the DataFrame back to S3 likewise differs only in the output parameters:





df.write.mode("overwrite") \
        .format(out_format) \
        .options(**out_options) \
        .save(path)

# for JSON
out_format = "text"
out_options = {"compression": "gzip"}

# for parquet
out_format = in_format
out_options = {"compression": "snappy"}
      
      



Now for Parsing. At first glance the story is similar: read JSON instead of parquet, extract the attributes, write CSV. But there is a catch: when reading JSON, Spark infers the schema from the data, and to combine the schemas of different files it needs the mergeSchema option. Worse, inference depends on the batch. If, say, the attribute "field_1" happens to be absent from every document in a batch, Spark will not include it in the DataFrame at all, and the Parsing step would produce a CSV without that column — breaking the downstream load into ODS.





Again, an example. Suppose we have two files:





file1 (the same as before):





{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
      
      



file2:





{"productId": 3, "productName": "ProductName 3", "dimensions": {"length": 10, "width": 12, "height": 12.5, "package": [10, 20.5, 30]}}
      
      



Read each of them with Spark into a DataFrame:





df = spark.read \
          .format("json") \
          .option("multiline", "false") \
          .load(path)
df.printSchema()
df.show()
      
      



The result for file1:





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- width: long (nullable = true)
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+-----+---------+-------------+--------------+
|    dimensions|price|productId|  productName|          tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
      
      



The result for file2:





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- package: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- width: long (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)

+--------------------+---------+-------------+
|          dimensions|productId|  productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...|        3|ProductName 3|
+--------------------+---------+-------------+
      
      



As you can see, Spark infers the schema from the data itself. If an attribute is present in some documents and absent in others, the DataFrame fills the gaps with null (as with price and productName above).





Moreover, if a batch happened to contain (for example) only documents with the attributes price, productId and productName, the inferred schema would be just





root
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
      
      



while for a batch of "file2-like" documents, which have no "price" attribute, Spark would know nothing about "price" and the column would simply be missing from the DataFrame. With parquet files this problem does not arise: each parquet file carries its own schema (the parquet written by Apache NiFi is based on the topic's AVRO schema), so even attributes absent from the data are present in the parquet metadata.





So, to fit JSON processing into the framework, we needed a fixed, externally defined schema to apply when reading the JSON files from S3.





Fortunately, Spark lets you read JSON files with an explicit schema. Attributes that are missing from a given JSON document but present in the schema then appear in the DataFrame filled with null:





df = spark.read \
          .format("json") \
          .option("multiline","false") \
          .schema(df_schema) \
          .load(path)
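The effect — every attribute from the schema is present, missing ones as nulls — can be mimicked in plain Python (the field list below is just an illustration based on the example above):

```python
import json

# attribute list playing the role of the explicit schema
schema_fields = ["productId", "productName", "price"]

doc = json.loads('{"productId": 3, "productName": "ProductName 3"}')

# missing attributes surface as None, mirroring Spark's nulls
row = {field: doc.get(field) for field in schema_fields}
print(row)  # -> {'productId': 3, 'productName': 'ProductName 3', 'price': None}
```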
      
      



The question is where to get that schema. Maintaining it by hand in the YAML configs would be tedious and error-prone. Since the data flows through Kafka, we agreed with the source teams that they would publish their schemas to the Kafka Schema Registry in JSON format (luckily, the Kafka Schema Registry was already part of our platform).





So the plan took shape:





  • fetch the schema from the Kafka Schema Registry





  • convert it to pyspark.sql.types.StructType – Spark has a built-in helper for schemas in its own JSON representation:





# 1. fetch the schema from the Kafka Schema Registry via its REST API
# 2. build a StructType from the resulting schema dict:
df_schema = StructType.fromJson(schema)
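Step 1 might look like the sketch below, using the standard Confluent Schema Registry REST endpoint; the registry URL and subject name are hypothetical:

```python
import json
from urllib.request import urlopen

def fetch_registry_schema(registry_url: str, subject: str) -> dict:
    """Fetch the latest schema for `subject` from a Confluent-style
    Schema Registry and return it as a parsed dict."""
    url = f"{registry_url}/subjects/{subject}/versions/latest"
    with urlopen(url) as resp:
        payload = json.load(resp)
    # the registry returns the schema itself as a JSON-encoded string
    return json.loads(payload["schema"])

# The response-parsing step in isolation, on a mocked payload:
sample_payload = {"subject": "products-value", "version": 1,
                  "schema": "{\"type\": \"object\"}"}
schema = json.loads(sample_payload["schema"])
print(schema)  # -> {'type': 'object'}
```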
      
      



  • apply the schema when reading the JSON files





All clear, then… except that the Registry stores a JSON Schema, while Spark expects its own schema representation — and for the JSON of file2 above the two look nothing alike. You can see Spark's JSON representation of a DataFrame schema with:





df.schema.json()  
      
      



{
    "fields":
    [
        {
            "metadata": {},
            "name": "dimensions",
            "nullable": true,
            "type":
            {
                "fields":
                [
                    {"metadata":{},"name":"height","nullable":true,"type":"double"},
                    {"metadata":{},"name":"length","nullable":true,"type":"long"},
                    {"metadata":{},"name":"width","nullable":true,"type":"long"}
                ],
                "type": "struct"
            }
        },
        {
            "metadata": {},
            "name": "price",
            "nullable": true,
            "type": "double"
        },
        {
            "metadata": {},
            "name": "productId",
            "nullable": true,
            "type": "long"
        },
        {
            "metadata": {},
            "name": "productName",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "tags",
            "nullable": true,
            "type":
            {
                "containsNull": true,
                "elementType": "string",
                "type": "array"
            }
        }
    ],
    "type": "struct"
}

      
      



which, as you can see, is not a JSON Schema at all.
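For contrast, a JSON Schema describing roughly the same documents might look like this (a hand-written illustration, not the actual schema from our Registry):

```json
{
    "type": "object",
    "properties": {
        "productId": {"type": "integer"},
        "productName": {"type": "string"},
        "price": {"type": "number"},
        "tags": {
            "type": "array",
            "items": {"type": "string"}
        },
        "dimensions": {
            "type": "object",
            "properties": {
                "length": {"type": "integer"},
                "width": {"type": "integer"},
                "height": {"type": "number"}
            }
        }
    }
}
```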





"Surely someone has already written a converter from JSON Schema to the Spark format," I thought — and, after some searching, indeed found one:





a library that builds a Spark DataFrame schema from a JSON Schema:





https://github.com/zalando-incubator/spark-json-schema — unfortunately, it is written in Scala, while we use pySpark…





So we set out to write our own SchemaConverter. At first the task looked simple, but as usual, the "devil" was in the details.





By that time, real JSON was already flowing into the DataPlatform along the standard route: NiFi reads Kafka and writes parquet, with a "flat" schema derived from the NiFi AVRO-schema, to S3. And real entities are nothing like the toy examples above — here is the schema of one of them:





Don't be alarmed — just scroll past it :)
root
 |-- taskId: string (nullable = true)
 |-- extOrderId: string (nullable = true)
 |-- taskStatus: string (nullable = true)
 |-- taskControlStatus: string (nullable = true)
 |-- documentVersion: long (nullable = true)
 |-- buId: long (nullable = true)
 |-- storeId: long (nullable = true)
 |-- priority: string (nullable = true)
 |-- created: struct (nullable = true)
 |    |-- createdBy: string (nullable = true)
 |    |-- created: string (nullable = true)
 |-- lastUpdateInformation: struct (nullable = true)
 |    |-- updatedBy: string (nullable = true)
 |    |-- updated: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- employeeId: string (nullable = true)
 |-- pointOfGiveAway: struct (nullable = true)
 |    |-- selected: string (nullable = true)
 |    |-- available: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- dateOfGiveAway: string (nullable = true)
 |-- dateOfGiveAwayEnd: string (nullable = true)
 |-- pickingDeadline: string (nullable = true)
 |-- storageLocation: string (nullable = true)
 |-- currentStorageLocations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customerType: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- stockDecrease: boolean (nullable = true)
 |-- offline: boolean (nullable = true)
 |-- trackId: string (nullable = true)
 |-- transportationType: string (nullable = true)
 |-- stockRebook: boolean (nullable = true)
 |-- notificationStatus: string (nullable = true)
 |-- lines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lineId: string (nullable = true)
 |    |    |-- extOrderLineId: string (nullable = true)
 |    |    |-- productId: string (nullable = true)
 |    |    |-- lineStatus: string (nullable = true)
 |    |    |-- lineControlStatus: string (nullable = true)
 |    |    |-- orderedQuantity: double (nullable = true)
 |    |    |-- confirmedQuantity: double (nullable = true)
 |    |    |-- assignedQuantity: double (nullable = true)
 |    |    |-- pickedQuantity: double (nullable = true)
 |    |    |-- controlledQuantity: double (nullable = true)
 |    |    |-- allowedForGiveAwayQuantity: double (nullable = true)
 |    |    |-- givenAwayQuantity: double (nullable = true)
 |    |    |-- returnedQuantity: double (nullable = true)
 |    |    |-- sellingScheme: string (nullable = true)
 |    |    |-- stockSource: string (nullable = true)
 |    |    |-- productPrice: double (nullable = true)
 |    |    |-- lineAmount: double (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- markingFlag: string (nullable = true)
 |    |    |-- operations: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- operationId: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- quantity: double (nullable = true)
 |    |    |    |    |-- dmCodes: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- timeStamp: string (nullable = true)
 |    |    |    |    |-- updatedBy: string (nullable = true)
 |    |    |-- source: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true)
 |-- linkedObjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- objectType: string (nullable = true)
 |    |    |-- objectId: string (nullable = true)
 |    |    |-- objectStatus: string (nullable = true)
 |    |    |-- objectLines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- objectLineId: string (nullable = true)
 |    |    |    |    |-- taskLineId: string (nullable = true)

      
      



And that, mind you, is a schema derived from an Avro-schema, not from a JSON Schema. Real JSON Schemas are "richer" still: the source teams who (at our request) publish their JSON Schemas to the Kafka Schema Registry use, so to speak, everything the standard allows.





The result is our SparkJsonSchemaConverter – it supports definitions, refs (including nested ones) and oneOf, and turns an arbitrary "rich" JSON Schema into a pyspark.sql.types.StructType.
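The core idea can be conveyed by a toy converter: plain Python, no definitions/refs/oneOf support, just the basic type mapping (our real SparkJsonSchemaConverter is considerably more involved):

```python
# mapping from JSON Schema scalar types to Spark type names (assumption:
# integers become long and numbers become double, matching Spark's inference)
SCALARS = {"string": "string", "integer": "long", "number": "double",
           "boolean": "boolean"}

def to_spark_type(node: dict):
    """Convert a (simplified) JSON Schema node into Spark's JSON schema
    representation, suitable for StructType.fromJson()."""
    t = node["type"]
    if t == "object":
        fields = [{"name": name, "type": to_spark_type(child),
                   "nullable": True, "metadata": {}}
                  for name, child in node["properties"].items()]
        return {"type": "struct", "fields": fields}
    if t == "array":
        return {"type": "array", "elementType": to_spark_type(node["items"]),
                "containsNull": True}
    return SCALARS[t]

json_schema = {"type": "object", "properties": {
    "productId": {"type": "integer"},
    "tags": {"type": "array", "items": {"type": "string"}},
}}
print(to_spark_type(json_schema))
```

The output has the same shape as the df.schema.json() dump shown earlier, so it can be fed straight to StructType.fromJson().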





We are often asked whether we plan to publish it as Open Source. We would like to, but the decision is not ours alone, and the internal process takes time. If and when the converter does go Open Source, we will certainly let you know!





With SparkJsonSchemaConverter in place, the Parsing stage now reads the "raw" data from S3 uniformly; only the read options differ between the two cases:





# for JSON
df = spark.read.format(in_format)\
            .option("multiline", "false")\
            .schema(json_schema) \
            .load(path)

# for parquet:
df = spark.read.format(in_format)\
            .load(path)
      
      



From this point on, the DataFrame is processed identically in both cases and written out as CSV.





As a result, our framework in the Data Platform learned to load JSON data. Bonuses:





  • it now takes us only about 4 hours to connect a new JSON source!





  • the "uniform" design of the framework is preserved: JSON sources are configured and loaded the same way as relational ones.







