Kafka必知必会

Kafka必知必会

Kafka是一种高吞吐量的分布式发布订阅消息系统,它是一个开源的流处理平台,由Scala和Java编写。

消息系统分类

“消息队列”是在消息的传输过程中保存消息的容器。
消息系统可以分为两大类:

  • 点对点(Peer-to-Peer):发送到队列中的消息被一个而且仅仅一个接收者所接收。即使有多个接收者在同一个消息队列中监听同一个消息。
  • 发布/订阅:发布到一个主题的消息,可以被多个订阅者所接收。

设计目标

高吞吐量:在廉价的商用机器上单机可支持每秒100万条消息的读写
消息持久化:所有消息均被持久化到磁盘中,无消息丢失,支持消息重放
完全分布式:Producer、Broker、Consumer均支持水平扩展
同时支持在线流处理和离线批处理

常用术语

broker

Kafka以集群的方式运行,可以由一个或者多个服务组成,每一个服务叫做一个broker。

生产者:Producer

向boker发布消息的应用程序。
Producer将消息发布到它指定的Topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数(key值)选择分区,使用更多的是第二种。

消费者:Consumer

从消息队列(Kafka)中请求消息的客户端应用程序。
每一个consumer实例都属于一个Consumer Group。
每一条消息只会被同一个Consumer Group里的一个Consumer实例消费。
不同的Consumer Group可以同时消费同一条消息。

主题:Topic

一个主题类似于新闻中的体育、娱乐、教育等分类概念。

分区:Partition

一个Topic中的消息数据按照多个分区组织,分区是Kafka消息队列组织的最小单位。

PUSH&PULL

Producer向Broker中PUSH消息,Consumer从Broker中PULL消息。
PUSH模式的目标是尽可能以最快的速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
PULL模式则可以根据Consumer的消费能力以适当的速率消费消息。

Kafka的安装部署

节点规划

节点 规划
node03 ZooKeeper、Kafka
node04 ZooKeeper、Kafka
node05 ZooKeeper、Kafka

安装部署

官网下载Kafka:https://kafka.apache.org/downloads。这里以kafka_2.12-3.0.0.tgz为例。

1
2
3
4
5
6
7
8
9
10
11
12
## 解压文件
[root@node03 tools]# tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/
## 文件重命名
[root@node03 tools]# cd /opt/
[root@node03 opt]# mv kafka_2.12-3.0.0/ kafka
## 配置环境变量
[root@node03 opt]# vim /etc/profile
## KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$KAFKA_HOME/bin:$PATH
## 使配置生效
[root@node03 opt]# source /etc/profile

修改配置文件

这里主要是server.properties配置文件的修改。

1
2
3
4
5
6
7
[root@node03 config]# vim server.properties
## broker 的全局唯一编号,不能重复,只能是数字。分别将node3-5的id设为0-2
broker.id=0
## 日志文件存放目录
log.dirs=/opt/kafka/logs
## ZooKeeper集群信息,逗号隔开
zookeeper.connect=node03:2181,node04:2181,node05:2181

分发到其他节点

这里包括/etc/profileserver.properties。注意分发完成之后需要修改broker.id以及使配置文件生效。

1
2
3
4
5
6
7
8
9
10
11
[root@node03 opt]# scp -r  /opt/kafka root@node04:/opt/
[root@node04 kafka]# vim config/server.properties
broker.id=1
[root@node03 opt]# scp /etc/profile root@node04:/etc/
[root@node04 kafka]# source /etc/profile

[root@node03 opt]# scp -r /opt/kafka root@node05:/opt/
[root@node05 kafka]# vim config/server.properties
broker.id=2
[root@node03 opt]# scp /etc/profile root@node05:/etc/
[root@node05 kafka]# source /etc/profile

集群启动

注意:这里需要先启动ZooKeeper集群,再启动Kafka集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
## ZooKeeper集群启动:node03、node04、node05各节点依次启动ZooKeeper
[root@node03 opt]# /opt/zookeeper-3.5.7/bin/zkServer.sh start
[root@node04 opt]# /opt/zookeeper-3.5.7/bin/zkServer.sh start
[root@node05 opt]# /opt/zookeeper-3.5.7/bin/zkServer.sh start

## Kafka集群启动:node03、node04、node05各节点依次启动Kakfa
[root@node03 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@node04 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@node05 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
## 查看启动之后的进程
[root@node03 kafka]# jps
4231 QuorumPeerMain
5191 Kafka
5295 Jps

## Kafka集群关闭:node03、node04、node05各节点依次关闭Kakfa
[root@node03 kafka]# bin/kafka-server-stop.sh
[root@node03 kafka]# bin/kafka-server-stop.sh
[root@node03 kafka]# bin/kafka-server-stop.sh

常用Shell命令

下面是kafka的基本参数

参数 解释
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
–create 创建主题
–delete 删除主题
–alter 修改主题
–list 查看主题
–describe 查看主题详情描述
–partitions 设置分区数
–replication-factor 设置分区副本
–config<k=v> 更新系统默认的配置

下面是具体操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
## 查看Topic列表
[root@node03 kafka]# bin/kafka-topics.sh --bootstrap-server node03:9092 --list

## 创建Topic
[root@node03 kafka]# bin/kafka-topics.sh --bootstrap-server node03:9092 --create --topic topic01 --partitions 1 --replication-factor 3
Created topic topic01.

## 查看Topic详细信息
[root@node03 kafka]# bin/kafka-topics.sh --bootstrap-server node03:9092 --describe --topic topic01

## 修改分区数(注意:分区数只能增加,不能减少)
[root@node03 kafka]# bin/kafka-topics.sh --bootstrap-server node03:9092 --alter --topic topic01 --partitions 3

## 删除Topic。
[root@node03 kafka]# bin/kafka-topics.sh --bootstrap-server node03:9092 --delete --topic topic01

## 生产者生产数据
[root@node03 kafka]# bin/kafka-console-producer.sh --bootstrap-server node03:9092 --topic topic01
>hello,world
>hello,kakfa

## 消费者消费数据
[root@node03 kafka]# bin/kafka-console-consumer.sh --bootstrap-server node03:9092 --topic topic01
hello,scala

## 消费者消费数据(包括历史数据)
[root@node03 kafka]# bin/kafka-console-consumer.sh --bootstrap-server node03:9092 --from-beginning --topic topic01
hello,world
hello,kakfa
hello,scala

客户端API操作

pom依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Producer {

// 指定主题
private final static String TOPIC_NAME = "topic02";
// 集群用逗号分隔
private final static String BOOTSTRAP_SERVERS = "node03:9092,node04:9092,node05:9092";

public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1.设置生产者的配置信息
Properties properties = new Properties();
//指定连接的 kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// kv序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 2.创建生产者对象。通过配置信息
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

// 3.发送消息
for (int i = 1; i <= 5; i++) {
//构建消息数据。Student为实体类,用于封装JSON对象。
Student student = new Student(i, "jack" + i, 10 + i);
//将消息数据封装成 ProducerRecord 对象。未指定发送分区,未指定Key
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, JSON.toJSONString(student));

//异步方式发送消息
Future<RecordMetadata> future = producer.send(producerRecord);
// 同步方式发送:只需在异步发送的基础上,再调用一下 get()方法即可。
RecordMetadata metadata = future.get();
System.out.println("同步方式发送消息结果:" + "topic:" + metadata.topic() + "|partition:"
+ metadata.partition() + "|offset:" + metadata.offset());

}
//方便测试
TimeUnit.SECONDS.sleep(10);
//4.关闭资源
producer.close();
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

// 指定主题
private final static String TOPIC_NAME = "topic01";
// 指定消费者组(必填,组名可任意填入)
private final static String CONSUMER_GROUP_NAME = "testGroup";
// 集群用逗号分隔
private final static String BOOTSTRAP_SERVERS = "node03:9092,node04:9092,node05:9092";

public static void main(String[] args) {

// 1.设置消费者的配置信息
Properties properties = new Properties();
//指定连接的 kafka集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 消费分组名
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
//key,value的序列化:这里把 key和 value从字符串序列化为字节数组
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 2.创建消费者对象。通过配置信息
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

// 3.消费者对象订阅主题
// 直接订阅主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));

//4. 获取/消费消息
while (true) {
/**
* 获取消费消息
* poll():表示拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

//4.2 处理消息
for (ConsumerRecord<String, String> record : records) {
// 直接输出
System.out.println("value: " + record.value());
}
}
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信