gateware: fix SB_MAC16 versions of the FIR decimator, interpolator

Simplified the filter architecture by using an adder tree for obtaining
the final sum of the subfilters (vs systolic array). Previously, no
backpressure (always_ready=True) worked fine but otherwise we lost
samples.
This commit is contained in:
mndza
2025-12-17 11:59:10 +01:00
parent 99f96d2498
commit 91c005d13e
4 changed files with 200 additions and 204 deletions

Binary file not shown.

View File

@@ -7,7 +7,7 @@
from math import ceil, log2
from amaranth import Module, Signal, Mux, DomainRenamer, ClockSignal, signed
from amaranth.lib import wiring, stream, data, memory
from amaranth.lib import wiring, stream, data, memory, fifo
from amaranth.lib.wiring import In, Out
from amaranth.utils import bits_for
@@ -58,7 +58,7 @@ class HalfBandDecimatorMAC16(wiring.Component):
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(~odd | fir.input.ready)
m.d.comb += dly.output.ready.eq(1)
m.d.comb += dly.output.ready.eq(fir.input.ready)
m.d.comb += [
dly.input.p.eq(self.input.p),
@@ -126,30 +126,52 @@ class HalfBandInterpolatorMAC16(wiring.Component):
taps = [ 2 * tap for tap in self.taps ]
arm0_taps = taps[0::2]
arm1_taps = taps[1::2]
delay = arm1_taps.index(1)
# Arms
m.submodules.fir = fir = FIRFilterMAC16(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, overclock_rate=self.overclock_rate, always_ready=always_ready, num_channels=self.num_channels, delayed_port=True)
m.submodules.fir = fir = FIRFilterMAC16(arm0_taps, shape=self.data_shape, shape_out=self.shape_out, overclock_rate=self.overclock_rate, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly = dly = Delay(delay, shape=self.data_shape, always_ready=always_ready, num_channels=self.num_channels)
m.submodules.dly_fifo = dly_fifo = fifo.SyncFIFOBuffered(width=self.num_channels*self.data_shape.as_shape().width, depth=self.overclock_rate+1)
m.d.comb += [
dly_fifo.w_data.eq(dly.output.p),
dly_fifo.w_en.eq(dly.output.valid),
]
if not dly.output.signature.always_ready:
m.d.comb += dly.output.ready.eq(dly_fifo.w_rdy)
busy = Signal()
with m.If(fir.input.valid & fir.input.ready):
m.d.sync += busy.eq(1)
winchin_valid = Signal()
winchin_ready_0 = Signal()
winchin_ready = Signal()
m.d.comb += [
winchin_valid.eq(self.input.valid),
winchin_ready.eq(self.input.ready),
winchin_ready_0.eq(fir.input.ready),
]
#busy = Signal()
#with m.If(fir.input.valid & fir.input.ready):
# m.d.sync += busy.eq(1)
# Input
m.d.comb += fir.input.payload.eq(self.input.payload)
m.d.comb += fir.input.valid.eq(self.input.valid & ~busy)
m.d.comb += fir.input.valid.eq(self.input.valid & dly.input.ready)
m.d.comb += dly.input.payload.eq(self.input.payload)
m.d.comb += dly.input.valid.eq(self.input.valid & fir.input.ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(fir.input.ready & ~busy)
m.d.comb += self.input.ready.eq(fir.input.ready & dly.input.ready)
# Output
# Arm index selection: switch after every delivered sample
arm_index = Signal()
delayed = Signal.like(fir.input_delayed)
with m.If(fir.output.valid & fir.output.ready):
m.d.sync += delayed.eq(fir.input_delayed)
#delayed = Signal.like(fir.input_delayed)
#with m.If(fir.output.valid & fir.output.ready):
# m.d.sync += delayed.eq(fir.input_delayed)
r_data_cast = data.ArrayLayout(self.data_shape, self.num_channels)(dly_fifo.r_data)
with m.If(~self.output.valid | self.output.ready):
with m.Switch(arm_index):
@@ -163,10 +185,11 @@ class HalfBandInterpolatorMAC16(wiring.Component):
m.d.sync += arm_index.eq(1)
with m.Case(1):
for c in range(self.num_channels):
m.d.sync += self.output.payload[c].eq(delayed[c])
m.d.sync += self.output.valid.eq(1)
m.d.sync += arm_index.eq(0)
m.d.sync += busy.eq(0)
m.d.sync += self.output.payload[c].eq(r_data_cast[c])
m.d.sync += self.output.valid.eq(dly_fifo.r_rdy)
m.d.comb += dly_fifo.r_en.eq(1)
with m.If(dly_fifo.r_rdy):
m.d.sync += arm_index.eq(0)
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
@@ -208,11 +231,12 @@ class FIRFilterMAC16(wiring.Component):
})
super().__init__(signature)
def taps_shape(self):
taps_as_ratios = [tap.as_integer_ratio() for tap in self.taps]
def taps_shape(self, taps=None):
taps = taps or self.taps
taps_as_ratios = [tap.as_integer_ratio() for tap in taps]
f_width = bits_for(max(tap[1] for tap in taps_as_ratios)) - 1
i_width = max(0, bits_for(max(abs(tap[0]) for tap in taps_as_ratios)) - f_width)
return fixed.Shape(i_width, f_width, signed=any(tap < 0 for tap in self.taps))
return fixed.Shape(i_width, f_width, signed=any(tap < 0 for tap in taps))
def compute_output_shape(self):
taps_shape = self.taps_shape()
@@ -229,101 +253,105 @@ class FIRFilterMAC16(wiring.Component):
def elaborate(self, platform):
m = Module()
# Build filter out of FIRFilterSerialMAC16 blocks.
# Build filter out of SerialMAC16 blocks.
overclock_factor = self.overclock_rate
# Symmetric coefficients special case.
symmetric = (self.taps == self.taps[::-1])
# Even-symmetric case. (N=2*K)
# Odd-symmetric case. (N=2*K+1)
if symmetric:
taps = self.taps[:ceil(len(self.taps)/2)]
odd_symmetric = ((len(self.taps) % 2) == 1)
else:
taps = self.taps
dsp_block_count = ceil(len(taps) / overclock_factor)
def pipe(signal, length):
name = signal.name if hasattr(signal, "name") else "signal"
pipe = [ signal ] + [ Signal.like(signal, name=f"{name}_q{i}") for i in range(length) ]
for i in range(length):
m.d.sync += pipe[i+1].eq(pipe[i])
return pipe
taps = self.taps
if self.carry is not None:
sum_carry_q = Signal.like(self.sum_carry)
with m.If(self.input.valid & self.input.ready):
m.d.sync += sum_carry_q.eq(self.sum_carry)
filters_ready = Signal()
window_valid = Signal()
input_ready = Signal()
m.d.comb += input_ready.eq(~window_valid | filters_ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(input_ready)
# Samples window.
window = [ Signal.like(self.input.p, name=f"window_{i}") for i in range(len(self.taps)) ]
with m.If(input_ready):
m.d.sync += window_valid.eq(self.input.valid)
with m.If(self.input.valid):
m.d.sync += window[0].eq(self.input.p)
for i in range(1, len(window)):
m.d.sync += window[i].eq(window[i-1])
if self.carry is not None:
m.d.sync += sum_carry_q.eq(self.sum_carry)
# When filter is symmetric, presum samples to obtain a smaller window.
symmetric = (self.taps == self.taps[::-1])
if symmetric:
sum_shape = (self.input.p[0] + self.input.p[0]).shape()
odd_symmetric = ((len(self.taps) % 2) == 1)
new_len = len(self.taps) // 2 + odd_symmetric
new_window = [ Signal(data.ArrayLayout(sum_shape, self.num_channels), name=f"window_sym_{i}") for i in range(new_len) ]
for i in range(len(new_window) - odd_symmetric):
for c in range(self.num_channels):
m.d.comb += new_window[i][c].eq(window[i][c] + window[-i-1][c])
if odd_symmetric:
for c in range(self.num_channels):
m.d.comb += new_window[-1][c].eq(window[len(self.taps)//2][c])
window = new_window
taps = self.taps[:ceil(len(self.taps)/2)]
samples_shape = sum_shape
else:
samples_shape = self.shape
# Build filter out of SerialMAC16 blocks: each one multiplies and
# accumulates `overclock_factor` taps serially.
dsp_block_count = ceil(len(taps) / overclock_factor)
# If we have multiple subfilters, make them all the same size.
if dsp_block_count > 1 and len(taps) % overclock_factor != 0:
taps = taps + [0]*(overclock_factor - (len(taps)%overclock_factor))
for c in range(self.num_channels):
last = self.input
dsp_blocks = []
for i in range(dsp_block_count):
taps_slice = taps[i*overclock_factor:(i+1)*overclock_factor]
input_delayed = len(taps_slice)
carry = last.output.p.shape() if i > 0 else self.carry
window_slice = window[i*overclock_factor:(i+1)*overclock_factor]
carry = None if i > 0 else self.carry
if (i == dsp_block_count-1) and symmetric and odd_symmetric:
taps_slice[-1] /= 2
input_delayed -= 1
dsp = FIRFilterSerialMAC16(taps=taps_slice, shape=self.shape, taps_shape=self.taps_shape(), carry=carry, symmetry=symmetric,
input_delayed_cycles=input_delayed, always_ready=self.always_ready)
dsp = SerialMAC16(taps=taps_slice, shape=samples_shape, taps_shape=self.taps_shape(taps), carry=carry, always_ready=self.always_ready)
dsp_blocks.append(dsp)
for j in range(len(window_slice)):
m.d.comb += dsp.input.p[j].eq(window_slice[j][c])
m.d.comb += dsp.input.valid.eq(window_valid)
if i == 0:
m.d.comb += [
dsp.input.p .eq(self.input.p[c]),
dsp.input.valid .eq(self.input.valid & self.input.ready),
]
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(dsp.input.ready)
m.d.comb += filters_ready.eq(dsp.input.ready)
if self.carry is not None:
m.d.comb += dsp.sum_carry.eq(sum_carry_q[c])
else:
m.d.comb += [
dsp.input.p .eq(pipe(last.input_delayed, last.delay())[-1]),
dsp.input.valid .eq(last.output.valid),
dsp.sum_carry .eq(last.output.p),
]
if not last.output.signature.always_ready:
m.d.comb += last.output.ready.eq(dsp.input.ready)
last = dsp
if self.delayed_port:
m.d.comb += self.input_delayed[c].eq(last.input_delayed)
if symmetric:
for i in reversed(range(dsp_block_count)):
end_block = (i == dsp_block_count-1)
m.d.comb += [
dsp_blocks[i].rev_input .eq(dsp_blocks[i+1].rev_delayed if not end_block else dsp_blocks[i].input_delayed),
]
m.submodules += dsp_blocks
m.d.comb += [
self.output.payload[c] .eq(last.output.p),
self.output.valid .eq(last.output.valid),
]
if not last.output.signature.always_ready:
m.d.comb += last.output.ready.eq(self.output.ready)
# Adder tree for channel c
if dsp_block_count > 1:
with m.If(~self.output.valid | self.output.ready):
for i in range(dsp_block_count):
if not dsp_blocks[i].output.signature.always_ready:
m.d.comb += dsp_blocks[i].output.ready.eq(1)
m.d.sync += self.output.valid.eq(dsp_blocks[0].output.valid)
with m.If(dsp_blocks[0].output.valid):
m.d.sync += self.output.payload[c] .eq(sum(dsp_blocks[i].output.p for i in range(dsp_block_count)))
else:
m.d.comb += self.output.payload[c].eq(dsp_blocks[0].output.p)
m.d.comb += self.output.valid.eq(dsp_blocks[0].output.valid)
if not dsp_blocks[0].output.signature.always_ready:
m.d.comb += dsp_blocks[0].output.ready.eq(self.output.ready)
return m
class FIRFilterSerialMAC16(wiring.Component):
class SerialMAC16(wiring.Component):
def __init__(self, taps, shape, shape_out=None, taps_shape=None, carry=None, symmetry=False, input_delayed_cycles=None, always_ready=False):
assert shape.as_shape().width <= 16, "DSP slice inputs have a maximum width of 16 bit."
def __init__(self, taps, shape, shape_out=None, taps_shape=None, carry=None, always_ready=False):
assert shape.as_shape().width <= 16, f"DSP slice inputs have a maximum width of 16 bit. {shape} {shape.as_shape().width}"
self.carry = carry
self.taps = list(taps)
@@ -333,15 +361,8 @@ class FIRFilterSerialMAC16(wiring.Component):
shape_out = self.compute_output_shape()
self.shape_out = shape_out
self.always_ready = always_ready
self.symmetry = symmetry
if input_delayed_cycles is None:
self.input_delayed_cycles = len(self.taps)
else:
self.input_delayed_cycles = input_delayed_cycles
signature = {
"input": In(stream.Signature(shape, always_ready=always_ready)),
"input_delayed": Out(shape),
"input": In(stream.Signature(data.ArrayLayout(shape, len(taps)), always_ready=always_ready)),
"output": Out(stream.Signature(shape_out, always_ready=always_ready)),
}
if carry is not None:
@@ -350,11 +371,6 @@ class FIRFilterSerialMAC16(wiring.Component):
})
else:
self.sum_carry = 0
if symmetry:
signature.update({
"rev_input": In(shape),
"rev_delayed": Out(shape),
})
super().__init__(signature)
def taps_shape(self):
@@ -375,72 +391,36 @@ class FIRFilterSerialMAC16(wiring.Component):
shape_out = fixed.Shape(i_width, f_width, signed=signed)
return shape_out
def delay(self):
return 1 + 1 + 3 + len(self.taps) - 1
def elaborate(self, platform):
m = Module()
depth = len(self.taps)
counter_in = Signal(range(depth))
counter_mult = Signal(range(depth))
counter_out = Signal(range(depth))
dsp_ready = ~self.output.valid | self.output.ready
window_valid = Signal()
window_ready = dsp_ready
dsp_ready = Signal()
multin_valid = Signal()
input_ready = Signal()
# Ready to process a sample either when the DSP slice is ready and the samples window is:
# - Not valid yet.
# - Only valid for 1 more cycle.
m.d.comb += input_ready.eq(~window_valid | ((counter_in == depth-1) & window_ready))
m.d.comb += input_ready.eq((counter_in == depth-1) & dsp_ready)
if not self.input.signature.always_ready:
m.d.comb += self.input.ready.eq(input_ready)
window = [ Signal.like(self.input.p, name=f"window_{i}") for i in range(max(depth, self.input_delayed_cycles)) ]
# Sample window.
with m.If(input_ready):
m.d.sync += window_valid.eq(self.input.valid)
with m.If(self.input.valid):
m.d.sync += window[0].eq(self.input.p)
for i in range(1, len(window)):
m.d.sync += window[i].eq(window[i-1])
m.d.sync += multin_valid.eq(window_valid)
dsp_a = Signal.like(self.input.p)
with m.Switch(counter_in):
for i in range(depth):
with m.Case(i):
m.d.sync += dsp_a.eq(window[i])
m.d.comb += self.input_delayed.eq(window[self.input_delayed_cycles-1])
# Sample counter.
with m.If(window_ready & window_valid):
with m.If((self.input.valid | (counter_in != 0)) & dsp_ready):
m.d.sync += counter_in.eq(_incr(counter_in, depth))
# Symmetry handling.
if self.symmetry:
with m.If(dsp_ready):
m.d.sync += multin_valid.eq(self.input.valid | (counter_in != 0))
window_rev = [ Signal.like(self.input.p, name=f"window_rev_{i}") for i in range(depth) ]
with m.If(input_ready & self.input.valid):
m.d.sync += window_rev[0].eq(self.rev_input)
m.d.sync += [ window_rev[i].eq(window_rev[i-1]) for i in range(1, len(window_rev)) ]
m.d.comb += self.rev_delayed.eq(window_rev[-1])
dsp_a_rev = Signal.like(self.input.p)
# Select sample from window.
dsp_a = Signal(self.shape)
with m.If(dsp_ready):
with m.Switch(counter_in):
for i in range(depth):
with m.Case(i):
m.d.sync += dsp_a_rev.eq(window_rev[depth-1-i])
m.d.sync += dsp_a.eq(self.input.p[i])
# Coefficient ROM.
taps_shape = self.taps_shape
@@ -453,33 +433,38 @@ class FIRFilterSerialMAC16(wiring.Component):
m.submodules.coeff_rom = coeff_rom = memory.Memory(data=coeff_data)
coeff_rd = coeff_rom.read_port(domain="sync")
m.d.comb += coeff_rd.addr.eq(counter_in)
m.d.comb += coeff_rd.en.eq(dsp_ready)
shape_out = self.compute_output_shape()
if self.carry:
sum_carry_q = Signal.like(self.sum_carry)
with m.If(self.input.ready & self.input.valid):
with m.If(input_ready):
m.d.sync += sum_carry_q.eq(self.sum_carry)
m.submodules.dsp = dsp = iCE40Multiplier()
if self.symmetry:
m.d.comb += dsp.a.eq(dsp_a + dsp_a_rev)
else:
m.d.comb += dsp.a.eq(dsp_a)
m.submodules.dsp = dsp = iCE40Multiplier(
o_width=shape_out.as_shape().width,
always_ready=self.always_ready)
valid_cnt = Signal(depth, init=1)
mult_cnt = Signal(depth, init=1)
m.d.comb += [
dsp.a .eq(dsp_a),
dsp.b .eq(coeff_rd.data),
shape_out(dsp.p) .eq(sum_carry_q if self.carry is not None else 0),
dsp.valid_in .eq(multin_valid & window_ready),
dsp.p_load .eq(counter_mult == 0),
dsp.valid_in .eq(multin_valid),
dsp_ready .eq(dsp.ready_in),
dsp.p_load .eq(mult_cnt[0]),
self.output.p .eq(shape_out(dsp.o)),
self.output.valid .eq(dsp.valid_out & (counter_out == depth-1)),
self.output.valid .eq(dsp.valid_out & valid_cnt[-1]),
dsp.ready_out .eq(self.output.ready | ~valid_cnt[-1]),
]
# Multiplier input and output counters.
with m.If(dsp.valid_in):
m.d.sync += counter_mult.eq(_incr(counter_mult, depth))
with m.If(dsp.valid_out):
m.d.sync += counter_out.eq(_incr(counter_out, depth))
with m.If(dsp.valid_in & dsp.ready_in):
m.d.sync += mult_cnt.eq(mult_cnt.rotate_left(1))
with m.If(dsp.valid_out & dsp.ready_out):
m.d.sync += valid_cnt.eq(valid_cnt.rotate_left(1))
return m
@@ -487,15 +472,20 @@ class FIRFilterSerialMAC16(wiring.Component):
class iCE40Multiplier(wiring.Component):
a: In(signed(16))
b: In(signed(16))
valid_in: In(1)
p: In(signed(32))
p_load: In(1)
o: Out(signed(32))
valid_out: Out(1)
def __init__(self, a_width=16, b_width=16, p_width=32, o_width=32, always_ready=False):
super().__init__({
"a": In(signed(a_width)),
"b": In(signed(b_width)),
"valid_in": In(1),
"ready_in": In(1),
"p": In(signed(p_width)),
"p_load": In(1),
"o": Out(signed(o_width)),
"valid_out": Out(1),
"ready_out": In(1),
})
self.always_ready = always_ready
self.o_width = o_width
def elaborate(self, platform):
m = Module()
@@ -507,13 +497,20 @@ class iCE40Multiplier(wiring.Component):
return pipe
p_load_v = Signal()
valid_v = Signal()
m.d.comb += valid_v.eq(self.valid_in & self.ready_in)
dsp_delay = 3
valid_pipe = pipe(self.valid_in, dsp_delay)
m.d.comb += p_load_v.eq(self.p_load & self.valid_in)
valid_pipe = pipe(valid_v, dsp_delay)
m.d.comb += p_load_v.eq(self.p_load & valid_v)
p_pipe = pipe(self.p, dsp_delay-1)
p_load_pipe = pipe(p_load_v, dsp_delay - 1)
m.d.comb += self.valid_out.eq(valid_pipe[dsp_delay])
# skid buffer
if not self.always_ready:
m.submodules.out_fifo = out_fifo = fifo.SyncFIFOBuffered(width=self.o_width, depth=dsp_delay+2)
m.d.comb += self.ready_in.eq(~self.valid_out | self.ready_out)
m.submodules.sb_mac16 = mac = SB_MAC16(
C_REG=0,
@@ -541,10 +538,10 @@ class iCE40Multiplier(wiring.Component):
# Inputs.
mac.CLK .eq(ClockSignal("sync")),
mac.CE .eq(1),
mac.C .eq(Mux(p_load_pipe[2], p_pipe[2][16:], self.o[16:])),
mac.A .eq(self.a),
mac.B .eq(self.b),
mac.D .eq(Mux(p_load_pipe[2], p_pipe[2][:16], self.o[:16])),
mac.C.as_signed().eq(Mux(p_load_pipe[2], p_pipe[2][16:], mac.O[16:])),
mac.A.as_signed().eq(self.a),
mac.B.as_signed().eq(self.b),
mac.D.as_signed().eq(Mux(p_load_pipe[2], p_pipe[2][:16], mac.O[:16])),
mac.AHOLD .eq(~valid_pipe[0]), # 0: load
mac.BHOLD .eq(~valid_pipe[0]),
mac.CHOLD .eq(0),
@@ -555,11 +552,23 @@ class iCE40Multiplier(wiring.Component):
mac.ADDSUBBOT .eq(0),
mac.OLOADTOP .eq(0),
mac.OLOADBOT .eq(0),
# Outputs.
self.o .eq(mac.O),
]
if not self.always_ready:
m.d.comb += [
out_fifo.w_data.eq(mac.O),
out_fifo.w_en.eq(valid_pipe[dsp_delay]),
self.o.eq(out_fifo.r_data),
self.valid_out.eq(out_fifo.r_rdy),
out_fifo.r_en.eq(self.ready_out),
]
else:
m.d.comb += [
self.o.eq(mac.O),
self.valid_out.eq(valid_pipe[dsp_delay]),
]
return m
@@ -593,7 +602,7 @@ class _TestFilter(unittest.TestCase):
return samples / (1 << f_width)
return samples
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0):
def _filter(self, dut, samples, count, num_channels=1, outfile=None, empty_cycles=0, empty_ready_cycles=0):
async def input_process(ctx):
if hasattr(dut, "enable"):
@@ -627,6 +636,10 @@ class _TestFilter(unittest.TestCase):
filtered.append(payload[0].as_float())
else:
filtered.append(payload.as_float())
if empty_ready_cycles > 0:
ctx.set(dut.output.ready, 0)
await ctx.tick().repeat(empty_ready_cycles)
ctx.set(dut.output.ready, 1)
if not dut.output.signature.always_ready:
ctx.set(dut.output.ready, 0)
@@ -645,23 +658,6 @@ class _TestFilter(unittest.TestCase):
class TestFIRFilterMAC16(_TestFilter):
def test_filter_serial(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
num_samples = 1024
input_width = 8
input_samples = self._generate_samples(num_samples, input_width)
# Compute the expected result
filtered_np = np.convolve(input_samples, taps).tolist()
# Simulate DUT
dut = FIRFilterSerialMAC16(taps, fixed.SQ(15, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples))
self.assertListEqual(filtered_np[:len(filtered)], filtered)
def test_filter(self):
taps = [-1, 0, 9, 16, 9, 0, -1]
taps = [ tap / 32 for tap in taps ]
@@ -674,8 +670,8 @@ class TestFIRFilterMAC16(_TestFilter):
filtered_np = np.convolve(input_samples, taps).tolist()
# Simulate DUT
dut = FIRFilterMAC16(taps, fixed.SQ(15, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples))
dut = FIRFilterMAC16(taps, shape=fixed.SQ(8, 0), always_ready=False)
filtered = self._filter(dut, input_samples, len(input_samples), empty_ready_cycles=5)
self.assertListEqual(filtered_np[:len(filtered)], filtered)
@@ -717,7 +713,7 @@ class TestHalfBandDecimatorMAC16(_TestFilter):
"test_filter_no_backpressure_and_empty_cycles_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=True, taps=taps0),
"sim_opts": dict(empty_cycles=3),
"sim_opts": dict(empty_cycles=6),
},
"test_filter_no_backpressure": {
@@ -768,20 +764,20 @@ class TestHalfBandInterpolatorMAC16(_TestFilter):
"test_filter_with_backpressure": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, always_ready=False, num_channels=2, taps=taps0),
"sim_opts": dict(empty_cycles=0),
"dut_options": dict(**common_dut_options, always_ready=False, num_channels=2, taps=taps1),
"sim_opts": dict(empty_cycles=0, empty_ready_cycles=0),
},
"test_filter_with_backpressure_and_empty_cycles": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps0),
"sim_opts": dict(empty_cycles=3),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=3),
},
"test_filter_with_backpressure_taps1": {
"num_samples": 1024,
"dut_options": dict(**common_dut_options, num_channels=2, always_ready=False, taps=taps1),
"sim_opts": dict(empty_cycles=0),
"sim_opts": dict(empty_ready_cycles=7, empty_cycles=0),
},
"test_filter_no_backpressure_and_empty_cycles_taps1": {

View File

@@ -1,3 +1,4 @@
amaranth==v0.5.8
amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git@23c66d6
lz4
numpy

View File

@@ -69,7 +69,6 @@ class Top(Elaboratable):
"cic_comp": DomainRenamer("gck1")(FIRFilter([-0.125, 0, 0.75, 0, -0.125], shape=fixed.SQ(11), shape_out=fixed.SQ(11), always_ready=False, num_channels=2)),
"cic_interpolator": CICInterpolator(2, 4, (4, 8, 16, 32), 12, 8, num_channels=2,
always_ready=False, domain="gck1"),
"skid3": DomainRenamer("gck1")(StreamSkidBuffer(IQSample(8), always_ready=False)),
}
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v