From 631b93fcc4c941fc9fb5b30611778e23dd337a25 Mon Sep 17 00:00:00 2001 From: "Michael Hirsch, Ph.D" Date: Fri, 30 Aug 2019 09:02:31 -0400 Subject: [PATCH] setup format --- .flake8 | 3 + README.md | 54 ++++++ README.rst | 53 ------ mypy.ini | 4 + pyproject.toml | 5 + sim_amfmssb.py | 492 +++++++++++++++++++++++++------------------------ sinad.py | 45 ++--- 7 files changed, 341 insertions(+), 315 deletions(-) create mode 100644 .flake8 create mode 100644 README.md delete mode 100644 README.rst create mode 100644 mypy.ini create mode 100644 pyproject.toml diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..aab5c9e --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 132 +exclude = .git,__pycache__,.eggs/,doc/,docs/,build/,dist/,archive/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..98ed29a --- /dev/null +++ b/README.md @@ -0,0 +1,54 @@ +# GNU Radio amateur radio examples + + +## Red Pitaya + +[RedPitaya]{.title-ref} contains code for [Red +Pitaya](https://www.scivision.dev/red-pitaya-gnuradio-setup) +[PiRadar](https://www.scivision.dev/pi-radar). + +## SSB Examples + +First we start with the most basic example of SSB. This example lacks +the filtering normally used with these methods to be as minimal as +possible. They work via files recorded to disk using your computer\'s +microphone. + +There are numerous references for SSB transmitters and receivers, such +as the ARRL Handbook, or the [Philips AN1981 application +note](http://www.nxp.com/documents/application_note/an1981.pdf). In +AN1981, Fig. 8 shows the Weaver method receiver (note I only use the +part to the right of the filters, since I didn\'t use filtering). In +AN1981, Fig. 5 shows the Phasing method transmitter. In both cases, +ignore the divide-by-4 of AN1981 as that\'s peculiar to their system. + +### Simplest synthethic SSB simulation + +This simulation doesn\'t use any filters, and so would never work in the +real world unless there were no other transmissions within a few octaves +of your sampling bandwidth. You can see in the FFT scope on transmit +that it is indeed USB transmission. + +The first example uses the phasing method to transmit on a synthetic 50 +kHz carrier frequency Fc, upper sideband only. Next, you receive this +Fc=50 kHz USB transmission using a Weaver method SSB receiver. + +The noise floor on transmit extends from Fc to Fc+24 kHz as that\'s 1/2 +the bandwidth of your 48kHz sound card. You can transmit some +hi-fidelity SSB audio of DC to 24kHz, limited by your PC microphone and +speakers. You may hear some background crackling and aliasing artifacts +due to no digital filtering used, but overall the audio should be +relatively clear. + +You can see the reference material to add the appropriate filters to +these examples as an exercise. + +1. Open and Run ssbTXhilbert.grc in GNU Radio Companion. This records + your voice via your computer microphone to a file \$HOME/test.dat. + This file is complex float data. Hit Stop to stop recording. +2. Open and Run ssbRX.grc in GNU Radio Companion. This loops back your + voice through your computer speakers. + +![simplest no-filter SSB receiver](gfx/ssbRX.grc.png) + +![simplest no-filter SSB transmitter](gfx/ssbTXhilbert.grc.png) diff --git a/README.rst b/README.rst deleted file mode 100644 index 1e42b39..0000000 --- a/README.rst +++ /dev/null @@ -1,53 +0,0 @@ -================================ -GNU Radio amateur radio examples -================================ - -Hi! Let me know if there's anything else/more you'd like to see for amateur radio with the Red Pitaya / Raspberry Pi using GNU Radio. - -.. contents:: - -Red Pitaya -========== -`RedPitaya` contains code for `Red Pitaya `_ `PiRadar `_. - -SSB Examples -============ - - -First we start with the most basic example of SSB. -This example lacks the filtering normally used with these methods to be as minimal as possible. -They work via files recorded to disk using your computer's microphone. - - -There are numerous references for SSB transmitters and receivers, such as the ARRL Handbook, or the `Philips AN1981 application note `_. -In AN1981, Fig. 8 shows the Weaver method receiver (note I only use the part to the right of the filters, since I didn't use filtering). -In AN1981, Fig. 5 shows the Phasing method transmitter. -In both cases, ignore the divide-by-4 of AN1981 as that's peculiar to their system. - -Simplest synthethic SSB simulation ----------------------------------- -This simulation doesn't use any filters, and so would never work in the real world unless there were no other transmissions -within a few octaves of your sampling bandwidth. -You can see in the FFT scope on transmit that it is indeed USB transmission. - -The first example uses the phasing method to transmit on a synthetic 50 kHz carrier frequency Fc, upper sideband only. -Next, you receive this Fc=50 kHz USB transmission using a Weaver method SSB receiver. - - -The noise floor on transmit extends from Fc to Fc+24 kHz as that's 1/2 the bandwidth of your 48kHz sound card. You can transmit -some hi-fidelity SSB audio of DC to 24kHz, limited by your PC microphone and speakers. -You may hear some background crackling and aliasing artifacts due to no digital filtering used, but overall -the audio should be relatively clear. - -You can see the reference material to add the appropriate filters to these examples as an exercise. - -1. Open and Run ssbTXhilbert.grc in GNU Radio Companion. This records your voice via your computer microphone to a file $HOME/test.dat. This file is complex float data. Hit Stop to stop recording. -2. Open and Run ssbRX.grc in GNU Radio Companion. This loops back your voice through your computer speakers. - -.. image:: gfx/ssbRX.grc.png - :alt: simplest no-filter SSB receiver - -.. image:: gfx/ssbTXhilbert.grc.png - :alt: simplest no-filter SSB transmitter - - diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..bf6bcdf --- /dev/null +++ b/mypy.ini @@ -0,0 +1,4 @@ +[mypy] +ignore_missing_imports = True +strict_optional = False +allow_redefinition = True \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7418137 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,5 @@ +[build-system] +requires = ["setuptools", "wheel"] + +[tool.black] +line-length = 99 diff --git a/sim_amfmssb.py b/sim_amfmssb.py index 085b036..aa75b44 100755 --- a/sim_amfmssb.py +++ b/sim_amfmssb.py @@ -3,22 +3,21 @@ from math import log10 from PyQt4 import Qt -from gnuradio import audio, analog, blocks, digital, filter, gr, qtgui +from gnuradio import audio, analog, blocks, filter, gr, qtgui from gnuradio.eng_option import eng_option from gnuradio.filter import firdes from gnuradio.qtgui import Range, RangeWidget from optparse import OptionParser import sip import sys -import threading -import time -import numpy as np + # from sinad import sinad_ff -v2dbm = 316.18e-3 # V +v2dbm = 316.18e-3 # V + +modtype = "fm" -modtype='fm' def common_params(self): """ @@ -26,148 +25,151 @@ def common_params(self): """ self.samp_rate = 192000 self.fs_audio = 48000 -#%% transmitter + # %% transmitter self.tonelevel_dB = -4.77 self._tonelevel_dB_range = Range(-60, 0, 1, -4.77, 200) - self._tonelevel_dB_win = RangeWidget(self._tonelevel_dB_range, self.set_tonelevel_dB, "Mod. Level [dB]", "counter_slider", float) + self._tonelevel_dB_win = RangeWidget( + self._tonelevel_dB_range, self.set_tonelevel_dB, "Mod. Level [dB]", "counter_slider", float + ) self.top_layout.addWidget(self._tonelevel_dB_win) - self.Ptx_dBm = 30. - self._Ptx_dBm_range = Range(-30,60,1,30,200) - self._Ptx_dBm_win = RangeWidget(self._Ptx_dBm_range,self.set_Ptx_dBm, "Transmit Power [dBm]","counter_slider",float) + self.Ptx_dBm = 30.0 + self._Ptx_dBm_range = Range(-30, 60, 1, 30, 200) + self._Ptx_dBm_win = RangeWidget( + self._Ptx_dBm_range, self.set_Ptx_dBm, "Transmit Power [dBm]", "counter_slider", float + ) self.top_layout.addWidget(self._Ptx_dBm_win) self.ftx = 50e3 -#%% propagation + # %% propagation self.pathloss_dB = 140 -#%% receiver - self.bwrx = 45000 #[hz] - self.fmbw = 15000 #[hz] - self.nf = 8 #[dB] noise figure + # %% receiver + self.bwrx = 45000 # [hz] + self.fmbw = 15000 # [hz] + self.nf = 8 # [dB] noise figure self.frx = 50e3 - self._frx_range = Range(40e3, 60e3, .1e3, 50e3, 200) - self._frx_win = RangeWidget(self._frx_range, self.set_frx, "RX freq. [Hz]", "counter_slider", float) + self._frx_range = Range(40e3, 60e3, 0.1e3, 50e3, 200) + self._frx_win = RangeWidget( + self._frx_range, self.set_frx, "RX freq. [Hz]", "counter_slider", float + ) self.top_layout.addWidget(self._frx_win) - self.rx_decim = self.samp_rate/self.fs_audio + self.rx_decim = self.samp_rate / self.fs_audio - self.Anoise = 10**((-174.4 + 10*log10(self.bwrx) + self.nf)/20) + self.Anoise = 10 ** ((-174.4 + 10 * log10(self.bwrx) + self.nf) / 20) self.snr = None + def module_setup(self): -#%% transmitter + # %% transmitter self.mod_upsample = filter.rational_resampler_ccc( - interpolation=self.samp_rate//self.fs_audio, - decimation=1, - taps=None, - fractional_bw=None,) + interpolation=self.samp_rate // self.fs_audio, decimation=1, taps=None, fractional_bw=None + ) - self.mod_source = analog.sig_source_f(self.fs_audio, analog.GR_SIN_WAVE, - 1000, 10**(self.tonelevel_dB/20), 0) + self.mod_source = analog.sig_source_f( + self.fs_audio, analog.GR_SIN_WAVE, 1000, 10 ** (self.tonelevel_dB / 20), 0 + ) - if modtype=='am': + if modtype == "am": self.mod_out = blocks.float_to_complex() - self.tx_carrier_level = blocks.add_const_ff(1) #normalized - elif modtype=='fm': - self.mod_out = analog.nbfm_tx( - audio_rate = self.fs_audio, - quad_rate = self.samp_rate, - tau = 75e-6, - max_dev=self.fmbw, - ) - elif modtype=='ssb': + self.tx_carrier_level = blocks.add_const_ff(1) # normalized + elif modtype == "fm": + self.mod_out = analog.nbfm_tx( + audio_rate=self.fs_audio, quad_rate=self.samp_rate, tau=75e-6, max_dev=self.fmbw + ) + elif modtype == "ssb": self.hilb = filter.hilbert_fc(256, firdes.WIN_HAMMING, 6.76) self.TX_LO = analog.sig_source_c(self.samp_rate, analog.GR_SIN_WAVE, self.ftx, 1, 0) self.tx_mixer = blocks.multiply_cc() - self.transmit_amplifier = blocks.multiply_const_cc(v2dbm * 10**(self.Ptx_dBm/20)) -#%% propagation, noise, interference - self.pathloss = blocks.multiply_const_cc(10**(-self.pathloss_dB/20)) + self.transmit_amplifier = blocks.multiply_const_cc(v2dbm * 10 ** (self.Ptx_dBm / 20)) + # %% propagation, noise, interference + self.pathloss = blocks.multiply_const_cc(10 ** (-self.pathloss_dB / 20)) self.thermal_noise = analog.noise_source_c(analog.GR_GAUSSIAN, self.Anoise, 0) self.channeladder = blocks.add_cc() -#%% receiver + # %% receiver self.rx_lo = analog.sig_source_c(self.samp_rate, analog.GR_SIN_WAVE, -self.frx, 1, 0) self.rx_mixer = blocks.multiply_cc() self.rx_downsample = filter.rational_resampler_ccc( - interpolation=1, - decimation=self.samp_rate//self.fs_audio, - taps=None, - fractional_bw=None,) + interpolation=1, decimation=self.samp_rate // self.fs_audio, taps=None, fractional_bw=None + ) - if modtype=='am': - self.rxif_filter = filter.fir_filter_ccc(1, firdes.low_pass( - 1, self.fs_audio, self.bwrx/2, 200, firdes.WIN_HAMMING, 6.76)) + if modtype == "am": + self.rxif_filter = filter.fir_filter_ccc( + 1, firdes.low_pass(1, self.fs_audio, self.bwrx / 2, 200, firdes.WIN_HAMMING, 6.76) + ) self.agc = analog.agc2_cc(attack_rate=1, decay_rate=1, reference=0.9, gain=100) self.agc.set_max_gain(65535) - self.demod = blocks.complex_to_mag() #envelope det - elif modtype=='fm': - self.rxif_filter = filter.fir_filter_ccc(1, firdes.low_pass( - 1, self.fs_audio, self.bwrx/2, 200, firdes.WIN_HAMMING, 6.76)) + self.demod = blocks.complex_to_mag() # envelope det + elif modtype == "fm": + self.rxif_filter = filter.fir_filter_ccc( + 1, firdes.low_pass(1, self.fs_audio, self.bwrx / 2, 200, firdes.WIN_HAMMING, 6.76) + ) self.demod = analog.nbfm_rx( - audio_rate=self.fs_audio, - quad_rate=self.fs_audio, - tau=75e-6, - max_dev=self.fmbw, - ) - elif modtype=='ssb': - self.rxif_filter = filter.fir_filter_ccc(1, firdes.band_pass( - 1, self.samp_rate, self.frx+300, self.frx+4000, 2500, firdes.WIN_HAMMING, 6.76)) + audio_rate=self.fs_audio, quad_rate=self.fs_audio, tau=75e-6, max_dev=self.fmbw + ) + elif modtype == "ssb": + self.rxif_filter = filter.fir_filter_ccc( + 1, + firdes.band_pass( + 1, self.samp_rate, self.frx + 300, self.frx + 4000, 2500, firdes.WIN_HAMMING, 6.76 + ), + ) self.agc = analog.agc2_cc(attack_rate=1, decay_rate=1, reference=0.9, gain=100) self.agc.set_max_gain(65535) self.demod = blocks.complex_to_float() - self.audio_bpf = filter.fir_filter_fff(1, firdes.band_pass( - 1, self.fs_audio, 300, 3400, 200, firdes.WIN_HAMMING, 6.76)) + self.audio_bpf = filter.fir_filter_fff( + 1, firdes.band_pass(1, self.fs_audio, 300, 3400, 200, firdes.WIN_HAMMING, 6.76) + ) self.audio_out = audio.sink(self.fs_audio, "", False) -def text(tl,snr): + +def text(tl, snr): txt = Qt.QToolBar() if None: - _snr_formatter = None + _snr_formatter = None else: - _snr_formatter = lambda x: x - txt.addWidget(Qt.QLabel("snr"+": ")) + def _snr_formatter(x): + return x + + txt.addWidget(Qt.QLabel("snr" + ": ")) _snr_label = Qt.QLabel(str(_snr_formatter(snr))) txt.addWidget(_snr_label) tl.addWidget(txt) return txt -def scope(name,fsamp,tl,vpk,ntype,autoscale=False): + +def scope(name, fsamp, tl, vpk, ntype, autoscale=False): if ntype is complex: scope = qtgui.time_sink_c( - 1024, #size - fsamp, #samp_rate - name, #name - 1 #number of inputs - ) + 1024, fsamp, name, 1 # size # samp_rate # name # number of inputs + ) else: scope = qtgui.time_sink_f( - 1024, #size - fsamp, #samp_rate - name, #name - 1 #number of inputs - ) + 1024, fsamp, name, 1 # size # samp_rate # name # number of inputs + ) scope.set_update_time(0.10) - if isinstance(vpk,(tuple,list)): - scope.set_y_axis(vpk[0],vpk[1]) + if isinstance(vpk, (tuple, list)): + scope.set_y_axis(vpk[0], vpk[1]) else: scope.set_y_axis(-vpk, vpk) @@ -177,42 +179,43 @@ def scope(name,fsamp,tl,vpk,ntype,autoscale=False): scope.enable_autoscale(autoscale) scope.enable_grid(True) scope.enable_control_panel(False) -# self.scope_tx.disable_legend() + # self.scope_tx.disable_legend() _scope_win = sip.wrapinstance(scope.pyqwidget(), Qt.QWidget) tl.addWidget(_scope_win) return scope -def specan(name,fsamp,tl,ntype,autoscale=True,minmax=(-180,0)): - if ntype is complex: + +def specan(name, fsamp, tl, ntype, autoscale=True, minmax=(-180, 0)): + if isinstance(ntype, complex): specan = qtgui.freq_sink_c( - 1024, #size - firdes.WIN_RECTANGULAR, #wintype - 0, #fc - fsamp, #bw - name, #name - 1 #number of inputs - ) + 1024, # size + firdes.WIN_RECTANGULAR, # wintype + 0, # fc + fsamp, # bw + name, # name + 1, # number of inputs + ) else: specan = qtgui.freq_sink_f( - 1024, #size - firdes.WIN_RECTANGULAR, #wintype - 0, #fc - fsamp, #bw - name, #name - 1 #number of inputs - ) + 1024, # size + firdes.WIN_RECTANGULAR, # wintype + 0, # fc + fsamp, # bw + name, # name + 1, # number of inputs + ) specan.set_update_time(0.10) - specan.set_y_axis(minmax[0],minmax[1]) + specan.set_y_axis(minmax[0], minmax[1]) specan.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") specan.enable_autoscale(autoscale) specan.enable_grid(True) specan.set_fft_average(1.0) specan.enable_control_panel(True) -# specan.disable_legend() + # specan.disable_legend() - if complex == type(float()): + if isinstance(complex, float): specan.set_plot_pos_half(not True) _specan_win = sip.wrapinstance(specan.pyqwidget(), Qt.QWidget) @@ -221,15 +224,14 @@ def specan(name,fsamp,tl,ntype,autoscale=True,minmax=(-180,0)): class top_block(gr.top_block, Qt.QWidget): - def __init__(self): gr.top_block.__init__(self, "Top Block") Qt.QWidget.__init__(self) self.setWindowTitle("Top Block") try: - self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) - except: - pass + self.setWindowIcon(Qt.QIcon.fromTheme("gnuradio-grc")) + except Exception: + pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() @@ -243,176 +245,181 @@ def __init__(self): self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "top_block") - # self.restoreGeometry(self.settings.value("geometry").toByteArray()) -#%% setup radio + # self.restoreGeometry(self.settings.value("geometry").toByteArray()) + # %% setup radio common_params(self) module_setup(self) -#%% Connect transmitter + # %% Connect transmitter self.connect(self.TX_LO, (self.tx_mixer, 0)) - if modtype=='am': - self.connect(self.mod_source, - self.tx_carrier_level, - self.mod_out, - self.mod_upsample, - (self.tx_mixer, 1)) - elif modtype=='fm': - self.connect(self.mod_source, - self.mod_out, - (self.tx_mixer, 1)) - elif modtype=='ssb': - self.connect(self.mod_source, - self.hilb, - self.mod_upsample, - (self.tx_mixer,1)) - - - self.connect(self.tx_mixer, - self.transmit_amplifier, - self.pathloss) -#%% connect channel model - self.connect(self.pathloss,(self.channeladder, 0)) - self.connect(self.thermal_noise,(self.channeladder,1)) -#%% connect receiver - self.connect(self.rx_lo,(self.rx_mixer,0)) - - - if modtype=='am': - self.connect(self.channeladder,(self.rx_mixer,1)) - self.connect(self.rx_mixer, - self.rx_downsample, - self.rxif_filter, - self.agc, - self.demod, - self.audio_bpf, - self.audio_out) - elif modtype=='fm': - self.connect(self.channeladder,(self.rx_mixer,1)) - self.connect(self.rx_mixer, - self.rx_downsample, - self.rxif_filter, - self.demod, - self.audio_bpf, - self.audio_out) - elif modtype=='ssb': - self.connect(self.channeladder, - self.rxif_filter, - (self.rx_mixer,1),) - self.connect(self.rx_mixer, - self.rx_downsample, - self.agc, - (self.demod,0), - self.audio_bpf, - self.audio_out) - -#%% SNR + if modtype == "am": + self.connect( + self.mod_source, + self.tx_carrier_level, + self.mod_out, + self.mod_upsample, + (self.tx_mixer, 1), + ) + elif modtype == "fm": + self.connect(self.mod_source, self.mod_out, (self.tx_mixer, 1)) + elif modtype == "ssb": + self.connect(self.mod_source, self.hilb, self.mod_upsample, (self.tx_mixer, 1)) + + self.connect(self.tx_mixer, self.transmit_amplifier, self.pathloss) + # %% connect channel model + self.connect(self.pathloss, (self.channeladder, 0)) + self.connect(self.thermal_noise, (self.channeladder, 1)) + # %% connect receiver + self.connect(self.rx_lo, (self.rx_mixer, 0)) + + if modtype == "am": + self.connect(self.channeladder, (self.rx_mixer, 1)) + self.connect( + self.rx_mixer, + self.rx_downsample, + self.rxif_filter, + self.agc, + self.demod, + self.audio_bpf, + self.audio_out, + ) + elif modtype == "fm": + self.connect(self.channeladder, (self.rx_mixer, 1)) + self.connect( + self.rx_mixer, + self.rx_downsample, + self.rxif_filter, + self.demod, + self.audio_bpf, + self.audio_out, + ) + elif modtype == "ssb": + self.connect(self.channeladder, self.rxif_filter, (self.rx_mixer, 1)) + self.connect( + self.rx_mixer, + self.rx_downsample, + self.agc, + (self.demod, 0), + self.audio_bpf, + self.audio_out, + ) + + # %% SNR # only SVR observed to work and only up to 18dB SNR. Need to use sinad instead, or quieting ratio. -# self._snr_tool_bar = Qt.QToolBar(self) -# -# if None: -# self._snr_formatter = None -# else: -# self._snr_formatter = lambda x: x -# -# self._snr_tool_bar.addWidget(Qt.QLabel("snr"+": ")) -# self._snr_label = Qt.QLabel(str(self._snr_formatter(self.snr))) -# self._snr_tool_bar.addWidget(self._snr_label) -# self.top_layout.addWidget(self._snr_tool_bar) -################ - # self.snr_est = digital.probe_mpsk_snr_est_c(digital.SNR_EST_SVR, self.samp_rate//10, 0.001) - #self.connect(self.rx_downsample, self.snr_est) -################ - # text(self.top_layout,self.snr_est) - -# def _snr_disp_probe(): -# while True: -# val = self.snr_est.snr() -# try: -# self.set_snr_disp('{:.2f}'.format(val)) -# except AttributeError: -# pass -# time.sleep(1.0 / (2)) -# _snr_disp_thread = threading.Thread(target=_snr_disp_probe) -# _snr_disp_thread.daemon = True -# _snr_disp_thread.start() -#%% SINAD -#TODO make probe instead of stream - self.sinad_est = sinad_ff(1000,self.fs_audio) + # self._snr_tool_bar = Qt.QToolBar(self) + # + # if None: + # self._snr_formatter = None + # else: + # self._snr_formatter = lambda x: x + # + # self._snr_tool_bar.addWidget(Qt.QLabel("snr"+": ")) + # self._snr_label = Qt.QLabel(str(self._snr_formatter(self.snr))) + # self._snr_tool_bar.addWidget(self._snr_label) + # self.top_layout.addWidget(self._snr_tool_bar) + ################ + # self.snr_est = digital.probe_mpsk_snr_est_c(digital.SNR_EST_SVR, self.samp_rate//10, 0.001) + # self.connect(self.rx_downsample, self.snr_est) + ################ + # text(self.top_layout,self.snr_est) + + # def _snr_disp_probe(): + # while True: + # val = self.snr_est.snr() + # try: + # self.set_snr_disp('{:.2f}'.format(val)) + # except AttributeError: + # pass + # time.sleep(1.0 / (2)) + # _snr_disp_thread = threading.Thread(target=_snr_disp_probe) + # _snr_disp_thread.daemon = True + # _snr_disp_thread.start() + # %% SINAD + # TODO make probe instead of stream + self.sinad_est = sinad_ff(1000, self.fs_audio) self.connect(self.audio_bpf, self.sinad_est) - scope_sinad = scope("SINAD",self.fs_audio,self.top_layout,(0,50),float) - self.connect(self.sinad_est,scope_sinad) - -# self.sinad_probe = blocks.probe_signal_f() -# sinad_ff(1000,self.fs_audio) -# def _sinad_probe(): -# while True: -# sinad = self.sinad_probe.level() -# print(sinad) -# time.sleep(0.5) -# -# #setup thread -# _sinad_thread=threading.Thread(target=_sinad_probe) -# _sinad_thread.daemon = True -# _sinad_thread.start() -# -# self.connect(self.audio_bpf,self.sinad_probe) - - - -#%% plots + scope_sinad = scope("SINAD", self.fs_audio, self.top_layout, (0, 50), float) + self.connect(self.sinad_est, scope_sinad) + + # self.sinad_probe = blocks.probe_signal_f() + # sinad_ff(1000,self.fs_audio) + # def _sinad_probe(): + # while True: + # sinad = self.sinad_probe.level() + # print(sinad) + # time.sleep(0.5) + # + # #setup thread + # _sinad_thread=threading.Thread(target=_sinad_probe) + # _sinad_thread.daemon = True + # _sinad_thread.start() + # + # self.connect(self.audio_bpf,self.sinad_probe) + + # %% plots if 0: - scope_tx = scope("TX output",self.samp_rate,self.top_layout,v2dbm,complex) + scope_tx = scope("TX output", self.samp_rate, self.top_layout, v2dbm, complex) self.connect(self.transmit_amplifier, scope_tx) if 0: - scope_rxant = scope("RX input",self.samp_rate,self.top_layout,2e-6,complex) - self.connect(self.channeladder,scope_rxant) + scope_rxant = scope("RX input", self.samp_rate, self.top_layout, 2e-6, complex) + self.connect(self.channeladder, scope_rxant) if 0: - scope_rxif_filter = scope("RX IF filtered",self.samp_rate,self.top_layout,2e-6,complex) - self.connect(self.rxif_filter,scope_rxif_filter) + scope_rxif_filter = scope( + "RX IF filtered", self.samp_rate, self.top_layout, 2e-6, complex + ) + self.connect(self.rxif_filter, scope_rxif_filter) if 1: - if modtype in ('am','ssb'): - specan_rxif = specan("RX IF",self.samp_rate,self.top_layout,complex,False,(-180,-100)) - self.connect(self.rxif_filter,specan_rxif) + if modtype in ("am", "ssb"): + specan_rxif = specan( + "RX IF", self.samp_rate, self.top_layout, complex, False, (-180, -100) + ) + self.connect(self.rxif_filter, specan_rxif) else: - specan_rxif = specan("RX IF",self.fs_audio,self.top_layout,complex,False,(-180,-100)) - self.connect(self.rx_downsample,specan_rxif) + specan_rxif = specan( + "RX IF", self.fs_audio, self.top_layout, complex, False, (-180, -100) + ) + self.connect(self.rx_downsample, specan_rxif) if 0: - scope_rxaudio = scope("RX audio",self.samp_rate,self.top_layout,v2dbm,float) - self.connect(self.audio_bpf,scope_rxaudio) + scope_rxaudio = scope("RX audio", self.samp_rate, self.top_layout, v2dbm, float) + self.connect(self.audio_bpf, scope_rxaudio) if 1: - specan_rxaudio = specan("RX audio",self.fs_audio,self.top_layout,float,False,(-100,0)) - self.connect(self.audio_bpf,specan_rxaudio) + specan_rxaudio = specan( + "RX audio", self.fs_audio, self.top_layout, float, False, (-100, 0) + ) + self.connect(self.audio_bpf, specan_rxaudio) + # %% -#%% def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "top_block") self.settings.setValue("geometry", self.saveGeometry()) event.accept() -#%% functions -- keep these for current/future GUI that GNURadio will call + + # %% functions -- keep these for current/future GUI that GNURadio will call + def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate - self.set_rx_decim(self.samp_rate/self.fs_audio) + self.set_rx_decim(self.samp_rate / self.fs_audio) self.TX_LO.set_sampling_freq(self.samp_rate) self.scope_tx.set_samp_rate(self.samp_rate) - self.snr_est.set_msg_nsample(self.samp_rate/10) + self.snr_est.set_msg_nsample(self.samp_rate / 10) def get_fs_audio(self): return self.fs_audio def set_fs_audio(self, fs_audio): self.fs_audio = fs_audio - self.set_rx_decim(self.samp_rate/self.fs_audio) + self.set_rx_decim(self.samp_rate / self.fs_audio) self.mod_source.set_sampling_freq(self.fs_audio) def get_tonelevel_dB(self): @@ -420,7 +427,7 @@ def get_tonelevel_dB(self): def set_tonelevel_dB(self, tonelevel_dB): self.tonelevel_dB = tonelevel_dB - self.mod_source.set_amplitude( 10**(self.tonelevel_dB/20)) #normalized + self.mod_source.set_amplitude(10 ** (self.tonelevel_dB / 20)) # normalized def get_rx_decim(self): return self.rx_decim @@ -453,7 +460,7 @@ def get_Ptx_dBm(self): def set_Ptx_dBm(self, Ptx_dBm): self.Ptx_dBm = Ptx_dBm - self.transmit_amplifier.set_k(v2dbm * 10**(self.Ptx_dBm/20)) + self.transmit_amplifier.set_k(v2dbm * 10 ** (self.Ptx_dBm / 20)) def get_Anoise(self): return self.Anoise @@ -476,20 +483,22 @@ def set_snr_disp(self, snr_disp): self.set_snr(self._snr_formatter(self.snr_disp)) -if __name__ == '__main__': +if __name__ == "__main__": import ctypes - if sys.platform.startswith('linux'): + + if sys.platform.startswith("linux"): try: - x11 = ctypes.cdll.LoadLibrary('libX11.so') + x11 = ctypes.cdll.LoadLibrary("libX11.so") x11.XInitThreads() - except: - print "Warning: failed to XInitThreads()" + except Exception: + print("Warning: failed to XInitThreads()") parser = OptionParser(option_class=eng_option, usage="%prog: [options]") (options, args) = parser.parse_args() from distutils.version import StrictVersion + if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"): - Qt.QApplication.setGraphicsSystem(gr.prefs().get_string('qtgui','style','raster')) + Qt.QApplication.setGraphicsSystem(gr.prefs().get_string("qtgui", "style", "raster")) qapp = Qt.QApplication(sys.argv) tb = top_block() tb.start() @@ -498,6 +507,7 @@ def set_snr_disp(self, snr_disp): def quitting(): tb.stop() tb.wait() + qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting) qapp.exec_() tb = None # to clean up Qt widgets diff --git a/sinad.py b/sinad.py index 139652e..34e0e0a 100644 --- a/sinad.py +++ b/sinad.py @@ -21,20 +21,18 @@ import numpy as np from gnuradio import gr + class sinad_ff(gr.sync_block): """ docstring for block sinad_ff """ + def __init__(self, sinadFreq, Fs): - gr.sync_block.__init__(self, - "sinad_ff", - [np.float32], - [np.float32]) + gr.sync_block.__init__(self, "sinad_ff", [np.float32], [np.float32]) self.Fs = Fs self.fRef = sinadFreq self.refWidth = 20 - def work(self, input_items, output_items): output_items[0][:] = self.__calc_sinad(input_items[0]) return len(output_items[0]) @@ -42,41 +40,46 @@ def work(self, input_items, output_items): def __calc_sinad(self, data): """ Takes float array and returns the sinad in dB """ -#%% compute FFT + # %% compute FFT # 5/9/07 - added data windowing - ASP - window = np.hamming( np.size(data) ) - for n in range( np.size(data) ): + window = np.hamming(np.size(data)) + for n in range(np.size(data)): data[n] = data[n] * window[n] psd = np.fft.fft(data) psd = psd.flatten() -#%% determine bin indices - bin1 = int(np.floor(float((self.fRef-self.refWidth/2))/float(self.Fs) * psd.shape[0])) - 1 - bin2 = int(np.ceil(float((self.fRef+self.refWidth/2))/float(self.Fs) * psd.shape[0])) - 1 + # %% determine bin indices + bin1 = ( + int(np.floor(float((self.fRef - self.refWidth / 2)) / float(self.Fs) * psd.shape[0])) + - 1 + ) + bin2 = ( + int(np.ceil(float((self.fRef + self.refWidth / 2)) / float(self.Fs) * psd.shape[0])) + - 1 + ) # 4/9/07 - added filtering 300 - 3000Hz ASP - bin300hz = int(np.floor(float(300)/float(self.Fs) * psd.shape[0])) - 1 - bin3000hz = int(np.floor(float(3000)/float(self.Fs) * psd.shape[0])) - 1 + bin300hz = int(np.floor(float(300) / float(self.Fs) * psd.shape[0])) - 1 + bin3000hz = int(np.floor(float(3000) / float(self.Fs) * psd.shape[0])) - 1 -#%% calculate SINAD = 10*log10(Ps/Pn) + # %% calculate SINAD = 10*log10(Ps/Pn) # 4/9/07 change signal = psd[bin1:bin2] to psd[bin300hz:bin3000hz] - ASP signal = psd[bin300hz:bin3000hz] - noise = np.concatenate((psd[bin300hz:bin1],psd[(bin2+1):bin3000hz])) + noise = np.concatenate((psd[bin300hz:bin1], psd[(bin2 + 1):bin3000hz])) - #Ps1 = float(sum(signal.real*signal.real + signal.imag*signal.imag)) #15/8 DON - #Pn1 = float(sum(noise.real*noise.real + noise.imag*noise.imag)) #15/8 DON + # Ps1 = float(sum(signal.real*signal.real + signal.imag*signal.imag)) #15/8 DON + # Pn1 = float(sum(noise.real*noise.real + noise.imag*noise.imag)) #15/8 DON - Ps = np.sum(np.abs(signal)**2) - Pn = np.sum(np.abs(noise)**2) + Ps = np.sum(np.abs(signal) ** 2) + Pn = np.sum(np.abs(noise) ** 2) - sinad = 10*np.log10(Ps/Pn) + sinad = 10 * np.log10(Ps / Pn) # if the tone is not present, sinad will be < 0 if sinad < 0: sinad = 0 return sinad -