Thursday, May 18, 2017

Kafka commands

Create Topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper awszk001 --create --replication-factor 3 --partitions 12 --topic MyKTopic --config retention.bytes=549755813888

Add ACL
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --topic MyKTopic --add --allowhosts '*' --allowprincipals "user:stormusr" --operations DESCRIBE,READ --config /usr/hdp/current/kafka-broker/config/server.properties

Check ACL
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --topic MyKTopic --list --config /usr/hdp/current/kafka-broker/config/server.properties

Check topic allocation: which broker is the leader and where the replicas are
export KAFKA_HEAP_OPTS=" "
date; /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper awszk001 --topic MyKTopic --describe

for i in {1..6}; do echo awskfk00$i; ssh awskfk00$i "grep -i ERROR /var/log/kafka/server.log| grep '2016-01-11 16:05' "; done

date;du -ms /data/disk00*/kafka-logs/MyKTopic-*

Verify the index only (not the logs), without printing its content. 52428800 = 50 MB, 10485760 = 10 MB
export KAFKA_HEAP_OPTS=" "
/var/tmp/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --verify-index-only --max-message-size 52428800 --max-index-size 10485760 --files /data/disk002/kafka-logs/MyKTopic-3/00000000000004354257.index
For the last (newest) index file of a partition, an error is expected.
--print-data-log: if set, prints the message contents when dumping data logs
/var/tmp/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --max-message-size 52428800 --max-index-size 10485760 --files 000000000000003873479.index --print-data-log &>/var/tmp/3873479.index
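
The size arguments are plain byte counts, so it is easy to get the MB figure wrong. A small sketch for double-checking them with shell arithmetic only; the helper name bytes_to_mib is made up for this note and is not part of Kafka:

```shell
# Hypothetical helper: convert a byte count to whole MiB (1 MiB = 1024 * 1024 bytes).
bytes_to_mib() {
  echo $(( $1 / 1024 / 1024 ))
}

bytes_to_mib 52428800    # value used for --max-message-size
bytes_to_mib 10485760    # value used for --max-index-size
```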


Index sanity check. Not available on a normal setup; needs the 0.8.3-SNAPSHOT build used below.
export KAFKA_HEAP_OPTS=" "
/var/tmp/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --verify-index-sanity --max-message-size 52428800 --max-index-size 10485760 --files /data/disk002/kafka-logs/MyKTopic-3/00000000000004354257.index

Fix/recover an index file. 52428800 = 50 MB, 10485760 = 10 MB
export KAFKA_HEAP_OPTS=" "
kafka-run-class.sh kafka.tools.DumpLogSegments --recover-index --max-message-size 52428800 --max-index-size 10485760 --files 00000000000000214207.index

/var/tmp/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --max-message-size 52428800 --max-index-size 10485760 --files 000000000000003873479.index &>/var/tmp/3873479.index
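
Segment files are named after the base offset of the segment, zero-padded to 20 digits, so the file name for a given offset can be built instead of typed by hand. A minimal sketch; segment_name is a made-up helper, not a Kafka tool:

```shell
# Hypothetical helper: build a segment file name from a base offset.
# The first argument is the base offset, the optional second one is
# the extension (defaults to "index").
segment_name() {
  printf '%020d.%s\n' "$1" "${2:-index}"
}

segment_name 214207          # index file for base offset 214207
segment_name 4354257 log     # matching log file for base offset 4354257
```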


Dump consumed messages to the console using the simple consumer. 134217728 = 128 MB
export KAFKA_HEAP_OPTS=" "
/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list prid-awskfk005:6667 --partition 8 --topic MyKTopic --offset 3751531 --fetchsize 134217728 --security-protocol PLAINTEXTSASL --no-wait-at-logend

/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list prid-awskfk005:6667 --partition 8 --topic MyKTopic --offset 3751531 --fetchsize 134217728 --security-protocol PLAINTEXTSASL --max-messages 1 --no-wait-at-logend --replica 3

/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list prid-awskfk005:6667 --partition 8 --topic MyKTopic --offset 3751531 --fetchsize 134217728 --security-protocol PLAINTEXTSASL --print-offsets | grep 'next offset'

Check offsets. --time -1 = latest, -2 = earliest.
export KAFKA_HEAP_OPTS=" "
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list awskfk001:6667 --partition 0 --topic MyKTopic --time -1 --security-protocol=PLAINTEXTSASL
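
To use the result in a script, the offset has to be cut out of the output. The sketch below assumes GetOffsetShell prints one line per partition in the form topic:partition:offset (check this against your version). The helper name and the sample line are made up:

```shell
# Hypothetical helper: return the last colon-separated field of a line,
# which is the offset if the line looks like "topic:partition:offset".
offset_from_line() {
  echo "${1##*:}"
}

sample='MyKTopic:0:3751531'   # made-up sample line, not real output
offset_from_line "$sample"
```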

Get the Storm spout offset from ZooKeeper
/usr/hdp/current/zookeeper-client/bin/zkCli.sh -server awszk001:2181 get /kafkastorm/StormSpoutPrimary/partition_0
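
The spout offset can be compared with the latest broker offset to get the lag. A rough sketch, assuming the ZooKeeper node holds a JSON document with a numeric "offset" field, which is how the storm-kafka spout stores its state as far as I know. The helper names, the sample JSON and the latest offset value are all made up:

```shell
# Hypothetical helper: extract the numeric "offset" field from the spout's JSON state.
spout_offset() {
  printf '%s\n' "$1" | sed -n 's/.*"offset":\([0-9][0-9]*\).*/\1/p'
}

# Hypothetical helper: lag = latest broker offset - offset committed by the spout.
spout_lag() {
  echo $(( $1 - $2 ))
}

# Made-up sample state, standing in for the output of the zkCli.sh get command.
state='{"topology":{"id":"t-1","name":"demo"},"offset":3751531,"partition":0,"broker":{"host":"awskfk001","port":6667},"topic":"MyKTopic"}'
latest=3751600   # made-up value, standing in for the GetOffsetShell --time -1 result

spout_lag "$latest" "$(spout_offset "$state")"
```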


Run the index sanity check over every index file of the topic in a for loop
export KAFKA_HEAP_OPTS=" "
for f in /data/disk*/kafka-logs/MyKTopic-*/*.index; do /var/tmp/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files "$f" --verify-index-sanity --max-message-size 52428800 --max-index-size 10485760; done

Check the broker id recorded in a log directory
$ cat /data/disk001/kafka-logs/meta.properties
#
#Mon Nov 16 18:02:18 EST 2015
version=0
broker.id=5
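
To check all disks at once, the broker.id line can be pulled out of each meta.properties. A small sketch; broker_id is a made-up helper and the /data/disk* layout is taken from the paths used in this post:

```shell
# Hypothetical helper: print the broker.id value from a meta.properties file.
broker_id() {
  sed -n 's/^broker\.id=//p' "$1"
}

# Print the broker id for every log directory that has a meta.properties file.
for f in /data/disk*/kafka-logs/meta.properties; do
  if [ -f "$f" ]; then
    echo "$f: $(broker_id "$f")"
  fi
done
```

Every disk on the same broker should report the same id.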

./run_kafka.ksh /data/disk003/kafka-logs/MyKTopic-0/00000000000000000000.log | head