Kafka 配置sasl_plain及sasl_ssl认证模式
1. 安装kafka
1.1 配置Java环境
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。Kafka底层是由依赖Java的Scala语言编写,故Kafka同样依赖于Java环境。
使用java -version
查看本机Java环境安装情况。如果不存在,请自行安装Java环境。
1.2 配置Kafka环境
1.2.1 下载并解压kafka
kafka1.0.1之前的版本需要安装zookeeper才能使用,但当前大部分使用kafka都已经是自带zookeeper的版本,此处不再赘述。
-
解压kafka:
tar -zxvf kafka_2.12-3.2.3.tgz
1.2.2 修改server.properties文件
server.properties
是kafka启动的配置文件,首次启动需要对其进行修改满足要求。
- 输入
vim config/server.properties
- 配置
broker.id=0
,这代表分布式中的唯一识别序列号,单机默认为0即可 - 配置
listeners=PLAINTEXT://:9092
这是配置kafka的监听端口,类似于tomcat的8080 - 配置
advertised.listeners=PLAINTEXT://172.18.61.18:9092
这是配置的对外发布端口,类似于代理服务器,最后会把请求转发到第3步配置的listeners
。 - 配置
log.dirs=xxx
自定义kafka运行时日志保存目录,建议与bin
目录并行,日志文件最好配置绝对路径,文件夹会自动创建。 - 配置zookeeper连接段口:
zookeeper.connect=localhost:2181
1.2.3 修改zookeeper.properties文件
zookeeper.properties
是zookeeper启动的配置文件,首次启动需要对其进行修改以满足要求:
- 配置
clientPort=2181
满足上述kafka的zookeeper连接端口。 - 配置
dataDir=xxx
自定义zookeeper数据目录。
1.3 启动kafka
1.3.1 启动zookeeper
kafka目录下的bin自带了多个使用命令,可以直接使用。
后台启动zookeeper:./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
1.3.2 启动kafka
类似的启动kafka进程:./bin/kafka-server-start.sh ./config/server.properties &
停止kafka进程命令类似:./bin/kafka-server-stop.sh ./config/server.properties &
1.4 使用kafka
1.4.1 创建topic
使用bin
下命令创建一个test
topic:
./bin/kafka-topic.sh --bootstrap--server localhost:9092 --create --topic test --replication-factor 1 --partitions 1
1.4.2 启动命令行producer
使用bin
下命令创建一个console的producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1.4.3 启动命令行consumer
使用bin
下命令创建一个console的consumer:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1.4.4 使用java构造producer
1.4.5 使用java构造consumer
2. 配置sasl_plain认证模式
2.1 新建kafka_client_jaas.conf
通过vim ./config/kafka_client_jaas.conf
新建kafka的client认证文件
内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="test"
password="test123";
};
2.2 新建kafka_server_jaas.conf
通过vim ./config/kafka_server_jaas.conf
新建kafka的server认证文件
内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-kafka"
user_admin="admin-kafka"
user_test="test123";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client"
password="client-kafka";
};
2.3 新建zookeeper_server_jaas.conf
通过vim ./config/zookeeper_server_jaas.conf
新建zookeeper的server认证文件
内容如下:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-kafka"
user_client="client-kafka";
};
2.4 修改kafka附加配置文件
- 修改
./bin/zookeeper-server-start.sh
,在export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
后面增加-Djava.security.auth.login.config=./config/zookeeper_server_jaas.conf
- 修改
./bin/kafka-server-start.sh
,在export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
后面增加-Djava.security.auth.login.config=./config/kafka_server_jaas.conf
- 修改
./bin/kafka-console-consumer.sh
,在export KAFKA_HEAP_OPTS="-Xmx512M"
后面增加-Djava.security.auth.login.config=./config/kafka_client_jaas.conf
- 修改
./bin/kafka-console-producer.sh
,在export KAFKA_HEAP_OPTS="-Xmx512M"
后面增加-Djava.security.auth.login.config=./config/kafka_client_jaas.conf
2.5 修改kafka配置文件
-
修改
./config/consumer.properties
,增加以下内容:security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
-
修改
./config/producer.properties
,增加以下内容:security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
-
修改
./config/zookeeper.properties
:authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
-
修改
config/server.properties
:修改:listeners=SASL_PLAINTEXT://192.168.85.13:16667 添加以下配置: #使用的认证协议 security.inter.broker.protocol=SASL_PLAINTEXT #SASL机制 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN #完成身份验证的类 authorizer.class.name=kafka.security.authorizer.AclAuthorizer # 部分新版本的kafka身份控制更换为: #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #如果没有找到ACL(访问控制列表)配置,则允许任何操作。 #allow.everyone.if.no.acl.found=true super.users=User:admin delete.topic.enable=true auto.create.topics.enable=false
2.6 重新启动kafka
此时重新启动kafka,当重新利用之前的方式启动producer或者consumer时会失败,无法进入。定义如下配置文件./config/custom.properties
:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
此时重新执行进入producer的代码:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-config config/jaas.properties --topic test
即可正常进入producer,consumer同理。
3. 配置sasl_ssl认证模式
3.1 生成服务端密钥
-
使用
java
自带的key_tool
进行密钥对的生成:
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
其中需要输入密码,后续密码较多,建议统一设置成同一个。
-
(重点)自定义生成CA,该CA由自己生成,任何组织都可以申请CA。使用
openssl
生成CA证书:openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
此时也需要输入CA证书的密码。
-
将上述生成的CA分别导入到server和client的truststore中。
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
需要输入1.中设置的keystore的密码。
-
从keystore导出证书,并用上述步骤生成的CA来签名生成的证书:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
需要验证keystore在1.中设置的密码。
利用ca-key签名:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test123
最后的passin需要输入生成CA的密码。
-
将上述签名后的证书导入到keystore中。
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
3.2 生成客户端密钥
此时生成的客户端密钥与alias映射的名称有关。这里生成localhost的客户端密钥库:
keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
3.3 配置zookeeper认证
-
修改zk的附加配置文件
vim config/zookeeper_server_jaas.conf
,将其中的Server修改如下:Server { org.apache.zookeeper.server.auth.DigestLoginModule required username="admin" password="admin-kafka" user_client="client-kafka"; };
-
修改zk的配置文件
zookeeper.properties
,在其后增加如下:authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
3.4 配置kafka认证
-
修改kafka的附加配置文件
vim kafka-server-jaas.conf
,修改如下:Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-kafka" user_admin="admin-kafka" user_test="test123"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-kafka"; };
启动时需要指定启动附加配置文件。
-
修改consumer和producer的等client的附加配置文件
vim kafka-client-jaas.conf
,修改如下:Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123"; };
启动时需要指定启动附加配置文件。
-
修改server.properties,增加sasl_ssl相关配置:
security.inter.broker.protocol=SASL_SSL sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN allow.everyone.if.no.acl.found=true ssl.keystore.location=/root/worksapce/cma/ssl/server.keystore.jks ssl.keystore.password=test123 ssl.key.password=test123 ssl.truststore.location=/root/worksapce/cma/ssl/server.truststore.jks ssl.truststore.password=test123 ssl.endpoint.identification.algorithm=
同时需要修改listeners监听的协议为:
listeners=SASL_SSL://:9092
3.5 信任第三方证书
使用keytool直接导入证书即可:
keytool -keystore client.truststore.jks -alias Venus -import -file ${CA_FILE}
4. 第三方连接工具
4.1 java连接sasl_ssl
需要导入LinkdeIn
或者org.apache
发布的kafka-clients.jar
4.1.1 Producer
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Consumer {
private static Producer<String, String> producer = null;
static {
Properties props = new Properties();
props.put("acks", "all");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.61.18:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,"");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"test123");
producer = new KafkaProducer<String, String>(props);
}
static void sendMsg(String topic, String value){
// 4. 调用 send 方法,发送消息
producer.send(new ProducerRecord<String, String>(topic,value));
// 5. 关闭资源
// producer.close();
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
System.out.println(i);
sendMsg("test_sasl","sasl_ssl"+i);
}
while (true){}
}
}
4.1.2 Consumer
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Arrays;
import java.util.Properties;
public class Main {
private static KafkaConsumer<String, String> consumer = null;
static {
Properties props = new Properties();
props.put("bootstrap.servers", "172.18.61.18:9092");
props.put("group.id", "group-1");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //关闭自动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_sasl"));
}
public static void getMsg(){
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
public static void main(String[] args) {
getMsg();
}
}
4.2 kafak tool连接sasl_ssl
-
配置Properties界面
-
配置Security界面
-
配置Advanced界面
-
配置JAAS Config界面
评论区