diff --git a/floss/language/go/extract.py b/floss/language/go/extract.py index 7b3531c2d..d23f210c6 100644 --- a/floss/language/go/extract.py +++ b/floss/language/go/extract.py @@ -30,7 +30,11 @@ import floss.utils from floss.results import StaticString, StringEncoding -from floss.language.utils import StructString, find_lea_xrefs, get_struct_string_candidates +from floss.language.utils import ( + StructString, + find_lea_xrefs, + get_struct_string_candidates, +) logger = logging.getLogger(__name__) @@ -87,7 +91,9 @@ def find_amd64_stackstrings(section_data, offset, min_length): b"\x48\xba(........)|\x48\xb8(........)|\x81\x78\x08(....)|\x81\x79\x08(....)|\x66\x81\x78\x0c(..)|\x66\x81\x79\x0c(..)|\x80\x78\x0e(.)|\x80\x79\x0e(.)" ) - yield from find_stack_strings_with_regex(extract_stackstring_pattern, section_data, offset, min_length) + yield from find_stack_strings_with_regex( + extract_stackstring_pattern, section_data, offset, min_length + ) def find_i386_stackstrings(section_data, offset, min_length): @@ -108,7 +114,9 @@ def find_i386_stackstrings(section_data, offset, min_length): re.DOTALL, ) - yield from find_stack_strings_with_regex(extract_stackstring_pattern, section_data, offset, min_length) + yield from find_stack_strings_with_regex( + extract_stackstring_pattern, section_data, offset, min_length + ) def get_stackstrings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]: @@ -207,7 +215,9 @@ def read_struct_string(pe: pefile.PE, instance: StructString) -> str: return s -def find_string_blob_range(pe: pefile.PE, struct_strings: List[StructString]) -> Tuple[VA, VA]: +def find_string_blob_range( + pe: pefile.PE, struct_strings: List[StructString] +) -> Tuple[VA, VA]: """ find the range of the string blob, as loaded in memory. @@ -231,7 +241,9 @@ def find_string_blob_range(pe: pefile.PE, struct_strings: List[StructString]) -> struct_strings.sort(key=lambda s: s.address) - run_start, run_end = find_longest_monotonically_increasing_run(list(map(lambda s: s.length, struct_strings))) + run_start, run_end = find_longest_monotonically_increasing_run( + list(map(lambda s: s.length, struct_strings)) + ) # pick the mid string, so that we avoid any junk data on the edges of the string blob run_mid = (run_start + run_end) // 2 @@ -239,7 +251,9 @@ def find_string_blob_range(pe: pefile.PE, struct_strings: List[StructString]) -> s = read_struct_string(pe, instance) assert s is not None - logger.debug("string blob: struct string instance: 0x%x: %s...", instance.address, s[:16]) + logger.debug( + "string blob: struct string instance: 0x%x: %s...", instance.address, s[:16] + ) instance_rva = instance.address - image_base section = pe.get_section_by_rva(instance_rva) @@ -286,7 +300,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] image_base = pe.OPTIONAL_HEADER.ImageBase with floss.utils.timing("find struct string candidates"): - struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) + struct_strings = list( + sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address) + ) if not struct_strings: logger.warning( "Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported." @@ -295,7 +311,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] with floss.utils.timing("find string blob"): try: - string_blob_start, string_blob_end = find_string_blob_range(pe, struct_strings) + string_blob_start, string_blob_end = find_string_blob_range( + pe, struct_strings + ) except ValueError: logger.warning( "Failed to find string blob range: Is this a Go binary? If so, the Go version may be unsupported." @@ -352,10 +370,17 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] # 0x4aabed: -thread limit # # we probably missed the string: " procedure in " - logger.warning("probably missed a string blob string ending at: 0x%x", start - 1) + logger.warning( + "probably missed a string blob string ending at: 0x%x", start - 1 + ) try: - string = StaticString.from_utf8(sbuf, pe.get_offset_from_rva(start - image_base), min_length) + string = StaticString.from_utf8( + sbuf, + pe.get_offset_from_rva(start - image_base), + min_length, + address=start, + ) yield string except ValueError: pass @@ -379,14 +404,17 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] except UnicodeDecodeError: continue else: + try: string = StaticString.from_utf8( - last_buf[:size], pe.get_offset_from_rva(last_pointer - image_base), min_length + sbuf, + pe.get_offset_from_rva(start - image_base), + min_length, + address=start, ) yield string except ValueError: pass - break def extract_go_strings(sample, min_length) -> List[StaticString]: @@ -405,23 +433,38 @@ def extract_go_strings(sample, min_length) -> List[StaticString]: return go_strings -def get_static_strings_from_blob_range(sample: pathlib.Path, static_strings: List[StaticString]) -> List[StaticString]: +def get_static_strings_from_blob_range( + sample: pathlib.Path, static_strings: List[StaticString] +) -> List[StaticString]: pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True) - struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) + struct_strings = list( + sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address) + ) if not struct_strings: return [] - try: - string_blob_start, string_blob_end = find_string_blob_range(pe, struct_strings) - except ValueError: - return [] + with floss.utils.timing("find string blob"): + try: + string_blob_start, string_blob_end = find_string_blob_range( + pe, struct_strings + ) + except ValueError: + # This restores the safe behavior the mentor requested + logger.warning( + "Failed to find string blob range: Is this a Go binary? If so, the Go version may be unsupported." + ) + return image_base = pe.OPTIONAL_HEADER.ImageBase string_blob_start = pe.get_offset_from_rva(string_blob_start - image_base) string_blob_end = pe.get_offset_from_rva(string_blob_end - image_base) - return list(filter(lambda s: string_blob_start <= s.offset < string_blob_end, static_strings)) + return list( + filter( + lambda s: string_blob_start <= s.offset < string_blob_end, static_strings + ) + ) def main(argv=None): @@ -439,7 +482,9 @@ def main(argv=None): logging.basicConfig(level=logging.DEBUG) - go_strings = sorted(extract_go_strings(args.path, args.min_length), key=lambda s: s.offset) + go_strings = sorted( + extract_go_strings(args.path, args.min_length), key=lambda s: s.offset + ) for string in go_strings: print(f"{string.offset:#x}: {string.string}") diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index b31c4b96c..68dafd235 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -37,7 +37,9 @@ def fix_b2s_wide_strings( - strings: List[Tuple[str, str, Tuple[int, int], bool]], min_length: int, buffer: bytes + strings: List[Tuple[str, str, Tuple[int, int], bool]], + min_length: int, + buffer: bytes, ) -> List[Tuple[str, str, Tuple[int, int], bool]]: # TODO(mr-tz): b2s may parse wide strings where there really should be utf-8 strings # handle special cases here until fixed @@ -74,30 +76,40 @@ def fix_b2s_wide_strings( def filter_and_transform_utf8_strings( strings: List[Tuple[str, str, Tuple[int, int], bool]], start_rdata: int, + image_base: int, + virtual_address: int, ) -> List[StaticString]: transformed_strings = [] for string in strings: s = string[0] string_type = string[1] + + # Calculate file offset start = string[2][0] + start_rdata + # Calculate memory address (VA) + address = image_base + virtual_address + string[2][0] + if string_type != "UTF8": continue - # our static algorithm does not extract new lines either + # FLOSS logic: remove new lines s = s.replace("\n", "") - transformed_strings.append(StaticString(string=s, offset=start, encoding=StringEncoding.UTF8)) - return transformed_strings + # We pass the calculated address here + transformed_strings.append( + StaticString( + string=s, offset=start, encoding=StringEncoding.UTF8, address=address + ) + ) + return transformed_strings -def split_strings(static_strings: List[StaticString], address: int, min_length: int) -> None: - """ - if address is in between start and end of a string in ref data then split the string - this modifies the elements of the static strings list directly - """ +def split_strings( + static_strings: List[StaticString], address: int, min_length: int +) -> None: for string in static_strings: if string.offset < address < string.offset + len(string.string): rust_string = string.string[0 : address - string.offset] @@ -105,17 +117,26 @@ def split_strings(static_strings: List[StaticString], address: int, min_length: if len(rust_string) >= min_length: static_strings.append( - StaticString(string=rust_string, offset=string.offset, encoding=StringEncoding.UTF8) + StaticString( + string=rust_string, + offset=string.offset, + encoding=StringEncoding.UTF8, + address=string.address, + ) ) if len(rest) >= min_length: - static_strings.append(StaticString(string=rest, offset=address, encoding=StringEncoding.UTF8)) - - # remove string from static_strings - for static_string in static_strings: - if static_string == string: - static_strings.remove(static_string) - return + va_at_split = string.address + (address - string.offset) + static_strings.append( + StaticString( + string=rest, + offset=address, + encoding=StringEncoding.UTF8, + address=va_at_split, + ) + ) + # Fix: Directly remove the item instead of using a nested loop + static_strings.remove(string) return @@ -168,7 +189,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt fixed_strings = fix_b2s_wide_strings(strings, min_length, buffer_rdata) # select only UTF-8 strings and adjust offset - static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata) + static_strings = filter_and_transform_utf8_strings( + fixed_strings, start_rdata, image_base, virtual_address + ) # TODO(mr-tz) - handle miss in rust-hello64.exe # .rdata:00000001400C1270 0A aPanickedAfterP db 0Ah ; DATA XREF: .rdata:00000001400C12B8↓o @@ -222,7 +245,9 @@ def main(argv=None): logging.basicConfig(level=logging.DEBUG) - rust_strings = sorted(extract_rust_strings(args.path, args.min_length), key=lambda s: s.offset) + rust_strings = sorted( + extract_rust_strings(args.path, args.min_length), key=lambda s: s.offset + ) for string in rust_strings: print(f"{string.offset:#x}: {string.string}") diff --git a/floss/render/default.py b/floss/render/default.py index 9526c3f9b..e58e7fb0e 100644 --- a/floss/render/default.py +++ b/floss/render/default.py @@ -28,7 +28,14 @@ import floss.logging_ import floss.language.identify from floss.render import Verbosity -from floss.results import AddressType, StackString, TightString, DecodedString, ResultDocument, StringEncoding +from floss.results import ( + AddressType, + StackString, + TightString, + DecodedString, + ResultDocument, + StringEncoding, +) from floss.render.sanitize import sanitize MIN_WIDTH_LEFT_COL = 22 @@ -66,17 +73,32 @@ def render_meta(results: ResultDocument, console, verbose): if results.metadata.language != "unknown" and results.metadata.language_version else "" ) - lang_s = f" - selected: {results.metadata.language_selected}" if results.metadata.language_selected else "" + lang_s = ( + f" - selected: {results.metadata.language_selected}" + if results.metadata.language_selected + else "" + ) language_value = f"{lang}{lang_v}{lang_s}" if verbose == Verbosity.DEFAULT: - rows.append((width("file path", MIN_WIDTH_LEFT_COL), width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL))) + rows.append( + ( + width("file path", MIN_WIDTH_LEFT_COL), + width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL), + ) + ) rows.append(("identified language", language_value)) else: rows.extend( [ - (width("file path", MIN_WIDTH_LEFT_COL), width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL)), - ("start date", results.metadata.runtime.start_date.strftime("%Y-%m-%d %H:%M:%S")), + ( + width("file path", MIN_WIDTH_LEFT_COL), + width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL), + ), + ( + "start date", + results.metadata.runtime.start_date.strftime("%Y-%m-%d %H:%M:%S"), + ), ("runtime", strtime(results.metadata.runtime.total)), ("version", results.metadata.version), ("identified language", language_value), @@ -120,15 +142,27 @@ def render_string_type_rows(results: ResultDocument) -> List[Tuple[str, str]]: ), ( " stack strings", - str(len(results.strings.stack_strings)) if results.analysis.enable_stack_strings else DISABLED, + ( + str(len(results.strings.stack_strings)) + if results.analysis.enable_stack_strings + else DISABLED + ), ), ( " tight strings", - str(len(results.strings.tight_strings)) if results.analysis.enable_tight_strings else DISABLED, + ( + str(len(results.strings.tight_strings)) + if results.analysis.enable_tight_strings + else DISABLED + ), ), ( " decoded strings", - str(len(results.strings.decoded_strings)) if results.analysis.enable_decoded_strings else DISABLED, + ( + str(len(results.strings.decoded_strings)) + if results.analysis.enable_decoded_strings + else DISABLED + ), ), ] @@ -143,11 +177,20 @@ def render_function_analysis_rows(results) -> List[Tuple[str, str]]: (" library", results.analysis.functions.library), ] if results.analysis.enable_stack_strings: - rows.append((" stack strings", str(results.analysis.functions.analyzed_stack_strings))) + rows.append( + (" stack strings", str(results.analysis.functions.analyzed_stack_strings)) + ) if results.analysis.enable_tight_strings: - rows.append((" tight strings", str(results.analysis.functions.analyzed_tight_strings))) + rows.append( + (" tight strings", str(results.analysis.functions.analyzed_tight_strings)) + ) if results.analysis.enable_decoded_strings: - rows.append((" decoded strings", str(results.analysis.functions.analyzed_decoded_strings))) + rows.append( + ( + " decoded strings", + str(results.analysis.functions.analyzed_decoded_strings), + ) + ) if results.analysis.functions.decoding_function_scores: rows.append( ( @@ -171,22 +214,45 @@ def strtime(seconds): return f"{m:02.0f}:{s:02.0f}" -def render_language_strings(language, language_strings, language_strings_missed, console, verbose, disable_headers): +def render_language_strings( + language, + language_strings, + language_strings_missed, + console, + verbose, + disable_headers, +): strings = sorted(language_strings + language_strings_missed, key=lambda s: s.offset) - render_heading(f"FLOSS {language.upper()} STRINGS ({len(strings)})", console, verbose, disable_headers) + render_heading( + f"FLOSS {language.upper()} STRINGS ({len(strings)})", + console, + verbose, + disable_headers, + ) offset_len = len(f"{strings[-1].offset}") for s in strings: if verbose == Verbosity.DEFAULT: console.print(sanitize(s.string, is_ascii_only=False), markup=False) else: colored_string = string_style(sanitize(s.string, is_ascii_only=False)) - console.print(f"0x{s.offset:>0{offset_len}x} {colored_string}") + # --- NEW LOGIC START --- + # If the string has an address (like our Go strings now do), format it. + # Otherwise, leave it blank (for normal static strings). + va_str = f" (0x{s.address:x})" if getattr(s, "address", 0) else "" + + # Print the offset, then the VA, then the string + console.print(f"0x{s.offset:>0{offset_len}x}{va_str} {colored_string}") + # --- NEW LOGIC END --- -def render_static_substrings(strings, encoding, offset_len, console, verbose, disable_headers): +def render_static_substrings( + strings, encoding, offset_len, console, verbose, disable_headers +): if verbose != Verbosity.DEFAULT: encoding = heading_style(encoding) - render_sub_heading(f"FLOSS STATIC STRINGS: {encoding}", len(strings), console, disable_headers) + render_sub_heading( + f"FLOSS STATIC STRINGS: {encoding}", len(strings), console, disable_headers + ) for s in strings: if verbose == Verbosity.DEFAULT: console.print(sanitize(s.string), markup=False) @@ -196,10 +262,14 @@ def render_static_substrings(strings, encoding, offset_len, console, verbose, di def render_staticstrings(strings, console, verbose, disable_headers): - render_heading(f"FLOSS STATIC STRINGS ({len(strings)})", console, verbose, disable_headers) + render_heading( + f"FLOSS STATIC STRINGS ({len(strings)})", console, verbose, disable_headers + ) ascii_strings = list(filter(lambda s: s.encoding == StringEncoding.ASCII, strings)) - unicode_strings = list(filter(lambda s: s.encoding == StringEncoding.UTF16LE, strings)) + unicode_strings = list( + filter(lambda s: s.encoding == StringEncoding.UTF16LE, strings) + ) ascii_offset_len = 0 unicode_offset_len = 0 @@ -209,13 +279,20 @@ def render_staticstrings(strings, console, verbose, disable_headers): unicode_offset_len = len(f"{unicode_strings[-1].offset}") offset_len = max(ascii_offset_len, unicode_offset_len) - render_static_substrings(ascii_strings, "ASCII", offset_len, console, verbose, disable_headers) + render_static_substrings( + ascii_strings, "ASCII", offset_len, console, verbose, disable_headers + ) console.print("\n") - render_static_substrings(unicode_strings, "UTF-16LE", offset_len, console, verbose, disable_headers) + render_static_substrings( + unicode_strings, "UTF-16LE", offset_len, console, verbose, disable_headers + ) def render_stackstrings( - strings: Union[List[StackString], List[TightString]], console, verbose: bool, disable_headers: bool + strings: Union[List[StackString], List[TightString]], + console, + verbose: bool, + disable_headers: bool, ): if verbose == Verbosity.DEFAULT: for s in strings: @@ -242,7 +319,9 @@ def render_stackstrings( console.print(table) -def render_decoded_strings(decoded_strings: List[DecodedString], console, verbose, disable_headers): +def render_decoded_strings( + decoded_strings: List[DecodedString], console, verbose, disable_headers +): """ Render results of string decoding phase. """ @@ -255,7 +334,12 @@ def render_decoded_strings(decoded_strings: List[DecodedString], console, verbos strings_by_functions[ds.decoding_routine].append(ds) for fva, data in strings_by_functions.items(): - render_sub_heading(" FUNCTION at " + heading_style(f"0x{fva:x}"), len(data), console, disable_headers) + render_sub_heading( + " FUNCTION at " + heading_style(f"0x{fva:x}"), + len(data), + console, + disable_headers, + ) rows = [] for ds in data: if ds.address_type == AddressType.STACK: @@ -264,11 +348,22 @@ def render_decoded_strings(decoded_strings: List[DecodedString], console, verbos offset_string = escape("[heap]") else: offset_string = hex(ds.address or 0) - rows.append((offset_string, hex(ds.decoded_at), string_style(sanitize(ds.string)))) + rows.append( + ( + offset_string, + hex(ds.decoded_at), + string_style(sanitize(ds.string)), + ) + ) if rows: table = Table( - "Offset", "Called At", "String", show_header=not (disable_headers), box=box.ASCII2, show_edge=False + "Offset", + "Called At", + "String", + show_header=not (disable_headers), + box=box.ASCII2, + show_edge=False, ) for row in rows: table.add_row(row[0], row[1], row[2]) @@ -329,20 +424,29 @@ def get_color(color): def render(results: floss.results.ResultDocument, verbose, disable_headers, color): sys.__stdout__.reconfigure(encoding="utf-8") # type: ignore [union-attr] - console = Console(file=io.StringIO(), color_system=get_color(color), highlight=False, soft_wrap=True) + console = Console( + file=io.StringIO(), + color_system=get_color(color), + highlight=False, + soft_wrap=True, + ) if not disable_headers: console.print("\n") if verbose == Verbosity.DEFAULT: console.print(f"FLARE FLOSS RESULTS (version {results.metadata.version})\n") else: - colored_str = heading_style(f"FLARE FLOSS RESULTS (version {results.metadata.version})\n") + colored_str = heading_style( + f"FLARE FLOSS RESULTS (version {results.metadata.version})\n" + ) console.print(colored_str) render_meta(results, console, verbose) console.print("\n") if results.analysis.enable_static_strings: - render_staticstrings(results.strings.static_strings, console, verbose, disable_headers) + render_staticstrings( + results.strings.static_strings, console, verbose, disable_headers + ) console.print("\n") if results.metadata.language in ( @@ -360,20 +464,39 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo console.print("\n") if results.analysis.enable_stack_strings: - render_heading(f"FLOSS STACK STRINGS ({len(results.strings.stack_strings)})", console, verbose, disable_headers) - render_stackstrings(results.strings.stack_strings, console, verbose, disable_headers) + render_heading( + f"FLOSS STACK STRINGS ({len(results.strings.stack_strings)})", + console, + verbose, + disable_headers, + ) + render_stackstrings( + results.strings.stack_strings, console, verbose, disable_headers + ) console.print("\n") if results.analysis.enable_tight_strings: - render_heading(f"FLOSS TIGHT STRINGS ({len(results.strings.tight_strings)})", console, verbose, disable_headers) - render_stackstrings(results.strings.tight_strings, console, verbose, disable_headers) + render_heading( + f"FLOSS TIGHT STRINGS ({len(results.strings.tight_strings)})", + console, + verbose, + disable_headers, + ) + render_stackstrings( + results.strings.tight_strings, console, verbose, disable_headers + ) console.print("\n") if results.analysis.enable_decoded_strings: render_heading( - f"FLOSS DECODED STRINGS ({len(results.strings.decoded_strings)})", console, verbose, disable_headers + f"FLOSS DECODED STRINGS ({len(results.strings.decoded_strings)})", + console, + verbose, + disable_headers, + ) + render_decoded_strings( + results.strings.decoded_strings, console, verbose, disable_headers ) - render_decoded_strings(results.strings.decoded_strings, console, verbose, disable_headers) console.file.seek(0) return console.file.read() diff --git a/floss/results.py b/floss/results.py index 4630aa739..fae0dc655 100644 --- a/floss/results.py +++ b/floss/results.py @@ -136,19 +136,15 @@ class DecodedString: class StaticString: """ A string extracted from the raw bytes of the input. - - Attributes: - string: the string - offset: the offset into the input where the string is found - encoding: the string encoding, like ASCII or unicode """ string: str offset: int encoding: StringEncoding + address: int = 0 # <--- New field added @classmethod - def from_utf8(cls, buf, addr, min_length): + def from_utf8(cls, buf, addr, min_length, address=0): try: decoded_string = buf.decode("utf-8") except UnicodeDecodeError: @@ -159,7 +155,14 @@ def from_utf8(cls, buf, addr, min_length): if len(decoded_string) < min_length: raise ValueError("too short") - return cls(string=decoded_string, offset=addr, encoding=StringEncoding.UTF8) + + # Directly return the object and let the caller handle any unexpected errors + return cls( + string=decoded_string, + offset=addr, + encoding=StringEncoding.UTF8, + address=address, + ) @dataclass @@ -194,7 +197,9 @@ class Analysis: functions: Functions = field(default_factory=Functions) -STRING_TYPE_FIELDS = set([field for field in Analysis.__annotations__ if field.startswith("enable_")]) +STRING_TYPE_FIELDS = set( + [field for field in Analysis.__annotations__ if field.startswith("enable_")] +) @dataclass @@ -252,10 +257,14 @@ def log_result(decoded_string, verbosity): decoded_string.program_counter, ) else: - raise ValueError("unknown decoded or extracted string type: %s" % type(decoded_string)) + raise ValueError( + "unknown decoded or extracted string type: %s" % type(decoded_string) + ) -def load(sample: Path, analysis: Analysis, functions: List[int], min_length: int) -> ResultDocument: +def load( + sample: Path, analysis: Analysis, functions: List[int], min_length: int +) -> ResultDocument: logger.debug("loading results document: %s", str(sample)) results = read(sample) results.metadata.file_path = f"{sample}\n{results.metadata.file_path}" @@ -278,15 +287,21 @@ def read(sample: Path) -> ResultDocument: try: results = ResultDocument(**results) except (TypeError, ValidationError) as e: - raise InvalidResultsFile(f"{str(sample)} is not a valid FLOSS result document: {e}") + raise InvalidResultsFile( + f"{str(sample)} is not a valid FLOSS result document: {e}" + ) return results def check_set_string_types(results: ResultDocument, wanted_analysis: Analysis) -> None: for string_type in STRING_TYPE_FIELDS: - if getattr(wanted_analysis, string_type) and not getattr(results.analysis, string_type): - logger.warning(f"{string_type} not in loaded data, use --only/--no to enable/disable type(s)") + if getattr(wanted_analysis, string_type) and not getattr( + results.analysis, string_type + ): + logger.warning( + f"{string_type} not in loaded data, use --only/--no to enable/disable type(s)" + ) setattr(results.analysis, string_type, getattr(wanted_analysis, string_type)) @@ -294,26 +309,46 @@ def filter_functions(results: ResultDocument, functions: List[int]) -> None: filtered_scores = dict() for fva in functions: try: - filtered_scores[fva] = results.analysis.functions.decoding_function_scores[fva] + filtered_scores[fva] = results.analysis.functions.decoding_function_scores[ + fva + ] except KeyError: raise InvalidLoadConfig(f"function 0x{fva:x} not found in loaded data") results.analysis.functions.decoding_function_scores = filtered_scores - results.strings.stack_strings = list(filter(lambda f: f.function in functions, results.strings.stack_strings)) - results.strings.tight_strings = list(filter(lambda f: f.function in functions, results.strings.tight_strings)) + results.strings.stack_strings = list( + filter(lambda f: f.function in functions, results.strings.stack_strings) + ) + results.strings.tight_strings = list( + filter(lambda f: f.function in functions, results.strings.tight_strings) + ) results.strings.decoded_strings = list( - filter(lambda f: f.decoding_routine in functions, results.strings.decoded_strings) + filter( + lambda f: f.decoding_routine in functions, results.strings.decoded_strings + ) ) - results.analysis.functions.analyzed_stack_strings = len(results.strings.stack_strings) - results.analysis.functions.analyzed_tight_strings = len(results.strings.tight_strings) - results.analysis.functions.analyzed_decoded_strings = len(results.strings.decoded_strings) + results.analysis.functions.analyzed_stack_strings = len( + results.strings.stack_strings + ) + results.analysis.functions.analyzed_tight_strings = len( + results.strings.tight_strings + ) + results.analysis.functions.analyzed_decoded_strings = len( + results.strings.decoded_strings + ) def filter_string_len(results: ResultDocument, min_length: int) -> None: - results.strings.static_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.static_strings)) - results.strings.stack_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.stack_strings)) - results.strings.tight_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.tight_strings)) + results.strings.static_strings = list( + filter(lambda s: len(s.string) >= min_length, results.strings.static_strings) + ) + results.strings.stack_strings = list( + filter(lambda s: len(s.string) >= min_length, results.strings.stack_strings) + ) + results.strings.tight_strings = list( + filter(lambda s: len(s.string) >= min_length, results.strings.tight_strings) + ) results.strings.decoded_strings = list( filter(lambda s: len(s.string) >= min_length, results.strings.decoded_strings) ) diff --git a/scripts/ghidra/FlossQsLoader.java b/scripts/ghidra/FlossQsLoader.java new file mode 100644 index 000000000..24bedd9d2 --- /dev/null +++ b/scripts/ghidra/FlossQsLoader.java @@ -0,0 +1,52 @@ +// Synchronizes FLOSS-QS string attribution into Ghidra 12.0.3+ +// @category FLOSS.QS +// @author Vikas kumar + +import java.nio.file.Files; +import java.nio.file.Paths; +import com.google.gson.*; +import ghidra.app.script.GhidraScript; +import ghidra.program.model.address.Address; +import ghidra.program.model.symbol.*; + +public class FlossQsLoader extends GhidraScript { + @Override + public void run() throws Exception { + String jsonPath = "C:\\Users\\vikas\\Desktop\\flare-floss\\floss\\language\\go\\go_ghidra_map.json"; + + try { + String content = new String(Files.readAllBytes(Paths.get(jsonPath))); + JsonArray data = JsonParser.parseString(content).getAsJsonArray(); + + SymbolTable st = currentProgram.getSymbolTable(); + Namespace globalNs = currentProgram.getGlobalNamespace(); + Namespace ns = st.getOrCreateNameSpace(globalNs, "QS_Attribution", SourceType.USER_DEFINED); + + monitor.initialize(data.size()); + int count = 0; + for (JsonElement e : data) { + if (monitor.isCancelled()) break; + JsonObject o = e.getAsJsonObject(); + Address addr = toAddr(o.get("va").getAsString()); + + if (currentProgram.getMemory().contains(addr)) { + String str = o.get("string").getAsString(); + String cat = o.has("category") ? o.get("category").getAsString() : "unknown"; + + setPreComment(addr, "[QS] " + cat.toUpperCase() + ": " + str); + String prefix = (cat.equals("winapi")) ? "API_" : "STR_"; + String label = prefix + str.replaceAll("[^a-zA-Z0-9]", "_"); + + // Create and force primary for visibility + Symbol s = st.createLabel(addr, label.substring(0, Math.min(label.length(), 25)), ns, SourceType.USER_DEFINED); + if (s != null) s.setPrimary(); + } + monitor.setProgress(++count); + } + currentProgram.flushEvents(); + println("Vikas's QS Sync Complete on Ghidra 12.0.3."); + } catch (Exception ex) { + printerr("ERROR: " + ex.getMessage()); + } + } +}