Reading the ClusterFuzz bot source code (fuzz engine selection and scheduling: libFuzzer)
2021-1-25 Author: www.giantbranch.cn

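Last time we read through the fuzz task code. This time we go one level deeper and look at how the fuzz engine is selected.

First, back to the engine classes discussed last time: https://www.giantbranch.cn/2020/05/22/ClusterFuzz%E7%9A%84bot%E6%BA%90%E7%A0%81(fuzz%20task)%E9%98%85%E8%AF%BB/#%E5%BC%95%E6%93%8E%E7%B1%BB

When we looked at the registration code back then, we wondered why AFL was missing. Reading it again now, AFL is there, and there is also a blackbox engine.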

src/python/bot/fuzzers/init.py

def run():
  """Initialise builtin fuzzing engines."""
  engine.register('afl', afl_engine.AFLEngine)
  engine.register('blackbox', blackbox_engine.BlackboxEngine)
  engine.register('honggfuzz', honggfuzz_engine.HonggfuzzEngine)
  engine.register('libFuzzer', libFuzzer_engine.LibFuzzerEngine)
  engine.register('syzkaller', syzkaller_engine.SyzkallerEngine)
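
To make the register/get pattern concrete, here is a minimal sketch of a name-to-class registry. It is not ClusterFuzz's actual engine module: the `_ENGINES` dictionary, the `Engine` base class and `DummyLibFuzzerEngine` are hypothetical names made up for this illustration, and returning a fresh instance from `get` is an assumption.

```python
# Hypothetical minimal registry; not ClusterFuzz's actual engine module.
_ENGINES = {}


class Engine:
    """Base class for the illustrative engine below."""

    def fuzz(self, target_path):
        raise NotImplementedError


class DummyLibFuzzerEngine(Engine):
    """Stand-in engine that only reports what it would do."""

    def fuzz(self, target_path):
        return 'libFuzzer would fuzz %s' % target_path


def register(name, engine_class):
    """Store the engine class under its name; reject duplicates."""
    if name in _ENGINES:
        raise ValueError('Engine %s is already registered.' % name)
    _ENGINES[name] = engine_class


def get(name):
    """Return a fresh engine instance, or None if the name is unknown."""
    engine_class = _ENGINES.get(name)
    return engine_class() if engine_class else None


register('libFuzzer', DummyLibFuzzerEngine)
engine_impl = get('libFuzzer')
print(engine_impl.fuzz('/path/to/target'))
```

With a registry like this, the task code only needs the engine name from the job definition and never imports a concrete engine class directly.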

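The full call path of fuzz_task is now:

Receive a task -> execute_task in fuzz_task.py -> FuzzingSession -> run() -> engine.get returns the concrete engine class, then do_engine_fuzzing(engine_impl) -> run_engine_fuzzer(engine_impl, self.fuzz_target.binary, sync_corpus_directory, self.testcase_directory) ->

Inside run_engine_fuzzer, prepare is called to build the FuzzOptions (which also fixes some of the fuzzing strategies), and finally the function below starts the fuzzing: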

options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir)
fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
additional_processing_time = engine_impl.fuzz_additional_processing_timeout(
    options)
......
......
......
result = engine_impl.fuzz(target_path, options, testcase_directory,
                          fuzz_test_timeout)

That is, the fuzz method of one of the following classes:

AFLEngine
BlackboxEngine
HonggfuzzEngine
LibFuzzerEngine
SyzkallerEngine

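Let's start with libFuzzer.

prepare

We begin with prepare, which first collects the arguments:

arguments = fuzzer.get_arguments(target_path)

get_arguments first checks whether an XXX.options file exists (XXX being the name of the fuzz target). If it does, a fuzzer_options object (of class FuzzerOptions) is returned, and fuzzer_options.get_engine_arguments('libfuzzer') yields a FuzzerArguments object (arguments). Its list method converts it into a list whose elements have the form "-%s=%s". After that, rss_limit_mb is added to cap memory usage, and the timeout is set.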
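
As a rough illustration of that conversion, the sketch below reads the [libfuzzer] section of an .options file with Python's standard configparser and emits "-key=value" flags. The function name `get_libfuzzer_arguments` and the default values 2560 and 25 are assumptions chosen for this example only. Whether ClusterFuzz keeps or overrides limits already present in the file is not shown in the excerpt above; this sketch keeps them.

```python
import configparser


def get_libfuzzer_arguments(options_text, rss_limit_mb=2560, timeout=25):
    """Turn the [libfuzzer] section of a .options file into '-key=value' flags.

    The default limits are placeholders chosen for this example only.
    """
    parser = configparser.ConfigParser()
    parser.read_string(options_text)

    arguments = []
    if parser.has_section('libfuzzer'):
        arguments = ['-%s=%s' % (key, value)
                     for key, value in parser.items('libfuzzer')]

    # Append the limits only when the .options file did not set them already.
    if not any(arg.startswith('-rss_limit_mb=') for arg in arguments):
        arguments.append('-rss_limit_mb=%d' % rss_limit_mb)
    if not any(arg.startswith('-timeout=') for arg in arguments):
        arguments.append('-timeout=%d' % timeout)
    return arguments


example = """
[libfuzzer]
max_len = 1024
dict = target.dict
"""
print(get_libfuzzer_arguments(example))
```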

Next comes the grammar, which is also read from XXX.options, from its grammar section. (I could not find this section in the options files of any OSS-Fuzz project; it is related to Peach and refers to a Peach template.)

grammar = fuzzer.get_grammar(target_path)

Then a strategy pool is generated, and strategies are picked from it:

strategy_pool = strategy_selection.generate_weighted_strategy_pool(
    strategy_list=strategy.LIBFUZZER_STRATEGY_LIST,
    use_generator=True,
    engine_name=self.name)
strategy_info = libfuzzer.pick_strategies(strategy_pool, target_path,
                                          corpus_dir, arguments, grammar)

generate_weighted_strategy_pool

generate_weighted_strategy_pool builds the strategy pool from probabilities that were set empirically.

The strategy list is as follows, and it is fairly long:

LIBFUZZER_STRATEGY_LIST = [
    CORPUS_MUTATION_RADAMSA_STRATEGY,
    RANDOM_MAX_LENGTH_STRATEGY,
    CORPUS_MUTATION_ML_RNN_STRATEGY,
    VALUE_PROFILE_STRATEGY,
    FORK_STRATEGY,
    CORPUS_SUBSET_STRATEGY,
    RECOMMENDED_DICTIONARY_STRATEGY,
    DATAFLOW_TRACING_STRATEGY,
    MUTATOR_PLUGIN_STRATEGY,
    MUTATOR_PLUGIN_RADAMSA_STRATEGY,
    PEACH_GRAMMAR_MUTATION_STRATEGY,
]

generate_weighted_strategy_pool first reads an environment variable:

distribution = environment.get_value('STRATEGY_SELECTION_DISTRIBUTION')

If it is set, one entry of STRATEGY_SELECTION_DISTRIBUTION is picked by weighted random choice and used as the strategy selection:

strategy_selection = utils.random_weighted_choice(distribution_tuples,
                                                  'probability')

Otherwise the default is used, by calling generate_default_strategy_pool:

return generate_default_strategy_pool(strategy_list, use_generator)
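
A weighted random choice of this kind can be sketched with the standard library as below. This is not the code of utils.random_weighted_choice. The shape of the distribution entries (a comma-separated `strategy_name` plus a `probability`) and the sample values are assumptions for illustration.

```python
import random


def random_weighted_choice(entries, weight_key, rng=random):
    """Pick one dict from |entries| with probability proportional to its weight."""
    weights = [entry[weight_key] for entry in entries]
    return rng.choices(entries, weights=weights, k=1)[0]


# Hypothetical distribution: each entry names one combination of strategies.
distribution = [
    {'strategy_name': 'fork,value_profile', 'probability': 0.7},
    {'strategy_name': 'corpus_subset', 'probability': 0.2},
    {'strategy_name': '', 'probability': 0.1},
]

picked = random_weighted_choice(distribution, 'probability')
# An empty name means "no strategy" in this sketch.
print([name for name in picked['strategy_name'].split(',') if name])
```

The point of selecting a whole combination at once, rather than each strategy independently, is that the weights can then be tuned per combination.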

It starts by creating a StrategyPool, then chooses a generator, and finally goes through the strategies in LIBFUZZER_STRATEGY_LIST that are not in GENERATORS and adds each of them to the pool according to its probability (do_strategy):

def generate_default_strategy_pool(strategy_list, use_generator):
  """Return a strategy pool representing a selection of strategies for launcher
  to consider.

  Select strategies according to default strategy selection method."""
  pool = StrategyPool()

  # If use_generator is enabled, decide whether to include radamsa, ml rnn,
  # or no generator (mutually exclusive).
  if use_generator:
    choose_generator(pool)

  # Decide whether or not to add non-generator strategies according to
  # probability parameters.
  for value in [
      strategy_entry for strategy_entry in strategy_list
      if strategy_entry not in GENERATORS
  ]:
    if do_strategy(value):
      pool.add_strategy(value)

  logs.log('Strategy pool was generated according to default parameters. '
           'Chosen strategies: ' + ', '.join(pool.strategy_names))
  return pool

choose_generator draws a random number and compares it with radamsa_prob + ml_rnn_prob. If the random number is the larger one, no generator is chosen, so neither radamsa nor ml_rnn (the machine-learning one) is used. If it is smaller, decide_with_probability is called a second time to choose between radamsa and ml_rnn.

# /src/python/bot/fuzzers/engine_common.py
def decide_with_probability(probability):
  """Decide if we want to do something with the given probability."""
  return random.SystemRandom().random() < probability

# /src/python/bot/fuzzers/strategy_selection.py
def choose_generator(strategy_pool):
  """Chooses whether to use radamsa, ml rnn, or no generator and updates the
  strategy pool."""

  radamsa_prob = engine_common.get_strategy_probability(
      strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.name,
      default=strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.probability)

  ml_rnn_prob = engine_common.get_strategy_probability(
      strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.name,
      default=strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.probability)

  if engine_common.decide_with_probability(radamsa_prob + ml_rnn_prob):
    if engine_common.decide_with_probability(
        radamsa_prob / (radamsa_prob + ml_rnn_prob)):
      strategy_pool.add_strategy(strategy.CORPUS_MUTATION_RADAMSA_STRATEGY)
    else:
      strategy_pool.add_strategy(strategy.CORPUS_MUTATION_ML_RNN_STRATEGY)
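
The two-stage draw keeps the generators mutually exclusive while preserving their individual probabilities: radamsa ends up chosen with probability radamsa_prob, ml_rnn with ml_rnn_prob, and nothing with the remainder. The small simulation below checks this empirically. The probabilities 0.15 and 0.10 are placeholders, not ClusterFuzz defaults, and this `choose_generator` is a simplified stand-in that returns a string instead of updating a pool.

```python
import random


def choose_generator(radamsa_prob, ml_rnn_prob, rng):
    """Return 'radamsa', 'ml_rnn' or 'none' using the two-stage draw."""
    total = radamsa_prob + ml_rnn_prob
    # First draw: use any generator at all?
    if total <= 0 or rng.random() >= total:
        return 'none'
    # Second draw: split the generator share between the two candidates.
    if rng.random() < radamsa_prob / total:
        return 'radamsa'
    return 'ml_rnn'


rng = random.Random(1234)
trials = 100000
counts = {'none': 0, 'radamsa': 0, 'ml_rnn': 0}
for _ in range(trials):
    counts[choose_generator(0.15, 0.10, rng)] += 1

# Observed frequencies should be close to 0.75, 0.15 and 0.10.
for name, count in counts.items():
    print('%-8s %.3f' % (name, count / trials))
```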

libfuzzer.pick_strategies

Next is libfuzzer.pick_strategies. It handles each strategy in turn, performs whatever setup the strategy needs, and returns a StrategyInfo:

StrategyInfo(fuzzing_strategies, arguments, additional_corpus_dirs,
             extra_env, use_dataflow_tracing, is_mutations_run)

DATAFLOW_TRACING_STRATEGY

When a DFSan build of the program exists and the strategy pool contains DATAFLOW_TRACING_STRATEGY, the code first computes dataflow_binary_path (the path of the DFSan-instrumented fuzzer binary) and then checks whether dataflow_trace_dir exists. If it does not, the strategy is skipped. If it does, the argument -data_flow_trace=dataflow_trace_dir is added, followed by -focus_function=auto, and finally the strategy name is appended to fuzzing_strategies.

# Depends on the presence of DFSan instrumented build.
dataflow_build_dir = environment.get_value('DATAFLOW_BUILD_DIR')
use_dataflow_tracing = (
    dataflow_build_dir and
    strategy_pool.do_strategy(strategy.DATAFLOW_TRACING_STRATEGY))
if use_dataflow_tracing:
  dataflow_binary_path = os.path.join(
      dataflow_build_dir, os.path.relpath(fuzzer_path, build_directory))
  dataflow_trace_dir = dataflow_binary_path + DATAFLOW_TRACE_DIR_SUFFIX
  if os.path.exists(dataflow_trace_dir):
    arguments.append(
        '%s%s' % (constants.DATA_FLOW_TRACE_FLAG, dataflow_trace_dir))
    arguments.append('%s%s' % (constants.FOCUS_FUNCTION_FLAG, 'auto'))
    fuzzing_strategies.append(strategy.DATAFLOW_TRACING_STRATEGY.name)
  else:
    logs.log_warn(
        'Dataflow trace is not found in dataflow build, skipping strategy.')
    use_dataflow_tracing = False

CORPUS_MUTATION

Next comes the part that generates new testcase mutations.

It first checks whether strategy_pool contains CORPUS_MUTATION_ML_RNN_STRATEGY or CORPUS_MUTATION_RADAMSA_STRATEGY (ML_RNN takes priority over RADAMSA). If it does, and the environment is not ephemeral, is_mutations_run is True.

When is_mutations_run is True, create_corpus_directory('mutations') creates the directory for the new samples, the samples are generated, the name of the strategy used is appended to fuzzing_strategies, and finally new_testcase_mutations_directory is appended to additional_corpus_dirs.

# Select a generator to attempt to use for existing testcase mutations.
candidate_generator = engine_common.select_generator(strategy_pool,
                                                     fuzzer_path)
is_mutations_run = (not environment.is_ephemeral() and
                    candidate_generator != engine_common.Generator.NONE)

# Generate new testcase mutations using radamsa, etc.
if is_mutations_run:
  new_testcase_mutations_directory = create_corpus_directory('mutations')
  generator_used = engine_common.generate_new_testcase_mutations(
      corpus_directory, new_testcase_mutations_directory,
      project_qualified_fuzzer_name, candidate_generator)

  # Add the used generator strategy to our fuzzing strategies list.
  if generator_used:
    if candidate_generator == engine_common.Generator.RADAMSA:
      fuzzing_strategies.append(
          strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.name)
    elif candidate_generator == engine_common.Generator.ML_RNN:
      fuzzing_strategies.append(strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.name)

  additional_corpus_dirs.append(new_testcase_mutations_directory)

The core of the strategy is generate_new_testcase_mutations. Depending on candidate_generator, it calls generate_new_testcase_mutations_using_radamsa (radamsa randomly picks samples of acceptable size from corpus_directory and mutates them in a loop of 2000 iterations) or generate_new_testcase_mutations_using_ml_rnn to produce new samples. It returns true only if the directory holds more samples afterwards than it did before.

def generate_new_testcase_mutations(corpus_directory,
                                    new_testcase_mutations_directory,
                                    fuzzer_name, candidate_generator):
  """Generate new testcase mutations, using existing corpus directory or other
  methods.

  Returns true if mutations are successfully generated using radamsa or ml rnn.
  A false return signifies either no generator use or unsuccessful generation of
  testcase mutations."""
  generation_timeout = get_new_testcase_mutations_timeout()
  pre_mutations_filecount = shell.get_directory_file_count(
      new_testcase_mutations_directory)

  # Generate new testcase mutations using Radamsa.
  if candidate_generator == Generator.RADAMSA:
    generate_new_testcase_mutations_using_radamsa(
        corpus_directory, new_testcase_mutations_directory, generation_timeout)
  # Generate new testcase mutations using ML RNN model.
  elif candidate_generator == Generator.ML_RNN:
    generate_new_testcase_mutations_using_ml_rnn(
        corpus_directory, new_testcase_mutations_directory, fuzzer_name,
        generation_timeout)

  # If new mutations are successfully generated, return true.
  if shell.get_directory_file_count(
      new_testcase_mutations_directory) > pre_mutations_filecount:
    return True

  return False
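
The success check is simply "did the output directory gain files". A self-contained sketch of that before/after comparison follows. `count_files`, `run_generator` and `fake_generator` are hypothetical helpers for this example. In particular, `count_files` is a stand-in and may not count exactly what shell.get_directory_file_count counts.

```python
import os
import tempfile


def count_files(directory):
    """Count regular files under |directory|, recursively."""
    return sum(len(files) for _, _, files in os.walk(directory))


def run_generator(generator, output_directory):
    """Run |generator| and report whether it added at least one file."""
    before = count_files(output_directory)
    generator(output_directory)
    return count_files(output_directory) > before


def fake_generator(output_directory):
    """Stand-in for radamsa or the ML RNN model: writes one mutated testcase."""
    with open(os.path.join(output_directory, 'mutation-0'), 'wb') as handle:
        handle.write(b'mutated bytes')


with tempfile.TemporaryDirectory() as mutations_dir:
    print(run_generator(fake_generator, mutations_dir))   # True
    print(run_generator(lambda _: None, mutations_dir))   # False
```

Judging success by the file count rather than by a return value means a generator that times out or crashes half-way still counts as used, as long as it wrote something.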

RANDOM_MAX_LENGTH_STRATEGY

This is the maximum-length strategy. It first checks whether a -max_len= argument is already present; if so, nothing is done.

If not, a random integer between 1 and MAX_VALUE_FOR_MAX_LENGTH (10000) is generated and used as the value of -max_len.

if strategy_pool.do_strategy(strategy.RANDOM_MAX_LENGTH_STRATEGY):
  max_len_argument = fuzzer_utils.extract_argument(
      existing_arguments, constants.MAX_LEN_FLAG, remove=False)
  if not max_len_argument:
    max_length = random.SystemRandom().randint(1, MAX_VALUE_FOR_MAX_LENGTH)
    arguments.append('%s%d' % (constants.MAX_LEN_FLAG, max_length))
    fuzzing_strategies.append(strategy.RANDOM_MAX_LENGTH_STRATEGY.name)
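
The "only add the flag when the user has not set it" pattern can be sketched as follows. `extract_argument` here is a simplified stand-in for fuzzer_utils.extract_argument (it never removes anything), and `maybe_add_random_max_len` is a hypothetical name.

```python
import random

MAX_LEN_FLAG = '-max_len='
MAX_VALUE_FOR_MAX_LENGTH = 10000  # Upper bound quoted in the text above.


def extract_argument(arguments, flag):
    """Return the value of the first argument starting with |flag|, if any."""
    for argument in arguments:
        if argument.startswith(flag):
            return argument[len(flag):]
    return None


def maybe_add_random_max_len(arguments, rng):
    """Append a random -max_len= unless one is already present."""
    if extract_argument(arguments, MAX_LEN_FLAG) is not None:
        return False
    max_length = rng.randint(1, MAX_VALUE_FOR_MAX_LENGTH)
    arguments.append('%s%d' % (MAX_LEN_FLAG, max_length))
    return True


user_args = ['-max_len=512', '-timeout=25']
print(maybe_add_random_max_len(user_args, random.SystemRandom()), user_args)

default_args = ['-timeout=25']
print(maybe_add_random_max_len(default_args, random.SystemRandom()), default_args)
```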

RECOMMENDED_DICTIONARY_STRATEGY

This is the recommended-dictionary strategy, implemented by the function add_recommended_dictionary:

if (strategy_pool.do_strategy(strategy.RECOMMENDED_DICTIONARY_STRATEGY) and
    add_recommended_dictionary(arguments, project_qualified_fuzzer_name,
                               fuzzer_path)):
  fuzzing_strategies.append(strategy.RECOMMENDED_DICTIONARY_STRATEGY.name)

add_recommended_dictionary downloads recommended_dictionary.dict from Google Cloud Storage. If a dictionary already exists, the two are merged and the merged dictionary is used.

def add_recommended_dictionary(arguments, fuzzer_name, fuzzer_path):
  """Add recommended dictionary from GCS to existing .dict file or create
  a new one and update the arguments as needed.
  This function modifies |arguments| list in some cases."""
  recommended_dictionary_path = os.path.join(
      fuzzer_utils.get_temp_dir(),
      dictionary_manager.RECOMMENDED_DICTIONARY_FILENAME)

  dict_manager = dictionary_manager.DictionaryManager(fuzzer_name)

  try:
    # Bail out if cannot download recommended dictionary from GCS.
    if not dict_manager.download_recommended_dictionary_from_gcs(
        recommended_dictionary_path):
      return False
  except Exception as ex:
    logs.log_error(
        'Exception downloading recommended dictionary:\n%s.' % str(ex))
    return False

  # Bail out if the downloaded dictionary is empty.
  if not os.path.getsize(recommended_dictionary_path):
    return False

  # Check if there is an existing dictionary file in arguments.
  original_dictionary_path = fuzzer_utils.extract_argument(
      arguments, constants.DICT_FLAG)
  merged_dictionary_path = (
      original_dictionary_path or
      dictionary_manager.get_default_dictionary_path(fuzzer_path))
  merged_dictionary_path += MERGED_DICT_SUFFIX

  dictionary_manager.merge_dictionary_files(original_dictionary_path,
                                            recommended_dictionary_path,
                                            merged_dictionary_path)
  arguments.append(constants.DICT_FLAG + merged_dictionary_path)
  return True

VALUE_PROFILE_STRATEGY

This one is simple: it just adds the argument -use_value_profile=1. The libFuzzer help text is shown below; presumably it uses specific values (a value profile) to guide fuzzing.

Experimental. Use value profile to guide fuzzing.

if strategy_pool.do_strategy(strategy.VALUE_PROFILE_STRATEGY):
  arguments.append(constants.VALUE_PROFILE_ARGUMENT)
  fuzzing_strategies.append(strategy.VALUE_PROFILE_STRATEGY.name)

FORK_STRATEGY

This is the fork strategy. max_fuzz_threads is read from MAX_FUZZ_THREADS and defaults to 1.

The value passed to -fork= is the number of CPU cores divided by max_fuzz_threads (integer division), with a minimum of 1.

# Do not use fork mode for DFT-based fuzzing. This is needed in order to
# collect readable and actionable logs from fuzz targets running with DFT.
if (not is_fuchsia and not is_android and not is_ephemeral and
    not use_dataflow_tracing and
    strategy_pool.do_strategy(strategy.FORK_STRATEGY)):
  max_fuzz_threads = environment.get_value('MAX_FUZZ_THREADS', 1)
  num_fuzz_processes = max(1, utils.cpu_count() // max_fuzz_threads)
  arguments.append('%s%d' % (constants.FORK_FLAG, num_fuzz_processes))
  fuzzing_strategies.append(
      '%s_%d' % (strategy.FORK_STRATEGY.name, num_fuzz_processes))
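
A few concrete numbers make the arithmetic clearer. The helper below, `fork_argument`, is a hypothetical name for this sketch; it reproduces only the integer division and the lower bound of 1.

```python
import os


def fork_argument(cpu_count, max_fuzz_threads=1):
    """Return the -fork= flag for the given core count and thread budget."""
    num_fuzz_processes = max(1, cpu_count // max_fuzz_threads)
    return '-fork=%d' % num_fuzz_processes


print(fork_argument(8))      # -fork=8
print(fork_argument(8, 3))   # -fork=2
print(fork_argument(2, 4))   # -fork=1
print(fork_argument(os.cpu_count() or 1))
```

So on a bot that runs several fuzzing threads in parallel, each thread gets an equal share of the cores, and a thread never ends up with zero processes.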

MUTATOR_PLUGIN_STRATEGY

This strategy takes effect through the use_mutator_plugin function, which sets extra_env['LD_PRELOAD'] = mutator_plugin_path:

extra_env = {}
if (strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_STRATEGY) and
    use_mutator_plugin(target_name, extra_env)):
  fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_STRATEGY.name)

PEACH_GRAMMAR_MUTATION_STRATEGY

if (not has_existing_mutator_strategy(fuzzing_strategies) and
    strategy_pool.do_strategy(strategy.PEACH_GRAMMAR_MUTATION_STRATEGY) and
    use_peach_mutator(extra_env, grammar)):
  fuzzing_strategies.append(
      '%s_%s' % (strategy.PEACH_GRAMMAR_MUTATION_STRATEGY.name, grammar))

First, if fuzzing_strategies already contains one of the strategies below, PEACH_GRAMMAR_MUTATION_STRATEGY is not executed:

MUTATOR_STRATEGIES = [
    strategy.PEACH_GRAMMAR_MUTATION_STRATEGY.name,
    strategy.MUTATOR_PLUGIN_STRATEGY.name,
    strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY.name
]
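
The mutator strategies exclude one another, presumably because each of them claims LD_PRELOAD in extra_env. The sketch below shows one way such a guard could work. It is not the body of has_existing_mutator_strategy, which is not shown in this article: the strategy name strings are placeholders, `try_enable` is a hypothetical helper, and matching by prefix is an assumption made here because the Peach strategy is recorded with a grammar suffix.

```python
# Placeholder names standing in for the .name of each strategy constant.
MUTATOR_STRATEGY_NAMES = [
    'peach_grammar_mutation',
    'mutator_plugin',
    'mutator_plugin_radamsa',
]


def has_mutator_strategy(fuzzing_strategies):
    """True if any recorded strategy name starts with a mutator strategy name."""
    return any(
        chosen.startswith(name)
        for chosen in fuzzing_strategies
        for name in MUTATOR_STRATEGY_NAMES)


def try_enable(name, fuzzing_strategies, strategy_pool, setup):
    """Enable |name| only if no mutator is active, it is pooled, and setup works."""
    if has_mutator_strategy(fuzzing_strategies):
        return False
    if name not in strategy_pool or not setup():
        return False
    fuzzing_strategies.append(name)
    return True


chosen = ['fork_4']
pool = {'mutator_plugin', 'mutator_plugin_radamsa'}
print(try_enable('mutator_plugin', chosen, pool, lambda: True))          # True
print(try_enable('mutator_plugin_radamsa', chosen, pool, lambda: True))  # False
print(chosen)  # ['fork_4', 'mutator_plugin']
```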

PEACH_GRAMMAR_MUTATION_STRATEGY also takes effect through environment variables. In use_peach_mutator these are mainly the following:

# Set title and pit environment variables
extra_env['PIT_FILENAME'] = pit_path
extra_env['PIT_TITLE'] = grammar
# Set LD_PRELOAD.
peach_path = os.path.join(unzipped, 'peach_mutator', 'src', 'peach.so')
extra_env['LD_PRELOAD'] = peach_path
# Set Python path.
new_path = [
    os.path.join(unzipped, 'peach_mutator', 'src'),
    os.path.join(unzipped, 'peach_mutator', 'third_party', 'peach'),
] + sys.path

extra_env['PYTHONPATH'] = os.pathsep.join(new_path)

MUTATOR_PLUGIN_RADAMSA_STRATEGY

The logic here is the same as above: if one of MUTATOR_STRATEGIES is already present, the strategy is not executed.

if (not has_existing_mutator_strategy(fuzzing_strategies) and
    strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY) and
    use_radamsa_mutator_plugin(extra_env)):
  fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY.name)

use_radamsa_mutator_plugin takes effect through the LD_PRELOAD environment variable, extra_env['LD_PRELOAD'] = radamsa_path:

def use_radamsa_mutator_plugin(extra_env):
  """Decide whether to use Radamsa in process. If yes, add the path to the
  radamsa shared object to LD_PRELOAD in |extra_env| and return True."""
  # Radamsa will only work on LINUX ASAN jobs.
  # TODO(mpherman): Include architecture info in job definition and exclude
  # i386.
  if environment.is_lib() or not is_linux_asan():
    return False

  radamsa_path = os.path.join(environment.get_platform_resources_directory(),
                              'radamsa', 'libradamsa.so')

  logs.log('Using Radamsa mutator plugin : %s' % radamsa_path)
  extra_env['LD_PRELOAD'] = radamsa_path
  return True

After libfuzzer.pick_strategies

The arguments chosen by the strategies are appended:

arguments.extend(strategy_info.arguments)

The seed corpus is unpacked:

# Check for seed corpus and add it into corpus directory.
engine_common.unpack_seed_corpus_if_needed(target_path, corpus_dir)

If the strategies include CORPUS_SUBSET_STRATEGY, a number of corpus files are selected as the initial corpus:

# Pick a few testcases from our corpus to use as the initial corpus.
subset_size = engine_common.random_choice(
    engine_common.CORPUS_SUBSET_NUM_TESTCASES)

if (not strategy_info.use_dataflow_tracing and
    strategy_pool.do_strategy(strategy.CORPUS_SUBSET_STRATEGY) and
    shell.get_directory_file_count(corpus_dir) > subset_size):
  # Copy |subset_size| testcases into 'subset' directory.
  corpus_subset_dir = self._create_temp_corpus_dir('subset')
  libfuzzer.copy_from_corpus(corpus_subset_dir, corpus_dir, subset_size)
  strategy_info.fuzzing_strategies.append(
      strategy.CORPUS_SUBSET_STRATEGY.name + '_' + str(subset_size))
  strategy_info.additional_corpus_dirs.append(corpus_subset_dir)
else:
  strategy_info.additional_corpus_dirs.append(corpus_dir)
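
The subset step boils down to copying a random sample of corpus files into a scratch directory when the corpus is larger than the sample size. The sketch below is an illustration under that reading, not the code of libfuzzer.copy_from_corpus. `copy_corpus_subset` is a hypothetical helper and, unlike the original, it folds the size check into the copy function.

```python
import os
import random
import shutil
import tempfile


def copy_corpus_subset(corpus_dir, subset_dir, subset_size, rng):
    """Copy |subset_size| random files to |subset_dir|.

    Returns False without copying when the corpus is not larger than the subset.
    """
    names = sorted(
        name for name in os.listdir(corpus_dir)
        if os.path.isfile(os.path.join(corpus_dir, name)))
    if len(names) <= subset_size:
        return False
    for name in rng.sample(names, subset_size):
        shutil.copy(os.path.join(corpus_dir, name), subset_dir)
    return True


with tempfile.TemporaryDirectory() as corpus, \
        tempfile.TemporaryDirectory() as subset:
    for index in range(10):
        with open(os.path.join(corpus, 'unit-%d' % index), 'wb') as handle:
            handle.write(b'seed %d' % index)
    used_subset = copy_corpus_subset(corpus, subset, 3, random.SystemRandom())
    # Fuzz on the subset if one was made, otherwise on the full corpus.
    print(used_subset, len(os.listdir(subset if used_subset else corpus)))
```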

If a dictionary argument is present, it is validated by checking that the dictionary file exists.

If there is no dictionary argument, the code checks whether %target_binary_name%.dict exists.

Finally, the dictionary's format is checked and repaired where possible, for example entries that are missing double quotes.

# Check dict argument to make sure that it's valid.
dict_path = fuzzer_utils.extract_argument(
    arguments, constants.DICT_FLAG, remove=False)
if dict_path and not os.path.exists(dict_path):
  logs.log_error('Invalid dict %s for %s.' % (dict_path, target_path))
  fuzzer_utils.extract_argument(arguments, constants.DICT_FLAG)

# If there's no dict argument, check for %target_binary_name%.dict file.
dict_path = fuzzer_utils.extract_argument(
    arguments, constants.DICT_FLAG, remove=False)
if not dict_path:
  dict_path = dictionary_manager.get_default_dictionary_path(target_path)
  if os.path.exists(dict_path):
    arguments.append(constants.DICT_FLAG + dict_path)

# If we have a dictionary, correct any items that are not formatted properly
# (e.g. quote items that are missing them).
dictionary_manager.correct_if_needed(dict_path)

At the end, prepare calls process_strategies, which returns stats recording which strategies were enabled and which values they chose; this is the strategies variable:

strategies = stats.process_strategies(
    strategy_info.fuzzing_strategies, name_modifier=lambda x: x)
return LibFuzzerOptions(
    corpus_dir, arguments, strategies, strategy_info.additional_corpus_dirs,
    strategy_info.extra_env, strategy_info.use_dataflow_tracing,
    strategy_info.is_mutations_run)

Maximum fuzzing time

The fuzzing duration is read from an environment variable, and the time needed for the other steps of the fuzzing task, such as corpus merging and dictionary analysis, is subtracted from it:

fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
additional_processing_time = engine_impl.fuzz_additional_processing_timeout(
    options)
fuzz_test_timeout -= additional_processing_time
if fuzz_test_timeout <= 0:
  raise FuzzTaskException(
      f'Invalid engine timeout: '
      f'{fuzz_test_timeout} - {additional_processing_time}')
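
As a worked example of this budget, the sketch below subtracts the post-processing time from the total and rejects a non-positive remainder. The function name `effective_fuzz_timeout` and the numbers are made up for illustration, and a plain ValueError stands in for FuzzTaskException.

```python
def effective_fuzz_timeout(fuzz_test_timeout, additional_processing_time):
    """Return the seconds left for the fuzzer after post-processing is reserved."""
    remaining = fuzz_test_timeout - additional_processing_time
    if remaining <= 0:
        raise ValueError(
            'Invalid engine timeout: %d - %d' %
            (fuzz_test_timeout, additional_processing_time))
    return remaining


# Hypothetical budget: one hour in total, 30 minutes reserved for merging and
# dictionary analysis.
print(effective_fuzz_timeout(3600, 1800))  # 1800

try:
    effective_fuzz_timeout(600, 1800)
except ValueError as error:
    print(error)  # Invalid engine timeout: 600 - 1800
```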

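The actual fuzzing

The actual fuzzing is this call:

result = engine_impl.fuzz(target_path, options, testcase_directory,
                          fuzz_test_timeout)

Let's step into this fuzz function.

The first line below, the profiler call, is for performance profiling: if USE_PYTHON_PROFILER is set and is not False, Google Cloud Profiler is started.

The second line calls libfuzzer.get_runner, which in the normal case returns LibFuzzerRunner(fuzzer_path).

The third line sets the sanitizer options, for example an exitcode of 77.

The fourth line creates a temporary directory for new corpus units, and the fifth line combines it with options.fuzz_corpus_dirs into the corpus_directories list.

Then runner.fuzz is called, which is what actually launches the fuzzer.

What happens after fuzzing can be summarised as follows:
1. Split the fuzzer output into lines.
2. Check the log for a crash and extract the path of the crash file.
3. If libFuzzer returned a non-zero exit code but no crash file was found, it most likely crashed at startup; in that case an empty file is used as the crash file.
4. Fill in stats values from log_lines, such as crash_count, slow_unit_count, timeout_count and edges_total.
5. Remove arguments that would interfere with merging and dictionary analysis, such as -fork, -max_len and -runs.
6. Set a larger timeout for reproducing the crash.
7. Copy the crash file to the main crash directory.
8. Generate a recommended dictionary from the log.
9. Return the fuzzing result.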

# Excerpt from the body of the engine's fuzz method. reproducers_dir and
# max_time are the testcase_directory and fuzz_test_timeout passed in above.
profiler.start_if_needed('libfuzzer_fuzz')
runner = libfuzzer.get_runner(target_path)
libfuzzer.set_sanitizer_options(target_path, fuzz_options=options)

# Directory to place new units.
new_corpus_dir = self._create_temp_corpus_dir('new')

corpus_directories = [new_corpus_dir] + options.fuzz_corpus_dirs
fuzz_result = runner.fuzz(
    corpus_directories,
    fuzz_timeout=max_time,
    additional_args=options.arguments,
    artifact_prefix=reproducers_dir,
    extra_env=options.extra_env)

log_lines = fuzz_result.output.splitlines()
# Output can be large, so save some memory by removing reference to the
# original output which is no longer needed.
fuzz_result.output = None

# Check if we crashed, and get the crash testcase path.
crash_testcase_file_path = runner.get_testcase_path(log_lines)

# If we exited with a non-zero return code with no crash file in output from
# libFuzzer, this is most likely a startup crash. Use an empty testcase to
# store it as a crash.
if not crash_testcase_file_path and fuzz_result.return_code:
  crash_testcase_file_path = self._create_empty_testcase_file(
      reproducers_dir)

# Parse stats information based on libFuzzer output.
parsed_stats = libfuzzer.parse_log_stats(log_lines)

# Extend parsed stats by additional performance features.
parsed_stats.update(
    stats.parse_performance_features(log_lines, options.strategies,
                                     options.arguments))

# Set some initial stat overrides.
timeout_limit = fuzzer_utils.extract_argument(
    options.arguments, constants.TIMEOUT_FLAG, remove=False)

expected_duration = runner.get_max_total_time(max_time)
actual_duration = int(fuzz_result.time_executed)
fuzzing_time_percent = 100 * actual_duration / float(expected_duration)
parsed_stats.update({
    'timeout_limit': int(timeout_limit),
    'expected_duration': expected_duration,
    'actual_duration': actual_duration,
    'fuzzing_time_percent': fuzzing_time_percent,
})

# Remove fuzzing arguments before merge and dictionary analysis step.
merge_arguments = options.arguments[:]
libfuzzer.remove_fuzzing_arguments(merge_arguments, is_merge=True)
self._merge_new_units(target_path, options.corpus_dir, new_corpus_dir,
                      options.fuzz_corpus_dirs, merge_arguments,
                      parsed_stats)

fuzz_logs = '\n'.join(log_lines)
crashes = []
if crash_testcase_file_path:
  reproduce_arguments = options.arguments[:]
  libfuzzer.remove_fuzzing_arguments(reproduce_arguments)

  # Use higher timeout for reproduction.
  libfuzzer.fix_timeout_argument_for_reproduction(reproduce_arguments)

  # Write the new testcase.
  # Copy crash testcase contents into the main testcase path.
  crashes.append(
      engine.Crash(crash_testcase_file_path, fuzz_logs, reproduce_arguments,
                   actual_duration))

libfuzzer.analyze_and_update_recommended_dictionary(
    runner, project_qualified_fuzzer_name, log_lines, options.corpus_dir,
    merge_arguments)

return engine.FuzzResult(fuzz_logs, fuzz_result.command, crashes,
                         parsed_stats, fuzz_result.time_executed)
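
Locating the crash file and falling back to an empty testcase can be sketched as below. libFuzzer announces a saved artifact with a "Test unit written to <path>" line, which is what the regular expression looks for. This is an illustration and not the code of runner.get_testcase_path: `pick_crash_file` and the `make_empty_testcase` callback are hypothetical, and the sample log lines are made up.

```python
import re

# libFuzzer reports a saved artifact with a line of this shape.
TESTCASE_REGEX = re.compile(r'Test unit written to\s*(.*)')


def get_testcase_path(log_lines):
    """Return the artifact path from the first matching log line, else None."""
    for line in log_lines:
        match = TESTCASE_REGEX.search(line)
        if match:
            return match.group(1).strip()
    return None


def pick_crash_file(log_lines, return_code, make_empty_testcase):
    """Use the reported crash file, or an empty one for a startup crash."""
    path = get_testcase_path(log_lines)
    if not path and return_code:
        # Non-zero exit without an artifact: most likely a startup crash.
        path = make_empty_testcase()
    return path


# Illustrative log lines, not captured from a real run.
sample_log = [
    '#1024 NEW cov: 120 ft: 340 corp: 12/1Kb',
    "artifact_prefix='/tmp/crashes/'; Test unit written to /tmp/crashes/crash-1",
]
print(pick_crash_file(sample_log, 1, lambda: '/tmp/crashes/empty'))
print(pick_crash_file([], 1, lambda: '/tmp/crashes/empty'))
print(pick_crash_file([], 0, lambda: '/tmp/crashes/empty'))
```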

Finally, let's step into runner.fuzz. Looking at LibFuzzerRunner first, its fuzz method actually delegates to LibFuzzerCommon.fuzz:

class LibFuzzerRunner(new_process.UnicodeProcessRunner, LibFuzzerCommon):
  """libFuzzer runner (when minijail is not used)."""

  def __init__(self, executable_path, default_args=None):
    """Inits the LibFuzzerRunner.

    Args:
      executable_path: Path to the fuzzer executable.
      default_args: Default arguments to always pass to the fuzzer.
    """
    super().__init__(executable_path=executable_path, default_args=default_args)

  def fuzz(self,
           corpus_directories,
           fuzz_timeout,
           artifact_prefix=None,
           additional_args=None,
           extra_env=None):
    """LibFuzzerCommon.fuzz override."""
    additional_args = copy.copy(additional_args)
    if additional_args is None:
      additional_args = []

    return LibFuzzerCommon.fuzz(self, corpus_directories, fuzz_timeout,
                                artifact_prefix, additional_args, extra_env)

In LibFuzzerCommon.fuzz, -artifact_prefix is handled, -max_total_time= and -print_final_stats=1 are added, and the corpus_directories list is appended last. Then run_and_wait is called: it waits at most fuzz_timeout before stopping the process, unless libFuzzer exits by itself first.

def fuzz(self,
         corpus_directories,
         fuzz_timeout,
         artifact_prefix=None,
         additional_args=None,
         extra_env=None):
  """Running fuzzing command.

  Args:
    corpus_directories: List of corpus directory paths to be passed to
        libFuzzer.
    fuzz_timeout: The maximum time in seconds that libFuzzer is allowed to run
        for.
    artifact_prefix: The directory to store new fuzzing artifacts (crashes,
        timeouts, slow units)
    additional_args: A sequence of additional arguments to be passed to the
        executable.
    extra_env: A dictionary containing environment variables and their values.
        These will be added to the environment of the new process.

  Returns:
    A process.ProcessResult.
  """
  additional_args = copy.copy(additional_args)
  if additional_args is None:
    additional_args = []

  max_total_time = self.get_max_total_time(fuzz_timeout)
  if any(arg.startswith(constants.FORK_FLAG) for arg in additional_args):
    max_total_time -= self.LIBFUZZER_FORK_MODE_CLEAN_EXIT_TIME
  assert max_total_time > 0

  # Old libFuzzer jobs specify -artifact_prefix through additional_args
  if artifact_prefix:
    additional_args.append(
        '%s%s' % (constants.ARTIFACT_PREFIX_FLAG,
                  self._normalize_artifact_prefix(artifact_prefix)))

  additional_args.extend([
      '%s%d' % (constants.MAX_TOTAL_TIME_FLAG, max_total_time),
      constants.PRINT_FINAL_STATS_ARGUMENT,
      # FIXME: temporarily disabled due to a lack of crash information in
      # output.
      # '-close_fd_mask=3',
  ])

  additional_args.extend(corpus_directories)
  return self.run_and_wait(
      additional_args=additional_args,
      timeout=fuzz_timeout - self.SIGTERM_WAIT_TIME,
      terminate_before_kill=True,
      terminate_wait_time=self.SIGTERM_WAIT_TIME,
      max_stdout_len=MAX_OUTPUT_LEN,
      extra_env=extra_env)
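
To see what the final command line looks like, here is a simplified sketch that assembles the argument list without running anything. `build_fuzz_command` is a hypothetical helper. The 10-second `fork_clean_exit_time` is a placeholder rather than the real LIBFUZZER_FORK_MODE_CLEAN_EXIT_TIME, get_max_total_time is simplified to the raw timeout, and adding a trailing separator is only an assumption about what _normalize_artifact_prefix does.

```python
import os


def build_fuzz_command(target_path, corpus_directories, fuzz_timeout,
                       artifact_prefix=None, additional_args=None,
                       fork_clean_exit_time=10):
    """Return the argv list for one libFuzzer run."""
    args = list(additional_args or [])

    # Fork mode gets a little less time so child processes can exit cleanly.
    max_total_time = fuzz_timeout
    if any(arg.startswith('-fork=') for arg in args):
        max_total_time -= fork_clean_exit_time
    if max_total_time <= 0:
        raise ValueError('fuzz_timeout is too small: %d' % fuzz_timeout)

    if artifact_prefix:
        # libFuzzer treats the value as a plain prefix, so a directory needs a
        # trailing separator.
        args.append('-artifact_prefix=' + os.path.join(artifact_prefix, ''))

    args.append('-max_total_time=%d' % max_total_time)
    args.append('-print_final_stats=1')
    # Corpus directories go last; libFuzzer writes new units to the first one.
    args.extend(corpus_directories)
    return [target_path] + args


command = build_fuzz_command(
    './fuzz_target', ['/tmp/new', '/tmp/corpus'], 600,
    artifact_prefix='/tmp/crashes',
    additional_args=['-fork=4', '-timeout=25'])
print(' '.join(command))
```

A list like this could then be handed to subprocess.run with a timeout slightly above max_total_time, which mirrors the role run_and_wait plays in the original.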

Source: https://www.giantbranch.cn/2021/01/25/ClusterFuzz%E7%9A%84bot%E6%BA%90%E7%A0%81(fuzz%20engine%E7%9A%84%E9%80%89%E6%8B%A9%E4%B8%8E%E8%B0%83%E5%BA%A6%E4%B9%8Blibfuzzer)%E9%98%85%E8%AF%BB/