Orquestrador de tarefas infinitas

Neste artigo, vamos falar sobre como implementar um orquestrador de tarefas infinitas usando filas. Como objetivo final, precisamos implementar um sistema que possa gerenciar tarefas com uma longa vida útil, um sistema distribuído, onde um grupo de tarefas fica hospedado em um servidor específico, e se este servidor falhar, as tarefas são automaticamente redistribuídas em outras gratuitas. 





Na maioria dos casos, todo desenvolvimento empresarial se resume a cumprir os mesmos requisitos: um aplicativo é criado, dependendo do tipo de aplicativo, ele tem algum tipo de ciclo de vida, ao final da vida do aplicativo, recebemos (... ou não recebemos) o que queremos. Por aplicativo, podemos significar qualquer coisa, desde a compra online de um produto, uma ordem de pagamento ou o cálculo da trajetória de um míssil balístico. Cada aplicativo tem seu próprio estilo de vida, e o que é importante observar é  o tempo de vida , e quanto menor for esse tempo, melhor. Em outras palavras, quanto antes minha transferência eletrônica for concluída, melhor. Os requisitos também são semelhantes, mais   operações RPC por segundo, menos  latência , o sistema deve ser tolerante a falhas, escalonável e deve estar pronto  ontem... Existem um milhão de ferramentas, centenas de bancos de dados, diferentes abordagens e padrões. E tudo já está escrito há muito tempo, só temos que usar corretamente as tecnologias prontas em nossos projetos. 





O tópico de orquestração de tarefas não é novo, mas para minha surpresa, simplesmente não existem soluções prontas para gerenciar tarefas infinitas (cujo tempo de vida é ilimitado), com a possibilidade de redistribuir tarefas entre servidores ativos. Portanto, implementaremos nossa própria solução. Mas as primeiras coisas primeiro…. 






, . — (Job), , . . , “”, . : , . , , , . “”-  WebSocket ,  connected. , , , , . , “”  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  “”.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   “Hello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , — , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , “” :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(“unique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { “unique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db – -  , Hangfire   MsSql,  PostgreSql   Redis.





—    . “”.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , “default” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , — , , . , ? , . , , . , . 





? “”.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , “Observer 1”.  Unacked  ,  . 





  • msg2  . “Observer 1”  ,  ,  “Observer 2”. 





, “Observer-service 1”  , ( - “ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? “”, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 “”, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill –9, ). ,  Docker. , , . “”, , , . , - …. 





“” API. ( , “State queue” ). “” , , , , - .





, , “”. , , , , . 





,  ,  “”  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





“” . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  “” , . 





, — , ... “ ”.  





 Observer-service  , . , “”  CancellationToken. “” . 





“” . ,  id . , .  





  •  Created, “” . - , “”. 





  •  OnDeleting  Deleted, - “” , . 





  •  Processing, “”  OnDeleting  .    . , “”,  CancellationToken  “state queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , “”, . “”, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) “”, , . 





, , “” . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles