Linux 搭建 Kafka 环境 - 详细教程

目录

一. Kafka介绍

1. 应用场景

2. 版本对比

二. Kafka安装

1. 前置环境

(1)安装JDK

2. 软件安装

(3)环境变量配置

(3)服务启动

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

(3)创建一个新的主题

(3)删除主题

(4)描述主题

(5)启动生产者

(6)启动消费者

四. 注册系统服务

1. Systemd服务配置

2. Kafka服务控制


一. Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

1. 应用场景

Kafka可以看作是一个能够处理消息队列的中间件,适用于实时的流数据处理,主要用于平衡好生产者和消费者之间的关系。

  • 生产者

生产者可以看作是数据源,可以来自于日志采集框架,如Flume,也可以来自于其它的流数据服务。当接收到数据后,将根据预设的Topic暂存在Kafka中等待消费。对于接收到的数据将会有额外的标记,用于记录数据的被消费【使用】情况。

  • 消费者

消费者即数据的使用端,可以是一个持久化的存储结构,如Hadoop,也可以直接接入支持流数据计算的各种框架,如Spark - Streaming。消费者可以有多个,通过订阅不同的Topic来获取数据。

2. 版本对比

Kafka的0.x和1.x可以看作是上古版本了,最近的更新也是几年以前,从目前的场景需求来看,也没有什么特别的理由需要使用到这两个版本了。

  • 2.x

在进行版本选择时,通常需要综合考虑整个数据流所设计到的计算框架和存储结构,来确定开发成本以及兼容性。目前2.x版本同样是一个可以用于生产环境的版本,并且保持着对Scala最新版本的编译更新。

  • 3.x

3.x是目前最新的稳定版,需要注意的是,Kafka的每个大版本之间的差异较大,包括命令参数以及API调用,所以在更换版本前需要做好详细的调查与准备,本文以3.x的安装为例。

二. Kafka安装

解压安装的操作方式可以适用于各种主流Linux操作系统,只需要解决好前置环境问题。

1. 前置环境

此前,运行Kafka需要预先安装Zookeeper。在Kafka 2.8.0版本以后,引入了Kraft(Kafka Raft)模式,可以使Kafka在不依赖外部Zookeeper的前提下运行。除此之外Kafka由Scala语言编写,需要JVM的运行环境。

(1)安装JDK

 Ubuntu/Debian:

sudo apt install openjdk-8-jdk

  CentOS/RedHat:

sudo yum install java-1.8.0-openjdk

安装完成后可以使用java-version命令验证【可省去环境变量配置】。

2. 软件安装

  • 下载Kafka ,链接如下:
# 离线下载安装包
https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz

# 在线利用wget远程下载​
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
  • 解压安装  
tar -zvxf kafka_2.12-3.5.2.tgz

(3)环境变量配置

需要在环境变量中指定Kafka的安装目录以及命令文件所在目录,系统环境变量与用户环境变量配置其中之一即可。

/etc/profile 文件最下方添加如下两行命令--配置全局环境。

export KAFKA_HOME=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2
export PATH=$PATH:$KAFKA_HOME/bin

在文件结尾添加以上内容后执行source命令,使其立即生效。

source /etc/profile

[Ubuntu/Debian] source ~/.bashrc

[CentOS/RedHat] source ~/.bash_profile

执行后可以输入kafka,然后按Tab尝试补全【需要按多次】,如果出现命令列表则证明配置成功。

(3)服务启动

使用Kraft模式,则需要先进行集群初始化【即使是单个节点】,以下为操作步骤:

  • 目录下创建 kafka-logs文件夹

  • 修改配置文件

修改Kafka的/config/kraft/server.properties文件,更换其中的log.dirs目录指向创建目录,防止默认的/tmp被清空:

log.dirs=/home/ygsj/Config_files/kafka_server/kafka-logs

  • 创建Kafka的集群ID 
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

调用 kafka-storage.sh 生成一个UUID

将获得的 UUID 放到 kafka_2.12-3.5.2/config/kraft/server.properties 文件中 如下:

相同文件内修改:远程连接开启 (红框内写服务器ip)---自己测试0.0.0.0无效

进入到Kafka的家目录后,执行以下命令 

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

# bin/kafka-server-start.sh config/kraft/server.properties

 这种方式并不是后台运行,需要保证终端开启,等测试稳定后可以在后台执行或者注册为系统服务。 

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

kafka-topics.sh --list --bootstrap-server localhost:9092

--bootstrap-server localhost:9092 指定了Kafka集群的连接地址(在这里是本地的Kafka服务器)
如果集群中没有主题,命令不会返回任何内容
当你创建主题后,这条命令会返回集群中存在的主题列表

(3)创建一个新的主题

kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

这条命令用于创建一个名为 my-topic 的新主题。
--create 指定了创建操作。
--topic my-topic 指定了要创建的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。
Created topic my-topic. 表示主题 my-topic 已成功创建。

(3)删除主题

kafka-topics.sh --delete  --topic my-topic --bootstrap-server localhost:9092

--delete: 指定要删除一个主题。
--topic my-topic: 指定要删除的主题名称是 my-topic。
--bootstrap-server localhost:9092: 指定Kafka集群的连接地址(在此是本地的Kafka服务器)。

(4)描述主题

 kafka-topics.sh --describe  --topic my-topic --bootstrap-server localhost:9092

获取指定主题 my-topic 的详细信息。
--describe 指定了描述操作。
--topic my-topic 指定了要描述的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。

(5)启动生产者

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

启动一个基于console的生产者脚本,可以方便的进行数据输入的测试,直接进行数据输入即可。

(6)启动消费者

 kafka-console-consumer.sh --help  打印所有参数

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

添加from-beginning参数来从头消费数据。

四. 注册系统服务

 为了方便的控制Kafka服务的启动和停止,可以将其注册为系统服务。

1. Systemd服务配置

创建Systemd服务文件

sudo vim /etc/systemd/system/kafka.service

在文件中添加以下内容,需要手动替换ExecStartExecStop中关于路径的部分:

[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-start.sh /home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/config/kraft/server.properties
ExecStop=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

 重新加载Systemd配置 

sudo systemctl daemon-reload

2. Kafka服务控制

  • 开机自动启动
sudo systemctl enable kafka.service
  • 启动Kafka服务
sudo systemctl start kafka.service
  • 检查Kafka状态 
sudo systemctl status kafka.service

  • 停止Kafka服务
sudo systemctl stop kafka.service
  • 重启Kafka服务
sudo systemctl restart kafka.service

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/780976.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PLC电源模块

PM电源模块 为CPU信号模块及 其他的扩展设备、其他用电设备(如传感器)提供工作供电 接线和开关 状态显示 灯的闪烁示意看手册 PS电源模块 为CPU信号模块及其他的扩展设备提供工作供电。PS(System Power Supply) 外形与PM电源模块类似,状…

PLC【搭建服务端】

PLC搭建服务端 文章目录 PLC搭建服务端前言一、搭建PLC服务器二、打开ModSean32获取数据总结 前言 使用博图V16编写PLC搭建服务器,使用 ModSean32 读取其中数据 一、搭建PLC服务器 打开博图V16点击项目进行新建,编辑好项目名称、及项目路径&#xff0c…

Netty 启动源码阅读

文章目录 1. 入门2. Netty 代码实例3. Netty bind3.1 initAndRegister3.1.1 newChannel, 创建 NioServerSocketChannel3.1.2 init(channel); 初始化 NioServerSocketChannel3.1.3 register 注册channel 3.2 doBind0 绑定端口3.3 ServerBootstrapAcceptor 1. 入门 主从Reactor模…

MATLAB制作一个简单的函数绘制APP

制作一个函数绘制APP,输入函数以及左右端点,绘制出函数图像。 编写回调函数: 结果:

HTML 【实用教程】(2024最新版)

核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型,特别是区分标题、段落、图片和表格 增加代码可读性,让人更容易读懂对SEO更加友好,让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …

移动硬盘“需格式化”预警:专业数据恢复指南

移动硬盘“需格式化”危机:了解背后的真相 在日常的数字生活中,移动硬盘作为我们存储重要数据的“保险箱”,其稳定性与安全性直接关系到我们信息的完整与便捷访问。然而,当您尝试打开移动硬盘时,屏幕上赫然出现的“需…

科技赋能智慧应急:“数字孪生+无人机”在防汛救灾中的应用

近期,全国多地暴雨持续,“麻辣王子工厂停工”“水上派出所成水上的派出所了”等相关词条冲上热搜,让人们看到了全国各地城市内涝、洪涝带来的严重灾情。暴雨带来的影响可见一斑,潜在的洪水、泥石流、山体滑坡等地质灾害更应提高警…

aardio —— 今日减bug

打字就减bug 鼠标双击也减bug 看看有多少bug够你减的 使用方法: 1、将资源附件解压缩,里面的文件夹,放到aardio\plugin\plugins 目录 2、aardio 启动插件 → 插件设置 → 选中“今日减bug” → 保存。 3、重启 aardio,等aa…

BUUCTF[PWN][fastbin attack]

fastbin_attack例题 题目:[BUUCTF在线评测 (buuoj.cn)](https://buuoj.cn/challenges#[ZJCTF 2019]EasyHeap) 整体思路:利用编辑时edit_heap函数的栈溢出漏洞,覆盖heaparray中的栈指针指向free的got表,将其改为system的plt表&…

make工具

1、什么是make? make是个命令,是个可执行程序,是个工具,用来解析Makefile文件的命令,这个命令存放在/usr/bin/目录下 -rwxr-xr-x 1 root root 250K 2月 15 2022 make -rwxr-xr-x 1 root root 4.8K 2月 15 2022 ma…

Linux_实现简易日志系统

目录 1、认识可变参数 2、解析可变参数 3、打印可变参数 3.1 va_list 3.2 va_start 3.3 va_arg 3.4 va_end 3.5 小结 4、实现日志 4.1 日志左半部分 4.2 日志右半部分 4.3 日志的存档归类 结语 前言: 在Linux下实现一个日志系统,该日…

Open3D 删除点云中重叠的点(方法二)

目录 一、概述 1.1原理 1.2应用 二、代码实现 三、实现效果 3.1原始点云 3.2处理后点云 3.3数据对比 一、概述 在点云处理中,重叠点(即重复点)可能会对数据分析和处理的结果产生负面影响。因此,删除重叠点是点云预处理中常…

一招解决找不到d3dcompiler43.dll,无法继续执行代码问题

当您的电脑遇到d3dcompiler43.dll缺失问题时,首先需要了解d3dcompiler43.dll文件及其可能导致问题的原因,之后便可以选择合适的解决方案。在此,我们将会为您提供寻找d3dcompiler43.dll文件的多种处理方法。 一、d3dcompiler43.dll文件分析 d…

【C++第十课 - stack_queue】stack、queue的使用、适配器模型stack、queue和priority_queue的底层实现、deque

目录 一、stack使用1、push2、pop3、empty4、top题目1、最小栈2、栈的压入、弹出序3、逆波兰表达式求值 二、queue的使用priority_queue习题 三、适配器stack的底层实现queue的底层实现priority_queue的底层实现仿函数/函数对象函数指针 四、deque 一、stack使用 stack是个容器…

【74LS163做24进制计数器】2021-11-19

缘由用74LS163做24进制计数器-其他-CSDN问答,仿真multisim两个74LS163芯片如何构成47进制计数器-吐槽问答-CSDN问答 参考74ls163中文资料汇总(74ls163引脚图及功能_内部结构图及应用电路) - 电子发烧友网

1.pwn的汇编基础(提及第一个溢出:整数溢出)

汇编掌握程度 能看懂就行,绝大多数情况不需要真正的编程(shellcode题除外) 其实有时候也不需要读汇编,ida F5 通常都是分析gadget,知道怎么用, 调试程序也不需要分析每一条汇编指令,单步执行然后查看寄存器状态即可 但…

【Python机器学习】模型评估与改进——多分类指标

多分类问题的所有指标基本是上都来自于二分类问题,但是要对所有类别进行平均。多分类的精度被定义为正确分类的样本所占的比例。同样,如果类别是不平衡的,精度并不是很好的评估度量。 想象一个三分类问题,其中85%的数据点属于类别…

Java(七)——多态

个人简介 👀个人主页: 前端杂货铺 ⚡开源项目: rich-vue3 (基于 Vue3 TS Pinia Element Plus Spring全家桶 MySQL) 🙋‍♂️学习方向: 主攻前端方向,正逐渐往全干发展 &#x1…

Go语言如何入门,有哪些书推荐?

Go 语言之所以如此受欢迎,其编译器功不可没。Go 语言的发展也得益于其编译速度够快。 对开发者来说,更快的编译速度意味着更短的反馈周期。大型的 Go 应用程序总是能在几秒钟之 内完成编译。而当使用 go run编译和执行小型的 Go 应用程序时,其…

VMware虚拟机搭建CentOS7环境

相关资料 安装VMware 双击VMware-workstation(16.1.1软件安装包.exe安装文件,点下一步 激活码文件复制激活码激活安装linux 1、点击创建虚拟机