diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index 5d517c8..0000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1 +0,0 @@ -custom: https://blog.skyplabs.net/support/ diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..4ec4496 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,20 @@ +version: 2 +updates: + + - package-ecosystem: pip + directory: "/" + schedule: + interval: weekly + time: "09:00" + timezone: Europe/Dublin + open-pull-requests-limit: 10 + target-branch: develop + + - package-ecosystem: github-actions + directory: "/" + schedule: + interval: weekly + time: "09:00" + timezone: Europe/Dublin + open-pull-requests-limit: 10 + target-branch: develop diff --git a/.github/settings.yml b/.github/settings.yml index 91eef3b..d43c5ba 100644 --- a/.github/settings.yml +++ b/.github/settings.yml @@ -17,11 +17,10 @@ repository: labels: - name: feature - oldname: enhancement description: New feature color: 84b6eb - - name: optimisation - description: Optimisation + - name: enhancement + description: Enhancement color: 84b6eb - name: refactor description: Refactoring @@ -36,6 +35,15 @@ labels: - name: ui description: Related to the user interface color: 1d76db + - name: exporters + description: Related to the exporters + color: 1d76db + - name: cli + description: Related to the CLI tool + color: 1d76db + - name: dependencies + description: Related to the dependencies + color: 1d76db - name: android description: Android platform support issues @@ -81,9 +89,21 @@ labels: color: 33aa3f - name: documentation - description: Documentation-related issue + description: Related to the documentation color: 2d2de2 + - name: packaging + description: Related to software packaging + color: 31f427 + + - name: testing + description: Related to software testing + color: efa5ef + + - name: ci/cd + description: Related to CI/CD + color: e85733 + - name: good first issue description: Good first issue color: 7057ff diff --git a/.github/workflows/test_and_publish.yml b/.github/workflows/test_and_publish.yml new file mode 100644 index 0000000..089b68a --- /dev/null +++ b/.github/workflows/test_and_publish.yml @@ -0,0 +1,118 @@ +name: Test and Publish + +on: + - push + - pull_request + +jobs: + test-code: + name: Test code + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-20.04, macos-10.15] + python-version: + - '3.7' + - '3.8' + - '3.9' + - '3.10' + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install .[tests] tox-gh-actions + + - name: Test with Tox + run: tox + + test-docs: + name: Test documentation + runs-on: ubuntu-20.04 + env: + PYTHON_VERSION: '3.x' + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Python ${{ env.PYTHON_VERSION }} + uses: actions/setup-python@v3 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install .[docs] + + - name: Build documentation + working-directory: docs + run: make html + + publish-to-test-pypi: + name: Publish to TestPyPI + environment: staging + runs-on: ubuntu-20.04 + if: github.ref == 'refs/heads/main' && github.event_name == 'push' + needs: + - test-code + - test-docs + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install --upgrade setuptools wheel twine build + + - name: Build and publish + env: + TWINE_USERNAME: '__token__' + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + run: | + python -m build + twine upload --repository testpypi dist/* + + publish-to-pypi: + name: Publish to PyPI + environment: production + runs-on: ubuntu-20.04 + if: github.ref == 'refs/heads/main' && github.event_name == 'push' + needs: publish-to-test-pypi + + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install --upgrade setuptools wheel twine build + + - name: Build and publish + env: + TWINE_USERNAME: '__token__' + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + run: | + python -m build + twine upload dist/* diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..a479393 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,17 @@ +version: 2 + +build: + os: ubuntu-20.04 + tools: + python: "3" + +python: + install: + - method: pip + path: . + extra_requirements: + - complete + - docs + +sphinx: + configuration: docs/conf.py diff --git a/.readthedocs.yml b/.readthedocs.yml deleted file mode 100644 index b752bb3..0000000 --- a/.readthedocs.yml +++ /dev/null @@ -1,5 +0,0 @@ -python: - version: 3 - pip_install: true - extra_requirements: - - docs diff --git a/.snyk b/.snyk deleted file mode 100644 index 8a475e4..0000000 --- a/.snyk +++ /dev/null @@ -1,2 +0,0 @@ -language-settings: - python: '3' diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index f60bf94..0000000 --- a/.travis.yml +++ /dev/null @@ -1,31 +0,0 @@ -sudo: false - -language: python - -install: - -script: python setup.py test - -jobs: - include: - - python: 3.4 - - python: 3.5 - - python: 3.6 - - python: 3.7 - dist: xenial - - stage: PyPI release - python: 3.7 - deploy: - provider: pypi - distributions: "sdist bdist_wheel" - user: skyplabs - password: - secure: AAdbZ/WxjoCcGV9BAvphrxNp48D3n6yt1FEWZwA2V1BK2SnVV6YwQX/r7ryat2uoYrTONnGf/P5QgXc2wThl5mzPiRMaaONOIkyrmi2ZCD86yHYy2QQPR1gO1QOYyIsx545FRB6PaGXBC99Hw30vh3HPQaCKoQ8/PY+u3QbVn47BSBPuIdBi/FLl81uYk0ZE1457TxHkYrSDheRA3JVs72oD+izeoS+JJ+1YX+2zreukk9xuNfuwtcXPAn5B8LH8yYkFp1dd2pRsQzYMB4aH3rz7QEzSZ0mLBr3/J2bFCldmT8NToRijIjZNk04ik8XlGQ7xmpYG9rYIAkWwSBYSsZbfeLBxoxxIBcUy9xVxveZSaNl7TcRchCsyVO8leuL9aLmKz3wuKhWCRxJQSUDLlo83LYoBaTifqstUO85gC8IxR/Y/yqkW8wfSfVgVaDi9ET3/7UgSZEQJFEqfiYGdnD6/IkAy2tUCRO5xNsXsOyVJ5A0CsDTTtvlEfGxf1UtQyt0BmRSGYLMTnDBStW1Oua2QfVcTKIJOdfEOyL/VWnn/f0RCJQiUkRo9OFmSywoFjgSC9Arejwsff5smEd5i/jTKk6rOoHgIMnAGxn+75BjF3vQ7usAJeEOlLzHB5puc5dKeCpn5rwxOHha1lfmr6kDs1ec5XhcgQKgvulYfjVQ= - on: - branch: master - tags: true - repo: SkypLabs/probequest - -notifications: - email: - on_success: never diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b24ed9..7063ab5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,54 @@ +# Changelog + +## v0.8.0 - Mar 22, 2022 + +### Breaking Changes + +* The PNL view has been removed. + +### Improvements + +* Add `pyproject.toml` and `setup.cfg` +* Remove argparse from dependencies (@fabaff) +* Use f-strings instead of `str.format()` +* Add support for Python 3.8, 3.9 and 3.10 +* Drop support for Python 3.4, 3.5 and 3.6 +* Make some dependencies optional +* Refactor code around Scapy's PipeTools +* Add metavars to argument parser +* Turn `interface` option into argument +* Cache the compiled regex in `Config` once computed +* Cache the frame filter in `Config` once computed +* Cache the MAC address' OUI in `ProbeRequest` +* Use the logging package +* Add extra dependency group `tests` +* Add unit tests for the argument parser +* Add `__version__` attribute to package +* Use an entry point to generate the CLI tool +* Use tox for unit testing + +### Fixes + +* Fix interface checking +* Close open files before exiting +* Use a fake `Config` object in unit tests +* Fix linting issues + +### Infrastructure + +* Upgrade RTD configuration file to version 2 +* Monitor GH Actions dependencies with Dependabot +* Use `main` as branch for production releases +* Upgrade to GitHub-native Dependabot +* Add macOS to build matrix +* Switch from Travis CI to GitHub Actions + ## v0.7.2 - Aug 26, 2019 ### Improvements -* Use the new [Scapy built-in asynchronous sniffer](https://scapy.readthedocs.io/en/latest/usage.html#asynchronous-sniffing) +* Use the new [Scapy built-in asynchronous + sniffer](https://scapy.readthedocs.io/en/latest/usage.html#asynchronous-sniffing) * Introduce the new `Config` object containing the configuration of ProbeQuest ### Fixes @@ -17,8 +63,10 @@ ### Fixes -* Error when trying to decode ESSIDs using invalid UTF-8 characters ([#4](https://github.com/SkypLabs/probequest/issues/4)) -* Arguments not working (-e, -r) ([#17](https://github.com/SkypLabs/probequest/issues/17)) +* Error when trying to decode ESSIDs using invalid UTF-8 characters + ([#4](https://github.com/SkypLabs/probequest/issues/4)) +* Arguments not working (-e, -r) + ([#17](https://github.com/SkypLabs/probequest/issues/17)) ## v0.7.0 - Oct 8, 2018 @@ -34,7 +82,9 @@ ### Fixes -* Test if a packet has a `Dot11ProbeReq` layer before parsing it ([#5](https://github.com/SkypLabs/probequest/issues/5), [#8](https://github.com/SkypLabs/probequest/issues/8)) +* Test if a packet has a `Dot11ProbeReq` layer before parsing it + ([#5](https://github.com/SkypLabs/probequest/issues/5), + [#8](https://github.com/SkypLabs/probequest/issues/8)) ## v0.6.1 - May 28, 2018 @@ -71,7 +121,8 @@ The project has been renamed to ProbeQuest. ### Fixes -* The sniffer stops after having received the first frame ([#3](https://github.com/SkypLabs/probequest/issues/3)) +* The sniffer stops after having received the first frame + ([#3](https://github.com/SkypLabs/probequest/issues/3)) ## v0.5.0 - Feb 7, 2018 diff --git a/README.rst b/README.rst index 1686b1a..6ef5ddd 100644 --- a/README.rst +++ b/README.rst @@ -2,14 +2,20 @@ ProbeQuest ========== -|PyPI Package| |PyPI Downloads| |PyPI Python Versions| |Build Status Master Branch| |Build Status Develop Branch| |Code Coverage| |LGTM Grade| |LGTM Alerts| |Documentation Status| +|PyPI Package| |PyPI Downloads| |PyPI Python Versions| |Build Status| |LGTM +Grade| |LGTM Alerts| |Documentation Status| -Toolkit allowing to sniff and display the Wi-Fi probe requests passing nearby your wireless interface. +Toolkit allowing to sniff and display the Wi-Fi probe requests passing nearby +your wireless interface. -Probe requests are sent by a station to elicit information about access points, in particular to determine if an access point is present or not in the nearby environment. Some devices (mostly smartphones and tablets) use these requests to determine if one of the networks they have previously been connected to is in range, leaking personal information. +Probe requests are sent by a station to elicit information about access points, +in particular to determine if an access point is present or not in the nearby +environment. Some devices (mostly smartphones and tablets) use these requests +to determine if one of the networks they have previously been connected to is +in range, leaking personal information. -Further details are discussed in `this -paper `__. +Further details are discussed in `this paper +`__. .. image:: docs/_static/img/probequest_demo.gif :target: https://asciinema.org/a/205172 @@ -25,51 +31,56 @@ Installation Documentation ============= -The project is documented `here `__. +The project is documented `here +`__. In the Media ============ ProbeQuest has appeared in the following media: -- `KitPloit `__ -- `Hakin9 Magazine, VOL.13, NO. 05, "Open Source Hacking Tools" `__ -- `WonderHowTo `__ (including a `YouTube video `__) -- `ShellVoide `__ -- `Cyber Pi Projects `__ (`Worksheet `__) +- `KitPloit + `__ +- `Hakin9 Magazine, VOL.13, NO. 05, "Open Source Hacking Tools" + `__ +- `WonderHowTo + `__ + (including a `YouTube video `__) +- `ShellVoide + `__ +- `Cyber Pi Projects + `__ (`Worksheet + `__) License ======= `GPL version 3 `__ -.. |Build Status Master Branch| image:: https://img.shields.io/travis/SkypLabs/probequest/master.svg?label=master&logo=travis&style=flat - :target: https://travis-ci.org/SkypLabs/probequest - :alt: Build Status Master Branch -.. |Build Status Develop Branch| image:: https://img.shields.io/travis/SkypLabs/probequest/develop.svg?label=develop&logo=travis&style=flat - :target: https://travis-ci.org/SkypLabs/probequest +.. |Build Status| image:: https://github.com/SkypLabs/probequest/actions/workflows/test_and_publish.yml/badge.svg?branch=develop + :target: https://github.com/SkypLabs/probequest/actions/workflows/test_and_publish.yml?query=branch%3Adevelop :alt: Build Status Develop Branch -.. |Code Coverage| image:: https://api.codacy.com/project/badge/Grade/16b9e70e51744256b37099ae8fe9132d - :target: https://www.codacy.com/app/skyper/probequest?utm_source=github.com&utm_medium=referral&utm_content=SkypLabs/probequest&utm_campaign=Badge_Grade - :alt: Code Coverage + .. |Documentation Status| image:: https://readthedocs.org/projects/probequest/badge/?version=latest - :target: http://probequest.readthedocs.io/en/latest/?badge=latest + :target: https://probequest.readthedocs.io/en/latest/?badge=latest :alt: Documentation Status -.. |Known Vulnerabilities| image:: https://snyk.io/test/github/SkypLabs/probequest/badge.svg - :target: https://snyk.io/test/github/SkypLabs/probequest - :alt: Known Vulnerabilities + .. |LGTM Alerts| image:: https://img.shields.io/lgtm/alerts/g/SkypLabs/probequest.svg?logo=lgtm&logoWidth=18 :target: https://lgtm.com/projects/g/SkypLabs/probequest/alerts/ :alt: LGTM Alerts + .. |LGTM Grade| image:: https://img.shields.io/lgtm/grade/python/g/SkypLabs/probequest.svg?logo=lgtm&logoWidth=18 :target: https://lgtm.com/projects/g/SkypLabs/probequest/context:python :alt: LGTM Grade + .. |PyPI Downloads| image:: https://img.shields.io/pypi/dm/probequest.svg?style=flat :target: https://pypi.org/project/probequest/ :alt: PyPI Package Downloads Per Month + .. |PyPI Package| image:: https://img.shields.io/pypi/v/probequest.svg?style=flat :target: https://pypi.org/project/probequest/ :alt: PyPI Package Latest Release + .. |PyPI Python Versions| image:: https://img.shields.io/pypi/pyversions/probequest.svg?logo=python&style=flat :target: https://pypi.org/project/probequest/ :alt: PyPI Package Python Versions diff --git a/bin/probequest b/bin/probequest deleted file mode 100755 index a2ae32e..0000000 --- a/bin/probequest +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Toolkit allowing to sniff and display the Wi-Fi probe requests passing nearby -your wireless interface. -""" - -# pylint: disable=no-name-in-module - -from argparse import ArgumentParser, FileType - -from probequest.config import Config, Mode -from probequest.version import VERSION - - -def get_arg_parser(): - """ - Returns the argument parser. - """ - - arg_parser = ArgumentParser( - description="Toolkit for Playing with Wi-Fi Probe Requests" - ) - arg_parser.add_argument( - "--debug", action="store_true", - dest="debug", - help="debug mode" - ) - arg_parser.add_argument( - "--fake", action="store_true", - dest="fake", - help="display only fake ESSIDs") - arg_parser.add_argument( - "-i", "--interface", - required=True, - dest="interface", - help="wireless interface to use (must be in monitor mode)" - ) - arg_parser.add_argument( - "--ignore-case", action="store_true", - dest="ignore_case", - help="ignore case distinctions in the regex pattern (default: false)" - ) - arg_parser.add_argument( - "--mode", - type=Mode, choices=Mode.__members__.values(), - dest="mode", - help="set the mode to use" - ) - arg_parser.add_argument( - "-o", "--output", - type=FileType("a"), - dest="output_file", - help="output file to save the captured data (CSV format)" - ) - arg_parser.add_argument("--version", action="version", version=VERSION) - arg_parser.set_defaults(debug=False) - arg_parser.set_defaults(fake=False) - arg_parser.set_defaults(ignore_case=False) - arg_parser.set_defaults(mode=Mode.RAW) - - essid_arguments = arg_parser.add_mutually_exclusive_group() - essid_arguments.add_argument( - "-e", "--essid", - nargs="+", - dest="essid_filters", - help="ESSID of the APs to filter (space-separated list)" - ) - essid_arguments.add_argument( - "-r", "--regex", - dest="essid_regex", - help="regex to filter the ESSIDs" - ) - - station_arguments = arg_parser.add_mutually_exclusive_group() - station_arguments.add_argument( - "--exclude", - nargs="+", - dest="mac_exclusions", - help="MAC addresses of the stations to exclude (space-separated list)" - ) - station_arguments.add_argument( - "-s", "--station", - nargs="+", - dest="mac_filters", - help="MAC addresses of the stations to filter (space-separated list)" - ) - - return arg_parser - - -def main(): - """ - Main function. - """ - - from os import geteuid - from sys import exit as sys_exit - - config = Config() - get_arg_parser().parse_args(namespace=config) - - if not geteuid() == 0: - sys_exit("[!] You must be root") - - # Default mode. - if config.mode == Mode.RAW: - from time import sleep - from probequest.ui.raw import RawProbeRequestViewer - - try: - print("[*] Start sniffing probe requests...") - raw_viewer = RawProbeRequestViewer(config) - raw_viewer.start() - - while True: - sleep(100) - except OSError: - raw_viewer.stop() - sys_exit( - "[!] Interface {interface} doesn't exist".format( - interface=config.interface - ) - ) - except KeyboardInterrupt: - print("[*] Stopping the threads...") - raw_viewer.stop() - print("[*] Bye!") - elif config.mode == Mode.PNL: - from probequest.ui.pnl import PNLViewer - - try: - pnl_viewer = PNLViewer(config) - pnl_viewer.main() - except OSError: - sys_exit( - "[!] Interface {interface} doesn't exist".format( - interface=config.interface - ) - ) - except KeyboardInterrupt: - pnl_viewer.sniffer.stop() - else: - sys_exit("[x] Invalid mode") - - -if __name__ == "__main__": - main() diff --git a/docs/conf.py b/docs/conf.py index 86724fa..c985dc4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,10 +1,10 @@ -# -*- coding: utf-8 -*- -# # Configuration file for the Sphinx documentation builder. # -# This file does only contain a selection of the most common options. For a -# full list see the documentation: -# http://www.sphinx-doc.org/en/master/config +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# pylint: skip-file # -- Path setup -------------------------------------------------------------- @@ -12,40 +12,30 @@ # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # +# import os +# import sys +# sys.path.insert(0, os.path.abspath('.')) -# pylint: skip-file - -import os -import sys -from probequest.version import VERSION +from probequest import __version__ as VERSION -sys.path.insert(0, os.path.abspath('..')) # -- Project information ----------------------------------------------------- project = 'ProbeQuest' -copyright = '2019, Paul-Emmanuel Raoul' +copyright = '2022, Paul-Emmanuel Raoul' author = 'Paul-Emmanuel Raoul' -# The short X.Y version -version = ".".join(VERSION.split(".")[:2]) # The full version, including alpha/beta/rc tags release = VERSION # -- General configuration --------------------------------------------------- -# If your documentation needs a minimal Sphinx version, state it here. -# -# needs_sphinx = '1.0' - # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ 'sphinx.ext.autodoc', - 'sphinx.ext.graphviz', - 'sphinx.ext.mathjax', 'sphinx.ext.todo', 'sphinx.ext.viewcode', 'sphinxarg.ext', @@ -55,30 +45,14 @@ # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] -# The suffix(es) of source filenames. -# You can specify multiple suffix as a list of string: -# -# source_suffix = ['.rst', '.md'] -source_suffix = '.rst' - # The master toctree document. master_doc = 'index' -# The language for content autogenerated by Sphinx. Refer to documentation -# for a list of supported languages. -# -# This is also used if you do content translation via gettext catalogs. -# Usually you set "language" from the command line for these cases. -language = None - # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -# This pattern also affects html_static_path and html_extra_path . +# This pattern also affects html_static_path and html_extra_path. exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] -# The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' - # -- Options for HTML output ------------------------------------------------- @@ -87,105 +61,11 @@ # html_theme = 'sphinx_rtd_theme' -# Theme options are theme-specific and customize the look and feel of a theme -# further. For a list of options available for each theme, see the -# documentation. -# -# html_theme_options = {} - # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ['_static'] -# Custom sidebar templates, must be a dictionary that maps document names -# to template names. -# -# The default sidebars (for documents that don't match any pattern) are -# defined by theme itself. Builtin themes are using these templates by -# default: ``['localtoc.html', 'relations.html', 'sourcelink.html', -# 'searchbox.html']``. -# -# html_sidebars = {} - - -# -- Options for HTMLHelp output --------------------------------------------- - -# Output file base name for HTML help builder. -htmlhelp_basename = 'ProbeQuestdoc' - - -# -- Options for LaTeX output ------------------------------------------------ - -latex_elements = { - # The paper size ('letterpaper' or 'a4paper'). - # - # 'papersize': 'letterpaper', - - # The font size ('10pt', '11pt' or '12pt'). - # - # 'pointsize': '10pt', - - # Additional stuff for the LaTeX preamble. - # - # 'preamble': '', - - # Latex figure (float) alignment - # - # 'figure_align': 'htbp', -} - -# Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, -# author, documentclass [howto, manual, or own class]). -latex_documents = [ - (master_doc, 'ProbeQuest.tex', 'ProbeQuest Documentation', - 'Paul-Emmanuel Raoul', 'manual'), -] - - -# -- Options for manual page output ------------------------------------------ - -# One entry per manual page. List of tuples -# (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'probequest', 'ProbeQuest Documentation', - [author], 1) -] - - -# -- Options for Texinfo output ---------------------------------------------- - -# Grouping the document tree into Texinfo files. List of tuples -# (source start file, target name, title, author, -# dir menu entry, description, category) -texinfo_documents = [ - (master_doc, 'ProbeQuest', 'ProbeQuest Documentation', - author, 'ProbeQuest', 'One line description of project.', - 'Miscellaneous'), -] - - -# -- Options for Epub output ------------------------------------------------- - -# Bibliographic Dublin Core info. -epub_title = project -epub_author = author -epub_publisher = author -epub_copyright = copyright - -# The unique identifier of the text. This can be a ISBN number -# or the project homepage. -# -# epub_identifier = '' - -# A unique identification for the text. -# -# epub_uid = '' - -# A list of files that should not be packed into the epub file. -epub_exclude_files = ['search.html'] - # -- Extension configuration ------------------------------------------------- @@ -194,17 +74,19 @@ # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = True + # -- Options for sphinxcontrib-seqdiag extension ----------------------------- # Fontpath for seqdiag (truetype font). seqdiag_fontpath = '/usr/share/fonts/truetype/ipafont/ipagp.ttf' + # -- Options for GitHub integration ------------------------------------------ html_context = { 'display_github': True, # Integrate GitHub 'github_user': 'SkypLabs', # Username 'github_repo': 'probequest', # Repo name - 'github_version': 'master', # Version + 'github_version': 'develop', # Version 'conf_py_path': '/docs/', # Path in the checkout to the docs root } diff --git a/docs/development.rst b/docs/development.rst index d7a9a2c..efe8b71 100644 --- a/docs/development.rst +++ b/docs/development.rst @@ -5,27 +5,31 @@ Development Running the unit tests ---------------------- -To run the unit tests: +`tox`_ is used to run the unit tests: :: - python3 setup.py test + tox Releasing a new version ----------------------- -Below are the different steps to do before releasing a new version: +Below are the different steps to follow before releasing a new version: -- Run all tests and be sure they all pass -- Update the `VERSION` variable in `probequest/version.py` -- Update the requirements in `setup.py` if needed -- Update the package's metadata (description, classifiers, etc) in `setup.py` if needed -- Update `README.rst` if needed -- Update the documentation if needed and make sure it compiles well (`cd ./docs && make html`) -- Update the copyright year in `docs/conf.py` if needed -- Add the corresponding release note to `CHANGELOG.md` +- Run all tests and be sure they all pass. +- Update the `version` field in `setup.cfg`. +- Update the requirements in `setup.cfg` if needed. +- Update the package's metadata (description, classifiers, etc.) in `setup.cfg` + if needed. +- Update `README.rst` if needed. +- Update the documentation if needed and make sure it compiles well (`cd ./docs + && make html`). +- Update the copyright year in `docs/conf.py` if needed. +- Add the corresponding release note to `CHANGELOG.md`. After having pushed the new release: -- Edit the release note on GitHub +- Create the corresponding release note on GitHub. + +.. _tox: https://tox.readthedocs.io diff --git a/docs/index.rst b/docs/index.rst index 8f252ed..be78887 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,7 +6,8 @@ Welcome to ProbeQuest's documentation! ====================================== -ProbeQuest is a toolkit allowing to sniff and display the Wi-Fi probe requests passing nearby your wireless interface. +ProbeQuest is a toolkit allowing to sniff and display the Wi-Fi probe requests +passing nearby your wireless interface. This project has been inspired by `this paper`_. diff --git a/docs/installation.rst b/docs/installation.rst index 2e1e727..1d1db53 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -2,18 +2,25 @@ Installation ============ -Using pip (recommended) +From PyPI (recommended) ----------------------- :: - sudo pip3 install --upgrade probequest + pip3 install --upgrade probequest From sources ------------ +ProbeQuest is packaged with `Setuptools`_. + +The default Git branch is `develop`. To install the latest stable version, you +need to clone the `main` branch. + :: - git clone https://github.com/SkypLabs/probequest.git + git clone -b main https://github.com/SkypLabs/probequest.git cd probequest - sudo pip3 install --upgrade . + pip3 install --upgrade . + +.. _Setuptools: https://setuptools.pypa.io/ diff --git a/docs/mitigation.rst b/docs/mitigation.rst index e2ef680..64440fd 100644 --- a/docs/mitigation.rst +++ b/docs/mitigation.rst @@ -4,17 +4,35 @@ Mitigation As far as I know, there are two mitigation techniques: -- Don’t use probe requests at all. It is by far the most efficient way not to leak any piece of information. As said earlier, it is not necessary to rely on probe requests to get the list of the nearby access points since they broadcast their name by themselves. -- Randomise the source MAC address of each probe request sent. This way, it’s no longer possible for a third party to link probe requests to a specific device based on the Wi-Fi data collected. However, using a Software-Defined Radio to capture RF metadata such as the frequency offset, it would be possible to fingerprint each Wi-Fi packet and so each Wi-Fi device, regardless of their source MAC address (this technique will be implemented in ProbeQuest). +- Don’t use probe requests at all. It is by far the most efficient way not to + leak any piece of information. As said earlier, it is not necessary to rely on + probe requests to get the list of the nearby access points since they + broadcast their name by themselves. +- Randomise the source MAC address of each probe request sent. This way, it’s no + longer possible for a third party to link probe requests to a specific device + based on the Wi-Fi data collected. However, using a Software-Defined Radio to + capture RF metadata such as the frequency offset, it would be possible to + fingerprint each Wi-Fi packet and so each Wi-Fi device, regardless of their + source MAC address (this technique will be implemented in ProbeQuest). -In practice, you can install `Wi-Fi Privacy Police`_ from `F-Droid`_ or the `Play Store`_ to prevent your Android devices from leaking their PNL. +Android +------- + +Some Android-based operating systems, like `GrapheneOS`_, randomise the source +MAC address natively. Otherwise, you can install `Wi-Fi Privacy Police`_ from +`F-Droid`_ or the `Play Store`_ to prevent your Android devices from leaking +their PNL. .. image:: _static/img/wifi_privacy_police_main_screen.png Once installed, the **Privacy protection** option should be switched on. +iOS +--- + On iOS, the source MAC address is randomised since iOS 8. .. _F-Droid: https://f-droid.org/packages/be.uhasselt.privacypolice/ +.. _GrapheneOS: https://grapheneos.org/ .. _Play Store: https://play.google.com/store/apps/details?id=be.uhasselt.privacypolice .. _Wi-Fi Privacy Police: https://github.com/BramBonne/privacypolice diff --git a/docs/modules.rst b/docs/modules.rst index f36d0f7..301316c 100644 --- a/docs/modules.rst +++ b/docs/modules.rst @@ -6,4 +6,4 @@ Modules :maxdepth: 1 :glob: - modules/* + modules/** diff --git a/docs/modules/cli.rst b/docs/modules/cli.rst new file mode 100644 index 0000000..6b5e71a --- /dev/null +++ b/docs/modules/cli.rst @@ -0,0 +1,5 @@ +CLI +--- + +.. automodule:: probequest.cli + :members: diff --git a/docs/modules/exceptions.rst b/docs/modules/exceptions.rst new file mode 100644 index 0000000..f21c369 --- /dev/null +++ b/docs/modules/exceptions.rst @@ -0,0 +1,5 @@ +Exceptions +---------- + +.. automodule:: probequest.exceptions + :members: diff --git a/docs/modules/exporters/csv.rst b/docs/modules/exporters/csv.rst new file mode 100644 index 0000000..dfad80a --- /dev/null +++ b/docs/modules/exporters/csv.rst @@ -0,0 +1,6 @@ +CSV Exporter +------------ + +.. automodule:: probequest.exporters.csv + :members: + diff --git a/docs/modules/fake_packet_sniffer.rst b/docs/modules/fake_packet_sniffer.rst deleted file mode 100644 index bbd8e07..0000000 --- a/docs/modules/fake_packet_sniffer.rst +++ /dev/null @@ -1,5 +0,0 @@ -Fake Packet Sniffer -------------------- - -.. automodule:: probequest.fake_packet_sniffer - :members: diff --git a/docs/modules/packet_sniffer.rst b/docs/modules/packet_sniffer.rst deleted file mode 100644 index 9e885cc..0000000 --- a/docs/modules/packet_sniffer.rst +++ /dev/null @@ -1,5 +0,0 @@ -Packet Sniffer --------------- - -.. automodule:: probequest.packet_sniffer - :members: diff --git a/docs/modules/pnl.rst b/docs/modules/pnl.rst deleted file mode 100644 index a01f9dd..0000000 --- a/docs/modules/pnl.rst +++ /dev/null @@ -1,5 +0,0 @@ -PNL Viewer ----------- - -.. automodule:: probequest.ui.pnl - :members: diff --git a/docs/modules/probe_request_filter.rst b/docs/modules/probe_request_filter.rst new file mode 100644 index 0000000..e507514 --- /dev/null +++ b/docs/modules/probe_request_filter.rst @@ -0,0 +1,5 @@ +Probe Request Filter +-------------------- + +.. automodule:: probequest.probe_request_filter + :members: diff --git a/docs/modules/probe_request_parser.rst b/docs/modules/probe_request_parser.rst new file mode 100644 index 0000000..7c1945e --- /dev/null +++ b/docs/modules/probe_request_parser.rst @@ -0,0 +1,5 @@ +Probe Request Parser +-------------------- + +.. automodule:: probequest.probe_request_parser + :members: diff --git a/docs/modules/raw.rst b/docs/modules/raw.rst deleted file mode 100644 index 6354d42..0000000 --- a/docs/modules/raw.rst +++ /dev/null @@ -1,5 +0,0 @@ -Raw Probe Request Viewer ------------------------- - -.. automodule:: probequest.ui.raw - :members: diff --git a/docs/modules/sniffers/fake_probe_request_sniffer.rst b/docs/modules/sniffers/fake_probe_request_sniffer.rst new file mode 100644 index 0000000..2cf45ce --- /dev/null +++ b/docs/modules/sniffers/fake_probe_request_sniffer.rst @@ -0,0 +1,5 @@ +Fake Probe Request Sniffer +-------------------------- + +.. automodule:: probequest.sniffers.fake_probe_request_sniffer + :members: diff --git a/docs/modules/probe_request_sniffer.rst b/docs/modules/sniffers/probe_request_sniffer.rst similarity index 50% rename from docs/modules/probe_request_sniffer.rst rename to docs/modules/sniffers/probe_request_sniffer.rst index b095171..a3d379f 100644 --- a/docs/modules/probe_request_sniffer.rst +++ b/docs/modules/sniffers/probe_request_sniffer.rst @@ -1,5 +1,5 @@ Probe Request Sniffer --------------------- -.. automodule:: probequest.probe_request_sniffer +.. automodule:: probequest.sniffers.probe_request_sniffer :members: diff --git a/docs/modules/ui/console.rst b/docs/modules/ui/console.rst new file mode 100644 index 0000000..f5730fa --- /dev/null +++ b/docs/modules/ui/console.rst @@ -0,0 +1,5 @@ +Console +------- + +.. automodule:: probequest.ui.console + :members: diff --git a/docs/probe_requests.rst b/docs/probe_requests.rst index 1385d73..5113a2b 100644 --- a/docs/probe_requests.rst +++ b/docs/probe_requests.rst @@ -2,9 +2,15 @@ What are Wi-Fi probe requests? ============================== -Probe requests are sent by a station to elicit information about access points, in particular to determine if an access point is present or not in the nearby environment. Some devices (mostly smartphones and tablets) use these requests to determine if one of the networks they have previously been connected to is in range, leaking their preferred network list (PNL) and, therefore, your personal information. +Probe requests are sent by a station to elicit information about access points, +in particular to determine if an access point is present or not in the nearby +environment. Some devices (mostly smartphones and tablets) use these requests to +determine if one of the networks they have previously been connected to is in +range, leaking their preferred network list (PNL) and, therefore, your personal +information. -Below is a typical Wi-Fi authentication process between a mobile station (for example, your smartphone) and an access point (AP): +Below is a typical Wi-Fi authentication process between a mobile station (for +example, your smartphone) and an access point (AP): .. seqdiag:: @@ -22,8 +28,13 @@ Below is a typical Wi-Fi authentication process between a mobile station (for ex "Mobile Station" <-- "Access Point" [label = "Association Response"]; } -Step 1 is optional (and therefore, step 2) since the access points announce their presence by broadcasting their name (ESSID) using `beacon frames`_. Consequently, it is not necessary to rely on probe requests to get the list of the access points available. It is a design choice that, although it speeds up the discovery process, causes privacy and security issues. +Step 1 is optional (and therefore, step 2) since the access points announce +their presence by broadcasting their name (ESSID) using `beacon frames`_. +Consequently, it is not necessary to rely on probe requests to get the list of +the access points available. It is a design choice that, although it speeds up +the discovery process, causes privacy and security issues. -ProbeQuest can be used to leverage this leak of information to conduct diverse social engineering and network attacks. +ProbeQuest can be used to leverage this leak of information to conduct diverse +social engineering and network attacks. .. _beacon frames: https://en.wikipedia.org/wiki/Beacon_frame diff --git a/docs/security.rst b/docs/security.rst index 1e6e7ee..0e60a07 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -5,13 +5,16 @@ Security Policy Reporting a Vulnerability ------------------------- -If you have found a security issue in ProbeQuest, please disclose it responsibly by emailing me at `skyper(at)skyplabs[dot]net`. My PGP public key can be found on my `Keybase profile`_: +If you have found a security issue in ProbeQuest, please disclose it responsibly +by emailing me at `skyper(at)skyplabs[dot]net`. My PGP public key can be found +on my `Keybase profile`_: .. image:: https://img.shields.io/keybase/pgp/skyplabs.svg :target: https://keybase.io/skyplabs/pgp_keys.asc :alt: PGP key fingerprint -To facilitate the encryption process, you can use `this online tool`_. You can also use it to verify my signatures. +To facilitate the encryption process, you can use `this online tool`_. You can +also use it to verify my signatures. .. _Keybase profile: https://keybase.io/skyplabs .. _this online tool: https://keybase.io/encrypt#skyplabs diff --git a/docs/usage.rst b/docs/usage.rst index 859eace..96a41ab 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -5,7 +5,25 @@ Usage Enabling the monitor mode ------------------------- -To be able to sniff the probe requests, your Wi-Fi network interface must be set to monitor mode. +To be able to sniff the probe requests, your Wi-Fi network interface must be set +to `monitor mode`_. + +With `ip` and `iw` +^^^^^^^^^^^^^^^^^^ + +:: + + sudo ip link set down + sudo iw set monitor control + sudo ip link set up + +For example: + +:: + + sudo ip link set wlan0 down + sudo iw wlan0 set monitor control + sudo ip link set wlan0 up With `ifconfig` and `iwconfig` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -49,7 +67,7 @@ Command line arguments ---------------------- .. argparse:: - :filename: ../bin/probequest + :module: probequest.cli :func: get_arg_parser :prog: probequest @@ -58,8 +76,10 @@ Example of use :: - sudo probequest -i wlan0 + sudo probequest wlan0 Here is a sample output: .. image:: _static/img/probequest_output_example.png + +.. _monitor mode: https://en.wikipedia.org/wiki/Monitor_mode diff --git a/docs/use_case.rst b/docs/use_case.rst index 97656d4..d2b56b7 100644 --- a/docs/use_case.rst +++ b/docs/use_case.rst @@ -2,18 +2,38 @@ Use Case ======== -Let's consider the following simple scenario inspired from a real data collection (the data have been anonymised): a device tries to connect to `John's iPhone`, `CompanyX_staff`, `STARBUCKS-FREE-WIFI` and `VM21ECAB2`. Based on this information, several assumptions can be made: +Let's consider the following simple scenario inspired from a real data +collection (the data have been anonymised): a device tries to connect to `John's +iPhone`, `CompanyX_staff`, `STARBUCKS-FREE-WIFI` and `VM21ECAB2`. Based on this +information, several assumptions can be made: - The device owner's name is John. -- The device is set in English and its owner speaks this language (otherwise it would have been `iPhone de John` in French, `iPhone von John` in German, etc). -- The device should be a laptop trying to connect to an iPhone in hotspot mode. The owner has consequently at least two devices and is nomad. +- The device is set in English and its owner speaks this language (otherwise it + would have been `iPhone de John` in French, `iPhone von John` in German, etc). +- The device should be a laptop trying to connect to an iPhone in hotspot mode. + The owner has consequently at least two devices and is nomad. - The owner works for CompanyX. - The owner frequents coffee shops, in particular StarBucks. - The owner is used to connecting to open Wi-Fi access points. -- `VM21ECAB2` seems to be a home access point and is the only one in the device's PNL. It is likely the owner's place and, consequently, the device's owner is a customer of Virgin Media. +- `VM21ECAB2` seems to be a home access point and is the only one in the + device's PNL. It is likely the owner's place and, consequently, the device's + owner is a customer of Virgin Media. -As you can see, the amount of data inferred from these four probe requests is already impressive, but we can go further. Relying on a database of Wi-Fi access points’ location, such as `WIGLE.net`_, it becomes possible to determine the places the device’s owner has previously been to. VM21ECAB2 should be a unique name, easily localisable on a map. Same for CompanyX_staff. If this last one is not unique (because CompanyX has several offices), crossing the data we have can help us in our investigation. For example, if CompanyX is present in several countries, we can assume that the device’s owner lives in a country where both CompanyX and Virgin Media are present. Once we have determined which office it is, we can suppose that the device’s owner is used to stopping in StarBucks located on their way from home to their office. +As you can see, the amount of data inferred from these four probe requests is +already impressive, but we can go further. Relying on a database of Wi-Fi access +points’ location, such as `WIGLE.net`_, it becomes possible to determine the +places the device’s owner has previously been to. VM21ECAB2 should be a unique +name, easily localisable on a map. Same for CompanyX_staff. If this last one is +not unique (because CompanyX has several offices), crossing the data we have can +help us in our investigation. For example, if CompanyX is present in several +countries, we can assume that the device’s owner lives in a country where both +CompanyX and Virgin Media are present. Once we have determined which office it +is, we can suppose that the device’s owner is used to stopping in StarBucks +located on their way from home to their office. -Profiling a person is the first step to conduct a social engineering attack. The more we know about our target, the better chance the attack has to succeed. Also, because we know which Wi-Fi access points our target’s devices will try to connect to, an evil twin attack is conceivable. +Profiling a person is the first step to conduct a social engineering attack. The +more we know about our target, the better chance the attack has to succeed. +Also, because we know which Wi-Fi access points our target’s devices will try to +connect to, an evil twin attack is conceivable. .. _WIGLE.net: https://wigle.net/ diff --git a/probequest/config.py b/probequest/config.py deleted file mode 100644 index 60d173c..0000000 --- a/probequest/config.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -ProbeQuest configuration. -""" - -from enum import Enum -from re import compile as rcompile, IGNORECASE - - -class Mode(Enum): - """ - Enumeration of the different operational modes - supported by this software. - """ - - RAW = "raw" - PNL = "pnl" - - def __str__(self): - return self.value - - -class Config: - """ - Configuration object. - """ - - interface = None - - essid_filters = None - essid_regex = None - ignore_case = False - - mac_exclusions = None - mac_filters = None - - output_file = None - - mode = Mode.RAW - fake = False - debug = False - - _display_func = lambda *args: None # noqa: E731 - _storage_func = lambda *args: None # noqa: E731 - - @property - def display_func(self): - """ - Callback function triggered when a packet needs to be displayed. - """ - - return self._display_func - - @property - def storage_func(self): - """ - Callback function triggered when a packet needs to be stored. - """ - - return self._storage_func - - @display_func.setter - def display_func(self, func): - if not hasattr(func, "__call__"): - raise TypeError( - "The display function property is not a callable object" - ) - - self._display_func = func - - @storage_func.setter - def storage_func(self, func): - if not hasattr(func, "__call__"): - raise TypeError( - "The storage function property is not a callable object" - ) - - self._storage_func = func - - def generate_frame_filter(self): - """ - Generates and returns the frame filter according to the different - options set of the current 'Config' object. - """ - - frame_filter = "type mgt subtype probe-req" - - if self.mac_exclusions is not None: - frame_filter += " and not (" - - for i, station in enumerate(self.mac_exclusions): - if i == 0: - frame_filter += "ether src host {s_mac}".format( - s_mac=station) - else: - frame_filter += "|| ether src host {s_mac}".format( - s_mac=station) - - frame_filter += ")" - - if self.mac_filters is not None: - frame_filter += " and (" - - for i, station in enumerate(self.mac_filters): - if i == 0: - frame_filter += "ether src host {s_mac}".format( - s_mac=station) - else: - frame_filter += "|| ether src host {s_mac}".format( - s_mac=station) - - frame_filter += ")" - - return frame_filter - - def complile_essid_regex(self): - """ - Returns the compiled version of the ESSID regex. - """ - - if self.essid_regex is not None: - if self.ignore_case: - return rcompile( - self.essid_regex, - IGNORECASE - ) - - return rcompile(self.essid_regex) - - return None diff --git a/probequest/fake_packet_sniffer.py b/probequest/fake_packet_sniffer.py deleted file mode 100644 index cdc6e71..0000000 --- a/probequest/fake_packet_sniffer.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Fake packet sniffer module. -""" - -from threading import Thread, Event - -from scapy.layers.dot11 import RadioTap, Dot11, Dot11ProbeReq, Dot11Elt - -from faker import Faker -from faker_wifi_essid import WifiESSID - - -class FakePacketSniffer(Thread): - """ - A fake packet sniffing thread. - - This thread returns fake Wi-Fi ESSIDs for development and test purposes. - """ - - def __init__(self, config, new_packets): - super().__init__() - - self.config = config - self.new_packets = new_packets - - self.stop_sniffer = Event() - - self.fake = Faker() - self.fake.add_provider(WifiESSID) - - def run(self): - from time import sleep - - while not self.stop_sniffer.isSet(): - sleep(1) - self.new_packet() - - def join(self, timeout=None): - """ - Stops the fake packet sniffer. - """ - - self.stop_sniffer.set() - super().join(timeout) - - def stop(self): - """ - Stops the fake packet sniffer. - - Alias for 'join()'. - """ - - self.join() - - def new_packet(self): - """ - Adds a new fake packet to the queue to be processed. - """ - - # pylint: disable=no-member - - fake_probe_req = RadioTap() \ - / Dot11( - addr1="ff:ff:ff:ff:ff:ff", - addr2=self.fake.mac_address(), - addr3=self.fake.mac_address() - ) \ - / Dot11ProbeReq() \ - / Dot11Elt( - info=self.fake.wifi_essid() - ) - - self.new_packets.put(fake_probe_req) diff --git a/probequest/packet_sniffer.py b/probequest/packet_sniffer.py deleted file mode 100644 index 3938460..0000000 --- a/probequest/packet_sniffer.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -Packet sniffer module. -""" - -from scapy.sendrecv import AsyncSniffer - - -class PacketSniffer: - """ - Wrapper around the 'AsyncSniffer' class from the Scapy project. - """ - - def __init__(self, config, new_packets): - self.config = config - self.new_packets = new_packets - - self.sniffer = AsyncSniffer( - iface=self.config.interface, - filter=self.config.generate_frame_filter(), - store=False, - prn=self.new_packet - ) - - def start(self): - """ - Starts the packet sniffer. - """ - - self.sniffer.start() - - def stop(self): - """ - Stops the packet sniffer. - """ - - self.sniffer.stop() - - def is_running(self): - """ - Returns true if the sniffer is running, false otherwise. - """ - - return self.sniffer.running - - def new_packet(self, packet): - """ - Adds the packet given as parameter to the queue to be processed by the - parser. - """ - - self.new_packets.put(packet) diff --git a/probequest/probe_request.py b/probequest/probe_request.py deleted file mode 100644 index 6fd480d..0000000 --- a/probequest/probe_request.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -A Wi-Fi probe request. -""" - -from time import localtime, strftime -from netaddr import EUI, NotRegisteredError - - -class ProbeRequest: - """ - Probe request class. - """ - - def __init__(self, timestamp, s_mac, essid): - self.timestamp = timestamp - self.s_mac = str(s_mac) - self.essid = str(essid) - - self.s_mac_oui = self.get_mac_organisation() - - def __str__(self): - return "{timestamp} - {s_mac} ({mac_org}) -> {essid}".format( - timestamp=strftime( - "%a, %d %b %Y %H:%M:%S %Z", - localtime(self.timestamp) - ), - s_mac=self.s_mac, - mac_org=self.s_mac_oui, - essid=self.essid - ) - - def get_mac_organisation(self): - """ - Returns the OUI of the MAC address as a string. - """ - - # pylint: disable=no-member - - try: - return EUI(self.s_mac).oui.registration().org - except NotRegisteredError: - return None diff --git a/probequest/probe_request_parser.py b/probequest/probe_request_parser.py deleted file mode 100644 index 1e9a4d6..0000000 --- a/probequest/probe_request_parser.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Probe request parser module. -""" - -from queue import Empty -from threading import Thread, Event -from re import match - -from scapy.layers.dot11 import RadioTap, Dot11ProbeReq - -from probequest.probe_request import ProbeRequest - - -class ProbeRequestParser(Thread): - """ - A Wi-Fi probe request parsing thread. - """ - - def __init__(self, config, new_packets): - super().__init__() - - self.config = config - self.new_packets = new_packets - - self.cregex = self.config.complile_essid_regex() - - self.stop_parser = Event() - - if self.config.debug: - print("[!] ESSID filters: " + str(self.config.essid_filters)) - print("[!] ESSID regex: " + str(self.config.essid_regex)) - print("[!] Ignore case: " + str(self.config.ignore_case)) - - def run(self): - # The parser continues to do its job even after the call of the - # join method if the queue is not empty. - while not self.stop_parser.isSet() or not self.new_packets.empty(): - try: - packet = self.new_packets.get(timeout=1) - probe_request = self.parse(packet) - - if probe_request is None: - continue - - if not probe_request.essid: - continue - - if (self.config.essid_filters is not None - and probe_request.essid - not in self.config.essid_filters): - continue - - if (self.cregex is not None - and not - match(self.cregex, probe_request.essid)): - continue - - self.config.display_func(probe_request) - self.config.storage_func(probe_request) - - self.new_packets.task_done() - except Empty: - pass - - def join(self, timeout=None): - """ - Stops the probe request parsing thread. - """ - - self.stop_parser.set() - super().join(timeout) - - @staticmethod - def parse(packet): - """ - Parses the raw packet and returns a probe request object. - """ - - try: - if packet.haslayer(Dot11ProbeReq): - timestamp = packet.getlayer(RadioTap).time - s_mac = packet.getlayer(RadioTap).addr2 - essid = packet.getlayer(Dot11ProbeReq).info.decode("utf-8") - - return ProbeRequest(timestamp, s_mac, essid) - - return None - except UnicodeDecodeError: - # The ESSID is not a valid UTF-8 string. - return None diff --git a/probequest/probe_request_sniffer.py b/probequest/probe_request_sniffer.py deleted file mode 100644 index c6ba9ec..0000000 --- a/probequest/probe_request_sniffer.py +++ /dev/null @@ -1,104 +0,0 @@ -""" -Wi-Fi probe request sniffer. -""" - -from queue import Queue - -from scapy.arch import get_if_hwaddr -from scapy.error import Scapy_Exception - -from probequest.packet_sniffer import PacketSniffer -from probequest.fake_packet_sniffer import FakePacketSniffer -from probequest.probe_request_parser import ProbeRequestParser - - -class ProbeRequestSniffer: - """ - Wi-Fi probe request sniffer. - - It is composed of a packet sniffer and a packet parser, both running - in a thread and intercommunicating using a queue. - """ - - def __init__(self, config): - self.config = config - - self.new_packets = Queue() - self.new_sniffer() - self.new_parser() - - def start(self): - """ - Starts the probe request sniffer. - - This method will start the sniffing and parsing threads. - """ - - try: - # Test if the interface exists. - get_if_hwaddr(self.config.interface) - except Scapy_Exception: - pass - - self.sniffer.start() - - try: - self.parser.start() - except RuntimeError: - self.new_parser() - self.parser.start() - - def stop(self): - """ - Stops the probe request sniffer. - - This method will stop the sniffing and parsing threads. - """ - - try: - self.sniffer.stop() - except Scapy_Exception: - # The sniffer was not running. - pass - - try: - self.parser.join() - except RuntimeError: - # stop() has been called before start(). - pass - - def new_sniffer(self): - """ - Creates a new sniffing thread. - - If the '--fake' option is set, a fake packet sniffer will be used. - """ - - if self.config.fake: - self.sniffer = FakePacketSniffer( - self.config, - self.new_packets - ) - else: - self.sniffer = PacketSniffer( - self.config, - self.new_packets - ) - - def new_parser(self): - """ - Creates a new parsing thread. - """ - - self.parser = ProbeRequestParser( - self.config, - self.new_packets - ) - - def is_running(self): - """ - Returns true if the probe request sniffer is running and false - otherwise. - """ - - return self.sniffer.is_running() diff --git a/probequest/ui/pnl.py b/probequest/ui/pnl.py deleted file mode 100644 index 04d61a4..0000000 --- a/probequest/ui/pnl.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Preferred network list viewer. -""" - -import urwid - -from probequest.probe_request_sniffer import ProbeRequestSniffer - - -class PNLViewer: - """ - TUI used to display the PNL of the nearby devices sending probe requests. - """ - - # pylint: disable=too-many-instance-attributes - - palette = [ - ("header_running", "white", "dark green", "bold"), - ("header_stopped", "white", "dark red", "bold"), - ("footer", "dark cyan", "dark blue", "bold"), - ("key", "light cyan", "dark blue", "underline"), - ("selected", "black", "light green"), - ] - - footer_text = ("footer", [ - " ", - ("key", "P"), " play/pause ", - ("key", "Q"), " quit", - ]) - - def __init__(self, config): - self.config = config - - self.stations = dict() - self.loop = None - - self.config.display_func = self.new_probe_req - self.sniffer = ProbeRequestSniffer(config) - - self.view = self.setup_view() - - def setup_view(self): - """ - Returns the root widget. - """ - - self.interface_text = urwid.Text(self.config.interface) - self.sniffer_state_text = urwid.Text("Stopped") - - self.header = urwid.AttrWrap(urwid.Columns([ - urwid.Text("Sniffer's state: "), - self.sniffer_state_text, - urwid.Text(" Interface: "), - self.interface_text, - ]), "header_stopped") - footer = urwid.AttrWrap(urwid.Text(self.footer_text), "footer") - vline = urwid.AttrWrap(urwid.SolidFill(u"\u2502"), "line") - - station_panel = urwid.Padding( - self.setup_menu("List of Stations", self.stations.keys()), - align="center", width=("relative", 90) - ) - pnl_panel = urwid.Padding( - urwid.ListBox(urwid.SimpleListWalker([])), - align="center", width=("relative", 90) - ) - - self.station_list = station_panel.base_widget - self.pnl_list = pnl_panel.base_widget - - body = urwid.Columns([ - station_panel, - ("fixed", 1, vline), - pnl_panel, - ], focus_column=0) - - top = urwid.Frame( - header=self.header, - body=body, - footer=footer, - focus_part="body" - ) - - return top - - def setup_menu(self, title, choices): - """ - Creates and returns a dynamic ListBox object containing a title and the - choices given as parameters. - """ - - body = [urwid.Text(title), urwid.Divider()] - - for choice in choices: - button = urwid.Button(choice) - urwid.connect_signal(button, "click", self.station_chosen, choice) - body.append(urwid.AttrMap(button, None, focus_map="selected")) - - return urwid.ListBox(urwid.SimpleFocusListWalker(body)) - - def new_probe_req(self, probe_req): - """ - Callback method called on each new probe request. - """ - - if probe_req.s_mac not in self.stations: - self.stations[probe_req.s_mac] = [] - self.add_station(probe_req.s_mac) - - if not any(essid.text == probe_req.essid - for essid in self.stations[probe_req.s_mac]): - self.stations[probe_req.s_mac].append(urwid.Text(probe_req.essid)) - - if len(self.stations.keys()) == 1: - self.station_list.set_focus(2) - self.station_chosen(None, probe_req.s_mac) - - self.loop.draw_screen() - - def add_station(self, name): - """ - Adds a new station to the stations list. - """ - - button = urwid.Button(name) - urwid.connect_signal(button, "click", self.station_chosen, name) - self.station_list.body.append( - urwid.AttrMap(button, None, focus_map="selected") - ) - - def station_chosen(self, button, choice): - """ - Callback method called when a station is selected in the station list. - """ - - # pylint: disable=unused-argument - - # 'button' is the widget object passed by Urwid. - - self.pnl_list.body = self.stations[choice] - - def start_sniffer(self): - """ - Starts the sniffer. - """ - - self.sniffer.start() - self.sniffer_state_text.set_text("Running") - self.header.set_attr("header_running") - - def stop_sniffer(self): - """ - Stops the sniffer. - """ - - self.sniffer.stop() - self.sniffer_state_text.set_text("Stopped") - self.header.set_attr("header_stopped") - - def toggle_sniffer_state(self): - """ - Toggles the sniffer's state. - """ - - if self.sniffer.is_running(): - self.stop_sniffer() - else: - self.start_sniffer() - - def main(self): - """ - Starts the TUI. - """ - - self.loop = urwid.MainLoop( - self.view, - self.palette, - unhandled_input=self.unhandled_keypress - ) - self.loop.run() - - def exit_program(self): - """ - Stops and exits the TUI. - """ - - self.sniffer.stop() - raise urwid.ExitMainLoop() - - def unhandled_keypress(self, key): - """ - Contains handlers for each keypress that is not handled by the widgets - being displayed. - """ - - if key in ("q", "Q"): - self.exit_program() - elif key in ("p", "P"): - self.toggle_sniffer_state() - else: - return False - - return True diff --git a/probequest/ui/raw.py b/probequest/ui/raw.py deleted file mode 100644 index d8ddf02..0000000 --- a/probequest/ui/raw.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Raw probe request viewer. -""" - -from probequest.probe_request_sniffer import ProbeRequestSniffer - - -class RawProbeRequestViewer: - """ - Displays the raw probe requests passing nearby the Wi-Fi interface. - """ - - def __init__(self, config): - self.output = config.output_file - - if self.output is not None: - from csv import writer - - outfile = writer(self.output, delimiter=";") - - def write_csv(probe_req): - outfile.writerow([ - probe_req.timestamp, - probe_req.s_mac, - probe_req.s_mac_oui, - probe_req.essid - ]) - else: - write_csv = lambda *args: None # noqa: E731 - - def display_probe_req(probe_req): - print(probe_req) - - config.display_func = display_probe_req - config.storage_func = write_csv - - self.sniffer = ProbeRequestSniffer(config) - - def start(self): - """ - Starts the probe request sniffer. - """ - - self.sniffer.start() - - def stop(self): - """ - Stops the probe request sniffer. - """ - - self.sniffer.stop() - - if self.output is not None: - self.output.close() diff --git a/probequest/version.py b/probequest/version.py deleted file mode 100644 index a815ff8..0000000 --- a/probequest/version.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -Actual version number of ProbeQuest. -""" - -VERSION = "0.7.2" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4602018 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[build-system] +requires = [ + "setuptools >= 42", + "setuptools_scm >= 2.0.0, <3", + "wheel", +] +build-backend = "setuptools.build_meta" diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..e527402 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,61 @@ +[metadata] +name = probequest +version = 0.8.0 +description = Toolkit for Playing with Wi-Fi Probe Requests. +long_description = file: README.rst +keywords = wifi, wireless, security, sniffer +license = GPLv3 +license_files = LICENSE +author = Paul-Emmanuel Raoul +author_email = skyper@skyplabs.net +url = https://github.com/SkypLabs/probequest +project_urls = + Bug Tracker = https://github.com/SkypLabs/probequest/issues + Documentation = https://probequest.readthedocs.io + Source Code = https://github.com/SkypLabs/probequest +classifiers = + Development Status :: 4 - Beta + Environment :: Console + Intended Audience :: Information Technology + Natural Language :: English + Topic :: Security + Topic :: System :: Networking + Topic :: System :: Networking :: Monitoring + Programming Language :: Python :: 3 :: Only + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + License :: OSI Approved :: GNU General Public License v3 (GPLv3) + +[options] +packages = find: +package_dir = + =src +python_requires = >=3.7, <4 +install_requires = + netaddr >= 0.7.19 + scapy >= 2.4.3 + +[options.packages.find] +where = src + +[options.entry_points] +console_scripts = + probequest = probequest.cli:main + +[options.extras_require] +complete = + faker_wifi_essid +tests = + flake8 + pylint + tox +docs = + Sphinx >= 3.2 + sphinxcontrib-seqdiag >= 2.0.0 + sphinx-argparse >= 0.2.2 + sphinx_rtd_theme >= 0.5.0 + +[pylint.message_control] +disable = duplicate-code diff --git a/setup.py b/setup.py deleted file mode 100755 index 26d604c..0000000 --- a/setup.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Setuptools build system configuration file -for ProbeQuest. - -See https://setuptools.readthedocs.io. -""" - -from codecs import open as fopen -from os.path import dirname, abspath, join -from setuptools import setup, find_packages - -from probequest.version import VERSION - -DIR = dirname(abspath(__file__)) - -with fopen(join(DIR, 'README.rst'), encoding='utf-8') as f: - LONG_DESCRIPTION = f.read() - - -setup( - name='probequest', - version=VERSION, - description='Toolkit for Playing with Wi-Fi Probe Requests', - long_description=LONG_DESCRIPTION, - license='GPLv3', - keywords='wifi wireless security sniffer', - author='Paul-Emmanuel Raoul', - author_email='skyper@skyplabs.net', - url='https://github.com/SkypLabs/probequest', - download_url='https://github.com/SkypLabs/probequest/archive/v{0}.zip' - .format(VERSION), - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'Topic :: Software Development :: Libraries :: Python Modules', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', - ], - packages=find_packages(), - scripts=['bin/probequest'], - test_suite='test', - install_requires=[ - 'argparse >= 1.4.0', - 'faker_wifi_essid', - 'netaddr >= 0.7.19', - 'scapy >= 2.4.3', - 'urwid>= 2.0.1', - ], - tests_require=[ - 'pylint' - ], - extras_require={ - 'docs': [ - 'sphinx >= 1.4.0', - 'sphinxcontrib-seqdiag >= 0.8.5', - 'sphinx-argparse >= 0.2.2', - 'sphinx_rtd_theme', - ], - }, -) diff --git a/src/probequest/__init__.py b/src/probequest/__init__.py new file mode 100644 index 0000000..da264d1 --- /dev/null +++ b/src/probequest/__init__.py @@ -0,0 +1,21 @@ +""" +ProbeQuest package. +""" + +import logging +from pkg_resources import get_distribution + +__version__ = get_distribution("probequest").version + + +def set_up_package_logger(): + """ + Sets up the package logger. + """ + + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + logger.addHandler(logging.NullHandler()) + + +set_up_package_logger() diff --git a/src/probequest/__main__.py b/src/probequest/__main__.py new file mode 100644 index 0000000..b40b87f --- /dev/null +++ b/src/probequest/__main__.py @@ -0,0 +1,7 @@ +""" +Executes the command-line tool when run as a script or with 'python -m'. +""" + +from .cli import main + +main() diff --git a/src/probequest/cli.py b/src/probequest/cli.py new file mode 100644 index 0000000..3d22a31 --- /dev/null +++ b/src/probequest/cli.py @@ -0,0 +1,247 @@ +""" +CLI module. +""" + +import logging +from argparse import ArgumentParser, FileType +from logging.handlers import MemoryHandler +from os import geteuid +from sys import exit as sys_exit +from time import sleep + +from scapy.pipetool import PipeEngine + +from . import __version__ as VERSION +from .config import Config +from .exceptions import InterfaceDoesNotExistException +from .exceptions import DependencyNotPresentException +from .exporters.csv import ProbeRequestCSVExporter +from .probe_request_filter import ProbeRequestFilter +from .probe_request_parser import ProbeRequestParser +from .sniffers.probe_request_sniffer import ProbeRequestSniffer +from .ui.console import ProbeRequestConsole + +# Used to specify the capacity of the memory handler which will store the logs +# in memory until the argument parser is called to know whether they need to be +# flushed to the console (see "--debug" option) or not. +MEMORY_LOGGER_CAPACITY = 50 + + +def get_arg_parser(): + """ + Returns the argument parser. + """ + + arg_parser = ArgumentParser( + description="Toolkit for Playing with Wi-Fi Probe Requests", + ) + arg_parser.add_argument( + "interface", + help="wireless interface to use (must be in monitor mode)", + ) + arg_parser.add_argument( + "--debug", action="store_true", + dest="debug", + help="debug mode", + ) + arg_parser.add_argument( + "--fake", action="store_true", + dest="fake", + help="display only fake ESSIDs", + ) + arg_parser.add_argument( + "--ignore-case", action="store_true", + dest="ignore_case", + help="ignore case distinctions in the regex pattern (default: false)", + ) + arg_parser.add_argument( + "-o", "--output", + type=FileType("a"), + dest="output_file", + help="output file to save the captured data (CSV format)", + ) + arg_parser.add_argument("--version", action="version", version=VERSION) + arg_parser.set_defaults(debug=False) + arg_parser.set_defaults(fake=False) + arg_parser.set_defaults(ignore_case=False) + + essid_arguments = arg_parser.add_mutually_exclusive_group() + essid_arguments.add_argument( + "-e", "--essid", + nargs="+", + metavar="ESSID", + dest="essid_filters", + help="ESSID of the APs to filter (space-separated list)", + ) + essid_arguments.add_argument( + "-r", "--regex", + metavar="REGEX", + dest="essid_regex", + help="regex to filter the ESSIDs", + ) + + station_arguments = arg_parser.add_mutually_exclusive_group() + station_arguments.add_argument( + "--exclude", + nargs="+", + metavar="STATION", + dest="mac_exclusions", + help="MAC addresses of the stations to exclude (space-separated list)", + ) + station_arguments.add_argument( + "-s", "--station", + nargs="+", + metavar="STATION", + dest="mac_filters", + help="MAC addresses of the stations to filter (space-separated list)", + ) + + return arg_parser + + +def set_up_root_logger(level=logging.DEBUG): + """ + Sets up the root logger. + + Returns a tuple containing the root logger, the memory handler and the + console handler. + """ + + root_logger = logging.getLogger("") + root_logger.setLevel(level) + + console = logging.StreamHandler() + + console_formatter = \ + logging.Formatter("%(name)-12s: %(levelname)-8s %(message)s") + console.setFormatter(console_formatter) + + memory_handler = MemoryHandler(MEMORY_LOGGER_CAPACITY) + root_logger.addHandler(memory_handler) + + return (root_logger, memory_handler, console) + + +def build_cluster(config): + """ + Build the ProbeQuest cluster. + """ + + # pylint: disable=import-outside-toplevel + # pylint: disable=pointless-statement + + try: + if config.fake: + from .sniffers.fake_probe_request_sniffer \ + import FakeProbeRequestSniffer + sniffer = FakeProbeRequestSniffer(1) + else: + sniffer = ProbeRequestSniffer(config) + except ModuleNotFoundError as err: + raise DependencyNotPresentException(err) from err + + parser = ProbeRequestParser(config) + filters = ProbeRequestFilter(config) + console = ProbeRequestConsole() + + engine = PipeEngine(sniffer) + + sniffer > parser > filters > console + + if config.output_file: + csv_exporter = ProbeRequestCSVExporter(config) + filters > csv_exporter + + return engine + + +def main(): + """ + Entry point of the command-line tool. + """ + + # pylint: disable=too-many-statements + + root_logger, memory_handler, console = set_up_root_logger() + + logger = logging.getLogger(__name__) + + logger.info("Program started") + + # -------------------------------------------------- # + # CLI configuration + # -------------------------------------------------- # + logger.debug("Creating configuration object") + config = Config() + + # -------------------------------------------------- # + # Parsing arguments + # -------------------------------------------------- # + logger.debug("Parsing arguments") + + try: + get_arg_parser().parse_args(namespace=config) + except InterfaceDoesNotExistException as err: + logger.critical(err, exc_info=True) + sys_exit(f"[!] {err}") + + # -------------------------------------------------- # + # Debug mode + # -------------------------------------------------- # + # If the "--debug" option is present, flush the log buffer to the console, + # remove the memory handler from the root logger and add the console + # handler directly to the root logger. + if config.debug: + logger.debug("Setting the console as target of the memory handler") + memory_handler.setTarget(console) + + logger.debug("Removing the memory handler from the root logger") + # The buffer is flushed to the console at close time. + memory_handler.close() + root_logger.removeHandler(memory_handler) + + root_logger.addHandler(console) + logger.debug("Console handler added to the root logger") + # If the "--debug" option is absent (default), close the memory handler + # without flushing anything to the console. + else: + memory_handler.flushOnClose = False + memory_handler.close() + logger.debug("Memory handler closed") + + # -------------------------------------------------- # + # Checking privileges + # -------------------------------------------------- # + if not geteuid() == 0: + logger.critical("User needs to be root to sniff the traffic") + sys_exit("[!] You must be root") + + # -------------------------------------------------- # + # Sniffing loop + # -------------------------------------------------- # + try: + logger.info("Creating Pipe engine") + engine = build_cluster(config) + + logger.info("Starting Pipe engine") + print("[*] Start sniffing probe requests...") + engine.start() + while True: + sleep(100) + except DependencyNotPresentException as err: + err_msg = f"An optional dependency is missing: {err}" + logger.critical(err_msg, exc_info=True) + sys_exit("[x] " + err_msg) + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + print("[*] Bye!") + finally: + if "engine" in locals(): + logger.debug("Stopping the Pipe engine") + engine.stop() + + if config.output_file is not None: + logger.debug("Closing output file") + config.output_file.close() + + logger.info("Program ended") diff --git a/src/probequest/config.py b/src/probequest/config.py new file mode 100644 index 0000000..3a06bbb --- /dev/null +++ b/src/probequest/config.py @@ -0,0 +1,121 @@ +""" +ProbeQuest configuration. +""" + +import logging +from re import compile as rcompile, IGNORECASE + +from scapy.arch import get_if_list + +from .exceptions import InterfaceDoesNotExistException + + +class Config: + """ + Configuration object. + """ + + _interface = None + + essid_filters = None + essid_regex = None + ignore_case = False + + mac_exclusions = None + mac_filters = None + + output_file = None + + fake = False + debug = False + + _compiled_essid_regex = None + _frame_filter = None + + def __init__(self): + self.logger = logging.getLogger(__name__) + + @property + def interface(self): + """ + Interface from which the probe requests will be captured. + """ + + return self._interface + + @interface.setter + def interface(self, interface): + # If interface does not exist. + if interface not in get_if_list(): + raise InterfaceDoesNotExistException( + f"Interface {interface} does not exist" + ) + + self._interface = interface + + @property + def frame_filter(self): + """ + Generates and returns the frame filter according to the different + options set of the current 'Config' object. + + The value is cached once computed. + """ + + if self._frame_filter is None: + self._frame_filter = "type mgt subtype probe-req" + + if self.mac_exclusions is not None: + self._frame_filter += " and not (" + + for i, station in enumerate(self.mac_exclusions): + if i == 0: + self._frame_filter += \ + f"ether src host {station}" + else: + self._frame_filter += \ + f"|| ether src host {station}" + + self._frame_filter += ")" + + if self.mac_filters is not None: + self._frame_filter += " and (" + + for i, station in enumerate(self.mac_filters): + if i == 0: + self._frame_filter += \ + f"ether src host {station}" + else: + self._frame_filter += \ + f"|| ether src host {station}" + + self._frame_filter += ")" + + self.logger.debug("Frame filter: \"%s\"", self._frame_filter) + + return self._frame_filter + + @property + def compiled_essid_regex(self): + """ + Returns the compiled version of the ESSID regex. + + The value is cached once computed. + """ + + # If there is a regex in the configuration and it hasn't been compiled + # yet. + if self._compiled_essid_regex is None and self.essid_regex is not None: + self.logger.debug("Compiling ESSID regex") + + if self.ignore_case: + self.logger.debug("Ignoring case in ESSID regex") + + self._compiled_essid_regex = rcompile( + self.essid_regex, + IGNORECASE + ) + else: + self._compiled_essid_regex = rcompile(self.essid_regex) + + return self._compiled_essid_regex diff --git a/src/probequest/exceptions.py b/src/probequest/exceptions.py new file mode 100644 index 0000000..3183881 --- /dev/null +++ b/src/probequest/exceptions.py @@ -0,0 +1,21 @@ +""" +ProbeQuest exceptions module. +""" + + +class ProbeQuestException(Exception): + """ + Base class for all exceptions thrown by the probequest module. + """ + + +class InterfaceDoesNotExistException(ProbeQuestException): + """ + Thrown when the network interface does not exist. + """ + + +class DependencyNotPresentException(ProbeQuestException): + """ + Thrown when an optional dependency is not present on the system. + """ diff --git a/probequest/__init__.py b/src/probequest/exporters/__init__.py similarity index 100% rename from probequest/__init__.py rename to src/probequest/exporters/__init__.py diff --git a/src/probequest/exporters/csv.py b/src/probequest/exporters/csv.py new file mode 100644 index 0000000..cfbcc8c --- /dev/null +++ b/src/probequest/exporters/csv.py @@ -0,0 +1,36 @@ +""" +Probe request CSV exporter module. +""" + +import logging +from csv import writer + +from scapy.pipetool import Sink + + +class ProbeRequestCSVExporter(Sink): + """ + A probe request CSV exporter. + """ + + def __init__(self, config, name=None): + self.logger = logging.getLogger(__name__) + + Sink.__init__(self, name=name) + + self.csv_file = config.output_file + self.csv_writer = None + + if self.csv_file is not None: + self.csv_writer = writer(self.csv_file, delimiter=";") + + self.logger.info("CSV exporter initialised") + + def push(self, msg): + if self.csv_writer is not None: + self.csv_writer.writerow([ + msg.timestamp, + msg.s_mac, + msg.s_mac_oui, + msg.essid + ]) diff --git a/src/probequest/probe_request.py b/src/probequest/probe_request.py new file mode 100644 index 0000000..0557c2b --- /dev/null +++ b/src/probequest/probe_request.py @@ -0,0 +1,48 @@ +""" +A Wi-Fi probe request. +""" + +from time import localtime, strftime +from netaddr import EUI, NotRegisteredError + + +class ProbeRequest: + """ + Probe request class. + """ + + def __init__(self, timestamp, s_mac, essid): + self.timestamp = timestamp + self.s_mac = str(s_mac) + self.essid = str(essid) + + self._s_mac_oui = None + + def __str__(self): + timestamp = strftime( + "%a, %d %b %Y %H:%M:%S %Z", + localtime(self.timestamp) + ) + s_mac = self.s_mac + s_mac_oui = self.s_mac_oui + essid = self.essid + + return f"{timestamp} - {s_mac} ({s_mac_oui}) -> {essid}" + + @property + def s_mac_oui(self): + """ + OUI of the station's MAC address as a string. + + The value is cached once computed. + """ + + # pylint: disable=no-member + + if self._s_mac_oui is None: + try: + self._s_mac_oui = EUI(self.s_mac).oui.registration().org + except NotRegisteredError: + self._s_mac_oui = "Unknown OUI" + + return self._s_mac_oui diff --git a/src/probequest/probe_request_filter.py b/src/probequest/probe_request_filter.py new file mode 100644 index 0000000..c9a8450 --- /dev/null +++ b/src/probequest/probe_request_filter.py @@ -0,0 +1,55 @@ +""" +Probe request filter module. +""" + +import logging +from re import match + +from scapy.pipetool import Drain + + +class ProbeRequestFilter(Drain): + """ + A Wi-Fi probe request filtering drain. + """ + + def __init__(self, config, name=None): + self.logger = logging.getLogger(__name__) + + Drain.__init__(self, name=name) + + self._config = config + self._cregex = self._config.compiled_essid_regex + + self.logger.info("Probe request filter initialised") + + def push(self, msg): + if self.can_pass(msg): + self._send(msg) + + def high_push(self, msg): + if self.can_pass(msg): + self._send(msg) + + def can_pass(self, probe_req): + """ + Whether or not the probe request given as parameter can pass the drain + according to a set of filters. + """ + + # If the probe request doesn't have an ESSID. + if not probe_req.essid: + return False + + # If the probe request's ESSID is not one of those in the filtering + # list. + if (self._config.essid_filters is not None and + probe_req.essid not in self._config.essid_filters): + return False + + # If the probe request's ESSID doesn't match the regex. + if (self._cregex is not None and + not match(self._cregex, probe_req.essid)): + return False + + return True diff --git a/src/probequest/probe_request_parser.py b/src/probequest/probe_request_parser.py new file mode 100644 index 0000000..d9c889c --- /dev/null +++ b/src/probequest/probe_request_parser.py @@ -0,0 +1,57 @@ +""" +Probe request parser module. +""" + +import logging + +from scapy.pipetool import Drain +from scapy.layers.dot11 import RadioTap, Dot11ProbeReq + +from probequest.probe_request import ProbeRequest + + +class ProbeRequestParser(Drain): + """ + A Wi-Fi probe request parsing drain. + """ + + def __init__(self, config, name=None): + self.logger = logging.getLogger(__name__) + + Drain.__init__(self, name=name) + + self.config = config + + self.logger.info("Probe request parser initialised") + + def push(self, msg): + try: + self._send(self.parse(msg)) + except TypeError: + return + + def high_push(self, msg): + try: + self._high_send(self.parse(msg)) + except TypeError: + return + + @staticmethod + def parse(packet): + """ + Parses the raw packet and returns a probe request object. + """ + + try: + if packet.haslayer(Dot11ProbeReq): + timestamp = packet.getlayer(RadioTap).time + s_mac = packet.getlayer(RadioTap).addr2 + essid = packet.getlayer(Dot11ProbeReq).info.decode("utf-8") + + return ProbeRequest(timestamp, s_mac, essid) + + # The packet is not a probe request. + raise TypeError + except UnicodeDecodeError as unicode_decode_err: + # The ESSID is not a valid UTF-8 string. + raise TypeError from unicode_decode_err diff --git a/probequest/ui/__init__.py b/src/probequest/sniffers/__init__.py similarity index 100% rename from probequest/ui/__init__.py rename to src/probequest/sniffers/__init__.py diff --git a/src/probequest/sniffers/fake_probe_request_sniffer.py b/src/probequest/sniffers/fake_probe_request_sniffer.py new file mode 100644 index 0000000..872a3f9 --- /dev/null +++ b/src/probequest/sniffers/fake_probe_request_sniffer.py @@ -0,0 +1,102 @@ +""" +Fake probe request sniffer module. +""" + +import logging +from time import sleep + +from scapy.layers.dot11 import RadioTap, Dot11, Dot11ProbeReq, Dot11Elt +from scapy.pipetool import ThreadGenSource + +from faker import Faker # pylint: disable=import-error +from faker_wifi_essid import WifiESSID # pylint: disable=import-error + + +class FakeProbeRequestSniffer(ThreadGenSource): + """ + A fake probe request sniffer. + + This pipe source sends periodically fake Wi-Fi ESSIDs for development and + test purposes. + + This class inherits from 'ThreadGenSource' and not from 'PeriodicSource' as + this last one only accepts lists, sets and tuples. + """ + + # pylint: disable=too-many-ancestors + + def __init__(self, period, period2=0, name=None): + self.logger = logging.getLogger(__name__) + + ThreadGenSource.__init__(self, name=name) + + self.fake_probe_requests = FakeProbeRequest() + self.period = period + self.period2 = period2 + + self.logger.info("Fake probe request sniffer initialised") + + def generate(self): + # Fix a false positive about not finding '_wake_up'. + # pylint: disable=no-member + + while self.RUN: + # Infinite loop until 'stop()' is called. + for fake_probe_req in self.fake_probe_requests: + self._gen_data(fake_probe_req) + sleep(self.period) + + self.is_exhausted = True + self._wake_up() + + sleep(self.period2) + + def stop(self): + ThreadGenSource.stop(self) + self.fake_probe_requests.stop() + + +class FakeProbeRequest: + """ + A fake probe request iterator. + """ + + def __init__(self): + self._fake = Faker() + self._fake.add_provider(WifiESSID) + + self._should_stop = False + + def __iter__(self): + return self + + def __next__(self): + """ + Generator of fake Wi-Fi probe requests. + """ + + # pylint: disable=no-member + + if self._should_stop: + raise StopIteration + + return RadioTap() \ + / Dot11( + addr1="ff:ff:ff:ff:ff:ff", + addr2=self._fake.mac_address(), + addr3=self._fake.mac_address() + ) \ + / Dot11ProbeReq() \ + / Dot11Elt( + info=self._fake.wifi_essid() + ) + + def stop(self): + """ + Interrupts the iteration. + + The next time the iterator will be called, a 'StopIteration' exception + will be raised. + """ + + self._should_stop = True diff --git a/src/probequest/sniffers/probe_request_sniffer.py b/src/probequest/sniffers/probe_request_sniffer.py new file mode 100644 index 0000000..5de1369 --- /dev/null +++ b/src/probequest/sniffers/probe_request_sniffer.py @@ -0,0 +1,30 @@ +""" +Probe request sniffer module. +""" + +import logging + +from scapy.scapypipes import SniffSource + + +class ProbeRequestSniffer(SniffSource): + """ + Probe request sniffer. + + Wrapper around the 'SniffSource' Scapy pipe module. + """ + + def __init__(self, config): + self.logger = logging.getLogger(__name__) + + self.config = config + + frame_filter = self.config.frame_filter + + SniffSource.__init__( + self, + iface=self.config.interface, + filter=frame_filter + ) + + self.logger.info("Probe request sniffer initialised") diff --git a/test/__init__.py b/src/probequest/ui/__init__.py similarity index 100% rename from test/__init__.py rename to src/probequest/ui/__init__.py diff --git a/src/probequest/ui/console.py b/src/probequest/ui/console.py new file mode 100644 index 0000000..716e903 --- /dev/null +++ b/src/probequest/ui/console.py @@ -0,0 +1,26 @@ +""" +Probe request console module. +""" + +import logging + +from scapy.pipetool import Sink + + +class ProbeRequestConsole(Sink): + """ + Probe request displaying sink. + """ + + def __init__(self): + self.logger = logging.getLogger(__name__) + + Sink.__init__(self) + + self.logger.info("Console initialised") + + def push(self, msg): + print(msg) + + def high_push(self, msg): + print(msg) diff --git a/test/test.py b/test/test.py deleted file mode 100644 index ba5d3ac..0000000 --- a/test/test.py +++ /dev/null @@ -1,452 +0,0 @@ -""" -Unit tests written with the 'unittest' module. -""" - -# pylint: disable=import-error -# pylint: disable=unused-variable - -from queue import Queue -import unittest -import pylint.lint -from netaddr.core import AddrFormatError - -from scapy.layers.dot11 import RadioTap, Dot11, Dot11ProbeReq, Dot11Elt -from scapy.packet import fuzz -from scapy.error import Scapy_Exception - -from probequest.config import Config -from probequest.probe_request import ProbeRequest -from probequest.probe_request_sniffer import ProbeRequestSniffer -from probequest.packet_sniffer import PacketSniffer -from probequest.fake_packet_sniffer import FakePacketSniffer -from probequest.probe_request_parser import ProbeRequestParser - - -class TestProbeRequest(unittest.TestCase): - """ - Unit tests for the 'ProbeRequest' class. - """ - - def test_without_parameters(self): - """ - Initialises a 'ProbeRequest' object without any parameter. - """ - - # pylint: disable=no-value-for-parameter - - with self.assertRaises(TypeError): - probe_req = ProbeRequest() # noqa: F841 - - def test_with_only_one_parameter(self): - """ - Initialises a 'ProbeRequest' object with only one parameter. - """ - - # pylint: disable=no-value-for-parameter - - timestamp = 1517872027.0 - - with self.assertRaises(TypeError): - probe_req = ProbeRequest(timestamp) # noqa: F841 - - def test_with_only_two_parameters(self): - """ - Initialises a 'ProbeRequest' object with only two parameters. - """ - - # pylint: disable=no-value-for-parameter - - timestamp = 1517872027.0 - s_mac = "aa:bb:cc:dd:ee:ff" - - with self.assertRaises(TypeError): - probe_req = ProbeRequest(timestamp, s_mac) # noqa: F841 - - def test_create_a_probe_request(self): - """ - Creates a new 'ProbeRequest' with all the required parameters. - """ - - # pylint: disable=no-self-use - - timestamp = 1517872027.0 - s_mac = "aa:bb:cc:dd:ee:ff" - essid = "Test ESSID" - - probe_req = ProbeRequest(timestamp, s_mac, essid) # noqa: F841 - - def test_bad_mac_address(self): - """ - Initialises a 'ProbeRequest' object with a malformed MAC address. - """ - - timestamp = 1517872027.0 - s_mac = "aa:bb:cc:dd:ee" - essid = "Test ESSID" - - with self.assertRaises(AddrFormatError): - probe_req = ProbeRequest(timestamp, s_mac, essid) # noqa: F841 - - def test_print_a_probe_request(self): - """ - Initialises a 'ProbeRequest' object and prints it. - """ - - timestamp = 1517872027.0 - s_mac = "aa:bb:cc:dd:ee:ff" - essid = "Test ESSID" - - probe_req = ProbeRequest(timestamp, s_mac, essid) - - self.assertNotEqual( - str(probe_req).find("Mon, 05 Feb 2018 23:07:07"), - -1 - ) - self.assertNotEqual( - str(probe_req).find("aa:bb:cc:dd:ee:ff (None) -> Test ESSID"), - -1 - ) - - -class TestConfig(unittest.TestCase): - """ - Unit tests for the 'Config' class. - """ - - def test_bad_display_function(self): - """ - Assigns a non-callable object to the display callback function. - """ - - with self.assertRaises(TypeError): - config = Config() - config.display_func = "test" - - def test_bad_storage_function(self): - """ - Assigns a non-callable object to the storage callback function. - """ - - with self.assertRaises(TypeError): - config = Config() - config.storage_func = "test" - - def test_default_frame_filter(self): - """ - Tests the default frame filter. - """ - - config = Config() - frame_filter = config.generate_frame_filter() - - self.assertEqual( - frame_filter, - "type mgt subtype probe-req" - ) - - def test_frame_filter_with_mac_filtering(self): - """ - Tests the frame filter when some MAC addresses need to be filtered. - """ - - config = Config() - config.mac_filters = ["a4:77:33:9a:73:5c", "b0:05:94:5d:5a:4d"] - frame_filter = config.generate_frame_filter() - - self.assertEqual( - frame_filter, - "type mgt subtype probe-req" + - " and (ether src host a4:77:33:9a:73:5c" + - "|| ether src host b0:05:94:5d:5a:4d)" - ) - - def test_frame_filter_with_mac_exclusion(self): - """ - Tests the frame filter when some MAC addresses need to be excluded. - """ - - config = Config() - config.mac_exclusions = ["a4:77:33:9a:73:5c", "b0:05:94:5d:5a:4d"] - frame_filter = config.generate_frame_filter() - - self.assertEqual( - frame_filter, - "type mgt subtype probe-req" + - " and not (ether src host a4:77:33:9a:73:5c" + - "|| ether src host b0:05:94:5d:5a:4d)" - ) - - def test_compile_essid_regex_with_an_empty_regex(self): - """ - Tests 'complile_essid_regex' with an empty regex. - """ - - config = Config() - compiled_regex = config.complile_essid_regex() - - self.assertEqual(compiled_regex, None) - - def test_compile_essid_regex_with_a_case_sensitive_regex(self): - """ - Tests 'complile_essid_regex' with a case-sensitive regex. - """ - - from re import compile as rcompile - - config = Config() - config.essid_regex = "Free Wi-Fi" - compiled_regex = config.complile_essid_regex() - - self.assertEqual(compiled_regex, rcompile(config.essid_regex)) - - def test_compile_essid_regex_with_a_case_insensitive_regex(self): - """ - Tests 'complile_essid_regex' with a case-insensitive regex. - """ - - from re import compile as rcompile, IGNORECASE - - config = Config() - config.essid_regex = "Free Wi-Fi" - config.ignore_case = True - compiled_regex = config.complile_essid_regex() - - self.assertEqual(compiled_regex, rcompile( - config.essid_regex, IGNORECASE)) - - -class TestProbeRequestSniffer(unittest.TestCase): - """ - Unit tests for the 'ProbeRequestSniffer' class. - """ - - def test_without_parameters(self): - """ - Initialises a 'ProbeRequestSniffer' object without parameters. - """ - - # pylint: disable=no-value-for-parameter - - with self.assertRaises(TypeError): - sniffer = ProbeRequestSniffer() # noqa: F841 - - def test_bad_parameter(self): - """ - Initialises a 'ProbeRequestSniffer' object with a bad parameter. - """ - - # pylint: disable=no-value-for-parameter - - with self.assertRaises(AttributeError): - sniffer = ProbeRequestSniffer("test") # noqa: F841 - - def test_create_sniffer(self): - """ - Creates a 'ProbeRequestSniffer' object with the correct parameter. - """ - - # pylint: disable=no-self-use - - config = Config() - sniffer = ProbeRequestSniffer(config) # noqa: F841 - - def test_stop_before_start(self): - """ - Creates a 'ProbeRequestSniffer' object and stops the sniffer before - starting it. - """ - - # pylint: disable=no-self-use - - config = Config() - sniffer = ProbeRequestSniffer(config) - sniffer.stop() - - -class TestPacketSniffer(unittest.TestCase): - """ - Unit tests for the 'PacketSniffer' class. - """ - - def test_new_packet(self): - """ - Tests the 'new_packet' method. - """ - - config = Config() - new_packets = Queue() - sniffer = PacketSniffer(config, new_packets) - - self.assertEqual(sniffer.new_packets.qsize(), 0) - - packet = RadioTap() \ - / Dot11( - addr1="ff:ff:ff:ff:ff:ff", - addr2="aa:bb:cc:11:22:33", - addr3="dd:ee:ff:11:22:33" - ) \ - / Dot11ProbeReq() \ - / Dot11Elt( - info="Test" - ) - - sniffer.new_packet(packet) - self.assertEqual(sniffer.new_packets.qsize(), 1) - - ProbeRequestParser.parse(sniffer.new_packets.get(timeout=1)) - - def test_stop_before_start(self): - """ - Creates a 'PacketSniffer' object and stops the sniffer before starting - it. - """ - - config = Config() - new_packets = Queue() - sniffer = PacketSniffer(config, new_packets) - - with self.assertRaises(Scapy_Exception): - sniffer.stop() - - def test_is_running_before_start(self): - """ - Creates a 'PacketSniffer' object and runs 'is_running' before starting - the sniffer. - """ - - config = Config() - new_packets = Queue() - sniffer = PacketSniffer(config, new_packets) - - self.assertFalse(sniffer.is_running()) - - -class TestFakePacketSniffer(unittest.TestCase): - """ - Unit tests for the 'FakePacketSniffer' class. - """ - - def test_new_packet(self): - """ - Tests the 'new_packet' method. - """ - - config = Config() - new_packets = Queue() - sniffer = FakePacketSniffer(config, new_packets) - - self.assertEqual(sniffer.new_packets.qsize(), 0) - - sniffer.new_packet() - self.assertEqual(sniffer.new_packets.qsize(), 1) - sniffer.new_packet() - self.assertEqual(sniffer.new_packets.qsize(), 2) - sniffer.new_packet() - self.assertEqual(sniffer.new_packets.qsize(), 3) - - ProbeRequestParser.parse(sniffer.new_packets.get(timeout=1)) - ProbeRequestParser.parse(sniffer.new_packets.get(timeout=1)) - ProbeRequestParser.parse(sniffer.new_packets.get(timeout=1)) - - def test_stop_before_start(self): - """ - Creates a 'FakePacketSniffer' object and stops the sniffer before - starting it. - """ - - config = Config() - new_packets = Queue() - sniffer = FakePacketSniffer(config, new_packets) - - with self.assertRaises(RuntimeError): - sniffer.stop() - - def test_stop_before_start_using_join(self): - """ - Creates a 'FakePacketSniffer' object and stops the sniffer before - starting it. - """ - - config = Config() - new_packets = Queue() - sniffer = FakePacketSniffer(config, new_packets) - - with self.assertRaises(RuntimeError): - sniffer.join() - - -class TestProbeRequestParser(unittest.TestCase): - """ - Unit tests for the 'ProbeRequestParser' class. - """ - - def test_no_probe_request_layer(self): - """ - Creates a non-probe-request Wi-Fi packet and parses it with the - 'ProbeRequestParser.parse()' function. - """ - - # pylint: disable=no-self-use - - packet = RadioTap() \ - / Dot11( - addr1="ff:ff:ff:ff:ff:ff", - addr2="aa:bb:cc:11:22:33", - addr3="dd:ee:ff:11:22:33" - ) - - ProbeRequestParser.parse(packet) - - def test_empty_essid(self): - """ - Creates a probe request packet with an empty ESSID field and parses - it with the 'ProbeRequestParser.parse()' function. - """ - - # pylint: disable=no-self-use - - packet = RadioTap() \ - / Dot11( - addr1="ff:ff:ff:ff:ff:ff", - addr2="aa:bb:cc:11:22:33", - addr3="dd:ee:ff:11:22:33" - ) \ - / Dot11ProbeReq() \ - / Dot11Elt( - info="" - ) - - ProbeRequestParser.parse(packet) - - def test_fuzz_packets(self): - """ - Parses 1000 randomly-generated probe requests with the - 'ProbeRequestParser.parse()' function. - """ - - # pylint: disable=no-self-use - - for i in range(0, 1000): - packet = RadioTap()/fuzz(Dot11()/Dot11ProbeReq()/Dot11Elt()) - ProbeRequestParser.parse(packet) - - -class TestLinter(unittest.TestCase): - """ - Unit tests for Python linters. - """ - - # Some linting errors will be fixed while - # refactoring the code. - @unittest.expectedFailure - def test_pylint(self): - """ - Executes Pylint. - """ - - # pylint: disable=no-self-use - - pylint.lint.Run([ - "probequest", - "test" - ]) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py new file mode 100644 index 0000000..f9c4b21 --- /dev/null +++ b/tests/unit/test_cli.py @@ -0,0 +1,395 @@ +""" +Unit tests for the cli module. +""" + +import unittest + +from argparse import Namespace +from contextlib import redirect_stdout, redirect_stderr +from io import StringIO, TextIOWrapper +from os import remove +from os.path import isfile + +from probequest import __version__ as VERSION +from probequest.cli import get_arg_parser + + +class TestArgParse(unittest.TestCase): + """ + Tests the argument parser. + """ + + # pylint: disable=too-many-public-methods + + output_test_file = "probequest_test_output.txt" + + def setUp(self): + """ + Instanciates a new argument parser. + """ + + self.arg_parser = get_arg_parser() + + def tearDown(self): + """ + Removes test files if any. + """ + + if isfile(self.output_test_file): + remove(self.output_test_file) + + def test_without_parameters(self): + """ + Calls the argument parser with an emtpy input. + """ + + with self.assertRaises(SystemExit) as error_code: + error_output = StringIO() + + with redirect_stderr(error_output): + self.arg_parser.parse_args([]) + + self.assertEqual(error_code.exception.code, 2) + + def test_short_help_option(self): + """ + Calls the argument parser with the '-h' option. + """ + + with self.assertRaises(SystemExit) as error_code: + output = StringIO() + + with redirect_stdout(output): + self.arg_parser.parse_args(["-h"]) + + self.assertEqual(error_code.exception.code, 0) + + def test_long_help_option(self): + """ + Calls the argument parser with the '--help' option. + """ + + with self.assertRaises(SystemExit) as error_code: + output = StringIO() + + with redirect_stdout(output): + self.arg_parser.parse_args(["--help"]) + + self.assertEqual(error_code.exception.code, 0) + + def test_version_option(self): + """ + Calls the argument parser with the '--version' option. + """ + + with self.assertRaises(SystemExit) as error_code: + output = StringIO() + + with redirect_stdout(output): + self.arg_parser.parse_args(["--version"]) + + self.assertEqual(error_code.exception.code, 0) + self.assertEqual(output.getvalue(), VERSION + "\n") + + def test_default_values(self): + """ + Calls the argument parser with an empty input and tests the default + values in the configuration namespace. + """ + + # pylint: disable=no-member + + with self.assertRaises(SystemExit) as error_code: + error_output = StringIO() + + with redirect_stderr(error_output): + config = Namespace() + self.arg_parser.parse_args( + [], namespace=config + ) + + self.assertEqual(error_code.exception.code, 2) + + self.assertIsNone(config.interface) + self.assertIsNone(config.essid_filters) + self.assertIsNone(config.essid_regex) + self.assertFalse(config.ignore_case) + self.assertIsNone(config.mac_exclusions) + self.assertIsNone(config.mac_filters) + self.assertIsNone(config.output_file) + self.assertFalse(config.fake) + self.assertFalse(config.debug) + + def test_interface_argument(self): + """ + Calls the argument parser with the 'interface' argument. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "wlan0", + ], namespace=config) + + self.assertEqual(config.interface, "wlan0") + + def test_without_interface_argument(self): + """ + Calls the argument parser with some options but not the required + interface argument. + """ + + # pylint: disable=no-member + + with self.assertRaises(SystemExit) as error_code: + error_output = StringIO() + + with redirect_stderr(error_output): + config = Namespace() + self.arg_parser.parse_args([ + "--debug", "--fake", + ], namespace=config) + + self.assertEqual(error_code.exception.code, 2) + + def test_debug_option(self): + """ + Calls the argument parser with the '--debug' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--debug", "wlan0", + ], namespace=config) + + self.assertTrue(config.debug) + self.assertEqual(config.interface, "wlan0") + + def test_fake_option(self): + """ + Calls the argument parser with the '--fake' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--fake", "wlan0", + ], namespace=config) + + self.assertTrue(config.fake) + self.assertEqual(config.interface, "wlan0") + + def test_ignore_case_option(self): + """ + Calls the argument parser with the '--ignore-case' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--ignore-case", "wlan0", + ], namespace=config) + + self.assertTrue(config.ignore_case) + self.assertEqual(config.interface, "wlan0") + + def test_short_output_option(self): + """ + Calls the argument parser with the '-o' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "-o", self.output_test_file, "wlan0", + ], namespace=config) + + self.assertIsInstance(config.output_file, TextIOWrapper) + config.output_file.close() + self.assertEqual(config.interface, "wlan0") + + def test_long_output_option(self): + """ + Calls the argument parser with the '--output' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--output", self.output_test_file, "wlan0", + ], namespace=config) + + self.assertIsInstance(config.output_file, TextIOWrapper) + config.output_file.close() + self.assertEqual(config.interface, "wlan0") + + def test_short_essid_option(self): + """ + Calls the argument parser with the '-e' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "-e", "essid_1", "essid_2", "essid_3", "--", "wlan0", + ], namespace=config) + + self.assertListEqual(config.essid_filters, [ + "essid_1", "essid_2", "essid_3" + ]) + self.assertEqual(config.interface, "wlan0") + + def test_long_essid_option(self): + """ + Calls the argument parser with the '--essid' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--essid", "essid_1", "essid_2", "essid_3", "--", "wlan0", + ], namespace=config) + + self.assertListEqual(config.essid_filters, [ + "essid_1", "essid_2", "essid_3" + ]) + self.assertEqual(config.interface, "wlan0") + + def test_short_regex_option(self): + """ + Calls the argument parser with the '-r' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "-r", "test_regex", "wlan0", + ], namespace=config) + + self.assertEqual(config.essid_regex, "test_regex") + self.assertEqual(config.interface, "wlan0") + + def test_long_regex_option(self): + """ + Calls the argument parser with the '--regex' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--regex", "test_regex", "wlan0", + ], namespace=config) + + self.assertEqual(config.essid_regex, "test_regex") + self.assertEqual(config.interface, "wlan0") + + def test_essid_regex_mutual_exclusivity(self): + """ + Calls the argument parser with both '--essid' and '--regex' options, + which must fail as they are in the same mutually exclusive group. + """ + + # pylint: disable=no-member + + with self.assertRaises(SystemExit) as error_code: + error_output = StringIO() + + with redirect_stderr(error_output): + config = Namespace() + self.arg_parser.parse_args([ + "--essid", "essid_1", + "--regex", "test_regex", + "wlan0", + ], namespace=config) + + self.assertEqual(error_code.exception.code, 2) + + def test_exclude_option(self): + """ + Calls the argument parser with the '--exclude' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--exclude", "aa:bb:cc:dd:ee:ff", "ff:ee:dd:cc:bb:aa", + "--", "wlan0", + ], namespace=config) + + self.assertListEqual(config.mac_exclusions, [ + "aa:bb:cc:dd:ee:ff", + "ff:ee:dd:cc:bb:aa", + ]) + self.assertEqual(config.interface, "wlan0") + + def test_short_station_option(self): + """ + Calls the argument parser with the '-s' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "-s", "aa:bb:cc:dd:ee:ff", "ff:ee:dd:cc:bb:aa", + "--", "wlan0", + ], namespace=config) + + self.assertListEqual(config.mac_filters, [ + "aa:bb:cc:dd:ee:ff", + "ff:ee:dd:cc:bb:aa", + ]) + self.assertEqual(config.interface, "wlan0") + + def test_long_station_option(self): + """ + Calls the argument parser with the '--station' option. + """ + + # pylint: disable=no-member + + config = Namespace() + self.arg_parser.parse_args([ + "--station", "aa:bb:cc:dd:ee:ff", "ff:ee:dd:cc:bb:aa", + "--", "wlan0", + ], namespace=config) + + self.assertListEqual(config.mac_filters, [ + "aa:bb:cc:dd:ee:ff", + "ff:ee:dd:cc:bb:aa", + ]) + self.assertEqual(config.interface, "wlan0") + + def test_exclude_station_mutual_exclusivity(self): + """ + Calls the argument parser with both '--exclude' and '--station' + options, which must fail as they are in the same mutually exclusive + group. + """ + + # pylint: disable=no-member + + with self.assertRaises(SystemExit) as error_code: + error_output = StringIO() + + with redirect_stderr(error_output): + config = Namespace() + self.arg_parser.parse_args([ + "--exclude", "aa:bb:cc:dd:ee:ff", + "--station", "ff:ee:dd:cc:bb:aa", + "wlan0", + ], namespace=config) + + self.assertEqual(error_code.exception.code, 2) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py new file mode 100644 index 0000000..5f495d9 --- /dev/null +++ b/tests/unit/test_config.py @@ -0,0 +1,147 @@ +""" +Unit tests for the configuration module. +""" + +import logging +import unittest +from unittest.mock import patch +from re import compile as rcompile, IGNORECASE + +from probequest.config import Config +from probequest.exceptions import InterfaceDoesNotExistException + + +class TestConfig(unittest.TestCase): + """ + Unit tests for the 'Config' class. + """ + + def setUp(self): + """ + Creates a fake package logger. + """ + + self.logger = logging.getLogger("probequest") + self.logger.setLevel(logging.DEBUG) + + def test_default_values(self): + """ + Tests the default values. + """ + + config = Config() + + self.assertIsNone(config.interface) + + self.assertIsNone(config.essid_filters) + self.assertIsNone(config.essid_regex) + self.assertFalse(config.ignore_case) + + self.assertIsNone(config.mac_exclusions) + self.assertIsNone(config.mac_filters) + + self.assertIsNone(config.output_file) + + self.assertFalse(config.fake) + self.assertFalse(config.debug) + + with self.assertLogs(self.logger, level=logging.DEBUG): + self.assertEqual( + config.frame_filter, + "type mgt subtype probe-req" + ) + + def test_non_existing_interface(self): + """ + Tests if an exception is well raised when setting a non-existing + network interface. + """ + + with patch("probequest.config.get_if_list", return_value=("wlan0", + "wlan0mon")): + config = Config() + + with self.assertRaises(InterfaceDoesNotExistException): + config.interface = "wlan1" + + def test_existing_interface(self): + """ + Tests with an existing network interface. + """ + + # pylint: disable=no-self-use + + with patch("probequest.config.get_if_list", return_value=("wlan0", + "wlan0mon")): + config = Config() + config.interface = "wlan0" + + def test_frame_filter_with_mac_filtering(self): + """ + Tests the frame filter when some MAC addresses need to be filtered. + """ + + config = Config() + config.mac_filters = ["a4:77:33:9a:73:5c", "b0:05:94:5d:5a:4d"] + + with self.assertLogs(self.logger, level=logging.DEBUG): + self.assertEqual( + config.frame_filter, + "type mgt subtype probe-req" + + " and (ether src host a4:77:33:9a:73:5c" + + "|| ether src host b0:05:94:5d:5a:4d)" + ) + + def test_frame_filter_with_mac_exclusion(self): + """ + Tests the frame filter when some MAC addresses need to be excluded. + """ + + config = Config() + config.mac_exclusions = ["a4:77:33:9a:73:5c", "b0:05:94:5d:5a:4d"] + + with self.assertLogs(self.logger, level=logging.DEBUG): + self.assertEqual( + config.frame_filter, + "type mgt subtype probe-req" + + " and not (ether src host a4:77:33:9a:73:5c" + + "|| ether src host b0:05:94:5d:5a:4d)" + ) + + def test_compiled_essid_regex_with_an_empty_regex(self): + """ + Tests 'compiled_essid_regex' with an empty regex. + """ + + config = Config() + compiled_regex = config.compiled_essid_regex + + self.assertEqual(compiled_regex, None) + + def test_compiled_essid_regex_with_a_case_sensitive_regex(self): + """ + Tests 'compiled_essid_regex' with a case-sensitive regex. + """ + + config = Config() + config.essid_regex = "Free Wi-Fi" + + with self.assertLogs(self.logger, level=logging.DEBUG): + compiled_regex = config.compiled_essid_regex + + self.assertEqual(compiled_regex, rcompile(config.essid_regex)) + + def test_compiled_essid_regex_with_a_case_insensitive_regex(self): + """ + Tests 'compiled_essid_regex' with a case-insensitive regex. + """ + + config = Config() + config.essid_regex = "Free Wi-Fi" + config.ignore_case = True + + with self.assertLogs(self.logger, level=logging.DEBUG): + compiled_regex = config.compiled_essid_regex + + self.assertEqual(compiled_regex, rcompile( + config.essid_regex, IGNORECASE)) diff --git a/tests/unit/test_probe_request.py b/tests/unit/test_probe_request.py new file mode 100644 index 0000000..276cf8e --- /dev/null +++ b/tests/unit/test_probe_request.py @@ -0,0 +1,97 @@ +""" +Unit tests for the probe request module. +""" + +import unittest +from netaddr.core import AddrFormatError + +from probequest.probe_request import ProbeRequest + + +class TestProbeRequest(unittest.TestCase): + """ + Unit tests for the 'ProbeRequest' class. + """ + + def test_without_parameters(self): + """ + Initialises a 'ProbeRequest' object without any parameter. + """ + + # pylint: disable=no-value-for-parameter + + with self.assertRaises(TypeError): + _ = ProbeRequest() + + def test_with_only_one_parameter(self): + """ + Initialises a 'ProbeRequest' object with only one parameter. + """ + + # pylint: disable=no-value-for-parameter + + timestamp = 1517872027.0 + + with self.assertRaises(TypeError): + _ = ProbeRequest(timestamp) + + def test_with_only_two_parameters(self): + """ + Initialises a 'ProbeRequest' object with only two parameters. + """ + + # pylint: disable=no-value-for-parameter + + timestamp = 1517872027.0 + s_mac = "aa:bb:cc:dd:ee:ff" + + with self.assertRaises(TypeError): + _ = ProbeRequest(timestamp, s_mac) + + def test_create_a_probe_request(self): + """ + Creates a new 'ProbeRequest' with all the required parameters. + """ + + # pylint: disable=no-self-use + + timestamp = 1517872027.0 + s_mac = "aa:bb:cc:dd:ee:ff" + essid = "Test ESSID" + + _ = ProbeRequest(timestamp, s_mac, essid) + + def test_bad_mac_address(self): + """ + Initialises a 'ProbeRequest' object with a malformed MAC address. + """ + + timestamp = 1517872027.0 + s_mac = "aa:bb:cc:dd:ee" + essid = "Test ESSID" + + with self.assertRaises(AddrFormatError): + probe_req = ProbeRequest(timestamp, s_mac, essid) + _ = probe_req.s_mac_oui + + def test_print_a_probe_request(self): + """ + Initialises a 'ProbeRequest' object and prints it. + """ + + timestamp = 1517872027.0 + s_mac = "aa:bb:cc:dd:ee:ff" + essid = "Test ESSID" + + probe_req = ProbeRequest(timestamp, s_mac, essid) + + self.assertNotEqual( + str(probe_req).find("Mon, 05 Feb 2018 23:07:07"), + -1 + ) + self.assertNotEqual( + str(probe_req).find( + "aa:bb:cc:dd:ee:ff (Unknown OUI) -> Test ESSID" + ), + -1 + ) diff --git a/tests/unit/test_probe_request_parser.py b/tests/unit/test_probe_request_parser.py new file mode 100644 index 0000000..6e47007 --- /dev/null +++ b/tests/unit/test_probe_request_parser.py @@ -0,0 +1,60 @@ +""" +Unit tests for the probe request parser module. +""" + +import unittest + +from scapy.layers.dot11 import RadioTap, Dot11, Dot11ProbeReq, Dot11Elt +from scapy.packet import fuzz + +from probequest.probe_request_parser import ProbeRequestParser + + +class TestProbeRequestParser(unittest.TestCase): + """ + Unit tests for the 'ProbeRequestParser' class. + """ + + dot11_layer = Dot11( + addr1="ff:ff:ff:ff:ff:ff", + addr2="aa:bb:cc:11:22:33", + addr3="dd:ee:ff:11:22:33", + ) + + def test_no_probe_request_layer(self): + """ + Creates a non-probe-request Wi-Fi packet and parses it with the + 'ProbeRequestParser.parse()' function. + """ + + with self.assertRaises(TypeError): + packet = RadioTap() / self.dot11_layer + ProbeRequestParser.parse(packet) + + def test_empty_essid(self): + """ + Creates a probe request packet with an empty ESSID field and parses + it with the 'ProbeRequestParser.parse()' function. + """ + + packet = RadioTap() \ + / self.dot11_layer \ + / Dot11ProbeReq() \ + / Dot11Elt( + info="" + ) + + ProbeRequestParser.parse(packet) + + def test_fuzz_packets(self): + """ + Parses 1000 randomly-generated probe requests with the + 'ProbeRequestParser.parse()' function. + """ + + # pylint: disable=no-self-use + + with self.assertRaises(TypeError): + for _ in range(0, 1000): + packet = RadioTap()/fuzz(Dot11()/Dot11ProbeReq()/Dot11Elt()) + ProbeRequestParser.parse(packet) diff --git a/tests/unit/utils.py b/tests/unit/utils.py new file mode 100644 index 0000000..b09f217 --- /dev/null +++ b/tests/unit/utils.py @@ -0,0 +1,32 @@ +""" +Common assets for the unit tests. +""" + +from argparse import Namespace + + +def create_fake_config(): + """ + Creates and returns a fake 'Config' object. + """ + + config = Namespace() + + config.interface = None + + config.essid_filters = None + config.essid_regex = None + config.ignore_case = False + + config.mac_exclusions = None + config.mac_filters = None + + config.output_file = None + + config.fake = False + config.debug = False + + config.compiled_essid_regex = None + config.frame_filter = None + + return config diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..12c38df --- /dev/null +++ b/tox.ini @@ -0,0 +1,35 @@ +# tox (https://tox.readthedocs.io/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py37, py38, py39, py310, flake8, pylint +skip_missing_interpreters = true +minversion = 3.0 +isolated_build = true + +[testenv] +description = "ProbeQuest's unit tests" +commands = + {envpython} -m unittest discover -s tests + +[testenv:flake8] +description = "Check ProbeQuest's code style & quality" +deps = flake8 +commands = + {envpython} -m flake8 src tests + +[testenv:pylint] +description = "Check ProbeQuest for programming errors" +deps = pylint +commands = + {envpython} -m pylint --rcfile={toxinidir}/setup.cfg src tests + +[gh-actions] +description = "tox configuration when running on GitHub Actions" +python = + 3.7: py37, flake8, pylint + 3.8: py38, flake8, pylint + 3.9: py39, flake8, pylint + 3.10: py310, flake8, pylint