From aeb746a1c76bdd4b187e1ba2c2471beb7e8bc00d Mon Sep 17 00:00:00 2001 From: eHonnef Date: Mon, 27 Nov 2023 18:23:16 +0100 Subject: [PATCH 1/4] CoAP socket implementation, client and server Fixing response payload Some docstring and bug fixes. Finished CoAP server logic implementation Added client interaction Client/Server done. Added delayed response handling Fixing small problems Unit tests Documentation --- scapy/contrib/coap_socket.py | 1001 ++++++++++++++++++++++++++++++++++ test/contrib/coap_socket.uts | 129 +++++ 2 files changed, 1130 insertions(+) create mode 100644 scapy/contrib/coap_socket.py create mode 100644 test/contrib/coap_socket.uts diff --git a/scapy/contrib/coap_socket.py b/scapy/contrib/coap_socket.py new file mode 100644 index 00000000000..81194e45aba --- /dev/null +++ b/scapy/contrib/coap_socket.py @@ -0,0 +1,1001 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) 2024 eHonnef + +# scapy.contrib.description = CoAP Socket Library / RFC 7252 +# scapy.contrib.status = library + +import logging +import random +import socket +import time + +# Typing imports +from typing import ( + Optional, + Union, + Tuple, + Any, + cast, + Type +) + +from scapy.error import Scapy_Exception +from scapy.packet import Packet +from scapy.contrib.coap import CoAP, coap_options, coap_codes +from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler +from scapy.data import MTU +from scapy.utils import EDecimal +from scapy.automaton import ObjectPipe, select_objects + +from scapy.supersocket import SuperSocket, SimpleSocket + +""" +CoAP message request codes (RFC 7252 @ section-5.8.1) +""" +EMPTY_MESSAGE = 0 +GET = 1 +POST = 2 +PUT = 3 +DELETE = 4 +COAP_REQ_CODES = [GET, POST, PUT, DELETE] +""" +CoAP message response codes (RFC 7252 @ section-12.1.2) +Also, from scapy.contrib.coap.coap_codes +""" +EMPTY_ACK = EMPTY_MESSAGE +CONTENT_205 = 69 +NOT_FOUND_404 = 132 +NOT_ALLOWED_405 = 133 +NOT_IMPLEMENTED_501 = 161 +""" +CoAP content type (RFC 7252 @ section-12.3) +""" +CF_TEXT_PLAIN = b"\x00" +CF_APP_LINK_FORMAT = b"\x28" +CF_APP_XML = b"\x29" +CF_APP_OCTET_STREAM = b"\x2A" +CF_APP_EXI = b"\x2F" +CF_APP_JSON = b"\x32" +""" +CoAP options (RFC 7252 @ section-5.10) +""" +PAYMARK = b"\xff" +URI_PATH = 11 +CONTENT_FORMAT = 12 +""" +CoAP message type +""" +CON = 0 +NON = 1 +ACK = 2 +RST = 3 + +log_coap_sock = logging.getLogger("scapy.contrib.coap_socket") + + +class CoAPSocket(SuperSocket): + """ + CoAP socket with client and server capabilities. + + General and defaults timeouts for the protocol - RFC 7252 @ section-4.8.2 + + Client example: + >>> with CoAPSocket("127.0.0.1", 1234) as coap_client: + >>> req = CoAPSocket.make_coap_req_packet( + >>> method=GET, uri="endpoint-uri", payload=b"") + >>> coap_client.send("127.0.0.1", 5683, req) + >>> # Careful, this will block until the coap_client receives something + >>> res = coap_client.recv() + + Server without specifying resources: + >>> with CoAPSocket("127.0.0.1", 5683) as coap_server: + >>> while True: + >>> pkg = coap_server.recv() + >>> handle_package(pkg) + + Server with custom resources: + >>> class DummyResource(CoAPResource): + >>> def get(self, payload, options, token, sa_ll): + >>> return {"type": ACK, "code": CONTENT_205, + >>> "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + >>> "payload": b'dummy response'} + >>> + >>> class DelayedResource(CoAPResource): + >>> def __init__(self, url): + >>> CoAPResource.__init__(self, url=url) + >>> self.delayed_tokens = [] + >>> def delayed_message(self): + >>> token, address = self.delayed_tokens.pop(0) + >>> pkt = CoAPSocket.make_delayed_resp_packet(token, + >>> [(CONTENT_FORMAT, CF_TEXT_PLAIN)], b"delayed payload") + >>> self._send_separate_response(pkt, address) + >>> def get(self, payload, options, token, sa_ll): + >>> # We know that this can take a while, so we return an empty ACK now + >>> # and wait for whatever resource to be available. + >>> TimeoutScheduler.schedule(1, self.delayed_message) + >>> self.delayed_tokens.append((token, sa_ll)) + >>> return CoAPSocket.empty_ack_params() + >>> # Doesn't matter if it starts with "/dummy" or "dummy", + >>> # but it is an error if it is in the end + >>> lst_resources = [DummyResource("dummy"), DelayedResource("/delayed")]. + >>> with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_socket: + >>> while True: + >>> pkg = coap_socket.recv() + >>> # You can handle the packages inside your resources, + >>> # here will only be the "unhandled" ones. + + :param ip: ip address to bind udp socket to. + :param port: port to bind udp socket to. + :param ack_timeout: the time, in ms, that we should wait for the acknowledgment + after sending a request. + :param retries: amount of retransmissions before giving up on the request. + :param duplication_response_timeout: Timeout, in fractions of seconds, that we will + keep the response in case a response get lost. + :param lst_resources: optional, list of registered resources. + :param sock: optional, a socket instance to transmit, + if None, a classic UDP socket will be open and bound to ip/port. + :param close_on_timeout: Will try to close the socket if the retries is exceeded + """ + + def __init__(self, + ip="", # type: str + port=5683, # type: int + ack_timeout=500, # type: int + retries=3, # type: int + duplication_response_timeout=1.00, # type: float + lst_resources=None, # type: Optional[None, list[CoAPResource]] + sock=None, # type: Optional[None, SuperSocket, any] + close_on_timeout=False # type: bool + ): + self.impl = CoAPSocketImpl(ip, port, ack_timeout, retries, + duplication_response_timeout, lst_resources, sock, + close_on_timeout) + + self.ins = cast(socket.socket, self.impl) + self.outs = cast(socket.socket, self.impl) + self.basecls = CoAP + + def recv_raw(self, x=0xffff): + # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] + if not self.closed: + tup = self.impl.recv() + if tup is not None: + return self.basecls, tup[0], float(tup[1]) + return self.basecls, None, None + + def recv(self, x=MTU, **kwargs): + # type: (int, **Any) -> Optional[Packet] + return super(CoAPSocket, self).recv(x, **kwargs) + + def close(self): + # type: () -> None + if not self.closed: + self.impl.close() + self.closed = True + + def send(self, ip, port, x): + # type: (str, int, CoAP) -> None + self.impl.send(ip, port, x) + + @staticmethod + def make_coap_req_packet(method=GET, uri="", options=None, payload=b""): + # type: (int, str, list[tuple], bytes) -> Packet + """ + Create a CoAP request packet + + :param method: The target method, one of: GET, POST, PUT, DELETE + :param uri: The destination uri + :param options: The options, should be a list of tuples. + You must specify here the payload type. + Example: options = [(CONTENT_FORMAT, CF_APP_XML)] + :param payload: The payload to send, should be a byte array + :return: The CoAP packet. + """ + return CoAPSocketImpl.make_coap_req_packet(method, uri, options, payload) + + @staticmethod + def make_coap_resp_packet(coap_type, code, token, message_id, options=None, + payload=b""): + # type: (int, int, bytes, int, list[tuple], bytes) -> Packet + """ + Create a CoAP response packet + + :param coap_type: Message type, one of: CON, NON, ACK, RST + :param code: Response code, one of: EMPTY_ACK, CONTENT_205, NOT_FOUND_404, + NOT_ALLOWED_405, NOT_IMPLEMENTED_501 + :param token: The token from the request + :param message_id: The message id from the request + :param options: The options, should be a list of tuples. + You must specify here the payload type. If applicable. + Example: options = [(CONTENT_FORMAT, CF_APP_XML)] + :param payload: The payload to send, should be a byte array. + :return: The CoAP packet. + """ + return CoAPSocketImpl.make_coap_resp_packet(coap_type, code, token, message_id, + options, payload) + + @staticmethod + def empty_ack_params(): + # type: () -> dict + """ + A dictionary containing the base parameters for the empty ACK response. + Later, you should also add the request msg_id. + + :return: A dictionary containing the parameters necessary to build a + CoAP package for an empty ACK response. + """ + # {"type": ACK, "code": EMPTY_MESSAGE, "tkl": 0, "token": b'', "options": []} + return CoAPSocketImpl.empty_ack_params() + + @staticmethod + def make_delayed_resp_packet(token, options, payload): + # type: (int|bytes, list[tuple], bytes) -> Packet + """ + This will create a CoAP packet that contains all the correct parameters + for the delayed response. + The msg_id is not necessary to be specified, it will be random generated. + After all, this is similar to a new request. + + :param token: The original request token + :param options: The options, should be a list of tuples. + You must specify here the payload type. If applicable. + Example: options = [(CONTENT_FORMAT, CF_APP_XML)] + :param payload: The payload to send, should be a byte array. + :return: The CoAP packet. + """ + return CoAPSocketImpl.make_delayed_resp_packet(token, options, payload) + + +class CoAPResource: + """ + User should implement this class if he wants an answering machine for the CoAPSocket + + :param url: the resource URL + :param content_format: the default content format, this can be overridden by + specifying the CF in the method's return value. RFC 7252 @ section-7.2.1 + :param title: A human-readable title for this resource. RFC 5988 @ section 5.4. + :param description: One can think of this as describing verbs usable on a resource. + RFC 6690 @ section-3.1 + :param resource_type: One can think of this as a noun describing the resource. + RFC 6690 @ section-3.2 + """ + + def __init__(self, + url, # type: str + content_format=CF_TEXT_PLAIN, # type: bytes + title="", # type: str + description="", # type: str + resource_type="", # type: str + ): + # type: (...) -> None + self.url = url + if self.url[0] != "/": + self.url = "/" + self.url + self.description = description # if + self.content_format = content_format # ct + self.resource_type = resource_type # rt + self.title = title # title + self._coap_socket = None + self._duplication_dict = {} # type: dict[str, tuple[dict, float]] + + def get_CORE_string(self): + # type: () -> str + """ + Will return a CORE formatted string as specified in + RFC 6690 + RFC 7252 @ section-7.2.1 + """ + fmt_str = "<%s>;" % self.url + if self.description: + fmt_str += "if=\"%s\";" % self.description + if self.resource_type: + fmt_str += "rt=\"%s\";" % self.resource_type + if self.title: + fmt_str += "title=\"%s\"" % self.title + fmt_str += "ct=%d" % int().from_bytes(self.content_format, "big") + return fmt_str + + def get(self, payload, options, token, sa_ll): + # type: (bytes, list[tuple], int, tuple[str, int]) -> dict + + """ + Implementation of the get method for this resource. + User should return a dictionary containing, at least these keys: + + - type: one of the CoAP message type + - code: one of the CoAP message response codes (RFC 7252 @ section-12.1.2) + - options: a list of tuples with the options for the response + (RFC 7252 @ section-5.10). + Should have at least the pair CONTENT_FORMAT + - payload: optional, byte encoded payload + - token: the request token, in case you need to implement a delayed message + - sa_ll: the sender ip/port pair, + in case you need to implement a delayed message + + RFC 7252 @ section-5.8.1 + """ + return {"type": ACK, "code": NOT_ALLOWED_405, + "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + "payload": coap_codes[NOT_ALLOWED_405].encode("utf8")} + + def put(self, payload, options, token, sa_ll): + # type: (bytes, list[tuple], int, tuple[str, int]) -> dict + + """ + Implementation of the put method for this resource. + User should return a dictionary containing, at least these keys: + + - type: one of the CoAP message type + - code: one of the CoAP message response codes (RFC 7252 @ section-12.1.2) + - options: a list of tuples with the options for the response + (RFC 7252 @ section-5.10). + Should have at least the pair CONTENT_FORMAT + - payload: optional, byte encoded payload + - token: the request token, in case you need to implement a delayed message + - sa_ll: the sender ip/port pair, + in case you need to implement a delayed message + + RFC 7252 @ section-5.8.3 + """ + return {"type": ACK, "code": NOT_ALLOWED_405, + "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + "payload": coap_codes[NOT_ALLOWED_405].encode("utf8")} + + def check_duplicated(self, message_id, token): + # type: (int, int) -> bool + """Returns true if (message_id, token) duplicated.""" + return (message_id, token) in self._duplication_dict.keys() + + def _set_coap_socket(self, coap_socket): + # type: (CoAPSocketImpl) -> None + """Will set the CoAP socket internally, this will be called by CoAPSocketImpl""" + self._coap_socket = coap_socket + + def _register_request_response(self, message_id, token, response): + # type: (int, int, dict) -> None + """Registers a response in case it get lost""" + if (message_id, token) not in self._duplication_dict.keys(): + self._duplication_dict[(message_id, token)] = (response, time.monotonic()) + + def _get_response(self, message_id, token): + # type: (int, int) -> dict + """Returns the already sent message""" + return self._duplication_dict[(message_id, token)][0] + + def _duplicates_cleanup(self, timeout): + # type: (float) -> None + """ + Will clean up the duplication dictionary if response timestamp + + timeout is less than now + """ + now = time.monotonic() + deletion_list = [key for key, value in self._duplication_dict.items() if + (value[1] + timeout) <= now] + for key in deletion_list: + log_coap_sock.debug("Removing response: MessageID=%s; Token=0x%x", key[0], + key[1]) + del self._duplication_dict[key] + + def _send_separate_response(self, pkt, sa_ll): + # type: (CoAP, tuple[str, int]) -> None + """ + Will create a separate response, that will be treated as a + new request by the CoAPSocket. + :param pkt: The built packet. + :param sa_ll: The ip/port pair to the target machine. + """ + request = CoAPSocketImpl.CoAPRequest(sa_ll[0], sa_ll[1], + self._coap_socket.retries, + self._coap_socket.ack_timeout, + pkt) + self._coap_socket.tx_queue.send(request) + + +class CoAPSocketImpl: + """ + Implementation of a CoAP socket with client and server capabilities. + + :param ip: ip address to bind udp socket to. + :param port: port to bind udp socket to. + :param ack_timeout: the time, in ms, that we should wait for the acknowledgment + after sending a request. + :param retries: amount of retransmissions before giving up on the request. + :param duplication_response_timeout: Timeout, in fractions of seconds, + that we will keep the response in case a response get lost. + :param lst_resources: optional, list of registered resources. + :param sock: optional, a socket instance to transmit, + if None, a classic UDP socket will be open and bound to ip/port. + :param close_on_timeout: Will try to close the socket if the retries is exceeded + """ + + def __init__(self, + ip="", # type: str + port=5683, # type: int + ack_timeout=500, # type: int + retries=3, # type: int + duplication_response_timeout=1.00, # type: float + lst_resources=None, # type: Optional[None, list["CoAPResource"]] + sock=None, # type: Optional[None, SuperSocket, any] + close_on_timeout=False # type: bool + ): + # type: (...) -> None + + self.ip = ip + self.port = port + self.ack_timeout = ack_timeout + self.duplication_response_timeout = duplication_response_timeout + self.retries = retries + self.close_on_timeout = close_on_timeout + + # For development: set this to True, so it will drop rx/tx packages on purpose, + # this way it is possible to test the retransmission mechanism + self._enable_debug = False + self._debug_drop_package_number = 1 # Will drop the first received package + self._debug_drop_package_counter = 0 + + if lst_resources is not None: + self.resources = {} # type: dict[str, CoAPResource] + for res in lst_resources: + if res.url not in self.resources.keys(): + self.resources[res.url] = res + res._set_coap_socket(self) + else: + log_coap_sock.error( + "Duplicated URL for different resources:\nURL=%s", res.url) + + # Only creates the well-known resource if we have some answering machine + self.resources["/.well-known/core"] = CoAPSocketImpl.WellKnownResource( + lst_resources) + else: + self.resources = None + + if sock is None: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + s.bind((self.ip, self.port)) + self.sock = SimpleSocket(s) + else: + self.sock = SimpleSocket(sock) + + self.poll_rate = 0.005 + self.closed = False + + self.rx_queue = ObjectPipe[Tuple[bytes, Union[float, EDecimal]]]() + self.tx_queue = ObjectPipe[CoAPSocketImpl.CoAPRequest]() + + self.rx_handle = TimeoutScheduler.schedule(self.poll_rate, self._recv) + self.tx_handle = TimeoutScheduler.schedule(self.poll_rate, self._send) + + # type: dict[tuple[int,int], CoAPSocketImpl.CoAPRequest] + self.pending_requests = {} + + def __del__(self): + self.close() + + def recv(self, timeout=None): + # type: (Optional[int]) -> Optional[Tuple[bytes, Union[float, EDecimal]]] + return self.rx_queue.recv(timeout) + + def send(self, ip, port, x): + # type: (str, int, CoAP) -> None + self.tx_queue.send( + CoAPSocketImpl.CoAPRequest(ip, port, self.retries, self.ack_timeout, x)) + + def close(self): + # type: () -> None + try: + if select_objects([self.tx_queue], 0): + log_coap_sock.warning("TX queue not empty") + time.sleep(0.1) + except OSError: + pass + + try: + if select_objects([self.rx_queue], 0): + log_coap_sock.warning("RX queue not empty") + except OSError: + pass + + self.closed = True + try: + self.sock.close() + except Scapy_Exception: + pass + try: + self.rx_handle.cancel() + except Scapy_Exception: + pass + try: + self.tx_handle.cancel() + except Scapy_Exception: + pass + + @staticmethod + def make_coap_req_packet(method=GET, uri="", options=None, payload=b""): + # type: (int, str, Optional[list[tuple]], bytes) -> Packet + """Check CoAPSocket for the documentation""" + + # Parse the uri as options + if uri[0] == "/": + uri = uri[1:] + parsed_opt = [(URI_PATH, x) for x in uri.split("/")] + + if options is not None: + parsed_opt.extend(options) + + msg_id, token = CoAPSocketImpl.generate_msgId_token() + coap_packet = CoAP(type=CON, code=method, options=parsed_opt, msg_id=msg_id, + tkl=len(token), token=token) + if payload: + coap_packet.paymark = PAYMARK + coap_packet.add_payload(payload) + + return coap_packet + + @staticmethod + def make_coap_resp_packet(coap_type, code, token, message_id, options, payload): + # type: (int, int, bytes, int, list[tuple], bytes) -> Packet + """Check CoAPSocket for the documentation""" + pkt_params = { + "type": coap_type, "code": code, "options": options, "msg_id": message_id, + "tkl": len(token), "token": token + } + if payload != b'': + pkt_params["paymark"] = PAYMARK + + pkt = CoAP(**pkt_params) + + if payload != b'': + pkt.add_payload(payload) + return pkt + + @staticmethod + def empty_ack_params(): + # type: () -> dict + return {"type": ACK, "code": EMPTY_MESSAGE, "tkl": 0, "token": b'', + "options": []} + + @staticmethod + def make_delayed_resp_packet(token, options, payload): + # type: (int|bytes, list[tuple], bytes) -> Packet + """Check CoAPSocket for the documentation""" + t = token + if isinstance(token, int): + t = token.to_bytes((token.bit_length() + 7) // 8, 'big') + return CoAPSocketImpl.make_coap_resp_packet(CON, CONTENT_205, t, + random.randint(0, 0xffff), + options, payload) + + @staticmethod + def generate_msgId_token(): + # type: () -> tuple[int, bytes] + """ + Will generate a pair of (msgId, token) with message + id in the range of [0, 0xffff] and a random token with size from 1 to 8 bytes + :return: msgId and token tuple + """ + + def _randbytes(): + return bytes([random.randint(1, 255) + for _ in range(random.randint(1, 8))]) + + return random.randint(0, 0xffff), _randbytes() + + def fileno(self): + return self.sock.fileno() + + def _recv(self): + # type: () -> None + """ + Method called periodically to poll the real socket for messages. + Also, this method will do periodic cleanups in the resources. + """ + # Do a cleanup in the resources + if self.resources is not None: + for _, resource in self.resources.items(): + resource._duplicates_cleanup(self.duplication_response_timeout) + + if self.sock.select([self.sock], 0): + pkt, sa_ll = self.sock.ins.recvfrom(MTU) + pkt = CoAP(bytes(pkt)) + if pkt: + if not self._debug_drop_package(): + self._on_pkt_recv(pkt, sa_ll) + self._debug_drop_package_counter = 0 + else: + self._debug_drop_package_counter += 1 + + if not self.closed and not self.sock.closed: + if self.sock.select([self.sock], 0): + poll_time = 0.0 + else: + poll_time = self.poll_rate + self.rx_handle = TimeoutScheduler.schedule(poll_time, self._recv) + else: + try: + self.rx_handle.cancel() + except Scapy_Exception: + pass + + def _on_pkt_recv(self, pkt, sa_ll): + # type: (CoAP, tuple[str, int]) -> None + """Handles a received package""" + # Request codes + if pkt.code in COAP_REQ_CODES: + if self.resources is None: + # No answering machine registered, user will handle it individually + self.rx_queue.send((pkt.build(), pkt.time)) + else: + self._handle_rcv_request(pkt, sa_ll) + else: + # Response, check pending requests + self._handle_request_response(pkt, sa_ll) + + def _post(self): + # type: () -> dict + """ + Creates a new resource. + @todo: handle resource POST: RFC 7252 @ section-5.8.2 + """ + return {"type": ACK, "code": NOT_ALLOWED_405, + "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + "payload": coap_codes[NOT_ALLOWED_405].encode("utf8")} + + def _delete(self, resource): + # type: (CoAPResource) -> dict + """ + Will remove resource from the server. + @todo: handle resource DELETE: RFC 7252 @ section-5.8.4 + """ + return {"type": ACK, "code": NOT_ALLOWED_405, + "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + "payload": coap_codes[NOT_ALLOWED_405].encode("utf8")} + + def _handle_rcv_request(self, pkt, sa_ll): + # type: (CoAP, tuple[str, int]) -> None + """Process a received request""" + req_uri = "/" + token = int.from_bytes(pkt.token, "big") # Can be up to 8 bytes + message_id = pkt.msg_id + lst_options = [] + response = {"type": ACK, "code": NOT_FOUND_404, + "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], + "payload": coap_codes[NOT_FOUND_404].encode("utf8")} + + for option in pkt.options: + option_type_id = coap_options[1].get(option[0], -1) + option_value = option[1] + + if option_type_id == -1: + log_coap_sock.warning("Invalid option ID, ignoring: " + "ID=%s; Value=%s;", + option[0], option[1]) + elif option_type_id == URI_PATH: + req_uri += option_value.decode("ascii").casefold() + req_uri += "/" + else: + lst_options.append(option) + + # Special case: if we are requesting the root resource + if req_uri != "/": + req_uri = req_uri[:-1] # remove the extra "/" in the end + + resource = self.resources.get(req_uri, None) + if resource is not None: + if not resource.check_duplicated(message_id, token): + if pkt.code == GET: + response = resource.get(pkt.payload, lst_options, token, sa_ll) + elif pkt.code == POST: + # @todo: handle existing resource POST: RFC 7252 @ section-5.8.2 + pass + elif pkt.code == PUT: + response = resource.put(pkt.payload, lst_options, token, sa_ll) + elif pkt.code == DELETE: + response = self._delete(resource) + + resource._register_request_response(message_id, token, response) + else: + response = resource._get_response(message_id, token) + log_coap_sock.debug( + "Received duplicated request: " + "URI=%s; MessageID=%s; Token=0x%x", + req_uri, + message_id, token) + else: + if pkt.code == POST: + response = self._post() + else: + log_coap_sock.warning("Unknown resource: URI=%s", req_uri) + + response["tkl"] = pkt.tkl + response["token"] = pkt.token + response["msg_id"] = message_id + + if pkt.type == NON: + response["type"] = NON + + # Add paymark (separator between options and payload) + if "paymark" not in response.keys(): + response["paymark"] = PAYMARK + + # Remove useless fields for the empty ACK + if response["code"] == EMPTY_MESSAGE and response["type"] == ACK: + response["tkl"] = 0 + response["token"] = b"" + response.pop("paymark", None) + + # Assign payload to packet + pl = response.pop("payload", None) + p = CoAP(**response) + if pl is not None: + p.add_payload(pl) + + self._sock_send(sa_ll, p) + + def _start_new_client_request(self, request): + # type: (CoAPSocketImpl.CoAPRequest) -> None + """ + Starts a new client interaction. This function is meant to be called internally. + :param request: a CoAPRequest instance. + """ + if request.indexing() not in self.pending_requests.keys(): + log_coap_sock.debug("New client request: msg_id=%s; token=0x%x", + request.message_id, request.token) + self.pending_requests[request.indexing()] = request + self._sock_send((request.ip, request.port), request.get_pkt_and_mark()) + else: + log_coap_sock.warning( + "Duplicated request, will not be sent: msg_id=%s; token=0x%x", + request.message_id, + request.token) + + def _handle_pending_client_request(self, request): + # type: (CoAPSocketImpl.CoAPRequest) -> bool + """ + Will check the pending request and trigger a retransmission or deletion + of the request. + :param request: a CoAPRequest instance. + :return: Will return True if we should delete the request instance. + """ + result = False + if request.should_give_up(): + if not request.empty_ack_fulfilled: # To avoid misleading logs + log_coap_sock.warning( + "Expired number of retries, giving up: msg_id=%s; token=0x%x", + request.message_id, + request.token) + result = True + elif request.should_resend(): + self._sock_send((request.ip, request.port), request.get_pkt_and_mark()) + + return result + + def _handle_request_response(self, pkt, sa_ll): + # type: (CoAP, tuple[str, int]) -> None + """ + Handles a received response. Will check if there is the valid request. + Otherwise, it will put in the rx_queue for the user to handle it + via the recv() function. + :param pkt: The CoAP packet to be processed + :param sa_ll: The ip/port tuple of the sender + """ + token = int.from_bytes(pkt.token, "big") + index = (pkt.msg_id, token) + request = self.pending_requests.get(index, None) + if request is None and (pkt.type == ACK or pkt.type == CON or pkt.type == NON): + for key in self.pending_requests.keys(): + if index[0] == key[0] or index[1] == key[1]: + log_coap_sock.info("Found request by using %s", + "token" if index[1] == key[1] + else "message_id") + request = self.pending_requests[key] + index = key + break + + if request is None: + log_coap_sock.warning( + "Request for received response not found: msg_id=%s; token=0x%x", + pkt.msg_id, token) + return + + if pkt.type == ACK and pkt.code != EMPTY_MESSAGE: + log_coap_sock.debug("Request fulfilled: msg_id=%s; token=0x%x; code=%s", + index[0], index[1], + coap_codes[pkt.code]) + del self.pending_requests[index] + # Piggybacked message, give it to the user + self.rx_queue.send((pkt.build(), pkt.time)) + elif pkt.type == ACK and pkt.code == EMPTY_MESSAGE: + log_coap_sock.debug( + "Server sent an empty ack, request will be fulfilled later: " + "msg_id=%s; token=0x%x; code=%s", + index[0], index[1], coap_codes[pkt.code]) + request.empty_ack_set() + elif pkt.type == CON and pkt.code == CONTENT_205: + log_coap_sock.debug( + "Received a delayed content for a previous request: msg_id=%s; " + "token=0x%x; code=%s", + index[0], index[1], coap_codes[pkt.code]) + + # We need to respond with an empty ACK + request.empty_ack_fulfilled = True + response = CoAPSocketImpl.empty_ack_params() + response["msg_id"] = pkt.msg_id + self._sock_send(sa_ll, CoAP(**response)) + + # Give the packet to the user + self.rx_queue.send((pkt.build(), pkt.time)) + else: + log_coap_sock.info("Not handled message, giving to user: " + "type=%s; code=%s;", + pkt.type, coap_codes[pkt.code]) + self.rx_queue.send((pkt.build(), pkt.time)) + + def _sock_send(self, address, pl): + # type: (tuple[str, int], Packet) -> None + self.sock.outs.sendto(pl.build(), address) + + def _send(self): + # type: () -> None + """ + Periodically checks the pending requests for either retransmitting or removing, + depends on the result of _handle_pending_client_request(). + """ + lst_remove = [] + for key, request in self.pending_requests.items(): + if self._handle_pending_client_request(request): + lst_remove.append(key) + + for key in lst_remove: + del self.pending_requests[key] + + if select_objects([self.tx_queue], 0): + request = self.tx_queue.recv() + if request: + self._start_new_client_request(request) + + if self.close_on_timeout and len(self.pending_requests) == 0: + self.close() + + if not self.closed: + self.tx_handle = TimeoutScheduler.schedule(self.poll_rate, self._send) + else: + try: + self.tx_handle.cancel() + except Scapy_Exception: + pass + + def _debug_drop_package(self): + # type: () -> bool + """ + Debug function where it will return if we should drop the + package to test the retransmission mechanism + """ + return (self._enable_debug and + self._debug_drop_package_counter < self._debug_drop_package_number) + + class WellKnownResource(CoAPResource): + """ + This is a default resource that will return information about all the registered + resources in the server. + Described at RFC 7252 @ section 7.2 and RFC 6690 + + :param lst_resources: List of CoAPResource. + """ + + def __init__(self, + lst_resources # type: list[CoAPResource] + ): + # type: (...) -> None + CoAPResource.__init__(self, url=".well-known/core", + content_format=CF_APP_LINK_FORMAT) + self.lst_resources = lst_resources + + def get(self, payload, options, token, sa_ll): + # type: (bytes, list[tuple], int, tuple[str, int]) -> dict + str_resources = ",".join([x.get_CORE_string() for x in self.lst_resources]) + return {"type": ACK, "code": CONTENT_205, + "options": [(CONTENT_FORMAT, CF_APP_LINK_FORMAT)], + "payload": str_resources.encode("ascii")} + + class CoAPRequest: + """ + Class to control a client request. + + :param ip: The remote server's ip address. + :param port: The remote server's port. + :param max_retries: Number of retransmissions before giving up. + :param retry_timeout: ACK timeout for retransmission. + :param pkt: The CoAP package to be sent. + """ + + def __init__(self, + ip, # type: str + port, # type: int + max_retries, # type: int + retry_timeout, # type: float + pkt, # type: CoAP + resource=None # type: Optional[CoAPResource] + ): + # type: (...) -> None + self.ip = ip + self.port = port + self.package = pkt + + self.message_id = pkt.msg_id + self.token = int.from_bytes(pkt.token, "big") + + self.tries = 0 + self.max_retries = max_retries + self.last_try_timestamp = 0.0 + self.base_retry_timeout = retry_timeout + self.retry_timeout = self.base_retry_timeout + + # Set this flag if an empty ack was received + self.received_empty_ack = False + self.empty_ack_timeout = 0 + self.empty_ack_fulfilled = False + self.resource = resource + + def get_pkt_and_mark(self): + # type: () -> Packet + """ + Returns the already sent packet for retransmission and sets + a new timeout for retry. + :return: A CoAP packet for retransmission. + """ + self.tries += 1 + self.last_try_timestamp = time.monotonic() + self.retry_timeout = self.base_retry_timeout * self.tries + + # Clear the empty ack flags + self.empty_ack_timeout = 0 + self.received_empty_ack = False + + return self.package + + def should_give_up(self): + # type: () -> bool + """ + Checks if we should give up on retransmission of this request. + :return: True if we should give up. + """ + return self.tries > self.max_retries + + def should_resend(self): + # type: () -> bool + """ + Checks if it is time to resend this request. + :return: True if we should resend. + """ + if self.received_empty_ack: + return ((self.last_try_timestamp + self.retry_timeout) <= + time.monotonic()) + else: + if self.empty_ack_fulfilled: + # This way, eventually, this request will be removed by the timer. + # It is to avoid late retransmissions. + self.tries += 1 + return ((not self.empty_ack_fulfilled) and + self.empty_ack_timeout <= time.monotonic()) + + def indexing(self): + # type: () -> tuple[int, int] + """ + Returns the indexing of this request. + :return: A tuple containing the message_id and token of this request. + """ + return self.message_id, self.token + + def empty_ack_set(self): + # type: () -> None + """ + Set the empty ack flag and will set the timeout. + After the timeout, it will resend the request until should_give_up() + is triggered. + """ + self.tries = 0 + self.received_empty_ack = True + self.empty_ack_timeout = time.monotonic() + 15 diff --git a/test/contrib/coap_socket.uts b/test/contrib/coap_socket.uts new file mode 100644 index 00000000000..91cd20890ac --- /dev/null +++ b/test/contrib/coap_socket.uts @@ -0,0 +1,129 @@ +% Regression tests for CoAPSocket + ++ Configuration +~ conf + += Imports +from scapy.contrib.coap_socket import * + += Redirect logging +import logging +from scapy.error import log_runtime + +from io import StringIO + +log_stream = StringIO() +handler = logging.StreamHandler(log_stream) +log_runtime.addHandler(handler) +log_coap_sock.addHandler(handler) + ++ Testing client -> server interactions + += Setup dummy resources + +responses = [b'dummy response', b'delayed response'] + +class DummyResource(CoAPResource): + def get(self, payload, options, token, sa_ll): + return {"type": ACK, "code": CONTENT_205, "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], "payload": responses[0]} + + +class DelayedResource(CoAPResource): + def __init__(self, url): + CoAPResource.__init__(self, url=url) + self.delayed_tokens = [] + def delayed_message(self): + token, address = self.delayed_tokens.pop(0) + pkt = CoAPSocket.make_delayed_resp_packet(token, [(CONTENT_FORMAT, CF_TEXT_PLAIN)], responses[1]) + self._send_separate_response(pkt, address) + def get(self, payload, options, token, sa_ll): + # We know that this can take a while, so we return an empty ACK now and wait for whatever resource to be available. + TimeoutScheduler.schedule(1, self.delayed_message) + self.delayed_tokens.append((token, sa_ll)) + return CoAPSocket.empty_ack_params() + +lst_resources = [DummyResource("/dummy"), DelayedResource("delayed")] + += Send and receive package [.well-known/core] + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri=".well-known/core", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.payload.load == b';ct=0,;ct=0' + assert res.type == ACK + assert res.code == CONTENT_205 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Send and receive package [dummy] + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri="dummy", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.payload.load == responses[0] + assert res.type == ACK + assert res.code == CONTENT_205 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Send and receive package [/dummy] + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.payload.load == responses[0] + assert res.type == ACK + assert res.code == CONTENT_205 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Incorrect endpoint [dummy/] + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri="dummy/", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.type == ACK + assert res.code == NOT_FOUND_404 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Invalid method + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(method=PUT, uri="dummy", payload=b"a payload") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.type == ACK + assert res.code == NOT_ALLOWED_405 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Retransmission + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + coap_server.impl._enable_debug = True + req = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.payload.load == responses[0] + assert res.type == ACK + assert res.code == CONTENT_205 + assert res.msg_id == req.msg_id + assert res.token == req.token + += Delayed response + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri="/delayed", payload=b"") + coap_client.send("127.0.0.1", 5683, req) + res = coap_client.recv() + assert res.payload.load == responses[1] + assert res.type == CON + assert res.code == CONTENT_205 + # assert res.msg_id == req.msg_id - This assert doesn't make sense because it will send with another msg_id + assert res.token == req.token + assert res.payload.load == responses[1] From f6b86b5191d5f946b5fa0966d6042c4ca3fbfe41 Mon Sep 17 00:00:00 2001 From: eHonnef Date: Sat, 13 Apr 2024 21:39:20 +0200 Subject: [PATCH 2/4] Code reviews: - Moved the defines/enumerators to coap.py - Changed the send() function to match the SuperSocket declaration - Updated unit tests --- scapy/contrib/coap.py | 67 ++++++++++++++++++++++++++------ scapy/contrib/coap_socket.py | 74 +++++++++++------------------------- test/contrib/coap_socket.uts | 14 +++---- 3 files changed, 84 insertions(+), 71 deletions(-) diff --git a/scapy/contrib/coap.py b/scapy/contrib/coap.py index 1c8eb3d7b14..661a4b4e3c3 100644 --- a/scapy/contrib/coap.py +++ b/scapy/contrib/coap.py @@ -19,31 +19,71 @@ from scapy.error import warning from scapy.compat import raw +""" +CoAP message request codes (RFC 7252 @ section-5.8.1) +""" +EMPTY_MESSAGE = 0 +GET = 1 +POST = 2 +PUT = 3 +DELETE = 4 +COAP_REQ_CODES = [GET, POST, PUT, DELETE] +""" +CoAP message response codes (RFC 7252 @ section-12.1.2) +""" +EMPTY_ACK = EMPTY_MESSAGE +CONTENT_205 = 69 +NOT_FOUND_404 = 132 +NOT_ALLOWED_405 = 133 +NOT_IMPLEMENTED_501 = 161 +""" +CoAP content type (RFC 7252 @ section-12.3) +""" +CF_TEXT_PLAIN = b"\x00" +CF_APP_LINK_FORMAT = b"\x28" +CF_APP_XML = b"\x29" +CF_APP_OCTET_STREAM = b"\x2A" +CF_APP_EXI = b"\x2F" +CF_APP_JSON = b"\x32" +""" +CoAP options (RFC 7252 @ section-5.10) +""" +PAYMARK = b"\xff" +URI_PATH = 11 +CONTENT_FORMAT = 12 +""" +CoAP message type +""" +CON = 0 +NON = 1 +ACK = 2 +RST = 3 + coap_codes = { - 0: "Empty", + EMPTY_MESSAGE: "Empty", # Request codes - 1: "GET", - 2: "POST", - 3: "PUT", - 4: "DELETE", + GET: "GET", + POST: "POST", + PUT: "PUT", + DELETE: "DELETE", # Response codes 65: "2.01 Created", 66: "2.02 Deleted", 67: "2.03 Valid", 68: "2.04 Changed", - 69: "2.05 Content", + CONTENT_205: "2.05 Content", 128: "4.00 Bad Request", 129: "4.01 Unauthorized", 130: "4.02 Bad Option", 131: "4.03 Forbidden", - 132: "4.04 Not Found", - 133: "4.05 Method Not Allowed", + NOT_FOUND_404: "4.04 Not Found", + NOT_ALLOWED_405: "4.05 Method Not Allowed", 134: "4.06 Not Acceptable", 140: "4.12 Precondition Failed", 141: "4.13 Request Entity Too Large", 143: "4.15 Unsupported Content-Format", 160: "5.00 Internal Server Error", - 161: "5.01 Not Implemented", + NOT_IMPLEMENTED_501: "5.01 Not Implemented", 162: "5.02 Bad Gateway", 163: "5.03 Service Unavailable", 164: "5.04 Gateway Timeout", @@ -120,7 +160,8 @@ def _get_opt_val_size(pkt): class _CoAPOpt(Packet): fields_desc = [BitField("delta", 0, 4), BitField("len", 0, 4), - StrLenField("delta_ext", "", length_from=_get_delta_ext_size), # noqa: E501 + StrLenField("delta_ext", "", length_from=_get_delta_ext_size), + # noqa: E501 StrLenField("len_ext", "", length_from=_get_len_ext_size), StrLenField("opt_val", "", length_from=_get_opt_val_size)] @@ -149,7 +190,8 @@ class _CoAPOptsField(StrField): islist = 1 def i2h(self, pkt, x): - return [(coap_options[0][o[0]], o[1]) if o[0] in coap_options[0] else o for o in x] # noqa: E501 + return [(coap_options[0][o[0]], o[1]) if o[0] in coap_options[0] else o for o in + x] # noqa: E501 # consume only the coap layer from the wire string def getfield(self, pkt, s): @@ -214,7 +256,8 @@ class CoAP(Packet): name = "CoAP" fields_desc = [BitField("ver", 1, 2), - BitEnumField("type", 0, 2, {0: "CON", 1: "NON", 2: "ACK", 3: "RST"}), # noqa: E501 + BitEnumField("type", 0, 2, {0: "CON", 1: "NON", 2: "ACK", 3: "RST"}), + # noqa: E501 BitFieldLenField("tkl", None, 4, length_of='token'), ByteEnumField("code", 0, coap_codes), ShortField("msg_id", 0), diff --git a/scapy/contrib/coap_socket.py b/scapy/contrib/coap_socket.py index 81194e45aba..7ccef561048 100644 --- a/scapy/contrib/coap_socket.py +++ b/scapy/contrib/coap_socket.py @@ -23,7 +23,9 @@ from scapy.error import Scapy_Exception from scapy.packet import Packet -from scapy.contrib.coap import CoAP, coap_options, coap_codes +from scapy.contrib.coap import CoAP, coap_options, coap_codes, EMPTY_MESSAGE, GET, \ + POST, PUT, DELETE, COAP_REQ_CODES, CONTENT_205, NOT_FOUND_404, NOT_ALLOWED_405, \ + CF_TEXT_PLAIN, CF_APP_LINK_FORMAT, PAYMARK, URI_PATH, CONTENT_FORMAT, CON, NON, ACK from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler from scapy.data import MTU from scapy.utils import EDecimal @@ -31,47 +33,6 @@ from scapy.supersocket import SuperSocket, SimpleSocket -""" -CoAP message request codes (RFC 7252 @ section-5.8.1) -""" -EMPTY_MESSAGE = 0 -GET = 1 -POST = 2 -PUT = 3 -DELETE = 4 -COAP_REQ_CODES = [GET, POST, PUT, DELETE] -""" -CoAP message response codes (RFC 7252 @ section-12.1.2) -Also, from scapy.contrib.coap.coap_codes -""" -EMPTY_ACK = EMPTY_MESSAGE -CONTENT_205 = 69 -NOT_FOUND_404 = 132 -NOT_ALLOWED_405 = 133 -NOT_IMPLEMENTED_501 = 161 -""" -CoAP content type (RFC 7252 @ section-12.3) -""" -CF_TEXT_PLAIN = b"\x00" -CF_APP_LINK_FORMAT = b"\x28" -CF_APP_XML = b"\x29" -CF_APP_OCTET_STREAM = b"\x2A" -CF_APP_EXI = b"\x2F" -CF_APP_JSON = b"\x32" -""" -CoAP options (RFC 7252 @ section-5.10) -""" -PAYMARK = b"\xff" -URI_PATH = 11 -CONTENT_FORMAT = 12 -""" -CoAP message type -""" -CON = 0 -NON = 1 -ACK = 2 -RST = 3 - log_coap_sock = logging.getLogger("scapy.contrib.coap_socket") @@ -85,7 +46,7 @@ class CoAPSocket(SuperSocket): >>> with CoAPSocket("127.0.0.1", 1234) as coap_client: >>> req = CoAPSocket.make_coap_req_packet( >>> method=GET, uri="endpoint-uri", payload=b"") - >>> coap_client.send("127.0.0.1", 5683, req) + >>> coap_client.send(IP(dst="192.168.1.1") / UDP(dport=1234) / req) >>> # Careful, this will block until the coap_client receives something >>> res = coap_client.recv() @@ -172,12 +133,24 @@ def recv(self, x=MTU, **kwargs): def close(self): # type: () -> None if not self.closed: - self.impl.close() self.closed = True + self.impl.close() - def send(self, ip, port, x): - # type: (str, int, CoAP) -> None - self.impl.send(ip, port, x) + def send(self, x): + # type: (Packet) -> int + """ + Send the packet using this socket. + Should be a CoAP packet with IP and UDP data. + + Example: + >>> IP(dst="192.168.1.1") / UDP(dport=1234) / CoAP() + >>> IP(dst="192.168.1.1") / UDP(dport=1234) / CoAPSocket.make_coap_req_packet() + + :param x: Concatenated packet with IP / UDP / CoAP + :return: The length of x, which is the amount of bytes sent + """ + self.impl.send(x.dst, x.dport, x[CoAP]) + return len(x) @staticmethod def make_coap_req_packet(method=GET, uri="", options=None, payload=b""): @@ -630,7 +603,7 @@ def _on_pkt_recv(self, pkt, sa_ll): else: self._handle_rcv_request(pkt, sa_ll) else: - # Response, check pending requests + # Response, check pending requests and process internally self._handle_request_response(pkt, sa_ll) def _post(self): @@ -805,7 +778,6 @@ def _handle_request_response(self, pkt, sa_ll): index[0], index[1], coap_codes[pkt.code]) del self.pending_requests[index] - # Piggybacked message, give it to the user self.rx_queue.send((pkt.build(), pkt.time)) elif pkt.type == ACK and pkt.code == EMPTY_MESSAGE: log_coap_sock.debug( @@ -824,11 +796,9 @@ def _handle_request_response(self, pkt, sa_ll): response = CoAPSocketImpl.empty_ack_params() response["msg_id"] = pkt.msg_id self._sock_send(sa_ll, CoAP(**response)) - - # Give the packet to the user self.rx_queue.send((pkt.build(), pkt.time)) else: - log_coap_sock.info("Not handled message, giving to user: " + log_coap_sock.info("Not handled message: " "type=%s; code=%s;", pkt.type, coap_codes[pkt.code]) self.rx_queue.send((pkt.build(), pkt.time)) diff --git a/test/contrib/coap_socket.uts b/test/contrib/coap_socket.uts index 91cd20890ac..44aeb34c55e 100644 --- a/test/contrib/coap_socket.uts +++ b/test/contrib/coap_socket.uts @@ -48,7 +48,7 @@ lst_resources = [DummyResource("/dummy"), DelayedResource("delayed")] with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(uri=".well-known/core", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.payload.load == b';ct=0,;ct=0' assert res.type == ACK @@ -60,7 +60,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(uri="dummy", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.payload.load == responses[0] assert res.type == ACK @@ -72,7 +72,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.payload.load == responses[0] assert res.type == ACK @@ -84,7 +84,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(uri="dummy/", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.type == ACK assert res.code == NOT_FOUND_404 @@ -95,7 +95,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(method=PUT, uri="dummy", payload=b"a payload") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.type == ACK assert res.code == NOT_ALLOWED_405 @@ -107,7 +107,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: coap_server.impl._enable_debug = True req = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.payload.load == responses[0] assert res.type == ACK @@ -119,7 +119,7 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: req = CoAPSocket.make_coap_req_packet(uri="/delayed", payload=b"") - coap_client.send("127.0.0.1", 5683, req) + coap_client.send(IP(dst="127.0.0.1")/UDP(dport=5683)/req) res = coap_client.recv() assert res.payload.load == responses[1] assert res.type == CON From 1703df8504a6b0bbe76e86351a98405b86d7fc26 Mon Sep 17 00:00:00 2001 From: eHonnef Date: Thu, 16 May 2024 15:31:31 +0200 Subject: [PATCH 3/4] Unit tests for SR and SR1, added hash functions for the CoAP packet. --- scapy/contrib/coap.py | 6 +++ scapy/contrib/coap_socket.py | 96 +++++++++++++++++++++++------------- test/contrib/coap_socket.uts | 23 +++++++++ 3 files changed, 92 insertions(+), 33 deletions(-) diff --git a/scapy/contrib/coap.py b/scapy/contrib/coap.py index 661a4b4e3c3..909f8d22bd5 100644 --- a/scapy/contrib/coap.py +++ b/scapy/contrib/coap.py @@ -278,6 +278,12 @@ def post_dissect(self, pay): self.content_format = k[1] return pay + def hashret(self): + return struct.pack('I', self.msg_id) + self.token + + def answers(self, other): + return True + bind_layers(UDP, CoAP, sport=5683) bind_layers(UDP, CoAP, dport=5683) diff --git a/scapy/contrib/coap_socket.py b/scapy/contrib/coap_socket.py index 7ccef561048..f5419918cd4 100644 --- a/scapy/contrib/coap_socket.py +++ b/scapy/contrib/coap_socket.py @@ -30,7 +30,7 @@ from scapy.data import MTU from scapy.utils import EDecimal from scapy.automaton import ObjectPipe, select_objects - +from scapy.layers.inet import UDP, IP from scapy.supersocket import SuperSocket, SimpleSocket log_coap_sock = logging.getLogger("scapy.contrib.coap_socket") @@ -123,8 +123,8 @@ def recv_raw(self, x=0xffff): if not self.closed: tup = self.impl.recv() if tup is not None: - return self.basecls, tup[0], float(tup[1]) - return self.basecls, None, None + return IP, tup[0], float(tup[1]) + return IP, None, None def recv(self, x=MTU, **kwargs): # type: (int, **Any) -> Optional[Packet] @@ -152,6 +152,29 @@ def send(self, x): self.impl.send(x.dst, x.dport, x[CoAP]) return len(x) + def sr(self, *args, **kargs): + args[0].sport = self.impl.port + return super(CoAPSocket, self).sr(*args, **kargs) + + def sr1(self, *args, **kargs): + args[0].sport = self.impl.port + return super(CoAPSocket, self).sr1(*args, **kargs) + + @staticmethod + def select(sockets, remain=None): + # type: (list[SuperSocket], Optional[float]) -> list[SuperSocket] + """ + This function is called during sendrecv() routine to wait for + sockets to be ready to receive. + """ + obj_pipes = [x.impl.rx_queue for x in sockets if + isinstance(x, CoAPSocket) and not x.closed] + + ready_pipes = select_objects(obj_pipes, 0) + + return [x for x in sockets if isinstance(x, CoAPSocket) and + not x.closed and x.impl.rx_queue in ready_pipes] + @staticmethod def make_coap_req_packet(method=GET, uri="", options=None, payload=b""): # type: (int, str, list[tuple], bytes) -> Packet @@ -572,7 +595,9 @@ def _recv(self): if self.sock.select([self.sock], 0): pkt, sa_ll = self.sock.ins.recvfrom(MTU) - pkt = CoAP(bytes(pkt)) + pkt = (IP(src=sa_ll[0], dst=self.ip) / + UDP(sport=sa_ll[1], dport=self.port) / + CoAP(bytes(pkt))) if pkt: if not self._debug_drop_package(): self._on_pkt_recv(pkt, sa_ll) @@ -629,15 +654,16 @@ def _delete(self, resource): def _handle_rcv_request(self, pkt, sa_ll): # type: (CoAP, tuple[str, int]) -> None """Process a received request""" + coap_pkt = pkt[CoAP] req_uri = "/" - token = int.from_bytes(pkt.token, "big") # Can be up to 8 bytes - message_id = pkt.msg_id + token = int.from_bytes(coap_pkt.token, "big") # Can be up to 8 bytes + message_id = coap_pkt.msg_id lst_options = [] response = {"type": ACK, "code": NOT_FOUND_404, "options": [(CONTENT_FORMAT, CF_TEXT_PLAIN)], "payload": coap_codes[NOT_FOUND_404].encode("utf8")} - for option in pkt.options: + for option in coap_pkt.options: option_type_id = coap_options[1].get(option[0], -1) option_value = option[1] @@ -658,14 +684,14 @@ def _handle_rcv_request(self, pkt, sa_ll): resource = self.resources.get(req_uri, None) if resource is not None: if not resource.check_duplicated(message_id, token): - if pkt.code == GET: - response = resource.get(pkt.payload, lst_options, token, sa_ll) - elif pkt.code == POST: + if coap_pkt.code == GET: + response = resource.get(coap_pkt.payload, lst_options, token, sa_ll) + elif coap_pkt.code == POST: # @todo: handle existing resource POST: RFC 7252 @ section-5.8.2 pass - elif pkt.code == PUT: - response = resource.put(pkt.payload, lst_options, token, sa_ll) - elif pkt.code == DELETE: + elif coap_pkt.code == PUT: + response = resource.put(coap_pkt.payload, lst_options, token, sa_ll) + elif coap_pkt.code == DELETE: response = self._delete(resource) resource._register_request_response(message_id, token, response) @@ -677,16 +703,16 @@ def _handle_rcv_request(self, pkt, sa_ll): req_uri, message_id, token) else: - if pkt.code == POST: + if coap_pkt.code == POST: response = self._post() else: log_coap_sock.warning("Unknown resource: URI=%s", req_uri) - response["tkl"] = pkt.tkl - response["token"] = pkt.token + response["tkl"] = coap_pkt.tkl + response["token"] = coap_pkt.token response["msg_id"] = message_id - if pkt.type == NON: + if coap_pkt.type == NON: response["type"] = NON # Add paymark (separator between options and payload) @@ -751,13 +777,15 @@ def _handle_request_response(self, pkt, sa_ll): Handles a received response. Will check if there is the valid request. Otherwise, it will put in the rx_queue for the user to handle it via the recv() function. - :param pkt: The CoAP packet to be processed + :param coap_pkt: The CoAP packet to be processed :param sa_ll: The ip/port tuple of the sender """ - token = int.from_bytes(pkt.token, "big") - index = (pkt.msg_id, token) + coap_pkt = pkt[CoAP] + token = int.from_bytes(coap_pkt.token, "big") + index = (coap_pkt.msg_id, token) request = self.pending_requests.get(index, None) - if request is None and (pkt.type == ACK or pkt.type == CON or pkt.type == NON): + if (request is None and + (coap_pkt.type == ACK or coap_pkt.type == CON or coap_pkt.type == NON)): for key in self.pending_requests.keys(): if index[0] == key[0] or index[1] == key[1]: log_coap_sock.info("Found request by using %s", @@ -770,38 +798,40 @@ def _handle_request_response(self, pkt, sa_ll): if request is None: log_coap_sock.warning( "Request for received response not found: msg_id=%s; token=0x%x", - pkt.msg_id, token) + coap_pkt.msg_id, token) return - if pkt.type == ACK and pkt.code != EMPTY_MESSAGE: + if coap_pkt.type == ACK and coap_pkt.code != EMPTY_MESSAGE: log_coap_sock.debug("Request fulfilled: msg_id=%s; token=0x%x; code=%s", index[0], index[1], - coap_codes[pkt.code]) + coap_codes[coap_pkt.code]) + pkt.sport = self.pending_requests[index].port del self.pending_requests[index] - self.rx_queue.send((pkt.build(), pkt.time)) - elif pkt.type == ACK and pkt.code == EMPTY_MESSAGE: + self.rx_queue.send((pkt.build(), coap_pkt.time)) + elif coap_pkt.type == ACK and coap_pkt.code == EMPTY_MESSAGE: log_coap_sock.debug( "Server sent an empty ack, request will be fulfilled later: " "msg_id=%s; token=0x%x; code=%s", - index[0], index[1], coap_codes[pkt.code]) + index[0], index[1], coap_codes[coap_pkt.code]) request.empty_ack_set() - elif pkt.type == CON and pkt.code == CONTENT_205: + elif coap_pkt.type == CON and coap_pkt.code == CONTENT_205: log_coap_sock.debug( "Received a delayed content for a previous request: msg_id=%s; " "token=0x%x; code=%s", - index[0], index[1], coap_codes[pkt.code]) + index[0], index[1], coap_codes[coap_pkt.code]) # We need to respond with an empty ACK request.empty_ack_fulfilled = True response = CoAPSocketImpl.empty_ack_params() - response["msg_id"] = pkt.msg_id + response["msg_id"] = coap_pkt.msg_id self._sock_send(sa_ll, CoAP(**response)) - self.rx_queue.send((pkt.build(), pkt.time)) + pkt.sport = request.port + self.rx_queue.send((pkt.build(), coap_pkt.time)) else: log_coap_sock.info("Not handled message: " "type=%s; code=%s;", - pkt.type, coap_codes[pkt.code]) - self.rx_queue.send((pkt.build(), pkt.time)) + coap_pkt.type, coap_codes[coap_pkt.code]) + self.rx_queue.send((pkt.build(), coap_pkt.time)) def _sock_send(self, address, pl): # type: (tuple[str, int], Packet) -> None diff --git a/test/contrib/coap_socket.uts b/test/contrib/coap_socket.uts index 44aeb34c55e..43f72a41e22 100644 --- a/test/contrib/coap_socket.uts +++ b/test/contrib/coap_socket.uts @@ -127,3 +127,26 @@ with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, # assert res.msg_id == req.msg_id - This assert doesn't make sense because it will send with another msg_id assert res.token == req.token assert res.payload.load == responses[1] + += SR1 + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + req = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") + res = coap_client.sr1(IP(dst="127.0.0.1")/UDP(dport=5683)/req) + assert res.payload.load == responses[0] + assert res.type == ACK + assert res.code == CONTENT_205 + assert res.msg_id == req.msg_id + assert res.token == req.token + += SR + +with CoAPSocket("127.0.0.1", 5683, lst_resources=lst_resources) as coap_server, CoAPSocket("127.0.0.1", 5684) as coap_client: + pkt = CoAPSocket.make_coap_req_packet(uri="/dummy", payload=b"") + ans, _ = coap_client.sr(IP(dst="127.0.0.1")/UDP(dport=5683)/pkt) + for _, rcv in ans: + assert rcv.payload.load == responses[0] + assert rcv.type == ACK + assert rcv.code == CONTENT_205 + assert rcv.msg_id == pkt.msg_id + assert rcv.token == pkt.token From 73ce0b0d07b74eb44045641f949dcff0bfc2055a Mon Sep 17 00:00:00 2001 From: eHonnef Date: Wed, 17 Jul 2024 15:34:25 +0200 Subject: [PATCH 4/4] Code review: - Implemented coap.answers function - Fixed some types - Remove unnecessary override - Changed sr and sr1 functions signatures. --- scapy/contrib/coap.py | 13 +++++++++++-- scapy/contrib/coap_socket.py | 32 ++++++++++++++------------------ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/scapy/contrib/coap.py b/scapy/contrib/coap.py index 909f8d22bd5..899e8fef054 100644 --- a/scapy/contrib/coap.py +++ b/scapy/contrib/coap.py @@ -87,7 +87,7 @@ 162: "5.02 Bad Gateway", 163: "5.03 Service Unavailable", 164: "5.04 Gateway Timeout", - 165: "Proxying Not Supported"} + 165: "5.05 Proxying Not Supported"} coap_options = ({ 1: "If-Match", @@ -282,7 +282,16 @@ def hashret(self): return struct.pack('I', self.msg_id) + self.token def answers(self, other): - return True + # type: (Packet) -> int + """ + DEV: true if self is an answer from other + Any response that is inside coap_codes that is not a request is valid. + i.e.: do not answer a request with a request. + """ + if self.code not in COAP_REQ_CODES: + if self.code in coap_codes.keys(): + return 1 + return 0 bind_layers(UDP, CoAP, sport=5683) diff --git a/scapy/contrib/coap_socket.py b/scapy/contrib/coap_socket.py index f5419918cd4..61f0b818a56 100644 --- a/scapy/contrib/coap_socket.py +++ b/scapy/contrib/coap_socket.py @@ -16,7 +16,6 @@ Optional, Union, Tuple, - Any, cast, Type ) @@ -106,8 +105,8 @@ def __init__(self, ack_timeout=500, # type: int retries=3, # type: int duplication_response_timeout=1.00, # type: float - lst_resources=None, # type: Optional[None, list[CoAPResource]] - sock=None, # type: Optional[None, SuperSocket, any] + lst_resources=None, # type: Optional[list[CoAPResource]] + sock=None, # type: Optional[SuperSocket] close_on_timeout=False # type: bool ): self.impl = CoAPSocketImpl(ip, port, ack_timeout, retries, @@ -126,10 +125,6 @@ def recv_raw(self, x=0xffff): return IP, tup[0], float(tup[1]) return IP, None, None - def recv(self, x=MTU, **kwargs): - # type: (int, **Any) -> Optional[Packet] - return super(CoAPSocket, self).recv(x, **kwargs) - def close(self): # type: () -> None if not self.closed: @@ -149,16 +144,15 @@ def send(self, x): :param x: Concatenated packet with IP / UDP / CoAP :return: The length of x, which is the amount of bytes sent """ - self.impl.send(x.dst, x.dport, x[CoAP]) - return len(x) + return self.impl.send(x) - def sr(self, *args, **kargs): - args[0].sport = self.impl.port - return super(CoAPSocket, self).sr(*args, **kargs) + def sr(self, pkt, *args, **kargs): + pkt[UDP].sport = self.impl.port + return super(CoAPSocket, self).sr(pkt, *args, **kargs) - def sr1(self, *args, **kargs): - args[0].sport = self.impl.port - return super(CoAPSocket, self).sr1(*args, **kargs) + def sr1(self, pkt, *args, **kargs): + pkt[UDP].sport = self.impl.port + return super(CoAPSocket, self).sr1(pkt, *args, **kargs) @staticmethod def select(sockets, remain=None): @@ -474,10 +468,12 @@ def recv(self, timeout=None): # type: (Optional[int]) -> Optional[Tuple[bytes, Union[float, EDecimal]]] return self.rx_queue.recv(timeout) - def send(self, ip, port, x): - # type: (str, int, CoAP) -> None + def send(self, x): + # type: (CoAP) -> int self.tx_queue.send( - CoAPSocketImpl.CoAPRequest(ip, port, self.retries, self.ack_timeout, x)) + CoAPSocketImpl.CoAPRequest(x.dst, x.dport, self.retries, self.ack_timeout, + x[CoAP])) + return len(x) def close(self): # type: () -> None