Big / Bug Data: Analisando o código-fonte do Apache Flink

image1.png


Os aplicativos de Big Data processam grandes quantidades de informações, geralmente em tempo real. Naturalmente, esses aplicativos devem ser altamente confiáveis ​​para que nenhum erro no código possa interferir no processamento dos dados. Para alcançar alta confiabilidade, é necessário acompanhar de perto a qualidade do código dos projetos desenvolvidos para esta área. O analisador estático PVS-Studio lida com este problema. Hoje, o projeto Apache Flink, desenvolvido pela Apache Software Foundation, uma das líderes no mercado de software de Big Data, foi escolhido como objeto de teste do analisador.



O que é Apache Flink? É uma estrutura de código aberto para processamento distribuído de grandes quantidades de dados. Ele foi desenvolvido como uma alternativa ao Hadoop MapReduce em 2010 na Universidade Técnica de Berlim. A estrutura é baseada em um mecanismo de execução distribuído para aplicativos de processamento em lote e fluxo. Este mecanismo foi escrito em Java e Scala. Hoje, o Apache Flink pode ser usado em projetos escritos em Java, Scala, Python e até SQL.



Análise de projeto



Tendo baixado o código-fonte do projeto, comecei a construir o projeto com o comando 'mvn clean package -DskipTests' especificado nas instruções no GitHub . Enquanto a montagem estava em andamento, usando o utilitário CLOC , descobri que existem 10838 arquivos Java no projeto, que têm cerca de 1,3 milhão de linhas de código. Além disso, havia 3833 arquivos Java de teste, o que é mais de 1/3 de todos os arquivos Java. Notei também que o projeto usa o analisador de código estático FindBugs e o utilitário Cobertura, que fornece informações sobre a cobertura de código por testes. Com tudo isso em mente, fica claro que os desenvolvedores do Apache Flink monitoraram cuidadosamente a qualidade do código e a cobertura do teste durante o desenvolvimento.



Após uma construção bem-sucedida, abri o projeto no IntelliJ IDEA e executei a análise usando o plugin PVS-Studio for IDEA e Android Studio . Os avisos do analisador foram distribuídos da seguinte forma:



  • 183 alto;
  • 759 Médio;
  • 545 Low.


Cerca de 2/3 dos gatilhos do analisador PVS-Studio foram atribuídos a arquivos de teste. Considerando esse fato e o tamanho da base de código do projeto, podemos dizer que os desenvolvedores do Apache Flink conseguiram manter a qualidade do código no seu melhor.



Tendo estudado os avisos do analisador com mais detalhes, escolhi os mais interessantes na minha opinião. Então vamos ver o que o PVS-Studio conseguiu encontrar neste projeto!





Só um pouco de descuido



V6001 Existem subexpressões idênticas ' processData ' à esquerda e à direita do operador '=='. CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





No contexto de outras expressões em troca, esse erro não é muito marcante. Ao substituir o método equals para a classe CheckpointStatistics , o programador cometeu um erro na expressão processData == processData , que não tem sentido porque é sempre verdadeira. Da mesma forma, o restante da expressão em retorno deveria ser comparado com o campo do objeto atual this e o objeto That : processingData == that.processedData... Esta situação é um dos padrões de erro típicos encontrados em funções de comparação, que são descritos em detalhes no artigo "O mal vive em funções de comparação ". Portanto, apenas "um pouco de desatenção" quebrou a lógica de verificar a equivalência de objetos da classe CheckpointStatistics .



Expressão é sempre verdadeira



A expressão V6007 'input2.length> 0' é sempre verdadeira. Operator.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





Nesse método, o analisador mostrou-se mais atento do que uma pessoa, que decidiu reportar de maneira peculiar, indicando que a expressão input2.length> 0 será sempre verdadeira. A razão é que se o comprimento da matriz input2 for 0, então a condição input2 == null || input2.length == 0 do primeiro se no método for verdadeiro, e a execução do método será interrompida antes de atingir a linha com a expressão input2.length> 0 .



Analisador que tudo vê



A expressão V6007 'slotSharingGroup == null' é sempre falsa. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





O analisador relatou que slotSharingGroup == null é sempre falso. Isso sugere que o método determineSlotSharingGroup nunca retornará nulo . O analisador é tão inteligente que foi capaz de calcular todos os valores que esse método pode retornar? Vamos verificar melhor tudo nós mesmos:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





Na ordem percorremos todos os retornos e vemos o que pode recuperar este método:



  • O primeiro retorno retornará o argumento para o método specifiedGroup , mas somente se não for nulo .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


Acontece que o analisador foi realmente capaz de calcular a impossibilidade de retornar null a partir do método determineSlotSharingGroup e nos alertou sobre isso, apontando a falta de sentido de verificar slotSharingGroup == null . E embora esta situação não seja errônea, tal proteção adicional do analisador será capaz de detectar um erro em algum outro caso. Por exemplo, quando você precisa de um método para retornar nulo sob certas condições.



Colete-os todos



A expressão V6007 'currentCount <= lastEnd' é sempre verdadeira. CountSlidingWindowAssigner.java (75)



V6007 A expressão 'lastStart <= currentCount' é sempre verdadeira. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





O analisador avisa que as expressões currentCount <= lastEnd e lastStart <= currentCount são sempre verdadeiras. E, de fato, se você observar a condição do loop while , verá que existem exatamente as mesmas expressões. Isso significa que dentro do loop essas expressões sempre serão verdadeiras, portanto todos os objetos do tipo CountWindow criados no loop serão adicionados à lista de janelas . Existem muitas opções para o aparecimento dessa verificação sem sentido, e a primeira coisa que vem à mente é um artefato de refatoração ou a supervisão de um desenvolvedor. Mas pode ser um erro, se você quiser verificar outra coisa ...



Ordem de argumento incorreta



V6029 Possível ordem incorreta de argumentos passados ​​para o método: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





A falta de habilidade do Java de chamar um método com parâmetros nomeados às vezes é uma piada cruel com os desenvolvedores. Isso é exatamente o que aconteceu quando o analisador apontou para o método createMessageList . Olhando para a definição desse método, fica claro que o parâmetro hasBufferForRemovedChannel deve ser passado para o método antes do parâmetro hasBufferForReleasedChannel :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





No entanto, ao chamar o método, o desenvolvedor confundiu a ordem desses argumentos, e é por isso que a lógica do método createMessageList será quebrada se os valores dos argumentos mistos forem diferentes.



Oh, este copiar-colar



V6032 É estranho que o corpo do método 'seekToFirst' seja totalmente equivalente ao corpo de outro método 'seekToLast'. RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





Os corpos dos seekToFirst e métodos seekToLast são a mesma coisa. Além disso, ambos os métodos são usados ​​no código.



Algo está sujo aqui! Na verdade, se você observar quais métodos o objeto iterador possui , ficará claro qual erro o analisador ajudou a encontrar:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





Acontece que o método seekToLast da classe RocksIteratorWrapper foi criado pelo método de copiar e colar seekToFirst da mesma classe. No entanto, por algum motivo, o desenvolvedor esqueceu de substituir o iterador 's seekToFirst método de chamada com seekToLast .



Confusão com strings de formato



V6046 Formato incorreto. Um número diferente de itens de formato é esperado. Argumentos não usados: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





As strings de formato do método String.format e dos loggers Java são diferentes. Ao contrário da string de formato do método String.format , onde as substituições de argumento são especificadas usando o caractere '%', as strings de formato do logger usam a combinação de caracteres '{}'. Por causa dessa confusão, esse erro ocorreu. Como uma string de formato, uma string é passada para o método String.format , que provavelmente foi copiado de outro lugar onde foi usado em algum logger. Como resultado, o valor do campo INITIALIZE_DB_MAX_RETRY não será substituído na mensagem IllegalStateException . em vez de '{}', e quem captura ou registra essa exceção nunca saberá quantas tentativas de conexão com o banco de dados foram feitas.



Distribuição anormal



V6048 Esta expressão pode ser simplificada. Operando 'índice' na operação é igual a 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





O método de partição divide os elementos da coleção de elementos em vários segmentos e, em seguida, retorna esses segmentos. Porém, devido ao erro apontado pelo analisador, não ocorrerá separação. A expressão usada para determinar o índice do número do segmento % numBuckets sempre será 0, porque o índice é sempre 0. Originalmente, pensei que o código para esse método foi refatorado, como resultado, eles se esqueceram de adicionar um aumento na variável de índice no loop for . Mas olhando para o commitonde esse método foi adicionado, descobriu-se que esse erro veio junto com esse método. Versão corrigida do código:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





Tipo incompatível



V6066 O tipo de objeto passado como argumento é incompatível com o tipo de coleção: String, ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





A expressão apontada pelo analisador sempre será falsa, o que significa que a chamada ao método migrateNextTransactionalIdHindState nunca acontecerá. Como é que alguém está procurando um elemento de um tipo completamente diferente em uma coleção do tipo Set <String> - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ? Sem a ajuda do analisador, tal erro, muito provavelmente, teria vivido no código por muito tempo, uma vez que não atinge os olhos e é simplesmente impossível encontrá-lo sem uma verificação completa deste método.



Mudança de variável não atômica



V6074 Modificação não atômica da variável volátil. Inspecione 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





Mais 3 avisos do analisador no mesmo método:



  • V6074 Modificação não atômica da variável volátil. Inspecione 'currentStateSize'. PendingCheckpointStats.java (134)
  • V6074 Modificação não atômica da variável volátil. Inspecione 'currentProcessedData'. PendingCheckpointStats.java (138)
  • V6074 Modificação não atômica da variável volátil. Inspecione 'currentPersistedData'. PendingCheckpointStats.java (143)


O analisador sugeriu que até 4 campos voláteis no método mudam para não atômicos. E o analisador, como sempre, acaba acertando, porque as operações ++ e + = são , na verdade, uma sequência de várias operações de leitura-modificação-gravação. Como você sabe, o valor volátil de um campo é visível para todos os threads, o que significa que parte das alterações do campo podem ser perdidas devido a uma condição de corrida. Você pode ler informações mais detalhadas sobre isso na descrição do diagnóstico.



Conclusão



Em projetos de Big Data, a confiabilidade é um dos principais requisitos, portanto, a qualidade do código neles deve ser monitorada de perto. Os desenvolvedores do Apache Flink foram auxiliados por várias ferramentas e também escreveram um número significativo de testes. No entanto, mesmo sob tais condições, o analisador PVS-Studio foi capaz de encontrar erros. É impossível eliminar completamente os erros, mas o uso regular de várias ferramentas de análise estática de código permitirá que você se aproxime desse ideal. Sim, exatamente regularmente. Somente com o uso regular a análise estática mostra sua eficácia, que é descrita com mais detalhes neste artigo .





Se você deseja compartilhar este artigo com um público que fala inglês, use o link de tradução: Valery Komarov. Big / Bug Data: Analisando o código-fonte do Apache Flink .



All Articles