Relocating the Kafkarians, or How to Move Partitions

Introduction

Hello, Habr! I work as a Java programmer at a financial organization. I decided to leave my mark on Habr and write my first article. Because we were short on devops engineers, I was given the task of upgrading a Kafka cluster from 2.0 to 2.6 without downtime or message loss (as you can imagine, nobody likes it when money hangs in the air or gets lost somewhere). I want to share this experience with you and get constructive feedback. So, enough preamble, let's get down to business.





Migration scheme

The task was complicated by the need to move from the old virtual machines to new ones, so the option of stopping a broker, swapping the binaries, and starting it again did not suit me.

Below is the migration diagram. We have 3 brokers on the old VMs, 3 brokers on the new VMs, and each broker has its own ZooKeeper node.





Migration plan

To migrate without anyone noticing, we had to "sweat a little". Below is the outline.





  1. Add the addresses of the new brokers to the application settings
  2. Open network access between everyone and everything
  3. Prepare the infrastructure on the new virtual machines
  4. Bring up the new ZooKeeper nodes and join them to the old ensemble
  5. Start the new Kafka brokers
  6. Migrate all partitions from the old brokers to the new ones
  7. Shut down the old Kafka brokers and the old ZooKeeper nodes
  8. Remove the old brokers and ZooKeeper nodes from the new configuration









In the application settings, add the new brokers to "bootstrap.servers":





old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092
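
Before the new setting is rolled out, it can be worth confirming that every address in the list is reachable from the application host. The sketch below is an illustration rather than part of the original procedure: both function names are hypothetical, and the check assumes bash's /dev/tcp redirection and the coreutils timeout command are available.

```shell
#!/usr/bin/env bash
# split_bootstrap LIST: print one "host port" pair per line for a
# comma-separated bootstrap.servers value.
split_bootstrap() {
  local entry
  local IFS=','
  for entry in $1; do
    printf '%s %s\n' "${entry%:*}" "${entry##*:}"
  done
}

# check_bootstrap LIST: try a TCP connection to every host:port pair and
# report the ones that do not answer within 3 seconds.
check_bootstrap() {
  local host port failed=0
  while read -r host port; do
    if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "ok   $host:$port"
    else
      echo "FAIL $host:$port"
      failed=1
    fi
  done < <(split_bootstrap "$1")
  return "$failed"
}
```

Calling check_bootstrap with the full list prints one line per broker and returns non-zero if any of them is unreachable.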







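Next, network access has to be opened. The list looks like this:

  1. Applications -> new brokers, port 9092 (3 accesses)
  2. New servers <----> old servers (+18)
  3. New servers to each other, port 9092 (+6)
  4. The same as items 2 and 3 for ZooKeeper, on its three ports: 2181, 2888, 3888 ((18+6)*3 = 72)

Total: 99 accesses.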
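
The total of 99 is easy to double-check with shell arithmetic. The sketch below assumes 3 old and 3 new hosts, counts every old/new pair in both directions and every ordered pair of new hosts, and takes the 3 application accesses as given. The variable names are made up for the illustration.

```shell
#!/usr/bin/env bash
old=3; new=3; zk_ports=3             # host counts and the ZooKeeper ports 2181, 2888, 3888

app_to_new=3                         # taken as given from the list
old_new=$(( old * new * 2 ))         # every old/new pair, both directions
new_new=$(( new * (new - 1) ))       # every ordered pair of new hosts
zk=$(( (old_new + new_new) * zk_ports ))
total=$(( app_to_new + old_new + new_new + zk ))

echo "kafka: $(( app_to_new + old_new + new_new )), zookeeper: $zk, total: $total"
```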









The kafka user

Kafka and ZooKeeper run under a dedicated kafka user. Raise the open-file limits for this user in /etc/security/limits.conf:





kafka hard nofile 262144
kafka soft nofile 262144
      
      



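Kafka keeps a file descriptor open for every log segment and every connection, so the default limit is not enough for a busy broker. Here both the soft and the hard limit are set to 262144.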









Java

Add the following to /home/kafka/.bash_profile:

export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH

The JRE itself is placed in /opt/java.









To avoid doing all of this by hand on every server, the steps are collected in a script:





setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")


if [ "$javaHome" != "$homeVar" ]; then
    echo -e "\n$homeVar\n" >> $kafkaProfile
fi


pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")

if [ "$path" != "$pathVar" ]; then
    echo -e "\n$pathVar\n" >> $kafkaProfile
fi

#env_var end --------------------------------->




#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")

if [ "$limitSoft" != "$soft" ]; then
    echo -e "\n$soft\n" >> $limitsFile
fi


hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")

if [ "$limitHard" != "$hard" ]; then
    echo -e "\n$hard\n" >> $limitsFile
fi

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
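
The guarded appends in setup.sh all follow one pattern: look for the line, add it only if it is missing. As a sketch of the same idea (the helper name append_if_missing is hypothetical, not part of the original script), the check can be factored into a function that matches the whole line literally, so characters such as $ or . in the line are not treated as a regular expression:

```shell
#!/usr/bin/env bash
# append_if_missing LINE FILE: append LINE to FILE unless an identical
# line is already there. Safe to run any number of times.
append_if_missing() {
  local line=$1 file=$2
  # -q: quiet, -x: match the whole line, -F: fixed string instead of a regex
  grep -qxF -- "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}
```

With it, each block shrinks to a single call, for example append_if_missing "kafka soft nofile 262144" /etc/security/limits.conf.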
      
      







Log in as the kafka user and check the result:

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH  should contain /opt/java/bin
      
      



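For ZooKeeper on the new servers, create the data directory and put a myid file in it that contains the id of the server.

The configuration is in /opt/zookeeper/current/conf/zoo.cfg: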





zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
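
ZooKeeper identifies each node by the number in its myid file, which has to match the N of that host's server.N line in zoo.cfg. Below is a minimal sketch of deriving the id from the config instead of typing it by hand. The function name is hypothetical, and it assumes the host names in zoo.cfg contain no characters that are special in a sed pattern.

```shell
#!/usr/bin/env bash
# myid_for_host HOST CFG: print N for the line "server.N=HOST:..." in CFG.
# Prints nothing if HOST is not listed.
myid_for_host() {
  local host=$1 cfg=$2
  sed -n "s/^server\.\([0-9][0-9]*\)=${host}:.*/\1/p" "$cfg"
}
```

On server4 this could be used as myid_for_host server4 /opt/zookeeper/current/conf/zoo.cfg > /opt/zookeeper/zookeeper-data/myid.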
      
      







Start ZooKeeper on the new servers:

/opt/zookeeper/current/bin/zkServer.sh start





On the old servers, add the new nodes to zoo.cfg:





zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
      
      







For the new configuration to take effect, restart ZooKeeper on the old servers one at a time:





/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
      
      



After each restart, check the status of the node:





.../zkServer.sh status
      
      



Now for the Kafka brokers. On each new broker, set broker.id and zookeeper.connect in server.properties.

For example, for broker 4:





broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
      
      







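Since the old brokers are still running 2.0, the inter-broker protocol version in server.properties on the new brokers has to be pinned to the old one (2.0.0). It is removed at the end of the migration.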





inter.broker.protocol.version=2.0.0
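
Settings like these are easy to apply with a small helper instead of editing server.properties by hand on every broker. The function below is a sketch, not something from the original scripts: set_property is a hypothetical name, it relies on GNU sed's -i option, and it assumes that keys and values do not contain the | or & characters.

```shell
#!/usr/bin/env bash
# set_property KEY VALUE FILE: replace the KEY=... line in FILE,
# or append KEY=VALUE if the key is not present yet.
set_property() {
  local key=$1 value=$2 file=$3
  if grep -q "^${key}=" "$file"; then
    # The dots in KEY act as regex wildcards here, which is harmless
    # for ordinary property names
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

For instance, set_property broker.id 4 /opt/kafka/config/server.properties followed by set_property inter.broker.protocol.version 2.0.0 /opt/kafka/config/server.properties would produce the settings for broker 4.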
      
      







Start the broker:

nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
      
      



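To check that the new brokers have joined the cluster, open the ZooKeeper cli

/opt/zookeeper/current/bin/zkCli.sh

and list the registered broker ids:

ls /cluster_name/brokers/ids

The expected output is:

[1,2,3,4,5,6]

All 6 brokers are in place, so we can move on.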
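
The same check can be scripted. This sketch assumes the listing printed by zkCli.sh has already been captured into a string. has_all_brokers is a hypothetical helper, not part of the original tooling.

```shell
#!/usr/bin/env bash
# has_all_brokers LISTING ID...: succeed only if every ID appears in LISTING,
# where LISTING looks like "[1,2,3,4,5,6]" or "[1, 2, 3, 4, 5, 6]".
has_all_brokers() {
  local listing=$1 ids id
  shift
  # Strip brackets and spaces, then wrap in commas so each id is matched whole
  ids=",$(printf '%s' "$listing" | tr -d '][ '),"
  for id in "$@"; do
    if [[ $ids != *",$id,"* ]]; then
      echo "broker $id is not registered" >&2
      return 1
    fi
  done
  return 0
}
```

For example, has_all_brokers "[1,2,3,4,5,6]" 4 5 6 succeeds, while a listing without one of the new ids makes it fail and name the missing broker.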





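Now for the main part, moving the partitions. Our topics have a replication.factor of 3, and the tool for the job is kafka-reassign-partitions.sh, which ships with Kafka.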





First, create a json file that lists the topics to move:





{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }
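
For more than a handful of topics the file is easier to generate than to write. The sketch below builds it from a list of topic names, one per line, such as the output of kafka-topics.sh --list. It is an illustration, not the tooling used in the migration: topics_json is a hypothetical name, and no JSON escaping is done, which is safe only because Kafka topic names are limited to letters, digits, '.', '_' and '-'.

```shell
#!/usr/bin/env bash
# topics_json: read topic names on stdin (one per line) and print a
# topics-to-move json for kafka-reassign-partitions.sh --generate.
topics_json() {
  local first=1 topic
  printf '{"topics": ['
  while IFS= read -r topic; do
    [ -n "$topic" ] || continue          # skip empty lines
    [ "$first" -eq 1 ] || printf ','     # comma before every entry but the first
    printf '{"topic": "%s"}' "$topic"
    first=0
  done
  printf '], "version":1}\n'
}
```

Batches can be produced with split, for example split -l 20 topics.txt batch_ followed by topics_json < batch_aa > generate1.json.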
      
      



Next, feed this json to kafka-reassign-partitions.sh with the --generate option, passing the ids of the new brokers in --broker-list "4,5,6".









generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }
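
The two halves of this output can be separated mechanically. The following sketch is an assumption about how one might do it, not the author's helper: it expects the tool's usual layout, in which the current assignment comes first and the proposed one follows the Proposed partition reassignment configuration line, and both function names are hypothetical.

```shell
#!/usr/bin/env bash
# proposed_json: print the non-empty lines that follow the
# "Proposed partition reassignment configuration" header (the new layout).
proposed_json() {
  awk 'found && NF { print }
       /Proposed partition reassignment configuration/ { found = 1 }'
}

# current_json: print the non-empty lines that precede that header,
# skipping the "Current partition replica assignment" title (the rollback layout).
current_json() {
  awk '/Proposed partition reassignment configuration/ { exit }
       NF && !/Current partition replica assignment/ { print }'
}
```

Both functions read the saved output on stdin, for example proposed_json < generated/generated1.txt > execute/execute1.json.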
      
      







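The part we need is everything after Proposed partition reassignment configuration (the new layout of the replicas). The part before it is the current assignment, which is useful for a rollback. Each of them is saved to its own json.

Doing this by hand for a large number of topics is no fun, so I wrote a small helper, kafka-reassign-helper.jar, and wrapped everything in a script.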









prepare-for-reassignment.sh
# Argument: the number of topics per generated file, for example 20
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi


echo "Start reassignment preparing"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt

echo created topics.txt

java -jar kafka-reassign-helper.jar generate topics.txt $1

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done

echo -e "\nCreating execute/rollback files"

java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"
      
      







The reassignment itself is started with kafka-reassign-partitions.sh and the --execute option.





move-partitions.sh
# Argument: the number of the file to execute: execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi
        

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
      
      







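The reassignment runs in the background and can take a while. Its status can be checked by running kafka-reassign-partitions.sh with the same json and the --verify option.

To avoid doing that by hand, the following script polls the status until nothing is left in progress.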





reassign-verify.sh
progress=-1

# Poll until no partition from execute$1.json is still being moved
while [ "$progress" != 0 ]
do

    # Call --verify once per iteration and count each status in its output
    status=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --verify)
    progress=$(echo "$status" | grep "in progress" -c)
    complete=$(echo "$status" | grep "is complete" -c)
    failed=$(echo "$status" | grep "failed" -c)

    echo "In progress:" $progress;
    echo "Is complete:" $complete;
    echo "Failed:" $failed;

    sleep 2s

done
      
      







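When the script finishes, In progress should be 0, Is complete should equal the number of partitions in the file, and Failed should be 0.

Run move-partitions.sh and reassign-verify.sh for each execute file in turn until all partitions have been moved.

Then check log.dirs on the old brokers: no partition data should be left there.

Now the old servers can be shut down. Stop the old brokers with kafka-server-stop.sh and the old ZooKeeper nodes with zkStop.sh.

After that, remove the old servers from zoo.cfg on the new nodes: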





zk-remove-old-servers.sh
sed -i '/^server\.1=/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/^server\.2=/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/^server\.3=/d' /opt/zookeeper/current/conf/zoo.cfg
      
      







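After that, restart ZooKeeper on the new servers one at a time and check the status. There should be 2 followers and 1 leader.

Now the old protocol version can be removed from server.properties: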





remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
      
      







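The setting only takes effect after a broker restart. Before restarting, make sure that all replicas are insync, keeping in mind min.insync.replicas (ours is 2) and default.replication.factor (ours is 3).

With at least 2 insync replicas required out of 3, only one broker can be down at a time, so before each restart all replicas have to be insync.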









check-insync.sh
input="check-topics.txt"
rm -f "$input"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> "$input"

checkPerIter=100
i=0
list=""
notInsync=0

# Describe one batch of topics and count the partitions
# whose Isr lists only two of the brokers 4-6
check_batch() {
 list=${list%|}
 echo "checking $list"
 found=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic "$list" --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$")
 if [ -n "$found" ]
  then
   echo "$found"
   ((notInsync=notInsync+$(echo "$found" | wc -l)))
 fi
 list=""
 i=0
}

while IFS= read -r line
do
 ((i=i+1))
 list+="${line}|"
 if [ $i -eq $checkPerIter ]
  then
   check_batch
 fi
done < "$input"

# The last batch is usually smaller than checkPerIter, so check it as well
if [ $i -gt 0 ]
 then
  check_batch
fi

echo "not insync: $notInsync"
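
The pattern in check-insync.sh is tied to broker ids 4-6 and to an Isr of exactly two replicas. A more general variant, sketched here under the assumption that kafka-topics.sh --describe prints Replicas: and Isr: as whitespace-separated fields, compares the lengths of the two lists for every partition. count_under_replicated is a hypothetical name.

```shell
#!/usr/bin/env bash
# count_under_replicated: read `kafka-topics.sh --describe` output on stdin
# and print the number of partitions whose Isr list is shorter than Replicas.
count_under_replicated() {
  awk '
    /Replicas:/ && /Isr:/ {
      replicas = ""; isr = ""
      # The value of each field is the token right after its label
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:")      isr = $(i + 1)
      }
      if (split(isr, a, ",") < split(replicas, b, ",")) n++
    }
    END { print n + 0 }
  '
}
```

It can be fed the describe output for all topics at once, and it also catches partitions that are down to a single insync replica.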
      
      







If the output shows "not insync: 0", the brokers can be restarted one at a time.

That's it. The broker migration is now complete, apart from reconfiguring monitoring and other extras.

Below are the instructions that I handed to the admins who carried everything out in production. Surprisingly, they got it right the first time and had no questions.





README.txt
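    Kafka migration 2.0.0 -> 2.6.0

1. Preparing the new servers

1.1 Copy the archives to the new servers

NEW4.tar.gz -> server 4
migration.tar.gz -> server 4

NEW5.tar.gz -> server 5
NEW6.tar.gz -> server 6

1.2 Unpack the archives (each one on its own server)

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3 As root, run the setup script on each new server

/home/kafka/scripts/setup.sh

1.4 Log in as the kafka user (on each new server)

1.5 Check the environment (on each new server)

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH  should contain /opt/java/bin

The owner of /opt should be kafka kafka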



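2. Bringing up ZooKeeper and the brokers on the new servers

2.1 Log in as the kafka user (on each new server)

2.2 Start ZooKeeper on the new servers

/opt/scripts/zkStart.sh

2.3 Copy the archives to the old servers

OLD1.tar.gz -> server 1
OLD2.tar.gz -> server 2
OLD3.tar.gz -> server 3

2.4 Unpack the archives (each one on its own server)

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5 As root, run the setup script on each old server

/home/kafka/scripts/setup-old.sh

2.6 Log in as the kafka user on the old servers

2.7 Add the new servers to the ZooKeeper configuration (on each old server)

/home/kafka/scripts/zk-add-new-servers.sh

2.8 Check the ZooKeeper status (on the old servers)

/home/kafka/scripts/zkStatus.sh

2.9 Restart ZooKeeper on the old servers, one at a time

/home/kafka/scripts/zkRestart.sh

2.10 Check the ZooKeeper status on all servers

/home/kafka/scripts/zkStatus.sh (old servers)
/opt/scripts/zkStatus.sh (new servers)

Every node should report follower, except for one leader

2.11 Start the brokers on the new servers

/opt/kafka-start.sh

2.12 Check that the new brokers have registered

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

The result should be [1,2,3,4,5,6]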

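3. Moving the partitions and shutting down the old servers

3.1 Prepare the reassignment files (the argument is the number of topics per file)
/home/kafka/migration/prepare-for-assignment.sh  20

3.2 For each file in /home/kafka/migration/execute, do the following

Start the reassignment, for example:
for execute1.json
/home/kafka/migration/move-partitions.sh 1

Watch the progress:
/home/kafka/migration/reassign-verify.sh 1

When "in progress" reaches 0, repeat step 3.2 for the next file

3.3 Once all files have been processed, check that no partition data is left in /opt/kafka/kafka-data on the old servers

3.4 Stop the brokers on the old servers, one at a time
/opt/kafka/current/bin/kafka-server-stop.sh

3.5 Stop ZooKeeper on the old servers, one at a time

/home/kafka/scripts/zkStop.sh

3.6 Remove the old servers from the ZooKeeper configuration (on each new server)
/opt/scripts/zk-remove-old-servers.sh

3.7 Restart ZooKeeper on the new servers, one at a time

/opt/scripts/zkRestart.sh

3.8 Check the ZooKeeper status on the new servers (1 leader, 2 followers)

/opt/scripts/zkStatus.sh

3.9 Remove the old protocol version from the broker configuration (on each new server)
/opt/scripts/remove-old-protocol.sh

3.10 Check that all replicas are insync

    /home/kafka/migration/check-insync.sh

not insync should be 0

3.11 Restart the brokers on the new servers, one at a time

/opt/kafka/current/bin/kafka-server-stop.sh

Wait until the process has exited (ps aux | grep kafka shows nothing)

 /opt/kafka-start.sh

Repeat the check from 3.10 before moving on to the next broker

Done!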
      
      







I hope this helps someone facing the same task. I look forward to your comments and remarks.

All the scripts and instructions, including those for rollback, can be found here







