Spinal Stream协议

Overview

Stream实现了valid-ready握手协议,用于blocking传输。基于Stream,SpinalHDL提供了大量的方法,以及相关的building blocks。

小逻辑块

以下介绍9大类与Stream相关的常用硬件电路实现,它们均作为Stream的成员方法提供,往往不会单独成为一个Module。这些方法的特点是其返回值往往就是被驱动的下游,因此可以好几种方法级联使用,轻松组合出想要的连接。

Flow转换

将blocking传输转为non-blocking 传输,即flow协议。

  • toFlow ,flow valid、payload直接赋值,ready always true
  • toFlowFire , flow valid等于fire,payload直接赋值
  • asFlow ,ready不管,flow valid、payload直接赋值

突发传输

在Burst连续传输的情况下,我们需要给负载payload增加一个last标志。

  • addFragmentLast(last : Bool) : Stream[Fragment[T]] last由外部计数器逻辑处理
  • addFragmentLast(counter: Counter) : Stream[Fragment[T]] last根据传入的计数器处理

状态检测

用于表明当前stream所处的状态,是否拥堵?是否空闲?是否正处于握手状态?

  • isStall 表示当前cycle是否正在堵塞,ready处于deassert状态
  • isNew 表示当前cycle是否有新的transaction发生,为堵塞结束后的第一个cycle
  • isFree 表示当前并不拥堵:ready为高或者valid为低,没有data需要上路。
  • fire 表示当前cycle是否处于握手阶段

Repeater

在link上面增加寄存器(repeater),以优化timing。

  • m2sPipe(collapsBubble: Boolean=true, crossClockData: Boolean=false, flush: Bool=null,holdPayload: Boolean=false) , 给forward flow,即valid、payload部分打拍。同样的, stage() 则是 m2sPipe()
  • s2mPipe(stagesCount: Int) 则是在backward ready上打拍,forward部分仍然直连。
  • validPipe() 则是只在valid上打拍,payload与ready没打拍
  • halfPipe() 所有信号都打拍,而bandwidth小了一半。
  • pipeline 等于上面的各种组合

以上函数返回新的被驱动的Stream

连接符号

以下函数、操作符返回操作符右边的stream,即不是 this stream,而是作为函数参数的 that stream。这里的连接符号和上面各种打拍组合起来,方便使用。

  • <<, >> 直连
  • <-<, >-> 只有Flow的部分被加了一级流水,即m2sPipe
  • </<, >/> 只有ready部分加了一级流水,即s2mPipe
  • <-/<, >/-> Flow和ready的部分均加了一级流水,即m2sPipe.s2mPipe

还有其他连接符,如 & ~ ~~ 这类后面再说。

流量控制

这些方法,通过一个condition信号,来完成基本的流量控制任务,比如halt住,丢弃掉,或者放入buffer(FIFO)。

  • continueWhen(cond: Bool) \ haltWhen(cond: Bool) ,当cond为高的时候,前者才继续传输,而后者则阻塞传输。(符号 &continueWhen(cond) 一样)
  • throwWhen(cond: Bool) \ takeWhen(cond: Bool) , 当cond为高,前者丢弃掉当前的事务,而后者则只有cond为低的时候才丢弃。

或者通过一个FIFO,来缓存事务,以增加吞吐

  • queue(size: Int) ,在link上增加一个深度为size的同步FIFO作为一个buffer。
  • queueWithOccupancy(size: Int): (Stream[T],UInt) ,在 credit-base 传输中,可以返回buffer当前的占用率,作为credit告知upstream是否还能继续发起传输。
  • queueWithAvailability(size: Int): (Stream[T], UInt) ,返回buffer及其当前空置率,告知upstream。
  • queueLowLatency(size: Int, latency: Int = 0) 增加一个无延迟或低延迟的buffer

分频处理

下游时钟域和上游同步,但被分频得更慢。(不一定用在分频时钟的设计中

  • repeat(times: Int): (Stream[T], UInt) ,每个beat都被至少重复times次,使之能被下游的时钟catch。上游的吞吐因此降低了times倍
  • slowdown(factor: Int): Stream[Vec[T]] ,内部有一个移位寄存器做串并转换,下游的payload比上游宽factor倍,在ready始终enable下,每隔factor个cycle,下游fire一次。上游的吞吐不会降低。

CDC处理

存在多时钟域的时候,可能需要处理跨时钟域的握手问题。包括简单的二级buffer,或者异步FIFO。

二级buffer:

  • ccToggle(pushClock: ClockDomain, popClock: ClockDomain) 打两拍
  • ccToggleWithoutBuffer(pushClock: ClockDomain, popClock: ClockDomain) 不打拍(??)直接过。

异步FIFO:

  • queue(size: Int, pushClock: ClockDomain, popClock: ClockDomain) , 增加一个异步FIFO
  • queueWithPushOccupancy(size: Int, pushClock: ClockDomain, popClock: ClockDomain): (Stream[T], UInt) ,返回异步buffer及其占用率

负载转换

在某些场景下,上游的Stream需要经过某个处理,再与下游的Stream连接。这里,我们把this称为变换前的上游,that则称为变换后的下游。

上下游直连,负载不变:

  • connectFrom(that: Stream[T]): Stream[T] ,返回下游
  • arbitrationFrom(that: Stream[T]): Unit ,只直连上下流的握手信号,payload给其他逻辑管。

通常情况下,我们提供一个函数,描述了负载转换的逻辑。

这里,函数 transformation: (this.payload.type, that.payload.type) => Unit 表示了这种转换:

  • translateFrom(that)(transformation) 握手信号直连,返回this 上游。

镜像地, transformation: (that.payload.type, this.payload.type) => Unit :

  • translateInto(that)(transformation) 用法与上面对偶。返回 that 下游。

转换函数还可以是更直接的形式transformer: (T) => T2 ,直接表示从上游的payload到下流payload的一个转换函数,并通过连接符 ~~ 来使用,返回下游

1
2
3
val upStream: Stream[T]
...
val downStream: Stream[T2] = upStream ~~ { upPayload => doSomethingOn(upPayload) }

另外,若转换由外部逻辑预先完成,我们只需要将转换后的payload直接送给下游即可:

  • translateWith(thatPayload:T) that是新的下游的payload,返回下游。
  • 连接符 ~ 和上面的方法一样。

还有的场景下,下游的payload数据类型有变化,只是继承上游的握手信号,但暂时不对下游的payload做连接

  • swapPayload(that: HardType[T2]): Stream[T2] 返回变换类型的新下游。

或者做bit-level连接:

  • transmuteWith[T2 <: Data](that: HardType[T2]): Stream[T2] 返回新下游,下游负载被上游做了bit-level赋值。

大逻辑块

这里Stream相关的功能,往往以单独的模块提供。提供的模块包括:仲裁器,Mux/DeMux,Fork/Join, 同步FIFO/异步FIFO,二级buffer,位宽调制器等。

仲裁处理

详见SpinalHDL文档。这里做点补充:

提供的仲裁机制:

  • lowerFirst:优先级固定。
  • sequentialOrder:记录已经发射的次数,第几次发射,就选择第几号
  • roundRobin:不必多说。

提供的锁机制:

  • none:不锁,仲裁结果随时可能变化
  • transactionLock:满足valid-ready握手规则,一旦仲裁后,valid assert而ready尚未 assert,必须维持仲裁结果不变。这里只是单发传输
  • fragmentLock: 满足握手规则的前提下,支持突发传输。在last assert之前,保持仲裁结果不变。

Mux\Demux

NoC的crossbar结构基本单元。

上游多个stream连接到下游一个stream,用一个选择信号选通上游某个通道,使用mux;往往配合仲裁器使用,选择信号由仲裁器给出。

1
2
3
4
5
6
7
8
9
10
11
12
13
object StreamMux {
def apply[T <: Data](select: UInt, inputs: Seq[Stream[T]]): Stream[T] = {
val vec = Vec(inputs)
StreamMux(select, vec)
}

def apply[T <: Data](select: UInt, inputs: Vec[Stream[T]]): Stream[T] = {
val c = new StreamMux(inputs(0).payload, inputs.length)
(c.io.inputs, inputs).zipped.foreach(_ << _)
c.io.select:= select
c.io.output
}
}

上游一个stream连接到下游多个stream,用一个选择信号选通下游某个通道,使用demux;往往配合decoder使用,根据地址将传输送往特定的slave。

1
2
3
4
5
6
7
8
object StreamDemux{
def apply[T <: Data](input: Stream[T], select : UInt, portCount: Int) : Vec[Stream[T]] = {
val c = new StreamDemux(input.payload,portCount)
c.io.input<< input
c.io.select:= select
c.io.outputs
}
}

Fork

上游需要广播到多个下游。

1
2
3
4
5
6
7
object StreamFork {
def apply[T <: Data](input: Stream[T], portCount: Int, synchronous: Boolean = false): Vec[Stream[T]] = {
val fork = new StreamFork(input.payloadType, portCount, synchronous).setCompositeName(input, "fork", true)
fork.io.input<< input
return fork.io.outputs
}
}

同步机制:

  • 如果需要同步,那么只有当所有下游都ready,上游才能fire,下游可以同时拿到东西。这一机制可以造成死锁。同时违反axi握手依赖规则。
  • 反之,每个下游单独ready,单独拿到东西。上游在所有下游全部fire之前stall住。

Join

上游的多个stream,其payload打包,送给一个下游stream。握手信号做了简单处理,使得下游只有在上游都fire了才能fire。

1
2
3
4
5
6
7
8
object StreamJoin {
...
// 简单打包两个相同的payload
def apply[T1 <: Data,T2 <: Data](source1: Stream[T1], source2: Stream[T2]): Stream[TupleBundle2[T1, T2]]
// 这里只保留上游join到下游的握手信号,payload另外给,因为不同的payload可能类型不同
def arg(sources : Stream[_]*) : Event
// 多个类型相同的payload放在一起,打包成一个vec
def vec[T <: Data](sources: Seq[Stream[T]]): Stream[Vec[T]]

FIFO

同步FIFO和异步FIFO不用多说,embed在上面的小逻辑块部分。

需要补充的:

  • StreamFifoLowLatency 低延迟同步FIFO
  • StreamFifoMultiChannelSharedSpace 没做完??

位宽调制

上下游之间数据位宽不同,连接需要做一定处理。上游宽度大,下游需要拆分成多笔传输;下游宽度大,上游多笔传输合并成一笔。

单发传输:

1
2
3
4
5
6
7
object StreamWidthAdapter {
...
def make[T <: Data, T2 <: Data](input : Stream[T], outputPayloadType : HardType[T2], endianness: Endianness = LITTLE, padding : Boolean = false) : Stream[T2] = {
val ret = Stream(outputPayloadType())
StreamWidthAdapter(input,ret,endianness,padding)
ret
}

突发传输:

1
2
3
4
5
def make[T <: Data, T2 <: Data](input : Stream[Fragment[T]], outputPayloadType : HardType[T2], endianness: Endianness = LITTLE, padding : Boolean = false) : Stream[Fragment[T2]] = {
val ret = Stream(Fragment(outputPayloadType()))
StreamFragmentWidthAdapter(input,ret,endianness,padding)
ret
}

StreamTransactionExtender

用途不明。

功能:动态输入一个计数上限,计数器到达以前上游stall,直至达到上限才fire。下游payload根据计数器值,发生相应变化或者不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class StreamTransactionExtender[T <: Data, T2 <: Data](
dataType: HardType[T],
outDataType: HardType[T2],
countWidth: Int,
var driver: (UInt, T) => T2 // 下游负载变化函数
) extends Component {
val io = new Bundle {
val count = in UInt (countWidth bit) //计数上限
val input = slave Stream dataType
val output = master Stream outDataType
}
...

object StreamTransactionExtender {
def apply[T <: Data](input: Stream[T], count: UInt)(
driver: (UInt, T) => T = (_: UInt, p: T) => p // 默认不变
): Stream[T] = {
val c = new StreamTransactionExtender(input.payloadType, input.payloadType, count.getBitsWidth, driver)
c.io.input << input
c.io.count := count
c.io.output
}
...

验证用具

spinal.lib.sim._ 下,有专门用于验证Stream接口的工具。包括了基本的 StreamDriver 和基本的 StreamMonitor

Stream Driver

其核心定义:

class StreamDriver[T <: Data](stream : Stream[T], clockDomain: ClockDomain, var driver : (T) => Boolean)

除了传入一个Stream接口以外,还要提供一个driver函数,等效于以下操作:

1
2
3
4
5
6
7
8
9
10
11
fork{
stream.valid #= false
while(true) {
clockDomain.waitSampling(transactionDelay())
clockDomain.waitSamplingWhere(driver(stream.payload))
stream.valid #= true
clockDomain.waitSamplingWhere(stream.ready.toBoolean)
stream.valid #= false
stream.payload.randomize()
}
}

driver只需对payload进行驱动操作,以及控制仿真进程的结束。

Stream Monitor

其定义如下:

1
2
3
4
5
6
7
8
class StreamMonitor[T <: Data](stream : Stream[T], clockDomain: ClockDomain){
valcallbacks= ArrayBuffer[(T) => Unit]()

def addCallback(callback : (T) => Unit): this.type = {
callbacks+= callback
this
}
...

需要用户手动添加回调函数进行数据收集操作。