Testing in Apache Spark Structured Streaming

Introduction



At the moment there are few examples of tests for applications built on Spark Structured Streaming. This article therefore walks through basic test examples with detailed descriptions.







All examples use Apache Spark 3.0.1.







Preparation



You need to install:







  • Apache Spark 3.0.x
  • Python 3.7 and a virtual environment for it
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • Scala 2.12.10 (used by the Scala examples)


  1. Download Apache Spark.
  2. Unpack it: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Create an environment, for example with conda: conda create -n sp python=3.7


You need to set environment variables. Here is an example for running locally.







SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip
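Before running the tests it can help to confirm that the two PYTHONPATH entries above actually exist on disk. The following is a minimal sketch and not part of the original setup: the helper names `pyspark_path_entries` and `missing_entries` are made up for illustration, and the py4j archive name is copied from the example above, so it will differ for other Spark releases.

```python
import os


def pyspark_path_entries(spark_home, py4j_zip="py4j-0.10.9-src.zip"):
    """Return the two PYTHONPATH entries from the example for a given SPARK_HOME."""
    python_dir = os.path.join(spark_home, "python")
    return [python_dir, os.path.join(python_dir, "lib", py4j_zip)]


def missing_entries(spark_home):
    """List the expected entries that do not exist on disk."""
    return [p for p in pyspark_path_entries(spark_home) if not os.path.exists(p)]


# Report what is missing for the SPARK_HOME of the current shell (empty list = all good).
print(missing_entries(os.environ.get("SPARK_HOME", "")))
```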
      
      





Tests



Example with scikit-learn



When writing tests, you need to split the code so that the logic is isolated from the use of the actual final API. Good examples of such isolation are the pandas DataFrame and the Spark DataFrame.
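As a minimal sketch of this kind of isolation, the parsing logic can live in a plain function with no pandas or Spark dependency. The function name `parse_line` and the `label,feature,feature` line format are assumptions for illustration, modeled on the text input that the Spark example later in this article reads.

```python
def parse_line(line):
    """Split 'label,f1,f2,...' into (label, [f1, f2, ...]) with plain Python."""
    values = line.split(",")
    label = float(values[0])
    features = [float(v) for v in values[1:]]
    return label, features


# The logic can be checked without any DataFrame API.
print(parse_line("1.0,3.0,5.0"))  # (1.0, [3.0, 5.0])
```

A thin wrapper (for example a Spark UDF) can then call `parse_line`, so only the wrapper needs a running Spark session to be tested.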







The following algorithm is used as the example for writing the tests: LinearRegression.







So, the code under test follows this Python "template":







class XService:

    def __init__(self):
        # Initialization.
        pass

    def train(self, ds):
        # Training.
        pass

    def predict(self, ds):
        # Prediction and output of the results.
        pass

      
      





For Scala, the template looks analogous.
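The Python template can also be spelled out as an abstract base class, so that every service is forced to provide both methods. This is only a sketch: `Service` and the toy `MeanService` are hypothetical names that do not appear in the original code.

```python
from abc import ABC, abstractmethod


class Service(ABC):
    """Hypothetical base class spelling out the train/predict template."""

    @abstractmethod
    def train(self, ds):
        """Fit the underlying model on a training dataset."""

    @abstractmethod
    def predict(self, ds):
        """Apply the trained model to a dataset."""


class MeanService(Service):
    """Toy implementation: always predicts the mean of the training labels."""

    def __init__(self):
        self.mean = None

    def train(self, ds):
        _, labels = ds
        self.mean = sum(labels) / len(labels)

    def predict(self, ds):
        return [self.mean for _ in ds]


service = MeanService()
service.train(([[1, 1], [1, 2]], [6.0, 8.0]))
print(service.predict([[3, 5]]))  # [7.0]
```

A toy implementation like this can stand in for the real model when testing code that only depends on the template.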







The full example:







from sklearn import linear_model

class LocalService:

    def __init__(self):
        self.model = linear_model.LinearRegression()

    def train(self, ds):
        X, y = ds
        self.model.fit(X, y)

    def predict(self, ds):
        r = self.model.predict(ds)
        print(r)

      
      





Test.







Imports:







import unittest
import numpy as np
      
      





Main class:







class RunTest(unittest.TestCase):
      
      





Running the tests:







if __name__ == "__main__":
    unittest.main()
      
      





Data preparation:







X = np.array([
    [1, 1],  # 6
    [1, 2],  # 8
    [2, 2],  # 9
    [2, 3]  # 11
])
y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
      
      





Creating and training the model:







service = local_service.LocalService()
service.train((X, y))
      
      





Getting the results:







service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
      
      





Output:







[16.]
[19.]
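These values can be checked by hand from the rule used to build the labels, y = 1 * x_0 + 2 * x_1 + 3. A small pure-Python sketch (the helper `linear` is illustrative and not part of the service):

```python
def linear(features, weights=(1.0, 2.0), intercept=3.0):
    """y = 1 * x_0 + 2 * x_1 + 3, the rule used to build the training labels."""
    return sum(w * x for w, x in zip(weights, features)) + intercept


training_rows = [(1, 1), (1, 2), (2, 2), (2, 3)]
print([linear(row) for row in training_rows])  # [6.0, 8.0, 9.0, 11.0]
print(linear((3, 5)), linear((4, 6)))          # 16.0 19.0
```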
      
      





All together:







import unittest
import numpy as np

from spark_streaming_pp import local_service

class RunTest(unittest.TestCase):
    def test_run(self):
        # Prepare data.
        X = np.array([
            [1, 1],  # 6
            [1, 2],  # 8
            [2, 2],  # 9
            [2, 3]  # 11
        ])
        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
        # Create model and train.
        service = local_service.LocalService()
        service.train((X, y))
        # Predict and results.
        service.predict(np.array([[3, 5]]))
        service.predict(np.array([[4, 6]]))
        # [16.]
        # [19.]

if __name__ == "__main__":
    unittest.main()
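The test above only prints the predictions, so nothing fails if they are wrong. One possible way to assert on printed output is to capture stdout. The sketch below uses a hypothetical stand-in, `PrintingService`, instead of the real `LocalService` so that it runs without scikit-learn; the helper `captured_predict` is likewise made up for illustration.

```python
import contextlib
import io
import unittest


class PrintingService:
    """Hypothetical stand-in that prints its predictions, like LocalService.predict."""

    def predict(self, rows):
        # Same rule as the training data: y = 1 * x_0 + 2 * x_1 + 3.
        print([1.0 * x0 + 2.0 * x1 + 3.0 for x0, x1 in rows])


def captured_predict(service, rows):
    """Run service.predict(rows) and return what it printed."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        service.predict(rows)
    return buffer.getvalue().strip()


class CapturedOutputTest(unittest.TestCase):
    def test_prediction_is_printed(self):
        service = PrintingService()
        self.assertEqual(captured_predict(service, [(3, 5)]), "[16.0]")
        self.assertEqual(captured_predict(service, [(4, 6)]), "[19.0]")
```

Where the service can be changed, returning the predictions from `predict` and comparing them with `assertAlmostEqual` is usually simpler than parsing printed text.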
      
      





Example with Spark and Python



A similar algorithm is used here: LinearRegression. Note that Structured Streaming is built on the same DataFrames that Spark SQL uses.







Initialization:







self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
      
      





Training:







self.model = self.service.fit(ds)
      
      





Prediction:







transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
      
      





All together:







from pyspark.ml.regression import LinearRegression

class StructuredStreamingService:
    def __init__(self):
        self.service = LinearRegression(maxIter=10, regParam=0.01)
        self.model = None

    def train(self, ds):
        self.model = self.service.fit(ds)

    def predict(self, ds):
        transformed_ds = self.model.transform(ds)
        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
        return q

      
      





Test.







For training, a regular DataFrame is used.







train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)
      
      





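Prediction, on the other hand, goes through Structured Streaming, i.e. the input is a streaming DataFrame rather than a regular DataFrame.

Here is an example of such a test from Spark itself.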







def test_stream_read_options_overwrite(self):
    bad_schema = StructType([StructField("test", IntegerType(), False)])
    schema = StructType([StructField("data", StringType(), False)])
    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
        .schema(bad_schema)\
        .load(path='python/test_support/sql/streaming', schema=schema, format='text')
    self.assertTrue(df.isStreaming)
    self.assertEqual(df.schema.simpleString(), "struct<data:string>")
      
      





Creating the Spark session:







spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
      
      





Data preparation (training data):







train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)
      
      





Creating and training the model:







service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
      
      





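Getting the results. Note: the prediction data is read as a text stream in which each line holds the label followed by the feature values, separated by commas. The query is awaited for 3 seconds.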







def extract_features(x):
    values = x.split(",")
    features_ = []
    for i in values[1:]:
        features_.append(float(i))
    features = Vectors.dense(features_)
    return features

extract_features_udf = udf(extract_features, VectorUDT())

def extract_label(x):
    values = x.split(",")
    label = float(values[0])
    return label

extract_label_udf = udf(extract_label, FloatType())

predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
    .withColumn("features", extract_features_udf(col("value"))) \
    .withColumn("label", extract_label_udf(col("value")))

service.predict(predict_ds).awaitTermination(3)
      
      





Output:







15.96699
18.96138
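Writing to the console and waiting a fixed 3 seconds shows the predictions but gives the test nothing to assert on. One possible alternative, sketched here under assumptions, is Spark's built-in memory sink combined with `processAllAvailable()`, which blocks until the data available so far has been processed. The helper names `write_input`, `collect_stream`, and `example`, the file name, and the query name are all hypothetical. Only the label column is extracted to keep the sketch short.

```python
import os
import tempfile


def write_input(directory, lines, name="part-0000.csv"):
    """Write one text file that the streaming file source will pick up."""
    path = os.path.join(directory, name)
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
    return path


def collect_stream(spark, directory, query_name="predictions_test"):
    """Read `directory` as a text stream and return all rows processed so far."""
    # Imported here so that write_input stays usable without Spark installed.
    from pyspark.sql.functions import col, split

    parts = split(col("value"), ",")
    stream = (spark.readStream.format("text").load(directory)
              .withColumn("label", parts.getItem(0).cast("double")))
    query = (stream.select("label")
             .writeStream.format("memory").queryName(query_name)
             .outputMode("append").start())
    try:
        query.processAllAvailable()  # wait until the files written so far are processed
        return spark.sql("SELECT label FROM " + query_name).collect()
    finally:
        query.stop()


def example(spark):
    """Usage sketch: expects an already created SparkSession."""
    directory = tempfile.mkdtemp()
    write_input(directory, ["1.0,3.0,5.0", "2.0,4.0,6.0"])
    return sorted(row.label for row in collect_stream(spark, directory))
```

The same pattern would apply to the prediction column: select it into the memory sink and compare the collected values with `assertAlmostEqual`.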
      
      





All together:







import unittest
import warnings

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT

from spark_streaming_pp import structure_streaming_service

class RunTest(unittest.TestCase):
    def test_run(self):
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        # Prepare data.
        train_ds = spark.createDataFrame([
            (6.0, Vectors.dense([1.0, 1.0])),
            (8.0, Vectors.dense([1.0, 2.0])),
            (9.0, Vectors.dense([2.0, 2.0])),
            (11.0, Vectors.dense([2.0, 3.0]))
        ],
            ["label", "features"]
        )
        # Create model and train.
        service = structure_streaming_service.StructuredStreamingService()
        service.train(train_ds)

        # Predict and results.

        def extract_features(x):
            values = x.split(",")
            features_ = []
            for i in values[1:]:
                features_.append(float(i))
            features = Vectors.dense(features_)
            return features

        extract_features_udf = udf(extract_features, VectorUDT())

        def extract_label(x):
            values = x.split(",")
            label = float(values[0])
            return label

        extract_label_udf = udf(extract_label, FloatType())

        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
            .withColumn("features", extract_features_udf(col("value"))) \
            .withColumn("label", extract_label_udf(col("value")))

        service.predict(predict_ds).awaitTermination(3)

        # +-----+------------------+
        # |label|        prediction|
        # +-----+------------------+
        # |  1.0|15.966990887541273|
        # |  2.0|18.961384020443553|
        # +-----+------------------+

    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)

if __name__ == "__main__":
    unittest.main()
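The test above creates a SparkSession inside the test method and never stops it. A possible refinement, sketched here as an assumption rather than as part of the original code, is a base class that creates one local session per test class and stops it afterwards. The class name `SparkTestCase` is hypothetical, and `local[2]` is borrowed from the Scala suite in this article.

```python
import unittest


class SparkTestCase(unittest.TestCase):
    """Hypothetical base class: one local SparkSession per test class."""

    spark = None

    @classmethod
    def setUpClass(cls):
        # Imported lazily so that importing this module does not require Spark.
        from pyspark.sql import SparkSession

        cls.spark = (SparkSession.builder
                     .master("local[2]")
                     .appName(cls.__name__)
                     .getOrCreate())
        cls.spark.sparkContext.setLogLevel("ERROR")

    @classmethod
    def tearDownClass(cls):
        # Stop the session even if individual tests failed.
        if cls.spark is not None:
            cls.spark.stop()
            cls.spark = None
```

A test class would then subclass `SparkTestCase` and use `self.spark` instead of building its own session.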
      
      





Note that the Scala test differs in how the streaming input is supplied.

Here MemoryStream provides the data:







implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
      
      





Full Scala example (here, for variety, the prediction is applied through a UDF from Spark SQL):







package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery

class StructuredStreamingService {
  var service: LinearRegression = _
  var model: LinearRegressionModel = _

  def train(ds: DataFrame): Unit = {
    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
    model = service.fit(ds)
  }

  def predict(ds: DataFrame): StreamingQuery = {

    val m = ds.sparkSession.sparkContext.broadcast(model)

    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
      m.value.predict(features)
    }

    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun

    val predictUdf = udf(transform)

    val predictionDs = ds.withColumn("prediction", predictUdf(ds("features")))

    predictionDs
      .writeStream
      .foreachBatch((r: DataFrame, i: Long) => {
        r.show()
        // scalastyle:off println
        println(s"$i")
        // scalastyle:on println
      })
      .start()
  }
}
      
      





Test:







package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}

class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
  test("run") { spark =>
    // Prepare data.
    val trainDs = spark.createDataFrame(Seq(
      (6.0, Vectors.dense(1.0, 1.0)),
      (8.0, Vectors.dense(1.0, 2.0)),
      (9.0, Vectors.dense(2.0, 2.0)),
      (11.0, Vectors.dense(2.0, 3.0))
    )).toDF("label", "features")
    // Create model and train.
    val service = new StructuredStreamingService()
    service.train(trainDs)
    // Predict and results.
    implicit val sqlCtx = spark.sqlContext
    import spark.implicits._
    val source = MemoryStream[Record]
    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
    val predictDs = source.toDF()
    service.predict(predictDs).awaitTermination(2000)
    // +-----+---------+------------------+
    // |label| features|        prediction|
    // +-----+---------+------------------+
    // |  1.0|[3.0,5.0]|15.966990887541273|
    // |  2.0|[4.0,6.0]|18.961384020443553|
    // +-----+---------+------------------+
  }

  override protected def withFixture(test: OneArgTest): Outcome = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    try withFixture(test.toNoArgTest(spark))

    finally spark.stop()
  }

  override type FixtureParam = SparkSession

  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)

}
      
      







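When writing tests, separate the code so that the logic is isolated from calls to a specific API. Any available data source can be used, including Kafka.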







Use abstractions such as “DataFrame”.







Python .











