实战指南-构建安全可靠的Kafka集群并配置SASL/PLAIN认证
引言 近年来,随着大数据和分布式系统的快速发展,数据传输和处理的需求也越来越大。在这个背景下,Apache Kafka作为一款高性能、高可靠性的分布式流媒体平台,吸引了众多企业和开发者的关注。与此同时,随着安全和数据保护意识的提升,数据传输的安全性也变得尤为重要。正因如此,深入了解Kafka集群的安装和认证机制显得尤为重要。本文将为大家介绍如何搭建一个具有SASL认证机制的Kafka集群环境,并分享一些实验经验和注意事项。
本文主要以3虚拟机测试环境安装了JDK 11, Zookeeper 3.8.3和Kafka 2.7.2集群,同时演示怎么来通过配置简单SASL认证后,怎么通过客户端来SASL密码方式登录Kafka服务器集群。在实践操作之前对Kafka、Kafka认证方式及SASL认证进行了简单的介绍。然而,仅仅通过用户名和密码进行连接认证是远远不够的,在实际应用中还是根据具体需求而选择不同的Kafka认证方式。
1 Kafka认证
关于Kafka
Apache Kafka是一种分布式流处理平台,它具有高吞吐量、扩展性和可靠性的特点。作为一个强大的分布式消息传递系统,Kafka被广泛应用于各种场景,如实时数据流处理、日志收集、事件驱动架构等。它通过高吞吐量、可靠性和可扩展性,为企业提供了处理大规模实时数据流的解决方案,满足了现代数据处理的需求。
随着数据传输和存储的增加,保护数据安全和隐私成为组织和个人的首要任务。Kafka提供了多种安全功能来确保数据的机密性、完整性和可用性。
关于Kafka认证
Kafka提供了多种认证方式来保护集群的安全性,包括以下几种常见的认证方式:
SSL/TLS认证:Kafka支持使用SSL/TLS协议对网络通信进行加密和认证。通过使用SSL/TLS证书对客户端和服务器进行身份验证,可以确保通信的机密性和完整性。
SASL/PLAIN认证:SASL/PLAIN是一种基于用户名密码的认证机制,可用于Kafka和其他应用程序之间的认证。它要求客户端提供用户名和密码来进行身份验证,并使用安全的方式进行传输。
SASL/GSSAPI(Kerberos)认证:SASL/GSSAPI认证是一种基于Kerberos的单点登录认证机制。它使用Kerberos票据来进行身份验证,通过集成现有的企业身份验证系统,实现了强大的安全性和无缝的用户体验。
自定义认证插件:Kafka还提供了自定义认证插件的扩展机制,允许用户根据特定需求实现自己的认证机制。通过自定义认证插件,可以与现有的身份验证系统或者第三方认证服务进行集成。
Kafka的认证方式的实现原理主要涉及以下几个步骤:
配置设置:通过Kafka的配置文件,设置认证方式和相关参数,如SSL证书的路径、用户名密码的配置等。
证书生成和配置:对于SSL/TLS认证,需要生成证书和私钥,并将其配置到Kafka的服务器端和客户端。证书用于对通信进行加密和认证。
用户认证信息配置:对于SASL/PLAIN和SASL/GSSAPI认证,需要在Kafka的配置文件中配置用户的认证信息,如用户名和密码,或者Kerberos票据。
服务器端验证:服务器端接收到客户端的连接请求后,根据配置的认证方式进行验证。对于SSL/TLS认证,服务器会验证客户端的证书;对于SASL/PLAIN和SASL/GSSAPI认证,服务器会验证客户端提供的用户名和密码或Kerberos票据。
客户端验证:客户端与服务器建立连接后,根据配置的认证方式提供相应的证书、用户名密码或Kerberos票据,进行身份验证。
通过以上步骤,Kafka能够实现不同的认证方式来保护集群的安全性。根据具体的需求和环境,选择合适的认证方式并正确配置,可以确保Kafka集群的安全访问和数据传输。
关于,
GNU SASL是简单身份验证和安全层框架以及一些常见SASL机制的实现。SASL被网络服务器(例如IMAP、SMTP、XMPP)用来请求来自客户端的身份验证,并在客户端中用来对服务器进行身份验证。
SASL是一个挑战-响应框架。在这里,服务器向客户机发出请求,客户机根据该请求发送响应。质询和响应是任意长度的字节数组,因此可以携带任何特定于机制的数据。工作简单原理图如下:
关于Kafka SASL认证
Kafka使用Java认证和授权服务(JAAS)进行SASL配置。
Kafka代理的JAAS配置: KafkaServer部分是每个KafkaServer/Broker使用的JAAS文件中的节名。为代理提供SASL配置选项,包括代理为进行代理间通信而建立的任何SASL客户机连接。如果多个侦听器配置为使用SASL,则可以使用侦听器名称的小写前缀加上句号,例如
sasl_ssl.KafkaServer
。 Client部分用于验证与zookeeper的SASL连接。它还允许代理在zookeeper节点上设置SASL ACL,从而锁定这些节点,以便只有代理可以修改它。有必要在所有代理中使用相同的主体名称。如果你想使用一个节名而不是Client,设置系统属性zookeeper.sasl.clientconfig
为合适的名称(例如,-Dzookeeper.sasl.clientconfig=ZkClient
)。Kafka客户端的JAAS配置: 客户端可以使用客户端配置属性
sasl.jaas.config
或使用类似于代理的静态JAAS配置文件来配置JAAS。
本文中后续以一个简单的例子较直观来解析kafka服务器端和客户端的配置连接。
2 安装Kafka集群
接下来例子以3个虚拟机Linux环境下,Kafka版本选择的是2.7的版本,在安装Kafka软件分别在3台机器上安装JDK和Zookeeper并配置zk正确运行下再安装Kafka。该环境下三台机器和IP地址分别如下:
VM65201 192.3.65.201
VM65202 192.3.65.202
VM65203 192.3.65.203
准备工作
下载所需的软件包:
JDK 11: https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html 。选项“Linux x64 Compressed Archive”处进行 download
jdk-11.0.20_linux-x64_bin.tar.gz
Zookeeper 3.8.3:https://zookeeper.apache.org/releases.html
apache-zookeeper-3.8.3-bin.tar.gz
Kafka 2.7:官网下载 https://kafka.apache.org/downloads
kafka_2.13-2.7.2.tgz
安装Java(三台机器均需要执行)
--root下运行cd /usr/local
ls -lh jdk-11.0.20_linux-x64_bin.tar.gz
tar -xf jdk-11.0.20_linux-x64_bin.tar.gz
cat >> /etc/profile <<EOFexport JAVA_HOME=/usr/local/jdk-11.0.20export PATH=\$JAVA_HOME/bin:\$PATHexport CLASSPATH=.:\$JAVA_HOME/lib:\$CLASSPATHEOF--确认java安装状况# source /etc/profile# java -versionjava version "11.0.20" 2023-07-18 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.20+9-LTS-256)Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.20+9-LTS-256, mixed mode)
安装和配置Zookeeper集群
在三台机器上相同操作分别安装zookeeper软件,上传安装包到
/usr/local
,此处将软件安装到/usr/local
下。
--root用户执行cd /usr/local/
ls -lh apache-zookeeper-3.8.3-bin.tar.gz
tar -xf apache-zookeeper-3.8.3-bin.tar.gz
ls -l apache-zookeeper-3.8.3-bin/
--准备日志和数据目录
mkdir -p apache-zookeeper-3.8.3-bin/data apache-zookeeper-3.8.3-bin/logs
三台机器上相同操作分别配置
zoo.cfg
cat > apache-zookeeper-3.8.3-bin/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataLogDir=/usr/local/apache-zookeeper-3.8.3-bin/logs
dataDir=/usr/local/apache-zookeeper-3.8.3-bin/data
clientPort=2181
server.1=192.3.65.201:2888:3888
server.2=192.3.65.202:2888:3888
server.3=192.3.65.203:2888:3888
EOF
三台机器分别配置不同的myid
VM65201:/usr/local # echo "1" > /usr/local/apache-zookeeper-3.8.3-bin/data/myid
VM65202:/usr/local # echo "2" > /usr/local/apache-zookeeper-3.8.3-bin/data/myid
VM65203:/usr/local # echo "3" > /usr/local/apache-zookeeper-3.8.3-bin/data/myid
启动zookeeer集群
# /usr/local/apache-zookeeper-3.8.3-bin/bin/zkServer.sh startZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.8.3-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
--三台机器上分别查看状态
VM65201:/usr/local # /usr/local/apache-zookeeper-3.8.3-bin/bin/zkServer.sh statusZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.8.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
VM65202:/usr/local # /usr/local/apache-zookeeper-3.8.3-bin/bin/zkServer.sh statusZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.8.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
VM65203:/usr/local # /usr/local/apache-zookeeper-3.8.3-bin/bin/zkServer.sh statusZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.8.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
安装和配置Kafka集群
在三台机器上相同操作分别安装Kafka软件,上传安装包到
/usr/local
,此处将软件安装到/usr/local
下。
--root用户下执行cd /usr/local
ls -lh kafka_2.13-2.7.2.tgz
tar -zxf kafka_2.13-2.7.2.tgz
mkdir -p kafka_2.13-2.7.2/logs
三台机器上准备不同的配置文件。如果不配置listeners参数,只能本地
localhost:9092
端口访问。
# cp kafka_2.13-2.7.2/config/server.properties kafka_2.13-2.7.2/config/server.properties.org
--节点1
cat > kafka_2.13-2.7.2/config/server.properties <<EOF
broker.id=1
listeners=PLAINTEXT://192.3.65.201:9092
log.dirs=/usr/local/kafka_2.13-2.7.2/logs
zookeeper.connect=192.3.65.201:2181,192.3.65.202:2181,192.3.65.203:2181
EOF
--节点2
cat > kafka_2.13-2.7.2/config/server.properties <<EOF
broker.id=2
listeners=PLAINTEXT://192.3.65.202:9092
log.dirs=/usr/local/kafka_2.13-2.7.2/logs
zookeeper.connect=192.3.65.201:2181,192.3.65.202:2181,192.3.65.203:2181
EOF
--节点3
cat > kafka_2.13-2.7.2/config/server.properties <<EOF
broker.id=3
listeners=PLAINTEXT://192.3.65.203:9092
log.dirs=/usr/local/kafka_2.13-2.7.2/logs
zookeeper.connect=192.3.65.201:2181,192.3.65.202:2181,192.3.65.203:2181
EOF
启动Kafka集群
# kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon kafka_2.13-2.7.2/config/server.properties
--如果失败 可以去除daemon通过前台启动或者查看logs/下日志
# kafka_2.13-2.7.2/bin/kafka-server-start.sh kafka_2.13-2.7.2/config/server.properties
...
3 SASL/PLAIN认证
默认连接操作
通过Kafka软件下自带的工具连接 - 创建topic
# kafka_2.13-2.7.2/bin/kafka-topics.sh --list --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092
# kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092 --replication-factor 3 --partitions 1 --topic test1
Created topic test1.
# kafka_2.13-2.7.2/bin/kafka-topics.sh --list --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092
test1
配置SASL/PLAIN认证
zookeeper上准备授权用户 admin,密码
admin2023
。
--其中一台机器上登录zk
VM65201:/usr/local # apache-zookeeper-3.8.3-bin/bin/zkCli.sh
...
[zk: localhost:2181(CONNECTED) 0] create /digest user
Created /digest
[zk: localhost:2181(CONNECTED) 1] addauth digest admin:admin2023
[zk: localhost:2181(CONNECTED) 2] setAcl /digest auth:admin:admin2023:crwda
[zk: localhost:2181(CONNECTED) 3] quit
三台机器上分别修改配置
--root用户# cp kafka_2.13-2.7.2/config/server.properties kafka_2.13-2.7.2/config/server-sasl.properties# vi kafka_2.13-2.7.2/config/server-sasl.properties --修改listener中PLIANTEXT为SASL_PLAINTEXT#listeners=PLAINTEXT://192.3.65.201:9092listeners=SASL_PLAINTEXT://:9092
--增加如下内容指定认证方式
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
--指定超级管理员用户
super.users=User:admin
准备服务器端jaas文件,并指定用户名和密码。
--三台服务器均需执行
cat >> kafka_2.13-2.7.2/config/kafka-server-jaas.conf <<EOF
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin2023"
user_admin="admin2023";
};
EOF
重新启动kafka服务器,三台服务器均需执行。
--停止服务器# kafka_2.13-2.7.2/bin/kafka-server-stop.sh --修改启动脚本# cp kafka_2.13-2.7.2/bin/kafka-server-start.sh kafka_2.13-2.7.2/bin/kafka-server-start-sasl.sh # vi kafka_2.13-2.7.2/bin/kafka-server-start-sasl.sh exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"-->改为exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.2/config/kafka-server-jaas.conf kafka.Kafka "$@"--启动带SASL的服务器# kafka_2.13-2.7.2/bin/kafka-server-start-sasl.sh kafka_2.13-2.7.2/config/server-sasl.properties ...
--后台启动# kafka_2.13-2.7.2/bin/kafka-server-start-sasl.sh -daemon kafka_2.13-2.7.2/config/server-sasl.properties # jps23638 QuorumPeerMain27133 Kafka27151 Jps
另外一台有Kafka客户端的机器上配置Kafka客户端认证文件并登录集群
--由于登录Kafka认证会严重服务器主机名,所以需先配置/etc/hosts
# cat >>/etc/hosts <<EOF
192.3.65.201 VM65201
192.3.65.202 VM65202
192.3.65.203 VM65203
EOF
--不配置/etc/hosts会出现如下报错
[2023-11-08 09:37:58,961] WARN [Controller id=1, targetBrokerId=2] Error connecting to node VM65202:9092 (id: 2 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: VM65202: Name or service not known
--配置客户端配置
# cat > /tmp/jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin2023";
};
EOF
# cat > /tmp/security-client.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF
# bin/kafka-topics.sh --list --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092 --command-config /tmp/security-client.properties
test1
# bin/kafka-topics.sh --describe --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092 --command-config /tmp/security-client.properties --topic test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
--向topic生成数据
# bin/kafka-console-producer.sh --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>abc
>xyz
>this is test
# bin/kafka-console-consumer.sh --bootstrap-server 192.3.65.201:9092,192.3.65.202:9092,192.3.65.203:9092 --topic test1 --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --from-beginning
abc
xyz
this is test
>^C
通过SASL/PLAIN认证连接
解释SASL/PLAIN认证的概念和工作原理 提供Kafka配置文件的修改示例,启用和配置SASL/PLAIN认证
总结
通过本文的实验和操作指导,我们成功搭建了一个具有SASL认证机制的Apache Kafka集群环境。我们首先安装了Java、Zookeeper和Kafka,然后配置了SASL认证,并进行了客户端和服务器的连接认证。在此过程中,我们深入理解了Kafka集群的安装和认证机制,并且获取了一些实践经验和注意事项。
参考文档
GUN SASL库:https://www.gnu.org/software/gsasl/
An Introduction to Java SASL: https://www.baeldung.com/java-sasl
Apache Kafka官网:http://kafka.apache.org/