Neste artigo, vamos falar sobre como implementar um orquestrador de tarefas infinitas usando filas. Como objetivo final, precisamos implementar um sistema que possa gerenciar tarefas com uma longa vida útil, um sistema distribuído, onde um grupo de tarefas fica hospedado em um servidor específico, e se este servidor falhar, as tarefas são automaticamente redistribuídas em outras gratuitas.
Na maioria dos casos, todo desenvolvimento empresarial se resume a cumprir os mesmos requisitos: um aplicativo é criado, dependendo do tipo de aplicativo, ele tem algum tipo de ciclo de vida, ao final da vida do aplicativo, recebemos (... ou não recebemos) o que queremos. Por aplicativo, podemos significar qualquer coisa, desde a compra online de um produto, uma ordem de pagamento ou o cálculo da trajetória de um míssil balístico. Cada aplicativo tem seu próprio estilo de vida, e o que é importante observar é o tempo de vida , e quanto menor for esse tempo, melhor. Em outras palavras, quanto antes minha transferência eletrônica for concluída, melhor. Os requisitos também são semelhantes, mais operações RPC por segundo, menos latência , o sistema deve ser tolerante a falhas, escalonável e deve estar pronto ontem... Existem um milhão de ferramentas, centenas de bancos de dados, diferentes abordagens e padrões. E tudo já está escrito há muito tempo, só temos que usar corretamente as tecnologias prontas em nossos projetos.
O tópico de orquestração de tarefas não é novo, mas para minha surpresa, simplesmente não existem soluções prontas para gerenciar tarefas infinitas (cujo tempo de vida é ilimitado), com a possibilidade de redistribuir tarefas entre servidores ativos. Portanto, implementaremos nossa própria solução. Mas as primeiras coisas primeiro….
, . — (Job), , . . , “”, . : , . , , , . “”- WebSocket , connected. , , , , . , “” Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task “”.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , “Hello Word” :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , — , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , “” :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(“unique-queue-name”);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { “unique-queue-name” },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db – - , Hangfire MsSql, PostgreSql Redis.
— . “”.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , “default” . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , — , , . , ? , . , , . , .
? “”. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"”, :
Observer-services , , Round-robin.
msg1 . , “Observer 1”. Unacked , .
msg2 . “Observer 1” , , “Observer 2”.
, “Observer-service 1” , ( - “ ... ?”).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? “”, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 “”, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill –9, ). , Docker. , , . “”, , , . , - ….
“” API. ( , “State queue” ). “” , , , , - .
, , “”. , , , , .
, , “” Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
“” .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , “” , .
, — , ... “ ”.
Observer-service , . , “” CancellationToken. “” .
“” . , id . , .
Created, “” . - , “”.
OnDeleting Deleted, - “” , .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , “”, . “”, . ( , .)
3) .
, "”, . "”, , .
4) . , "”.
. - , , .
5) “”, , .
, , “” . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "”, . , , , PrefetchCount. , , .