根据安全公告中的描述,我们可以简单地获得导致漏洞的一些关键点。
在配置中将 作为 ErrorHandlingDeserializer
Kafka 记录中的键和/或值。
将布尔类型属性 checkDeserExWhenKeyNull
和/或 checkDeserExWhenValueNull
设置为 true。
用户可以发布 Kafka 主题,无需任何验证。
在深入研究该漏洞之前,我们及时回顾了 Kafka 服务的一些相关概念。
Producer:我们调用发布记录卡夫卡主题制作人的对象
Topic:记录由 Kafka 服务分类,每个分类都命名为 Topic。
Broker:发布的消息存储在一组服务器中,我们称之为Kafka集群。每个服务器都是一个代理。消费者可以获得数据形式 Broker 并使用多个主题。
consumer:用于订阅消息并与已发布消息一起处理的对象称为 Kafka topi 消费者。消耗消息是基于主题的。
此外,有必要回顾kafka记录的结构。
Kafka 记录,我们也称它为由标头和正文组成的消息或事件。标头数据实际上等于元数据,包括主题、页码和时间戳等基本元素。它们存储为一对键/值。正文数据通常也是存储为键/值结构的相关业务数据。
复现:
部署Kafka服务之前需要Zookeeper服务器。
1.通过docker安装Zookeeper服务器
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
2.通过docker部署Kafka服务器
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:latest
3.Spring Boot项目导入受影响的Kafka依赖
受影响版本:
2.8.1至2.9.10
3.0.0 至 3.0.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
4.更新配置application.yaml
5.演示类
1)Kafka生产者类
package com.example.SpringKafkaDemo.producer;
import com.example.SpringKafkaDemo.model.KafkaMessage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/message/send")
public String sendMessage(@RequestBody KafkaMessage message) {
String topic = message.getTopic();
String data = message.getData();
HashMap<String, String> headers = message.getHeaders();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
for (String s : headers.keySet()) {
if (s.equals("springDeserializerExceptionKey")) {
String exceptData = headers.get(s);
byte[] exceptHandler = KafkaProducer.hexStringtoBytes(exceptData);
producerRecord.headers().add(s, exceptHandler);
continue;
}
producerRecord.headers().add(s, headers.get(s).getBytes());
}
kafkaTemplate.send(producerRecord);
String jsonString="{\"code\":\"200\", \"status\":\"success\"}";
return jsonString;
}
private static byte[] hexStringtoBytes(String hexString) {
byte[] excepetionMessage = new byte[hexString.length() / 2];
for (int i = 0; i < excepetionMessage.length; i++) {
excepetionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);
}
return excepetionMessage;
}
}
顺便说一句,这里我们使用了Java语言中的一种设计模式, 模板方法模式。在此演示中,我插入了一个名为 的模板kafkaTemplate
。
代码片段的高亮显示
private KafkaTemplate<String, String> kafkaTemplate;
2)Kafka消费类
package com.example.SpringKafkaDemo.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group-id")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
3)消费者配置类
package com.example.SpringKafkaDemo.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
return factory;
}
}
根据官方公告中的漏洞描述,我们应该将checkDeserExWhenKeyNull
和checkDeserExWhenValueNull
属性都设置为true。
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true)
factory.getContainerProperties().setCheckDeserExWhenValueNull(true)
在函数处设置断点getExceptionFromHeader
,然后启动服务器。
进入函数后invokeIfHaveRecords
,记录对象将被反序列化。
回到getExceptionFromHeader
函数。
该函数将 的值转化springDeserializerExceptionKey
为record.headers()
变量的值headerName
并进行传递header
。
然后将价值交付给byteArrayToDeserializationException
职能。
进入byteArrayToDeserializationException
功能。
该resolveClass
函数被重写以限制任意 Java 类反序列化。实际上,我们可以在很多项目中找到防止Java反序列化漏洞的方法,比如Apache Shiro、Fastjson等。
显然,只有类org.springframework.kafka.support.serializer.DeserializationException
可以反序列化。
进入DeserializationException
函数,它由四个参数组成。其中之一是cause
用于调用实例类。
编写一个恶意类并使其继承父类Throwable
。
springDeserializerExceptionKey
最后,用生成的Java序列化填充JSON数据中的键值。发送HTTP请求后触发远程代码执行。
可点击阅读原文跳转到原文地址
感谢您抽出
.
.
来阅读本文
点它,分享点赞在看都在这里