gateware: generic SGPIOInterface and simpler capture management

This commit is contained in:
mndza
2025-12-24 17:05:22 +01:00
parent 0e05bda647
commit 70c211d0ff
10 changed files with 283 additions and 560 deletions

View File

@@ -37,14 +37,16 @@ class PralinePlatform(LatticeICE40Platform):
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("host_data", 0, Pins("21 19 6 13 10 3 4 18", dir="io"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("q_invert", 0, Pins("9", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("direction", 0, Pins("12", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("disable", 0, Pins("23", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("capture_en", 0, Pins("11", dir="o"),
Attrs(IO_STANDARD="SB_LVCMOS")),
# Other I/O.
Resource("q_invert", 0, Pins("9", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("trigger_in", 0, Pins("48", dir="i"),
Attrs(IO_STANDARD="SB_LVCMOS")),
Resource("trigger_out", 0, Pins("2", dir="o"),

Binary file not shown.

View File

@@ -1 +1,3 @@
from .max586x import MAX586xInterface
from .max586x import MAX586xInterface
from .spi import SPIRegisterInterface
from .sgpio import SGPIOInterface

View File

@@ -9,13 +9,11 @@ from amaranth.lib.wiring import Out, In
from util import IQSample
class MAX586xInterface(wiring.Component):
adc_stream: Out(stream.Signature(IQSample(8), always_ready=True))
dac_stream: In(stream.Signature(IQSample(8), always_ready=True))
adc_capture: In(1)
dac_capture: In(1)
q_invert: In(1)
class MAX586xInterface(wiring.Component):
adc_stream: Out(stream.Signature(IQSample(8), always_ready=True, always_valid=True))
dac_stream: In(stream.Signature(IQSample(8), always_ready=True))
q_invert: In(1)
def __init__(self, bb_domain):
super().__init__()
@@ -47,10 +45,9 @@ class MAX586xInterface(wiring.Component):
m.d.comb += [
adc_stream.p.i .eq(adc_in.i[0] ^ 0x80), # I: non-inverted between MAX2837 and MAX5864.
adc_stream.p.q .eq(adc_in.i[1] ^ rx_q_mask), # Q: inverted between MAX2837 and MAX5864.
adc_stream.valid .eq(self.adc_capture),
]
# Output the transformed data to the DAC using a DDR output buffer.
# Output to the DAC using a DDR output buffer.
m.submodules.dac_out = dac_out = io.DDRBuffer("o", platform.request("dd", dir="-"), o_domain=self._bb_domain)
with m.If(dac_stream.valid):
m.d.comb += [

View File

@@ -0,0 +1,202 @@
#
# This file is part of HackRF.
#
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Module, Signal, DomainRenamer, EnableInserter, ClockSignal, Instance
from amaranth.lib import io, fifo, stream, wiring, cdc
from amaranth.lib.wiring import Out, In
from util import LinearFeedbackShiftRegister
class SGPIOInterface(wiring.Component):
def __init__(self, sample_width=8, rx_assignments=None, tx_assignments=None, domain="sync"):
self.sample_width = sample_width
if rx_assignments is None:
rx_assignments = _default_rx_assignments(sample_width // 8)
if tx_assignments is None:
tx_assignments = _default_tx_assignments(sample_width // 8)
self.rx_assignments = rx_assignments
self.tx_assignments = tx_assignments
self._domain = domain
super().__init__({
"adc_stream": In(stream.Signature(sample_width, always_ready=True)),
"dac_stream": Out(stream.Signature(sample_width)),
"trigger_en": In(1),
"prbs": In(1),
})
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
rx_cycles = len(self.rx_assignments)
tx_cycles = len(self.tx_assignments)
direction_i = platform.request("direction").i
enable_i = ~platform.request("disable").i
capture_en = platform.request("capture_en").o
m.d.comb += capture_en.eq(1)
# Determine data transfer direction.
direction = Signal()
m.submodules.direction_cdc = cdc.FFSynchronizer(direction_i, direction, o_domain=self._domain)
transfer_from_adc = (direction == 0)
# SGPIO clock and data lines.
tx_clk_en = Signal()
rx_clk_en = Signal()
data_to_host = Signal(self.sample_width)
byte_to_host = Signal(8)
data_from_host = Signal(self.sample_width)
byte_from_host = Signal(8)
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
m.d.sync += clk_out.o[0].eq(tx_clk_en)
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
m.d.comb += host_io.o[0].eq(byte_to_host)
m.d.comb += host_io.o[1].eq(byte_to_host)
m.d.comb += byte_from_host.eq(host_io.i[1])
# Transmission is handled differently to account for the latency before the data
# becomes available in the FPGA fabric.
ddr_in_latency = 2 # for iCE40 DDR inputs in Amaranth.
tx_write_latency = tx_cycles + ddr_in_latency
tx_write_pipe = Signal(tx_write_latency)
m.d.sync += tx_write_pipe.eq(tx_write_pipe << 1)
for i in range(tx_cycles-1): # don't store last byte
with m.If(tx_write_pipe[ddr_in_latency + i]):
m.d.sync += self.tx_assignments[i](data_from_host, byte_from_host)
# Small TX FIFO to avoid missing samples when the consumer deasserts its ready
# signal and transfers are in progress.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=self.sample_width, depth=16)
m.d.comb += [
tx_fifo.w_data .eq(data_from_host),
self.tx_assignments[-1](tx_fifo.w_data, byte_from_host),
tx_fifo.w_en .eq(tx_write_pipe[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
# Pseudo-random binary sequence generator.
prbs_advance = Signal()
prbs_count = Signal(2)
m.submodules.prbs = prbs = EnableInserter(prbs_advance)(
LinearFeedbackShiftRegister(degree=8, taps=[8,6,5,4], init=0b10110001))
# Capture signal generation.
capture = Signal()
m.submodules.trigger_gen = trigger_gen = FlowAndTriggerControl(domain=self._domain)
m.d.comb += [
trigger_gen.enable.eq(enable_i),
trigger_gen.trigger_en.eq(self.trigger_en),
capture.eq(trigger_gen.capture),
]
# Main state machine.
with m.FSM():
with m.State("IDLE"):
with m.If(transfer_from_adc):
with m.If(self.prbs):
m.next = "PRBS"
with m.Elif(adc_stream.valid & capture):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += data_to_host.eq(adc_stream.p)
m.d.sync += byte_to_host.eq(self.rx_assignments[0](adc_stream.p))
if rx_cycles > 1:
m.next = "RX0"
with m.Else():
with m.If(dac_stream.ready & capture):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_write_pipe[0].eq(capture)
if tx_cycles > 1:
m.next = "TX0"
for i in range(rx_cycles-1):
with m.State(f"RX{i}"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += byte_to_host.eq(self.rx_assignments[i+1](data_to_host))
m.next = "IDLE" if i == rx_cycles-2 else f"RX{i+1}"
for i in range(tx_cycles-1):
with m.State(f"TX{i}"):
m.d.comb += tx_clk_en.eq(1)
m.next = "IDLE" if i == tx_cycles-2 else f"TX{i+1}"
with m.State("PRBS"):
m.d.comb += rx_clk_en.eq(prbs_count == 0)
m.d.comb += prbs_advance.eq(prbs_count == 0)
m.d.sync += byte_to_host.eq(prbs.value)
m.d.sync += prbs_count.eq(prbs_count + 1)
with m.If(~self.prbs):
m.next = "IDLE"
# Convert to other clock domain if necessary.
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
def _default_rx_assignments(n):
def rx_assignment(i):
def _f(w):
return w.word_select(i, 8)
return _f
return [ rx_assignment(i) for i in range(n) ]
def _default_tx_assignments(n):
def tx_assignment(i):
def _f(w, v):
return w.word_select(i, 8).eq(v)
return _f
return [ tx_assignment(i) for i in range(n) ]
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
enable: In(1)
capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
m.d.comb += trigger_out.eq(self.enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~self.enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signal for capture gating.
m.d[self._domain] += self.capture.eq(self.enable & (trigger_in_latched | ~trigger_enable))
return m

View File

@@ -4,15 +4,13 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Mux, Instance, Cat, ClockSignal, DomainRenamer
from amaranth.lib import io, fifo, stream, wiring
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Cat, DomainRenamer
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import FIRFilter
from dsp.fir_mac16 import HalfBandDecimatorMAC16
from dsp.cic import CICDecimator
@@ -21,119 +19,6 @@ from dsp.quarter_shift import QuarterShift
from util import ClockConverter, IQSample
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(12), always_ready=True))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_from_adc = (direction == 0)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
rx_clk_en = Signal()
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(adc_stream.p)
rx_data_buffer = Signal(8)
m.d.comb += host_io.o[0].eq(rx_data_buffer)
m.d.comb += host_io.o[1].eq(rx_data_buffer)
with m.FSM():
with m.State("IDLE"):
m.d.comb += rx_clk_en.eq(enable & transfer_from_adc & adc_stream.valid)
with m.If(rx_clk_en):
m.d.sync += rx_data_buffer.eq(adc_stream.p.i >> 8)
m.d.sync += data_to_host.eq(adc_stream.p)
m.next = "RX_I1"
with m.State("RX_I1"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.i)
m.next = "RX_Q0"
with m.State("RX_Q0"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.q >> 8)
m.next = "RX_Q1"
with m.State("RX_Q1"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += rx_data_buffer.eq(data_to_host.q)
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class Top(Elaboratable):
def elaborate(self, platform):
@@ -142,15 +27,25 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(
sample_width=24,
rx_assignments=[
lambda w: Cat(w[8:12], w[11].replicate(4)),
lambda w: w[0:8],
lambda w: Cat(w[20:24], w[23].replicate(4)),
lambda w: w[12:20],
],
tx_assignments=[
lambda w, v: w[8:12].eq(v),
lambda w, v: w[0:8].eq(v),
lambda w, v: w[20:24].eq(v),
lambda w, v: w[12:20].eq(v),
],
domain="sync"
)
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps_hb1 = [-2, 0, 5, 0, -10, 0,18, 0, -30, 0,53, 0,-101, 0, 323, 512, 323, 0,-101, 0, 53, 0, -30, 0,18, 0, -10, 0, 5, 0,-2]
@@ -173,7 +68,7 @@ class Top(Elaboratable):
"hbfir2": HalfBandDecimatorMAC16(taps_hb2, data_shape=fixed.SQ(11), overclock_rate=8, always_ready=True, domain="gck1"),
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(12), 4, "gck1", "sync", always_ready=True),
"clkconv": ClockConverter(IQSample(12), 8, "gck1", "sync", always_ready=True),
}
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
@@ -196,7 +91,7 @@ class Top(Elaboratable):
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# RX settings.
rx_chain["dc_block"].enable .eq(ctrl[0]),

View File

@@ -4,140 +4,19 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Instance, Cat, ClockSignal, DomainRenamer
from amaranth.lib import io, fifo, stream, wiring
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Cat, DomainRenamer
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import FIRFilter
from dsp.fir_mac16 import HalfBandInterpolatorMAC16
from dsp.cic import CICInterpolator
from util import ClockConverter, IQSample, StreamSkidBuffer
class MCUInterface(wiring.Component):
dac_stream: Out(stream.Signature(IQSample(12)))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
tx_clk_en = Signal()
m.d.sync += clk_out.o[0].eq(tx_clk_en)
tx_dly_write = Signal(4)
tx_in_sample = Signal(4*8)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.sync += tx_in_sample.eq(Cat(host_io.i[1], tx_in_sample))
# Small TX FIFO to avoid overflows from the write delay.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=24, depth=4)
m.d.comb += [
tx_fifo.w_data.word_select(0, 12) .eq(tx_in_sample[20:32]),
tx_fifo.w_data.word_select(1, 12) .eq(tx_in_sample[4:16]),
tx_fifo.w_en .eq(tx_dly_write[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
with m.FSM():
with m.State("IDLE"):
m.d.comb += tx_clk_en.eq(enable & transfer_to_dac & dac_stream.ready)
with m.If(tx_clk_en):
m.next = "TX_I1"
with m.State("TX_I1"):
m.d.comb += tx_clk_en.eq(1)
m.next = "TX_Q0"
with m.State("TX_Q0"):
m.d.comb += tx_clk_en.eq(1)
m.next = "TX_Q1"
with m.State("TX_Q1"):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class Top(Elaboratable):
def elaborate(self, platform):
@@ -146,15 +25,27 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(
sample_width=24,
rx_assignments=[
lambda w: Cat(w[8:12], w[11].replicate(4)),
lambda w: w[0:8],
lambda w: Cat(w[20:24], w[23].replicate(4)),
lambda w: w[12:20],
],
tx_assignments=[
lambda w, v: w[8:12].eq(v),
lambda w, v: w[0:8].eq(v),
lambda w, v: w[20:24].eq(v),
lambda w, v: w[12:20].eq(v),
],
domain="sync"
)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps_hb1 = [-2, 0, 5, 0, -10, 0,18, 0, -30, 0,53, 0,-101, 0, 323, 512, 323, 0,-101, 0, 53, 0, -30, 0,18, 0, -10, 0, 5, 0,-2]
taps_hb1 = [ tap/1024 for tap in taps_hb1 ]
@@ -164,7 +55,7 @@ class Top(Elaboratable):
tx_chain = {
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(12), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(12), 8, "sync", "gck1", always_ready=False),
# Half-band interpolation stages (+ skid buffers for timing closure).
"hbfir1": HalfBandInterpolatorMAC16(taps_hb1, data_shape=fixed.SQ(11),
@@ -176,9 +67,9 @@ class Top(Elaboratable):
# CIC interpolation stage.
"cic_comp": DomainRenamer("gck1")(FIRFilter([-0.125, 0, 0.75, 0, -0.125], shape=fixed.SQ(11), shape_out=fixed.SQ(11), always_ready=False, num_channels=2)),
"cic_interpolator": CICInterpolator(2, 4, (4, 8, 16, 32), 12, 8, num_channels=2,
always_ready=False, domain="gck1"),
"skid3": DomainRenamer("gck1")(StreamSkidBuffer(IQSample(8), always_ready=False)),
}
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v
@@ -201,7 +92,7 @@ class Top(Elaboratable):
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# TX interpolation rate.
tx_chain["cic_interpolator"].factor .eq(tx_intrp + 2),

View File

@@ -5,128 +5,17 @@
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, C, Mux, Instance, Cat, ClockSignal, DomainRenamer, signed
from amaranth.lib import io, stream, wiring, cdc, data, fifo
from amaranth import Elaboratable, Module, DomainRenamer
from amaranth.lib import stream, wiring
from amaranth.lib.wiring import Out, In, connect
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.dc_block import DCBlock
from dsp.round import convergent_round
from util import IQSample, ClockConverter
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(4), always_ready=True))
dac_stream: Out(stream.Signature(IQSample(4)))
direction: In(1)
enable: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.d.sync += enable.eq(self.enable)
m.d.sync += direction.eq(self.direction)
transfer_from_adc = (direction == 0)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
m.d.sync += clk_out.o[0].eq(0)
m.d.sync += clk_out.o[1].eq(0)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(Cat(adc_stream.p.i, adc_stream.p.q))
assert len(data_to_host) == 8
m.d.comb += host_io.o[0].eq(data_to_host)
m.d.comb += host_io.o[1].eq(data_to_host)
tx_dly_write = Signal(2)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.comb += dac_stream.payload.eq(host_io.i[1])
m.d.comb += dac_stream.valid.eq(tx_dly_write[-1])
with m.FSM():
with m.State("IDLE"):
with m.If(enable):
with m.If(transfer_from_adc & adc_stream.valid):
m.d.sync += data_to_host.eq(Cat(adc_stream.p.i, adc_stream.p.q))
m.d.sync += clk_out.o[1].eq(1)
with m.Elif(transfer_to_dac & dac_stream.ready):
m.d.sync += clk_out.o[0].eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and gating captures.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
class IQHalfPrecisionConverter(wiring.Component):
input: In(stream.Signature(IQSample(8), always_ready=True))
output: Out(stream.Signature(IQSample(4), always_ready=True))
@@ -167,22 +56,18 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(sample_width=8, domain="sync")
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
rx_chain = {
"dc_block": DCBlock(width=8, num_channels=2, domain="gck1"),
"half_prec": DomainRenamer("gck1")(IQHalfPrecisionConverter()),
"clkconv": ClockConverter(IQSample(4), 4, "gck1", "sync"),
"clkconv": ClockConverter(IQSample(4), 16, "gck1", "sync"),
}
m.submodules += rx_chain.values()
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
# Connect receiver chain.
last = adcdac_intf.adc_stream
@@ -193,10 +78,11 @@ class Top(Elaboratable):
tx_chain = {
"clkconv": ClockConverter(IQSample(4), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(4), 16, "sync", "gck1", always_ready=False),
"half_prec": DomainRenamer("gck1")(IQHalfPrecisionConverterInv()),
}
m.submodules += tx_chain.values()
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v
# Connect transmitter chain.
last = mcu_intf.dac_stream
@@ -213,7 +99,7 @@ class Top(Elaboratable):
ctrl = spi_regs.add_register(0x01, init=0)
m.d.comb += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# RX settings.
rx_chain["dc_block"].enable .eq(ctrl[0]),
@@ -224,4 +110,4 @@ class Top(Elaboratable):
if __name__ == "__main__":
plat = PralinePlatform()
plat.build(Top_HP())
plat.build(Top())

View File

@@ -4,168 +4,20 @@
# Copyright (c) 2025 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause
from amaranth import Elaboratable, Module, Signal, Mux, Instance, Cat, ClockSignal, DomainRenamer, EnableInserter
from amaranth.lib import io, fifo, stream, wiring, cdc
from amaranth.lib.wiring import Out, In, connect
from amaranth import Elaboratable, Module, Signal, Mux, DomainRenamer
from amaranth.lib import cdc
from amaranth.lib.wiring import connect
from amaranth_future import fixed
from board import PralinePlatform, ClockDomainGenerator
from interface import MAX586xInterface
from interface.spi import SPIRegisterInterface
from interface import MAX586xInterface, SGPIOInterface, SPIRegisterInterface
from dsp.fir import HalfBandDecimator, HalfBandInterpolator
from dsp.cic import CICDecimator, CICInterpolator
from dsp.cic import CICInterpolator
from dsp.dc_block import DCBlock
from dsp.quarter_shift import QuarterShift
from dsp.nco import NCO
from util import ClockConverter, IQSample, StreamSkidBuffer, LinearFeedbackShiftRegister
class MCUInterface(wiring.Component):
adc_stream: In(stream.Signature(IQSample(8), always_ready=True))
dac_stream: Out(stream.Signature(IQSample(8)))
direction: In(1)
enable: In(1)
prbs: In(1)
def __init__(self, domain="sync"):
self._domain = domain
super().__init__()
def elaborate(self, platform):
m = Module()
adc_stream = self.adc_stream
dac_stream = self.dac_stream
# Determine data transfer direction.
direction = Signal()
enable = Signal()
m.submodules.enable_cdc = cdc.FFSynchronizer(self.enable, enable, o_domain=self._domain)
m.submodules.direction_cdc = cdc.FFSynchronizer(self.direction, direction, o_domain=self._domain)
transfer_from_adc = (direction == 0)
transfer_to_dac = (direction == 1)
# SGPIO clock and data lines.
m.submodules.clk_out = clk_out = io.DDRBuffer("o", platform.request("host_clk", dir="-"), o_domain=self._domain)
m.submodules.host_io = host_io = io.DDRBuffer('io', platform.request("host_data", dir="-"), i_domain=self._domain, o_domain=self._domain)
# State machine to control SGPIO clock and data lines.
tx_clk_en = Signal()
rx_clk_en = Signal()
m.d.sync += clk_out.o[0].eq(tx_clk_en)
m.d.sync += clk_out.o[1].eq(rx_clk_en)
m.d.sync += host_io.oe.eq(transfer_from_adc)
data_to_host = Signal.like(adc_stream.p)
m.d.comb += host_io.o[0].eq(data_to_host)
m.d.comb += host_io.o[1].eq(data_to_host)
tx_dly_write = Signal(3)
host_io_prev_data = Signal(8)
m.d.sync += tx_dly_write.eq(tx_dly_write << 1)
m.d.sync += host_io_prev_data.eq(host_io.i[1])
# Small TX FIFO to avoid overflows from the write delay.
m.submodules.tx_fifo = tx_fifo = fifo.SyncFIFOBuffered(width=16, depth=8)
m.d.comb += [
tx_fifo.w_data .eq(Cat(host_io_prev_data, host_io.i[1])),
tx_fifo.w_en .eq(tx_dly_write[-1]),
dac_stream.p .eq(tx_fifo.r_data),
dac_stream.valid .eq(tx_fifo.r_rdy),
tx_fifo.r_en .eq(dac_stream.ready),
]
# Pseudo-random binary sequence generator.
prbs_advance = Signal()
prbs_count = Signal(2)
m.submodules.prbs = prbs = EnableInserter(prbs_advance)(
LinearFeedbackShiftRegister(degree=8, taps=[8,6,5,4], init=0b10110001))
with m.FSM():
with m.State("IDLE"):
m.d.comb += tx_clk_en.eq(enable & transfer_to_dac & dac_stream.ready)
m.d.comb += rx_clk_en.eq(enable & transfer_from_adc & adc_stream.valid)
with m.If(self.prbs):
m.next = "PRBS"
with m.Elif(rx_clk_en):
m.d.sync += data_to_host.eq(adc_stream.p)
m.next = "RX_Q"
with m.Elif(tx_clk_en):
m.next = "TX_Q"
with m.State("RX_Q"):
m.d.comb += rx_clk_en.eq(1)
m.d.sync += data_to_host.i.eq(data_to_host.q)
m.next = "IDLE"
with m.State("TX_Q"):
m.d.comb += tx_clk_en.eq(1)
m.d.sync += tx_dly_write[0].eq(1) # delayed write
m.next = "IDLE"
with m.State("PRBS"):
m.d.sync += host_io.oe.eq(1)
m.d.sync += data_to_host.eq(prbs.value)
m.d.comb += rx_clk_en.eq(prbs_count == 0)
m.d.comb += prbs_advance.eq(prbs_count == 0)
m.d.sync += prbs_count.eq(prbs_count + 1)
with m.If(~self.prbs):
m.next = "IDLE"
if self._domain != "sync":
m = DomainRenamer(self._domain)(m)
return m
class FlowAndTriggerControl(wiring.Component):
trigger_en: In(1)
direction: Out(1) # async
enable: Out(1) # async
adc_capture: Out(1)
dac_capture: Out(1)
def __init__(self, domain):
super().__init__()
self._domain = domain
def elaborate(self, platform):
m = Module()
#
# Signal synchronization and trigger logic.
#
trigger_enable = self.trigger_en
trigger_in = platform.request("trigger_in").i
trigger_out = platform.request("trigger_out").o
host_data_enable = ~platform.request("disable").i
m.d.comb += trigger_out.eq(host_data_enable)
# Create a latch for the trigger input signal using a special FPGA primitive.
trigger_in_latched = Signal()
trigger_in_reg = Instance("SB_DFFES",
i_D = 0,
i_S = trigger_in, # async set
i_E = ~host_data_enable,
i_C = ClockSignal(self._domain),
o_Q = trigger_in_latched
)
m.submodules.trigger_in_reg = trigger_in_reg
# Export signals for direction control and capture gating.
m.d.comb += self.direction.eq(platform.request("direction").i)
m.d.comb += self.enable.eq(host_data_enable)
with m.If(host_data_enable):
m.d[self._domain] += self.adc_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 0))
m.d[self._domain] += self.dac_capture.eq((trigger_in_latched | ~trigger_enable) & (self.direction == 1))
with m.Else():
m.d[self._domain] += self.adc_capture.eq(0)
m.d[self._domain] += self.dac_capture.eq(0)
return m
from util import ClockConverter, IQSample, StreamSkidBuffer
class Top(Elaboratable):
@@ -176,15 +28,10 @@ class Top(Elaboratable):
m.submodules.clkgen = ClockDomainGenerator()
# Submodules.
m.submodules.flow_ctl = flow_ctl = FlowAndTriggerControl(domain="gck1")
m.submodules.adcdac_intf = adcdac_intf = MAX586xInterface(bb_domain="gck1")
m.submodules.mcu_intf = mcu_intf = MCUInterface(domain="sync")
m.submodules.mcu_intf = mcu_intf = SGPIOInterface(sample_width=16, domain="sync")
m.d.comb += adcdac_intf.adc_capture.eq(flow_ctl.adc_capture)
m.d.comb += adcdac_intf.dac_capture.eq(flow_ctl.dac_capture)
m.d.comb += adcdac_intf.q_invert.eq(platform.request("q_invert").i)
m.d.comb += mcu_intf.direction.eq(flow_ctl.direction)
m.d.comb += mcu_intf.enable.eq(flow_ctl.enable)
# Half-band filter taps.
taps = [-2, 0, 7, 0, -18, 0, 41, 0, -92, 0, 320, 512, 320, 0, -92, 0, 41, 0, -18, 0, 7, 0, -2]
@@ -221,7 +68,7 @@ class Top(Elaboratable):
"hbfir1": HalfBandDecimator(taps, **common_rx_filter_opts),
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(8), 4, "gck1", "sync"),
"clkconv": ClockConverter(IQSample(8), 8, "gck1", "sync"),
}
for k,v in rx_chain.items():
m.submodules[f"rx_{k}"] = v
@@ -235,7 +82,7 @@ class Top(Elaboratable):
tx_chain = {
# Clock domain conversion.
"clkconv": ClockConverter(IQSample(8), 4, "sync", "gck1", always_ready=False),
"clkconv": ClockConverter(IQSample(8), 8, "sync", "gck1", always_ready=False),
# Half-band interpolation stages (+ skid buffers for timing closure).
"hbfir1": HalfBandInterpolator(taps, data_shape=fixed.SQ(7),
@@ -248,6 +95,7 @@ class Top(Elaboratable):
# CIC interpolation stage.
"cic_interpolator": CICInterpolator(1, 3, (1, 2, 4, 8), 8, 8, num_channels=2,
always_ready=False, domain="gck1"),
"skid4": DomainRenamer("gck1")(StreamSkidBuffer(IQSample(8), always_ready=False)),
}
for k,v in tx_chain.items():
m.submodules[f"tx_{k}"] = v
@@ -263,7 +111,7 @@ class Top(Elaboratable):
m.d.comb += [
adcdac_intf.dac_stream.p.eq(nco.output),
adcdac_intf.dac_stream.valid.eq(1),
tx_chain["cic_interpolator"].output.ready.eq(1),
last.ready.eq(1),
]
with m.Else():
connect(m, last, adcdac_intf.dac_stream)
@@ -281,7 +129,7 @@ class Top(Elaboratable):
m.d.sync += [
# Trigger enable.
flow_ctl.trigger_en .eq(ctrl[7]),
mcu_intf.trigger_en .eq(ctrl[7]),
# PRBS enable.
mcu_intf.prbs .eq(ctrl[6]),

View File

@@ -35,7 +35,7 @@ class ClockConverter(wiring.Component):
def elaborate(self, platform):
m = Module()
m.submodules.mem = mem = fifo.AsyncFIFO(
m.submodules.mem = mem = fifo.AsyncFIFOBuffered(
width=Shape.cast(self.shape).width,
depth=self.depth,
r_domain=self._output_domain,