背景:之前资产信息用网络接口进行数据推送,但是接口推送需要验证而且反应较慢。Kafak中间件提供了另一种可行的数据推送方式,它可以进行消息队列推送,且反应速度快。但是Kafka需部署在公网环境,并进行登录验证,如果部署Kafka后未设置登录验证,会被恶意扫描到,此时向Kafka里面push超过1G的文件,将会直接导致Kafka宕掉。
为了避免Kafka宕掉,建议在使用Kafka过程中配置登录验证,以下是Kafka登录验证的配置过程。
1、修改kafka配置文件 vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://1.1.1.1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
#需要开启设置超级管理员,设置visitor用户为超级管理员
super.users=User:visitor
2、为server创建登录验证文件,可以根据自己爱好命名文件,如vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf ,文件内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="visitor"
password="qaz@123"
user_visitor="qaz@123";
};
3、修改kafka安装目录 vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh,在文件最上面添加变量
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
4、为consumer和producer创建登录验证文件,可以根据爱好命名文件,如kafka_client_jaas.conf,文件内容如下(如果是程序访问,如springboot访问,可以不配置)
vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="visitor"
password="qaz@123";
};
5、在consumer.properties和producer.properties里分别加上如下配置:
vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties
vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
6、修改kafka安装目录 bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh,在文件最上面添加变量
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
7、分别启动zookeeper和kafka,至此服务端kafka用户登录验证配置完成(先关闭kafka后关闭zookeeper)
关闭服务kafka
# /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
关闭服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
8、创建及查看主题
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list 1.1.1.1:9092 --topic cmdb --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
接收消息
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 1.1.1.1:9092 --topic cmdb --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
错误信息