Integrasi Spring - aliran data dinamis

Kembang api, Habr! Hari ini kita akan menganalisis area yang agak spesifik - streaming data menggunakan kerangka Spring Integration dan bagaimana membuat aliran ini saat runtime tanpa inisialisasi awal dalam konteks aplikasi. Contoh aplikasi lengkap ada di Gita .



pengantar



Spring Integration adalah Enterprise Integration Framework (EIP) yang menggunakan mekanisme pengiriman pesan di bawah penutup antara adapter dari berbagai protokol / sistem integrasi berdasarkan saluran pesan (antrian kondisional). Analog terkenal adalah Unta, Mule, Nifi.



Dari test case, kita harus - membuat layanan REST yang dapat membaca parameter permintaan yang diterima, pergi ke database kami, misalnya, postgres, memperbarui dan mengambil data tabel sesuai dengan parameter yang diterima dari sumber, dan mengirim hasilnya kembali ke antrian (permintaan / response), dan juga buat beberapa instance dengan jalur permintaan berbeda.



Secara konvensional, diagram aliran data akan terlihat seperti ini:



gambar



Selanjutnya, saya akan menunjukkan bagaimana Anda bisa melakukan ini tanpa banyak menari dengan rebana, menggunakan IntegrationFlowContext, dengan titik akhir komponen / pengontrol REST. Semua kode proyek utama akan terletak di repositori, di sini saya hanya akan menunjukkan beberapa kliping. Nah, siapa yang tertarik, tolong, di bawah kucing.



Alat



Mari kita mulai dengan blok dependensi secara default. Pada dasarnya, kita akan memerlukan proyek pegas boot - untuk ideologi REST tentang manajemen aliran / komponen, pegas integrasi - untuk membuat kasing kami berdasarkan saluran dan adaptor.



Dan kami segera berpikir apa lagi yang kami butuhkan untuk mereproduksi kasus ini. Selain dependensi inti, kita memerlukan subproyek - integrasi-http, integrasi-jdbc, integrasi-groovy (menyediakan konverter data yang dapat disesuaikan secara dinamis berdasarkan skrip Goovy). Secara terpisah, saya akan mengatakan bahwa dalam contoh ini kita tidak akan menggunakan konverter groovy sebagai tidak perlu, tetapi kami akan memberikan kemampuan untuk menyesuaikannya dari luar.



Daftar ketergantungan
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Dapur internal



Mari kita lanjutkan untuk membuat komponen sistem yang diperlukan (pembungkus / model). Kita akan membutuhkan saluran, kacang, httpInboundGateway, handler, jdbcOutboundGateway dan model hasil.



  • bean - objek pembantu yang dibutuhkan untuk adaptor, utas
  • saluran - saluran untuk mengirim pesan ke / dari komponen aliran
  • httpInboundGateway - jalur akses http yang selanjutnya akan kami kirim permintaan dengan data untuk diproses lebih lanjut
  • handler - jenis handler umum (transformator alur, berbagai adaptor, dll.)
  • jdbcOutboundGateway - adaptor jdbc
  • pengendali hasil untuk mengirim informasi ke saluran tertentu


Kami akan membutuhkan pembungkus untuk menyimpan parameter dan menginisialisasi dengan benar komponen seluruh aliran, jadi kami segera membuat toko komponen, tambahkan. fungsionalitas konverter JSON -> Model Definisi. Pemetaan langsung bidang menggunakan jackson dan objek dalam kasus saya tidak berlaku - kami memiliki satu lagi sepeda untuk protokol komunikasi tertentu.



Mari kita lakukan dengan baik segera , menggunakan anotasi :



StreamComponent - bertanggung jawab untuk mengidentifikasi kelas sebagai model penyetelan komponen stream dan memiliki informasi layanan - nama komponen, jenis komponen, apakah komponen bersarang dan deskripsi;



SettingClass - bertanggung jawab untuk opsi tambahan untuk memindai model, seperti memindai bidang kelas super dan mengabaikan bidang saat menginisialisasi nilai;



SettingValue - bertanggung jawab untuk mengidentifikasi bidang kelas sebagai dapat dikustomisasi dari luar, dengan pengaturan penamaan di JSON, deskripsi, tipe konverter, bendera bidang yang diperlukan dan bendera objek internal untuk tujuan informasi;



Manajer penyimpanan komponen



Metode pembantu untuk bekerja dengan model untuk pengendali REST



Model dasar - abstraksi dengan seperangkat bidang bantu / metode model Model



konfigurasi aliran saat ini



Mapper JSON -> Model Definisi



Tanah utama untuk pekerjaan disiapkan. Sekarang mari kita langsung ke implementasi, langsung, dari layanan yang akan bertanggung jawab untuk siklus hidup, penyimpanan, dan inisialisasi stream dan kami akan segera meletakkan gagasan bahwa kami dapat memparalelkan 1 stream dengan penamaan yang sama ke beberapa contoh, mis. kita perlu membuat pengidentifikasi unik (panduan) untuk semua komponen aliran, jika tidak tabrakan dengan komponen tunggal lainnya (kacang, saluran, dll.) dapat terjadi dalam konteks aplikasi. Tapi pertama-tama mari kita membuat pemetaan dari dua komponen - http dan jdbc, yaitu penambahan model yang dibuat sebelumnya ke komponen aliran itu sendiri (HttpRequestHandlerEndpointSpec dan JdbcOutboundGateway).



HttpRegistry



JdbcRegistry



Layanan Manajemen Pusat ( StreamDeployingService) melakukan fungsi menyimpan pekerja / tidak aktif, mendaftar yang baru, memulai, berhenti dan menghapus utas sepenuhnya dari konteks aplikasi. Fitur penting dari layanan ini adalah penerapan dependensi IntegrationFlowBuilderRegistry, yang membantu kita membuat dinamika aplikasi (mungkin ingat file xml konfigurasi ini atau kelas DSL untuk kilometer). Menurut spesifikasi streaming, itu harus selalu dimulai dengan komponen atau saluran yang masuk, jadi kami mempertimbangkan ini dalam penerapan metode registerStreamContext.



Dan manajer tambahan ( IntegrationFlowBuilderRegistry), yang melakukan fungsi baik mapper model untuk mengalirkan komponen dan inisialisasi aliran itu sendiri menggunakan IntegrationFlowBuilder. Saya juga menerapkan penangan log dalam pipa aliran, layanan untuk mengumpulkan metrik saluran aliran (opsi toggleable) dan kemungkinan implementasi konverter pesan aliran berdasarkan pada penerapan Groovy (jika tiba-tiba contoh ini menjadi dasar untuk penjualan, maka prakompilasi skrip groovy harus dilakukan pada tahap inisialisasi aliran , karena mengalami tes beban dalam RAM dan tidak peduli berapa banyak inti dan daya yang Anda miliki). Bergantung pada konfigurasi tahap log model dan parameter tingkat log, itu akan aktif setelah setiap pengiriman pesan dari komponen ke komponen. Pemantauan diaktifkan dan dinonaktifkan oleh parameter di application.yml:



monitoring:
  injectction:
    default: true


Sekarang kami memiliki semua mekanisme untuk menginisialisasi aliran pemrosesan data dinamis, kami juga dapat menulis pemetaan untuk berbagai protokol dan adaptor seperti RabbitMQ, Kafka, Tcp, Ftp, dll. Selain itu, dalam kebanyakan kasus, Anda tidak perlu menulis apa pun dengan tangan Anda sendiri (kecuali, tentu saja, model konfigurasi dan metode tambahan) - sejumlah besar komponen sudah ada dalam repositori .



Tahap terakhir adalah implementasi pengendali untuk mendapatkan informasi tentang komponen sistem yang ada, mengelola aliran, dan mendapatkan metrik.



ComponentsController - memberikan informasi tentang semua komponen dalam model yang dapat dibaca manusia, dan satu komponen berdasarkan nama dan jenis.



StreamController - menyediakan manajemen aliran penuh, yaitu inisialisasi model JSON baru, mulai, berhenti, menghapus dan mengeluarkan metrik oleh pengidentifikasi.



Produk akhir



Kami meningkatkan aplikasi yang dihasilkan dan menjelaskan kasus uji dalam format JSON.



Contoh Aliran Data
Skrip inisialisasi basis data:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





Pengujian:



1) Kami menginisialisasi aliran baru menggunakan metode



POST / stream / deploy, di mana JSON kami akan berada di badan permintaan.



Sebagai tanggapan, sistem harus mengirim jika semuanya benar, jika tidak, pesan kesalahan akan terlihat:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) Kami memulai awal menggunakan metode:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, di mana kami menunjukkan pengidentifikasi aliran yang diinisialisasi sebelumnya.



Sebagai tanggapan, sistem harus mengirim jika semuanya benar, jika tidak, pesan kesalahan akan terlihat:



{
    "status": "SUCCESS", -  
}


3) Memanggil stream oleh pengenal dalam sistem? Bagaimana, apa, dan di mana - dalam mapper model HttpRegistry, saya menulis kondisinya



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


di mana, parameter http-inbound-path diperhitungkan, dan jika tidak ditentukan secara eksplisit dalam konfigurasi komponen, itu diabaikan dan jalur panggilan sistem diatur. Dalam kasus kami, ini akan menjadi:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / panggilan - di mana pengidentifikasi aliran hadir, dengan badan permintaan:



{
    "accountId": 1
}


Sebagai tanggapan, kami akan menerima, jika tahapan pemrosesan permintaan bekerja dengan benar, kami akan menerima struktur datar catatan dari tabel account_data dan account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


Spesifisitas adaptor JdbcOutboundGateway adalah sedemikian rupa sehingga jika Anda menentukan parameter permintaan pembaruan, penangan tambahan terdaftar, yang pertama memperbarui data, dan hanya kemudian diambil oleh parameter kueri.



Jika Anda menentukan jalur yang sama secara manual, maka kemungkinan meluncurkan komponen dengan HttpInboundGateway sebagai titik akses ke aliran dalam beberapa kasus akan dihapuskan karena sistem tidak akan mengizinkan mendaftarkan jalur yang sama.



4) Mari kita lihat metrik menggunakan metode GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / metrik



Konten tanggapan
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Kesimpulan



Jadi, diperlihatkan bagaimana, setelah menghabiskan sedikit lebih banyak waktu dan usaha, menulis aplikasi untuk integrasi dengan berbagai sistem daripada menulis penangan manual tambahan (pipa) setiap kali dalam aplikasi Anda untuk integrasi dengan sistem lain, masing-masing 200-500 baris kode.



Dalam contoh saat ini, Anda dapat memparalelkan pekerjaan dari jenis utas yang sama untuk beberapa contoh dengan menggunakan pengidentifikasi unik, menghindari tabrakan dalam konteks global aplikasi antara ketergantungan benang (nampan, saluran, dll.).



Selain itu, Anda dapat mengembangkan proyek:



  • menyimpan stream ke database;
  • membuat dukungan untuk semua komponen integrasi yang disediakan oleh komunitas pegas dan integrasi pegas;
  • membuat pekerja yang akan melakukan pekerjaan dengan utas sesuai jadwal;
  • buatlah UI yang waras untuk mengonfigurasikan stream dengan "mouse and component cubes" bersyarat (omong-omong, sebagian contoh dipertajam untuk proyek github.com/spring-cloud/spring-cloud-dataflow-ui ).


Dan sekali lagi saya akan menggandakan tautan ke repositori .



All Articles