diff --git a/sx127x.py b/sx127x.py index 0fa6c17..40858f9 100644 --- a/sx127x.py +++ b/sx127x.py @@ -12,29 +12,29 @@ REG_FRF_MID = 0x07 REG_FRF_LSB = 0x08 REG_PA_CONFIG = 0x09 -REG_LNA = 0x0c -REG_FIFO_ADDR_PTR = 0x0d +REG_LNA = 0x0C +REG_FIFO_ADDR_PTR = 0x0D -REG_FIFO_TX_BASE_ADDR = 0x0e +REG_FIFO_TX_BASE_ADDR = 0x0E FifoTxBaseAddr = 0x00 # FifoTxBaseAddr = 0x80 -REG_FIFO_RX_BASE_ADDR = 0x0f +REG_FIFO_RX_BASE_ADDR = 0x0F FifoRxBaseAddr = 0x00 REG_FIFO_RX_CURRENT_ADDR = 0x10 REG_IRQ_FLAGS_MASK = 0x11 REG_IRQ_FLAGS = 0x12 REG_RX_NB_BYTES = 0x13 -REG_PKT_RSSI_VALUE = 0x1a -REG_PKT_SNR_VALUE = 0x1b -REG_MODEM_CONFIG_1 = 0x1d -REG_MODEM_CONFIG_2 = 0x1e +REG_PKT_RSSI_VALUE = 0x1A +REG_PKT_SNR_VALUE = 0x1B +REG_MODEM_CONFIG_1 = 0x1D +REG_MODEM_CONFIG_2 = 0x1E REG_PREAMBLE_MSB = 0x20 REG_PREAMBLE_LSB = 0x21 REG_PAYLOAD_LENGTH = 0x22 REG_FIFO_RX_BYTE_ADDR = 0x25 REG_MODEM_CONFIG_3 = 0x26 -REG_RSSI_WIDEBAND = 0x2c +REG_RSSI_WIDEBAND = 0x2C REG_DETECTION_OPTIMIZE = 0x31 REG_DETECTION_THRESHOLD = 0x37 REG_SYNC_WORD = 0x39 @@ -76,26 +76,22 @@ __DEBUG__ = True -class SX127x: +class SX127x: default_parameters = { - 'frequency': 868E6, - 'tx_power_level': 2, - 'signal_bandwidth': 125E3, - 'spreading_factor': 8, - 'coding_rate': 5, - 'preamble_length': 8, - 'implicit_header': False, - 'sync_word': 0x12, - 'enable_CRC': False, - 'invert_IQ': False, - } - - def __init__(self, - spi, - pins, - parameters=default_parameters): - + "frequency": 868e6, + "tx_power_level": 2, + "signal_bandwidth": 125e3, + "spreading_factor": 8, + "coding_rate": 5, + "preamble_length": 8, + "implicit_header": False, + "sync_word": 0x12, + "enable_CRC": False, + "invert_IQ": False, + } + + def __init__(self, spi, pins, parameters=default_parameters): self._spi = spi self._pins = pins self._parameters = parameters @@ -118,7 +114,7 @@ def __init__(self, if version != 0: init_try = False if version != 0x12: - raise Exception('Invalid version.') + raise Exception("Invalid version.") if __DEBUG__: print("SX version: {}".format(version)) @@ -127,8 +123,8 @@ def __init__(self, self.sleep() # config - self.set_frequency(self._parameters['frequency']) - self.set_signal_bandwidth(self._parameters['signal_bandwidth']) + self.set_frequency(self._parameters["frequency"]) + self.set_signal_bandwidth(self._parameters["signal_bandwidth"]) # set LNA boost self.write_register(REG_LNA, self.read_register(REG_LNA) | 0x03) @@ -136,14 +132,14 @@ def __init__(self, # set auto AGC self.write_register(REG_MODEM_CONFIG_3, 0x04) - self.set_tx_power(self._parameters['tx_power_level']) + self.set_tx_power(self._parameters["tx_power_level"]) self._implicit_header_mode = None - self.implicit_header_mode(self._parameters['implicit_header']) - self.set_spreading_factor(self._parameters['spreading_factor']) - self.set_coding_rate(self._parameters['coding_rate']) - self.set_preamble_length(self._parameters['preamble_length']) - self.set_sync_word(self._parameters['sync_word']) - self.enable_CRC(self._parameters['enable_CRC']) + self.implicit_header_mode(self._parameters["implicit_header"]) + self.set_spreading_factor(self._parameters["spreading_factor"]) + self.set_coding_rate(self._parameters["coding_rate"]) + self.set_preamble_length(self._parameters["preamble_length"]) + self.set_sync_word(self._parameters["sync_word"]) + self.enable_CRC(self._parameters["enable_CRC"]) self.invert_IQ(self._parameters["invert_IQ"]) # set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset) @@ -153,8 +149,7 @@ def __init__(self, if 1000 / (bw_parameter / 2**sf_parameter) > 16: self.write_register( - REG_MODEM_CONFIG_3, - self.read_register(REG_MODEM_CONFIG_3) | 0x08 + REG_MODEM_CONFIG_3, self.read_register(REG_MODEM_CONFIG_3) | 0x08 ) # set base addresses @@ -163,7 +158,7 @@ def __init__(self, self.standby() - def begin_packet(self, implicit_header_mode = False): + def begin_packet(self, implicit_header_mode=False): self.standby() self.implicit_header_mode(implicit_header_mode) @@ -199,24 +194,25 @@ def write(self, buffer): self.write_register(REG_PAYLOAD_LENGTH, currentLength + size) return size - def set_lock(self, lock = False): + def set_lock(self, lock=False): self._lock = lock - def println(self, msg, implicit_header = False): + def send_bytes(self, buff, implicit_header=False): self.set_lock(True) # wait until RX_Done, lock and begin writing. - self.begin_packet(implicit_header) - - if isinstance(msg, str): - message = msg.encode() - - self.write(message) - + self.write(buff) self.end_packet() - - self.set_lock(False) # unlock when done writing + self.set_lock(False) # unlock when done writing self.collect_garbage() + def println(self, msg, implicit_header=False): + if isinstance(msg, str): + self.send_bytes(msg.encode(), implicit_header) + elif isinstance(msg, bytes): + self.send_bytes(msg, implicit_header) + else: + self.send_bytes(bytes(msg), implicit_header) + def get_irq_flags(self): irq_flags = self.read_register(REG_IRQ_FLAGS) self.write_register(REG_IRQ_FLAGS, irq_flags) @@ -224,7 +220,7 @@ def get_irq_flags(self): def packet_rssi(self): rssi = self.read_register(REG_PKT_RSSI_VALUE) - return (rssi - (164 if self._frequency < 868E6 else 157)) + return rssi - (164 if self._frequency < 868e6 else 157) def packet_snr(self): snr = self.read_register(REG_PKT_SNR_VALUE) @@ -236,10 +232,10 @@ def standby(self): def sleep(self): self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP) - def set_tx_power(self, level, outputPin = PA_OUTPUT_PA_BOOST_PIN): + def set_tx_power(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN): self._tx_power_level = level - if (outputPin == PA_OUTPUT_RFO_PIN): + if outputPin == PA_OUTPUT_RFO_PIN: # RFO level = min(max(level, 0), 14) self.write_register(REG_PA_CONFIG, 0x70 | level) @@ -260,15 +256,15 @@ def set_frequency(self, frequency): def set_spreading_factor(self, sf): sf = min(max(sf, 6), 12) - self.write_register(REG_DETECTION_OPTIMIZE, 0xc5 if sf == 6 else 0xc3) - self.write_register(REG_DETECTION_THRESHOLD, 0x0c if sf == 6 else 0x0a) + self.write_register(REG_DETECTION_OPTIMIZE, 0xC5 if sf == 6 else 0xC3) + self.write_register(REG_DETECTION_THRESHOLD, 0x0C if sf == 6 else 0x0A) self.write_register( - REG_MODEM_CONFIG_2, - (self.read_register(REG_MODEM_CONFIG_2) & 0x0f) | ((sf << 4) & 0xf0) + REG_MODEM_CONFIG_2, + (self.read_register(REG_MODEM_CONFIG_2) & 0x0F) | ((sf << 4) & 0xF0), ) def set_signal_bandwidth(self, sbw): - bins = (7.8E3, 10.4E3, 15.6E3, 20.8E3, 31.25E3, 41.7E3, 62.5E3, 125E3, 250E3) + bins = (7.8e3, 10.4e3, 15.6e3, 20.8e3, 31.25e3, 41.7e3, 62.5e3, 125e3, 250e3) bw = 9 @@ -281,25 +277,25 @@ def set_signal_bandwidth(self, sbw): break self.write_register( - REG_MODEM_CONFIG_1, - (self.read_register(REG_MODEM_CONFIG_1) & 0x0f) | (bw << 4) + REG_MODEM_CONFIG_1, + (self.read_register(REG_MODEM_CONFIG_1) & 0x0F) | (bw << 4), ) def set_coding_rate(self, denominator): denominator = min(max(denominator, 5), 8) cr = denominator - 4 self.write_register( - REG_MODEM_CONFIG_1, - (self.read_register(REG_MODEM_CONFIG_1) & 0xf1) | (cr << 1) + REG_MODEM_CONFIG_1, + (self.read_register(REG_MODEM_CONFIG_1) & 0xF1) | (cr << 1), ) def set_preamble_length(self, length): - self.write_register(REG_PREAMBLE_MSB, (length >> 8) & 0xff) - self.write_register(REG_PREAMBLE_LSB, (length >> 0) & 0xff) + self.write_register(REG_PREAMBLE_MSB, (length >> 8) & 0xFF) + self.write_register(REG_PREAMBLE_LSB, (length >> 0) & 0xFF) - def enable_CRC(self, enable_CRC = False): + def enable_CRC(self, enable_CRC=False): modem_config_2 = self.read_register(REG_MODEM_CONFIG_2) - config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xfb + config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xFB self.write_register(REG_MODEM_CONFIG_2, config) def invert_IQ(self, invert_IQ): @@ -357,24 +353,25 @@ def dump_registers(self): else: print(" | ", end="") - def implicit_header_mode(self, implicit_header_mode = False): - if self._implicit_header_mode != implicit_header_mode: # set value only if different. + def implicit_header_mode(self, implicit_header_mode=False): + if ( + self._implicit_header_mode != implicit_header_mode + ): # set value only if different. self._implicit_header_mode = implicit_header_mode modem_config_1 = self.read_register(REG_MODEM_CONFIG_1) - config = (modem_config_1 | 0x01 - if implicit_header_mode else modem_config_1 & 0xfe) + config = ( + modem_config_1 | 0x01 if implicit_header_mode else modem_config_1 & 0xFE + ) self.write_register(REG_MODEM_CONFIG_1, config) - def receive(self, size = 0): + def receive(self, size=0): self.implicit_header_mode(size > 0) - if size > 0: - self.write_register(REG_PAYLOAD_LENGTH, size & 0xff) + if size > 0: + self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF) # The last packet always starts at FIFO_RX_CURRENT_ADDR # no need to reset FIFO_ADDR_PTR - self.write_register( - REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS - ) + self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS) def on_receive(self, callback): self._on_receive = callback @@ -383,72 +380,63 @@ def on_receive(self, callback): if callback: self.write_register(REG_DIO_MAPPING_1, 0x00) self._pin_rx_done.irq( - trigger=Pin.IRQ_RISING, handler = self.handle_on_receive + trigger=Pin.IRQ_RISING, handler=self.handle_on_receive ) else: self._pin_rx_done.detach_irq() def handle_on_receive(self, event_source): - self.set_lock(True) # lock until TX_Done + self.set_lock(True) # lock until TX_Done irq_flags = self.get_irq_flags() - if (irq_flags == IRQ_RX_DONE_MASK): # RX_DONE only, irq_flags should be 0x40 + if irq_flags == IRQ_RX_DONE_MASK: # RX_DONE only, irq_flags should be 0x40 # automatically standby when RX_DONE if self._on_receive: payload = self.read_payload() self._on_receive(self, payload) - elif self.read_register(REG_OP_MODE) != ( - MODE_LONG_RANGE_MODE | MODE_RX_SINGLE - ): + elif self.read_register(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE): # no packet received. # reset FIFO address / # enter single RX mode self.write_register(REG_FIFO_ADDR_PTR, FifoRxBaseAddr) - self.write_register( - REG_OP_MODE, - MODE_LONG_RANGE_MODE | MODE_RX_SINGLE - ) + self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE) - self.set_lock(False) # unlock in any case. + self.set_lock(False) # unlock in any case. self.collect_garbage() return True - def received_packet(self, size = 0): + def received_packet(self, size=0): irq_flags = self.get_irq_flags() self.implicit_header_mode(size > 0) - if size > 0: - self.write_register(REG_PAYLOAD_LENGTH, size & 0xff) + if size > 0: + self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF) # if (irq_flags & IRQ_RX_DONE_MASK) and \ - # (irq_flags & IRQ_RX_TIME_OUT_MASK == 0) and \ - # (irq_flags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0): + # (irq_flags & IRQ_RX_TIME_OUT_MASK == 0) and \ + # (irq_flags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0): - if (irq_flags == IRQ_RX_DONE_MASK): + if irq_flags == IRQ_RX_DONE_MASK: # RX_DONE only, irq_flags should be 0x40 # automatically standby when RX_DONE return True - + elif self.read_register(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE): # no packet received. # reset FIFO address / # enter single RX mode self.write_register(REG_FIFO_ADDR_PTR, FifoRxBaseAddr) - self.write_register( - REG_OP_MODE, - MODE_LONG_RANGE_MODE | MODE_RX_SINGLE - ) + self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE) def read_payload(self): # set FIFO address to current RX address # fifo_rx_current_addr = self.read_register(REG_FIFO_RX_CURRENT_ADDR) self.write_register( - REG_FIFO_ADDR_PTR, - self.read_register(REG_FIFO_RX_CURRENT_ADDR) + REG_FIFO_ADDR_PTR, self.read_register(REG_FIFO_RX_CURRENT_ADDR) ) # read packet length if self._implicit_header_mode: - packet_length = self.read_register(REG_PAYLOAD_LENGTH) + packet_length = self.read_register(REG_PAYLOAD_LENGTH) else: packet_length = self.read_register(REG_RX_NB_BYTES) @@ -459,15 +447,14 @@ def read_payload(self): self.collect_garbage() return bytes(payload) - def read_register(self, address, byteorder = 'big', signed = False): - response = self.transfer(address & 0x7f) + def read_register(self, address, byteorder="big", signed=False): + response = self.transfer(address & 0x7F) return int.from_bytes(response, byteorder) def write_register(self, address, value): self.transfer(address | 0x80, value) - - def transfer(self, address, value = 0x00): + def transfer(self, address, value=0x00): response = bytearray(1) self._pin_ss.value(0) @@ -479,7 +466,7 @@ def transfer(self, address, value = 0x00): return response - def blink_led(self, times = 1, on_seconds = 0.1, off_seconds = 0.1): + def blink_led(self, times=1, on_seconds=0.1, off_seconds=0.1): for i in range(times): if self._led_status: self._led_status.value(True) @@ -490,4 +477,8 @@ def blink_led(self, times = 1, on_seconds = 0.1, off_seconds = 0.1): def collect_garbage(self): gc.collect() if __DEBUG__: - print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc())) + print( + "[Memory - free: {} allocated: {}]".format( + gc.mem_free(), gc.mem_alloc() + ) + )