Apache Kafka dan pengujian dengan Kafka Server

pengantar



Ada berbagai cara untuk menulis pengujian menggunakan Apache Kafka. Misalnya, TestContainers dan EmbeddedKafka dapat digunakan . Anda dapat membaca tentang ini, misalnya, di sini: Kesulitan dalam menguji Aliran Kafka . Tetapi ada juga opsi untuk menulis tes menggunakan KafkaServer.



Apa yang akan diuji?



Misalkan Anda perlu mengembangkan layanan untuk mengirim pesan melalui berbagai saluran: email, telegram, dll.



Biarkan nama layanan menjadi: SenderService.



Layanan harus: mendengarkan saluran tertentu, memilih pesan yang dibutuhkan dari saluran tersebut, mengurai pesan dan mengirimkannya melalui saluran yang diinginkan untuk pengiriman akhir pesan.



Untuk menguji layanan, Anda perlu membuat pesan yang akan dikirim menggunakan saluran pengiriman surat dan memastikan bahwa pesan itu dikirim ke saluran terakhir.

Tentu saja, dalam aplikasi dunia nyata, pengujian akan lebih sulit. Tetapi untuk mengilustrasikan pendekatan yang dipilih, tes semacam itu sudah cukup.



Layanan dan pengujian diimplementasikan menggunakan: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.



Layanan



Layanan akan dapat memulai dan menghentikan pekerjaannya.



void start()

void stop()


Pada awalnya, Anda harus mengatur setidaknya parameter berikut:



String bootstrapServers
String senderTopic
EmailService emailService


bootstrapServers – kafka.

senderTopic – , .

emailService – .



.



«», , . «» . «» : Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.



Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
    SenderConsumerLoop senderConsumerLoop =
            new SenderConsumerLoop(
                    bootstrapServers,
                    senderTopic,
                    "sender",
                    "sender",
                    tasksExecutorService,
                    emailService
            );
    closeables.add(senderConsumerLoop);
    senderTasksExecutor.submit(senderConsumerLoop);
}


«», .



«» . .



Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    for (AutoCloseable autoCloseable : closeables) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    senderTasksExecutor.shutdown();
    tasksExecutorService.shutdown();
    stop();
    try {
        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));


.



«»



«» :



void run()

void close()


: run.



@Override
public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    kafkaConsumer.subscribe(Collections.singleton(topic));
    while (true) {
        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
    }
}


«kafka-». «kafka-» . . .



json- , , .



:



{
  "subject": {
    "subject_type": "send"
  },
  "body": {
    "method": "email",
    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
    "title": "42",
    "message": "73"
  }
}


subject_type — . «send».

method – . «email» — .

recipients – .

title – .

message – .



:



void calculate(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        calculate(record);
    }
}


:



void calculate(ConsumerRecord<String, String> record) {
            JSONParser jsonParser = new JSONParser();
            Object parsedObject = null;
            try {
                parsedObject = jsonParser.parse(record.value());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (parsedObject instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) parsedObject;
                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
                if (SEND.equals(subjectType)) {
                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
                    calculate(jsonBody);
                }
            }
        }


:



void calculate(JSONObject jsonBody) {
    String method = jsonBody.get(METHOD).toString();
    if (EMAIL_METHOD.equals(method)) {
        String recipients = jsonBody.get(RECIPIENTS).toString();
        String title = jsonBody.get(TITLE).toString();
        String message = jsonBody.get(MESSAGE).toString();
        sendEmail(recipients, title, message);
    }
}


:



void sendEmail(String recipients, String title, String message) {
    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}


.



.



«kafka-»:



static KafkaConsumer<String, String> createKafkaConsumerStringString(
        String bootstrapServers,
        String clientId,
        String groupId
) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(properties);
}


:



interface EmailService {
    void send(String recipients, String title, String message);
}




.

«kafka-».

«kafka-».

.



«kafka-». .



public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);

        }
    }
}


. «kafka-». «kafka-» . .



«mock» :



SenderService.EmailService emailService = mock(SenderService.EmailService.class);


:



SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();


:



String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";


:



kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));


:



Thread.sleep(6000);


, :



verify(emailService).send(recipients, title, message);


:



senderService.stop();


:



public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);
            SenderService.EmailService emailService = mock(SenderService.EmailService.class);
            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
            senderService.start();
            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
            String title = "42";
            String message = "73";
            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
            Thread.sleep(6000);
            verify(emailService).send(recipients, title, message);
            senderService.stop();
        }
    }
}


:



public class SenderFactory {
    public static final String SUBJECT = "subject";
    public static final String SUBJECT_TYPE = "subject_type";
    public static final String BODY = "body";
    public static final String METHOD = "method";
    public static final String EMAIL_METHOD = "email";
    public static final String RECIPIENTS = "recipients";
    public static final String TITLE = "title";
    public static final String MESSAGE = "message";
    public static final String SEND = "send";

    public static String key() {
        return UUID.randomUUID().toString();
    }

    public static String createMessage(String method, String recipients, String title, String message) {
        Map<String, Object> map = new HashMap<>();
        Map<String, Object> subject = new HashMap<>();
        Map<String, Object> body = new HashMap<>();
        map.put(SUBJECT, subject);
        subject.put(SUBJECT_TYPE, SEND);
        map.put(BODY, body);
        body.put(METHOD, method);
        body.put(RECIPIENTS, recipients);
        body.put(TITLE, title);
        body.put(MESSAGE, message);
        return JSONObject.toJSONString(map);
    }
}


«kafka-»



:



void start()

void close()

void createTopic(String topic)


«start» .



Buat "penjaga kebun binatang" dan simpan alamatnya:



zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();


Buat klien penjaga kebun binatang:



zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);


Mengatur properti untuk server:



Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
    throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);


Pembuatan server:



kafkaServer = TestUtils.createServer(config, new MockTime());


Bersama:



public void start() {
    zkServer = new EmbeddedZookeeper();
    String zkConnect = zkHost + ":" + zkServer.port();
    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    zkUtils = ZkUtils.apply(zkClient, false);
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zkConnect);
    brokerProps.setProperty("broker.id", "0");
    try {
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    kafkaServer = TestUtils.createServer(config, new MockTime());
}


Menghentikan layanan:



@Override
public void close() {
    kafkaServer.shutdown();
    zkClient.close();
    zkServer.shutdown();
}


Pembuatan topik:



public void createTopic(String topic) {
    AdminUtils.createTopic(
            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}


Kesimpulan



Sebagai kesimpulan, perlu dicatat bahwa kode yang diberikan di sini hanya menggambarkan metode yang dipilih.



Untuk membuat dan menguji layanan menggunakan "kafka", Anda dapat merujuk ke sumber daya berikut:

kafka-streams-example



Tautan dan Sumber Daya



Sumber



Kode untuk pengujian dengan "server kafka"




All Articles