Pada artikel ini kita akan berbicara tentang bagaimana mengimplementasikan orkestrator tugas tak terbatas menggunakan antrian. Sebagai tujuan akhir, kita perlu menerapkan sistem yang dapat mengelola tugas dengan umur panjang, sistem terdistribusi, di mana sekelompok tugas di-host di server tertentu, dan jika server ini gagal, tugas secara otomatis didistribusikan kembali ke yang gratis.
Dalam kebanyakan kasus, semua pengembangan perusahaan harus memenuhi persyaratan yang sama: aplikasi dibuat, tergantung pada jenis aplikasinya, memiliki semacam siklus hidup, di akhir masa pakai aplikasi, kami menerima (... atau tidak menerima) apa yang kami inginkan. Yang kami maksud dengan aplikasi bisa adalah apa saja mulai dari pembelian produk secara online, wesel, atau menghitung lintasan rudal balistik. Setiap aplikasi memiliki cara hidupnya sendiri dan yang penting untuk diperhatikan adalah masa pakai , dan semakin pendek waktu ini, semakin baik. Dengan kata lain, semakin cepat transfer kawat saya selesai, semakin baik. Persyaratannya juga serupa, lebih banyak operasi RPC per detik, Latensi lebih sedikit , sistem harus toleran terhadap kesalahan, dapat diskalakan, dan siap kemarin... Ada jutaan alat, ratusan database, pendekatan dan pola yang berbeda. Dan semuanya telah ditulis untuk waktu yang lama, kami hanya perlu menggunakan teknologi yang sudah jadi dengan benar dalam proyek kami.
Topik orkestrasi tugas bukanlah hal baru, tetapi yang mengejutkan saya, tidak ada solusi yang siap pakai untuk mengelola tugas tak terbatas (yang masa hidupnya tidak terbatas), dengan kemungkinan mendistribusikan ulang tugas ke server aktif. Oleh karena itu, kami akan menerapkan solusi kami sendiri. Tapi hal pertama yang pertamaβ¦.
, . β (Job), , . . , ββ, . : , . , , , . ββ- WebSocket , connected. , , , , . , ββ Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task ββ.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , βHello Wordβ :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , β , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , ββ :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(βunique-queue-nameβ);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { βunique-queue-nameβ },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db β - , Hangfire MsSql, PostgreSql Redis.
β . ββ.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , βdefaultβ . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , β , , . , ? , . , , . , .
? ββ. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"β, :
Observer-services , , Round-robin.
msg1 . , βObserver 1β. Unacked , .
msg2 . βObserver 1β , , βObserver 2β.
, βObserver-service 1β , ( - β ... ?β).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? ββ, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 ββ, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill β9, ). , Docker. , , . ββ, , , . , - β¦.
ββ API. ( , βState queueβ ). ββ , , , , - .
, , ββ. , , , , .
, , ββ Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
ββ .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , ββ , .
, β , ... β β.
Observer-service , . , ββ CancellationToken. ββ .
ββ . , id . , .
Created, ββ . - , ββ.
OnDeleting Deleted, - ββ , .
Processing, ββ OnDeleting . . , ββ, CancellationToken βstate queueβ. , OnDeleting Deleted.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , ββ, . ββ, . ( , .)
3) .
, "β, . "β, , .
4) . , "β.
. - , , .
5) ββ, , .
, , ββ . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "β, . , , , PrefetchCount. , , .