Orchestrator dengan tugas tanpa akhir

Pada artikel ini kita akan berbicara tentang bagaimana mengimplementasikan orkestrator tugas tak terbatas menggunakan antrian. Sebagai tujuan akhir, kita perlu menerapkan sistem yang dapat mengelola tugas dengan umur panjang, sistem terdistribusi, di mana sekelompok tugas di-host di server tertentu, dan jika server ini gagal, tugas secara otomatis didistribusikan kembali ke yang gratis. 





Dalam kebanyakan kasus, semua pengembangan perusahaan harus memenuhi persyaratan yang sama: aplikasi dibuat, tergantung pada jenis aplikasinya, memiliki semacam siklus hidup, di akhir masa pakai aplikasi, kami menerima (... atau tidak menerima) apa yang kami inginkan. Yang kami maksud dengan aplikasi bisa adalah apa saja mulai dari pembelian produk secara online, wesel, atau menghitung lintasan rudal balistik. Setiap aplikasi memiliki cara hidupnya sendiri dan yang penting untuk diperhatikan adalah  masa pakai , dan semakin pendek waktu ini, semakin baik. Dengan kata lain, semakin cepat transfer kawat saya selesai, semakin baik. Persyaratannya juga serupa, lebih banyak   operasi RPC per detik, Latensi lebih sedikit  , sistem harus toleran terhadap kesalahan, dapat diskalakan, dan siap  kemarin... Ada jutaan alat, ratusan database, pendekatan dan pola yang berbeda. Dan semuanya telah ditulis untuk waktu yang lama, kami hanya perlu menggunakan teknologi yang sudah jadi dengan benar dalam proyek kami. 





Topik orkestrasi tugas bukanlah hal baru, tetapi yang mengejutkan saya, tidak ada solusi yang siap pakai untuk mengelola tugas tak terbatas (yang masa hidupnya tidak terbatas), dengan kemungkinan mendistribusikan ulang tugas ke server aktif. Oleh karena itu, kami akan menerapkan solusi kami sendiri. Tapi hal pertama yang pertama…. 






, . β€” (Job), , . . , β€œβ€, . : , . , , , . β€œβ€-  WebSocket ,  connected. , , , , . , β€œβ€  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  β€œβ€.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   β€œHello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , β€” , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , β€œβ€ :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(β€œunique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { β€œunique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db β€“ -  , Hangfire   MsSql,  PostgreSql   Redis.





β€”    . β€œβ€.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , β€œdefault” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , β€” , , . , ? , . , , . , . 





? β€œβ€.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , β€œObserver 1”.  Unacked  ,  . 





  • msg2  . β€œObserver 1”  ,  ,  β€œObserver 2”. 





, β€œObserver-service 1”  , ( - β€œ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? β€œβ€, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 β€œβ€, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill β€“9, ). ,  Docker. , , . β€œβ€, , , . , - …. 





β€œβ€ API. ( , β€œState queue” ). β€œβ€ , , , , - .





, , β€œβ€. , , , , . 





,  ,  β€œβ€  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





β€œβ€ . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  β€œβ€ , . 





, β€” , ... β€œ ”.  





 Observer-service  , . , β€œβ€  CancellationToken. β€œβ€ . 





β€œβ€ . ,  id . , .  





  •  Created, β€œβ€ . - , β€œβ€. 





  •  OnDeleting  Deleted, - β€œβ€ , . 





  •  Processing, β€œβ€  OnDeleting  .    . , β€œβ€,  CancellationToken  β€œstate queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , β€œβ€, . β€œβ€, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) β€œβ€, , . 





, , β€œβ€ . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles