From d42a5da988a66b8d5cdecee4fdf9350251cdf9b9 Mon Sep 17 00:00:00 2001 From: a1q123456 Date: Fri, 2 Aug 2019 23:33:32 +0800 Subject: [PATCH 1/2] make dispose action async --- Harmonic/Controllers/Record/RecordStream.cs | 49 +++++++++++-------- .../Controllers/WebSocketPlayController.cs | 38 +------------- 2 files changed, 30 insertions(+), 57 deletions(-) diff --git a/Harmonic/Controllers/Record/RecordStream.cs b/Harmonic/Controllers/Record/RecordStream.cs index 9293735..afd4111 100644 --- a/Harmonic/Controllers/Record/RecordStream.cs +++ b/Harmonic/Controllers/Record/RecordStream.cs @@ -39,7 +39,7 @@ public class RecordStream : NetStream private RtmpChunkStream VideoChunkStream { get; set; } = null; private RtmpChunkStream AudioChunkStream { get; set; } = null; private bool _disposed = false; - protected override void Dispose(bool disposing) + protected override async void Dispose(bool disposing) { base.Dispose(disposing); if (!_disposed) @@ -47,29 +47,36 @@ protected override void Dispose(bool disposing) _disposed = true; if (_recordFileData != null) { - var filePath = _recordFileData.Name; - using (var recordFile = new FileStream(filePath.Substring(0, filePath.Length - 5) + ".flv", FileMode.OpenOrCreate)) + try { - recordFile.SetLength(0); - recordFile.Seek(0, SeekOrigin.Begin); - recordFile.Write(FlvMuxer.MultiplexFlvHeader(true, true)); - var metaData = _metaData.Data[1] as Dictionary; - metaData["duration"] = ((double)_maxTimestamp) / 1000; - metaData["keyframes"] = _keyframes; - _metaData.MessageHeader.MessageLength = 0; - var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length; - - var offset = recordFile.Position + dataTagLen; - for (int i = 0; i < _keyframeFilePositions.Count; i++) + var filePath = _recordFileData.Name; + using (var recordFile = new FileStream(filePath.Substring(0, filePath.Length - 5) + ".flv", FileMode.OpenOrCreate)) { - _keyframeFilePositions[i] = (double)_keyframeFilePositions[i] + offset; + recordFile.SetLength(0); + recordFile.Seek(0, SeekOrigin.Begin); + await recordFile.WriteAsync(FlvMuxer.MultiplexFlvHeader(true, true)); + var metaData = _metaData.Data[1] as Dictionary; + metaData["duration"] = ((double)_maxTimestamp) / 1000; + metaData["keyframes"] = _keyframes; + _metaData.MessageHeader.MessageLength = 0; + var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length; + + var offset = recordFile.Position + dataTagLen; + for (int i = 0; i < _keyframeFilePositions.Count; i++) + { + _keyframeFilePositions[i] = (double)_keyframeFilePositions[i] + offset; + } + + await recordFile.WriteAsync(FlvMuxer.MultiplexFlv(_metaData)); + _recordFileData.Seek(0, SeekOrigin.Begin); + await _recordFileData.CopyToAsync(recordFile); + _recordFileData.Dispose(); + File.Delete(filePath); } - - recordFile.Write(FlvMuxer.MultiplexFlv(_metaData)); - _recordFileData.Seek(0, SeekOrigin.Begin); - _recordFileData.CopyTo(recordFile); - _recordFileData.Dispose(); - File.Delete(filePath); + } + catch (Exception e) + { + Console.WriteLine(e); } } _recordFile?.Dispose(); diff --git a/Harmonic/Controllers/WebSocketPlayController.cs b/Harmonic/Controllers/WebSocketPlayController.cs index 3a05881..67452f3 100644 --- a/Harmonic/Controllers/WebSocketPlayController.cs +++ b/Harmonic/Controllers/WebSocketPlayController.cs @@ -16,14 +16,6 @@ namespace Harmonic.Controllers { public class WebSocketPlayController : WebSocketController, IDisposable { - class SeekAction - { - [JsonProperty("action")] - public string Action { get; set; } - [JsonProperty("filePos")] - public double FilePos { get; set; } - } - private RecordService _recordService = null; private PublisherSessionService _publisherSessionService = null; private List _cleanupActions = new List(); @@ -102,7 +94,7 @@ private async Task PlayRecordFile() { Interlocked.Exchange(ref _playing, 1); var buffer = new byte[512]; - int bytesRead = 0; + int bytesRead; do { await _playLock.WaitAsync(); @@ -127,34 +119,8 @@ private void SendAudio(AudioMessage message) Session.SendMessageAsync(message); } - public override async void OnMessage(string msg) + public override void OnMessage(string msg) { - //var obj = JObject.Parse(msg); - //var action = obj["action"].Value(); - - //if (action == "suspend") - //{ - // await _playLock.WaitAsync(); - //} - //else if (action == "seek") - //{ - // if (_playLock.CurrentCount != 0) - // { - // return; - // } - - // var pos = JsonConvert.DeserializeObject(msg).FilePos; - - // _recordFile.Seek((int)pos, SeekOrigin.Begin); - - // _playLock.Release(); - - // if (_playing == 0) - // { - // await PlayRecordFile(); - // } - - //} } #region IDisposable Support From a209e7de1b1fa652822113cffafbcfd113cd54ed Mon Sep 17 00:00:00 2001 From: a1q123456 Date: Sun, 4 Aug 2019 23:02:20 +0800 Subject: [PATCH 2/2] bandwidth limit & critical bugfix --- .../Controllers/Record/RecordController.cs | 3 +- Harmonic/Controllers/Record/RecordStream.cs | 117 ++++++++++++------ Harmonic/Harmonic.csproj | 9 ++ .../Amf/Serialization/Amf0/Amf0Reader.cs | 1 + .../Amf/Serialization/Amf0/Amf0Writer.cs | 20 +-- .../Amf/Serialization/Amf3/Amf3Writer.cs | 21 ++-- Harmonic/Networking/Flv/FlvDemuxer.cs | 33 +++-- .../Networking/Rtmp/ChunkStreamContext.cs | 89 ++++++++++--- Harmonic/Networking/Rtmp/HandshakeContext.cs | 4 +- Harmonic/Networking/Rtmp/IOPipeLine.cs | 31 ++--- Harmonic/Networking/Rtmp/RtmpSession.cs | 29 +++-- UnitTest/UnitTest.csproj | 6 +- 12 files changed, 257 insertions(+), 106 deletions(-) diff --git a/Harmonic/Controllers/Record/RecordController.cs b/Harmonic/Controllers/Record/RecordController.cs index 2638dca..5a574e2 100644 --- a/Harmonic/Controllers/Record/RecordController.cs +++ b/Harmonic/Controllers/Record/RecordController.cs @@ -1,4 +1,5 @@ -using Harmonic.Rpc; +using Harmonic.Networking.Rtmp.Messages; +using Harmonic.Rpc; using System; using System.Collections.Generic; using System.Text; diff --git a/Harmonic/Controllers/Record/RecordStream.cs b/Harmonic/Controllers/Record/RecordStream.cs index afd4111..36b6f41 100644 --- a/Harmonic/Controllers/Record/RecordStream.cs +++ b/Harmonic/Controllers/Record/RecordStream.cs @@ -29,16 +29,19 @@ public class RecordStream : NetStream private FileStream _recordFileData = null; private RecordService _recordService = null; private DataMessage _metaData = null; - private uint _maxTimestamp = 0; + private uint _currentTimestamp = 0; private SemaphoreSlim _playLock = new SemaphoreSlim(1); private int _playing = 0; private AmfObject _keyframes = null; private List _keyframeTimes; private List _keyframeFilePositions; + private long _bufferMs = -1; private RtmpChunkStream VideoChunkStream { get; set; } = null; private RtmpChunkStream AudioChunkStream { get; set; } = null; private bool _disposed = false; + private CancellationTokenSource _playCts; + protected override async void Dispose(bool disposing) { base.Dispose(disposing); @@ -56,7 +59,7 @@ protected override async void Dispose(bool disposing) recordFile.Seek(0, SeekOrigin.Begin); await recordFile.WriteAsync(FlvMuxer.MultiplexFlvHeader(true, true)); var metaData = _metaData.Data[1] as Dictionary; - metaData["duration"] = ((double)_maxTimestamp) / 1000; + metaData["duration"] = ((double)_currentTimestamp) / 1000; metaData["keyframes"] = _keyframes; _metaData.MessageHeader.MessageLength = 0; var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length; @@ -109,6 +112,7 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona MessageStream.RegisterMessageHandler(HandleData); MessageStream.RegisterMessageHandler(HandleAudioMessage); MessageStream.RegisterMessageHandler(HandleVideoMessage); + MessageStream.RegisterMessageHandler(HandleUserControlMessage); onStatus.InfoObject = new AmfObject { {"level", "status" }, @@ -127,11 +131,19 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona _keyframes.Add("filepositions", _keyframeFilePositions); } + private void HandleUserControlMessage(UserControlMessage msg) + { + if (msg.UserControlEventType == UserControlEventType.SetBufferLength) + { + _bufferMs = (msg as SetBufferLengthMessage).BufferMilliseconds; + } + } + private async void HandleAudioMessage(AudioMessage message) { try { - _maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp); + _currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp); await SaveMessage(message); } @@ -145,7 +157,7 @@ private async void HandleVideoMessage(VideoMessage message) { try { - _maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp); + _currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp); var head = message.Data.Span[0]; @@ -191,21 +203,16 @@ public async Task Seek([FromOptionalArgument] double milliSeconds) resetStatus.InfoObject = resetData; await MessageStream.SendMessageAsync(ChunkStream, resetStatus); - await SeekAndPlay(milliSeconds); - } - - private async Task SeekAndPlay(double milliSeconds) - { - await _playLock.WaitAsync(); - _recordFileData.Seek(0, SeekOrigin.Begin); - - await FlvDemuxer.SeekAsync(milliSeconds); - - _playLock.Release(); - if (_playing == 0) + _playCts?.Cancel(); + while (_playing == 1) { - await StartPlay(); + await Task.Yield(); } + + var cts = new CancellationTokenSource(); + _playCts?.Dispose(); + _playCts = cts; + await SeekAndPlay(milliSeconds, cts.Token); } [RpcMethod("play")] @@ -236,15 +243,23 @@ public async Task Play( {"description", "Started playing." }, {"details", streamName } }; + var startStatus = RtmpSession.CreateCommandMessage(); startStatus.InfoObject = startData; await MessageStream.SendMessageAsync(ChunkStream, startStatus); - + var bandwidthLimit = new WindowAcknowledgementSizeMessage() + { + WindowSize = 500 * 1024 + }; + await RtmpSession.ControlMessageStream.SendMessageAsync(RtmpSession.ControlChunkStream, bandwidthLimit); VideoChunkStream = RtmpSession.CreateChunkStream(); AudioChunkStream = RtmpSession.CreateChunkStream(); + var cts = new CancellationTokenSource(); + _playCts?.Dispose(); + _playCts = cts; start = Math.Max(start, 0); - await SeekAndPlay(start / 1000); + await SeekAndPlay(start / 1000, cts.Token); } [RpcMethod("pause")] @@ -252,36 +267,64 @@ public async Task Pause([FromOptionalArgument] bool isPause, [FromOptionalArgume { if (isPause) { - await _playLock.WaitAsync(); - _recordFile.Seek(0, SeekOrigin.End); - _playLock.Release(); + _playCts?.Cancel(); + while (_playing == 1) + { + await Task.Yield(); + } } else { - await SeekAndPlay(milliseconds); + var cts = new CancellationTokenSource(); + _playCts?.Dispose(); + _playCts = cts; + await SeekAndPlay(milliseconds, cts.Token); } } - private async Task StartPlay() + private async Task StartPlayNoLock(CancellationToken ct) { - Interlocked.Exchange(ref _playing, 1); - while (_recordFile.Position < _recordFile.Length) + while (_recordFile.Position < _recordFile.Length && !ct.IsCancellationRequested) { - await PlayRecordFile(); + while (_bufferMs != -1 && _currentTimestamp >= _bufferMs) + { + await Task.Yield(); + } + + await PlayRecordFileNoLock(ct); } - Interlocked.Exchange(ref _playing, 0); } - private Task ReadMessage() + private Task ReadMessage(CancellationToken ct) { - return FlvDemuxer.DemultiplexFlvAsync(); + return FlvDemuxer.DemultiplexFlvAsync(ct); } - private async Task PlayRecordFile() + private async Task SeekAndPlay(double milliSeconds, CancellationToken ct) { await _playLock.WaitAsync(); - var message = await ReadMessage(); - _playLock.Release(); + Interlocked.Exchange(ref _playing, 1); + try + { + + _recordFile.Seek(9, SeekOrigin.Begin); + FlvDemuxer.SeekNoLock(milliSeconds, _metaData == null ? null : _metaData.Data[2] as Dictionary, ct); + await StartPlayNoLock(ct); + } + catch (Exception e) + { + Console.WriteLine(e); + } + finally + { + Interlocked.Exchange(ref _playing, 0); + _playLock.Release(); + } + } + + private async Task PlayRecordFileNoLock(CancellationToken ct) + { + var message = await ReadMessage(ct); if (message is AudioMessage) { await MessageStream.SendMessageAsync(AudioChunkStream, message); @@ -290,11 +333,13 @@ private async Task PlayRecordFile() { await MessageStream.SendMessageAsync(VideoChunkStream, message); } - else if (message is DataMessage) + else if (message is DataMessage data) { - await MessageStream.SendMessageAsync(ChunkStream, message); + data.Data.Insert(0, "@setDataFrame"); + _metaData = data; + await MessageStream.SendMessageAsync(ChunkStream, data); } - + _currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp); } private async Task SaveMessage(Message message) diff --git a/Harmonic/Harmonic.csproj b/Harmonic/Harmonic.csproj index 3f4a8a8..e3c210d 100644 --- a/Harmonic/Harmonic.csproj +++ b/Harmonic/Harmonic.csproj @@ -6,6 +6,15 @@ false + + true + TRACE + + + + false + + diff --git a/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs b/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs index 982d58c..a0fc42c 100644 --- a/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs +++ b/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs @@ -585,6 +585,7 @@ public bool TryGetStrictArray(Span buffer, out List array, out int int consumed = Amf0CommonValues.MARKER_LENGTH + sizeof(uint); var arrayBodyBuffer = buffer.Slice(consumed); var elementBodyBuffer = arrayBodyBuffer; + System.Diagnostics.Debug.WriteLine(elementCount); for (uint i = 0; i < elementCount; i++) { if (!TryGetValue(elementBodyBuffer, out _, out var element, out var bufferConsumed)) diff --git a/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Writer.cs b/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Writer.cs index 35ef855..9cf4326 100644 --- a/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Writer.cs +++ b/Harmonic/Networking/Amf/Serialization/Amf0/Amf0Writer.cs @@ -103,11 +103,12 @@ private void WriteStringBytesImpl(string str, SerializationContext context, out var buffer = bufferBackend.AsSpan(0, bytesNeed); if (isLongString) { - Contract.Assert(NetworkBitConverter.TryGetBytes((uint)bodyLength, buffer)); + NetworkBitConverter.TryGetBytes((uint)bodyLength, buffer); } else { - Contract.Assert(NetworkBitConverter.TryGetBytes((ushort)bodyLength, buffer)); + var contractRet = NetworkBitConverter.TryGetBytes((ushort)bodyLength, buffer); + Contract.Assert(contractRet); } Encoding.UTF8.GetBytes(str, buffer.Slice(headerLength)); @@ -145,7 +146,8 @@ public void WriteBytes(double val, SerializationContext context) { var buffer = bufferBackend.AsSpan(0, bytesNeed); buffer[0] = (byte)Amf0Type.Number; - Contract.Assert(NetworkBitConverter.TryGetBytes(val, buffer.Slice(Amf0CommonValues.MARKER_LENGTH))); + var contractRet = NetworkBitConverter.TryGetBytes(val, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer); } finally @@ -185,7 +187,8 @@ private void WriteReferenceIndexBytes(ushort index, SerializationContext context { var buffer = backend.AsSpan(0, bytesNeed); buffer[0] = (byte)Amf0Type.Reference; - Contract.Assert(NetworkBitConverter.TryGetBytes(index, buffer.Slice(Amf0CommonValues.MARKER_LENGTH))); + var contractRet = NetworkBitConverter.TryGetBytes(index, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer); } finally @@ -213,7 +216,8 @@ public void WriteBytes(DateTime dateTime, SerializationContext context) buffer[0] = (byte)Amf0Type.Date; var dof = new DateTimeOffset(dateTime); var timestamp = (double)dof.ToUnixTimeMilliseconds(); - Contract.Assert(NetworkBitConverter.TryGetBytes(timestamp, buffer.Slice(Amf0CommonValues.MARKER_LENGTH))); + var contractRet = NetworkBitConverter.TryGetBytes(timestamp, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer); } finally @@ -279,7 +283,8 @@ public void WriteBytes(List value, SerializationContext context) var countBuffer = _arrayPool.Rent(sizeof(uint)); try { - Contract.Assert(NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer)); + var contractRet = NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(countBuffer.AsSpan(0, sizeof(uint))); } finally @@ -314,7 +319,8 @@ public void WriteBytes(Dictionary value, SerializationContext co var countBuffer = _arrayPool.Rent(sizeof(uint)); try { - Contract.Assert(NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer)); + var contractRet = NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(countBuffer.AsSpan(0, sizeof(uint))); } finally diff --git a/Harmonic/Networking/Amf/Serialization/Amf3/Amf3Writer.cs b/Harmonic/Networking/Amf/Serialization/Amf3/Amf3Writer.cs index bb33230..45bad7d 100644 --- a/Harmonic/Networking/Amf/Serialization/Amf3/Amf3Writer.cs +++ b/Harmonic/Networking/Amf/Serialization/Amf3/Amf3Writer.cs @@ -68,7 +68,8 @@ public Amf3Writer() private void WrapVector(object value, SerializationContext context) { var valueType = value.GetType(); - Contract.Assert(valueType.IsGenericType); + var contractRet = valueType.IsGenericType; + Contract.Assert(contractRet); var defination = valueType.GetGenericTypeDefinition(); Contract.Assert(defination == typeof(Vector<>)); var vectorT = valueType.GetGenericArguments().First(); @@ -79,7 +80,8 @@ private void WrapVector(object value, SerializationContext context) private void WrapDictionary(object value, SerializationContext context) { var valueType = value.GetType(); - Contract.Assert(valueType.IsGenericType); + var contractRet = valueType.IsGenericType; + Contract.Assert(contractRet); var defination = valueType.GetGenericTypeDefinition(); Contract.Assert(defination == typeof(Amf3Dictionary<,>)); var tKey = valueType.GetGenericArguments().First(); @@ -205,7 +207,8 @@ public void WriteBytes(double value, SerializationContext context) var backend = _arrayPool.Rent(sizeof(double)); try { - Contract.Assert(NetworkBitConverter.TryGetBytes(value, backend)); + var contractRet = NetworkBitConverter.TryGetBytes(value, backend); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(backend.AsSpan(0, sizeof(double))); } finally @@ -290,7 +293,8 @@ public void WriteBytes(DateTime dateTime, SerializationContext context) var backend = _arrayPool.Rent(sizeof(double)); try { - Contract.Assert(NetworkBitConverter.TryGetBytes(timestamp, backend)); + var contractRet = NetworkBitConverter.TryGetBytes(timestamp, backend); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(backend.AsSpan(0, sizeof(double))); } finally @@ -511,7 +515,8 @@ public void WriteBytes(Vector value, SerializationContext context) { foreach (var i in value) { - Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer)); + var contractRet = NetworkBitConverter.TryGetBytes(i, buffer); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(uint))); } } @@ -545,7 +550,8 @@ public void WriteBytes(Vector value, SerializationContext context) foreach (var i in value) { - Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer)); + var contractRet = NetworkBitConverter.TryGetBytes(i, buffer); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(int))); } } @@ -579,7 +585,8 @@ public void WriteBytes(Vector value, SerializationContext context) { foreach (var i in value) { - Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer)); + var contractRet = NetworkBitConverter.TryGetBytes(i, buffer); + Contract.Assert(contractRet); context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(double))); } } diff --git a/Harmonic/Networking/Flv/FlvDemuxer.cs b/Harmonic/Networking/Flv/FlvDemuxer.cs index 7f0bf6d..9e9af4d 100644 --- a/Harmonic/Networking/Flv/FlvDemuxer.cs +++ b/Harmonic/Networking/Flv/FlvDemuxer.cs @@ -1,5 +1,6 @@ using Harmonic.Buffers; using Harmonic.Networking; +using Harmonic.Networking.Amf.Common; using Harmonic.Networking.Amf.Serialization.Amf0; using Harmonic.Networking.Amf.Serialization.Amf3; using Harmonic.Networking.Flv.Data; @@ -10,7 +11,9 @@ using System.Buffers; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using static Harmonic.Hosting.RtmpServerOptions; @@ -42,18 +45,26 @@ public async Task AttachStream(Stream stream, bool disposeOld = false) return headerBuffer; } - public async Task SeekAsync(double milliseconds) + public void SeekNoLock(double milliseconds, Dictionary metaData, CancellationToken ct = default) { - uint current = 0; - while (current <= milliseconds) + if (metaData == null) { - var header = await ReadHeader(); - current = header.Timestamp; - _stream.Seek(header.MessageLength + sizeof(int), SeekOrigin.Current); + return; } + var seconds = milliseconds / 1000; + var keyframes = metaData["keyframes"] as AmfObject; + var times = keyframes.Fields["times"] as List; + var idx = times.FindIndex(t => ((double)t) >= seconds); + if (idx == -1) + { + return; + } + var filePositions = keyframes.Fields["filepositions"] as List; + var pos = (double)filePositions[idx]; + _stream.Seek((int)(pos - 4), SeekOrigin.Begin); } - private async Task ReadHeader() + private async Task ReadHeader(CancellationToken ct = default) { byte[] headerBuffer = null; byte[] timestampBuffer = null; @@ -61,7 +72,7 @@ private async Task ReadHeader() { headerBuffer = _arrayPool.Rent(15); timestampBuffer = _arrayPool.Rent(4); - await _stream.ReadBytesAsync(headerBuffer.AsMemory(0, 15)); + await _stream.ReadBytesAsync(headerBuffer.AsMemory(0, 15), ct); var type = (MessageType)headerBuffer[4]; var length = NetworkBitConverter.ToUInt24(headerBuffer.AsSpan(5, 3)); @@ -124,13 +135,13 @@ public FlvVideoData DemultiplexVideoData(VideoMessage message) return ret; } - public async Task DemultiplexFlvAsync() + public async Task DemultiplexFlvAsync(CancellationToken ct = default) { byte[] bodyBuffer = null; try { - var header = await ReadHeader(); + var header = await ReadHeader(ct); bodyBuffer = _arrayPool.Rent((int)header.MessageLength); if (!_factories.TryGetValue(header.MessageType, out var factory)) @@ -138,7 +149,7 @@ public async Task DemultiplexFlvAsync() throw new InvalidOperationException(); } - await _stream.ReadBytesAsync(bodyBuffer.AsMemory(0, (int)header.MessageLength)); + await _stream.ReadBytesAsync(bodyBuffer.AsMemory(0, (int)header.MessageLength), ct); var context = new Networking.Rtmp.Serialization.SerializationContext() { diff --git a/Harmonic/Networking/Rtmp/ChunkStreamContext.cs b/Harmonic/Networking/Rtmp/ChunkStreamContext.cs index 732a26e..69bef2e 100644 --- a/Harmonic/Networking/Rtmp/ChunkStreamContext.cs +++ b/Harmonic/Networking/Rtmp/ChunkStreamContext.cs @@ -29,11 +29,11 @@ class ChunkStreamContext : IDisposable internal Dictionary _incompleteMessageState = new Dictionary(); internal uint? ReadWindowAcknowledgementSize { get; set; } = null; internal uint? WriteWindowAcknowledgementSize { get; set; } = null; - internal uint ReadWindowSize { get; set; } = 0; - internal uint WriteWindowSize { get; set; } = 0; internal int ReadChunkSize { get; set; } = 128; - internal bool BandwidthLimited { get; set; } = false; - internal int _writeChunkSize = 128; + internal long ReadUnAcknowledgedSize = 0; + internal long WriteUnAcknowledgedSize = 0; + + internal uint _writeChunkSize = 128; internal readonly int EXTENDED_TIMESTAMP_LENGTH = 4; internal readonly int TYPE0_SIZE = 11; internal readonly int TYPE1_SIZE = 7; @@ -46,7 +46,10 @@ class ChunkStreamContext : IDisposable internal Amf3Reader _amf3Reader = new Amf3Reader(); internal Amf3Writer _amf3Writer = new Amf3Writer(); + private IOPipeLine _ioPipeline = null; + private SemaphoreSlim _sync = new SemaphoreSlim(1); + internal LimitType? PreviousLimitType { get; set; } = null; public ChunkStreamContext(IOPipeLine stream) { @@ -64,7 +67,7 @@ public void Dispose() ((IDisposable)_rtmpSession).Dispose(); } - internal Task MultiplexMessageAsync(uint chunkStreamId, Message message) + internal async Task MultiplexMessageAsync(uint chunkStreamId, Message message) { if (!message.MessageHeader.MessageStreamId.HasValue) { @@ -112,22 +115,76 @@ internal Task MultiplexMessageAsync(uint chunkStreamId, Message message) _previousWriteMessageHeader[chunkStreamId] = (MessageHeader)message.MessageHeader.Clone(); var headerLength = basicHeaderLength + messageHeaderLength; var bodySize = (int)(length - i >= _writeChunkSize ? _writeChunkSize : length - i); + var chunkBuffer = _arrayPool.Rent(headerLength + bodySize); - basicHeader.AsSpan(0, basicHeaderLength).CopyTo(chunkBuffer); - messageHeader.AsSpan(0, messageHeaderLength).CopyTo(chunkBuffer.AsSpan(basicHeaderLength)); - _arrayPool.Return(basicHeader); - _arrayPool.Return(messageHeader); - buffer.AsSpan(i, bodySize).CopyTo(chunkBuffer.AsSpan(headerLength)); - i += bodySize; - var isLastChunk = message.MessageHeader.MessageLength - i == 0; - var tsk = _ioPipeline.SendRawData(chunkBuffer.AsSpan(0, headerLength + bodySize)); - if (isLastChunk) + await _sync.WaitAsync(); + try { - ret = tsk; + basicHeader.AsSpan(0, basicHeaderLength).CopyTo(chunkBuffer); + messageHeader.AsSpan(0, messageHeaderLength).CopyTo(chunkBuffer.AsSpan(basicHeaderLength)); + _arrayPool.Return(basicHeader); + _arrayPool.Return(messageHeader); + buffer.AsSpan(i, bodySize).CopyTo(chunkBuffer.AsSpan(headerLength)); + i += bodySize; + var isLastChunk = message.MessageHeader.MessageLength - i == 0; + + long offset = 0; + long totalLength = headerLength + bodySize; + long currentSendSize = totalLength; + + while (offset != (headerLength + bodySize)) + { + if (WriteWindowAcknowledgementSize.HasValue && Interlocked.Read(ref WriteUnAcknowledgedSize) + headerLength + bodySize > WriteWindowAcknowledgementSize.Value) + { + currentSendSize = Math.Min(WriteWindowAcknowledgementSize.Value, currentSendSize); + //var delayCount = 0; + while (currentSendSize + Interlocked.Read(ref WriteUnAcknowledgedSize) >= WriteWindowAcknowledgementSize.Value) + { + await Task.Delay(1); + } + } + var tsk = _ioPipeline.SendRawData(chunkBuffer.AsMemory((int)offset, (int)currentSendSize)); + offset += currentSendSize; + totalLength -= currentSendSize; + + if (WriteWindowAcknowledgementSize.HasValue) + { + Interlocked.Add(ref WriteUnAcknowledgedSize, currentSendSize); + } + + if (isLastChunk) + { + ret = tsk; + } + } + if (isLastChunk) + { + if (message.MessageHeader.MessageType == MessageType.SetChunkSize) + { + var setChunkSize = message as SetChunkSizeMessage; + _writeChunkSize = setChunkSize.ChunkSize; + } + else if (message.MessageHeader.MessageType == MessageType.SetPeerBandwidth) + { + var m = message as SetPeerBandwidthMessage; + ReadWindowAcknowledgementSize = m.WindowSize; + } + else if (message.MessageHeader.MessageType == MessageType.WindowAcknowledgementSize) + { + var m = message as WindowAcknowledgementSizeMessage; + WriteWindowAcknowledgementSize = m.WindowSize; + } + } + } + finally + { + _sync.Release(); + _arrayPool.Return(chunkBuffer); } } Debug.Assert(ret != null); - return ret; + await ret; + } finally { diff --git a/Harmonic/Networking/Rtmp/HandshakeContext.cs b/Harmonic/Networking/Rtmp/HandshakeContext.cs index 1d40ef6..d1d2815 100644 --- a/Harmonic/Networking/Rtmp/HandshakeContext.cs +++ b/Harmonic/Networking/Rtmp/HandshakeContext.cs @@ -67,13 +67,13 @@ private bool ProcessHandshakeC0C1(ReadOnlySequence buffer, ref int consume arr[0] = 3; NetworkBitConverter.TryGetBytes(_writerTimestampEpoch, arr.AsSpan(1, 4)); _s1Data.AsSpan(0, 1528).CopyTo(arr.AsSpan(9)); - _ioPipeline.SendRawData(arr.AsSpan(0, 1537)); + _ = _ioPipeline.SendRawData(arr.AsMemory(0, 1537)); // s2 NetworkBitConverter.TryGetBytes(_readerTimestampEpoch, arr.AsSpan(0, 4)); NetworkBitConverter.TryGetBytes((uint)0, arr.AsSpan(4, 4)); - _ioPipeline.SendRawData(arr.AsSpan(0, 1536)); + _ = _ioPipeline.SendRawData(arr.AsMemory(0, 1536)); buffer.Slice(consumed, 1528).CopyTo(arr.AsSpan(8)); consumed += 1528; diff --git a/Harmonic/Networking/Rtmp/IOPipeLine.cs b/Harmonic/Networking/Rtmp/IOPipeLine.cs index 8bf787e..d55697d 100644 --- a/Harmonic/Networking/Rtmp/IOPipeLine.cs +++ b/Harmonic/Networking/Rtmp/IOPipeLine.cs @@ -138,11 +138,13 @@ internal void OnHandshakeSuccessful() } #region Sender - internal Task SendRawData(ReadOnlySpan data) + + internal async Task SendRawData(ReadOnlyMemory data) { var tcs = new TaskCompletionSource(); var buffer = _arrayPool.Rent(data.Length); data.CopyTo(buffer); + _writerQueue.Enqueue(new WriteState() { Buffer = buffer, @@ -150,7 +152,7 @@ internal Task SendRawData(ReadOnlySpan data) Length = data.Length }); _writerSignal.Release(); - return tcs.Task; + await tcs.Task; } private async Task Writer(CancellationToken ct) @@ -186,18 +188,6 @@ private async Task Producer(Socket s, PipeWriter writer, CancellationToken ct = { break; } - if (ChunkStreamContext != null) - { - ChunkStreamContext.ReadWindowSize += (uint)bytesRead; - if (ChunkStreamContext.ReadWindowAcknowledgementSize.HasValue) - { - if (ChunkStreamContext.ReadWindowSize >= ChunkStreamContext.ReadWindowAcknowledgementSize) - { - ChunkStreamContext._rtmpSession.Acknowledgement(ChunkStreamContext.ReadWindowAcknowledgementSize.Value); - ChunkStreamContext.ReadWindowSize -= ChunkStreamContext.ReadWindowAcknowledgementSize.Value; - } - } - } writer.Advance(bytesRead); var result = await writer.FlushAsync(ct); if (result.IsCompleted || result.IsCanceled) @@ -227,7 +217,18 @@ private async Task Consumer(PipeReader reader, CancellationToken ct = default) buffer = buffer.Slice(consumed); reader.AdvanceTo(buffer.Start, buffer.End); - + if (ChunkStreamContext != null) + { + ChunkStreamContext.ReadUnAcknowledgedSize += consumed; + if (ChunkStreamContext.ReadWindowAcknowledgementSize.HasValue) + { + if (ChunkStreamContext.ReadUnAcknowledgedSize >= ChunkStreamContext.ReadWindowAcknowledgementSize) + { + ChunkStreamContext._rtmpSession.Acknowledgement((uint)ChunkStreamContext.ReadUnAcknowledgedSize); + ChunkStreamContext.ReadUnAcknowledgedSize -= 0; + } + } + } if (result.IsCompleted || result.IsCanceled) { break; diff --git a/Harmonic/Networking/Rtmp/RtmpSession.cs b/Harmonic/Networking/Rtmp/RtmpSession.cs index 65f2515..df92bda 100644 --- a/Harmonic/Networking/Rtmp/RtmpSession.cs +++ b/Harmonic/Networking/Rtmp/RtmpSession.cs @@ -13,6 +13,7 @@ using System.Reflection; using System.Text; using System.Threading.Tasks; +using System.Threading; namespace Harmonic.Networking.Rtmp { @@ -39,9 +40,15 @@ internal RtmpSession(IOPipeLine ioPipeline) ControlMessageStream.RegisterMessageHandler(HandleSetChunkSize); ControlMessageStream.RegisterMessageHandler(HandleWindowAcknowledgementSize); ControlMessageStream.RegisterMessageHandler(HandleSetPeerBandwidth); + ControlMessageStream.RegisterMessageHandler(HandleAcknowledgement); _rpcService = ioPipeline.Options.ServerLifetime.Resolve(); } + private void HandleAcknowledgement(AcknowledgementMessage ack) + { + Interlocked.Add(ref IOPipeline.ChunkStreamContext.WriteUnAcknowledgedSize, ack.BytesReceived * -1); + } + internal void AssertStreamId(uint msid) { Debug.Assert(_messageStreams.ContainsKey(msid)); @@ -251,23 +258,25 @@ internal void Acknowledgement(uint bytesReceived) private void HandleSetPeerBandwidth(SetPeerBandwidthMessage message) { - IOPipeline.ChunkStreamContext.ReadWindowAcknowledgementSize = message.WindowSize; - SendControlMessageAsync(new AcknowledgementMessage() + if (IOPipeline.ChunkStreamContext.WriteWindowAcknowledgementSize.HasValue && message.LimitType == LimitType.Soft && message.WindowSize > IOPipeline.ChunkStreamContext.WriteWindowAcknowledgementSize) + { + return; + } + if (IOPipeline.ChunkStreamContext.PreviousLimitType.HasValue && message.LimitType == LimitType.Dynamic && IOPipeline.ChunkStreamContext.PreviousLimitType != LimitType.Hard) + { + return; + } + IOPipeline.ChunkStreamContext.PreviousLimitType = message.LimitType; + IOPipeline.ChunkStreamContext.WriteWindowAcknowledgementSize = message.WindowSize; + SendControlMessageAsync(new WindowAcknowledgementSizeMessage() { - BytesReceived = IOPipeline.ChunkStreamContext.ReadWindowSize + WindowSize = message.WindowSize }); - IOPipeline.ChunkStreamContext.ReadWindowSize = 0; - IOPipeline.ChunkStreamContext.BandwidthLimited = true; } private void HandleWindowAcknowledgementSize(WindowAcknowledgementSizeMessage message) { IOPipeline.ChunkStreamContext.ReadWindowAcknowledgementSize = message.WindowSize; - SendControlMessageAsync(new AcknowledgementMessage() - { - BytesReceived = IOPipeline.ChunkStreamContext.ReadWindowSize - }); - IOPipeline.ChunkStreamContext.ReadWindowSize = 0; } private void HandleSetChunkSize(SetChunkSizeMessage setChunkSize) diff --git a/UnitTest/UnitTest.csproj b/UnitTest/UnitTest.csproj index bf85c35..fa476c4 100644 --- a/UnitTest/UnitTest.csproj +++ b/UnitTest/UnitTest.csproj @@ -1,4 +1,4 @@ - + netcoreapp2.2 @@ -6,6 +6,10 @@ false + + true + +