def generate_default_strategy_pool(strategy_list, use_generator):
  """Return a strategy pool representing a selection of strategies for launcher to consider.

  Select strategies according to default strategy selection method.

  Args:
    strategy_list: Iterable of strategy entries to consider for the pool.
    use_generator: Whether a testcase generator (radamsa / ml rnn / none) may
      be chosen for this run.

  Returns:
    A StrategyPool containing the strategies chosen for this run.
  """
  pool = StrategyPool()

  # If use_generator is enabled, decide whether to include radamsa, ml rnn,
  # or no generator (mutually exclusive).
  if use_generator:
    choose_generator(pool)

  # Decide whether or not to add non-generator strategies according to
  # probability parameters. Iterate lazily instead of materializing an
  # intermediate list of non-generator entries.
  for strategy_entry in strategy_list:
    if strategy_entry in GENERATORS:
      continue
    if do_strategy(strategy_entry):
      pool.add_strategy(strategy_entry)

  logs.log('Strategy pool was generated according to default parameters. '
           'Chosen strategies: ' + ', '.join(pool.strategy_names))
  return pool
# NOTE(review): collapsed excerpt mixing two source files; the original line
# breaks were lost in extraction, so statements below appear joined.
# decide_with_probability draws a uniform float in [0, 1) via
# random.SystemRandom and compares it against |probability|.
# /src/python/bot/fuzzers/engine_common.py def decide_with_probability(probability): """Decide if we want to do something with the given probability.""" return random.SystemRandom().random() < probability # /src/python/bot/fuzzers/strategy_selection.py def choose_generator(strategy_pool): """Chooses whether to use radamsa, ml rnn, or no generator and updates the strategy pool."""
# NOTE(review): the statements below reference fuzzer_path / corpus_directory /
# fuzzing_strategies, which are not parameters of choose_generator — this looks
# like a launcher-side fragment pasted after the def; verify against the
# original file. A mutations run requires a non-ephemeral environment and a
# selected generator other than Generator.NONE.
# Select a generator to attempt to use for existing testcase mutations. candidate_generator = engine_common.select_generator(strategy_pool, fuzzer_path) is_mutations_run = (not environment.is_ephemeral() and candidate_generator != engine_common.Generator.NONE)
# When this is a mutations run, generate new testcases into a dedicated
# 'mutations' corpus directory using the selected generator.
# Generate new testcase mutations using radamsa, etc. if is_mutations_run: new_testcase_mutations_directory = create_corpus_directory('mutations') generator_used = engine_common.generate_new_testcase_mutations( corpus_directory, new_testcase_mutations_directory, project_qualified_fuzzer_name, candidate_generator)
# Only the generator that actually produced mutations is recorded in the
# fuzzing strategies list (radamsa vs. ml rnn).
# Add the used generator strategy to our fuzzing strategies list. if generator_used: if candidate_generator == engine_common.Generator.RADAMSA: fuzzing_strategies.append( strategy.CORPUS_MUTATION_RADAMSA_STRATEGY.name) elif candidate_generator == engine_common.Generator.ML_RNN: fuzzing_strategies.append(strategy.CORPUS_MUTATION_ML_RNN_STRATEGY.name)
def generate_new_testcase_mutations(corpus_directory,
                                    new_testcase_mutations_directory,
                                    fuzzer_name, candidate_generator):
  """Generate new testcase mutations, using existing corpus directory or other
  methods.

  Args:
    corpus_directory: Directory containing the existing corpus.
    new_testcase_mutations_directory: Directory that generated mutations are
      written into.
    fuzzer_name: Fuzzer name, passed through to the ML RNN generator.
    candidate_generator: Generator enum value selected for this run.

  Returns:
    True if mutations are successfully generated using radamsa or ml rnn.
    A False return signifies either no generator use or unsuccessful
    generation of testcase mutations.
  """
  generation_timeout = get_new_testcase_mutations_timeout()
  # Snapshot the file count so success can be detected by comparing after
  # generation.
  pre_mutations_filecount = shell.get_directory_file_count(
      new_testcase_mutations_directory)

  # Generate new testcase mutations using Radamsa.
  if candidate_generator == Generator.RADAMSA:
    generate_new_testcase_mutations_using_radamsa(
        corpus_directory, new_testcase_mutations_directory, generation_timeout)
  # Generate new testcase mutations using ML RNN model.
  elif candidate_generator == Generator.ML_RNN:
    generate_new_testcase_mutations_using_ml_rnn(
        corpus_directory, new_testcase_mutations_directory, fuzzer_name,
        generation_timeout)

  # Success means at least one new file appeared in the output directory;
  # return the comparison directly instead of an if/return True/return False.
  return shell.get_directory_file_count(
      new_testcase_mutations_directory) > pre_mutations_filecount
RANDOM_MAX_LENGTH_STRATEGY

这是随机最大长度策略:首先判断参数中是否已经存在 -max_len= 参数,如果存在就不做任何操作;
如果不存在,则生成一个 1 到 MAX_VALUE_FOR_MAX_LENGTH(10000)范围内的随机数,作为 -max_len 的值。
if strategy_pool.do_strategy(strategy.RANDOM_MAX_LENGTH_STRATEGY): max_len_argument = fuzzer_utils.extract_argument( existing_arguments, constants.MAX_LEN_FLAG, remove=False) if not max_len_argument: max_length = random.SystemRandom().randint(1, MAX_VALUE_FOR_MAX_LENGTH) arguments.append('%s%d' % (constants.MAX_LEN_FLAG, max_length)) fuzzing_strategies.append(strategy.RANDOM_MAX_LENGTH_STRATEGY.name)
RECOMMENDED_DICTIONARY_STRATEGY

这是推荐字典策略:从 GCS 下载推荐字典并合并到现有字典中,核心逻辑在函数 add_recommended_dictionary 里。
if (strategy_pool.do_strategy(strategy.RECOMMENDED_DICTIONARY_STRATEGY) and add_recommended_dictionary(arguments, project_qualified_fuzzer_name, fuzzer_path)): fuzzing_strategies.append(strategy.RECOMMENDED_DICTIONARY_STRATEGY.name)
# add_recommended_dictionary: downloads the recommended dictionary from GCS
# into a temp path, bails out on download failure or an empty file, then
# computes a merged-dictionary path from the existing -dict argument (or the
# fuzzer's default dictionary path) plus MERGED_DICT_SUFFIX.
# NOTE(review): this paste is truncated — the merge/argument-update tail of
# the function is not shown here.
def add_recommended_dictionary(arguments, fuzzer_name, fuzzer_path): """Add recommended dictionary from GCS to existing .dict file or create a new one and update the arguments as needed. This function modifies |arguments| list in some cases.""" recommended_dictionary_path = os.path.join( fuzzer_utils.get_temp_dir(), dictionary_manager.RECOMMENDED_DICTIONARY_FILENAME)
# NOTE(review): `dict_manager` below vs. `dictionary_manager` above — likely a
# transcription inconsistency; confirm the actual module alias in the source.
try: # Bail out if cannot download recommended dictionary from GCS. if not dict_manager.download_recommended_dictionary_from_gcs( recommended_dictionary_path): return False except Exception as ex: logs.log_error( 'Exception downloading recommended dictionary:\n%s.' % str(ex)) return False
# A zero-byte download is treated the same as a failed download.
# Bail out if the downloaded dictionary is empty. if not os.path.getsize(recommended_dictionary_path): return False
# Merged path = (existing -dict path, or the default dictionary path derived
# from |fuzzer_path|) + MERGED_DICT_SUFFIX.
# Check if there is an existing dictionary file in arguments. original_dictionary_path = fuzzer_utils.extract_argument( arguments, constants.DICT_FLAG) merged_dictionary_path = ( original_dictionary_path or dictionary_manager.get_default_dictionary_path(fuzzer_path)) merged_dictionary_path += MERGED_DICT_SUFFIX
if strategy_pool.do_strategy(strategy.VALUE_PROFILE_STRATEGY): arguments.append(constants.VALUE_PROFILE_ARGUMENT) fuzzing_strategies.append(strategy.VALUE_PROFILE_STRATEGY.name)
# Do not use fork mode for DFT-based fuzzing. This is needed in order to # collect readable and actionable logs from fuzz targets running with DFT. if (not is_fuchsia and not is_android and not is_ephemeral and not use_dataflow_tracing and strategy_pool.do_strategy(strategy.FORK_STRATEGY)): max_fuzz_threads = environment.get_value('MAX_FUZZ_THREADS', 1) num_fuzz_processes = max(1, utils.cpu_count() // max_fuzz_threads) arguments.append('%s%d' % (constants.FORK_FLAG, num_fuzz_processes)) fuzzing_strategies.append( '%s_%d' % (strategy.FORK_STRATEGY.name, num_fuzz_processes))
extra_env = {} if (strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_STRATEGY) and use_mutator_plugin(target_name, extra_env)): fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_STRATEGY.name)
PEACH_GRAMMAR_MUTATION_STRATEGY

Peach 语法变异策略:仅当前面尚未选中其他 mutator 策略时,才会尝试通过 use_peach_mutator 启用。
if (not has_existing_mutator_strategy(fuzzing_strategies) and strategy_pool.do_strategy(strategy.PEACH_GRAMMAR_MUTATION_STRATEGY) and use_peach_mutator(extra_env, grammar)): fuzzing_strategies.append( '%s_%s' % (strategy.PEACH_GRAMMAR_MUTATION_STRATEGY.name, grammar))
if (not has_existing_mutator_strategy(fuzzing_strategies) and strategy_pool.do_strategy(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY) and use_radamsa_mutator_plugin(extra_env)): fuzzing_strategies.append(strategy.MUTATOR_PLUGIN_RADAMSA_STRATEGY.name)
def use_radamsa_mutator_plugin(extra_env): """Decide whether to use Radamsa in process. If yes, add the path to the radamsa shared object to LD_PRELOAD in |extra_env| and return True.""" # Radamsa will only work on LINUX ASAN jobs. # TODO(mpherman): Include architecture info in job definition and exclude # i386. if environment.is_lib() or not is_linux_asan(): return False
# CORPUS_SUBSET_STRATEGY: pick a random subset size, then — if not doing
# dataflow tracing, the strategy is selected, and the corpus is larger than
# the subset — copy |subset_size| testcases into a temp 'subset' directory and
# fuzz from that; otherwise fuzz from the full corpus directory.
# Pick a few testcases from our corpus to use as the initial corpus. subset_size = engine_common.random_choice( engine_common.CORPUS_SUBSET_NUM_TESTCASES)
# The recorded strategy name embeds the chosen subset size.
if (not strategy_info.use_dataflow_tracing and strategy_pool.do_strategy(strategy.CORPUS_SUBSET_STRATEGY) and shell.get_directory_file_count(corpus_dir) > subset_size): # Copy |subset_size| testcases into 'subset' directory. corpus_subset_dir = self._create_temp_corpus_dir('subset') libfuzzer.copy_from_corpus(corpus_subset_dir, corpus_dir, subset_size) strategy_info.fuzzing_strategies.append( strategy.CORPUS_SUBSET_STRATEGY.name + '_' + str(subset_size)) strategy_info.additional_corpus_dirs.append(corpus_subset_dir) else: strategy_info.additional_corpus_dirs.append(corpus_dir)
如果存在字典参数,则检查该字典文件是否真实存在,不存在则移除这个无效参数;
如果不存在字典参数,则检查默认的 %target_binary_name%.dict 文件是否存在,存在则把它加入参数;
最后还会检查字典的格式并尝试修复,比如补上缺少的双引号。
# Check dict argument to make sure that it's valid. dict_path = fuzzer_utils.extract_argument( arguments, constants.DICT_FLAG, remove=False) if dict_path and not os.path.exists(dict_path): logs.log_error('Invalid dict %s for %s.' % (dict_path, target_path)) fuzzer_utils.extract_argument(arguments, constants.DICT_FLAG) # If there's no dict argument, check for %target_binary_name%.dict file. dict_path = fuzzer_utils.extract_argument( arguments, constants.DICT_FLAG, remove=False) if not dict_path: dict_path = dictionary_manager.get_default_dictionary_path(target_path) if os.path.exists(dict_path): arguments.append(constants.DICT_FLAG + dict_path) # If we have a dictionary, correct any items that are not formatted properly # (e.g. quote items that are missing them). dictionary_manager.correct_if_needed(dict_path)
# Post-fuzzing result handling fragments: split the libFuzzer output into
# lines, free the (potentially large) raw output, then extract crash
# information and stats from the log lines.
log_lines = fuzz_result.output.splitlines() # Output can be large, so save some memory by removing reference to the # original output which is no longer needed. fuzz_result.output = None
# get_testcase_path parses the crash testcase path out of the log lines.
# Check if we crashed, and get the crash testcase path. crash_testcase_file_path = runner.get_testcase_path(log_lines)
# Non-zero exit with no crash file is treated as a startup crash and recorded
# against an empty placeholder testcase.
# If we exited with a non-zero return code with no crash file in output from # libFuzzer, this is most likely a startup crash. Use an empty testcase to # to store it as a crash. if not crash_testcase_file_path and fuzz_result.return_code: crash_testcase_file_path = self._create_empty_testcase_file( reproducers_dir)
# Parse stats information based on libFuzzer output. parsed_stats = libfuzzer.parse_log_stats(log_lines)
# Reproduction runs get a longer timeout than the fuzzing run itself.
# Use higher timeout for reproduction. libfuzzer.fix_timeout_argument_for_reproduction(reproduce_arguments)
# Package the crash details into an engine.Crash record.
# Write the new testcase. # Copy crash testcase contents into the main testcase path. crashes.append( engine.Crash(crash_testcase_file_path, fuzz_logs, reproduce_arguments, actual_duration))
# LibFuzzerRunner combines a unicode process runner with the shared
# LibFuzzerCommon behavior; used when minijail sandboxing is not in play.
class LibFuzzerRunner(new_process.UnicodeProcessRunner, LibFuzzerCommon): """libFuzzer runner (when minijail is not used)."""
# __init__ simply forwards the executable path and default arguments to the
# parent process-runner via super().
def __init__(self, executable_path, default_args=None): """Inits the LibFuzzerRunner.
Args: executable_path: Path to the fuzzer executable. default_args: Default arguments to always pass to the fuzzer. """ super().__init__(executable_path=executable_path, default_args=default_args)
# fuzz override: normalizes additional_args, computes the libFuzzer
# -max_total_time budget from fuzz_timeout, and appends the standard flags.
# NOTE(review): this paste is garbled — the docstring text and the
# `additional_args = copy.copy(...)` normalization appear twice (here and two
# lines below); the original file has them once. Also truncated: the final
# run call is not shown.
def fuzz(self, corpus_directories, fuzz_timeout, artifact_prefix=None, additional_args=None, extra_env=None): """LibFuzzerCommon.fuzz override.""" additional_args = copy.copy(additional_args) if additional_args is None: additional_args = []
Args: corpus_directories: List of corpus directory paths to be passed to libFuzzer. fuzz_timeout: The maximum time in seconds that libFuzzer is allowed to run for. artifact_prefix: The directory to store new fuzzing artifacts (crashes, timeouts, slow units) additional_args: A sequence of additional arguments to be passed to the executable. extra_env: A dictionary containing environment variables and their values. These will be added to the environment of the new process.
Returns: A process.ProcessResult. """ additional_args = copy.copy(additional_args) if additional_args is None: additional_args = []
# Fork mode needs clean-exit headroom subtracted from the total budget; the
# assert guards against a budget that would go non-positive.
max_total_time = self.get_max_total_time(fuzz_timeout) if any(arg.startswith(constants.FORK_FLAG) for arg in additional_args): max_total_time -= self.LIBFUZZER_FORK_MODE_CLEAN_EXIT_TIME assert max_total_time > 0
# Old libFuzzer jobs specify -artifact_prefix through additional_args if artifact_prefix: additional_args.append( '%s%s' % (constants.ARTIFACT_PREFIX_FLAG, self._normalize_artifact_prefix(artifact_prefix)))
# Always pass the computed time budget and request final stats output.
additional_args.extend([ '%s%d' % (constants.MAX_TOTAL_TIME_FLAG, max_total_time), constants.PRINT_FINAL_STATS_ARGUMENT, # FIXME: temporarily disabled due to a lack of crash information in # output. # '-close_fd_mask=3', ])