PySpark. Memecahkan masalah sesi menemukan

Selamat siang, para pembaca yang budiman! Beberapa hari yang lalu, membaca ulang buku Anthony Molinaro β€œSQL. Kumpulan resep ”, di salah satu bab saya menemukan topik yang dikhususkan untuk menentukan awal dan akhir kisaran nilai yang berurutan. Setelah membaca materi secara singkat, saya langsung teringat bahwa saya sudah pernah menemukan pertanyaan ini sebagai salah satu tugas tes, tetapi kemudian topiknya dinyatakan sebagai β€œTugas sesi menemukan”. Trik wawancara teknis bukanlah review dari pekerjaan yang dilakukan, tetapi salah satu pertanyaan pewawancara tentang bagaimana mendapatkan nilai yang serupa dengan menggunakan Spark. Mempersiapkan wawancara, saya tidak tahu bahwa perusahaan menggunakan (atau mungkin tidak ...) Apache Spark, dan karena itu tidak mengumpulkan informasi tentang alat baru untuk saya pada saat itu. Hanya tinggal mengajukan hipotesis bahwa solusi yang diinginkan bisa seperti skrip,yang dapat ditulis menggunakan pustaka Pandas. Meskipun sangat jauh, saya masih mencapai target, tetapi saya tidak berhasil bekerja di organisasi ini.





Agar adil, saya ingin mencatat bahwa selama bertahun-tahun saya hanya membuat sedikit kemajuan dalam mempelajari Apache Spark. Tetapi saya masih ingin berbagi praktik terbaik dengan pembaca, karena banyak analis belum menemukan alat ini sama sekali, dan orang lain mungkin memiliki wawancara serupa. Jika Anda seorang profesional Spark, Anda selalu dapat menyarankan kode yang lebih optimal di komentar ke pos.





Ini adalah pembukaan, mari kita lanjutkan langsung ke analisis topik ini. Mari kita pergi dulu dan menulis skrip SQL. Tapi pertama-tama, mari buat database dan mengisinya dengan nilai. Karena ini adalah contoh demo, saya sarankan menggunakan SQLite. Basis data ini kalah dengan "kolega di toko" yang lebih andal, tetapi kemampuannya untuk pengembangan skrip sudah cukup bagi kami secara penuh. Untuk mengotomatiskan operasi di atas, saya menulis kode berikut dengan Python.





#  
import sqlite3

#     
projects = [
    ('2020-01-01', '2020-01-02'),
    ('2020-01-02', '2020-01-03'),
    ('2020-01-03', '2020-01-04'),
    ('2020-01-04', '2020-01-05'),
    ('2020-01-06', '2020-01-07'),
    ('2020-01-16', '2020-01-17'),
    ('2020-01-17', '2020-01-18'),
    ('2020-01-18', '2020-01-19'),
    ('2020-01-19', '2020-01-20'),
    ('2020-01-21', '2020-01-22'),
    ('2020-01-26', '2020-01-27'),
    ('2020-01-27', '2020-01-28'),
    ('2020-01-28', '2020-01-29'),
    ('2020-01-29', '2020-01-30')
]

try:
    #  
    con = sqlite3.connect("projects.sqlite")
    #  
    cur = con.cursor()
    #  
    cur.execute("""CREATE TABLE IF NOT EXISTS projects (
                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    proj_start TEXT,
                    proj_end TEXT)""")
    #  
    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
    #  
    con.commit()
    #  
    cur.close()
except sqlite3.Error as err:
    print("  ", err)
finally:
    #  
    con.close()
    print("  ")

      
      



. DBeaver. , SQL .





, , , . , - . , . ().





select 
      p3.proj_group, 
      min(p3.proj_start) as date_start,
      max(p3.proj_end) as date_end,
      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
    (select 
	     p2.*,
	     sum(p2.flag)over(order by p2.proj_id) as proj_group
	from 
		(select 
		      p.proj_id , 
		      p.proj_start, 
		      p.proj_end, 
		      case 
		      when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1 
		      end as flag
		from projects as p) as p2) as p3
group by p3.proj_group
      
      



, . . , : . , . , , lag. 0, 1. , . . , .  . , ( julianday SQLite). . Spark.





, Apache Spark         ,  Hadoop. Java, Scala R, Spark PySpark. . Google Colab, . - , . , .





Linux OpenJDK, Spark. . findspark. , .





SQLite , . , .





Spark , . , . -, , , -, . , β€œ Spark. ”, , , , .





, , SQL. : , ( datediff).





, . , - , , , SQL Spark. , , . .





from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import  min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
                                                  max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
      
      



.





  1. , . . , β€œβ€ , .





  2. Meskipun Anda belum pernah bekerja dengan Spark sebelumnya, ini bukanlah alasan untuk menolak persaingan untuk mendapatkan posisi kosong. Dasar-dasar PySpark dapat dikuasai dalam waktu singkat, asalkan latar belakangnya sudah memiliki pengalaman pemrograman menggunakan pustaka Pandas.





  3. Tidak ada kekurangan buku tentang Spark.





Itu saja. Semua kesehatan, semoga sukses dan kesuksesan profesional!








All Articles