# ELF program header constants
PT_LOAD = 1  # loadable segment

PF_X = 1  # executable
PF_W = 2  # writable
PF_R = 4  # readable


@dataclass(frozen=True)
class Segment:
    """
    Models a single memory segment in an ELF binary
    ELF binaries organize memory by segments, not sections

    Fields:
      vaddr_start: first virtual address of the segment (p_vaddr)
      vaddr_end: one past the last virtual address (p_vaddr + p_memsz)
      file_off: file offset of the first file-backed byte (p_offset)
      file_end: one past the last file-backed byte (p_offset + p_filesz)
      flags: PF_* permission bits (p_flags)
    """

    vaddr_start: int
    vaddr_end: int
    file_off: int
    file_end: int
    flags: int

    @property
    def file_backed_vaddr_end(self) -> int:
        """
        One past the last virtual address whose bytes come from the file.

        The file-backed size is clamped to the in-memory size, so a header
        with p_filesz > p_memsz can never make this range extend past
        `vaddr_end` (and into whatever is mapped after the segment).
        """
        file_size = self.file_end - self.file_off
        mem_size = self.vaddr_end - self.vaddr_start
        return self.vaddr_start + min(file_size, mem_size)

    def contains_va(self, va: int) -> bool:
        """Return True when `va` lies inside the in-memory range of the segment."""
        return self.vaddr_start <= va < self.vaddr_end

    def contains_file_backed_va(self, va: int) -> bool:
        """Return True when `va` is mapped by the segment AND backed by file bytes."""
        return self.vaddr_start <= va < self.file_backed_vaddr_end


class ELF:
    """
    Minimal ELF(x86-64) helper for VA mapping and byte reading
    """

    def __init__(self, source: Union[Path, str, bytes]):
        """
        Load the ELF image and parse its PT_LOAD segments.

        Args:
          source: raw file content (bytes-like) or a path to the file.

        Raises:
          ValueError: the data is not a supported ELF file or has no PT_LOAD segments.
        """
        # raw content passed: bytes, or another in-memory buffer type
        if isinstance(source, (bytes, bytearray, memoryview)):
            self._data = bytes(source)
        # otherwise the source is a Path or str naming the file
        else:
            self._data = Path(source).read_bytes()

        self._segments = self._parse_load_segments(self._data)
        if not self._segments:
            raise ValueError("ELF has no PT_LOAD segments")

    @property
    def data(self) -> bytes:
        """The complete raw file content."""
        return self._data

    def iter_load_segments(self) -> Iterable[Segment]:
        """Iterate all PT_LOAD segments, ordered by start address."""
        return iter(self._segments)

    def iter_executable_segments(self) -> Iterable[Segment]:
        """Iterate the segments that have PF_X set."""
        return (seg for seg in self._segments if seg.flags & PF_X)

    def iter_readable_segments(self) -> Iterable[Segment]:
        """Iterate the segments that have PF_R set."""
        return (seg for seg in self._segments if seg.flags & PF_R)

    def iter_readonly_segments(self) -> Iterable[Segment]:
        """
        Iterate the segments that are readable and not executable.

        NOTE: PF_W is not checked, so writable data segments are included too.
        """
        return (seg for seg in self._segments if (seg.flags & PF_R) and not (seg.flags & PF_X))

    def get_mapped_range(self) -> Tuple[int, int]:
        """Return (lowest start VA, highest end VA) over all load segments."""
        low = min(seg.vaddr_start for seg in self._segments)
        high = max(seg.vaddr_end for seg in self._segments)
        return low, high

    def is_va_mapped(self, va: int) -> bool:
        """Return True when `va` falls inside any load segment."""
        return self._find_va_segment(va) is not None

    def va_to_file_offset(self, va: int) -> int:
        """
        Translate a virtual address into an offset into the file.

        Raises:
          ValueError: `va` is not backed by file content.
        """
        seg = self._find_file_backed_va_segment(va)
        if seg is None:
            raise ValueError(f"VA is not file backed: 0x{va:x}")

        return seg.file_off + (va - seg.vaddr_start)

    def read_va(self, va: int, size: int) -> bytes:
        """
        Read `size` bytes starting at virtual address `va`.

        The range may cross from one segment into the next as long as every
        byte of it is file backed.

        Raises:
          ValueError: `size` is negative or part of the range is not file backed.
        """
        if size < 0:
            raise ValueError("size must be non negative")
        if size == 0:
            return b""

        out = bytearray()
        cur = va
        remaining = size

        while remaining > 0:
            segment = self._find_file_backed_va_segment(cur)
            if segment is None:
                raise ValueError(f"VA range is not fully file backed: 0x{cur:x}")

            # take as much as this segment can provide, then move on to the next one
            chunk_size = min(remaining, segment.file_backed_vaddr_end - cur)
            if chunk_size <= 0:
                raise ValueError("invalid VA range")

            file_offset = segment.file_off + (cur - segment.vaddr_start)
            out.extend(self._data[file_offset : file_offset + chunk_size])

            cur += chunk_size
            remaining -= chunk_size

        return bytes(out)

    @staticmethod
    def _parse_load_segments(data: bytes) -> List[Segment]:
        """
        Parse the PT_LOAD program headers of `data`.

        Returns:
          the load segments sorted by start address (possibly empty).

        Raises:
          ValueError: not an ELF file, not little-endian x86-64 ELF64,
            unreadable program headers, or a segment exceeding the file size.
        """
        stream = io.BytesIO(data)

        try:
            elf = ELFFile(stream)
        except Exception as exc:
            raise ValueError("not a valid ELF file") from exc

        # constraints for now, might update later
        if elf.elfclass != 64:
            raise ValueError("only ELF64 is supported")
        if not elf.little_endian:
            raise ValueError("only little endian is supported")
        if elf["e_machine"] != "EM_X86_64":
            raise ValueError("only x86-64 ELF is supported")

        # the parser may still fail while reading the program headers
        # (e.g. truncated table); report that as ValueError like every
        # other rejection so callers need to handle a single type
        try:
            load_headers = [ph for ph in elf.iter_segments() if ph["p_type"] == "PT_LOAD"]
        except Exception as exc:
            raise ValueError("failed to parse ELF program headers") from exc

        size = len(data)
        segments: List[Segment] = []

        for ph in load_headers:
            p_offset = int(ph["p_offset"])
            p_filesz = int(ph["p_filesz"])
            p_memsz = int(ph["p_memsz"])
            p_vaddr = int(ph["p_vaddr"])
            p_flags = int(ph["p_flags"])

            file_end = p_offset + p_filesz
            if file_end > size:
                raise ValueError("PT_LOAD file range exceeds file size")

            segments.append(
                Segment(
                    vaddr_start=p_vaddr,
                    vaddr_end=p_vaddr + p_memsz,
                    file_off=p_offset,
                    file_end=file_end,
                    flags=p_flags,
                )
            )

        return sorted(segments, key=lambda s: s.vaddr_start)

    def _find_va_segment(self, va: int) -> Optional[Segment]:
        """Return the first segment mapping `va`, or None."""
        return next((seg for seg in self._segments if seg.contains_va(va)), None)

    def _find_file_backed_va_segment(self, va: int) -> Optional[Segment]:
        """Return the first segment holding file-backed bytes for `va`, or None."""
        return next((seg for seg in self._segments if seg.contains_file_backed_va(va)), None)
logger = logging.getLogger(__name__)

MAX_STRING_LEN = 1024 * 1024
MAX_RUNS_TO_TRY = 8
MAX_ANCHOR_ATTEMPTS_PER_RUN = 64


def find_longest_monotonically_increasing_run(values: List[int]) -> Tuple[int, int]:
    """
    for the given list of values,
    find the (start, end) indices of the longest run of values
    such that each value is greater than or equal to the previous value.

    Both indices are inclusive. For an empty list the result is (1, 0).
    """
    max_run_length = 0
    max_run_end_index = 0

    current_run_length = 0
    prior_value = 0

    for i, value in enumerate(values):
        if value >= prior_value:
            current_run_length += 1
        else:
            current_run_length = 1

        if current_run_length > max_run_length:
            max_run_length = current_run_length
            max_run_end_index = i

        prior_value = value

    max_run_start_index = max_run_end_index - max_run_length + 1

    return max_run_start_index, max_run_end_index


def _is_va_in_readable_segments(view: ELF, va: int) -> bool:
    """Return True when `va` is mapped by a segment yielded by `iter_readable_segments`."""
    return any(segment.contains_va(va) for segment in view.iter_readable_segments())


def _is_va_in_readonly_segments(view: ELF, va: int) -> bool:
    """Return True when `va` is mapped by a segment yielded by `iter_readonly_segments`."""
    return any(segment.contains_va(va) for segment in view.iter_readonly_segments())


def get_struct_string_candidates_elf(view: ELF, min_length: int = 1) -> Iterable[Tuple[StructString, int]]:
    """
    Find candidate struct String instances in the given elf file

    Every 8-byte aligned offset of each read-only segment is interpreted as
    a (pointer, length) pair. A pair is a candidate when the length is
    plausible and both ends of the pointed-to range are in readable segments.

    Yields:
      (StructString(struct VA, length), pointer to the string data)
    """
    for segment in view.iter_readonly_segments():
        data = view.data[segment.file_off : segment.file_end]
        # check segment data len (8 bytes for pointer,8 for len)
        if len(data) < 16:
            continue

        for offset in range(0, len(data) - 16 + 1, 8):
            struct_va = segment.vaddr_start + offset
            # NOTE(review): the two unpack calls and the lower length bound were
            # reconstructed, the original text of these lines was lost;
            # confirm the format ("<Q": little-endian u64) and the bounds.
            ptr = struct.unpack_from("<Q", data, offset)[0]
            length = struct.unpack_from("<Q", data, offset + 8)[0]

            if length < min_length or length > MAX_STRING_LEN:
                continue
            if not _is_va_in_readable_segments(view, ptr):
                continue
            if not _is_va_in_readable_segments(view, ptr + length - 1):
                continue

            yield StructString(struct_va, length), ptr


def read_struct_string_elf(view: ELF, ptr: int, instance: StructString) -> str:
    """
    Read the string for the given struct String instance,
    validating that it looks like UTF-8

    Raises:
      ValueError: the bytes cannot be read, are not valid UTF-8,
        or are directly followed by a NULL byte.
    """
    # one extra byte is read to inspect what follows the string
    buf = view.read_va(ptr, instance.length + 1)
    instance_data = buf[: instance.length]
    next_byte = buf[instance.length]

    try:
        s = instance_data.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise ValueError("struct string instance does not contain valid UTF-8") from exc

    if s.encode("utf-8") != instance_data:
        raise ValueError("struct string length incorrect")

    if next_byte == 0x00:
        raise ValueError("struct string is NULL terminated")

    return s


def _find_segment_for_va(view: ELF, va: int):
    """Return the first load segment mapping `va`, or None."""
    return next((segment for segment in view.iter_load_segments() if segment.contains_va(va)), None)


def _get_monotonic_runs(values: List[int]) -> List[Tuple[int, int]]:
    """
    Split `values` into maximal non-decreasing runs.

    Returns:
      inclusive (start, end) index pairs, longest run first.
    """
    if not values:
        return []

    runs: List[Tuple[int, int]] = []
    run_start = 0
    prior_value = values[0]

    for i, value in enumerate(values[1:], start=1):
        if value < prior_value:
            runs.append((run_start, i - 1))
            run_start = i
        prior_value = value

    runs.append((run_start, len(values) - 1))
    # longest runs are prioritized
    runs.sort(key=lambda run: run[1] - run[0] + 1, reverse=True)
    return runs


def _iter_anchor_indices(run_start: int, run_end: int) -> Iterable[int]:
    """
    Sometimes the midpoint candidate is invalid (not UTF-8 or not a real Go string)
    by trying nearby candidates, FLOSS can recover and find a valid blob anchor

    Yields the midpoint of the run first, then indices at growing distance
    from it (left before right), staying within [run_start, run_end].
    """
    run_mid = (run_start + run_end) // 2
    yield run_mid

    for delta in range(1, run_end - run_start + 1):
        left = run_mid - delta
        right = run_mid + delta

        if left >= run_start:
            yield left
        if right <= run_end:
            yield right


def _find_blob_range_from_anchor(view: ELF, ptr: int) -> Tuple[int, int]:
    """
    Expand from a known string data pointer to the enclosing string blob.

    The blob is taken to span from the closest run of four NULL bytes before
    `ptr` to the closest one after it, within the file bytes of the segment.

    Raises:
      ValueError: `ptr` is not in a load segment, not file backed,
        or a delimiter is missing on either side.
    """
    segment = _find_segment_for_va(view, ptr)
    if segment is None:
        raise ValueError("string data pointer is not in a load segment")

    delimiter = b"\x00\x00\x00\x00"
    segment_data = view.data[segment.file_off : segment.file_end]
    instance_offset = ptr - segment.vaddr_start

    if instance_offset >= len(segment_data):
        raise ValueError("string data pointer is not file backed")

    # these are input-dependent conditions, not invariants: report them with
    # the exception type callers handle instead of `assert`
    next_null = segment_data.find(delimiter, instance_offset)
    if next_null == -1:
        raise ValueError("string blob end delimiter not found")

    prev_null = segment_data.rfind(delimiter, 0, instance_offset)
    if prev_null == -1:
        raise ValueError("string blob start delimiter not found")

    blob_start = segment.vaddr_start + prev_null
    blob_end = segment.vaddr_start + next_null
    logger.debug("string blob: [0x%x-0x%x]", blob_start, blob_end)
    return blob_start, blob_end


def find_string_blob_range_elf(
    view: ELF, struct_strings: List[StructString], ptrs_by_struct_va: Dict[int, int]
) -> Tuple[int, int]:
    """
    Find the range of the string blob, as loaded in memory.

    This is an improvement from the PE version as ELF Go binaries are
    less uniform across versions so one run/one midpoint is less reliable
    This one:
     - builds all monotonic runs, sorts by length and tries several
     - tries many anchor points around midpoint
     - scores blobs

    Args:
      view: the ELF file.
      struct_strings: struct string candidates (not modified).
      ptrs_by_struct_va: string data pointer for each struct string address.

    Returns:
      (start VA, end VA) of the best scoring blob range.

    Raises:
      ValueError: there are no candidates or no anchor yields a valid range.
    """
    if not struct_strings:
        raise ValueError("no struct string candidates")

    # sort a copy so the caller's list keeps its order
    struct_strings = sorted(struct_strings, key=lambda s: s.address)

    # no need to compute the single longest monotonic run
    # here we compute all monotonic runs
    lengths = [s.length for s in struct_strings]
    runs = _get_monotonic_runs(lengths)

    if not runs:
        raise ValueError("failed to find monotonic runs")

    sorted_ptrs = sorted(ptrs_by_struct_va.values())

    def score_blob_range(blob_start: int, blob_end: int) -> Tuple[int, int]:
        # a good candidate range contains many pointers and has reasonable size
        if blob_end <= blob_start:
            return (0, 0)

        # num of pointers in the blob range
        count = bisect_left(sorted_ptrs, blob_end) - bisect_left(sorted_ptrs, blob_start)
        return count, blob_end - blob_start

    # if no valid range is found , this is raised
    first_error = None
    best_blob_range = None
    best_score = (0, 0)

    # iterate over monotonic runs
    for run_start, run_end in runs[:MAX_RUNS_TO_TRY]:
        readonly_anchors = []
        readable_anchors = []

        for i in _iter_anchor_indices(run_start, run_end):
            instance = struct_strings[i]
            ptr = ptrs_by_struct_va.get(instance.address)
            if ptr is None:
                continue

            if _is_va_in_readonly_segments(view, ptr):
                readonly_anchors.append((instance, ptr))
            else:
                readable_anchors.append((instance, ptr))

        # anchors pointing into read-only segments are tried first
        anchors = (readonly_anchors + readable_anchors)[:MAX_ANCHOR_ATTEMPTS_PER_RUN]

        # iterate over anchor points
        for instance, ptr in anchors:
            try:
                # validate and score anchors
                s = read_struct_string_elf(view, ptr, instance)
                logger.debug("string blob: struct string instance: 0x%x: %s...", instance.address, s[:16])
                blob_start, blob_end = _find_blob_range_from_anchor(view, ptr)
                score = score_blob_range(blob_start, blob_end)
                # track the best blob range
                if score > best_score:
                    best_score = score
                    best_blob_range = (blob_start, blob_end)
            except ValueError as exc:
                if first_error is None:
                    first_error = exc

    if best_blob_range is not None:
        logger.debug(
            "string blob: selected best ELF blob range [0x%x-0x%x], score=(%d pointers, %d bytes)",
            best_blob_range[0],
            best_blob_range[1],
            best_score[0],
            best_score[1],
        )
        return best_blob_range

    if first_error is not None:
        raise first_error

    raise ValueError("failed to find valid string blob anchor")


def _collect_struct_string_candidates(view: ELF) -> Tuple[List[StructString], Dict[int, int]]:
    """
    Collect struct string candidates, one per struct address.

    Returns:
      (struct strings sorted by address, string data pointer by struct address)
    """
    deduped_candidates: Dict[int, Tuple[StructString, int]] = {}
    for struct_string, ptr in get_struct_string_candidates_elf(view):
        # the first candidate seen for an address wins
        deduped_candidates.setdefault(struct_string.address, (struct_string, ptr))

    candidates = sorted(deduped_candidates.values(), key=lambda candidate: candidate[0].address)
    struct_strings = [struct_string for struct_string, _ in candidates]
    ptrs_by_struct_va = {struct_string.address: ptr for struct_string, ptr in candidates}
    return struct_strings, ptrs_by_struct_va


def get_string_blob_strings_elf(view: ELF, min_length: int) -> Iterable[StaticString]:
    """
    For the given ELF file compiled by Go,
    find the string blob and then extract strings from it.

    Yields nothing (after logging a warning) when no candidates or no blob
    range can be found.
    """

    with floss.utils.timing("find struct string candidates"):
        struct_strings, ptrs_by_struct_va = _collect_struct_string_candidates(view)

        if not struct_strings:
            logger.warning(
                "Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported."
            )
            return

    with floss.utils.timing("find string blob"):
        try:
            string_blob_start, string_blob_end = find_string_blob_range_elf(view, struct_strings, ptrs_by_struct_va)
        except ValueError:
            logger.warning(
                "Failed to find string blob range: Is this a Go binary? If so, the Go version may be unsupported."
            )
            return

    with floss.utils.timing("collect string blob strings"):
        string_blob_size = string_blob_end - string_blob_start
        string_blob_buf = view.read_va(string_blob_start, string_blob_size)

        # unique pointers into the blob, in address order
        string_blob_pointers = sorted(
            {ptr for ptr in ptrs_by_struct_va.values() if string_blob_start <= ptr < string_blob_end}
        )

        if not string_blob_pointers:
            return

        last_size = 0
        # each string is assumed to run from its pointer up to the next pointer
        for start, end in zip(string_blob_pointers, string_blob_pointers[1:]):
            size = end - start
            string_blob_offset = start - string_blob_start
            sbuf = string_blob_buf[string_blob_offset : string_blob_offset + size]

            try:
                s = sbuf.decode("utf-8")
            except UnicodeDecodeError:
                continue

            if not s:
                continue

            if last_size > len(s):
                logger.warning("probably missed a string blob string ending at: 0x%x", start - 1)

            try:
                string = StaticString.from_utf8(sbuf, view.va_to_file_offset(start), min_length)
            except ValueError:
                pass
            else:
                yield string

            last_size = len(s)

        # the last pointer has no successor: take the longest prefix of the
        # remaining blob bytes that decodes as UTF-8
        last_pointer = string_blob_pointers[-1]
        last_pointer_offset = last_pointer - string_blob_start
        last_buf = string_blob_buf[last_pointer_offset:]
        for size in range(len(last_buf), 0, -1):
            try:
                _ = last_buf[:size].decode("utf-8")
            except UnicodeDecodeError:
                continue
            else:
                try:
                    string = StaticString.from_utf8(last_buf[:size], view.va_to_file_offset(last_pointer), min_length)
                except ValueError:
                    pass
                else:
                    yield string
                break


def extract_go_strings_elf(sample, min_length: int) -> List[StaticString]:
    """
    Extract Go strings from the given ELF file.
    """
    view = ELF(sample)
    return list(get_string_blob_strings_elf(view, min_length))


def get_static_strings_from_blob_range_elf(sample, static_strings: List[StaticString]) -> List[StaticString]:
    """
    Return the given static strings whose file offset lies inside the string blob.

    Returns an empty list when no candidates or no blob range can be found.
    """
    view = ELF(sample)

    struct_strings, ptrs_by_struct_va = _collect_struct_string_candidates(view)
    if not struct_strings:
        return []

    try:
        string_blob_start, string_blob_end = find_string_blob_range_elf(view, struct_strings, ptrs_by_struct_va)
        # static string offsets are file offsets, so translate the VA range
        string_blob_start = view.va_to_file_offset(string_blob_start)
        string_blob_end = view.va_to_file_offset(string_blob_end)
    except ValueError:
        return []

    return [s for s in static_strings if string_blob_start <= s.offset < string_blob_end]
b"\xf1\xff\xff\xff\x00\x00", +] + +GO_FUNCTIONS = [ + b"runtime.main", + b"main.main", + b"runtime.gcWork", + b"runtime.morestack", + b"runtime.morestack_noctxt", + b"runtime.newproc", + b"runtime.gcWriteBarrier", + b"runtime.Gosched", +] + class Language(Enum): GO = "go" @@ -45,25 +65,44 @@ def identify_language_and_version(sample: Path, static_strings: Iterable[StaticS logger.info("Rust binary found with version: %s", version) return Language.RUST, version - # open file as PE for further checks - try: - pe = pefile.PE(str(sample)) - except pefile.PEFormatError as err: - logger.debug( - f"FLOSS currently only detects if Windows PE files were written in Go or .NET. " - f"This is not a valid PE file: {err}" - ) + from floss.main import get_file_type + + file_type = get_file_type(sample) + + # ELF Go binary + if file_type == SUPPORTED_FILE_MAGIC_ELF: + try: + elf_view = ELF(sample) + except ValueError as elf_err: + logger.debug(f"This is not a supported ELF file: {elf_err}") + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA + + is_go, version = get_if_go_and_version_elf(elf_view) + if is_go: + logger.info("Go ELF binary found with version %s", version) + return Language.GO, version + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA - is_go, version = get_if_go_and_version(pe) - if is_go: - logger.info("Go binary found with version %s", version) - return Language.GO, version - elif is_dotnet_bin(pe): - return Language.DOTNET, VERSION_UNKNOWN_OR_NA - else: + # PE Go bianry + if file_type == SUPPORTED_FILE_MAGIC_PE: + try: + pe = pefile.PE(str(sample)) + except pefile.PEFormatError as err: + logger.debug(f"This is not a valid PE file: {err}") + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA + + is_go, version = get_if_go_and_version(pe) + if is_go: + logger.info("Go binary found with version %s", version) + return Language.GO, version + if is_dotnet_bin(pe): + return Language.DOTNET, VERSION_UNKNOWN_OR_NA + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA + return 
Language.UNKNOWN, VERSION_UNKNOWN_OR_NA + def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[bool, str]: """ @@ -107,22 +146,6 @@ def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]: https://github.com/0xjiayu/go_parser/blob/865359c297257e00165beb1683ef6a679edc2c7f/pclntbl.py#L46 """ - go_magic = [ - b"\xf0\xff\xff\xff\x00\x00", - b"\xfb\xff\xff\xff\x00\x00", - b"\xfa\xff\xff\xff\x00\x00", - b"\xf1\xff\xff\xff\x00\x00", - ] - go_functions = [ - b"runtime.main", - b"main.main", - b"runtime.gcWork", - b"runtime.morestack", - b"runtime.morestack_noctxt", - b"runtime.newproc", - b"runtime.gcWriteBarrier", - b"runtime.Gosched", - ] # look for the .rdata section first try: section = get_rdata_section(pe) @@ -132,14 +155,14 @@ def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]: section_va = section.VirtualAddress section_size = section.SizeOfRawData section_data = section.get_data(section_va, section_size) - for magic in go_magic: + for magic in GO_MAGIC: if magic in section_data: pclntab_va = section_data.index(magic) + section_va if verify_pclntab(section, pclntab_va): return True, get_go_version(magic) # if not found, search in all the available sections - for magic in go_magic: + for magic in GO_MAGIC: for section in pe.sections: section_va = section.VirtualAddress section_size = section.SizeOfRawData @@ -159,7 +182,7 @@ def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]: section_va = section.VirtualAddress section_size = section.SizeOfRawData section_data = section.get_data(section_va, section_size) - for go_function in go_functions: + for go_function in GO_FUNCTIONS: if go_function in section_data: logger.info("Go binary found, function name %s", go_function) return True, VERSION_UNKNOWN_OR_NA @@ -169,7 +192,7 @@ def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]: section_va = section.VirtualAddress section_size = section.SizeOfRawData section_data = section.get_data(section_va, section_size) - for 
def verify_pclntab_elf(view: ELF, pclntab_va: int) -> bool:
    """
    Parse headers of pclntab to verify it is legit

    Reads the two header bytes at offsets 6 and 7 from `pclntab_va` and
    accepts the header when the first is one of {1, 2, 4} and the second
    one of {4, 8}. Returns False when the bytes cannot be read.
    """
    try:
        # both fields are adjacent, fetch them with a single read
        header = view.read_va(pclntab_va + 6, 2)
        pc_quanum = header[0]
        pointer_size = header[1]
    except (ValueError, IndexError):
        logger.debug("Error parsing ELF pclntab header")
        return False
    return pc_quanum in {1, 2, 4} and pointer_size in {4, 8}


def _iter_magic_matches(data: bytes, magic: bytes) -> Iterable[int]:
    """Yield the offset of every occurrence of `magic` in `data`, overlapping ones included."""
    start = 0
    while True:
        idx = data.find(magic, start)
        if idx == -1:
            break
        yield idx
        # resume one byte further so overlapping matches are reported too
        start = idx + 1


def get_if_go_and_version_elf(elf_view: ELF) -> Tuple[bool, str]:
    """
    Return if the ELF binary was compiled with Go and its version

    The pclntab magic is searched in the read-only segments first and then
    in the remaining load segments. When no verified magic is found, the
    readable segments are searched for well-known Go function names.

    Returns:
      (True, version) for a verified pclntab magic,
      (True, VERSION_UNKNOWN_OR_NA) when only a function name matched,
      (False, VERSION_UNKNOWN_OR_NA) otherwise.
    """
    readonly_segments = list(elf_view.iter_readonly_segments())
    # read-only segments were already searched when the second group is
    # reached, so searching them again could not produce a new match
    other_segments = [segment for segment in elf_view.iter_load_segments() if segment not in readonly_segments]

    for segment in readonly_segments + other_segments:
        segment_data = elf_view.data[segment.file_off : segment.file_end]
        for magic in GO_MAGIC:
            for match_offset in _iter_magic_matches(segment_data, magic):
                pclntab_va = segment.vaddr_start + match_offset
                if verify_pclntab_elf(elf_view, pclntab_va):
                    return True, get_go_version(magic)

    for segment in elf_view.iter_readable_segments():
        segment_data = elf_view.data[segment.file_off : segment.file_end]
        for go_function in GO_FUNCTIONS:
            if go_function in segment_data:
                logger.info("Go ELF binary found, function name %s", go_function)
                return True, VERSION_UNKNOWN_OR_NA

    return False, VERSION_UNKNOWN_OR_NA
6a56f28b2..d6b481305 100644 --- a/floss/main.py +++ b/floss/main.py @@ -41,6 +41,7 @@ import floss.language.go.coverage import floss.language.rust.extract import floss.language.rust.coverage +import floss.language.go.extract_elf from floss.const import ( MEGABYTE, MAX_FILE_SIZE, @@ -126,7 +127,8 @@ def make_parser(argv): " 1. Go: strings from binaries written in Go\n" " 2. Rust: strings from binaries written in Rust\n" ) - epilog = textwrap.dedent(""" + epilog = textwrap.dedent( + """ only displaying core arguments, run `floss -H` to see all supported options examples: @@ -138,8 +140,10 @@ def make_parser(argv): only extract stack and tight strings floss --only stack tight -- suspicious.exe - """) - epilog_advanced = textwrap.dedent(""" + """ + ) + epilog_advanced = textwrap.dedent( + """ examples: extract all strings from 32-bit shellcode floss -f sc32 shellcode.bin @@ -149,7 +153,8 @@ def make_parser(argv): extract strings from a binary written in Go (if automatic language identification fails) floss --language go program.exe - """) + """ + ) show_all_options = "-H" in argv @@ -592,6 +597,8 @@ def main(argv=None) -> int: if not static_strings: return 0 + file_type = get_file_type(sample) + static_runtime = get_runtime_diff(interim) # set language configurations selected_lang = Language(args.language) @@ -684,12 +691,24 @@ def main(argv=None) -> int: logger.info("extracting language-specific Go strings") interim = time() - results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length) + if file_type == SUPPORTED_FILE_MAGIC_ELF: + results.strings.language_strings = floss.language.go.extract_elf.extract_go_strings_elf( + sample, args.min_length + ) + else: + results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length) results.metadata.runtime.language_strings = get_runtime_diff(interim) # missed strings only includes non-identified strings in searched range # here currently only 
focus on strings in string blob range - string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range(sample, static_strings) + if file_type == SUPPORTED_FILE_MAGIC_ELF: + string_blob_strings = floss.language.go.extract_elf.get_static_strings_from_blob_range_elf( + sample, static_strings + ) + else: + string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range( + sample, static_strings + ) results.strings.language_strings_missed = floss.language.utils.get_missed_strings( string_blob_strings, results.strings.language_strings, args.min_length ) diff --git a/floss/render/default.py b/floss/render/default.py index 8ff973ce5..f5b3337a3 100644 --- a/floss/render/default.py +++ b/floss/render/default.py @@ -174,6 +174,10 @@ def strtime(seconds): def render_language_strings(language, language_strings, language_strings_missed, console, verbose, disable_headers): strings = sorted(language_strings + language_strings_missed, key=lambda s: s.offset) render_heading(f"FLOSS {language.upper()} STRINGS ({len(strings)})", console, verbose, disable_headers) + if not strings: + logger.info("no %s strings found", language) + return + offset_len = len(f"{strings[-1].offset}") for s in strings: if verbose == Verbosity.DEFAULT: diff --git a/pyproject.toml b/pyproject.toml index b42234e78..0a7209fb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,7 @@ dependencies = [ # we still support. "networkx>=3", + "pyelftools>=0.32", ] dynamic = ["version", "readme"] diff --git a/requirements.txt b/requirements.txt index c6617f4eb..c931400fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,6 +38,7 @@ pydantic==2.12.5 # but dependabot updates these separately (which is broken) and is annoying, # so we rely on pydantic to pull in the right version of pydantic-core. 
@pytest.fixture(scope="module")
def elf_sample_path() -> pathlib.Path:
    """Path of the go-hello ELF sample, located in the data directory next to this test file."""
    return pathlib.Path(__file__).parent / "data" / "language" / "go" / "go-hello" / "bin" / "go-hello"


def test_iter_load_segments(elf_sample_path):
    """The sample is expected to have three load segments; the first one is checked field by field."""
    elf = ELF(elf_sample_path)

    segments = list(elf.iter_load_segments())

    assert len(segments) == 3
    assert segments[0].vaddr_start == 0x400000
    assert segments[0].vaddr_end == 0x481387
    assert segments[0].file_off == 0x0
    assert segments[0].file_end == 0x81387


def test_va_mapping_and_read_known_text_bytes(elf_sample_path):
    """VA to file offset translation and read_va agree with known bytes of the sample."""
    elf = ELF(elf_sample_path)

    assert elf.va_to_file_offset(0x401000) == 0x1000

    # bytes expected at VA 0x401000 of the sample
    expected = bytes.fromhex("49 3b 66 10 76 38 48 83 ec 18 48 89 6c 24 10 48")
    assert elf.read_va(0x401000, len(expected)) == expected


def test_ranges_and_mapping_helpers(elf_sample_path):
    """Segment filters, the overall mapped range and is_va_mapped on the sample."""
    elf = ELF(elf_sample_path)

    executable_segments = list(elf.iter_executable_segments())
    readonly_segments = list(elf.iter_readonly_segments())

    assert len(executable_segments) == 1
    assert executable_segments[0].vaddr_start == 0x400000

    assert readonly_segments
    # bit 1 is PF_X: none of the segments reported as read-only may be executable
    assert all(not (segment.flags & 1) for segment in readonly_segments)

    low_va, high_va = elf.get_mapped_range()
    assert (low_va, high_va) == (0x400000, 0x55D830)

    assert elf.is_va_mapped(0x401000)
    assert not elf.is_va_mapped(0x390000)


def test_read_va_across_adjacent_file_backed_segments():
    """A read may continue from one segment into the next when both are file backed."""
    # bypass __init__ (no ELF parsing) and install the state by hand
    elf = ELF.__new__(ELF)
    elf._data = b"ABCDEFGH"
    elf._segments = [
        Segment(vaddr_start=0x1000, vaddr_end=0x1004, file_off=0, file_end=4, flags=PF_R),
        Segment(vaddr_start=0x1004, vaddr_end=0x1008, file_off=4, file_end=8, flags=PF_R),
    ]

    assert elf.read_va(0x1002, 4) == b"CDEF"


def test_read_va_rejects_non_file_backed_gap():
    """A read that runs into the part of a segment without file bytes must fail."""
    # bypass __init__ (no ELF parsing) and install the state by hand
    elf = ELF.__new__(ELF)
    elf._data = b"ABCDEFGH"
    elf._segments = [
        # 8 bytes in memory but only 4 in the file: 0x1004-0x1007 is not file backed
        Segment(vaddr_start=0x1000, vaddr_end=0x1008, file_off=0, file_end=4, flags=PF_R),
        Segment(vaddr_start=0x1008, vaddr_end=0x100C, file_off=4, file_end=8, flags=PF_R),
    ]

    with pytest.raises(ValueError, match="not fully file backed"):
        elf.read_va(0x1002, 6)
def test_go_elf_magic_detection():
    """The go-hello ELF sample is detected as Go with version 1.20."""
    CD = Path(__file__).resolve().parent
    abs_path = (CD / "data/language/go/go-hello/bin/go-hello").resolve()

    assert abs_path.exists()

    from floss.language.elf import ELF

    is_go, version = get_if_go_and_version_elf(ELF(abs_path))

    assert is_go is True
    assert version == "1.20"


def test_go_elf_fallback_function_names():
    """Without any pclntab magic in the data, a known Go function name still marks the file as Go."""

    class FakeELFView:
        # minimal stand-in exposing only what get_if_go_and_version_elf uses
        def __init__(self):
            # contains the function name "runtime.main"
            self.data = b"xxxxruntime.mainyyyy"
            # a single segment covering all 20 bytes; flags=4 is PF_R
            self._segment = Segment(vaddr_start=0x1000, vaddr_end=0x1014, file_off=0, file_end=20, flags=4)

        def iter_readonly_segments(self):
            return [self._segment]

        def iter_load_segments(self):
            return [self._segment]

        def iter_readable_segments(self):
            return [self._segment]

        def read_va(self, va, size):
            # any attempt to verify a pclntab header fails
            raise ValueError("no magic in this test")

    is_go, version = get_if_go_and_version_elf(FakeELFView())

    assert is_go is True
    assert version == VERSION_UNKNOWN_OR_NA