Merge pull request #1639 from mndza/generic-sgpio-intf

gateware: generic SGPIOInterface, simpler capture mgmt, fix resampler bugs
This commit is contained in:
Michael Ossmann
2026-01-11 23:22:03 -05:00
committed by GitHub
13 changed files with 674 additions and 905 deletions

View File

@@ -37,14 +37,16 @@ class PralinePlatform(LatticeICE40Platform):
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("host_data", 0, Pins("21 19 6 13 10 3 4 18", dir="io"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("q_invert", 0, Pins("9", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("direction", 0, Pins("12", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("disable", 0, Pins("23", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("capture_en", 0, Pins("11", dir="o"),
Attrs(IO_STANDARD="SB_LVCMOS")),
# Other I/O.
Resource("q_invert", 0, Pins("9", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("trigger_in", 0, Pins("48", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("trigger_out", 0, Pins("2", dir="o"),

Binary file not shown.

View File

@@ -7,7 +7,7 @@
from math import ceil, log2
from amaranth import Module, Signal, Mux, DomainRenamer
from amaranth.lib import wiring, stream, data, memory
from amaranth.lib import wiring, stream, data, memory, fifo
from amaranth.lib.wiring import In, Out
from amaranth.utils import bits_for
@@ -58,6 +58,17 @@ class HalfBandDecimator(wiring.Component):
# Arms
m.submodules.fir = fir = FIRFilter(fir_taps, shape=self.data_shape, always_ready=always_ready,
num_channels=1, add_tap=len(fir_taps)//2+1)
fir_out_odd = Signal()
with m.If(fir.output.valid & fir.output.ready):
m.d.sync += fir_out_odd.eq(~fir_out_odd)
odd = Signal()
with m.If(self.input.valid & self.input.ready):
m.d.sync += odd.eq(~odd)
# Only switch modes at even samples.
switch_stb = Signal()
m.d.comb += switch_stb.eq((~odd) ^ (self.input.valid & self.input.ready))
with m.FSM():
@@ -70,72 +81,54 @@ class HalfBandDecimator(wiring.Component):
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(1)
with m.If(self.enable):
with m.If(self.enable & switch_stb):
m.next = "DECIMATE"
with m.State("DECIMATE"):
# Input switching.
odd = Signal()
input_idx = Signal()
even_valid = Signal()
# I and Q channels are muxed in time, demuxed later in the output stage.
even_buffer = Signal.like(self.input.p)
q_inputs = Signal.like(self.input.p)
odd_buffer = Signal.like(self.input.p)
q_valid = Signal()
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq((~odd & ~even_valid) | fir.input.ready)
m.d.comb += self.input.ready.eq(fir.input.ready)
# Even samples are buffered and used as a secondary
# carry addition for the FIR filter.
# I and Q channels are muxed in time, demuxed later in the output stage.
with m.If(self.input.valid & self.input.ready):
m.d.sync += odd.eq(~odd)
with m.If(~odd):
with m.If(~even_valid | fir.input.ready):
m.d.sync += even_valid.eq(self.input.valid)
with m.If(self.input.valid):
m.d.sync += even_buffer.eq(self.input.p)
with m.If(self.input.ready & self.input.valid):
with m.If(~odd):
m.d.sync += even_buffer.eq(self.input.p)
with m.Else():
m.d.sync += odd_buffer.eq(self.input.p)
m.d.sync += q_valid.eq(1)
# Process two I samples and two Q samples in sequence.
with m.If(fir.input.ready & fir.input.valid):
m.d.sync += input_idx.eq(input_idx ^ 1)
with m.If(input_idx == 0):
with m.If(odd):
m.d.comb += [
fir.add_input .eq(even_buffer[0]),
fir.input.p .eq(self.input.p[0]),
fir.input.valid .eq(self.input.valid & even_valid),
fir.input.valid .eq(self.input.valid),
]
with m.If(fir.input.ready & fir.input.valid):
m.d.sync += [
q_inputs[0].eq(even_buffer[1]),
q_inputs[1].eq(self.input.p[1]),
]
with m.Else():
m.d.comb += [
fir.add_input .eq(q_inputs[0]),
fir.input.p .eq(q_inputs[1]),
fir.input.valid .eq(1),
fir.add_input .eq(even_buffer[1]),
fir.input.p .eq(odd_buffer[1]),
fir.input.valid .eq(q_valid),
]
with m.If(fir.input.ready):
m.d.sync += q_valid.eq(0)
# Output sum and demux.
output_idx = Signal()
with m.If(~self.output.valid | self.output.ready):
if not fir.output.signature.always_ready:
m.d.comb += fir.output.ready.eq(1)
m.d.sync += self.output.valid.eq(fir.output.valid & output_idx)
m.d.sync += self.output.valid.eq(fir.output.valid & fir_out_odd)
with m.If(fir.output.valid):
m.d.sync += self.output.p[0].eq(self.output.p[1])
m.d.sync += self.output.p[1].eq(fir.output.p[0] * fixed.Const(0.5))
m.d.sync += output_idx.eq(output_idx ^ 1)
# Mode switch logic.
with m.If(~self.enable):
m.d.sync += input_idx.eq(0)
m.d.sync += output_idx.eq(0)
m.d.sync += odd.eq(0)
m.d.sync += even_valid.eq(0)
# Mode switch logic
with m.If(~self.enable & switch_stb):
m.d.sync += even_buffer.eq(0)
m.d.sync += odd_buffer.eq(0)
m.next = "BYPASS"
if self._domain != "sync":
@@ -180,9 +173,17 @@ class HalfBandInterpolator(wiring.Component):
delay = arm1_taps.index(1)
# Arms
m.submodules.fir0 = fir0 = FIRFilter(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.fir1 = fir1 = Delay(delay, shape=self.data_shape, always_ready=always_ready, num_channels=self.num_channels)
arms = [fir0, fir1]
m.submodules.fir = fir = FIRFilter(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly = dly = Delay(delay, shape=self.data_shape, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly_fifo = dly_fifo = fifo.SyncFIFOBuffered(width=self.num_channels*self.data_shape.as_shape().width, depth=1)
arms = [fir, dly]
m.d.comb += [
dly_fifo.w_data.eq(dly.output.p),
dly_fifo.w_en.eq(dly.output.valid),
]
if not dly.output.signature.always_ready:
m.d.comb += dly.output.ready.eq(dly_fifo.w_rdy)
with m.FSM():
@@ -205,7 +206,6 @@ class HalfBandInterpolator(wiring.Component):
m.next = "BYPASS"
# Input
for i, arm in enumerate(arms):
m.d.comb += arm.input.payload.eq(self.input.payload)
m.d.comb += arm.input.valid.eq(self.input.valid & arms[i^1].input.ready)
@@ -218,29 +218,25 @@ class HalfBandInterpolator(wiring.Component):
arm_index = Signal()
# Output buffers for each arm.
arm_outputs = [arm.output for arm in arms]
if self.output.signature.always_ready:
buffers = [stream.Signature(arm.payload.shape()).create() for arm in arm_outputs]
for arm, buf in zip(arm_outputs, buffers):
with m.If(~buf.valid | buf.ready):
if not arm.signature.always_ready:
m.d.comb += arm.ready.eq(1)
m.d.sync += buf.valid.eq(arm.valid)
with m.If(arm.valid):
m.d.sync += buf.payload.eq(arm.payload)
arm_outputs = buffers
r_data_cast = data.ArrayLayout(self.data_shape, self.num_channels)(dly_fifo.r_data)
with m.If(~self.output.valid | self.output.ready):
with m.Switch(arm_index):
for i, arm in enumerate(arm_outputs):
with m.Case(i):
for c in range(self.num_channels):
m.d.sync += self.output.payload[c].eq(arm.payload[c])
m.d.sync += self.output.valid.eq(arm.valid)
if not arm.signature.always_ready:
m.d.comb += arm.ready.eq(1)
with m.If(arm.valid):
m.d.sync += arm_index.eq(arm_index ^ 1)
with m.Case(0):
for c in range(self.num_channels):
m.d.sync += self.output.payload[c].eq(fir.output.payload[c])
m.d.sync += self.output.valid.eq(fir.output.valid)
if not fir.output.signature.always_ready:
m.d.comb += fir.output.ready.eq(1)
with m.If(fir.output.valid):
m.d.sync += arm_index.eq(1)
with m.Case(1):
for c in range(self.num_channels):
m.d.sync += self.output.payload[c].eq(r_data_cast[c])
m.d.sync += self.output.valid.eq(dly_fifo.r_rdy)
m.d.comb += dly_fifo.r_en.eq(1)
with m.If(dly_fifo.r_rdy):
m.d.sync += arm_index.eq(0)
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
@@ -446,24 +442,26 @@ class _TestFilter(unittest.TestCase):
return samples / (1 << f_width)
return samples
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0):
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0, empty_ready_cycles=0):
async def input_process(ctx):
if hasattr(dut, "enable"):
ctx.set(dut.enable, 1)
await ctx.tick()
ctx.set(dut.input.valid, 1)
for sample in samples:
await ctx.tick()
for i, sample in enumerate(samples):
if num_channels > 1:
ctx.set(dut.input.payload, [s.item() for s in sample])
else:
ctx.set(dut.input.payload, [sample.item()])
if isinstance(dut.input.payload.shape(), data.ArrayLayout):
ctx.set(dut.input.payload, [sample.item()])
else:
ctx.set(dut.input.payload, sample.item())
ctx.set(dut.input.valid, 1)
await ctx.tick().until(dut.input.ready)
ctx.set(dut.input.valid, 0)
if empty_cycles > 0:
ctx.set(dut.input.valid, 0)
await ctx.tick().repeat(empty_cycles)
ctx.set(dut.input.valid, 1)
ctx.set(dut.input.valid, 0)
filtered = []
async def output_process(ctx):
@@ -474,7 +472,14 @@ class _TestFilter(unittest.TestCase):
if num_channels > 1:
filtered.append([v.as_float() for v in payload])
else:
filtered.append(payload[0].as_float())
if isinstance(payload.shape(), data.ArrayLayout):
filtered.append(payload[0].as_float())
else:
filtered.append(payload.as_float())
if empty_ready_cycles > 0:
ctx.set(dut.output.ready, 0)
await ctx.tick().repeat(empty_ready_cycles)
ctx.set(dut.output.ready, 1)
if not dut.output.signature.always_ready:
ctx.set(dut.output.ready, 0)
@@ -505,100 +510,154 @@ class TestFIRFilter(_TestFilter):
filtered_np = np.convolve(input_samples, taps).tolist()
# Simulate DUT
dut = FIRFilter(taps, fixed.SQ(15, 0), always_ready=True)
filtered = self._filter(dut, input_samples, len(input_samples))
dut = FIRFilter(taps, shape=fixed.SQ(8, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples), empty_ready_cycles=5)
self.assertListEqual(filtered_np[:len(filtered)], filtered)
class TestHalfBandDecimator(_TestFilter):
def test_filter_no_backpressure(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
def test_filter(self):
num_samples = 1024
input_width = 8
samples_i_in = self._generate_samples(num_samples, input_width, f_width=7)
samples_q_in = self._generate_samples(num_samples, input_width, f_width=7)
common_dut_options = dict(
data_shape=fixed.SQ(7),
shape_out=fixed.SQ(0,31),
)
# Compute the expected result
filtered_i_np = np.convolve(samples_i_in, taps)[1::2].tolist()
filtered_q_np = np.convolve(samples_q_in, taps)[1::2].tolist()
taps0 = (np.array([-1, 0, 9, 16, 9, 0, -1]) / 32).tolist()
taps1 = (np.array([-2, 0, 7, 0, -18, 0, 41, 0, -92, 0, 320, 512, 320, 0, -92, 0, 41, 0, -18, 0, 7, 0, -2]) / 1024).tolist()
# Simulate DUT
dut = HalfBandDecimator(taps, data_shape=fixed.SQ(7), shape_out=fixed.SQ(0,16), always_ready=True)
filtered = self._filter(dut, zip(samples_i_in, samples_q_in), len(samples_i_in) // 2, num_channels=2)
filtered_i = [ x[0] for x in filtered ]
filtered_q = [ x[1] for x in filtered ]
self.assertListEqual(filtered_i_np[:len(filtered_i)], filtered_i)
self.assertListEqual(filtered_q_np[:len(filtered_q)], filtered_q)
inputs = {
def test_filter_with_spare_cycles(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
"test_filter_with_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, taps=taps0),
"sim_opts": dict(empty_cycles=0),
},
num_samples = 1024
input_width = 8
samples_i_in = self._generate_samples(num_samples, input_width, f_width=7)
samples_q_in = self._generate_samples(num_samples, input_width, f_width=7)
"test_filter_with_backpressure_and_empty_cycles": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, taps=taps0),
"sim_opts": dict(empty_cycles=3),
},
# Compute the expected result
filtered_i_np = np.convolve(samples_i_in, taps)[1::2].tolist()
filtered_q_np = np.convolve(samples_q_in, taps)[1::2].tolist()
"test_filter_with_backpressure_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, taps=taps1),
"sim_opts": dict(empty_cycles=0),
},
# Simulate DUT
dut = HalfBandDecimator(taps, data_shape=fixed.SQ(7), shape_out=fixed.SQ(0,16), always_ready=True)
filtered = self._filter(dut, zip(samples_i_in, samples_q_in), len(samples_i_in) // 2, num_channels=2, empty_cycles=3)
filtered_i = [ x[0] for x in filtered ]
filtered_q = [ x[1] for x in filtered ]
"test_filter_no_backpressure_and_empty_cycles_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=True, taps=taps0),
"sim_opts": dict(empty_cycles=6),
},
self.assertListEqual(filtered_i_np[:len(filtered_i)], filtered_i)
self.assertListEqual(filtered_q_np[:len(filtered_q)], filtered_q)
"test_filter_no_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=True, taps=taps1),
"sim_opts": dict(empty_cycles=3),
},
}
for name, scenario in inputs.items():
def test_filter_with_backpressure(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
with self.subTest(name):
taps = scenario["dut_options"]["taps"]
num_samples = scenario["num_samples"]
num_samples = 1024
input_width = 8
samples_i_in = self._generate_samples(num_samples, input_width, f_width=7)
samples_q_in = self._generate_samples(num_samples, input_width, f_width=7)
input_width = 8
samples_i_in = self._generate_samples(num_samples, input_width, f_width=7)
samples_q_in = self._generate_samples(num_samples, input_width, f_width=7)
# Compute the expected result
filtered_i_np = np.convolve(samples_i_in, taps)[1::2].tolist()
filtered_q_np = np.convolve(samples_q_in, taps)[1::2].tolist()
# Compute the expected result
filtered_i_np = np.convolve(samples_i_in, taps)[1::2].tolist()
filtered_q_np = np.convolve(samples_q_in, taps)[1::2].tolist()
# Simulate DUT
dut = HalfBandDecimator(taps, data_shape=fixed.SQ(7), shape_out=fixed.SQ(0,16), always_ready=False)
filtered = self._filter(dut, zip(samples_i_in, samples_q_in), len(samples_i_in) // 2, num_channels=2)
filtered_i = [ x[0] for x in filtered ]
filtered_q = [ x[1] for x in filtered ]
# Simulate DUT
dut = HalfBandDecimator(**scenario["dut_options"])
filtered = self._filter(dut, zip(samples_i_in, samples_q_in), len(samples_i_in) // 2, num_channels=2, **scenario["sim_opts"])
filtered_i = [ x[0] for x in filtered ]
filtered_q = [ x[1] for x in filtered ]
self.assertListEqual(filtered_i_np[:len(filtered_i)], filtered_i)
self.assertListEqual(filtered_q_np[:len(filtered_q)], filtered_q)
self.assertListEqual(filtered_i_np[:len(filtered_i)], filtered_i)
self.assertListEqual(filtered_q_np[:len(filtered_q)], filtered_q)
class TestHalfBandInterpolator(_TestFilter):
def test_filter(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
num_samples = 1024
input_width = 8
input_samples = self._generate_samples(num_samples, input_width, f_width=7)
# Compute the expected result
input_samples_pad = np.zeros(2*len(input_samples))
input_samples_pad[0::2] = 2*input_samples # pad with zeros, adjust gain
filtered_np = np.convolve(input_samples_pad, taps).tolist()
common_dut_options = dict(
data_shape=fixed.SQ(7),
shape_out=fixed.SQ(1,16),
)
# Simulate DUT
dut = HalfBandInterpolator(taps, data_shape=fixed.SQ(0, 7), shape_out=fixed.SQ(0,16), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples) * 2)
taps0 = (np.array([-1, 0, 9, 16, 9, 0, -1]) / 32).tolist()
taps1 = (np.array([-2, 0, 7, 0, -18, 0, 41, 0, -92, 0, 320, 512, 320, 0, -92, 0, 41, 0, -18, 0, 7, 0, -2]) / 1024).tolist()
self.assertListEqual(filtered_np[:len(filtered)], filtered)
inputs = {
"test_filter_with_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, num_channels=2, taps=taps1),
"sim_opts": dict(empty_cycles=0, empty_ready_cycles=0),
},
"test_filter_with_backpressure_and_empty_cycles": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps0),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=3),
},
"test_filter_with_backpressure_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps1),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=0),
},
"test_filter_no_backpressure_and_empty_cycles_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=True, taps=taps0),
"sim_opts": dict(empty_cycles=8),
},
"test_filter_no_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=True, taps=taps1),
"sim_opts": dict(empty_cycles=16),
},
}
for name, scenario in inputs.items():
with self.subTest(name):
taps = scenario["dut_options"]["taps"]
num_samples = scenario["num_samples"]
input_width = 8
samples_i_in = self._generate_samples(num_samples, input_width, f_width=7)
samples_q_in = self._generate_samples(num_samples, input_width, f_width=7)
# Compute the expected result
input_samples_pad = np.zeros(2*len(samples_i_in))
input_samples_pad[0::2] = 2*samples_i_in # pad with zeros, adjust gain
filtered_i_np = np.convolve(input_samples_pad, taps).tolist()
input_samples_pad = np.zeros(2*len(samples_q_in))
input_samples_pad[0::2] = 2*samples_q_in # pad with zeros, adjust gain
filtered_q_np = np.convolve(input_samples_pad, taps).tolist()
# Simulate DUT
dut = HalfBandInterpolator(**scenario["dut_options"])
filtered = self._filter(dut, zip(samples_i_in, samples_q_in), len(samples_i_in) * 2, num_channels=2, **scenario["sim_opts"])
filtered_i = [ x[0] for x in filtered ]
filtered_q = [ x[1] for x in filtered ]
self.assertListEqual(filtered_i_np[:len(filtered_i)], filtered_i)
self.assertListEqual(filtered_q_np[:len(filtered_q)], filtered_q)
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@@ -7,7 +7,7 @@
from math import ceil, log2
from amaranth import Module, Signal, Mux, DomainRenamer, ClockSignal, signed
from amaranth.lib import wiring, stream, data, memory
from amaranth.lib import wiring, stream, data, memory, fifo
from amaranth.lib.wiring import In, Out
from amaranth.utils import bits_for
@@ -58,7 +58,7 @@ class HalfBandDecimatorMAC16(wiring.Component):
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(~odd | fir.input.ready)
m.d.comb += dly.output.ready.eq(1)
m.d.comb += dly.output.ready.eq(fir.input.ready)
m.d.comb += [
dly.input.p.eq(self.input.p),
@@ -126,30 +126,43 @@ class HalfBandInterpolatorMAC16(wiring.Component):
taps = [ 2 * tap for tap in self.taps ]
arm0_taps = taps[0::2]
arm1_taps = taps[1::2]
delay = arm1_taps.index(1)
# Arms
m.submodules.fir = fir = FIRFilterMAC16(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, overclock_rate=self.overclock_rate, always_ready=always_ready, num_channels=self.num_channels, delayed_port=True)
m.submodules.fir = fir = FIRFilterMAC16(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, overclock_rate=self.overclock_rate, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly = dly = Delay(delay, shape=self.data_shape, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly_fifo = dly_fifo = fifo.SyncFIFOBuffered(width=self.num_channels*self.data_shape.as_shape().width, depth=self.overclock_rate+1)
m.d.comb += [
dly_fifo.w_data.eq(dly.output.p),
dly_fifo.w_en.eq(dly.output.valid),
]
if not dly.output.signature.always_ready:
m.d.comb += dly.output.ready.eq(dly_fifo.w_rdy)
busy = Signal()
with m.If(fir.input.valid & fir.input.ready):
m.d.sync += busy.eq(1)
#busy = Signal()
#with m.If(fir.input.valid & fir.input.ready):
# m.d.sync += busy.eq(1)
# Input
m.d.comb += fir.input.payload.eq(self.input.payload)
m.d.comb += fir.input.valid.eq(self.input.valid & ~busy)
m.d.comb += fir.input.valid.eq(self.input.valid & dly.input.ready)
m.d.comb += dly.input.payload.eq(self.input.payload)
m.d.comb += dly.input.valid.eq(self.input.valid & fir.input.ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(fir.input.ready & ~busy)
m.d.comb += self.input.ready.eq(fir.input.ready & dly.input.ready)
# Output
# Arm index selection: switch after every delivered sample
arm_index = Signal()
delayed = Signal.like(fir.input_delayed)
with m.If(fir.output.valid & fir.output.ready):
m.d.sync += delayed.eq(fir.input_delayed)
#delayed = Signal.like(fir.input_delayed)
#with m.If(fir.output.valid & fir.output.ready):
# m.d.sync += delayed.eq(fir.input_delayed)
r_data_cast = data.ArrayLayout(self.data_shape, self.num_channels)(dly_fifo.r_data)
with m.If(~self.output.valid | self.output.ready):
with m.Switch(arm_index):
@@ -163,10 +176,11 @@ class HalfBandInterpolatorMAC16(wiring.Component):
m.d.sync += arm_index.eq(1)
with m.Case(1):
for c in range(self.num_channels):
m.d.sync += self.output.payload[c].eq(delayed[c])
m.d.sync += self.output.valid.eq(1)
m.d.sync += arm_index.eq(0)
m.d.sync += busy.eq(0)
m.d.sync += self.output.payload[c].eq(r_data_cast[c])
m.d.sync += self.output.valid.eq(dly_fifo.r_rdy)
m.d.comb += dly_fifo.r_en.eq(1)
with m.If(dly_fifo.r_rdy):
m.d.sync += arm_index.eq(0)
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
@@ -208,11 +222,12 @@ class FIRFilterMAC16(wiring.Component):
})
super().__init__(signature)
def taps_shape(self):
taps_as_ratios = [tap.as_integer_ratio() for tap in self.taps]
def taps_shape(self, taps=None):
taps = taps or self.taps
taps_as_ratios = [tap.as_integer_ratio() for tap in taps]
f_width = bits_for(max(tap[1] for tap in taps_as_ratios)) - 1
i_width = max(0, bits_for(max(abs(tap[0]) for tap in taps_as_ratios)) - f_width)
return fixed.Shape(i_width, f_width, signed=any(tap < 0 for tap in self.taps))
return fixed.Shape(i_width, f_width, signed=any(tap < 0 for tap in taps))
def compute_output_shape(self):
taps_shape = self.taps_shape()
@@ -229,101 +244,105 @@ class FIRFilterMAC16(wiring.Component):
def elaborate(self, platform):
m = Module()
# Build filter out of FIRFilterSerialMAC16 blocks.
# Build filter out of SerialMAC16 blocks.
overclock_factor = self.overclock_rate
# Symmetric coefficients special case.
symmetric = (self.taps == self.taps[::-1])
# Even-symmetric case. (N=2*K)
# Odd-symmetric case. (N=2*K+1)
if symmetric:
taps = self.taps[:ceil(len(self.taps)/2)]
odd_symmetric = ((len(self.taps) % 2) == 1)
else:
taps = self.taps
dsp_block_count = ceil(len(taps) / overclock_factor)
def pipe(signal, length):
name = signal.name if hasattr(signal, "name") else "signal"
pipe = [ signal ] + [ Signal.like(signal, name=f"{name}_q{i}") for i in range(length) ]
for i in range(length):
m.d.sync += pipe[i+1].eq(pipe[i])
return pipe
taps = self.taps
if self.carry is not None:
sum_carry_q = Signal.like(self.sum_carry)
with m.If(self.input.valid & self.input.ready):
m.d.sync += sum_carry_q.eq(self.sum_carry)
filters_ready = Signal()
window_valid = Signal()
input_ready = Signal()
m.d.comb += input_ready.eq(~window_valid | filters_ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(input_ready)
# Samples window.
window = [ Signal.like(self.input.p, name=f"window_{i}") for i in range(len(self.taps)) ]
with m.If(input_ready):
m.d.sync += window_valid.eq(self.input.valid)
with m.If(self.input.valid):
m.d.sync += window[0].eq(self.input.p)
for i in range(1, len(window)):
m.d.sync += window[i].eq(window[i-1])
if self.carry is not None:
m.d.sync += sum_carry_q.eq(self.sum_carry)
# When filter is symmetric, presum samples to obtain a smaller window.
symmetric = (self.taps == self.taps[::-1])
if symmetric:
sum_shape = (self.input.p[0] + self.input.p[0]).shape()
odd_symmetric = ((len(self.taps) % 2) == 1)
new_len = len(self.taps) // 2 + odd_symmetric
new_window = [ Signal(data.ArrayLayout(sum_shape, self.num_channels), name=f"window_sym_{i}") for i in range(new_len) ]
for i in range(len(new_window) - odd_symmetric):
for c in range(self.num_channels):
m.d.comb += new_window[i][c].eq(window[i][c] + window[-i-1][c])
if odd_symmetric:
for c in range(self.num_channels):
m.d.comb += new_window[-1][c].eq(window[len(self.taps)//2][c])
window = new_window
taps = self.taps[:ceil(len(self.taps)/2)]
samples_shape = sum_shape
else:
samples_shape = self.shape
# Build filter out of SerialMAC16 blocks: each one multiplies and
# accumulates `overclock_factor` taps serially.
dsp_block_count = ceil(len(taps) / overclock_factor)
# If we have multiple subfilters, make them all the same size.
if dsp_block_count > 1 and len(taps) % overclock_factor != 0:
taps = taps + [0]*(overclock_factor - (len(taps)%overclock_factor))
for c in range(self.num_channels):
last = self.input
dsp_blocks = []
for i in range(dsp_block_count):
taps_slice = taps[i*overclock_factor:(i+1)*overclock_factor]
input_delayed = len(taps_slice)
carry = last.output.p.shape() if i > 0 else self.carry
window_slice = window[i*overclock_factor:(i+1)*overclock_factor]
carry = None if i > 0 else self.carry
if (i == dsp_block_count-1) and symmetric and odd_symmetric:
taps_slice[-1] /= 2
input_delayed -= 1
dsp = FIRFilterSerialMAC16(taps=taps_slice, shape=self.shape, taps_shape=self.taps_shape(), carry=carry, symmetry=symmetric,
input_delayed_cycles=input_delayed, always_ready=self.always_ready)
dsp = SerialMAC16(taps=taps_slice, shape=samples_shape, taps_shape=self.taps_shape(taps), carry=carry, always_ready=self.always_ready)
dsp_blocks.append(dsp)
for j in range(len(window_slice)):
m.d.comb += dsp.input.p[j].eq(window_slice[j][c])
m.d.comb += dsp.input.valid.eq(window_valid)
if i == 0:
m.d.comb += [
dsp.input.p .eq(self.input.p[c]),
dsp.input.valid .eq(self.input.valid & self.input.ready),
]
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(dsp.input.ready)
m.d.comb += filters_ready.eq(dsp.input.ready)
if self.carry is not None:
m.d.comb += dsp.sum_carry.eq(sum_carry_q[c])
else:
m.d.comb += [
dsp.input.p .eq(pipe(last.input_delayed, last.delay())[-1]),
dsp.input.valid .eq(last.output.valid),
dsp.sum_carry .eq(last.output.p),
]
if not last.output.signature.always_ready:
m.d.comb += last.output.ready.eq(dsp.input.ready)
last = dsp
if self.delayed_port:
m.d.comb += self.input_delayed[c].eq(last.input_delayed)
if symmetric:
for i in reversed(range(dsp_block_count)):
end_block = (i == dsp_block_count-1)
m.d.comb += [
dsp_blocks[i].rev_input .eq(dsp_blocks[i+1].rev_delayed if not end_block else dsp_blocks[i].input_delayed),
]
m.submodules += dsp_blocks
m.d.comb += [
self.output.payload[c] .eq(last.output.p),
self.output.valid .eq(last.output.valid),
]
if not last.output.signature.always_ready:
m.d.comb += last.output.ready.eq(self.output.ready)
# Adder tree for channel c
if dsp_block_count > 1:
with m.If(~self.output.valid | self.output.ready):
for i in range(dsp_block_count):
if not dsp_blocks[i].output.signature.always_ready:
m.d.comb += dsp_blocks[i].output.ready.eq(1)
m.d.sync += self.output.valid.eq(dsp_blocks[0].output.valid)
with m.If(dsp_blocks[0].output.valid):
m.d.sync += self.output.payload[c] .eq(sum(dsp_blocks[i].output.p for i in range(dsp_block_count)))
else:
m.d.comb += self.output.payload[c].eq(dsp_blocks[0].output.p)
m.d.comb += self.output.valid.eq(dsp_blocks[0].output.valid)
if not dsp_blocks[0].output.signature.always_ready:
m.d.comb += dsp_blocks[0].output.ready.eq(self.output.ready)
return m
class FIRFilterSerialMAC16(wiring.Component):
class SerialMAC16(wiring.Component):
def __init__(self, taps, shape, shape_out=None, taps_shape=None, carry=None, symmetry=False, input_delayed_cycles=None, always_ready=False):
assert shape.as_shape().width <= 16, "DSP slice inputs have a maximum width of 16 bit."
def __init__(self, taps, shape, shape_out=None, taps_shape=None, carry=None, always_ready=False):
assert shape.as_shape().width <= 16, f"DSP slice inputs have a maximum width of 16 bit. {shape} {shape.as_shape().width}"
self.carry = carry
self.taps = list(taps)
@@ -333,15 +352,8 @@ class FIRFilterSerialMAC16(wiring.Component):
shape_out = self.compute_output_shape()
self.shape_out = shape_out
self.always_ready = always_ready
self.symmetry = symmetry
if input_delayed_cycles is None:
self.input_delayed_cycles = len(self.taps)
else:
self.input_delayed_cycles = input_delayed_cycles
signature = {
"input": In(stream.Signature(shape, always_ready=always_ready)),
"input_delayed": Out(shape),
"input": In(stream.Signature(data.ArrayLayout(shape, len(taps)), always_ready=always_ready)),
"output": Out(stream.Signature(shape_out, always_ready=always_ready)),
}
if carry is not None:
@@ -350,11 +362,6 @@ class FIRFilterSerialMAC16(wiring.Component):
})
else:
self.sum_carry = 0
if symmetry:
signature.update({
"rev_input": In(shape),
"rev_delayed": Out(shape),
})
super().__init__(signature)
def taps_shape(self):
@@ -375,72 +382,36 @@ class FIRFilterSerialMAC16(wiring.Component):
shape_out = fixed.Shape(i_width, f_width, signed=signed)
return shape_out
def delay(self):
return 1 + 1 + 3 + len(self.taps) - 1
def elaborate(self, platform):
m = Module()
depth = len(self.taps)
counter_in = Signal(range(depth))
counter_mult = Signal(range(depth))
counter_out = Signal(range(depth))
dsp_ready = ~self.output.valid | self.output.ready
window_valid = Signal()
window_ready = dsp_ready
dsp_ready = Signal()
multin_valid = Signal()
input_ready = Signal()
# Ready to process a sample either when the DSP slice is ready and the samples window is:
# - Not valid yet.
# - Only valid for 1 more cycle.
m.d.comb += input_ready.eq(~window_valid | ((counter_in == depth-1) & window_ready))
m.d.comb += input_ready.eq((counter_in == depth-1) & dsp_ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(input_ready)
window = [ Signal.like(self.input.p, name=f"window_{i}") for i in range(max(depth, self.input_delayed_cycles)) ]
# Sample window.
with m.If(input_ready):
m.d.sync += window_valid.eq(self.input.valid)
with m.If(self.input.valid):
m.d.sync += window[0].eq(self.input.p)
for i in range(1, len(window)):
m.d.sync += window[i].eq(window[i-1])
m.d.sync += multin_valid.eq(window_valid)
dsp_a = Signal.like(self.input.p)
with m.Switch(counter_in):
for i in range(depth):
with m.Case(i):
m.d.sync += dsp_a.eq(window[i])
m.d.comb += self.input_delayed.eq(window[self.input_delayed_cycles-1])
# Sample counter.
with m.If(window_ready & window_valid):
with m.If((self.input.valid | (counter_in != 0)) & dsp_ready):
m.d.sync += counter_in.eq(_incr(counter_in, depth))
# Symmetry handling.
if self.symmetry:
with m.If(dsp_ready):
m.d.sync += multin_valid.eq(self.input.valid | (counter_in != 0))
window_rev = [ Signal.like(self.input.p, name=f"window_rev_{i}") for i in range(depth) ]
with m.If(input_ready & self.input.valid):
m.d.sync += window_rev[0].eq(self.rev_input)
m.d.sync += [ window_rev[i].eq(window_rev[i-1]) for i in range(1, len(window_rev)) ]
m.d.comb += self.rev_delayed.eq(window_rev[-1])
dsp_a_rev = Signal.like(self.input.p)
# Select sample from window.
dsp_a = Signal(self.shape)
with m.If(dsp_ready):
with m.Switch(counter_in):
for i in range(depth):
with m.Case(i):
m.d.sync += dsp_a_rev.eq(window_rev[depth-1-i])
m.d.sync += dsp_a.eq(self.input.p[i])
# Coefficient ROM.
taps_shape = self.taps_shape
@@ -453,33 +424,38 @@ class FIRFilterSerialMAC16(wiring.Component):
m.submodules.coeff_rom = coeff_rom = memory.Memory(data=coeff_data)
coeff_rd = coeff_rom.read_port(domain="sync")
m.d.comb += coeff_rd.addr.eq(counter_in)
m.d.comb += coeff_rd.en.eq(dsp_ready)
shape_out = self.compute_output_shape()
if self.carry:
sum_carry_q = Signal.like(self.sum_carry)
with m.If(self.input.ready & self.input.valid):
with m.If(input_ready):
m.d.sync += sum_carry_q.eq(self.sum_carry)
m.submodules.dsp = dsp = iCE40Multiplier()
if self.symmetry:
m.d.comb += dsp.a.eq(dsp_a + dsp_a_rev)
else:
m.d.comb += dsp.a.eq(dsp_a)
m.submodules.dsp = dsp = iCE40Multiplier(
o_width=shape_out.as_shape().width,
always_ready=self.always_ready)
valid_cnt = Signal(depth, init=1)
mult_cnt = Signal(depth, init=1)
m.d.comb += [
dsp.a .eq(dsp_a),
dsp.b .eq(coeff_rd.data),
shape_out(dsp.p) .eq(sum_carry_q if self.carry is not None else 0),
dsp.valid_in .eq(multin_valid & window_ready),
dsp.p_load .eq(counter_mult == 0),
dsp.valid_in .eq(multin_valid),
dsp_ready .eq(dsp.ready_in),
dsp.p_load .eq(mult_cnt[0]),
self.output.p .eq(shape_out(dsp.o)),
self.output.valid .eq(dsp.valid_out & (counter_out == depth-1)),
self.output.valid .eq(dsp.valid_out & valid_cnt[-1]),
dsp.ready_out .eq(self.output.ready | ~valid_cnt[-1]),
]
# Multiplier input and output counters.
with m.If(dsp.valid_in):
m.d.sync += counter_mult.eq(_incr(counter_mult, depth))
with m.If(dsp.valid_out):
m.d.sync += counter_out.eq(_incr(counter_out, depth))
with m.If(dsp.valid_in & dsp.ready_in):
m.d.sync += mult_cnt.eq(mult_cnt.rotate_left(1))
with m.If(dsp.valid_out & dsp.ready_out):
m.d.sync += valid_cnt.eq(valid_cnt.rotate_left(1))
return m
@@ -487,15 +463,20 @@ class FIRFilterSerialMAC16(wiring.Component):
class iCE40Multiplier(wiring.Component):
a: In(signed(16))
b: In(signed(16))
valid_in: In(1)
p: In(signed(32))
p_load: In(1)
o: Out(signed(32))
valid_out: Out(1)
def __init__(self, a_width=16, b_width=16, p_width=32, o_width=32, always_ready=False):
super().__init__({
"a": In(signed(a_width)),
"b": In(signed(b_width)),
"valid_in": In(1),
"ready_in": In(1),
"p": In(signed(p_width)),
"p_load": In(1),
"o": Out(signed(o_width)),
"valid_out": Out(1),
"ready_out": In(1),
})
self.always_ready = always_ready
self.o_width = o_width
def elaborate(self, platform):
m = Module()
@@ -507,13 +488,20 @@ class iCE40Multiplier(wiring.Component):
return pipe
p_load_v = Signal()
valid_v = Signal()
m.d.comb += valid_v.eq(self.valid_in & self.ready_in)
dsp_delay = 3
valid_pipe = pipe(self.valid_in, dsp_delay)
m.d.comb += p_load_v.eq(self.p_load & self.valid_in)
valid_pipe = pipe(valid_v, dsp_delay)
m.d.comb += p_load_v.eq(self.p_load & valid_v)
p_pipe = pipe(self.p, dsp_delay-1)
p_load_pipe = pipe(p_load_v, dsp_delay - 1)
m.d.comb += self.valid_out.eq(valid_pipe[dsp_delay])
# skid buffer
if not self.always_ready:
m.submodules.out_fifo = out_fifo = fifo.SyncFIFOBuffered(width=self.o_width, depth=dsp_delay+2)
m.d.comb += self.ready_in.eq(~self.valid_out | self.ready_out)
m.submodules.sb_mac16 = mac = SB_MAC16(
C_REG=0,
@@ -541,10 +529,10 @@ class iCE40Multiplier(wiring.Component):
# Inputs.
mac.CLK .eq(ClockSignal("sync")),
mac.CE .eq(1),
mac.C .eq(Mux(p_load_pipe[2], p_pipe[2][16:], self.o[16:])),
mac.A .eq(self.a),
mac.B .eq(self.b),
mac.D .eq(Mux(p_load_pipe[2], p_pipe[2][:16], self.o[:16])),
mac.C.as_signed().eq(Mux(p_load_pipe[2], p_pipe[2][16:], mac.O[16:])),
mac.A.as_signed().eq(self.a),
mac.B.as_signed().eq(self.b),
mac.D.as_signed().eq(Mux(p_load_pipe[2], p_pipe[2][:16], mac.O[:16])),
mac.AHOLD .eq(~valid_pipe[0]), # 0: load
mac.BHOLD .eq(~valid_pipe[0]),
mac.CHOLD .eq(0),
@@ -555,11 +543,23 @@ class iCE40Multiplier(wiring.Component):
mac.ADDSUBBOT .eq(0),
mac.OLOADTOP .eq(0),
mac.OLOADBOT .eq(0),
# Outputs.
self.o .eq(mac.O),
]
if not self.always_ready:
m.d.comb += [
out_fifo.w_data.eq(mac.O),
out_fifo.w_en.eq(valid_pipe[dsp_delay]),
self.o.eq(out_fifo.r_data),
self.valid_out.eq(out_fifo.r_rdy),
out_fifo.r_en.eq(self.ready_out),
]
else:
m.d.comb += [
self.o.eq(mac.O),
self.valid_out.eq(valid_pipe[dsp_delay]),
]
return m
@@ -593,7 +593,7 @@ class _TestFilter(unittest.TestCase):
return samples / (1 << f_width)
return samples
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0):
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0, empty_ready_cycles=0):
async def input_process(ctx):
if hasattr(dut, "enable"):
@@ -627,6 +627,10 @@ class _TestFilter(unittest.TestCase):
filtered.append(payload[0].as_float())
else:
filtered.append(payload.as_float())
if empty_ready_cycles > 0:
ctx.set(dut.output.ready, 0)
await ctx.tick().repeat(empty_ready_cycles)
ctx.set(dut.output.ready, 1)
if not dut.output.signature.always_ready:
ctx.set(dut.output.ready, 0)
@@ -645,23 +649,6 @@ class _TestFilter(unittest.TestCase):
class TestFIRFilterMAC16(_TestFilter):
def test_filter_serial(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
num_samples = 1024
input_width = 8
input_samples = self._generate_samples(num_samples, input_width)
# Compute the expected result
filtered_np = np.convolve(input_samples, taps).tolist()
# Simulate DUT
dut = FIRFilterSerialMAC16(taps, fixed.SQ(15, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples))
self.assertListEqual(filtered_np[:len(filtered)], filtered)
def test_filter(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
@@ -674,8 +661,8 @@ class TestFIRFilterMAC16(_TestFilter):
filtered_np = np.convolve(input_samples, taps).tolist()
# Simulate DUT
dut = FIRFilterMAC16(taps, fixed.SQ(15, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples))
dut = FIRFilterMAC16(taps, shape=fixed.SQ(8, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples), empty_ready_cycles=5)
self.assertListEqual(filtered_np[:len(filtered)], filtered)
@@ -717,7 +704,7 @@ class TestHalfBandDecimatorMAC16(_TestFilter):
"test_filter_no_backpressure_and_empty_cycles_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=True, taps=taps0),
"sim_opts": dict(empty_cycles=3),
"sim_opts": dict(empty_cycles=6),
},
"test_filter_no_backpressure": {
@@ -768,20 +755,20 @@ class TestHalfBandInterpolatorMAC16(_TestFilter):
"test_filter_with_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, num_channels=2, taps=taps0),
"sim_opts": dict(empty_cycles=0),
"dut_options": dict(**common_dut_options, always_ready=False, num_channels=2, taps=taps1),
"sim_opts": dict(empty_cycles=0, empty_ready_cycles=0),
},
"test_filter_with_backpressure_and_empty_cycles": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps0),
"sim_opts": dict(empty_cycles=3),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=3),
},
"test_filter_with_backpressure_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps1),
"sim_opts": dict(empty_cycles=0),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=0),
},
"test_filter_no_backpressure_and_empty_cycles_taps1": {

View File

@@ -1 +1,3 @@
from .max586x import MAX586xInterface
from .max586x import MAX586xInterface
from .spi import SPIRegisterInterface
from .sgpio import SGPIOInterface

View File

@@ -9,13 +9,11 @@ from amaranth.lib.wiring import Out, In
from util import IQSample
class MAX586xInterface(wiring.Component):
adc_stream: Out(stream.Signature(IQSample(8), always_ready=True))
dac_stream: In(stream.Signature(IQSample(8), always_ready=True))
adc_capture: In(1)
dac_capture: In(1)
q_invert: In(1)
class MAX586xInterface(wiring.Component):
adc_stream: Out(stream.Signature(IQSample(8), always_ready=True, always_valid=True))
dac_stream: In(stream.Signature(IQSample(8), always_ready=True))
q_invert: In(1)
def __init__(self, bb_domain):
super().__init__()
@@ -47,10 +45,9 @@ class MAX586xInterface(wiring.Component):
m.d.comb += [
adc_stream.p.i .eq(adc_in.i[0] ^ 0x80), # I: non-inverted between MAX2837 and MAX5864.
adc_stream.p.q .eq(adc_in.i[1] ^ rx_q_mask), # Q: inverted between MAX2837 and MAX5864.
adc_stream.valid .eq(self.adc_capture),
]
# Output the transformed data to the DAC using a DDR output buffer.
# Output to the DAC using a DDR output buffer.
m.submodules.dac_out = dac_out = io.DDRBuffer("o", platform.request("dd", dir="-"), o_domain=self._bb_domain)
with m.If(dac_stream.valid):
m.d.comb += [

View File

@@ -0,0 +1,202 @@
#
# This file is part of HackRF.
#
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Module, Signal, DomainRenamer, EnableInserter, ClockSignal, Instance
from amaranth.lib import io, fifo, stream, wiring, cdc
from amaranth.lib.wiring import Out, In
from util import LinearFeedbackShiftRegister
class SGPIOInterface(wiring.Component):
def __init__(self, sample_width=8, rx_assignments=None, tx_assignments=None, domain="sync"):
self.sample_width = sample_width
if rx_assignments is None:
rx_assignments = _default_rx_assignments(sample_width // 8)
if tx_assignments is None:
tx_assignments = _default_tx_assignments(sample_width // 8)
self.rx_assignments = rx_assignments
self.tx_assignments = tx_assignments
self._domain = domain
super().__init__({
"adc_stream": In(stream.Signature(sample_width, always_ready=True)),
"dac_stream": Out(stream.Signature(sample_width)),
"trigger_en": In(1),
"prbs": In(1),
})
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
rx_cycles = len(self.rx_assignments)
tx_cycles = len(self.tx_assignments)
direction_i = platform.request("direction").i
enable_i = ~platform.request("disable").i
capture_en = platform.request("capture_en").o
m.d.comb += capture_en.eq(1)
# Determine data transfer direction.
direction = Signal()
m.submodules.direction_cdc = cdc.FFSynchronizer(direction_i, direction, o_domain=self._domain)
transfer_from_adc = (direction == 0)
# SGPIO clock and data lines.
tx_clk_en = Signal()
rx_clk_en = Signal()
data_to_host = Signal(self.sample_width)
byte_to_host = Signal(8)
data_from_host = Signal(self.sample_width)
byte_from_host = Signal(8)
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
m.d.sync += clk_out.o[0].eq(tx_clk_en)
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
m.d.comb += host_io.o[0].eq(byte_to_host)
m.d.comb += host_io.o[1].eq(byte_to_host)
m.d.comb += byte_from_host.eq(host_io.i[1])
# Transmission is handled differently to account for the latency before the data
# becomes available in the FPGA fabric.
ddr_in_latency = 2 # for iCE40 DDR inputs in Amaranth.
tx_write_latency = tx_cycles + ddr_in_latency
tx_write_pipe = Signal(tx_write_latency)
m.d.sync += tx_write_pipe.eq(tx_write_pipe << 1)
for i in range(tx_cycles-1): # don't store last byte
with m.If(tx_write_pipe[ddr_in_latency + i]):
m.d.sync += self.tx_assignments[i](data_from_host, byte_from_host)
# Small TX FIFO to avoid missing samples when the consumer deasserts its ready
# signal and transfers are in progress.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=self.sample_width, depth=16)
m.d.comb += [
tx_fifo.w_data .eq(data_from_host),
self.tx_assignments[-1](tx_fifo.w_data, byte_from_host),
tx_fifo.w_en .eq(tx_write_pipe[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
# Pseudo-random binary sequence generator.
prbs_advance = Signal()
prbs_count = Signal(2)
m.submodules.prbs = prbs = EnableInserter(prbs_advance)(
LinearFeedbackShiftRegister(degree=8, taps=[8,6,5,4], init=0b10110001))
# Capture signal generation.
capture = Signal()
m.submodules.trigger_gen = trigger_gen = FlowAndTriggerControl(domain=self._domain)
m.d.comb += [
trigger_gen.enable.eq(enable_i),
trigger_gen.trigger_en.eq(self.trigger_en),
capture.eq(trigger_gen.capture),
]
# Main state machine.
with m.FSM():
with m.State("IDLE"):
with m.If(transfer_from_adc):
with m.If(self.prbs):
m.next = "PRBS"
with m.Elif(adc_stream.valid & capture):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += data_to_host.eq(adc_stream.p)
m.d.sync += byte_to_host.eq(self.rx_assignments[0](adc_stream.p))
if rx_cycles > 1:
m.next = "RX0"
with m.Else():
with m.If(dac_stream.ready & capture):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_write_pipe[0].eq(capture)
if tx_cycles > 1:
m.next = "TX0"
for i in range(rx_cycles-1):
with m.State(f"RX{i}"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += byte_to_host.eq(self.rx_assignments[i+1](data_to_host))
m.next = "IDLE" if i == rx_cycles-2 else f"RX{i+1}"
for i in range(tx_cycles-1):
with m.State(f"TX{i}"):
m.d.comb += tx_clk_en.eq(1)
m.next = "IDLE" if i == tx_cycles-2 else f"TX{i+1}"
with m.State("PRBS"):
m.d.comb += rx_clk_en.eq(prbs_count == 0)
m.d.comb += prbs_advance.eq(prbs_count == 0)
m.d.sync += byte_to_host.eq(prbs.value)
m.d.sync += prbs_count.eq(prbs_count + 1)
with m.If(~self.prbs):
m.next = "IDLE"
# Convert to other clock domain if necessary.
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
def _default_rx_assignments(n):
def rx_assignment(i):
def _f(w):
return w.word_select(i, 8)
return _f
return [ rx_assignment(i) for i in range(n) ]
def _default_tx_assignments(n):
def tx_assignment(i):
def _f(w, v):
return w.word_select(i, 8).eq(v)
return _f
return [ tx_assignment(i) for i in range(n) ]
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
enable: In(1)
capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
m.d.comb += trigger_out.eq(self.enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~self.enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signal for capture gating.
m.d[self._domain] += self.capture.eq(self.enable & (trigger_in_latched | ~trigger_enable))
return m

View File

@@ -1,3 +1,4 @@
amaranth==v0.5.8
amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git@23c66d6
lz4
numpy

View File

@@ -4,15 +4,13 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Mux, Instance, Cat, ClockSignal, DomainRenamer
from amaranth.lib import io, fifo, stream, wiring
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Cat, DomainRenamer
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import FIRFilter
from dsp.fir_mac16 import HalfBandDecimatorMAC16
from dsp.cic import CICDecimator
@@ -21,119 +19,6 @@ from dsp.quarter_shift import QuarterShift
from util import ClockConverter, IQSample
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(12), always_ready=True))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_from_adc = (direction == 0)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
rx_clk_en = Signal()
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(adc_stream.p)
rx_data_buffer = Signal(8)
m.d.comb += host_io.o[0].eq(rx_data_buffer)
m.d.comb += host_io.o[1].eq(rx_data_buffer)
with m.FSM():
with m.State("IDLE"):
m.d.comb += rx_clk_en.eq(enable & transfer_from_adc & adc_stream.valid)
with m.If(rx_clk_en):
m.d.sync += rx_data_buffer.eq(adc_stream.p.i >> 8)
m.d.sync += data_to_host.eq(adc_stream.p)
m.next = "RX_I1"
with m.State("RX_I1"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.i)
m.next = "RX_Q0"
with m.State("RX_Q0"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.q >> 8)
m.next = "RX_Q1"
with m.State("RX_Q1"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.q)
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class Top(Elaboratable):
def elaborate(self, platform):
@@ -142,15 +27,25 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(
sample_width=24,
rx_assignments=[
lambda w: Cat(w[8:12], w[11].replicate(4)),
lambda w: w[0:8],
lambda w: Cat(w[20:24], w[23].replicate(4)),
lambda w: w[12:20],
],
tx_assignments=[
lambda w, v: w[8:12].eq(v),
lambda w, v: w[0:8].eq(v),
lambda w, v: w[20:24].eq(v),
lambda w, v: w[12:20].eq(v),
],
domain="sync"
)
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps_hb1 = [-2, 0, 5, 0, -10, 0,18, 0, -30, 0,53, 0,-101, 0, 323, 512, 323, 0,-101, 0, 53, 0, -30, 0,18, 0, -10, 0, 5, 0,-2]
@@ -173,7 +68,7 @@ class Top(Elaboratable):
"hbfir2": HalfBandDecimatorMAC16(taps_hb2, data_shape=fixed.SQ(11), overclock_rate=8, always_ready=True, domain="gck1"),
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(12), 4, "gck1", "sync", always_ready=True),
"clkconv": ClockConverter(IQSample(12), 8, "gck1", "sync", always_ready=True),
}
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
@@ -196,7 +91,7 @@ class Top(Elaboratable):
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# RX settings.
rx_chain["dc_block"].enable .eq(ctrl[0]),

View File

@@ -4,140 +4,19 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Instance, Cat, ClockSignal, DomainRenamer
from amaranth.lib import io, fifo, stream, wiring
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Cat, DomainRenamer
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import FIRFilter
from dsp.fir_mac16 import HalfBandInterpolatorMAC16
from dsp.cic import CICInterpolator
from util import ClockConverter, IQSample, StreamSkidBuffer
class MCUInterface(wiring.Component):
dac_stream: Out(stream.Signature(IQSample(12)))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
tx_clk_en = Signal()
m.d.sync += clk_out.o[0].eq(tx_clk_en)
tx_dly_write = Signal(4)
tx_in_sample = Signal(4*8)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.sync += tx_in_sample.eq(Cat(host_io.i[1], tx_in_sample))
# Small TX FIFO to avoid overflows from the write delay.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=24, depth=4)
m.d.comb += [
tx_fifo.w_data.word_select(0, 12) .eq(tx_in_sample[20:32]),
tx_fifo.w_data.word_select(1, 12) .eq(tx_in_sample[4:16]),
tx_fifo.w_en .eq(tx_dly_write[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
with m.FSM():
with m.State("IDLE"):
m.d.comb += tx_clk_en.eq(enable & transfer_to_dac & dac_stream.ready)
with m.If(tx_clk_en):
m.next = "TX_I1"
with m.State("TX_I1"):
m.d.comb += tx_clk_en.eq(1)
m.next = "TX_Q0"
with m.State("TX_Q0"):
m.d.comb += tx_clk_en.eq(1)
m.next = "TX_Q1"
with m.State("TX_Q1"):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class Top(Elaboratable):
def elaborate(self, platform):
@@ -146,15 +25,27 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(
sample_width=24,
rx_assignments=[
lambda w: Cat(w[8:12], w[11].replicate(4)),
lambda w: w[0:8],
lambda w: Cat(w[20:24], w[23].replicate(4)),
lambda w: w[12:20],
],
tx_assignments=[
lambda w, v: w[8:12].eq(v),
lambda w, v: w[0:8].eq(v),
lambda w, v: w[20:24].eq(v),
lambda w, v: w[12:20].eq(v),
],
domain="sync"
)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps_hb1 = [-2, 0, 5, 0, -10, 0,18, 0, -30, 0,53, 0,-101, 0, 323, 512, 323, 0,-101, 0, 53, 0, -30, 0,18, 0, -10, 0, 5, 0,-2]
taps_hb1 = [ tap/1024 for tap in taps_hb1 ]
@@ -164,7 +55,7 @@ class Top(Elaboratable):
tx_chain = {
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(12), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(12), 8, "sync", "gck1", always_ready=False),
# Half-band interpolation stages (+ skid buffers for timing closure).
"hbfir1": HalfBandInterpolatorMAC16(taps_hb1, data_shape=fixed.SQ(11),
@@ -176,7 +67,6 @@ class Top(Elaboratable):
# CIC interpolation stage.
"cic_comp": DomainRenamer("gck1")(FIRFilter([-0.125, 0, 0.75, 0, -0.125], shape=fixed.SQ(11), shape_out=fixed.SQ(11), always_ready=False, num_channels=2)),
"cic_interpolator": CICInterpolator(2, 4, (4, 8, 16, 32), 12, 8, num_channels=2,
always_ready=False, domain="gck1"),
}
@@ -201,7 +91,7 @@ class Top(Elaboratable):
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# TX interpolation rate.
tx_chain["cic_interpolator"].factor .eq(tx_intrp + 2),

View File

@@ -5,128 +5,17 @@
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, C, Mux, Instance, Cat, ClockSignal, DomainRenamer, signed
from amaranth.lib import io, stream, wiring, cdc, data, fifo
from amaranth import Elaboratable, Module, DomainRenamer
from amaranth.lib import stream, wiring
from amaranth.lib.wiring import Out, In, connect
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.dc_block import DCBlock
from dsp.round import convergent_round
from util import IQSample, ClockConverter
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(4), always_ready=True))
dac_stream: Out(stream.Signature(IQSample(4)))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_from_adc = (direction == 0)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
m.d.sync += clk_out.o[0].eq(0)
m.d.sync += clk_out.o[1].eq(0)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(Cat(adc_stream.p.i, adc_stream.p.q))
assert len(data_to_host) == 8
m.d.comb += host_io.o[0].eq(data_to_host)
m.d.comb += host_io.o[1].eq(data_to_host)
tx_dly_write = Signal(2)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.comb += dac_stream.payload.eq(host_io.i[1])
m.d.comb += dac_stream.valid.eq(tx_dly_write[-1])
with m.FSM():
with m.State("IDLE"):
with m.If(enable):
with m.If(transfer_from_adc & adc_stream.valid):
m.d.sync += data_to_host.eq(Cat(adc_stream.p.i, adc_stream.p.q))
m.d.sync += clk_out.o[1].eq(1)
with m.Elif(transfer_to_dac & dac_stream.ready):
m.d.sync += clk_out.o[0].eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and gating captures.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class IQHalfPrecisionConverter(wiring.Component):
input: In(stream.Signature(IQSample(8), always_ready=True))
output: Out(stream.Signature(IQSample(4), always_ready=True))
@@ -167,22 +56,18 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(sample_width=8, domain="sync")
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
rx_chain = {
"dc_block": DCBlock(width=8, num_channels=2, domain="gck1"),
"half_prec": DomainRenamer("gck1")(IQHalfPrecisionConverter()),
"clkconv": ClockConverter(IQSample(4), 4, "gck1", "sync"),
"clkconv": ClockConverter(IQSample(4), 16, "gck1", "sync"),
}
m.submodules += rx_chain.values()
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
# Connect receiver chain.
last = adcdac_intf.adc_stream
@@ -193,10 +78,11 @@ class Top(Elaboratable):
tx_chain = {
"clkconv": ClockConverter(IQSample(4), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(4), 16, "sync", "gck1", always_ready=False),
"half_prec": DomainRenamer("gck1")(IQHalfPrecisionConverterInv()),
}
m.submodules += tx_chain.values()
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v
# Connect transmitter chain.
last = mcu_intf.dac_stream
@@ -213,7 +99,7 @@ class Top(Elaboratable):
ctrl = spi_regs.add_register(0x01, init=0)
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# RX settings.
rx_chain["dc_block"].enable .eq(ctrl[0]),
@@ -224,4 +110,4 @@ class Top(Elaboratable):
if __name__ == "__main__":
plat = PralinePlatform()
plat.build(Top_HP())
plat.build(Top())

View File

@@ -4,168 +4,20 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Mux, Instance, Cat, ClockSignal, DomainRenamer, EnableInserter
from amaranth.lib import io, fifo, stream, wiring, cdc
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Signal, Mux, DomainRenamer
from amaranth.lib import cdc
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import HalfBandDecimator, HalfBandInterpolator
from dsp.cic import CICDecimator, CICInterpolator
from dsp.cic import CICInterpolator
from dsp.dc_block import DCBlock
from dsp.quarter_shift import QuarterShift
from dsp.nco import NCO
from util import ClockConverter, IQSample, StreamSkidBuffer, LinearFeedbackShiftRegister
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(8), always_ready=True))
dac_stream: Out(stream.Signature(IQSample(8)))
direction: In(1)
enable: In(1)
prbs: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.submodules.enable_cdc = cdc.FFSynchronizer(self.enable, enable, o_domain=self._domain)
m.submodules.direction_cdc = cdc.FFSynchronizer(self.direction, direction, o_domain=self._domain)
transfer_from_adc = (direction == 0)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
tx_clk_en = Signal()
rx_clk_en = Signal()
m.d.sync += clk_out.o[0].eq(tx_clk_en)
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(adc_stream.p)
m.d.comb += host_io.o[0].eq(data_to_host)
m.d.comb += host_io.o[1].eq(data_to_host)
tx_dly_write = Signal(3)
host_io_prev_data = Signal(8)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.sync += host_io_prev_data.eq(host_io.i[1])
# Small TX FIFO to avoid overflows from the write delay.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=16, depth=8)
m.d.comb += [
tx_fifo.w_data .eq(Cat(host_io_prev_data, host_io.i[1])),
tx_fifo.w_en .eq(tx_dly_write[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
# Pseudo-random binary sequence generator.
prbs_advance = Signal()
prbs_count = Signal(2)
m.submodules.prbs = prbs = EnableInserter(prbs_advance)(
LinearFeedbackShiftRegister(degree=8, taps=[8,6,5,4], init=0b10110001))
with m.FSM():
with m.State("IDLE"):
m.d.comb += tx_clk_en.eq(enable & transfer_to_dac & dac_stream.ready)
m.d.comb += rx_clk_en.eq(enable & transfer_from_adc & adc_stream.valid)
with m.If(self.prbs):
m.next = "PRBS"
with m.Elif(rx_clk_en):
m.d.sync += data_to_host.eq(adc_stream.p)
m.next = "RX_Q"
with m.Elif(tx_clk_en):
m.next = "TX_Q"
with m.State("RX_Q"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += data_to_host.i.eq(data_to_host.q)
m.next = "IDLE"
with m.State("TX_Q"):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
m.next = "IDLE"
with m.State("PRBS"):
m.d.sync += host_io.oe.eq(1)
m.d.sync += data_to_host.eq(prbs.value)
m.d.comb += rx_clk_en.eq(prbs_count == 0)
m.d.comb += prbs_advance.eq(prbs_count == 0)
m.d.sync += prbs_count.eq(prbs_count + 1)
with m.If(~self.prbs):
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
from util import ClockConverter, IQSample, StreamSkidBuffer
class Top(Elaboratable):
@@ -176,15 +28,10 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(sample_width=16, domain="sync")
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps = [-2, 0, 7, 0, -18, 0, 41, 0, -92, 0, 320, 512, 320, 0, -92, 0, 41, 0, -18, 0, 7, 0, -2]
@@ -221,7 +68,7 @@ class Top(Elaboratable):
"hbfir1": HalfBandDecimator(taps, **common_rx_filter_opts),
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(8), 4, "gck1", "sync"),
"clkconv": ClockConverter(IQSample(8), 8, "gck1", "sync"),
}
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
@@ -235,7 +82,7 @@ class Top(Elaboratable):
tx_chain = {
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(8), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(8), 8, "sync", "gck1", always_ready=False),
# Half-band interpolation stages (+ skid buffers for timing closure).
"hbfir1": HalfBandInterpolator(taps, data_shape=fixed.SQ(7),
@@ -248,6 +95,7 @@ class Top(Elaboratable):
# CIC interpolation stage.
"cic_interpolator": CICInterpolator(1, 3, (1, 2, 4, 8), 8, 8, num_channels=2,
always_ready=False, domain="gck1"),
"skid4": DomainRenamer("gck1")(StreamSkidBuffer(IQSample(8), always_ready=False)),
}
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v
@@ -263,7 +111,7 @@ class Top(Elaboratable):
m.d.comb += [
adcdac_intf.dac_stream.p.eq(nco.output),
adcdac_intf.dac_stream.valid.eq(1),
tx_chain["cic_interpolator"].output.ready.eq(1),
last.ready.eq(1),
]
with m.Else():
connect(m, last, adcdac_intf.dac_stream)
@@ -281,7 +129,7 @@ class Top(Elaboratable):
m.d.sync += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# PRBS enable.
mcu_intf.prbs .eq(ctrl[6]),

View File

@@ -35,7 +35,7 @@ class ClockConverter(wiring.Component):
def elaborate(self, platform):
m = Module()
m.submodules.mem = mem = fifo.AsyncFIFO(
m.submodules.mem = mem = fifo.AsyncFIFOBuffered(
width=Shape.cast(self.shape).width,
depth=self.depth,
r_domain=self._output_domain,