Índice
- Introdução Produtor / consumidor de tarefas
- Início do trabalho. Canal
- Consumo do canal. Channelreader
- Gravando no canal. Channelwriter
- Assincronia livre de alocação
- Interface IValueTaskSource
- Um pouco sobre o CompareExchange
- Problema de mergulho em pilha
- AsyncOperation - detalhes de implementação
Introdução
O problema Produtor / Consumidor é encontrado no caminho dos programadores com bastante frequência e por mais de uma dúzia de anos. O próprio Edsger Dijkstra resolveu esse problema - ele teve a idéia de usar semáforos para sincronizar threads ao organizar o trabalho em uma base de produtor / consumidor. E embora sua solução em sua forma mais simples seja conhecida e bastante trivial, no mundo real esse padrão (Produtor / Consumidor) pode ocorrer de uma forma muito mais complicada. Além disso, os padrões de programação modernos impõem suas impressões digitais, o código é escrito de forma mais simplificada e quebrada para reutilização adicional. Tudo é feito para diminuir o limite para escrever código de qualidade e simplificar esse processo. E o espaço para nome em questão - System.Threading.Channels - é outro passo em direção a esse objetivo.
Eu estava olhando para System.IO.Pipelines há um tempo atrás. Lá, era necessário um trabalho mais atento e uma compreensão profunda do assunto, Span e Memória, e para um trabalho eficaz era necessário não chamar métodos óbvios (para evitar alocações desnecessárias de memória) e pensar constantemente em bytes. Por esse motivo, a interface de programação do Pipeline não era trivial e não era intuitiva.
System.Threading.Channels apresenta ao usuário uma API muito mais simples de se trabalhar. Vale ressaltar que, apesar da simplicidade da API, essa ferramenta é altamente otimizada e provavelmente não alocará memória durante seu trabalho. Talvez isso se deva ao fato de que o ValueTask é usado em todos os lugares e, mesmo no caso de assincronia real, o IValueTaskSource é usado, que é reutilizado para outras operações. É aqui que reside todo o interesse em implementar Canais.
Canais são genéricos, o tipo genérico é, como você pode imaginar, o tipo que será produzido e consumido. O interessante é que a implementação da classe Channel, que se encaixa em 1 linha (fonte do github ):
namespace System.Threading.Channels
{
public abstract class Channel<T> : Channel<T, T> { }
}
Assim, a classe principal de canais é parametrizada por 2 tipos - separadamente para o canal produtor e o canal consumidor. Mas para canais realizados, isso não é usado.
Para aqueles familiarizados com os pipelines, a abordagem geral para começar parecerá familiar. Nomeadamente. Criamos uma classe central a partir da qual extraímos separadamente produtores ( ChannelWriter ) e consumidores ( ChannelReader ). Apesar dos nomes, vale lembrar que este é exatamente o produtor / consumidor, e não o leitor / escritor de outra tarefa clássica de multithreading com o mesmo nome. ChannelReader altera o estado do canal geral (retira o valor), que não está mais disponível. Então ele prefere não ler, mas consome. Mas conheceremos a implementação mais tarde.
Início do trabalho. Canal
A introdução aos canais começa com uma classe abstrata Channel <T> e uma classe estática de canal que cria a implementação mais apropriada. Além disso, neste canal comum, você pode obter um ChannelWriter para gravar no canal e um ChannelReader para consumo no canal. Um canal é um repositório de informações gerais para ChannelWriter e ChannelReader, portanto, são todos os dados armazenados nele. E a lógica de sua gravação ou consumo já está dispersa no ChannelWriter e no ChannelReader e, convencionalmente, os canais podem ser divididos em 2 grupos - ilimitados e limitados. Os primeiros são mais simples na implementação, você pode escrever neles sem limite (contanto que a memória permita). Os segundos são limitados por um determinado valor máximo do número de registros.
É aqui que a natureza da assincronia é um pouco diferente. Em canais ilimitados, a operação de gravação sempre será concluída de forma síncrona, não há nada para parar de gravar no canal. A situação é diferente para canais limitados. Com o comportamento padrão (que pode ser substituído), a operação de gravação será concluída de forma síncrona, desde que haja espaço no canal para novas instâncias. Quando o tubo estiver cheio, a operação de gravação não será concluída até que o espaço seja liberado (depois que o consumidor consumir o consumido). Portanto, aqui a operação será realmente assíncrona com a mudança de fluxos e alterações relacionadas (ou sem uma alteração, que será descrita mais adiante).
Na maioria das vezes, o comportamento dos leitores é o mesmo - se houver algo no canal, o leitor simplesmente lê e termina de forma síncrona. Se não houver nada, ele espera que alguém escreva algo.
A classe estática do canal contém 4 métodos para criar os canais acima:
Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);
Se desejar, você pode especificar opções mais precisas para criar um canal, o que ajudará a otimizá-lo para as necessidades especificadas.
UnboundedChannelOptions contém 3 propriedades, que são definidas como false por padrão:
- AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
- SingleReader — , . , ;
- SingleWriter — , ;
BoundedChannelOptions contém as mesmas 3 propriedades e mais 2 na parte superior
- AllowSynchronousContinuations - mesmo;
- SingleReader é o mesmo;
- SingleWriter é o mesmo;
- Capacidade - o número de registros colocados no canal. Este parâmetro também é um parâmetro construtor;
- FullMode - a enumeração BoundedChannelFullMode, que possui 4 opções, determina o comportamento ao tentar gravar em um canal completo:
- Aguardar - aguarda espaço livre para concluir a operação assíncrona
- DropNewest - um item gravável substitui o mais novo e termina de forma síncrona
- DropOldest - um item gravável substitui a extremidade mais antiga existente de forma síncrona
- DropWrite - o elemento gravável não é gravado, termina de forma síncrona
Dependendo dos parâmetros passados e do método chamado, será criada uma das três implementações: SingleConsumerUnboundedChannel , UnboundedChannel , BoundedChannel . Mas isso não é tão importante, porque usaremos o canal através da classe base Channel <TWrite, TRead>.
Possui 2 propriedades:
- Leitor de ChannelReader <TRead> {get; conjunto protegido; }
- ChannelWriter <TWrite> Writer {get; conjunto protegido; }
E também, 2 operadores de conversão implícita de tipo para ChannelReader <TRead> e ChannelWriter <TWrite>.
Um exemplo de como começar a trabalhar com canais:
Channel<int> channel = Channel.CreateUnbounded<int>();
//
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader;
//
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;
Os dados são armazenados em uma fila. Para três tipos, três filas diferentes são usadas - ConcurrentQueue <T>, Deque <T> e SingleProducerSingleConsumerQueue <T>. Nesse ponto, parecia-me que estava desatualizado e perdi várias novas coleções mais simples. Mas eu me apresso a ficar chateado - eles não são para todos. Eles são rotulados como internos, portanto não podem ser usados. Mas se você precisar deles repentinamente à venda, poderá encontrá-los aqui (SingleProducerConsumerQueue) e aqui (Deque) . A implementação deste último é muito simples. Eu aconselho você a ler, você pode estudá-lo muito rapidamente.
Então, vamos estudar diretamente o ChannelReader e o ChannelWriter, além de detalhes interessantes de implementação. Todos eles se resumem a assíncrono, sem alocação de memória usando IValueTaskSource.
ChannelReader - consumidor
Quando um objeto consumidor é solicitado, uma das implementações da classe abstrata ChannelReader <T> é retornada. Novamente, diferentemente dos Pipelines, as APIs são simples e existem poucos métodos. Você só precisa conhecer a lista de métodos para entender como usá-lo na prática.
Métodos:
- Propriedade virtual get-only Conclusão da Tarefa {get; } Um
objeto do tipo Tarefa que termina quando o canal é fechado; - Propriedade virtual get-only int Count {get; }
Aqui deve ser enfatizado que o número atual de objetos legíveis é retornado; - Propriedade virtual get-only booleana CanCount {get; }
Indica se a propriedade Count está disponível; - bool TryRead(out T item)
. bool, , . out ( null, ); - ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
ValueTask true, , . ValueTask false, ( ); - ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
. , . .
, TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .
, , - .
ChannelWriter - produtor
Tudo é semelhante ao consumidor, então observe imediatamente os métodos:
- Método virtual bool TryComplete (Exception? Error = null)
Tenta marcar o canal como concluído, ou seja, mostre que nenhum dado será gravado nele. A exceção que causou a finalização do canal pode ser transmitida como um parâmetro opcional. Retorna true se tiver sido concluído com êxito; caso contrário, false (se o canal já tiver sido concluído ou não suportar terminação); - Método abstrato bool TryWrite (item T)
Tenta gravar um valor no canal. Retorna verdadeiro se for bem-sucedido e falso se não - Método abstrato ValueTask <bool> WaitToWriteAsync (CancellationToken cancellationToken = default)
Retorna um ValueTask com o valor true, que terminará quando houver um local para gravação no canal. O valor será falso se as gravações no canal não forem mais permitidas; - Método virtual ValueTask WriteAsync (item T, CancellationToken cancellationToken = default)
Grava de forma assíncrona no canal. Por exemplo, se o canal estiver cheio, a operação será realmente assíncrona e será concluída somente após liberar espaço para esse registro; - Método void Complete (Exception? Error = null)
Apenas tenta marcar o canal como concluído usando TryComplete e, em caso de falha, lança uma exceção.
Um pequeno exemplo do exposto acima (para iniciar facilmente suas próprias experiências):
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();
// ,
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
//
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
// , , ,
writer.Complete();
//
int valueFromChannel = await reader.ReadAsync();
Agora vamos para a parte mais interessante.
Assincronia livre de alocação
No processo de escrever e estudar o código, percebi que quase não havia nada de interessante na implementação de todas essas operações. Em geral, você pode descrevê-lo desta maneira - evitando bloqueios desnecessários usando coleções competitivas e o uso abundante do ValueTask, que é uma estrutura que economiza memória. No entanto, lembro-lhe que não é necessário analisar rapidamente todos os arquivos do seu PC e substituir todas as tarefas pelo ValueTask. Isso só faz sentido nos casos em que a operação é concluída de forma síncrona na maioria dos casos. Afinal, como lembramos, com a assincronia, é bem provável uma mudança de fluxo, o que significa que a pilha não será a mesma de antes. De qualquer forma, um verdadeiro profissional de desempenho sabe - não otimize antes que surjam problemas.
Uma coisa boa é que não vou me registrar como profissional e, portanto, é hora de descobrir qual é o segredo de escrever código assíncrono sem alocação de memória, o que, à primeira vista, parece bom demais para a verdade. Mas acontece.
Interface IValueTaskSource
Vamos começar nossa jornada desde o início - a estrutura ValueTask , que foi adicionada no .net core 2.0 e alterada na 2.1. Dentro dessa estrutura, existe um campo complicado de objeto _obj. É fácil adivinhar, com base no nome autoexplicativo, que uma das três coisas pode estar oculta nesse campo - nulo, Tarefa / Tarefa <T> ou IValueTaskSource. De fato, decorre da maneira como o ValueTask é criado.
Como o fabricante garante, essa estrutura deve ser usada apenas obviamente - com a palavra-chave wait. Ou seja, você não deve aplicar a espera muitas vezes à mesma ValueTask, usar combinadores, adicionar várias continuações, etc. Além disso, você não deve obter o resultado do ValueTask mais de uma vez. E isso se deve ao fato de que estamos tentando entender - a reutilização de todas essas coisas sem alocar memória.
Eu já mencionei a interface IValueTaskSource . É ele quem ajuda a economizar memória. Isso é feito reutilizando o próprio IValueTaskSource várias vezes para muitas tarefas. Mas, precisamente por causa dessa reutilização, não há como entrar no ValueTask.
Então IValueTaskSource. Essa interface possui três métodos, implementando os quais você economizará memória e tempo com êxito na alocação desses bytes estimados.
- GetResult - É chamado uma vez, quando na máquina de estado, formada no tempo de execução por métodos assíncronos, o resultado é necessário. O ValueTask possui um método GetResult, que chama o método de interface com o mesmo nome, que, como lembramos, pode ser armazenado no campo _obj.
- GetStatus - Chamado pela máquina de estado para determinar o status de uma operação. Também através do ValueTask.
- OnCompleted - Novamente, a máquina de estado chama para adicionar uma continuação à tarefa que não foi concluída naquele momento.
Mas, apesar da interface simples, a implementação exigirá alguma habilidade. E aqui você pode se lembrar sobre o que começamos - Canais . Esta implementação usa a classe AsyncOperationque é uma implementação do IValueTaskSource. Esta classe está oculta por trás do modificador de acesso interno. Mas isso não pára para entender os mecanismos básicos. Isso levanta a questão: por que não dar a implementação do IValueTaskSource às massas? A primeira razão (por diversão) é quando um martelo está nas mãos, pregos estão por toda parte, quando uma implementação IValueTaskSource está nas mãos, há um trabalho analfabeto com memória em todos os lugares. A segunda razão (mais plausível) é que, embora a interface seja simples e universal, a implementação real é ideal ao usar determinadas nuances do aplicativo. E é provavelmente por esse motivo que você pode encontrar implementações em várias partes do excelente e poderoso .net, como AsyncOperation sob o capô dos canais, AsyncIOOperation dentro da nova API de soquete e assim por diante.
No entanto, para ser justo, ainda existe uma implementação comum -ManualResetValueTaskSourceCore . Mas isso já está muito longe do tópico do artigo.
CompareExchange
Um método bastante popular da classe popular, evitando a sobrecarga das primitivas de sincronização clássicas. Acho que a maioria deles está familiarizada com isso, mas vale a pena descrever em três palavras, porque essa construção é usada com bastante frequência no AsyncOperation.
Na literatura mainstream, essa função é chamada comparar e trocar (CAS). Em .net, está disponível na classe Interlocked .
A assinatura é a seguinte:
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
Também há sobrecargas com int, long, float, double, IntPtr, objeto.
O método em si é atômico, ou seja, é executado sem interrupções. Compara 2 valores e, se forem iguais, atribui o novo valor à variável. Eles resolvem o problema quando você precisa verificar o valor de uma variável e alterar a variável dependendo dela.
Digamos que você queira incrementar uma variável se seu valor for menor que 10.
Então existem 2 threads.
Stream 1 | Stream 2 |
---|---|
Verifica o valor de uma variável para alguma condição (ou seja, é menor que 10) que é acionada | - |
Entre verificar e alterar um valor | Atribui a uma variável um valor que não atende a uma condição (por exemplo, 15) |
Altera o valor, embora não deva, porque a condição não é mais atendida | - |
Ao usar esse método, você altera exatamente o valor que deseja ou não altera, enquanto obtém o valor real da variável.
location1 é uma variável cujo valor queremos alterar. É comparado com comparand, em caso de igualdade, o valor é escrito em location1. Se a operação for bem-sucedida, o método retornará o valor passado da variável location1. Caso contrário, o valor atual do local1 será retornado.
Mais profundamente, há uma instrução em linguagem assembly, cmpxchg, que faz isso. É ela quem é usada sob o capô.
Mergulho de pilha
Enquanto observava todo esse código, deparei-me com referências ao "Stack Dive" mais de uma vez. Isso é uma coisa muito legal e interessante que é realmente muito indesejável. A conclusão é que, com a execução síncrona de continuações, podemos ficar sem recursos da pilha.
Digamos que temos 10.000 tarefas, em grande estilo
//code1
await ...
//code2
Suponha que a primeira tarefa conclua a execução e, assim, libere a continuação da segunda, que imediatamente começamos a executar de forma síncrona nesse encadeamento, ou seja, pegando um pedaço da pilha com o quadro da continuação especificada. Por sua vez, essa continuação desbloqueará a continuação da terceira tarefa, que também começamos a executar imediatamente. Etc. Se não houver mais espera na sequência ou algo que de alguma forma descarte a pilha, simplesmente consumiremos todo o espaço da pilha. O que pode causar StackOverflow e falha no aplicativo. Na revisão de código, mencionarei como o AsyncOperation luta contra isso.
AsyncOperation como uma implementação IValueTaskSource
O código-fonte .
Dentro do AsyncOperation, há um campo _continuation do tipo Action <object>. O campo é usado para, você não acreditará, continuações. Mas, como geralmente ocorre em códigos muito modernos, os campos têm responsabilidades adicionais (como o coletor de lixo e o último bit na referência da tabela de métodos). Campo _continuação da mesma série. Existem 2 valores especiais que podem ser armazenados nesse campo, além da continuação propriamente dita e nula. s_availableSentinel e s_completedSentinel . Esses campos indicam que a operação está disponível e concluída de acordo. Ele pode ser acessado apenas para reutilização para uma operação completamente assíncrona.
O AsyncOperation também implementa IThreadPoolWorkItemcom um único método - void Execute () => SetCompletionAndInvokeContinuation (). O método SetCompletionAndInvokeContinuation faz a continuação. E esse método é chamado diretamente no código AsyncOperation ou através do Execute mencionado. Afinal, os tipos que implementam IThreadPoolWorkItem podem ser lançados no pool de threads de alguma forma como este ThreadPool.UnsafeQueueUserWorkItem (isso, preferLocal: false).
O método Execute será executado pelo pool de threads.
A execução da continuação em si é bastante trivial.
A continuação é copiada para uma variável local e s_completedSentinel é gravado em seu lugar- um objeto de fantoche artificial (ou uma sentinela, não sei como me dizer em nosso discurso), o que indica que a tarefa foi concluída. Bem, então a cópia local da continuação real é simplesmente executada. Se houver um ExecutionContext, essas ações serão postadas no contexto. Não há segredo aqui. Esse código pode ser chamado diretamente pela classe - simplesmente chamando o método que encapsula essas ações ou por meio da interface IThreadPoolWorkItem no pool de threads. Agora você pode adivinhar como a função com execução de continuação funciona de forma síncrona.
O primeiro método da interface IValueTaskSource é GetResult ( github ).
É simples, ele:
- _currentId.
_currentId — , . . ; - _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
- _result.
_result TrySetResult .
Método TrySetResult ( github ).
O método é trivial. - armazena o parâmetro recebido em _result e sinaliza a conclusão, ou seja, chama o método SignalCompleteion , o que é bastante interessante.
Método SignalCompletion ( github ).
Este método usa tudo o que falamos no começo.
No início, se _continuation == null, escrevemos o boneco s_completedSentinel.
Além disso, o método pode ser dividido em 4 blocos. Devo dizer imediatamente, por simplicidade de compreensão do esquema, o quarto bloco é apenas a execução síncrona da continuação. Ou seja, a execução trivial da continuação por meio do método, como descrevi no parágrafo sobre IThreadPoolWorkItem.
- _schedulingContext == null, .. ( if).
_runContinuationsAsynchronously == true, , — ( if).
IThreadPoolWorkItem . AsyncOperation . .
, if ( , ), , 2 3 , — .. 4 ; - _schedulingContext is SynchronizationContext, ( if).
_runContinuationsAsynchronously = true. . , , . , . 2 , :
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
. , , ( , ), 4 — ; - , 2 . .
, _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . . - — . , .
O segundo método da interface IValueTaskSource é GetStatus ( github ),
como um burro de São Petersburgo.
Se _continuation! = _CompletedSentinel, retorne ValueTaskSourceStatus.Pending
Se erro == nulo, retorne ValueTaskSourceStatus.Succeeded
Se _error.SourceException for OperationCanceledException, retorne ValueTaskSourceStatus.Canceled
Bem, já que muitos vieram até aqui, retorne ValueTasaultSourceStatus
e retorne final.Text , mas o método mais complexo da interface IValueTaskSource é OnCompleted ( github ).
O método adiciona uma continuação que é executada após a conclusão.
Captura ExecutionContext e SynchronizationContext conforme necessário.
Em seguida, Interlocked.CompareExchange , descrito acima, é usado para salvar a continuação no campo, comparando-a com nulo. Lembro que o CompareExchange retorna o valor atual da variável.
Se o salvamento da continuação tiver passado, o valor que estava na variável antes da atualização é retornado, ou seja, nulo. Isso significa que a operação não foi concluída no momento da gravação da continuação. E quem o concluir, descobrirá (como vimos acima). E não fazemos sentido executar ações adicionais. E isso completa o trabalho do método.
Se o valor não foi salvo, ou seja, algo diferente de nulo foi retornado do CompareExchange. Nesse caso, alguém conseguiu colocar valor mais rapidamente do que nós. Ou seja, ocorreu uma de duas situações - a tarefa foi concluída mais rapidamente do que alcançamos aqui ou houve uma tentativa de registrar mais de uma continuação, o que não deve ser feito.
Assim, verificamos o valor retornado, se é igual a s_completedSentinel - seria exatamente o que seria escrito em caso de conclusão.
- Se isso não for s_completedSentinel , não seremos usados de acordo com o plano - eles tentaram adicionar mais de uma continuação. Ou seja, aquele que já foi escrito e o que estamos escrevendo. E esta é uma situação excepcional;
- s_completedSentinel, , , . , _runContinuationsAsynchronously = false.
, , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.
Para evitar essa situação, apesar de tudo, é necessário iniciar a continuação de forma assíncrona. É executado de acordo com os mesmos esquemas que os 3 primeiros blocos no método SignalCompleteion - apenas em um pool, em um contexto ou através de uma fábrica e um planejador
Aqui está um exemplo de sequências síncronas:
class Program
{
static async Task Main(string[] args)
{
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true
});
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");
var writerTask = Task.Run(async () =>
{
Thread.Sleep(500);
int objectToWriteInChannel = 555;
Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writer.WriteAsync(objectToWriteInChannel);
Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
});
//Blocked here because there are no items in channel
int valueFromChannel = await reader.ReadAsync();
Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writerTask;
Console.Read();
}
}
Saída:
Principal, antes de aguardar. ID do segmento: 1 Tópico
criado para gravação com atraso, antes de aguardar gravação. ID do thread: 4
principal, após aguardar (será processado pelo thread criado para gravação). ID do thread: 4 Tópico
criado para gravação com atraso, após aguardar gravação. Código do tópico: 4