From 9b94237e1a0e8aef1de470c5de07e3d49d5c95e5 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Wed, 9 Feb 2022 10:36:58 -0500 Subject: [PATCH] Add H265 payloader and depacketizer This change completes the H265 implementation. --- codecs/h265_packet.go | 333 +++++++++++++++++++++++++++++++++++-- codecs/h265_packet_test.go | 2 +- 2 files changed, 324 insertions(+), 11 deletions(-) diff --git a/codecs/h265_packet.go b/codecs/h265_packet.go index 6f0490d..6054b23 100644 --- a/codecs/h265_packet.go +++ b/codecs/h265_packet.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "errors" "fmt" + "math" + "sort" ) // @@ -724,18 +726,56 @@ var ( // Packet implementation // +type donKeyedNALU struct { + DON int + NALU []byte +} + // H265Packet represents a H265 packet, stored in the payload of an RTP packet. type H265Packet struct { - packet isH265Packet - mightNeedDONL bool + packet isH265Packet + maxDONDiff uint16 + depackBufNALUs uint16 + + prevDON *uint16 + prevAbsDON *int + + naluBuffer []donKeyedNALU + fuBuffer []byte videoDepacketizer } -// WithDONL can be called to specify whether or not DONL might be parsed. -// DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. -func (p *H265Packet) WithDONL(value bool) { - p.mightNeedDONL = value +func toAbsDON(don uint16, prevDON *uint16, prevAbsDON *int) int { + if prevDON == nil || prevAbsDON == nil { + return int(don) + } + if don == *prevDON { + return *prevAbsDON + } + if don > *prevDON && don-*prevDON < 32768 { + return *prevAbsDON + int(don-*prevDON) + } + if don < *prevDON && *prevDON-don >= 32768 { + return *prevAbsDON + 65536 + int(*prevDON-don) + } + if don > *prevDON && don-*prevDON >= 32768 { + return *prevAbsDON - (int(*prevDON) + 65536 - int(don)) + } + if don < *prevDON && *prevDON-don < 32768 { + return *prevAbsDON - int(*prevDON-don) + } + return 0 +} + +// WithMaxDONDiff sets the maximum difference between DON values before being emitted. +func (p *H265Packet) WithMaxDONDiff(value uint16) { + p.maxDONDiff = value +} + +// WithDepackBufNALUs sets the maximum number of NALUs to be buffered. +func (p *H265Packet) WithDepackBufNALUs(value uint16) { + p.depackBufNALUs = value } // Unmarshal parses the passed byte slice and stores the result in the H265Packet this method is called upon @@ -762,7 +802,7 @@ func (p *H265Packet) Unmarshal(payload []byte) ([]byte, error) { case payloadHeader.IsFragmentationUnit(): decoded := &H265FragmentationUnitPacket{} - decoded.WithDONL(p.mightNeedDONL) + decoded.WithDONL(p.maxDONDiff > 0) if _, err := decoded.Unmarshal(payload); err != nil { return nil, err @@ -770,9 +810,32 @@ func (p *H265Packet) Unmarshal(payload []byte) ([]byte, error) { p.packet = decoded + if decoded.FuHeader().S() { + // push the nalu header + header := decoded.PayloadHeader() + p.fuBuffer = []byte{ + (uint8(header>>8) & 0b10000001) | (decoded.FuHeader().FuType() << 1), + uint8(header), + } + } + p.fuBuffer = append(p.fuBuffer, decoded.Payload()...) + if decoded.FuHeader().E() { + var absDON int + if p.maxDONDiff > 0 { + absDON = toAbsDON(*decoded.DONL(), p.prevDON, p.prevAbsDON) + p.prevDON = decoded.DONL() + p.prevAbsDON = &absDON + } + p.naluBuffer = append(p.naluBuffer, donKeyedNALU{ + DON: absDON, + NALU: p.fuBuffer, + }) + p.fuBuffer = nil + } + case payloadHeader.IsAggregationPacket(): decoded := &H265AggregationPacket{} - decoded.WithDONL(p.mightNeedDONL) + decoded.WithDONL(p.maxDONDiff > 0) if _, err := decoded.Unmarshal(payload); err != nil { return nil, err @@ -780,18 +843,78 @@ func (p *H265Packet) Unmarshal(payload []byte) ([]byte, error) { p.packet = decoded + var absDON int + if p.maxDONDiff > 0 { + absDON = toAbsDON(*decoded.FirstUnit().DONL(), p.prevDON, p.prevAbsDON) + p.prevDON = decoded.FirstUnit().DONL() + p.prevAbsDON = &absDON + } + p.naluBuffer = append(p.naluBuffer, donKeyedNALU{DON: absDON, NALU: decoded.FirstUnit().NalUnit()}) + for _, unit := range decoded.OtherUnits() { + if p.maxDONDiff > 0 { + donl := uint16(*unit.DOND()) + 1 + *decoded.FirstUnit().DONL() + absDON = toAbsDON(donl, p.prevDON, p.prevAbsDON) + p.prevDON = &donl + p.prevAbsDON = &absDON + } + p.naluBuffer = append(p.naluBuffer, donKeyedNALU{DON: absDON, NALU: unit.NalUnit()}) + } + default: decoded := &H265SingleNALUnitPacket{} - decoded.WithDONL(p.mightNeedDONL) + decoded.WithDONL(p.maxDONDiff > 0) if _, err := decoded.Unmarshal(payload); err != nil { return nil, err } p.packet = decoded + + buf := make([]byte, 2+len(decoded.payload)) + binary.BigEndian.PutUint16(buf[0:2], uint16(decoded.payloadHeader)) + copy(buf[2:], decoded.payload) + + var absDON int + if p.maxDONDiff > 0 { + absDON = toAbsDON(*decoded.DONL(), p.prevDON, p.prevAbsDON) + p.prevDON = decoded.DONL() + p.prevAbsDON = &absDON + } + p.naluBuffer = append(p.naluBuffer, donKeyedNALU{DON: absDON, NALU: buf}) } - return nil, nil + buf := []byte{} + if p.maxDONDiff > 0 { + // https://datatracker.ietf.org/doc/html/rfc7798#section-6 + // sort by AbsDON + sort.Slice(p.naluBuffer, func(i, j int) bool { + return p.naluBuffer[i].DON < p.naluBuffer[j].DON + }) + // find the max DONL value + var maxDONL int + for _, nalu := range p.naluBuffer { + if nalu.DON > maxDONL { + maxDONL = nalu.DON + } + } + minDONL := maxDONL - int(p.maxDONDiff) + // merge all NALUs while condition A or condition B are true + for len(p.naluBuffer) > 0 && (p.naluBuffer[0].DON < minDONL || len(p.naluBuffer) > int(p.depackBufNALUs)) { + // TODO: this is not actually correct following B.2.2, not all NALUs have a 4-byte start code. + buf = append(buf, annexbNALUStartCode()...) + buf = append(buf, p.naluBuffer[0].NALU...) + p.naluBuffer = p.naluBuffer[1:] + } + } else { + // return the nalu buffer joined together + for _, val := range p.naluBuffer { + // TODO: this is not actually correct following B.2.2, not all NALUs have a 4-byte start code. + buf = append(buf, annexbNALUStartCode()...) + buf = append(buf, val.NALU...) + } + p.naluBuffer = nil + } + return buf, nil } // Packet returns the populated packet. @@ -817,3 +940,193 @@ func (*H265Packet) IsPartitionHead(payload []byte) bool { return true } + +// H265Payloader payloads H265 packets +type H265Payloader struct { + AddDONL bool + SkipAggregation bool + donl uint16 +} + +// Payload fragments a H265 packet across one or more byte arrays +func (p *H265Payloader) Payload(mtu uint16, payload []byte) [][]byte { + var payloads [][]byte + if len(payload) == 0 { + return payloads + } + + bufferedNALUs := make([][]byte, 0) + aggregationBufferSize := 0 + + flushBufferedNals := func() { + if len(bufferedNALUs) == 0 { + return + } + if len(bufferedNALUs) == 1 { + // emit this as a single NALU packet + nalu := bufferedNALUs[0] + + if p.AddDONL { + buf := make([]byte, len(nalu)+2) + + // copy the NALU header to the payload header + copy(buf[0:h265NaluHeaderSize], nalu[0:h265NaluHeaderSize]) + + // copy the DONL into the header + binary.BigEndian.PutUint16(buf[h265NaluHeaderSize:h265NaluHeaderSize+2], p.donl) + + // write the payload + copy(buf[h265NaluHeaderSize+2:], nalu[h265NaluHeaderSize:]) + + p.donl++ + + payloads = append(payloads, buf) + } else { + // write the nalu directly to the payload + payloads = append(payloads, nalu) + } + } else { + // construct an aggregation packet + aggregationPacketSize := aggregationBufferSize + 2 + buf := make([]byte, aggregationPacketSize) + + layerID := uint8(math.MaxUint8) + tid := uint8(math.MaxUint8) + for _, nalu := range bufferedNALUs { + header := newH265NALUHeader(nalu[0], nalu[1]) + headerLayerID := header.LayerID() + headerTID := header.TID() + if headerLayerID < layerID { + layerID = headerLayerID + } + if headerTID < tid { + tid = headerTID + } + } + + binary.BigEndian.PutUint16(buf[0:2], (uint16(h265NaluAggregationPacketType)<<9)|(uint16(layerID)<<3)|uint16(tid)) + + index := 2 + for i, nalu := range bufferedNALUs { + if p.AddDONL { + if i == 0 { + binary.BigEndian.PutUint16(buf[index:index+2], p.donl) + index += 2 + } else { + buf[index] = byte(i - 1) + index++ + } + } + binary.BigEndian.PutUint16(buf[index:index+2], uint16(len(nalu))) + index += 2 + if copy(buf[index:], nalu) != len(nalu) { + panic("invalid state, short copy") + } + index += len(nalu) + } + + if index != aggregationPacketSize { + panic("invalid state, index != aggregationPacketSize") + } + + payloads = append(payloads, buf) + } + // clear the buffered NALUs + bufferedNALUs = make([][]byte, 0) + aggregationBufferSize = 0 + } + + emitNalus(payload, func(nalu []byte) { + if len(nalu) == 0 { + return + } + + if len(nalu) <= int(mtu) { + // this nalu fits into a single packet, either it can be emitted as + // a single nalu or appended to the previous aggregation packet + + marginalAggregationSize := len(nalu) + 2 + if p.AddDONL { + marginalAggregationSize += 1 + } + + if aggregationBufferSize+marginalAggregationSize > int(mtu) { + flushBufferedNals() + } + bufferedNALUs = append(bufferedNALUs, nalu) + aggregationBufferSize += marginalAggregationSize + if p.SkipAggregation { + // emit this immediately. + flushBufferedNals() + } + } else { + // if this nalu doesn't fit in the current mtu, it needs to be fragmented + fuPacketHeaderSize := h265FragmentationUnitHeaderSize + 2 /* payload header size */ + if p.AddDONL { + fuPacketHeaderSize += 2 + } + + // then, fragment the nalu + maxFUPayloadSize := int(mtu) - fuPacketHeaderSize + + naluHeader := newH265NALUHeader(nalu[0], nalu[1]) + + // the nalu header is omitted from the fragmentation packet payload + nalu = nalu[h265NaluHeaderSize:] + + if maxFUPayloadSize == 0 || len(nalu) == 0 { + return + } + + // flush any buffered aggregation packets. + flushBufferedNals() + + fullNALUSize := len(nalu) + for len(nalu) > 0 { + curentFUPayloadSize := len(nalu) + if curentFUPayloadSize > maxFUPayloadSize { + curentFUPayloadSize = maxFUPayloadSize + } + + out := make([]byte, fuPacketHeaderSize+curentFUPayloadSize) + + // write the payload header + binary.BigEndian.PutUint16(out[0:2], uint16(naluHeader)) + out[0] = (out[0] & 0b10000001) | h265NaluFragmentationUnitType<<1 + + // write the fragment header + out[2] = byte(H265FragmentationUnitHeader(naluHeader.Type())) + if len(nalu) == fullNALUSize { + // Set start bit + out[2] |= 1 << 7 + } else if len(nalu)-curentFUPayloadSize == 0 { + // Set end bit + out[2] |= 1 << 6 + } + + if p.AddDONL { + // write the DONL header + binary.BigEndian.PutUint16(out[3:5], p.donl) + + p.donl++ + + // copy the fragment payload + copy(out[5:], nalu[0:curentFUPayloadSize]) + } else { + // copy the fragment payload + copy(out[3:], nalu[0:curentFUPayloadSize]) + } + + // append the fragment to the payload + payloads = append(payloads, out) + + // advance the nalu data pointer + nalu = nalu[curentFUPayloadSize:] + } + } + }) + + flushBufferedNals() + + return payloads +} diff --git a/codecs/h265_packet_test.go b/codecs/h265_packet_test.go index 2172abe..bfbef22 100644 --- a/codecs/h265_packet_test.go +++ b/codecs/h265_packet_test.go @@ -791,7 +791,7 @@ func TestH265_Packet(t *testing.T) { for _, cur := range tt { pck := &H265Packet{} if cur.WithDONL { - pck.WithDONL(true) + pck.WithMaxDONDiff(1) } _, err := pck.Unmarshal(cur.Raw)