Skip to content

Commit

Permalink
Merge pull request #6 from andrewpaulreeves/main
Browse files Browse the repository at this point in the history
migrate simps -> simpson for new scipy
  • Loading branch information
ojdf authored Oct 15, 2024
2 parents 3da410b + df12745 commit 5063eaf
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 92 deletions.
127 changes: 63 additions & 64 deletions fast/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,26 @@
'''
import numpy
from . import Fast
from scipy.special import erfc
from scipy.special import erfc
from scipy.ndimage import correlate1d
from scipy.integrate import simps
from aotools import gaussian2d
import logging

logger = logging.getLogger(__name__)

class Modulator():
'''
Takes array of optical powers and modulates/demodulates according to a modulation
scheme (OOK, BPSK, QPSK, QAM, etc) with random bits. Can add AWGN at a given
average signal-to-noise ratio.
Takes array of optical powers and modulates/demodulates according to a modulation
scheme (OOK, BPSK, QPSK, QAM, etc) with random bits. Can add AWGN at a given
average signal-to-noise ratio.
This allows Monte Carlo computation of bit error rate or symbol error probability.
This allows Monte Carlo computation of bit error rate or symbol error probability.
Parameters:
power (numpy.ndarray): array of optical powers
modulation (string): modulation scheme
modulation (string): modulation scheme
EsN0 (float, optional): (average) symbol signal to noise ratio
symbols_per_iter (int, optional): Number of symbols per iteration of FAST.
symbols_per_iter (int, optional): Number of symbols per iteration of FAST.
Defaults to 1000.
'''
def __init__(self, power, modulation, EsN0=None, symbols_per_iter=1000, data=None):
Expand All @@ -43,15 +42,15 @@ def generate_symbols(self):

elif self.modulation in ["QPSK", "QAM"]:
self.nsymbols = 4

elif len(self.modulation.split("-")) ==2:
self.nsymbols = int(self.modulation.split("-")[0])

else:
raise ValueError("Scheme not recognised")

self.bits_per_symbol = numpy.log2(self.nsymbols).astype(int)

if self.data is not None:
s, self._pad_bits = _encode(self.data, self.bits_per_symbol)
self.symbols = numpy.array([s]*len(self.power)).T
Expand Down Expand Up @@ -83,23 +82,23 @@ def modulate(self):
self.awgn = 0

self.recv_signal = mod + self.awgn

return self.recv_signal

def demodulate(self):

if self.modulation == None:
self.recv_symbols = None
return self.recv_symbols

# fast ways for OOK and BPSK
if self.modulation == "OOK":
cutoff = 0.5
cutoff = 0.5
self.recv_symbols = (self.recv_signal > cutoff).astype(int)

elif self.modulation == "BPSK":
self.recv_symbols = (self.recv_signal.real < 0).astype(int)

else:
d = numpy.array([abs(self.recv_signal - i) for i in self.constellation])
self.recv_symbols = d.argmin(0)
Expand All @@ -121,7 +120,7 @@ def compute_sep(self):

else:
self.sep = (self.recv_symbols != self.symbols).mean()

return self.sep

def compute_evm(self):
Expand All @@ -131,12 +130,12 @@ def compute_evm(self):

if self.modulation == None:
self.evm = None

else:
tx_signal = self.constellation[self.symbols]
ref = numpy.sqrt((tx_signal.real**2 + tx_signal.imag**2).mean()) # RMS of constellation points
self.evm = (abs(tx_signal - self.recv_signal) / ref).mean()

return self.evm

def run(self):
Expand All @@ -148,7 +147,7 @@ def run(self):

class FastFSOC(Fast):
'''
Subclass of Fast simulation object, adds optical comms functionality
Subclass of Fast simulation object, adds optical comms functionality
(modulation, demodulation, generating random symbol sequences for testing)
'''

Expand Down Expand Up @@ -198,13 +197,13 @@ def fade_dur(I, threshold, dt=1, min_fades=30):
def ber_ook(EbN0, samples=None):
'''
Bit Error Rate for On-Off-Keying communications channel.
Monte Carlo integration of Eq. 58 from Andrews and Phillips (2005) Ch 11.
Note that electrical SNR per bit (Eb/N0) is used, not OSNR as in A+P.
Args:
EbN0 (float): average signal-to-noise ratio per bit Eb/N0 in electrical domain, dB
samples (numpy.ndarray, optional): random samples of received power from FAST.
samples (numpy.ndarray, optional): random samples of received power from FAST.
If None is provided, assume no atmosphere (i.e. intensity pdf = delta function)
Returns:
Expand All @@ -215,28 +214,28 @@ def ber_ook(EbN0, samples=None):
if samples is None:
# return BER assuming no atmospheric spreading
return Q(snr)
# Normalise samples by mean

# Normalise samples by mean
s = samples/samples.mean()

return Q(s * snr).mean()


def sep_qam(M, EsN0, samples=None):
'''
Symbol error probabilty for square M-ary QAM, from Rice.
Symbol error probabilty for square M-ary QAM, from Rice.
Parameters:
M (int): Number of symbols (must be perfect square)
EsN0 (float): Average electrical symbol signal-to-noise ratio [dB]
samples (numpy.ndarray, optional): random samples of received power from FAST.
samples (numpy.ndarray, optional): random samples of received power from FAST.
'''
EsN0_frac = 10**(EsN0/10)
prefactor = (numpy.sqrt(M)-1)/numpy.sqrt(M)

if samples is None:
return 4*(prefactor*Q(numpy.sqrt(3/(M-1) * EsN0_frac)) - prefactor**2 * Q(numpy.sqrt(3/(M-1) * EsN0_frac))**2)

s = samples / samples.mean()
EsN0_frac *= s**2

Expand All @@ -245,13 +244,13 @@ def sep_qam(M, EsN0, samples=None):

def ber_qam(M, EbN0, samples=None):
'''
Bit error rate for square M-ary QAM, from Rice. Assumes only nearest neighbour
Bit error rate for square M-ary QAM, from Rice. Assumes only nearest neighbour
bit errors and Gray coding, i.e. 1 bit error per symbol error.
Parameters:
M (int): Number of symbols (must be perfect square)
EbN0 (float): Average electrical signal-to-noise ratio per bit [dB]
samples (numpy.ndarray, optional): random samples of received power from FAST.
samples (numpy.ndarray, optional): random samples of received power from FAST.
'''
return 1/numpy.log2(M) * sep_qam(M, 10*numpy.log10(numpy.log2(M)) + EbN0, samples)

Expand All @@ -265,19 +264,19 @@ def Q(x):

def generalised_mutual_information_qam(samples, M, npxls, EsN0, N0=None, shot=False):
'''
Generalised Mutual Information (GMI), adapted from Alvarado et al (2016)
Generalised Mutual Information (GMI), adapted from Alvarado et al (2016)
[10.1109/JLT.2015.2450537] and Cho et al (2017) [10.1109/ECOC.2017.8345872].
Assumes
Assumes
1) Perfect interleaving (no correlation between bits)
2) Bit-wise decoder (no memory of other bits sent)
3) Gray encoding of QAM symbols
4) Soft decision decoding with FEC
4) Soft decision decoding with FEC
Args:
samples (numpy.ndarray): Array of Monte Carlo complex field measurements or amplitudes
M (int): number of symbols (must be perfect square)
npxls (int): Number of pixels to use for binning
npxls (int): Number of pixels to use for binning
EsN0 (float): Symbol signal-to-noise ratio [dB]
N0 (float, optional): Noise variance (overrides EsN0, can be set to 0)
Expand Down Expand Up @@ -318,31 +317,31 @@ def mutual_information_qam(samples, M, npxls, EsN0, N0=None, shot=False):
def convolve_awgn_qam(samples, M, npxls, EsN0, N0=None, region_size="individual", shot=False):
'''
Method of computing received I-Q plane for M-ary QAM under AWGN assumptions
given a series of complex field measurements.
given a series of complex field measurements.
Bins the Monte Carlo field measurements into a 2D array of npxls x npxls bins.
The overall size of the 2d array is determined by the decision region size,
which depends on the M-ary QAM constellation. This is then convolved with a
gaussian to include AWGN.
Bins the Monte Carlo field measurements into a 2D array of npxls x npxls bins.
The overall size of the 2d array is determined by the decision region size,
which depends on the M-ary QAM constellation. This is then convolved with a
gaussian to include AWGN.
By integrating this distribution, and averaging over all constellation regions,
we can obtain the probability that a transmitted symbol ends up outside the
decision region, i.e. a symbol error. This [may be] more robust than simply
adding AWGN to the individual Monte Carlo datapoints, because that method
By integrating this distribution, and averaging over all constellation regions,
we can obtain the probability that a transmitted symbol ends up outside the
decision region, i.e. a symbol error. This [may be] more robust than simply
adding AWGN to the individual Monte Carlo datapoints, because that method
is limited by the number of samples.
Args:
samples (numpy.ndarray): Array of Monte Carlo complex field measurements or amplitudes
M (int): number of symbols (must be perfect square)
npxls (int): Number of pixels to use for binning
npxls (int): Number of pixels to use for binning
EsN0 (float): Symbol signal-to-noise ratio [dB]
N0 (float, optional): Noise variance (overrides EsN0, can be set to 0)
separate (bool, optional): Separate each region of the constellation (default: True)
region_size (str, optional): "individual" or "full" region to define the PDF (default: "individual")
Returns:
out (numpy.ndarray): (nsymbols x npxls x npxls) array consisting of the
binned histogram for each symbol.
out (numpy.ndarray): (nsymbols x npxls x npxls) array consisting of the
binned histogram for each symbol.
'''
# define constellation
constellation = define_constellation(f"{M}-QAM")
Expand All @@ -361,20 +360,20 @@ def convolve_awgn_qam(samples, M, npxls, EsN0, N0=None, region_size="individual"
if N0 == None:
Es = numpy.mean(numpy.abs(constellation_norm)**2)
N0 = Es / 10**(EsN0/10)
# for large variance, increase the size of decision_region_size for "full"

# for large variance, increase the size of decision_region_size for "full"
# type region to include +2sigma and avoid clipping the calculated PDF
if region_size == "full":
region_size_required = 2*(numpy.mean(numpy.abs(samples))/numpy.sqrt(2) + 2 * numpy.sqrt(N0))
if region_size_required > decision_region_size_norm:
logger.debug("AWGN noise level too large for region, increasing region size")
# npxls = int(numpy.round(npxls * region_size_required / decision_region_size_norm))
decision_region_size_norm = region_size_required
decision_region_size_norm = region_size_required

dx = decision_region_size_norm / npxls
x_g = numpy.linspace(-npxls/2, npxls/2, npxls+1)
# compute variance in pixel units. if less than one, set equal to one to avoid
x_g = numpy.linspace(-npxls/2, npxls/2, npxls+1)

# compute variance in pixel units. if less than one, set equal to one to avoid
# numerical issues with normalisation
sigma2 = N0 / (2 * dx**2)
if sigma2 < 1:
Expand Down Expand Up @@ -410,7 +409,7 @@ def convolve_awgn_qam(samples, M, npxls, EsN0, N0=None, region_size="individual"
h_conv += \
h[ix[i],iy[i]] * \
gaussian2d(h.shape, numpy.sqrt(sigma2*sigma_mults[i]/2), cent=(ix[i], iy[i])) / (numpy.pi * sigma2 * sigma_mults[i])

out[c] = h_conv

return out
Expand All @@ -422,16 +421,16 @@ def define_constellation(modulation):
OOK - On-Off Keying
BPSK - Binary Phase Shift Keying
QPSK - Quadrature Phase Shift Keying
QAM - Quadrature Amplitude Modulation
QPSK - Quadrature Phase Shift Keying
QAM - Quadrature Amplitude Modulation
M-PSK - M-ary PSK (e.g. 16-PSK)
M-QAM - M-ary QAM (e.g. 16-QAM)
Parameters:
modulation (string): Modulation scheme (e.g. "BPSK" or "64-QAM")
Returns:
constellation (numpy.ndarray): Complex array representing the constellation,
constellation (numpy.ndarray): Complex array representing the constellation,
of dimension (nsymbols), real and imaginary parts correspond to the
two axes of the modulation.
'''
Expand All @@ -444,7 +443,7 @@ def define_constellation(modulation):
# binary phase shift keying
nsymbols = 2
constellation = numpy.exp(1j * numpy.arange(nsymbols) * numpy.pi)

elif modulation in ["QPSK", "QAM"]:
# quadrature PSK, quadrature amplitude modulation (same thing?)
nsymbols = 4
Expand Down Expand Up @@ -495,7 +494,7 @@ def _bin2gray_qam(M):
tmp = numpy.array(symbols_gray).reshape(nside, nside).copy()
for i in tmp[1::2]:
i[:] = i[::-1]

symbols_gray = tmp.flatten()

return symbols_gray
Expand All @@ -505,7 +504,7 @@ def _bit_at_index(code, index, bit):

bit = str(bit)
out = numpy.zeros(len(code), dtype=bool)
for i,c in enumerate(code):
for i,c in enumerate(code):
out[i] = c[index] == str(bit)

return out
Expand All @@ -516,7 +515,7 @@ def _encode(bs, bps):
bits = numpy.unpackbits(a)
pad_bits = 0

# if bps = 1 we are done
# if bps = 1 we are done
if bps == 1:
return bits, pad_bits

Expand All @@ -530,11 +529,11 @@ def _encode(bs, bps):

def _decode(symbols, bps, pad_bits=0):

# in the common case where bps = 1, symbols are the bits
# in the common case where bps = 1, symbols are the bits
if bps == 1:
return numpy.packbits(symbols)
# unpack bits of symbol array, cut off the unused bits

# unpack bits of symbol array, cut off the unused bits
bits = numpy.unpackbits(symbols).reshape(-1,8)[:,-bps:].flatten()
return numpy.packbits(bits).tobytes()[:-(pad_bits>0) or None]

Expand All @@ -557,5 +556,5 @@ def flip_bits(data, ber):
newdata = (newbytes%128).tobytes().decode("ascii")
else:
newdata = numpy.frombuffer(newbytes.tobytes(), dtype=data.dtype).reshape(data.shape)
return newdata

return newdata
Loading

0 comments on commit 5063eaf

Please sign in to comment.