05、Kafka 操作命令

05、Kafka 操作命令

1、主题命令

(1)创建主题

kafka-topics.sh --create --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 4 --replication-factor 3

–bootstrap-server:设置kafka执行节点

–topic:主题名称

–partitions:设置分区数,可以用于并发消费。

–replication-factor:设置副本因子,数量不能大于kafka节点数。

(2)查看主题

kafka-topics.sh --list --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

image-20240509122153192

(3)在我们配置的文件夹下,

/usr/local/kafka_2.12-3.7.0/data

就可以看到topic对应的文件夹,其中 0,1,2,3是因为我们指定的partitions为4,创建了4个分区。

image-20240509160712720

在其他两台机器上,也同样有这三个文件夹,是因为我们的replication-factor 为3,表示会有三个副本,刚好对应三台机器。

(3)查询主题描述信息

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

image-20240509161337423

ReplicationFactor 表示副本数为3

Partition:表示当前的分区号

Replicas:表示副本存放的机器

Leader:表示三个副本中的leader

Isr:表示当前可用的机器id

(4)修改主题

分区数partitions可以修改,只能比原来的大。

replication-factor 一旦确定就不能修改了。

kafka-topics.sh --alter --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 6

image-20240509162034581

(5)删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test

image-20240509165305344

2、生产消息

给test1主题,发送消息

[root@localhost bin]# kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

如果给一个不存在的主题,发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2
>message1

image-20240509171549191

第一个消息发送后会有警告,后续发送消息没有警告,因为会自动创建topic,并且指定

partitions 和 replication-factor 都为1。

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2

image-20240509171756305

但是,建议 topic 还是手动创建,应该 partitions 是可以修改的,但是 replication-factor

是不允许修改的。

3、消费消息

(1)消费最新数据

消费 test1 主题下的消息:

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

我们再启动一个生产者来发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

image-20240509172354876

此时查看我们的消费者,就会打印接受到的消息:

image-20240509172430840

上面消费者,只能消费最新的数据,无法消费历史数据。

(2)消费历史数据

消费 test1 主题下的历史消息:

  • –from-beginning 表示从头开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --from-beginning

image-20240509173256632

  • –offset earliest --partition 1 表示从1号分区的头部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset earliest --partition 1

image-20240509173418365

  • –offset latest --partition 1 表示从1号分区的尾部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset latest --partition 1

image-20240509173610825

  • –offset 2 --partition 1 从1号分区的offset 为2的位置开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset 2 --partition 1

image-20240509173747666

4、消费者组

消费者组其实就是一个容器,可以容纳若干个消费者,每一个消费者必须被包含在一个消费者组里面。

(1)通过 --group 来指定消费者组。

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --group my-group1

如果分组存在,则会将消费者添加到对应分组下,如果不存在,则会创建消费者组。

(2)查看已有的消费者组

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --list

image-20240509174505505

(3)删除消费者组

删除消费者组,必须要先保证消费者组下没有消费者:

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --delete --group my-group1

image-20240509174700845

(4)查看消费的位置

kafka-consumer-groups.sh --describe  --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --group my-group1

image-20240509175128763

PARTITION:表示分区号

CURRENT-OFFSET:当前消费到的offset下标

LOG-END-OFFSET:当前分区最新的offset下标

LAG:是偏移量,未消费的数量。

消费详情:

kafka消费者在消费数据的时候,都是分组消费的,不同的消费者组之间没有影响,都会去消费。在同一个组内消费,一条消息只能被一个消费者消费,同时一个分区数据只能被一个消费者消费,如果消费者数量大于分区数,则多余出来的消费者永远不会消费消息。如果分区数大于消费者,则会均匀的将分区分配给多个消费者。

image-20240510105221038

因此,kafka 的 topic 的 partition 个数代表是 kafka 的 topic 的并行度,同一时间最多可以有多个线程来消费 topic 的数据,所以如果要提高 kafka 的 topic 的消费能力,应该增大 partition 的个数。

5、手动平衡 leader:

# 使用的脚本是:kafka-leader-election.sh
# 必要的参数:
# --bootstrap-server:指定服务器列表
# --election-type:选举的类型,默认选择preferred即可
# 下面的三个参数三选一:
# --all-topic-partitions:平衡所有的主题、所有的分区
# --topic:平衡指定的主题,如果选择这个参数,则必须使用--partition指定分区
# --path-to-json-file:将需要平衡的主题、分区信息写入一个json文件,指定这个文件
#     json的格式: {"partitions": [{"topic": "test", "partition": 1}, {"topic": "test", "partition": 2}]}

[root@qianfeng01 data]$ kafka-leader-election.sh \
--bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 \
--election-type preferred \
--all-topic-partitions

6、kafka 自带的压力测试工具

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。

一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

(1)生产者Producer压测

kafka-producer-perf-test.sh \
--topic test \
--record-size 100 \
--num-records 100000 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

record-size:一条信息有多大,单位字节

num-records:总共发送多少条信息

throughput:每秒多少条信息,设置成-1,表示不限流,可测生产者最大吞吐量

producer-props:发送端端消息配置

结果:

100000 records sent, 27495.188342 records/sec (2.62 MB/sec), 1461.75 ms avg latency, 2183.00 ms max latency, 1696 ms 50th, 2103 ms 95th, 2177 ms 99th, 2181 ms 99.9th.

解析:

一共写入10万条消息

吞吐量为2.62 MB/sec

每次写入的平均延迟为1461.75ms

最大延迟2183.00 ms

(2)消费者 Consumer 压力测试

consumer测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能

kafka-consumer-perf-test.sh \
--broker-list hadoop100:9092 \
--topic test \
--fetch-size 10000 \
--messages 10000000 \
--threads 1

broker-list:节点地址

topic:指定topic名称

fetch-size:指定每个fetch的数据大小

messages:总共要消费的消息个数

结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2020-06-27 13:17:57:490, 2020-06-27 13:18:11:751, 20.0272, 1.4043, 210000, 14725.4751, 1593235077858, -1593235063597, -0.0000, -0.0001

解释:

开始时间

结束时间

共消费数据:20.0272M

吞吐量:1.4043MB/s

共消费数据:210000条

平均每秒消费:14725.4751条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/609961.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Istio 流量管理(请求路由、流量转移、请求重试、流量镜像、故障注入、熔断等)介绍及使用

一、Istio 流量管理 Istio是一个开源的服务网格,它为分布式微服务架构提供了网络层的抽象。它使得服务之间的通信变得更为可靠、安全,并且提供了细粒度的流量管理、监控和策略实施功能。Istio通过在服务之间插入一个透明的代理(Envoy&#x…

防静电段码屏驱动VK2C23适用于水电气表以及工控仪表类产品

VK2C23是一个点阵式存储映射的LCD驱动器,可支持最大224点(56SEGx4COM)或者最大416点(52SEGx8COM)的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据,也可通过指令进入省电模式。其高抗干扰&#xff…

Django实验(远程访问+图片显示)

众所周知,Python除了不能生孩子什么都会。Python也是可以做web服务的。 Python做web有一个重点优势是:做一个快速的AI Demo。 第一步:安装一个版本5.0以上django 第二步:构建咱们的Django工程,我取名为BBQ django-adm…

static静态成员变量和静态方法

当有new创建一个对象的,里面属性和方法,通过构造函数,能定义多个不同的对象,在我们做面向对象开发的时候,给一个场景,人在一个班级的时候,你的老师可能是固定的。 当我们用构造方法去构造的时候,每次都去传递一个固定的实参去定义个老师。 这样好会显得代码非常的…

Linux网络编程:TCP并发服务器实现

目录 1、前言 2、多进程代码实现 2.1 创建新的进程 2.2 客户端接收响应函数 2.3 僵尸进程处理 2.4 完整代码 2.5 代码测试 3、多线程代码实现 3.1 创建新的线程 3.2 线程函数定义 3.3 完整代码 3.4 代码测试 4、总结 1、前言 前面实现了基本的TCP编程&#xf…

理解机器学习中的类别不平衡问题

大家好,实际世界的数据集通常是杂乱的,当不同类别之间的样本分布不均匀时,就会出现类别不平衡。或者说,某些类别的样本比其他类别多得多。例如,考虑一个信用卡违约数据集,信用卡违约是一个相对较少发生的事件&#xff…

Java入门基础学习笔记2——JDK的选择下载安装

搭建Java的开发环境: Java的产品叫JDK(Java Development Kit: Java开发者工具包),必须安装JDK才能使用Java。 JDK的发展史: LTS:Long-term Support:长期支持版。指的Java会对这些版…

Sass语法介绍-变量介绍

02 【Sass语法介绍-变量】 sass有两种语法格式Sass(早期的缩进格式:Indented Sass)和SCSS(Sassy CSS) 目前最常用的是SCSS,任何css文件将后缀改为scss,都可以直接使用Sassy CSS语法编写。 所有有效的 CSS 也同样都是有效的 SCSS。 Sass语…

javaMail快速部署——发邮件喽~

目录 功能阐述 前序步骤 (1)到QQ邮箱中获取到授权码 代码实现 坑 今天在写一个修改密码的功能的时候要用到邮箱的发送,然后因为这个项目比较老旧了,采用的是javaWeb和jsp的配置,对于我只使用过springBoot整合的ja…

京东手势验证码-YOLO姿态识别+Bézier curve轨迹拟合

这次给老铁们带来的是京东手势验证码的识别。 目标网站:https://plogin.m.jd.com/mreg/index 验证码如下图: 当第一眼看到这个验证码的时候,就头大了,这玩意咋识别??? 静下心来细想后的一个方案&#xf…

JavaWeb中的Session和Cookie

前言 什么是会话跟踪技术 Cookie 1.什么是cookie 2.Cookie的应用 2.1 保持用户登录状态 2.2 记录用户名 3. Cookie的设置和获取 3.1 、通过HttpServletResponse.addCookie的方式设置Cookie 3.2、浏览器中查看cookie的内容 3.3、服务端获取客户端携带的cookie&#xf…

240+ Stylized Arctic Textures - Snow, Ice More

240+风格化的雪、冰、雪岩和其他雪纹理的集合,用于北极风格化/幻想/rpg风格的游戏环境。 在这个系列中,你会在风格化/幻想/rpg风格的游戏中找到大量适合北极和其他雪地环境的纹理——雪、冰、雪地岩石、雪地草、雪地砾石、雪地等等! 每个纹理都是可平铺/无缝的,并与各种不同…

C++语法|进程虚拟地址空间和函数调用栈

本文来自施磊老师的课程,老师讲的非常不错,我的笔记也是囫囵吞枣全部记下,但是我在这里推荐一本书,真的真的建议初学C或者想要进阶C的同学们看看:《CPU眼里的C/C》 文章目录 进程的虚拟地址空间和布局进程虚拟地址空间…

布隆过滤器和黑名单,解决Redis缓存穿透

目录 1.什么是布隆过滤器? 2.布隆过滤器的原理 3.空间计算 4.布隆过滤器的视线场景: 5.在Spring Boot中集成Redisson实现布隆过滤器 6、Redisson实现布隆过滤器 6.1导入依赖 6.2使用 布隆过滤器(Bloom Filter)是1970年由布…

邮件大附件系统如何进行安全、高效的大附件发送?

邮件大附件系统是一套解决传统电子邮件系统,在发送大文件时遇到限制的解决方案。由于传统电子邮件系统通常对附件大小有限制,这使得发送大文件变得困难。邮件大附件系统通过各种技术手段,允许用户发送超过传统限制的大文件,通常在…

修改latex中block中公式与block标题间隔过大的问题

修改block中公式与block间隔过大的问题 如图的block中公式出现了空白:代码见下方 \begin{proof}[证明]\begin{align*}&Z\alpha \beta _XX\beta _YY\varepsilon \rightarrow XZ\alpha X\beta _XX^2\beta _YXY\varepsilon X&\\&E\left( Z \right) \alpha \beta _XE\…

【Java】Java中栈溢出的常见情况及解决方法

人不走空 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌赋:斯是陋室,惟吾德馨 目录 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌…

同创优配正规炒股A股三大指数集体收涨 创指重回1900点关口

查查配5月9日电 周四,A股三大指数震荡上扬。截至收盘,上证指数涨0.83%,报3154.32点;深证成指涨1.55%,报9788.07点;创业板指涨1.87%,报1900.01点。总体上个股涨多跌少,全市场超4200只个股上涨。沪深两市今日成交额9011亿元,较上个交易日放量367亿元。 同创优配是AAA 级诚信经营…

Spring JdbcTemplate实现自定义动态sql拼接功能

需求描述: sql 需要能满足支持动态拼接,包含 查询字段、查询表、关联表、查询条件、关联表的查询条件、排序、分组、去重等 实现步骤: 1,创建表及导入测试数据 CREATE TABLE YES_DEV.T11 (ID BINARY_BIGINT NOT NULL,NAME VARCH…

栈结构(c语言)

1.栈的概念 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(Last In First Out)的原则。 压栈&am…
最新文章