Usando Atomics.wait (), Atomics.notify () e Atomics.waitAsync ()

Os métodos estáticos Atomics.wait () e Atomics.notify () são primitivos de sincronização de baixo nível que podem ser usados ​​para implementar mutexes e outros mecanismos semelhantes. Mas, como o método Atomics.wait()está bloqueando, ele não pode ser chamado na thread principal (se você tentar fazer isso, um erro será gerado TypeError).



O mecanismo V8, desde a versão 8.7, suporta uma opção sem bloqueio Atomics.wait()chamada Atomics.waitAsync () . Este novo método pode ser usado no thread principal. Hoje vamos mostrar como usar essas APIs de baixo nível para criar um mutex que pode ser executado de forma síncrona (em threads de trabalho) e de forma assíncrona (em threads de trabalho ou na thread principal).











Atomics.wait () e Atomics.waitAsync ()



Métodos Atomics.wait()e Atomics.waitAsync()use os seguintes parâmetros:



  • buffer: uma matriz do tipo Int32Arrayou BigInt64Array, que se baseia em SharedArrayBuffer.
  • index: o índice real do elemento na matriz.
  • expectedValue: o valor que esperamos que seja representado na memória, no local descrito com buffere index.
  • timeout: tempo limite em milissegundos (opcional, o padrão é Infinity).


Atomics.wait()retorna uma string. Se o valor esperado não for encontrado no local de memória especificado, ele Atomics.wait()sai imediatamente, retornando uma string not-equal. Caso contrário, o thread é bloqueado. Um dos eventos a seguir deve ocorrer para que o bloqueio seja liberado. A primeira é uma chamada de outro thread de um método Atomics.notify()com uma indicação do lugar na memória em que o método está interessado Atomics.wait(). O segundo é a expiração do tempo limite. No primeiro caso, ele Atomics.wait()retornará uma string ok, no segundo - um valor de string timed-out.



O método Atomics.notify()usa os seguintes parâmetros:



  • typedArray: uma matriz do tipo Int32Arrayou BigInt64Array, que se baseia em SharedArrayBuffer.
  • index: o índice real do elemento na matriz.
  • count: número de agentes aguardando notificação (parâmetro opcional, definido como padrão Infinity).


O método Atomics.notify()notifica o número especificado de agentes aguardando notificação no endereço descrito typedArraye os indexdesvia na ordem FIFO. Se várias chamadas foram feitas Atomics.wait()ou Atomics.waitAsync()estão assistindo ao mesmo lugar na memória, todas acabam na mesma fila.



Ao contrário de um método Atomics.wait(), um método Atomics.waitAsync()retorna imediatamente um valor onde é chamado. Pode ser um dos seguintes valores:



  • { async: false, value: 'not-equal' } - se o local de memória especificado não contiver o valor esperado.
  • { async: false, value: 'timed-out' } - somente quando o tempo limite é definido como 0.
  • { async: true, value: promise } - Em outros casos.


Uma promessa, depois de algum tempo, pode ser resolvida com sucesso por um valor de string ok(se um método foi chamado Atomics.notify(), para o qual foram passadas informações sobre o lugar na memória que foi passado Atomics.waitAsync()). Isso pode ser resolvido com um valor timed-out. Esta promessa nunca é rejeitada.



O exemplo a seguir demonstra os princípios básicos de uso Atomics.waitAsync():



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ - ()
//                                     |  ^  
//                                     ^ 

if (result.value === 'not-equal') {
  //   SharedArrayBuffer   .
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /*   */ }
      else { /*  - */ }
    });
}

//      :
Atomics.notify(i32a, 0);


Agora vamos falar sobre como criar um mutex que pode ser usado nos modos síncrono e assíncrono. Deve-se observar que a implementação da versão síncrona do mutex foi discutida anteriormente. Por exemplo - neste material.



Neste exemplo, não usaremos o parâmetro timeoutao chamar Atomics.wait()e Atomics.waitAsync(). Este parâmetro pode ser usado para implementar condicionais relacionadas ao tempo limite.



Nossa classe que AsyncLockrepresenta um mutex funciona com um buffer SharedArrayBuffere implementa os seguintes métodos:



  • lock(): bloqueia o thread até que tenhamos a oportunidade de capturar o mutex (aplicável apenas no thread de trabalho).
  • unlock(): libera o mutex (este é o oposto lock()).
  • executeLocked(callback): tenta adquirir o bloqueio sem bloquear o segmento. Este método pode ser usado no thread principal. Ele planeja executar o callback no momento em que pudermos adquirir o bloqueio.


Vamos dar uma olhada em como esses métodos podem ser implementados. A declaração da classe inclui constantes e um construtor que leva um buffer SharedArrayBuffer.



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


Aqui, o elemento i32a[0]contém o valor LOCKEDou UNLOCKED. Ele, ademais, representa o lugar na memória que interessa Atomics.wait()e Atomics.waitAsync(). A classe AsyncLockoferece os seguintes recursos básicos:



  1. i32a[0] == LOCKEDe o thread estiver em um estado de espera (após uma chamada Atomics.wait()ou Atomics.waitAsync()), assistindo i32a[0], será eventualmente notificado.
  2. Depois que o segmento for notificado, ele tentará obter o bloqueio. Se tiver êxito, então, quando liberar o bloqueio, ele fará a chamada Atomics.notify().


Captura e liberação de bloqueio síncrono



Considere o código de um método lock()que só pode ser chamado a partir de um thread de trabalho.



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /*   >>> */  AsyncLock.UNLOCKED,
                        /*   >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< ,    
  }
}


Quando um método é chamado a partir de um encadeamento lock(), ele primeiro tenta adquirir o bloqueio, usando-o Atomics.compareExchange()para alterar o estado do bloqueio de UNLOCKEDpara LOCKED. O método Atomics.compareExchange()tenta realizar uma operação atômica de alteração do estado de bloqueio, ele retorna o valor original localizado na área de memória especificada. Se o valor original era UNLOCKED, então sabemos que a mudança de estado foi bem-sucedida e que o encadeamento adquiriu o bloqueio. Você não precisa fazer mais nada.



Se não foi Atomics.compareExchange()possível alterar o estado do bloqueio, isso significa que outro segmento está segurando o bloqueio. Como resultado, a thread a partir da qual o método é chamado lock()tenta usar o métodoAtomics.wait()para esperar até que o bloqueio seja liberado por outro segmento. Se o valor esperado ainda estiver armazenado na área de interesse da memória (no nosso caso - AsyncLock.LOCKED), a chamada Atomics.wait()bloqueará a thread. O retorno de Atomics.wait()só acontecerá quando outro encadeamento chamar Atomics.notify().



O método unlock()libera o bloqueio configurando-o para o estado UNLOCKEDe o chama Atomics.notify()para notificar os agentes que estão esperando que o bloqueio seja liberado. Presume-se que uma operação de mudança de estado de bloqueio sempre é bem-sucedida. Isso ocorre porque o encadeamento que executa esta operação está travando. Portanto, nada mais deve chamar o método neste momento unlock().



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /*   >>> */  AsyncLock.LOCKED,
                      /*   >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


Em um caso típico, tudo acontece assim: o bloqueio está livre e o thread T1 o captura, mudando seu estado usando Atomics.compareExchange(). O thread T2 tenta adquirir o bloqueio chamando Atomics.compareExchange(), mas não pode alterar seu estado. Então T2 chama Atomics.wait(), esta chamada irá bloquear o thread. Depois de algum tempo, o encadeamento T1 libera o bloqueio e chama Atomics.notify(). Isso faz com que a chamada Atomics.wait()para T2 retorne oke o thread T2 para sair do bloqueio. T2 então tenta adquirir o bloqueio novamente. Desta vez, ele consegue.



Existem dois casos especiais aqui. Sua análise visa demonstrar os motivos Atomics.wait()e Atomics.waitAsync()verificar um valor específico no índice especificado do elemento da matriz. Estes são os casos:



  • T1 , T2 . T2 , Atomics.compareExchange(), . T1 , T2 Atomics.wait(). T2 Atomics.wait(), not-equal. T2 .
  • T1 , T2 Atomics.wait() . T1 , T2 ( Atomics.wait()) Atomics.compareExchange() . , T3, . . Atomics.compareExchange() T2 . T2 Atomics.wait() , T3 .


O último caso especial demonstra o fato de que nosso mutex não está funcionando de maneira justa. Pode acontecer que o encadeamento T2 esteja aguardando a liberação do bloqueio, mas T3 conseguiu adquiri-lo imediatamente após sua liberação. Uma implementação de bloqueio que é mais adequada para uso no mundo real pode usar vários estados de bloqueio existentes para distinguir entre situações em que o bloqueio foi simplesmente “adquirido” e em que “houve um conflito durante a aquisição”.



Captura de bloqueio assíncrono



Um método sem bloqueio executeLocked()pode, ao contrário de um método lock(), ser chamado a partir do thread principal. Ele recebe, como único parâmetro, um retorno de chamada e programa o retorno de chamada após obter o bloqueio com sucesso.



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /*   >>> */  AsyncLock.UNLOCKED,
                          /*   >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ ,    
      await result.value;
    }
  }

  tryGetLock();
}


A função interna tryGetLock()primeiro tenta adquirir o bloqueio com Atomics.compareExchange(). Se a chamada desse método resultar em uma mudança de estado de bloqueio bem-sucedida, a função pode chamar um retorno de chamada e, em seguida, liberar o bloqueio e sair.



Se a chamada Atomics.compareExchange()não permitiu adquirir o bloqueio, temos que tentar fazê-lo novamente, no momento em que provavelmente o bloqueio estará livre. Mas não podemos bloquear o thread e esperar que o bloqueio seja liberado. Em vez disso, estamos Atomics.waitAsync()programando uma nova tentativa de adquirir o bloqueio usando o método e a promessa que ele retorna.



Se obtivermos sucesso na execução do método Atomics.waitAsync(), a promessa retornada por este método será resolvida quando o thread que continha o bloqueio chamarAtomics.notify()... Depois disso, o thread que queria adquirir o bloqueio, como antes, tenta fazê-lo novamente.



Aqui, são possíveis aqueles casos especiais que são característicos da versão síncrona (o bloqueio é liberado entre as chamadas Atomics.compareExchange()e Atomics.waitAsync(); o bloqueio é capturado por outro encadeamento, fazendo isso entre os momentos de resolução da promessa e a chamada Atomics.compareExchange()). Portanto, em um código semelhante aplicável em projetos reais, isso deve ser levado em consideração.



Resultado



Neste artigo, falamos sobre as primitivas de sincronização de baixo nível Atomics.wait(), Atomics.waitAsync()e Atomics.notify(). Analisamos um exemplo de criação de um mutex baseado neles, que pode ser usado tanto no thread principal quanto em threads de trabalho.



Atomics.wait (), Atomics.waitAsync () e Atomics.notify () serão úteis em seus projetos?



All Articles