Katakanlah kita memiliki tugas seperti itu.
Ada sumber transaksi di pasar saham. Sumber ini mengirimkan transaksi kepada kami melalui antarmuka Istirahat.
Kita perlu mendapatkan transaksi ini, menyimpannya ke database dan membuat penyimpanan dalam memori yang nyaman.
Repositori ini harus melakukan fungsi berikut:
- mengembalikan daftar perdagangan;
- mengembalikan posisi penuh, mis. tabel "instrumen" - "jumlah sekuritas saat ini";
- posisi pengembalian untuk instrumen tertentu.
Bagaimana kita mendekati tugas ini?
Sesuai dengan aturan mode layanan mikro, kita perlu membagi tugas menjadi komponen layanan mikro:
- penerimaan transaksi dengan Istirahat;
- menyimpan transaksi ke database;
- penyimpanan dalam memori untuk menyajikan data posisi.
Mari buat layanan pertama dan ketiga dalam kerangka tutorial ini, dan tinggalkan yang kedua untuk bagian kedua (tulis di komentar jika menarik).
Jadi, kami memiliki dua layanan mikro.
Yang pertama menerima data dari luar.
Proses kedua data ini dan menanggapi permintaan yang masuk.
Tentu saja, kami ingin mendapatkan penskalaan horizontal, update nonstop, dan manfaat layanan mikro lainnya.
Tugas apa yang sangat sulit di hadapan kita?
Sebenarnya ada banyak, tapi sekarang mari kita bicara tentang bagaimana data akan mengalir di antara layanan mikro ini. Anda juga dapat membuat Istirahat di antara mereka, Anda dapat memasukkan beberapa jenis antrian, Anda dapat menemukan banyak hal dengan pro dan kontra mereka.
Mari kita lihat satu pendekatan yang mungkin - komunikasi asinkron melalui kerangka kerja Axon .
Apa keuntungan dari solusi ini?
Pertama, komunikasi asinkron meningkatkan fleksibilitas (ya, ada minus di sini, tapi sejauh ini kita hanya membicarakan plus).
Kedua, kami mendapatkan Event Sourcing dan CQRS langsung dari kotaknya .
Ketiga, Axon menyediakan infrastruktur yang sudah jadi, dan kita hanya perlu fokus mengembangkan logika bisnis.
Mari kita mulai.
Kami akan memiliki proyek di gradle. Ini akan memiliki tiga modul:
- umum. modul dengan struktur data umum (kami tidak suka copy-paste);
- tradeCreator. modul dengan layanan mikro untuk menerima transaksi saat Istirahat;
- tradeQueries. modul dengan microservice untuk menampilkan posisi.
Mari kita ambil Spring Boot sebagai dasar dan hubungkan starter Axon.
Axon berfungsi dengan baik tanpa Spring, tapi kami akan menggunakannya bersama.
Di sini kami perlu berhenti dan memberi tahu Anda beberapa kata tentang Axon.
Ini adalah sistem klien-server. Ada server - ini adalah aplikasi terpisah, kami akan menjalankannya di buruh pelabuhan.
Dan ada klien yang menyematkan dirinya ke layanan mikro.
Ini gambarnya. Pertama, server Axon (di buruh pelabuhan) diluncurkan, kemudian layanan mikro kami.
Saat memulai, layanan mikro mencari server dan mulai berinteraksi dengannya. Interaksi dapat dibagi menjadi dua jenis: teknis dan bisnis.
Yang teknis adalah pertukaran pesan "Saya hidup" (pesan seperti itu dapat dilihat dalam mode debug logging).
Bisnis dikaburkan oleh pesan-pesan seperti "kesepakatan baru".
Sebuah fitur penting, setelah memulai layanan mikro, ia dapat menanyakan server Axon "apa yang terjadi" dan server mengirimkan peristiwa yang terakumulasi ke layanan mikro. Dengan demikian, layanan mikro dapat diluncurkan kembali dengan relatif aman tanpa kehilangan data.
Dengan skema pertukaran ini, kita dapat dengan mudah menjalankan banyak layanan mikro,
dan pada host yang berbeda.
Ya, satu contoh Server Axon tidak dapat diandalkan, tapi sejauh ini.
Kami bekerja dalam paradigma Event Sourcing dan CQRS. Artinya kita harus memiliki "tim", "peristiwa", dan "sampel".
Kita akan memiliki satu perintah: "buat kesepakatan", satu peristiwa "kesepakatan dibuat" dan tiga pilihan: "tampilkan semua transaksi", "tampilkan posisi", "tunjukkan posisi untuk instrumen".
Skema kerjanya adalah sebagai berikut:
- Layanan mikro TradeCreator menerima transaksi Istirahat.
- Layanan mikro tradeCreator membuat perintah "buat perdagangan" dan mengirimkannya ke server Axon.
- Server Axon menerima perintah dan mengirimkan perintah ke penerima yang tertarik, dalam kasus kami itu adalah microservice tradeCreator.
- Layanan mikro tradeCreator menerima perintah, menghasilkan peristiwa "kesepakatan dibuat" dan mengirimkannya ke server Axon.
- Server Axon menerima acara dan meneruskannya ke pelanggan yang tertarik.
- Sekarang kami hanya memiliki satu penerima yang tertarik - layanan mikro tradeQueries.
- Layanan mikro tradeQueries menerima peristiwa dan memperbarui data internal.
(Penting bahwa pada saat acara terbentuk, layanan Mikro tradeQueries mungkin tidak tersedia, tetapi begitu dimulai, itu akan segera menerima acara).
Ya, server-akson berada di pusat komunikasi, semua pesan melewatinya.
Mari beralih ke pengkodean.
Agar tidak mengacaukan posting dengan kode, di bawah ini saya hanya akan memberikan fragmen, tautan ke seluruh contoh ada di bawah.
Mari kita mulai dengan modul umum.
Di dalamnya, bagian umum adalah acara (kelas CreatedTradeEvent). Perhatikan namanya, sebenarnya ini adalah nama tim yang menghasilkan acara ini, namun dalam bentuk lampau. Dulu, karena pertama, perintah muncul, yang mengarah ke pembuatan acara.
Struktur umum lainnya termasuk kelas untuk mendeskripsikan posisi (posisi kelas), perdagangan (kelas Perdagangan) dan sisi perdagangan (enum Side), mis. membeli atau menjual.
Mari beralih ke modul tradeCreator.
Modul ini memiliki antarmuka Istirahat (kelas TradeController) untuk menerima perdagangan.
Perintah "buat kesepakatan" dibentuk dari kesepakatan yang diterima dan dikirim ke server-axon.
@PostMapping("/trade")
public ResponseEntity<String> create(@RequestBody Trade trade) {
var createTradeCommand = CreateTradeCommand.builder()
.tradeId(trade.getTradeId())
...
.build();
var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
return ResponseEntity.ok(result.get().toString());
}
Untuk memproses perintah, kelas TradeAggregate digunakan.
Agar Axon menemukannya, kami menganotasi @Aggregate.
Metode untuk memproses perintah terlihat seperti ini (dengan singkatan):
@CommandHandler
public TradeAggregate(CreateTradeCommand command) {
log.info("command: {}", command);
var event = CreatedTradeEvent.builder()
.tradeId(command.tradeId())
....
.build();
AggregateLifecycle.apply(event);
}
Suatu peristiwa dihasilkan dari perintah dan dikirim ke server.
Perintahnya ada di kelas CreateTradeCommand.
Sekarang mari kita lihat modul tradeQueries terakhir.
Pilihannya dijelaskan dalam paket kueri.
Modul ini juga memiliki antarmuka Rest
TradeController kelas publik.
Sebagai contoh, mari kita lihat pemrosesan permintaan: "tampilkan semua transaksi".
@GetMapping("/trade/all")
public List<Trade> findAllTrades() {
return queryGateway.query(new FindAllTradesQuery(),
ResponseTypes.multipleInstancesOf(Trade.class)).join();
}
Permintaan pengambilan dibuat dan dikirim ke server.
Kelas TradesEventHandler digunakan untuk memproses permintaan pengambilan.
Ini memiliki metode yang dijelaskan
@QueryHandler
public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)
Dialah yang bertanggung jawab untuk mengambil data dari penyimpanan dalam memori.
Muncul pertanyaan tentang bagaimana informasi diperbarui di toko ini.
Pertama-tama, ini hanyalah kumpulan ConcurrentHashMaps yang disesuaikan untuk pilihan tertentu.
Untuk memperbaruinya, metode diterapkan:
@EventHandler
public void on(CreatedTradeEvent event) {
log.info("event:{}", event);
var trade = Trade.builder()
...
.build();
trades.put(event.tradeId(), trade);
position.merge(event.shortName(), event.size(),
(oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
}
Ia menerima acara "kesepakatan dibuat" dan memperbarui Maps.
Ini adalah sorotan dari pengembangan layanan mikro.
Bagaimana dengan kelemahan Axon?
Pertama, ini adalah komplikasi infrastruktur, titik kegagalan telah muncul - server Axon, semua komunikasi melewatinya.
Kedua, kelemahan dari sistem terdistribusi seperti itu sangat jelas terlihat - ketidakkonsistenan data sementara. Dalam kasus kami, waktu yang sangat lama mungkin berlalu antara menerima kesepakatan baru dan memperbarui data untuk sampel.
Apa yang tertinggal di balik layar?
Tidak ada yang dikatakan sama sekali tentang Event Sourcing dan CQRS, apa itu dan untuk apa.
Tanpa mengungkap konsep ini, beberapa poin mungkin tidak jelas.
Mungkin beberapa fragmen kode juga memerlukan klarifikasi.
Kami membicarakan hal ini di webinar terbuka .
Contoh lengkap .