CVE-2023-34040 Spring Kafka反序列漏洞分析
2023-9-17 20:1:56 Author: Ots安全(查看原文) 阅读量:37 收藏

根据安全公告中的描述,我们可以简单地获得导致漏洞的一些关键点。

  1. 在配置中将 作为 ErrorHandlingDeserializer Kafka 记录中的键和/或值。

  2. 将布尔类型属性 checkDeserExWhenKeyNull 和/或 checkDeserExWhenValueNull 设置为 true。

  3. 用户可以发布 Kafka 主题,无需任何验证。

0x02 kafka
概念

在深入研究该漏洞之前,我们及时回顾了 Kafka 服务的一些相关概念。

  • Producer:我们调用发布记录卡夫卡主题制作人的对象

  • Topic:记录由 Kafka 服务分类,每个分类都命名为 Topic。

  • Broker:发布的消息存储在一组服务器中,我们称之为Kafka集群。每个服务器都是一个代理。消费者可以获得数据形式 Broker 并使用多个主题。

  • consumer:用于订阅消息并与已发布消息一起处理的对象称为 Kafka topi 消费者。消耗消息是基于主题的。

此外,有必要回顾kafka记录的结构。

Kafka 记录,我们也称它为由标头和正文组成的消息或事件。标头数据实际上等于元数据,包括主题、页码和时间戳等基本元素。它们存储为一对键/值。正文数据通常也是存储为键/值结构的相关业务数据。

复现:

部署Kafka服务之前需要Zookeeper服务器。

1.通过docker安装Zookeeper服务器

docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest

2.通过docker部署Kafka服务器

docker run  -d --name kafka -p 9092:9092 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-e TZ="Asia/Shanghai" \wurstmeister/kafka:latest

3.Spring Boot项目导入受影响的Kafka依赖

受影响版本:

  • 2.8.1至2.9.10

  • 3.0.0 至 3.0.

<dependency>        <groupId>org.springframework.kafka</groupId>        <artifactId>spring-kafka</artifactId>        <version>2.8.11</version>    </dependency>

4.更新配置application.yaml

5.演示类

1)Kafka生产者类

package com.example.SpringKafkaDemo.producer;
import com.example.SpringKafkaDemo.model.KafkaMessage;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
@RestControllerpublic class KafkaProducer {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/message/send") public String sendMessage(@RequestBody KafkaMessage message) {
String topic = message.getTopic(); String data = message.getData();
HashMap<String, String> headers = message.getHeaders();

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data); for (String s : headers.keySet()) { if (s.equals("springDeserializerExceptionKey")) { String exceptData = headers.get(s); byte[] exceptHandler = KafkaProducer.hexStringtoBytes(exceptData); producerRecord.headers().add(s, exceptHandler); continue; }
producerRecord.headers().add(s, headers.get(s).getBytes()); } kafkaTemplate.send(producerRecord); String jsonString="{\"code\":\"200\", \"status\":\"success\"}";
return jsonString; }

private static byte[] hexStringtoBytes(String hexString) {
byte[] excepetionMessage = new byte[hexString.length() / 2]; for (int i = 0; i < excepetionMessage.length; i++) { excepetionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);
} return excepetionMessage; }}

顺便说一句,这里我们使用了Java语言中的一种设计模式, 模板方法模式在此演示中,我插入了一个名为 的模板kafkaTemplate

代码片段的高亮显示

private KafkaTemplate<StringString> kafkaTemplate;

2)Kafka消费类

package com.example.SpringKafkaDemo.consumer;
import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;
@Servicepublic class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); }
}

3)消费者配置类

package com.example.SpringKafkaDemo.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;import java.util.Map;
@Configuration@EnableKafkapublic class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}") private String groupId;
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return props; }
@Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setCheckDeserExWhenKeyNull(true); factory.getContainerProperties().setCheckDeserExWhenValueNull(true); return factory; }}

根据官方公告中的漏洞描述,我们应该将checkDeserExWhenKeyNullcheckDeserExWhenValueNull属性都设置为true

factory.getContainerProperties().setCheckDeserExWhenKeyNull(true) factory.getContainerProperties().setCheckDeserExWhenValueNull(true)

在函数处设置断点getExceptionFromHeader,然后启动服务器。

进入函数后invokeIfHaveRecords,记录对象将被反序列化。

回到getExceptionFromHeader函数。

该函数将 的值转化springDeserializerExceptionKeyrecord.headers()变量的值headerName并进行传递header

然后将价值交付给byteArrayToDeserializationException职能。

进入byteArrayToDeserializationException功能。

resolveClass函数被重写以限制任意 Java 类反序列化。实际上,我们可以在很多项目中找到防止Java反序列化漏洞的方法,比如Apache Shiro、Fastjson等。

显然,只有类
org.springframework.kafka.support.serializer.DeserializationException可以反序列化。

进入DeserializationException函数,它由四个参数组成。其中之一是cause用于调用实例类。

编写一个恶意类并使其继承父类Throwable

springDeserializerExceptionKey最后,用生成的Java序列化填充JSON数据中的键值。发送HTTP请求后触发远程代码执行。

可点击阅读原文跳转到原文地址

感谢您抽出

.

.

来阅读本文

点它,分享点赞在看都在这里


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMjYyMzkwOA==&mid=2247501430&idx=1&sn=caab8c05d09ae1b5bedb64046d398315&chksm=9bad8f3dacda062be2416e028db535a774c608b9146f741759401f56b5e7be9a65024e752b20&scene=0&xtrack=1#rd
如有侵权请联系:admin#unsafe.sh