Mengelola atribut entitas di Apache Kafka

pengantar



Saat mengerjakan masalah pembelajaran mesin dengan data online, ada kebutuhan untuk mengumpulkan berbagai entitas menjadi satu untuk analisis dan evaluasi lebih lanjut. Proses pengumpulan harus mudah dan cepat. Ini juga harus sering menyediakan transisi yang mulus dari pengembangan ke penggunaan industri tanpa usaha tambahan dan pekerjaan rutin. Anda dapat menggunakan pendekatan Toko Fitur untuk menyelesaikan masalah ini. Pendekatan ini dijelaskan dalam banyak detail di sini: Perkenalkan Michelangelo: Platform Pembelajaran Mesin Uber . Artikel ini menjelaskan cara menafsirkan solusi manajemen fitur yang ditentukan sebagai prototipe.







Toko Fitur untuk streaming online



Feature Store dapat dipandang sebagai layanan yang harus menjalankan fungsinya secara ketat sesuai dengan spesifikasinya. Sebelum menentukan spesifikasi ini, contoh sederhana harus dibongkar.







Contoh



Biarkan entitas berikut diberikan.







Film yang memiliki ID dan judul.







Rating film, yang juga memiliki pengenal, pengenal film, dan nilai ratingnya sendiri. Peringkat berubah seiring waktu.







Sumber peringkat, yang juga memiliki peringkatnya sendiri. Dan itu berubah seiring waktu.

Dan Anda perlu menggabungkan entitas ini menjadi satu.







Inilah yang terjadi.







gambar

Diagram entitas







Seperti yang Anda lihat, penggabungan terjadi dengan kunci entitas. Itu. semua peringkat film dicari untuk sebuah film, dan semua peringkat sumber untuk sebuah peringkat film.







Generalisasi contoh



, .







kafka-, : A, B… NN.

: AB, BCD… NM.

: Feature Stream Engine.







Feature Stream Engine kafka-, Feature Stream Store Feature Stream Center, .







gambar

Feature Stream Engine







Feature Stream Store



, .







– (feature).







, , .







.







Feature Stream Center



, , .







Feature Stream Engine



Feature Stream Engine , .







gambar

Feature Stream Engine







Feature Stream Engine



Feature Stream Engine , .







Feature Stream Engine .







.

kafka.

.

( ).

, .







gambar

Feature Stream Engine









.







.







, ("configration.properties").







.







topic- kafka. “,”.

. “,”.

topic-.







, .







public static FeaturesDescriptor createFromProperties(Properties properties) {
    String sources = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_SOURCES);
    String keys = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_KEYS);
    String sinkSource = properties.getProperty(FEATURES_DESCRIPTOR_SINK_SOURCE);
    String[] sourcesArray = sources.split(",");
    String[] keysArray = keys.split(",");
    List<FeatureDescriptor> featureDescriptors = new ArrayList<>();
    for (int i = 0; i < sourcesArray.length; i++) {
        FeatureDescriptor featureDescriptor =
                new FeatureDescriptor(sourcesArray[i], keysArray[i]);
        featureDescriptors.add(featureDescriptor);
    }
    return new FeaturesDescriptor(featureDescriptors, sinkSource);
}
      
      





public static class FeatureDescriptor {
    public final String source;
    public final String key;

    public FeatureDescriptor(String source, String key) {
        this.source = source;
        this.key = key;
    }
}
      
      





public static class FeaturesDescriptor {
    public final List<FeatureDescriptor> featureDescriptors;
    public final String sinkSource;

    public FeaturesDescriptor(List<FeatureDescriptor> featureDescriptors, String sinkSource) {
        this.featureDescriptors = featureDescriptors;
        this.sinkSource = sinkSource;
    }
}
      
      





.







void buildStreams(StreamsBuilder builder)
      
      





topic-, , , .







Serde<String> stringSerde = Serdes.String();

List<KStream<String, String>> streams = new ArrayList<>();

for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
    KStream<String, String> stream =
            builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                    .map(new KeyValueMapperSimple(featureDescriptor.key));
    streams.add(stream);
}
      
      





.







KStream<String, String> pref = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
    KStream<String, String> cur = streams.get(i);
    pref = pref.leftJoin(cur,
            new ValueJoinerSimple(),
            JoinWindows.of(Duration.ofSeconds(1)),
            StreamJoined.with(
                    Serdes.String(),
                    Serdes.String(),
                    Serdes.String())
    );
}
      
      





topic.







pref.to(featuresDescriptor.sinkSource);
      
      





.







public void buildStreams(StreamsBuilder builder) {
    Serde<String> stringSerde = Serdes.String();

    List<KStream<String, String>> streams = new ArrayList<>();

    for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
        KStream<String, String> stream =
                builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                        .map(new KeyValueMapperSimple(featureDescriptor.key));
        streams.add(stream);
    }

    if (streams.size() > 0) {
        if (streams.size() == 1) {
            KStream<String, String> stream = streams.get(0);
            stream.to(featuresDescriptor.sinkSource);
        } else {
            KStream<String, String> pref = streams.get(0);
            for (int i = 1; i < streams.size(); i++) {
                KStream<String, String> cur = streams.get(i);
                pref = pref.leftJoin(cur,
                        new ValueJoinerSimple(),
                        JoinWindows.of(Duration.ofSeconds(1)),
                        StreamJoined.with(
                                Serdes.String(),
                                Serdes.String(),
                                Serdes.String())
                );
            }
            pref.to(featuresDescriptor.sinkSource);
        }
    }
}
      
      





.







void run(Properties config)
      
      





( ).







FeaturesStream featuresStream = new FeaturesStream(config);
      
      





kafka.







StreamsBuilder builder = new StreamsBuilder();

featuresStream.buildStreams(builder);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
      
      





.







streams.start();
      
      





.







public static void run(Properties config) {
    StreamsBuilder builder = new StreamsBuilder();
    FeaturesStream featuresStream = new FeaturesStream(config);
    featuresStream.buildStreams(builder);
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        streams.close();
        latch.countDown();
    }));
    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
      
      





.







java -jar features-stream-1.0.0.jar -c plain.properties
      
      





: Java 1.8.

: kafka 2.6.0, jsoup 1.13.1.









. .







Pertama: memungkinkan Anda dengan cepat membuat topic-to-union.

Kedua: memungkinkan Anda untuk mulai menggabungkan dengan cepat di lingkungan yang berbeda.







Perlu dicatat bahwa solusi tersebut memberikan batasan pada struktur data masukan. Yakni, topik-dan harus memiliki struktur tabel. Untuk mengatasi batasan ini, Anda dapat memasukkan lapisan tambahan yang memungkinkan Anda mengurangi berbagai struktur menjadi tabular.







Untuk implementasi industri dari fungsionalitas penuh, Anda harus memperhatikan fungsionalitas yang sangat kuat dan, yang paling penting, fleksibel: KSQL .







Tautan dan Sumber Daya



Kode sumber ;

Perkenalkan Michelangelo: Platform Pembelajaran Mesin Uber .








All Articles