侧边栏壁纸
  • 累计撰写 47 篇文章
  • 累计创建 22 个标签
  • 累计收到 27 条评论

目 录CONTENT

文章目录

Kafka 配置sasl_plain及sasl_ssl认证模式

vchopin
2023-02-22 / 0 评论 / 0 点赞 / 562 阅读 / 11,215 字

Kafka 配置sasl_plain及sasl_ssl认证模式

1. 安装kafka

1.1 配置Java环境

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。Kafka底层是由依赖Java的Scala语言编写,故Kafka同样依赖于Java环境。

使用java -version查看本机Java环境安装情况。如果不存在,请自行安装Java环境。

image-20230222104928434

1.2 配置Kafka环境

1.2.1 下载并解压kafka

kafka1.0.1之前的版本需要安装zookeeper才能使用,但当前大部分使用kafka都已经是自带zookeeper的版本,此处不再赘述。

image-20230222105631549

1.2.2 修改server.properties文件

server.properties是kafka启动的配置文件,首次启动需要对其进行修改满足要求。

  1. 输入vim config/server.properties
  2. 配置broker.id=0,这代表分布式中的唯一识别序列号,单机默认为0即可
  3. 配置listeners=PLAINTEXT://:9092这是配置kafka的监听端口,类似于tomcat的8080
  4. 配置advertised.listeners=PLAINTEXT://172.18.61.18:9092这是配置的对外发布端口,类似于代理服务器,最后会把请求转发到第3步配置的listeners
  5. 配置log.dirs=xxx自定义kafka运行时日志保存目录,建议与bin目录并行,日志文件最好配置绝对路径,文件夹会自动创建。
  6. 配置zookeeper连接段口:zookeeper.connect=localhost:2181

1.2.3 修改zookeeper.properties文件

zookeeper.properties是zookeeper启动的配置文件,首次启动需要对其进行修改以满足要求:

  1. 配置clientPort=2181满足上述kafka的zookeeper连接端口。
  2. 配置dataDir=xxx自定义zookeeper数据目录。

1.3 启动kafka

1.3.1 启动zookeeper

kafka目录下的bin自带了多个使用命令,可以直接使用。

后台启动zookeeper:./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

image-20230222111119517

1.3.2 启动kafka

类似的启动kafka进程:./bin/kafka-server-start.sh ./config/server.properties &

image-20230222111435720

停止kafka进程命令类似:./bin/kafka-server-stop.sh ./config/server.properties &

1.4 使用kafka

1.4.1 创建topic

使用bin下命令创建一个test topic:

./bin/kafka-topic.sh --bootstrap--server localhost:9092 --create --topic test --replication-factor 1 --partitions 1

image-20230222111729889

1.4.2 启动命令行producer

使用bin下命令创建一个console的producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

image-20230222111847771

1.4.3 启动命令行consumer

使用bin下命令创建一个console的consumer:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

image-20230222111927930image-20230222204648097

1.4.4 使用java构造producer

1.4.5 使用java构造consumer

2. 配置sasl_plain认证模式

2.1 新建kafka_client_jaas.conf

通过vim ./config/kafka_client_jaas.conf新建kafka的client认证文件

内容如下:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="test"
    password="test123";
};

2.2 新建kafka_server_jaas.conf

通过vim ./config/kafka_server_jaas.conf新建kafka的server认证文件

内容如下:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-kafka"
    user_admin="admin-kafka"
    user_test="test123";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="client"
    password="client-kafka";
};

2.3 新建zookeeper_server_jaas.conf

通过vim ./config/zookeeper_server_jaas.conf新建zookeeper的server认证文件

内容如下:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-kafka"
    user_client="client-kafka";
};

2.4 修改kafka附加配置文件

  1. 修改./bin/zookeeper-server-start.sh,在export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"后面增加 -Djava.security.auth.login.config=./config/zookeeper_server_jaas.conf
  2. 修改./bin/kafka-server-start.sh,在export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"后面增加 -Djava.security.auth.login.config=./config/kafka_server_jaas.conf
  3. 修改./bin/kafka-console-consumer.sh,在export KAFKA_HEAP_OPTS="-Xmx512M"后面增加 -Djava.security.auth.login.config=./config/kafka_client_jaas.conf
  4. 修改./bin/kafka-console-producer.sh,在export KAFKA_HEAP_OPTS="-Xmx512M"后面增加 -Djava.security.auth.login.config=./config/kafka_client_jaas.conf

2.5 修改kafka配置文件

  1. 修改./config/consumer.properties,增加以下内容:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    
    
  2. 修改./config/producer.properties,增加以下内容:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    
    
  3. 修改./config/zookeeper.properties

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    
    
  4. 修改config/server.properties:

    修改:listeners=SASL_PLAINTEXT://192.168.85.13:16667
    添加以下配置:
    #使用的认证协议
    security.inter.broker.protocol=SASL_PLAINTEXT
    #SASL机制
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    #完成身份验证的类
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    # 部分新版本的kafka身份控制更换为:
    #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    #如果没有找到ACL(访问控制列表)配置,则允许任何操作。
    #allow.everyone.if.no.acl.found=true
    super.users=User:admin
    delete.topic.enable=true
    auto.create.topics.enable=false
    

2.6 重新启动kafka

此时重新启动kafka,当重新利用之前的方式启动producer或者consumer时会失败,无法进入。定义如下配置文件./config/custom.properties

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

​ 此时重新执行进入producer的代码:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-config config/jaas.properties --topic test

即可正常进入producer,consumer同理。

3. 配置sasl_ssl认证模式

3.1 生成服务端密钥

  1. 使用java自带的key_tool进行密钥对的生成:
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey

    image-20230222204648097-1677073570710

    其中需要输入密码,后续密码较多,建议统一设置成同一个。

  2. (重点)自定义生成CA,该CA由自己生成,任何组织都可以申请CA。使用openssl生成CA证书:

    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

    image-20230222204945848

    此时也需要输入CA证书的密码。

  3. 将上述生成的CA分别导入到server和client的truststore中。

    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

    image-20230222205152783

    需要输入1.中设置的keystore的密码。

  4. 从keystore导出证书,并用上述步骤生成的CA来签名生成的证书:

    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

    image-20230222205444022

    需要验证keystore在1.中设置的密码。

    利用ca-key签名:

    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test123

    image-20230222205547692

    最后的passin需要输入生成CA的密码。

  5. 将上述签名后的证书导入到keystore中。

    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

    image-20230222205849890

3.2 生成客户端密钥

此时生成的客户端密钥与alias映射的名称有关。这里生成localhost的客户端密钥库:

keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey

image-20230222210331387

3.3 配置zookeeper认证

  1. 修改zk的附加配置文件 vim config/zookeeper_server_jaas.conf,将其中的Server修改如下:

    Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="admin"
        password="admin-kafka"
        user_client="client-kafka";
    };
    
  2. 修改zk的配置文件zookeeper.properties,在其后增加如下:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    

3.4 配置kafka认证

  1. 修改kafka的附加配置文件vim kafka-server-jaas.conf,修改如下:

    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-kafka"
        user_admin="admin-kafka"
        user_test="test123";
    };
    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="client"
        password="client-kafka";
    };
    
    

    启动时需要指定启动附加配置文件。

  2. 修改consumer和producer的等client的附加配置文件vim kafka-client-jaas.conf,修改如下:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="test"
        password="test123";
    };
    

    启动时需要指定启动附加配置文件。

  3. 修改server.properties,增加sasl_ssl相关配置:

    security.inter.broker.protocol=SASL_SSL
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    allow.everyone.if.no.acl.found=true
    ssl.keystore.location=/root/worksapce/cma/ssl/server.keystore.jks
    ssl.keystore.password=test123
    ssl.key.password=test123
    ssl.truststore.location=/root/worksapce/cma/ssl/server.truststore.jks
    ssl.truststore.password=test123
    ssl.endpoint.identification.algorithm=
    

    同时需要修改listeners监听的协议为:

    listeners=SASL_SSL://:9092

3.5 信任第三方证书

使用keytool直接导入证书即可:

keytool -keystore client.truststore.jks -alias Venus -import -file ${CA_FILE}

4. 第三方连接工具

4.1 java连接sasl_ssl

需要导入LinkdeIn或者org.apache发布的kafka-clients.jar

4.1.1 Producer

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Consumer {
    private static Producer<String, String> producer = null;
    static {
        Properties props = new Properties();
        props.put("acks", "all");

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.61.18:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,"");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"test123");
        producer = new KafkaProducer<String, String>(props);
    }

    static void sendMsg(String topic, String value){
        // 4. 调用 send 方法,发送消息
        producer.send(new ProducerRecord<String, String>(topic,value));
        // 5. 关闭资源
//        producer.close();
    }
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            sendMsg("test_sasl","sasl_ssl"+i);
        }
        while (true){}

    }
}

4.1.2 Consumer

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Arrays;
import java.util.Properties;

public class Main {
    private static KafkaConsumer<String, String> consumer = null;
    static {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.18.61.18:9092");
        props.put("group.id", "group-1");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);  //关闭自动提交
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");


        consumer = new  KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_sasl"));
    }

    public static void getMsg(){
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
    public static void main(String[] args) {
        getMsg();

    }
}

4.2 kafak tool连接sasl_ssl

  1. 配置Properties界面

    image-20230222212257409

  2. 配置Security界面

    image-20230222212454939

  3. 配置Advanced界面

    image-20230222212510353

  4. 配置JAAS Config界面

    image-20230222212602507

参考

  1. https://blog.csdn.net/m0_46192647/article/details/123424090
  2. https://www.cnblogs.com/mmzl/articles/14468199.html
  3. https://blog.csdn.net/totoro1992/article/details/127089412
0

评论区