Kafka实战

实验环境为Ubuntu。

安装

Kafka的安装非常简单,只需要下载解压就可以了。需要注意的是Kafka依赖于Java环境,所以确保你的系统中装有JDK。

//安装sun默认JDK
drfish@kafka-5934:~# sudo apt-get install default-jdk
//下载Kafka并解压
drfish@kafka-5934:~$ wget http://apache.mirrors.lucidnetworks.net/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
jshen4@kafka-5934:~$ tar -xzf kafka_2.11-0.9.0.0.tgz 
jshen4@kafka-5934:~$ cd kafka_2.11-0.9.0.0/

启动

Kafka的运行是依赖于Zookeeper的,Zookeeper是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。在这里我们直接用Kafka自带的Zookeeper即可。

//启动Zookeeper
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动Kafka
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server.
properties &

创建话题

所有消息要发布到相应的话题下,我们来创建一个测试话题。

//创建一个名为test的topic
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看现有的topic,正常情况应输出test和一些日志
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --list --zookeeper l
ocalhost:2181

发送消息

现在可以通过生产者脚本来发布消息了,运行脚本后就可在命令行输入消息。

drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message.

接受消息

创建一个消费者来接收消息,通过自带的消费者脚本可以简单的把接受的消息输出,在命令行中可以看见刚刚输入的那条信息“This is a message.”。

drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message.

到这里一条最基本的流程就走通了,下面来尝试一下Kafka的多节点集群。

多节点集群尝试

在启动新节点之前,首先要修改配置文件,因为已经有一个Kafka进程在运行中,我们要保证新的Kafka节点不与它冲突。运行下面的命令,对相应文件做如下修改:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

//配置节点启动文件
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/server-1.properties
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/serve
r-2.properties
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-1.properties 
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-2.properties 
//启动新节点
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-1.properties &
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-2.properties &

接下去要创建能够被多个节点接收的话题。从返回的结果可以看出,主节点是节点0,节点1和2是从节点。而Isr后面的节点是指与主节点同步的节点。

//创建一个叫做my-replicated-topic的话题,只有一个分区,但备份因子为3
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
 //查看新创建话题的结果
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

测试一下消息的发布与接收。

//向新的话题发布消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
new message
//消费者读取消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
new message

既然有了备份节点,我们就来看一下它们的实际作用,我们通过关闭主节点进程来模拟主节点异常的情况,测试从节点能否继续正常完成主节点应该做的工作。

//查看我们的主节点运行在哪一个后台命令中,由于主节点是0,即2号命令
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ jobs
[1]   Running                 bin/zookeeper-server-start.sh config/zookeeper.properties &
[2]   Running                 bin/kafka-server-start.sh config/server.properties &
[3]-  Running                 bin/kafka-server-start.sh config/server-1.properties &
[4]+  Running                 bin/kafka-server-start.sh config/server-2.properties &
//将对应的命令调到前台运行,并通过^C终止命令
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ fg 2
//在终止了上面一条命令后发现如下日志,新的主节点是原来的从节点1
[2015-12-07 23:55:53,118] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
[2015-12-07 23:55:53,158] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
//查看话题消息来验证我们的发现
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 0,1,2 Isr: 1,2
//看新的主节点能否处理消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
new message

从上面的实验结果可以看出,当主节点0发生异常时,从节点1变为主节点,此时节点0仍在这个备份组里,但它已经不与其它节点同步(通过Isr属性看出)。接下去做最后一个实验
,如果现在我们重启节点一会怎么样呢?

//重启节点1
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server.
properties &
//查看话题
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 0,1,2 Isr: 1,2,0

我们可以看出来节点1又处在同步状态了,我们可以看出Kafka节点的可用性是非常好的,如果节点出现异常,它会临时把该节点废弃,一旦当节点恢复正常,它又使节点进行正常的备份工作。

文章导航