
Olá, Habr! Karma foi drenado devido a um comentário descuidado sob o artigo holivar, o que significa que você precisa escrever uma postagem interessante (espero) e se reabilitar.
Eu uso um cliente de servidor de telegrama em php há vários anos. E como muitos usuários - cansado do crescimento constante do consumo de memória. Algumas sessões podem ocupar de 1 a 8 gigabytes de RAM! O suporte de banco de dados foi prometido há muito tempo, mas não houve progresso nessa direção. Eu tive que resolver o problema sozinho :) A popularidade do projeto de código aberto impôs requisitos interessantes na solicitação de pull:
- Compatibilidade com versões anteriores . Todas as sessões existentes devem continuar funcionando na nova versão (a sessão é uma instância serializada do aplicativo em um arquivo);
- Liberdade de escolha de banco de dados . Capacidade de alterar o tipo de armazenamento sem perder dados e a qualquer momento, já que os usuários possuem diferentes configurações de ambiente;
- Extensibilidade . Facilidade de adicionar novos tipos de bancos de dados;
- Salvar interface . O código do aplicativo que manipula dados não deve ser alterado;
- Assincronia . O projeto usa amphp, portanto, todas as operações do banco de dados devem ser sem bloqueio;
Para mais detalhes, convido a todos em cat.
O que vamos transferir
A maior parte da memória de MadelineProto é ocupada por chats, usuários e arquivos. Por exemplo, no cache de pares, tenho mais de 20 mil entradas. Esses são todos os usuários que a conta já viu (incluindo membros de todos os grupos), bem como canais, bots e grupos. Quanto mais antiga e ativa for a conta, mais dados estarão na memória. São dezenas e centenas de megabytes, e a maioria deles não é usada. Mas você não pode limpar todo o cache, porque o telegrama restringirá imediatamente a conta ao tentar receber os mesmos dados várias vezes. Por exemplo, depois de recriar a sessão em meu servidor de demonstração público, os telegramas em uma semana responderam à maioria das solicitações com o erro FLOOD_WAIT e nada realmente funcionou. Após o aquecimento do cache, tudo voltou ao normal.
Do ponto de vista do código, esses dados são armazenados como matrizes nas propriedades de um par de classes.
Arquitetura
Com base nos requisitos, um esquema nasceu:
- Todos os arrays "pesados" são substituídos por objetos que implementam ArrayAccess;
- Para cada tipo de banco de dados, criamos nossas próprias classes que herdam a base;
- Os objetos são criados e gravados nas propriedades durante __consrtuct e __awake;
- A fábrica abstrata seleciona a classe desejada para o objeto, dependendo do banco de dados selecionado nas configurações do aplicativo;
- Se o aplicativo já tiver outro tipo de armazenamento, então lemos todos os dados de lá e gravamos o array no novo armazenamento.
Problemas de mundo assíncrono
A primeira coisa que fiz foi criar interfaces e uma classe para armazenar arrays na memória. Este era o padrão, idêntico em comportamento à versão anterior do programa. Na primeira noite, fiquei muito animado com o sucesso do protótipo. O código era bom e simples. Até o momento não foi descoberto que é impossível usar geradores dentro de métodos da interface Iterator e dentro de métodos responsáveis por unset e isset.
Deve ser esclarecido aqui que amphp usa sintaxe de gerador para implementar assíncrono em php. Yield se torna análogo a async ... await from js. Se um método usa assincronia, então para obter um resultado dele, você precisa esperar por esse resultado no código usando yield. Por exemplo:
<?php
include 'vendor/autoload.php';
$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);
$MadelineProto->loop(function() use($MadelineProto) {
$myAsyncFunction = function() use($MadelineProto): \Generator {
$me = yield $MadelineProto->start();
yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
};
yield $myAsyncFunction();
});
Se for da corda
yield $myAsyncFunction();
remover o rendimento, então o aplicativo será encerrado antes que este código seja executado. Não obteremos o resultado.
Adicionar rendimento antes de chamar métodos e funções não é muito difícil. Mas como a interface ArrayAccess é usada, os métodos não são chamados diretamente. Por exemplo, unset () chama offsetUnset () e isset () chama offsetIsset (). A situação é semelhante com iteradores foreach ao usar a interface Iterator.
Adicionar rendimento antes dos métodos integrados gera um erro, pois esses métodos não foram projetados para funcionar com geradores. Um pouco mais nos comentários: aqui e aqui .
Tive que comprometer e reescrever o código para usar meus próprios métodos. Felizmente, havia poucos lugares assim. Na maioria dos casos, os arrays eram usados para leitura ou escrita por chave. Esta funcionalidade fez grande amizade com geradores.
A interface resultante é:
<?php
use Amp\Producer;
use Amp\Promise;
interface DbArray extends DbType, \ArrayAccess, \Countable
{
public function getArrayCopy(): Promise;
public function isset($key): Promise;
public function offsetGet($offset): Promise;
public function offsetSet($offset, $value);
public function offsetUnset($offset): Promise;
public function count(): Promise;
public function getIterator(): Producer;
/**
* @deprecated
* @internal
* @see DbArray::isset();
*
* @param mixed $offset
*
* @return bool
*/
public function offsetExists($offset);
}
Exemplos de trabalho com dados
<?php
...
//
$existingChat = yield $this->chats[$user['id']];
//.
yield $this->chats[$user['id']] = $user;
// yield, .
$this->chats[$user['id']] = $user;
//unset
yield $this->chats->offsetUnset($id);
//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
[$key, $value] = $iterator->getCurrent();
//
}
Armazenamento de dados
A maneira mais fácil de armazenar dados é serializada. Tive que abandonar o uso do json para suportar objetos. A tabela possui duas colunas principais: chave e valor.
Um exemplo de consulta sql para criar uma tabela:
CREATE TABLE IF NOT EXISTS `{$this->table}`
(
`key` VARCHAR(255) NOT NULL,
`value` MEDIUMBLOB NULL,
`ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`key`)
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
Cada vez que o aplicativo é iniciado, tentamos criar uma tabela para cada propriedade. Os clientes do Telegram não são recomendados para reiniciar mais de uma vez a cada poucas horas, portanto, não teremos várias solicitações para criar tabelas por segundo :)
Como a chave primária não aumenta automaticamente, a inserção e atualização de dados podem ser feitas com uma consulta, como em uma matriz regular:
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
Uma tabela com um nome no formato% account_id% _% class% _% variable_name% é criada para cada variável. Mas quando você inicia o aplicativo pela primeira vez, ainda não há uma conta. Nesse caso, você deve gerar um id temporário aleatório com o prefixo tmp. A cada lançamento, a classe de cada variável verifica se o id da conta apareceu. Se id estiver presente, as tabelas serão renomeadas.
Índices
A estrutura do banco de dados é o mais simples possível, de modo que novas propriedades serão adicionadas automaticamente no futuro. Não há conexões. Apenas índices de chave PRIMARY são usados. Mas há situações em que você precisa pesquisar em outros campos.
Por exemplo, há uma matriz / mesa de bate-papos. A chave é o id do bate-papo. Mas frequentemente você precisa pesquisar por nome de usuário. Quando o aplicativo estava armazenando dados em arrays, a pesquisa por nome de usuário era realizada como de costume, iterando sobre o array em foreach. Essa pesquisa funcionou a uma velocidade aceitável na memória, mas não no banco de dados. Portanto, outra tabela / array foi criada e uma propriedade correspondente na classe. A chave é o nome de usuário, o valor é o id do bate-papo. A única desvantagem dessa abordagem é que você precisa escrever código adicional para sincronizar as duas tabelas.
Cache
O mysql local é rápido, mas um pouco de cache nunca é demais. Especialmente se o mesmo valor for usado várias vezes seguidas. Por exemplo, primeiro verificamos a presença de um chat no banco de dados e, a seguir, obtemos alguns dados dele.
Um traço simples da
<?php
namespace danog\MadelineProto\Db;
use Amp\Loop;
use danog\MadelineProto\Logger;
trait ArrayCacheTrait
{
/**
* Values stored in this format:
* [
* [
* 'value' => mixed,
* 'ttl' => int
* ],
* ...
* ].
* @var array
*/
protected array $cache = [];
protected string $ttl = '+5 minutes';
private string $ttlCheckInterval = '+1 minute';
protected function getCache(string $key, $default = null)
{
$cacheItem = $this->cache[$key] ?? null;
$result = $default;
if (\is_array($cacheItem)) {
$result = $cacheItem['value'];
$this->cache[$key]['ttl'] = \strtotime($this->ttl);
}
return $result;
}
/**
* Save item in cache.
*
* @param string $key
* @param $value
*/
protected function setCache(string $key, $value): void
{
$this->cache[$key] = [
'value' => $value,
'ttl' => \strtotime($this->ttl),
];
}
/**
* Remove key from cache.
*
* @param string $key
*/
protected function unsetCache(string $key): void
{
unset($this->cache[$key]);
}
protected function startCacheCleanupLoop(): void
{
Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
}
/**
* Remove all keys from cache.
*/
protected function cleanupCache(): void
{
$now = \time();
$oldKeys = [];
foreach ($this->cache as $cacheKey => $cacheValue) {
if ($cacheValue['ttl'] < $now) {
$oldKeys[] = $cacheKey;
}
}
foreach ($oldKeys as $oldKey) {
$this->unsetCache($oldKey);
}
Logger::log(
\sprintf(
"cache for table:%s; keys left: %s; keys removed: %s",
$this->table,
\count($this->cache),
\count($oldKeys)
),
Logger::VERBOSE
);
}
}
Gostaria de prestar atenção especial ao startCacheCleanupLoop. Graças à magia do amphp, invalidar o cache é o mais simples possível. O retorno de chamada começa no intervalo especificado, percorre todos os valores e examina o campo ts, que armazena o carimbo de data / hora da última chamada para este elemento. Se a chamada foi há mais de 5 minutos (configurável nas configurações), o elemento é excluído. É muito fácil implementar um análogo ttl do redis ou memcache usando amphp. Tudo isso acontece em segundo plano e não bloqueia o thread principal.
Com a ajuda do cache e da assincronia, não apenas as leituras são aceleradas, mas também as gravações.
Aqui está o código-fonte do método que grava dados no banco de dados.
/**
* Set value for an offset.
*
* @link https://php.net/manual/en/arrayiterator.offsetset.php
*
* @param string $index <p>
* The index to set for.
* </p>
* @param $value
*
* @throws \Throwable
*/
public function offsetSet($index, $value): Promise
{
if ($this->getCache($index) === $value) {
return call(fn () =>null);
}
$this->setCache($index, $value);
$request = $this->request(
"
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => \serialize($value),
]
);
//Ensure that cache is synced with latest insert in case of concurrent requests.
$request->onResolve(fn () => $this->setCache($index, $value));
return $request;
}
$ this-> request cria uma Promise que grava dados de forma assíncrona. E as operações com o cache ocorrem de forma síncrona. Ou seja, você não precisa esperar por uma gravação no banco de dados e, ao mesmo tempo, certificar-se de que as operações de leitura começarão imediatamente a retornar novos dados.
O método onResolve do amphp revelou-se muito útil. Após a conclusão da inserção, os dados serão gravados no cache novamente. Se alguma operação de gravação atrasar e o cache e a base começarem a diferir, o cache será atualizado com o valor gravado na base por último. Essa. nosso cache voltará a ser consistente com a base.
Fonte
→ Link para pull request
E assim como outro usuário adicionou suporte para postgre. Demorou apenas 5 minutos para escrever as instruções para ele.
A quantidade de código pode ser reduzida movendo os métodos duplicados para a classe abstrata geral SqlArray.
Mais uma coisa
Percebeu-se que ao baixar arquivos de mídia do telegrama, o coletor de lixo padrão php não faz o trabalho e pedaços do arquivo permanecem na memória. Normalmente, os vazamentos eram do mesmo tamanho que o arquivo. Possível causa: O coletor de lixo é acionado automaticamente quando 10.000 links se acumulam. Em nosso caso, os links eram poucos (dezenas), mas cada um poderia se referir a megabytes de dados na memória. Era muito preguiçoso estudar milhares de linhas de código com a implementação mtproto. Por que não experimentar a muleta elegante com \ gc_collect_cycles (); primeiro?
Surpreendentemente, resolveu o problema. Isso significa que basta configurar o início periódico da limpeza. Felizmente, o amphp fornece ferramentas simples para execução em segundo plano em intervalos especificados.
Limpar a memória a cada segundo parecia muito fácil e não muito eficaz. Decidi usar um algoritmo que verifica o ganho de memória desde a última limpeza. A limpeza ocorre se o ganho for maior que o limite.
<?php
namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use danog\MadelineProto\Logger;
class GarbageCollector
{
/**
* Ensure only one instance of GarbageCollector
* when multiple instances of MadelineProto running.
* @var bool
*/
public static bool $lock = false;
/**
* How often will check memory.
* @var int
*/
public static int $checkIntervalMs = 1000;
/**
* Next cleanup will be triggered when memory consumption will increase by this amount.
* @var int
*/
public static int $memoryDiffMb = 1;
/**
* Memory consumption after last cleanup.
* @var int
*/
private static int $memoryConsumption = 0;
public static function start(): void
{
if (static::$lock) {
return;
}
static::$lock = true;
Loop::repeat(static::$checkIntervalMs, static function () {
$currentMemory = static::getMemoryConsumption();
if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
\gc_collect_cycles();
static::$memoryConsumption = static::getMemoryConsumption();
$cleanedMemory = $currentMemory - static::$memoryConsumption;
Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
}
});
}
private static function getMemoryConsumption(): int
{
$memory = \round(\memory_get_usage()/1024/1024, 1);
Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
return (int) $memory;
}
}