Introdução
Olá, Habr! Eu trabalho como programador java em uma organização financeira. Decidi deixar minha marca no Habré e escrever meu primeiro artigo. Devido a problemas com a presença de devops, fui incumbido de atualizar o cluster kafka de 2.0 para 2.6 sem tempo de inatividade e perda de mensagens (você sabe, ninguém gosta quando o dinheiro está no ar ou se perde em algum lugar). Gostaria de compartilhar essa experiência com você e obter um feedback construtivo. Portanto, água suficiente, vamos ao que interessa.
Esquema de migração
A tarefa foi complicada pelo fato de que era necessário migrar de máquinas virtuais antigas para novas, então a opção de desligar o broker, alterar os binários e iniciá-lo não me convinha.
Abaixo está um diagrama da migração. Temos 3 corretores em VMs antigas, 3 corretores em novas VMs e cada corretor tem seu próprio zookeeper.
Plano de migração
Para que possamos migrar sem que ninguém perceba, teremos que suar "um pouco". Abaixo está um esboço geral.
Adicionar endereços de novos corretores às configurações do aplicativo
Perfure o acesso entre tudo e todos
Prepare a infraestrutura em novas máquinas virtuais
Crie um novo grupo de zookieepers e mescle-o com o antigo
Levante novos corretores kafka
Migrar todas as partições de corretores antigos para novos
Desativar antigos corretores kafka e antigos zookieepers
Remova corretores e zukipers antigos de novas configurações
. , , . "bootstrap.servers"
old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092
- . , . , .
, 9092 -> ( 3 )
<----> (+18 )
9092 (+6 )
2 3 : 2181, 2888, 3888 ( (18+6)*3 = 72)
: 99 . ! .
100 ,
, .
kafka
- kafka. , /etc/security/limits.conf
kafka hard nofile 262144 kafka soft nofile 262144
, , , .
, 262144, , ( ). 10 , .
java
/home/kafka/.bash_profile
export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH
jre /opt/java
, , . .
setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt
ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java
chown -R kafka:kafka /opt
chmod -R 700 /opt
#env_var start ------------------------------->
kafkaProfile=/home/kafka/.bash_profile
homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")
if [ "$javaHome" != "$homeVar" ]; then
echo -e "\n$homeVar\n" >> $kafkaProfile
fi
pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")
if [ "$path" != "$pathVar" ]; then
echo -e "\n$pathVar\n" >> $kafkaProfile
fi
#env_var end --------------------------------->
#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
limitsFile=/etc/security/limits.conf
soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")
if [ "$limitSoft" != "$soft" ]; then
echo -e "\n$soft\n" >> $limitsFile
fi
hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")
if [ "$limitHard" != "$hard" ]; then
echo -e "\n$hard\n" >> $limitsFile
fi
#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
kafka
ulimit -n = 262144
echo $JAVA_HOME = /opt/java
echo $PATH /opt/java/bin
, , , , myid, id .
/opt/zookeeper/current/conf/zoo.cfg
zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data server.1=server1:2888:3888 server.2=server2:2888:3888 server.3=server3:2888:3888 server.4=server4:2888:3888 server.5=server5:2888:3888 server.6=server6:2888:3888
/opt/zookeeper/current/bin/zkServer.sh start
.
zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
, . .
/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
, .
.../zkServer.sh status
.
, server.properties broker.id zookeeper.connect
4
broker.id=4 zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
, , .
server.properties (2.0.0). .
inter.broker.protocol.version=2.0.0
nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
. cli .
/opt/zookeeper/current/bin/zkCli.sh
ls /cluster_name/brokers/ids
[1,2,3,4,5,6]
6 , .
. - , , , . replication.factor . . .
json , .
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
,
json kafka-reassign-partitions.sh --generate id --broker-list "4,5,6".
generate1.json
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
{"topic":"foo1","partition":0,"replicas":[3,2,1]},
{"topic":"foo2","partition":2,"replicas":[1,2,3]},
{"topic":"foo2","partition":0,"replicas":[3,2,1]},
{"topic":"foo1","partition":1,"replicas":[2,3,1]},
{"topic":"foo2","partition":1,"replicas":[2,3,1]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
{"topic":"foo1","partition":0,"replicas":[4,5,6]},
{"topic":"foo2","partition":2,"replicas":[6,4,5]},
{"topic":"foo2","partition":0,"replicas":[4,5,6]},
{"topic":"foo1","partition":1,"replicas":[5,4,6]},
{"topic":"foo2","partition":1,"replicas":[4,5,6]}]
}
Proposed partition reassignment configuration ( ). - . , json.
. "" , . kafka-reassign-helper.jar .
prepare-for-reassignment.sh
# . 20
if [ "$#" -eq 0 ]
then
echo "no arguments"
exit 1
fi
echo "Start reassignment preparing"
/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt
echo created topics.txt
java -jar kafka-reassign-helper.jar generate topics.txt $1
fileCount=$(ls -dq generate*.json | wc -l)
echo "created $fileCount file for topics to move"
echo -e "\nCreating generated files\n"
mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done
echo -e "\nCreating execute/rollback files"
java -jar kafka-reassign-helper.jar execute $fileCount
echo -e "\nexecute/rollback files created"
echo -e "\nPreparing finished successfully!"
kafka-reassign-partitions.sh, --execute
move-partitions.sh
# execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
echo "no arguments"
exit 1
fi
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
. , . , , kafka-reassign-partitions.sh, --verify, .
, , .
reassign-verify.sh
progress=-1
while [ $progress != 0 ]
do
progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)
echo "In progress:" $progress;
echo "Is complete:" $complete;
echo "Failed:" $failed;
sleep 2s
done
In progress 0. Is complete - . Failed 0.
move-partitions.sh reassign-verify.sh , .
log.dirs , - .
. kafka-server-stop.sh zkStop.sh .
. zoo.cfg
zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg
, , , . : 2 1 .
server.properties
remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
, , insync, min.insync.replicas ( 2), default.replication.factor ( 3)
- 2 , 3, , , , insync .
check-insync.sh
input="check-topics.txt"
rm -f $input
/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt
checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
((i=i+1))
list+="${line}|"
if [ $i -eq $checkPerIter ]
then
list=${list::${#list}-1}
echo "checking $list"
count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
if [ "$count" -ne 0 ]
then
/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
fi
((notInsync=notInsync+count))
list=""
i=0
fi
done < "$input"
echo "not insync: $notInsync"
Se não obtivermos insync: 0 na saída , podemos reiniciar os corretores um por um.
Isso é tudo. A migração de corretores agora está concluída, exceto para a reconfiguração subsequente do monitoramento e outros assuntos auxiliares.
É assim que a instrução se parece, que enviei para os administradores que fizeram tudo na batalha. Surpreendentemente, eles fizeram isso da primeira vez e sem perguntas.
README.txt
2.0.0 -> 2.6.0
1 .
1.1
NEW4.tar.gz -> 4
migration.tar.gz -> 4
NEW5.tar.gz -> 5
NEW6.tar.gz -> 6
1.2
tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka
1.3 root ( , )
/home/kafka/scripts/setup.sh
1.4 kafka ( )
1.5 ( )
ulimit -n = 262144
echo $JAVA_HOME = /opt/java
echo $PATH /opt/java/bin
/opt kafka kafka
2 .
2.1 kafka ( )
2.2 ( )
/opt/scripts/zkStart.sh
2.3
OLD1.tar.gz -> 1
OLD2.tar.gz -> 2
OLD3.tar.gz -> 3
2.4
tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka
2.5 root ( )
/home/kafka/scripts/setup-old.sh
2.6 kafka
2.7 ( )
/home/kafka/scripts/zk-add-new-servers.sh
2.8 ( )
/home/kafka/scripts/zkStatus.sh
2.9
/home/kafka/scripts/zkRestart.sh
2.10
/home/kafka/scripts/zkStatus.sh ( )
/opt/scripts/zkStatus.sh ( )
follower
leader
2.11
/opt/kafka-start.sh
2.12
/home/kafka/migration/zkCli.sh
ls /cluster_name/brokers/ids
[1,2,3,4,5,6]
3
3.1 ( )
/home/kafka/migration/prepare-for-assignment.sh 20
3.2 /home/kafka/migration/execute
:
execute1.json
/home/kafka/migration/move-partitions.sh 1
:
/home/kafka/migration/reassign-verify.sh 1
"in progress" 0 .3.2
3.3 , /opt/kafka/kafka-data
3.4 ,
/opt/kafka/current/bin/kafka-server-stop.sh
3.5 ,
/home/kafka/scripts/zkStop.sh
3.6 ( )
/opt/scripts/zk-remove-old-servers.sh
3.7
/opt/scripts/zkRestart.sh
3.8 ( , 2 )
/opt/scripts/zkStatus.sh
3.9 ( )
/opt/scripts/remove-old-protocol.sh
3.10 insync
/home/kafka/migration/check-insync.sh
not insync 0
3.11
/opt/kafka/current/bin/kafka-server-stop.sh
(ps aux | grep kafka )
/opt/kafka-start.sh
3.10
!
Espero que tenha ajudado alguém neste assunto. Estou esperando por seus comentários e observações.
Todos os scripts e instruções, incluindo aqueles para rollbacks, podem ser encontrados aqui