Setting Up a Kafka Cluster on Linux

I. OS and Software Versions
1. The OS is CentOS Linux release 7.2.1511 (Core)
(check with cat /etc/redhat-release)
2. Software version
Kafka: 0.10.0.0
II. Prerequisites
Because a Kafka cluster relies on a ZooKeeper cluster for coordination, the ZK cluster must be set up first. This article mainly covers setting up the Kafka cluster.
III. Installation Steps
1. Download kafka_2.10-0.10.0.0.tgz into /data/soft
2. Extract kafka_2.10-0.10.0.0.tgz into /data/app/kafkacluster:
tar -xzf kafka_2.10-0.10.0.0.tgz -C /data/app/kafkacluster
Rename the extracted folder to 19092, go into its config directory, and edit server.properties.
3. Open server.properties with vi
[root@centos7 config]# vi server.properties
4. Make the following changes:
broker.id=0
port=19092
log.dirs=/data/app/kafkacluster/19092/bin/kafka-logs19092
zookeeper.connect=192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183
Set up the other two Kafka instances the same way: first rename their folders (19093 and 19094 in this article),
then in the config directory make copies of server.properties named server1.properties and server2.properties.
server1.properties needs:
broker.id=1
port=19093
log.dirs=/data/app/kafkacluster/19093/bin/kafka-logs19093
zookeeper.connect=192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183
server2.properties needs:
broker.id=2
port=19094
log.dirs=/data/app/kafkacluster/19094/bin/kafka-logs19094
zookeeper.connect=192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183
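Since the three config files differ only in broker.id, port, and log.dirs, they can be generated in one loop. A minimal sketch, under the assumptions of this article's layout: it writes under /tmp so it is safe to try (point BASE at /data/app/kafkacluster for real use), and the 192.168.x.18 ZooKeeper addresses are this article's placeholders.

```shell
#!/bin/sh
# Generate one server.properties per broker; BASE and ZK are assumptions.
BASE=${BASE:-/tmp/kafkacluster-demo}
ZK="192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183"
id=0
for port in 19092 19093 19094; do
    dir="$BASE/$port"
    mkdir -p "$dir"
    cat > "$dir/server.properties" <<EOF
broker.id=$id
port=$port
log.dirs=$dir/bin/kafka-logs$port
zookeeper.connect=$ZK
EOF
    id=$((id + 1))
done
ls "$BASE"
```

Each broker then gets started with its own generated file, exactly as in the manual steps above.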
IV. Starting Kafka & Verification
1. Start the standalone ZooKeeper ensemble first; all three instances must be up (./zkServer.sh start).
2. Go into Kafka's bin directory and start the service with each of the three broker configs:
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server1.properties
./kafka-server-start.sh ../config/server2.properties
As the remaining nodes come up, the first node started logs them joining the ISR, like this:
[2017-…] INFO Partition [aaa,0] on broker 0: Expanding ISR for partition [aaa,0] from 0 to 0,1 (kafka.cluster.Partition)
[2017-…] INFO Partition [aaa,0] on broker 0: Expanding ISR for partition [aaa,0] from 0,1 to 0,1,2 (kafka.cluster.Partition)
3. Verify the processes
[root@centos7 bin]# jps
…778 Kafka
…132 Jps
…285 Kafka
…014 QuorumPeerMain
…064 QuorumPeerMain
…531 Kafka
…116 QuorumPeerMain
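Rather than eyeballing jps, the expected process mix can be asserted by counting entries. In this sketch a sample string stands in for the live command output (on the real host you would use `jps_out=$(jps)` instead):

```shell
#!/bin/sh
# Count Kafka brokers and ZooKeeper peers in jps-style output.
jps_out='778 Kafka
132 Jps
285 Kafka
014 QuorumPeerMain
064 QuorumPeerMain
531 Kafka
116 QuorumPeerMain'
kafka_n=$(printf '%s\n' "$jps_out" | grep -c ' Kafka$')
zk_n=$(printf '%s\n' "$jps_out" | grep -c ' QuorumPeerMain$')
echo "kafka=$kafka_n zookeeper=$zk_n"
```

With all three brokers and all three ZooKeeper instances up, both counts should be 3.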
4. Connect with the ZooKeeper client
[root@centos7 bin]# ./zkCli.sh -server 192.168.x.18:2181
Connecting to 192.168.x.18:2181
5. Inspect the znodes
[zk: 192.168.x.18:2181(CONNECTED) 0] ls /
[controller_epoch, controller, brokers, zookeeper, test, admin, isr_change_notification, consumers, config]
[zk: 192.168.x.18:2181(CONNECTED) 1]
In the listing above, only zookeeper is a native ZooKeeper node; all the others were created by Kafka.
6. Create a topic:
[root@centos7 bin]# ./kafka-topics.sh --create --zookeeper 192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183 --replication-factor 3 --partitions 1 --topic test666
Created topic "test666".
7. Check the topic's status:
[root@centos7 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183 --topic test666
Topic:test666  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: test666  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
Reading the output:
The first line describes the topic as a whole; every partition then gets a line of its own. We have only one partition, so there is just one line below it.
Leader: the broker that handles all reads and writes for the partition, elected from the replica set.
Replicas: all replica brokers for this partition, whether or not they are currently in service.
Isr: the replicas that are currently in sync and in service.
As shown above, the current leader is broker 0.
Below we will kill broker 0 and see whether the leader changes.
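When scripting this failover check, the leader id can be pulled out of the describe line with standard text tools. A sketch, where the sample line mirrors the output above (in practice you would pipe the real `kafka-topics.sh --describe` output in the same way):

```shell
#!/bin/sh
# Extract the leader broker id from a kafka-topics --describe partition line.
describe='Topic: test666  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2'
leader=$(printf '%s\n' "$describe" | grep -o 'Leader: [0-9]*' | head -n1 | cut -d' ' -f2)
echo "current leader: $leader"
```

Running it against the sample prints the leader id 0, matching the manual reading above.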
8. Send messages to test666:
[root@centos7 bin]# ./kafka-console-producer.sh --broker-list localhost:19092,localhost:19093,localhost:19094 --topic test666
hello kafka!
hello littleMonster!
hello world!
9. Receive them:
[root@centos7 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183 --topic test666 --from-beginning
hello kafka!
hello littleMonster!
hello world!
The messages were received successfully.
10. Find the process of leader broker 0 and kill it:
[root@centos7 /]# ps -ef | grep ka
[root@centos7 /]# kill -9 …285
11. Check the topic status again:
[root@centos7 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.x.18:2181,192.168.x.18:2182,192.168.x.18:2183 --topic test666
Topic:test666  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: test666  Partition: 0  Leader: 2  Replicas: 0,1,2  Isr: 1,2
As shown, broker 0 has disappeared from the Isr (in-service) list, and broker 2 has been elected the new leader.
12. Send messages again
They are received normally; the failover test passes.
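The "did the killed broker really leave the ISR" step can also be checked mechanically. A sketch, where the sample line mirrors the post-kill describe output above:

```shell
#!/bin/sh
# Check whether a given broker id is still in the Isr list of a --describe line.
line='Topic: test666  Partition: 0  Leader: 2  Replicas: 0,1,2  Isr: 1,2'
isr=$(printf '%s\n' "$line" | grep -o 'Isr: [0-9,]*' | cut -d' ' -f2)
case ",$isr," in
    *,0,*) echo "broker 0 still in ISR" ;;
    *)     echo "broker 0 out of ISR" ;;
esac
```

Wrapping the id in commas before matching avoids false positives such as broker 10 matching broker 0.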
I. Installation and Configuration
1. Download
Kafka was open-sourced by LinkedIn and is now hosted at Apache, so download it from Apache; installing the recommended release is fine. Below, 0.10.0.0 is the latest release and the recommended stable one. Download the Scala 2.11 build (Kafka is written in Scala).
0.10.0.0 is the latest release. The current stable version is 0.10.0.0.
You can verify your download by following these procedures and using these KEYS.
Release Notes
Source download: kafka-0.10.0.0-src.tgz (asc, md5)
Binary downloads:
Scala 2.10
- kafka_2.10-0.10.0.0.tgz (asc, md5)
Scala 2.11
- kafka_2.11-0.10.0.0.tgz (asc, md5)
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).
2. Kafka directory layout
drwxr-xr-x 3 root root   …   2015 bin
drwxr-xr-x 2 root root   …   2015 config
drwxr-xr-x 2 root root   …   2015 libs
-rw-r--r-- 1 root root 11358 Sep 2015 LICENSE
-rw-r--r-- 1 root root   …   2015 NOTICE
bin: executables, e.g. the broker start/stop scripts, the console producer and consumer clients, and a ZooKeeper launcher (Kafka depends on ZooKeeper)
config: Kafka's configuration files
libs: Kafka's libraries
3. Key configuration options
Kafka has many configuration options; this section covers only the important or mandatory ones.
[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
broker.id
The broker's unique identifier.
log.dirs
Despite the name, this is Kafka's data (persistence) directory rather than a plain log directory; multiple directories can be given, comma-separated.
Default: /tmp/kafka-logs
zookeeper.connect
The ZooKeeper cluster address (comma-separated for multiple nodes).
Default: localhost:2181
host.name
The broker's hostname, similar to a bind address; usually set to the internal NIC IP.
num.partitions
The default number of partitions per topic (used when none is specified).
auto.create.topics.enable
Whether topics are created automatically, e.g. when a producer publishes to a nonexistent topic (similar to Elasticsearch's automatic index creation); production systems usually set this to false.
default.replication.factor
The default number of replicas.
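Edits to these keys are easy to script with a small replace-or-append helper, which stays idempotent on repeated runs. A sketch that works on a throwaway copy rather than the real server.properties:

```shell
#!/bin/sh
# set_prop FILE KEY VALUE: replace KEY's line if present, append it otherwise.
set_prop() {
    if grep -q "^$2=" "$1"; then
        sed -i "s|^$2=.*|$2=$3|" "$1"
    else
        echo "$2=$3" >> "$1"
    fi
}
conf=/tmp/server.properties.demo
printf '%s\n' 'broker.id=0' 'num.partitions=1' > "$conf"
set_prop "$conf" num.partitions 2                  # existing key: replaced
set_prop "$conf" auto.create.topics.enable false   # new key: appended
cat "$conf"
```

sed -i as used here is the GNU form, which is what CentOS ships.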
4. Full installation steps
cd /opt/soft
wget /kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xvf kafka_2.10-0.8.2.2.tgz
ln -s kafka_2.10-0.8.2.2 kafka
mkdir -p /opt/soft/kafka/data/kafka-logs
5. Set environment variables
export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
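These exports only last for the current shell; to make them permanent, append them to /etc/profile or ~/.bashrc and re-source the file. A quick sanity check that PATH actually picked the directory up:

```shell
#!/bin/sh
# Export the variables and confirm the bin directory landed on PATH.
export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
case ":$PATH:" in
    *":$KAFKA_HOME/bin:"*) echo "PATH ok" ;;
    *)                     echo "PATH missing $KAFKA_HOME/bin" ;;
esac
```

After this, the kafka-*.sh scripts can be invoked from any directory.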
II. Single Machine, Single Broker (standalone)
(1) Edit ${kafka_home}/config/server.properties
Changed here: log.dirs, the zookeeper address, and num.partitions.
[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
(2) Start the broker
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties &
The trailing & only puts the process in the background.
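Note that a bare & still ties the process to your terminal session; wrapping the same command in nohup with a redirected log keeps the broker alive after logout (newer Kafka versions also accept a -daemon flag on kafka-server-start.sh). Demonstrated here with a harmless stand-in command, so the sketch runs anywhere; substitute the kafka-server-start.sh line in real use:

```shell
#!/bin/sh
# nohup + redirect: survives the terminal closing; 'echo' stands in for the broker.
nohup sh -c 'echo broker started' > /tmp/kafka-demo.out 2>&1 &
wait $!
cat /tmp/kafka-demo.out
```

The redirected file then serves as the startup log you would otherwise lose.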
[@zw_94_190 /opt/soft/kafka/bin]# [ 16:09:32,436] INFO Verifying properties (kafka.utils.VerifiableProperties)
[ 16:09:32,475] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property log.dirs is overridden to /opt/soft/kafka/kafka-logs (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[ 16:09:32,476] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[ 16:09:32,477] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[ 16:09:32,478] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[ 16:09:32,478] INFO Property zookeeper.connect is overridden to 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 (kafka.utils.VerifiableProperties)
[ 16:09:32,478] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[ 16:09:32,519] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[ 16:09:32,521] INFO [Kafka Server 0], Connecting to zookeeper on 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 (kafka.server.KafkaServer)
[ 16:09:32,534] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[ 16:09:32,543] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:host.name=zw_94_190 (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.version=1.7.0_45 (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.home=/opt/soft/jdk1.7.0_45/jre (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.class.path=:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/opt/soft/kafka/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/soft/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/soft/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/soft/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/soft/kafka/bin/../clients/build/libs/kafka-clients*.jar:/opt/soft/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-javadoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-scaladoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-sources.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-test.jar:/opt/soft/kafka/bin/../libs/kafka-clients-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/log4j-1.2.16.jar:/opt/soft/kafka/bin/../libs/lz4-1.2.0.jar:/opt/soft/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/soft/kafka/bin/../libs/scala-library-2.10.4.jar:/opt/soft/kafka/bin/../libs/slf4j-api-1.7.6.jar:/opt/soft/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/soft/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/opt/soft/kafka/bin/../libs/zkclient-0.3.jar:/opt/soft/kafka/bin/../libs/zookeeper-3.4.6.jar:/opt/soft/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,543] INFO Client environment:user.dir=/opt/soft/kafka_2.10-0.8.2.2/bin (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,544] INFO Initiating client connection, connectString=10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2abd2c1e (org.apache.zookeeper.ZooKeeper)
[ 16:09:32,565] INFO Opening socket connection to server 10.10.53.162/10.10.53.162:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[ 16:09:32,569] INFO Socket connection established to 10.10.53.162/10.10.53.162:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[ 16:09:32,581] INFO Session establishment complete on server 10.10.53.162/10.10.53.162:2181, sessionid = 0xc9b71, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[ 16:09:32,583] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[ 16:09:32,772] INFO Loading logs. (kafka.log.LogManager)
[ 16:09:32,806] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[ 16:09:32,814] INFO Logs loading complete. (kafka.log.LogManager)
[ 16:09:32,814] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[ 16:09:32,817] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[ 16:09:32,839] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[ 16:09:32,840] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[ 16:09:32,895] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[ 16:09:32,920] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[ 16:09:33,089] INFO Registered broker 0 at path /brokers/ids/0 with address zw_94_190:9092. (kafka.utils.ZkUtils$)
[ 16:09:33,094] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[ 16:09:33,101] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[ 16:09:33,248] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[ 16:09:33,284] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
(3) Create a topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 --replication-factor 1 --partitions 1 --topic test_topic
Created topic "test_topic".
[ 16:13:54,131] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test_topic,0] (kafka.server.ReplicaFetcherManager)
[ 16:13:54,136] INFO Completed load of log test_topic-0 with log end offset 0 (kafka.log.Log)
[ 16:13:54,140] INFO Created log for partition [test_topic,0] in /opt/soft/kafka/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 16:13:54,141] WARN Partition [test_topic,0] on broker 0: No checkpointed highwatermark is found for partition [test_topic,0] (kafka.cluster.Partition)
(4) List topics
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
test_topic
(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
[ 16:17:34,545] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
[ 16:17:41,705] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[ 16:19:27,978] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 --topic test_topic --from-beginning
A consumer started afterwards can still read the messages, which shows they are persisted to disk; the internals may be covered in a later article.
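That persistence is easy to see on disk: under log.dirs each partition gets a directory named topic-partition, holding segment files named after their base offset. A simulated layout (the real path in this setup would be under /opt/soft/kafka/data/kafka-logs):

```shell
#!/bin/sh
# Mimic the on-disk layout Kafka creates for one partition of test_topic.
d=/tmp/kafka-logs-demo/test_topic-0
mkdir -p "$d"
touch "$d/00000000000000000000.log" "$d/00000000000000000000.index"
ls "$d"
```

The .log file holds the messages themselves; the .index file maps offsets to file positions.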
III. Single Machine, Multiple Brokers (pseudo-distributed)
(1) Add three configuration files (the main differences are broker.id, port, and log.dirs; everything else matches Part II)
(a) server-9093.properties
broker.id=1
port=9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-0-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
(b) server-9094.properties
broker.id=2
port=9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-1-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
(c) server-9095.properties
broker.id=3
port=9095
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-2-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
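A quick way to confirm the three files differ only where intended is to diff a pair of them. Sketched here against small throwaway copies that mirror the files above, so it runs without a Kafka install:

```shell
#!/bin/sh
# Diff two broker configs; only broker.id, port, and log.dirs should differ.
mkdir -p /tmp/cfg-demo
printf '%s\n' 'broker.id=1' 'port=9093' 'log.dirs=/opt/soft/kafka/data/kafka-0-logs' \
    'log.retention.hours=168' > /tmp/cfg-demo/server-9093.properties
printf '%s\n' 'broker.id=2' 'port=9094' 'log.dirs=/opt/soft/kafka/data/kafka-1-logs' \
    'log.retention.hours=168' > /tmp/cfg-demo/server-9094.properties
diff /tmp/cfg-demo/server-9093.properties /tmp/cfg-demo/server-9094.properties | grep '^[<>]'
```

Any line outside those three keys showing up in the diff means a copy-paste slip in one of the files.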
(2) Start the three brokers
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9093.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9094.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9095.properties &
(3) Create a topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 --replication-factor 3 --partitions 1 --topic hello_topic
(4) List topics
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181
(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic hello_topic
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.10.14.182:2181 --topic hello_topic --from-beginning
IV. Multiple Machines, Multiple Brokers (distributed)
Essentially the same as Part III, just spread across several machines; it is best to also set host.name on each broker.
V. Summary
1. One ZooKeeper ensemble serves one set of brokers.
2. Use multiple machines with multiple instances in production.
3. Avoid relying on automatic topic creation.
4. Producers depend on the broker list (see the corresponding executable under bin).
5. Consumers depend on the ZooKeeper list (see the corresponding executable under bin).
6. Brokers appear not to talk to each other directly; the distributed behavior relies heavily on ZooKeeper.