Mengumpulkan data dan mengirim ke Apache Kafka

pengantar



Untuk menganalisis data streaming, Anda memerlukan sumber data. Isi informasi yang diberikan sumber tersebut juga penting. Selain itu, sumber dengan informasi tekstual, misalnya, cukup jarang.



Sumber yang menarik antara lain: twitter, vk. Tetapi sumber-sumber ini tidak cocok untuk semua tugas.



Ada sumber yang memiliki data yang diperlukan, tetapi tidak menyediakannya dalam bentuk aliran (stream). Sebagai contoh, lihat daftar berikut: public-apis.



Dalam kasus seperti ini, masalah data streaming dapat diselesaikan dengan cara lama:



unduh data, lalu kirimkan ke aliran (stream).



Misalnya, Anda dapat menggunakan sumber berikut: imdb.

Perlu diperhatikan bahwa imdb menyediakan datanya sendiri (lihat IMDb Datasets). Namun, dapat diasumsikan bahwa data yang dikumpulkan langsung dari situs berisi informasi yang lebih mutakhir.



Bahasa: Java 1.8.

Pustaka: kafka 2.6.0, jsoup 1.13.1.



Pengumpulan data



Pengumpulan data adalah layanan yang, berdasarkan data masukan, memuat halaman html, mencari informasi yang diperlukan, lalu mengubahnya menjadi sekumpulan objek.



Sumber data: imdb. Alamat pencarian: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s

Parameter 1 dan 2 – tanggal awal dan akhir rilis; parameter 3 – daftar negara.



Proyek: imdb-extensive-dataset.



Antarmuka layanan:



/**
 * Source of movies obtained by scraping a web site directly.
 *
 * <p>The interface has a single abstract method, so it can be implemented with a lambda
 * (for example, as a stub in tests).
 */
@FunctionalInterface
public interface MovieDirectScrapingService {

    /**
     * Performs one complete scraping pass.
     *
     * @return the movies that were found
     */
    Collection<Movie> scrap();
}


Movie – kelas data berisi informasi tentang film (judul, deskripsi, peringkat, genre, dll.).



class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;
โ€ฆ


Implementasi layanan:



Untuk memuat dan mengurai halaman html digunakan pustaka jsoup.



/**
 * Downloads one search-results page and harvests it.
 *
 * <p>The request carries an {@code Accept-Language} header set to {@code language}.
 * The parsed page is handed to {@code collectItems} together with {@code items}.
 *
 * @param url   address of the page to load
 * @param items list passed to {@code collectItems} for the movies found on the page
 * @return the URL of the next page as computed by {@code nextUrl}, or {@code ""} when
 *     the page could not be loaded (the stack trace is printed in that case)
 */
String scrap(String url, List<Movie> items) {
    final Document page;
    try {
        page = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        // Best effort: report the failure and signal "no next page" to the caller.
        e.printStackTrace();
        return "";
    }
    if (page == null) {
        return "";
    }
    collectItems(page, items);
    return nextUrl(page);
}


Mencari tautan ke halaman berikutnya:



/**
 * Builds the absolute URL of the next search-results page.
 *
 * @param doc the parsed current results page
 * @return {@code baseUrl} joined with the {@code href} of the first {@code .next-page}
 *     element, or {@code ""} when there is no such element or it carries no usable
 *     {@code href} (an empty result ends pagination in the caller)
 */
String nextUrl(Document doc) {
    Element next = doc.selectFirst(".next-page");
    if (next == null) {
        return "";
    }
    // attr() yields "" when the attribute is missing.
    String href = next.attr("href").trim();
    // Without an href, returning the bare baseUrl would make the caller fetch the
    // site root as if it were the next results page.
    if (href.isEmpty()) {
        return "";
    }
    return baseUrl + href;
}


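Metode utama memuat halaman pertama, lalu mengikuti tautan ke halaman berikutnya hingga tautan tersebut tidak ada lagi. Di antara permintaan terdapat jeda singkat.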



/**
 * Collects movies from every search-results page for the configured
 * {@code startDate}, {@code endDate} and {@code countries}.
 *
 * <p>Starts at the first search page and keeps following the "next page" link
 * returned by {@link #scrap(String, List)} until it is empty. Between requests
 * the thread sleeps for 50 ms to limit load on the site.
 *
 * @return the movies gathered; if the thread is interrupted while pausing, collection
 *     stops early, the interrupt flag is restored and the partial result is returned
 */
@Override
public Collection<Movie> scrap() {
    // Only the query template goes through String.format; baseUrl is concatenated
    // literally so a '%' inside it cannot be misread as a format specifier.
    String url = baseUrl + String.format(
            "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if (nextUrl == null || nextUrl.isEmpty()) {
            break;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Honour cancellation: keep the flag visible to the caller and stop paging.
            Thread.currentThread().interrupt();
            break;
        }
    }
    return items;
}


Mengirim data ke Kafka





Kelas: MovieProducer. Metode: run.



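Metode ini mengumpulkan data film, mengubah setiap film menjadi JSON, dan mengirimkan semuanya ke topik Kafka.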



/**
 * Scrapes movies once and publishes each one to Kafka as a JSON message.
 *
 * <p>A {@code SimpleStringStringProducer} is opened for {@code bootstrapServers},
 * {@code clientId} and {@code topic}, and is closed automatically by try-with-resources.
 * Every movie returned by {@code movieDirectScrapingService.scrap()} becomes one
 * key/value pair. The key is a random UUID string and the value is a JSON object string
 * built from the movie's fields. All pairs are sent in a single {@code produce} call.
 */
public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        // NOTE(review): the loop variable "move" reads like a typo for "movie".
        for (Data.Movie move : movies) {
            // Flatten the movie into a string map with snake_case keys.
            Map<String, String> map = new HashMap<>();
            map.put("title_id", move.titleId);
            map.put("title_url", move.titleUrl);
            // (the remaining fields are elided in this listing)
            โ€ฆ
            String value = JSONObject.toJSONString(map);
            // Random key per record, unrelated to the title.
            // NOTE(review): keying by titleId would keep records of one title together
            // on the same partition — confirm whether random keys are intended.
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}




Pengumpulan data dijalankan secara berkala.

Kelas: MovieDirectScrapingExecutor. Metode: run.



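Setiap iterasi mengumpulkan data untuk tanggal hari ini dan negara berikutnya dari daftar, lalu berhenti sejenak.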



/**
 * Repeatedly scrapes today's releases, cycling through the configured countries.
 *
 * <p>Each iteration takes the next country from {@code countriesSource} (round-robin),
 * calls {@code execute} with language {@code "en"} and start/end date equal to today's
 * date in ISO-8601 form ({@code yyyy-MM-dd}), then pauses for one second.
 * The loop ends when the running thread is interrupted; the interrupt flag is restored
 * so the owner of the thread can observe it.
 */
public void run() {
    List<String> countriesSource = Arrays.asList("us");
    String language = "en";
    int countriesCounter = 0;

    while (!Thread.currentThread().isInterrupted()) {
        // LocalDate.toString() is ISO-8601 and always zero-pads month and day
        // (e.g. 2020-09-09); single-digit months/days must not be emitted unpadded.
        String startDate = LocalDate.now().toString();
        String endDate = startDate;
        String countries = countriesSource.get(countriesCounter);

        try {
            execute(language, startDate, endDate, countries);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Stop instead of spinning forever while ignoring the cancellation request.
            Thread.currentThread().interrupt();
            return;
        }

        countriesCounter = (countriesCounter + 1) % countriesSource.size();
    }
}


MovieDirectScrapingExecutor dapat dijalankan, misalnya, dari metode main.



Contoh pesan yang dikirim:



{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary ยป",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}


Pengujian





Untuk pengujian digunakan server kafka yang dijalankan di dalam pengujian itu sendiri.

Server tersebut dijalankan menggunakan Kafka Server dari Apache Kafka.



Kelas pengujian: MovieProducerTest.



/**
 * End-to-end check of {@code MovieProducer} against a Kafka broker started by the test.
 *
 * <p>Starts a {@code KafkaServerService} (broker 127.0.0.1:29092, ZooKeeper
 * 127.0.0.1:22183) and creates topic {@code q-data}. It then runs a {@code MovieProducer}
 * backed by a stub scraping service that returns a single movie. Finally it polls the topic
 * and verifies that the first record's value parses as a JSON object.
 */
public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        // The service is AutoCloseable; try-with-resources closes it at the end of the test.
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);

            // Stub scraper: no network access, returns one fixed movie
            // (constructor arguments are elided in this listing).
            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(โ€ฆ)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();

            // NOTE(review): the meaning of the 1 and 5 arguments is defined in
            // KafkaServerService.poll — presumably a record count / timeout; confirm.
            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    // A parse failure leaves jsonObject null, so assertNotNull fails the test.
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
        โ€ฆ
            });

            // NOTE(review): the purpose of this 5 s pause is not visible here; if poll is
            // synchronous it only slows the test down — confirm before removing.
            Thread.sleep(5000);
        }
    }
}




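Dengan demikian, data dari sumber yang tidak menyediakan aliran dapat dikumpulkan secara berkala dan dikirim ke Apache Kafka.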





.




All Articles