Kafkarian mover ou como mover partições

Introdução

Olá, Habr! Eu trabalho como programador java em uma organização financeira. Decidi deixar minha marca no Habré e escrever meu primeiro artigo. Devido a problemas com a presença de devops, fui incumbido de atualizar o cluster kafka de 2.0 para 2.6 sem tempo de inatividade e perda de mensagens (você sabe, ninguém gosta quando o dinheiro está no ar ou se perde em algum lugar). Gostaria de compartilhar essa experiência com você e obter um feedback construtivo. Portanto, água suficiente, vamos ao que interessa.





Esquema de migração

A tarefa foi complicada pelo fato de que era necessário migrar de máquinas virtuais antigas para novas, então a opção de desligar o broker, alterar os binários e iniciá-lo não me convinha.





Abaixo está um diagrama da migração. Temos 3 corretores em VMs antigas, 3 corretores em novas VMs e cada corretor tem seu próprio zookeeper.





Plano de migração

Para que possamos migrar sem que ninguém perceba, teremos que suar "um pouco". Abaixo está um esboço geral.





  1. Adicionar endereços de novos corretores às configurações do aplicativo





  2. Perfure o acesso entre tudo e todos





  3. Prepare a infraestrutura em novas máquinas virtuais





  4. Crie um novo grupo de zookieepers e mescle-o com o antigo





  5. Levante novos corretores kafka





  6. Migrar todas as partições de corretores antigos para novos





  7. Desativar antigos corretores kafka e antigos zookieepers





  8. Remova corretores e zukipers antigos de novas configurações









. , , . "bootstrap.servers"





old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092







- . , . , .





  1. , 9092 -> ( 3 )





  2. <----> (+18 )





  3. 9092 (+6 )





  4. 2 3 : 2181, 2888, 3888 ( (18+6)*3 = 72)





: 99 . ! .





100 ,









, .





kafka





- kafka. , /etc/security/limits.conf





kafka hard nofile 262144
kafka soft nofile 262144
      
      



, , , .





, 262144, , ( ). 10 , .









java





/home/kafka/.bash_profile





export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH
      
      



jre /opt/java









, , . .





setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")


if [ "$javaHome" != "$homeVar" ]; then
    echo -e "\n$homeVar\n" >> $kafkaProfile
fi


pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")

if [ "$path" != "$pathVar" ]; then
    echo -e "\n$pathVar\n" >> $kafkaProfile
fi

#env_var end --------------------------------->




#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")

if [ "$limitSoft" != "$soft" ]; then
    echo -e "\n$soft\n" >> $limitsFile
fi


hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")

if [ "$limitHard" != "$hard" ]; then
    echo -e "\n$hard\n" >> $limitsFile
fi

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
      
      







kafka





ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin
      
      



, , , , myid, id .





/opt/zookeeper/current/conf/zoo.cfg





zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
      
      







/opt/zookeeper/current/bin/zkServer.sh start





.





zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
      
      







, . .





/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
      
      



, .





.../zkServer.sh status
      
      



.





, server.properties broker.id zookeeper.connect





4





broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
      
      







, , .





server.properties (2.0.0). .





inter.broker.protocol.version=2.0.0
      
      







nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
      
      



. cli .





/opt/zookeeper/current/bin/zkCli.sh
      
      







ls /cluster_name/brokers/ids
      
      







[1,2,3,4,5,6]
      
      



6 , .





. - , , , . replication.factor . . .





, "", , .





json , .





{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }
      
      



,





json kafka-reassign-partitions.sh --generate id --broker-list "4,5,6".









generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }
      
      







Proposed partition reassignment configuration ( ). - . , json.





. "" , . kafka-reassign-helper.jar .









prepare-for-reassignment.sh
#       .    20
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi


echo "Start reassignment preparing"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt

echo created topics.txt

java -jar kafka-reassign-helper.jar generate topics.txt $1

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done

echo -e "\nCreating execute/rollback files"

java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"
      
      







kafka-reassign-partitions.sh, --execute





move-partitions.sh
#     execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi
        

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
      
      







. , . , , kafka-reassign-partitions.sh, --verify, .





, , .





reassign-verify.sh
progress=-1

while [ $progress != 0 ]
do

    progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
    complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
    failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)

    echo "In progress:" $progress;
    echo "Is complete:" $complete;
    echo "Failed:" $failed;

    sleep 2s

done
      
      







In progress 0. Is complete - . Failed 0.





move-partitions.sh reassign-verify.sh , .





log.dirs , - .





. kafka-server-stop.sh zkStop.sh .





. zoo.cfg





zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg
      
      







, , , . : 2 1 .





server.properties





remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
      
      







, , insync, min.insync.replicas ( 2), default.replication.factor ( 3)





- 2 , 3, , , , insync .









check-insync.sh
input="check-topics.txt"
rm -f $input

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt

checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
 ((i=i+1))
 list+="${line}|"
 if [ $i -eq $checkPerIter ]
  then
   list=${list::${#list}-1}
   echo "checking $list"
   count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
   if [ "$count" -ne 0 ]
    then
     /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
   fi
   ((notInsync=notInsync+count))
   list=""
   i=0
 fi
done < "$input"

echo "not insync: $notInsync"
      
      







Se não obtivermos insync: 0 na saída , podemos reiniciar os corretores um por um.





Isso é tudo. A migração de corretores agora está concluída, exceto para a reconfiguração subsequente do monitoramento e outros assuntos auxiliares.





É assim que a instrução se parece, que enviei para os administradores que fizeram tudo na batalha. Surpreendentemente, eles fizeram isso da primeira vez e sem perguntas.





README.txt
    2.0.0 -> 2.6.0

1 .  

1.1       

NEW4.tar.gz ->  4 
migration.tar.gz ->  4 

NEW5.tar.gz ->  5 
NEW6.tar.gz ->  6 

1.2  

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3   root       (    ,   )

/home/kafka/scripts/setup.sh

1.4    kafka (   )

1.5     (   )

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin

  /opt   kafka kafka



2 .    

2.1    kafka (   )

2.2       (  )

/opt/scripts/zkStart.sh

2.3       

OLD1.tar.gz ->  1 
OLD2.tar.gz ->  2 
OLD3.tar.gz ->  3 

2.4  

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5   root       (    )

/home/kafka/scripts/setup-old.sh

2.6    kafka    

2.7       (      )

/home/kafka/scripts/zk-add-new-servers.sh

2.8       (  )

/home/kafka/scripts/zkStatus.sh

      

2.9       
    

/home/kafka/scripts/zkRestart.sh

2.10       
     

/home/kafka/scripts/zkStatus.sh ( )
/opt/scripts/zkStatus.sh ( )

        follower
  leader

2.11   
    
/opt/kafka-start.sh

2.12      
   

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

   [1,2,3,4,5,6]

3      

3.1   (    )
/home/kafka/migration/prepare-for-assignment.sh  20

3.2           /home/kafka/migration/execute


:
  execute1.json
/home/kafka/migration/move-partitions.sh 1

      

:
/home/kafka/migration/reassign-verify.sh 1

     "in progress"  0        .3.2   

3.3          ,     /opt/kafka/kafka-data    
     

3.4       ,  
/opt/kafka/current/bin/kafka-server-stop.sh

3.5       ,  

/home/kafka/scripts/zkStop.sh

3.6      (      )
/opt/scripts/zk-remove-old-servers.sh

3.7     

/opt/scripts/zkRestart.sh

3.8       ( , 2 )

/opt/scripts/zkStatus.sh

3.9      (       )
/opt/scripts/remove-old-protocol.sh


3.10       insync

    /home/kafka/migration/check-insync.sh

not insync    0

3.11       

/opt/kafka/current/bin/kafka-server-stop.sh

    (ps aux | grep kafka    )

 /opt/kafka-start.sh

  3.10  

 !
      
      







Espero que tenha ajudado alguém neste assunto. Estou esperando por seus comentários e observações.





Todos os scripts e instruções, incluindo aqueles para rollbacks, podem ser encontrados aqui








All Articles