Skip to content

Commit

Permalink
Repo recreated on GitHub
Browse files Browse the repository at this point in the history
  • Loading branch information
meeas committed Jan 28, 2019
1 parent f5c9bb6 commit 78a4dea
Show file tree
Hide file tree
Showing 8 changed files with 1,115 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.DS_Store
*.pyc
__pycache__/
.mypy_cache/
src/ctserial/__pycache__/
dist/
ctserial.egg-info/
674 changes: 674 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Control Things Serial

ctserial goal is to become the security professional's Swiss army knife for interacting with raw serial devices

# Installation:

As long as you have git and Python 3.5 or later installed, all you should need to do is:

```
git clone https://github.com/ControlThingsTools/ctserial.git
cd ctserial
pip3 install -r requirements.txt
```

# Usage:

First, start the tool from a terminal. Then connect to your serial device and interact with it. For example:

```
ctmodbus> connect /dev/your-serial-device
ctmodbus> sendhex deadc0de (sends actual hex, so 4 bytes)
ctmodbus> sendhex \xde \xad c0de (sends same hex as before, ignoring spaces and \x)
ctmondus> send Dead Code 国 (sends full utf-8 string without spaces)
ctmodbus> send "Dead Code 国" (Use quotes if you need spaces)
ctmodbus> exit
```

# Platform Independence

Python 3.5+ and all dependencies are available for all major operating systems. It is primarily developed on MacOS and Linux, but should work in Windows as well.

# Author

* Justin Searle <[email protected]>

Based on the excellent work of Philipp Klaus <[email protected]> which can be found at:

* (https://github.com/pklaus/jpnevulator.py)

Which in turn was a python implementation of:

* [jpnevulator](http://jpnevulator.snarl.nl/)
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-e git+git://github.com/ControlThingsTools/ctui.git@master#egg=ctui
-e .
74 changes: 74 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function

import io
import re
from glob import glob
from os.path import basename
from os.path import dirname
from os.path import join
from os.path import splitext

from setuptools import find_packages
from setuptools import setup


def read(*names, **kwargs):
return io.open(
join(dirname(__file__), *names),
encoding=kwargs.get('encoding', 'utf8')
).read()

setup(
name='ctserial',
version='0.3',
license='GPLv3',
description='ctserial is a security professional\'s swiss army knife for interacting with raw serial devices',
long_description=open('README.md').read(),
author='Justin Searle',
author_email='[email protected]',
url='https://github.com/ControlThingsTools/ctserial',
packages=find_packages('src'),
package_dir={'': 'src'},
py_modules=[splitext(basename(path))[0] for path in glob('src/*.py')],
include_package_data=True,
zip_safe=False,
classifiers=[
# complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: Manufacturing',
'Intended Audience :: Other Audience',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Operating System :: Unix',
'Operating System :: POSIX',
'Operating System :: Microsoft :: Windows',
'Programming Language :: Python',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Utilities',
],
keywords=[
'serial', 'pentest', 'ControlThingsTools', 'ControlThingsPlatform',
],
install_requires=[
'ctui',
'pyserial',
'tabulate'
],
extras_require={
# eg:
# 'rst': ['docutils>=0.11'],
# ':python_version=="2.6"': ['argparse'],
},
entry_points={
'console_scripts': [
'ctserial = ctserial.commands:main',
]
},
)
Empty file added src/ctserial/__init__.py
Empty file.
183 changes: 183 additions & 0 deletions src/ctserial/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright (C) 2018 Justin Searle
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details at <http://www.gnu.org/licenses/>.

import shlex
import re
import serial
import serial.tools.list_ports
import time

from ctui.application import Ctui
from ctui.dialogs import message_dialog
from tabulate import tabulate


class CtSerial(Ctui):
"""Commands that users may use at the application prompt."""
name = 'ctmodbus'
version = '0.5'
description = 'a security professional\'s swiss army knife for interacting with Modbus devices'
prompt = 'ctmodbus> '
session = None
unit_id = 1
statusbar = 'Session:{}'.format(session)
session = ''
output_format = 'mixed'
macro_hex = {}


# def get_statusbar_text():
# sep = ' - '
# session = get_app().session
# if type(session) == serial.Serial:
# device = 'connected:' + session.port
# else:
# device = 'connected:None'
# output_format = 'output:' + get_app().output_format
# return sep.join([device, output_format])


def do_connect(self, args, output_text):
"""Generate a session with a single serial device to interact with it."""
parts = args.split()
devices = [x.device for x in serial.tools.list_ports.comports()]
if len(parts) > 0:
device = parts[0]
if len(parts) > 1:
baudrate = parts[1]
else:
baudrate = 9600
if device in devices:
self.session = serial.Serial(
port=device,
baudrate=baudrate,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS)
# initiate a serial session and return success message
self.session.isOpen()
output_text += 'Connect session opened with {}\n'.format(device)
return output_text
# return list of devices if command incomplete or incorrect
message = 'Valid devices:\n' + ', \n'.join(devices) + '\n'
message_dialog(title='Invalid Device', text=message)
return False


def do_close(self, args, output_text):
"""Close a session."""
if type(self.session) != serial.Serial:
output_text += 'Connect to a device first\n'
return output_text
else:
device = self.session.port
self.session.close()
output_text += 'Session with {} closed.'.format(device) + '\n'
self.session = ''
return output_text


def _send_instruction(self, session, tx_bytes):
"""Send data to serial device"""
# clear out any leftover data
try:
rx_raw = bytes()
if session.inWaiting() > 0:
session.flushInput()
session.write(tx_bytes)
time.sleep(0.1)
while session.inWaiting() > 0:
rx_raw += session.read()
except BaseException as e:
output = '\n\n{}'.format(e)
time.sleep(0.1)
return rx_raw


def _format_output(self, raw_bytes, prefix=''):
""" Return hex and utf-8 decodes aligned on two lines """
if len(raw_bytes) == 0:
return prefix + 'None'
table = []
if self.output_format == 'hex' or self.output_format == 'mixed':
hex_out = [prefix] + list(bytes([x]).hex() for x in raw_bytes)
table.append(hex_out)
if self.output_format == 'ascii' or self.output_format == 'mixed':
ascii_out = [' ' * len(prefix)] + list(raw_bytes.decode('ascii', 'replace'))
table.append(ascii_out)
if self.output_format == 'utf-8':
# TODO: track \xefbfdb and replace with actual sent character
utf8 = raw_bytes.decode('utf-8', 'replace')
utf8_hex_out = [prefix] + list(x.encode('utf-8').hex() for x in utf8)
utf8_str_out = [' ' * len(prefix)] + list(utf8)
table = [utf8_hex_out, utf8_str_out]
return tabulate(table, tablefmt="plain", stralign='right')


def do_sendhex(self, args, output_text):
"""Send raw hex to serial device."""
if type(self.session) != serial.Serial:
output_text += 'Connect to a device first\n'
return output_text
data = args.lower().replace("0x", "")
if re.match('^[0123456789abcdef\\\\x ]+$', data):
raw_hex = re.sub('[\\\\x ]', '', data)
if len(raw_hex) % 2 == 0:
tx_bytes = bytes.fromhex(raw_hex)
session = self.session
rx_bytes = self._send_instruction(session, tx_bytes)
output_text += self._format_output(tx_bytes, prefix='--> ') + '\n'
output_text += self._format_output(rx_bytes, prefix='<-- ') + '\n'
return output_text
return False


def do_send(self, args, output_text):
"""Send string to serial device."""
if type(self.session) != serial.Serial:
output_text += 'Connect to a device first\n'
return output_text
if len(args) > 0:
# remove spaces not in quotes and format
string = ''.join(shlex.split(args))
tx_bytes = bytes(string, encoding='utf-8')
session = self.session
rx_bytes = self._send_instruction(session, tx_bytes)
output_text += self._format_output(tx_bytes, prefix='--> ') + '\n'
output_text += self._format_output(rx_bytes, prefix='<-- ') + '\n'
return output_text
return False


def do_macro_set(self, args, output_text):
v1 = args[: args.find(' ')]
v2 = args[args.find(' '): ]
self.macro_hex[v1] = v2
output_text += "key " + v1 + " set to value " + v2 + "\n"
return output_text


def do_macro_send(self, args, output_text):
macro = self.macro_hex[args]
if macro:
return self.do_sendhex(macro, output_text)
output_text += 'Unknown macro\n'
return output_text




def main():
ctserial = CtSerial()
ctserial.run()

if __name__ == '__main__':
main()
Loading

0 comments on commit 78a4dea

Please sign in to comment.