
Secured Kafka (Kerberos) configuration based on Spring functional

Date: 2020-11-25 12:45:59


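When connecting through the Spring Cloud Stream Kafka binder to a Kafka cluster that uses Kerberos authentication, the configuration should look like this: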

#input
spring.cloud.stream.bindings.process-in-0.destination=input-topic
spring.cloud.stream.bindings.process-in-0.binder=kafka1
spring.cloud.stream.bindings.process-in-0.group=groupId1
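# startOffset is a Kafka-specific consumer property, so it goes under spring.cloud.stream.kafka.bindings
spring.cloud.stream.kafka.bindings.process-in-0.consumer.startOffset=latest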
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/input.keytab
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
spring.cloud.stream.binders.kafka1.environment.spring.kafka.consumer.value-deserializer=com.kafka.message.serializer.EntityDeserialize
#output
spring.cloud.stream.bindings.process-out-0.destination=output-topic
spring.cloud.stream.bindings.process-out-0.binder=kafka2
spring.cloud.stream.bindings.process-out-0.group=groupId2
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/output.keytab
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
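
The jaas.options entries above end up as options of a JAAS login module. As a rough sketch, the following plain Java program renders the same options in the single-line form used by Kafka's sasl.jaas.config property, which can help when comparing this setup with a hand-written JAAS entry. It assumes the binder's login module is com.sun.security.auth.module.Krb5LoginModule with the control flag "required"; the class JaasConfigSketch and its render method are hypothetical helpers, not part of Spring or Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Spring or Kafka.
final class JaasConfigSketch {

    // Renders a login module, control flag and options as one JAAS entry:
    //   <loginModule> <controlFlag> key1="value1" key2="value2";
    static String render(String loginModule, String controlFlag, Map<String, String> options) {
        String renderedOptions = options.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
                .collect(Collectors.joining(" "));
        return loginModule + " " + controlFlag + " " + renderedOptions + ";";
    }

    public static void main(String[] args) {
        // Same options as the kafka1 binder; LinkedHashMap keeps insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("useKeyTab", "true");
        options.put("serviceName", "kafka");
        options.put("storeKey", "true");
        options.put("keyTab", "/file/input.keytab");
        options.put("principal", "user@domain.com");

        // Assumed login module and control flag (see the note above).
        System.out.println(render("com.sun.security.auth.module.Krb5LoginModule", "required", options));
    }
}
```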

  

The following Java system properties must also be set:

System.setProperty("java.security.krb5.conf", "/file/krb5.conf");
System.setProperty("sun.security.krb5.debug", "true");
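
These properties should be in place before the first Kerberos login is attempted, which in practice means before the Spring context starts. A minimal sketch of one way to do that, assuming the properties are set at the top of main: the class KerberosBootstrap and its configure method are hypothetical, and the readability check is an addition that is not in the original.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical bootstrap helper for illustration only.
final class KerberosBootstrap {

    // Sets the Kerberos system properties, failing fast if krb5.conf cannot be read.
    static void configure(String krb5ConfPath, boolean debug) {
        Path conf = Paths.get(krb5ConfPath);
        if (!Files.isReadable(conf)) {
            throw new IllegalStateException("krb5.conf not readable: " + conf);
        }
        System.setProperty("java.security.krb5.conf", conf.toString());
        System.setProperty("sun.security.krb5.debug", Boolean.toString(debug));
    }

    public static void main(String[] args) {
        configure("/file/krb5.conf", true);
        // In a Spring Boot application, SpringApplication.run(...) would follow here.
    }
}
```

Failing fast on a missing krb5.conf tends to give a clearer error than the Kerberos login failure that would otherwise show up later during binder startup.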

 

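In addition, to deserialize Kafka message content we are not limited to the built-in (de)serializers for basic types. We can also write our own, such as the com.kafka.message.serializer.EntityDeserialize configured above: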
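package com.kafka.message.serializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.kafka.common.serialization.Deserializer;

// Deserializes the JSON payload of a Kafka record into an EntityTest
// (EntityTest must be imported if it lives in another package).
public class EntityDeserialize implements Deserializer<EntityTest> {
    // ObjectReader is immutable and thread-safe, so a single instance is reused.
    private final ObjectReader objectReader = new ObjectMapper().readerFor(EntityTest.class);

    @Override
    public EntityTest deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // record without a value
        }
        try {
            return objectReader.readValue(data);
        } catch (Exception e) {
            throw new RuntimeException("message deserialize error", e);
        }
    }
}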


// The corresponding functional bean
@Bean
public Function<Flux<Message<EntityTest>>, Flux<Message<String>>> process(MessageProcessService messageProcessService) {
    // Call the injected instance rather than the class
    return messageFlux -> messageProcessService.process(messageFlux);
}
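
The binding names process-in-0 and process-out-0 used in the properties come from the name of this function bean: Spring Cloud Stream names functional bindings <functionName>-in-<index> and <functionName>-out-<index>. A small sketch of that convention in plain Java, where BindingNames is a hypothetical helper and not a Spring class:

```java
// Hypothetical helper that mirrors the functional binding naming convention.
final class BindingNames {

    // Name of the input binding at the given index, e.g. "process-in-0".
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding at the given index, e.g. "process-out-0".
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("process", 0));  // prints "process-in-0"
        System.out.println(output("process", 0)); // prints "process-out-0"
    }
}
```

Renaming the bean method therefore means renaming the matching spring.cloud.stream.bindings.* keys as well.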

  


Original article: https://www.cnblogs.com/lgtrajectory/p/14017469.html
