引言
Kafka是由LinkedIn开发的一个分布式的消息系统,它以可水平扩展和高吞吐率而被广泛使用,现在已经是Apache的项目。
Kafka系统自带了丰富的运维管理工具,都是基于命令行的,本文主要介绍一些常用的命令。
读者需要对Kafka已经有入门级的了解。
常用命令
以下命令都是在Kafka的主目录下执行的。
启动Kafka
启动命令需要指定配置文件
bin/kafka-server-start.sh config/server.properties复制代码
默认的启动方式并不是守护进程,可以添加'nohup'和'&'让进程保持在后台运行,即使断开SSH终端连接。
nohup bin/kafka-server-start.sh config/server.properties > ~/kafka-server-start.out 2>&1 &复制代码
topic 相关
列出 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181复制代码
--zookeeper:指定zookeeper地址。
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic test_topic复制代码
--replication-factor:备份数,就是数据保存几份,这里设置为3,表示该topic的数据会在3个节点上各保存一份。
--partitions:分区数
topic 详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic复制代码
删除 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic复制代码
需要开启一个配置项才可以删除topic。
配置项在server.properties中: delete.topic.enable = true
ps: 有个特殊的topic叫 __consumer_offsets,是Kafka内部使用的,不允许删除的。
生产者消费者相关
生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic复制代码
--broker-list:kafka broker地址
消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic复制代码
分区相关
增加分区
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --partitions 4复制代码
--partitions:指明将分区增加至多少个。 分区不能减少只能增加。
改变分区分布和副本数
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test_topic.json --execute复制代码
--reassignment-json-file:指定一个分区方案文件。
分区方案文件是一个json格式的文本文件,需要自己编写。
举个例子,test_topic.json内容如下:
{ "version": 1, "partitions": [ { "topic": "test_topic", "partition": 0, "replicas": [1,2] }, { "topic": "test_topic", "partition": 1, "replicas": [2,3] }, { "topic": "test_topic", "partition": 2, "replicas": [3,1] }, { "topic": "test_topic", "partition": 3, "replicas": [1,2] } ]}复制代码
解释其中一部分,剩下的自然也会明白。
"topic": "test_topic", //topic名称"partition": 0, //分区编号,从0开始"replicas": [1,2] //指定保存在哪个broker,这里是填写broker id。写两个id,意思就是有两份备份啦。复制代码
PS:这个方法不能用来增加分区数。第一个replicas就是默认leader。
改变分区和备份数的操作并不能立即完成,而是需要一段时间,内部操作会在后台运行。 所以需要...
检查进度
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test_topic.json --verify复制代码
以上。