Skip to content

Commit

Permalink
bandwidth limit & critical bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
a1q123456 committed Aug 4, 2019
1 parent d42a5da commit a209e7d
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 106 deletions.
3 changes: 2 additions & 1 deletion Harmonic/Controllers/Record/RecordController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Harmonic.Rpc;
using Harmonic.Networking.Rtmp.Messages;
using Harmonic.Rpc;
using System;
using System.Collections.Generic;
using System.Text;
Expand Down
117 changes: 81 additions & 36 deletions Harmonic/Controllers/Record/RecordStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ public class RecordStream : NetStream
private FileStream _recordFileData = null;
private RecordService _recordService = null;
private DataMessage _metaData = null;
private uint _maxTimestamp = 0;
private uint _currentTimestamp = 0;
private SemaphoreSlim _playLock = new SemaphoreSlim(1);
private int _playing = 0;
private AmfObject _keyframes = null;
private List<object> _keyframeTimes;
private List<object> _keyframeFilePositions;
private long _bufferMs = -1;

private RtmpChunkStream VideoChunkStream { get; set; } = null;
private RtmpChunkStream AudioChunkStream { get; set; } = null;
private bool _disposed = false;
private CancellationTokenSource _playCts;

protected override async void Dispose(bool disposing)
{
base.Dispose(disposing);
Expand All @@ -56,7 +59,7 @@ protected override async void Dispose(bool disposing)
recordFile.Seek(0, SeekOrigin.Begin);
await recordFile.WriteAsync(FlvMuxer.MultiplexFlvHeader(true, true));
var metaData = _metaData.Data[1] as Dictionary<string, object>;
metaData["duration"] = ((double)_maxTimestamp) / 1000;
metaData["duration"] = ((double)_currentTimestamp) / 1000;
metaData["keyframes"] = _keyframes;
_metaData.MessageHeader.MessageLength = 0;
var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length;
Expand Down Expand Up @@ -109,6 +112,7 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona
MessageStream.RegisterMessageHandler<DataMessage>(HandleData);
MessageStream.RegisterMessageHandler<AudioMessage>(HandleAudioMessage);
MessageStream.RegisterMessageHandler<VideoMessage>(HandleVideoMessage);
MessageStream.RegisterMessageHandler<UserControlMessage>(HandleUserControlMessage);
onStatus.InfoObject = new AmfObject
{
{"level", "status" },
Expand All @@ -127,11 +131,19 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona
_keyframes.Add("filepositions", _keyframeFilePositions);
}

private void HandleUserControlMessage(UserControlMessage msg)
{
if (msg.UserControlEventType == UserControlEventType.SetBufferLength)
{
_bufferMs = (msg as SetBufferLengthMessage).BufferMilliseconds;
}
}

private async void HandleAudioMessage(AudioMessage message)
{
try
{
_maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp);
_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);

await SaveMessage(message);
}
Expand All @@ -145,7 +157,7 @@ private async void HandleVideoMessage(VideoMessage message)
{
try
{
_maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp);
_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);

var head = message.Data.Span[0];

Expand Down Expand Up @@ -191,21 +203,16 @@ public async Task Seek([FromOptionalArgument] double milliSeconds)
resetStatus.InfoObject = resetData;
await MessageStream.SendMessageAsync(ChunkStream, resetStatus);

await SeekAndPlay(milliSeconds);
}

private async Task SeekAndPlay(double milliSeconds)
{
await _playLock.WaitAsync();
_recordFileData.Seek(0, SeekOrigin.Begin);

await FlvDemuxer.SeekAsync(milliSeconds);

_playLock.Release();
if (_playing == 0)
_playCts?.Cancel();
while (_playing == 1)
{
await StartPlay();
await Task.Yield();
}

var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
await SeekAndPlay(milliSeconds, cts.Token);
}

[RpcMethod("play")]
Expand Down Expand Up @@ -236,52 +243,88 @@ public async Task Play(
{"description", "Started playing." },
{"details", streamName }
};

var startStatus = RtmpSession.CreateCommandMessage<OnStatusCommandMessage>();
startStatus.InfoObject = startData;
await MessageStream.SendMessageAsync(ChunkStream, startStatus);

var bandwidthLimit = new WindowAcknowledgementSizeMessage()
{
WindowSize = 500 * 1024
};
await RtmpSession.ControlMessageStream.SendMessageAsync(RtmpSession.ControlChunkStream, bandwidthLimit);
VideoChunkStream = RtmpSession.CreateChunkStream();
AudioChunkStream = RtmpSession.CreateChunkStream();

var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
start = Math.Max(start, 0);
await SeekAndPlay(start / 1000);
await SeekAndPlay(start / 1000, cts.Token);
}

[RpcMethod("pause")]
public async Task Pause([FromOptionalArgument] bool isPause, [FromOptionalArgument] double milliseconds)
{
if (isPause)
{
await _playLock.WaitAsync();
_recordFile.Seek(0, SeekOrigin.End);
_playLock.Release();
_playCts?.Cancel();
while (_playing == 1)
{
await Task.Yield();
}
}
else
{
await SeekAndPlay(milliseconds);
var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
await SeekAndPlay(milliseconds, cts.Token);
}
}

private async Task StartPlay()
private async Task StartPlayNoLock(CancellationToken ct)
{
Interlocked.Exchange(ref _playing, 1);
while (_recordFile.Position < _recordFile.Length)
while (_recordFile.Position < _recordFile.Length && !ct.IsCancellationRequested)
{
await PlayRecordFile();
while (_bufferMs != -1 && _currentTimestamp >= _bufferMs)
{
await Task.Yield();
}

await PlayRecordFileNoLock(ct);
}
Interlocked.Exchange(ref _playing, 0);
}

private Task<Message> ReadMessage()
private Task<Message> ReadMessage(CancellationToken ct)
{
return FlvDemuxer.DemultiplexFlvAsync();
return FlvDemuxer.DemultiplexFlvAsync(ct);
}

private async Task PlayRecordFile()
private async Task SeekAndPlay(double milliSeconds, CancellationToken ct)
{
await _playLock.WaitAsync();
var message = await ReadMessage();
_playLock.Release();
Interlocked.Exchange(ref _playing, 1);
try
{

_recordFile.Seek(9, SeekOrigin.Begin);
FlvDemuxer.SeekNoLock(milliSeconds, _metaData == null ? null : _metaData.Data[2] as Dictionary<string, object>, ct);
await StartPlayNoLock(ct);
}
catch (Exception e)
{
Console.WriteLine(e);
}
finally
{
Interlocked.Exchange(ref _playing, 0);
_playLock.Release();
}
}

private async Task PlayRecordFileNoLock(CancellationToken ct)
{
var message = await ReadMessage(ct);
if (message is AudioMessage)
{
await MessageStream.SendMessageAsync(AudioChunkStream, message);
Expand All @@ -290,11 +333,13 @@ private async Task PlayRecordFile()
{
await MessageStream.SendMessageAsync(VideoChunkStream, message);
}
else if (message is DataMessage)
else if (message is DataMessage data)
{
await MessageStream.SendMessageAsync(ChunkStream, message);
data.Data.Insert(0, "@setDataFrame");
_metaData = data;
await MessageStream.SendMessageAsync(ChunkStream, data);
}

_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);
}

private async Task SaveMessage(Message message)
Expand Down
9 changes: 9 additions & 0 deletions Harmonic/Harmonic.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<Optimize>true</Optimize>
<DefineConstants>TRACE</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<Optimize>false</Optimize>
</PropertyGroup>

<ItemGroup>
<Compile Remove="Networking\IStreamSession.cs" />
<Compile Remove="Networking\Rtmp\Supervisor.cs" />
Expand Down
1 change: 1 addition & 0 deletions Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ public bool TryGetStrictArray(Span<byte> buffer, out List<object> array, out int
int consumed = Amf0CommonValues.MARKER_LENGTH + sizeof(uint);
var arrayBodyBuffer = buffer.Slice(consumed);
var elementBodyBuffer = arrayBodyBuffer;
System.Diagnostics.Debug.WriteLine(elementCount);
for (uint i = 0; i < elementCount; i++)
{
if (!TryGetValue(elementBodyBuffer, out _, out var element, out var bufferConsumed))
Expand Down
20 changes: 13 additions & 7 deletions Harmonic/Networking/Amf/Serialization/Amf0/Amf0Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ private void WriteStringBytesImpl(string str, SerializationContext context, out
var buffer = bufferBackend.AsSpan(0, bytesNeed);
if (isLongString)
{
Contract.Assert(NetworkBitConverter.TryGetBytes((uint)bodyLength, buffer));
NetworkBitConverter.TryGetBytes((uint)bodyLength, buffer);
}
else
{
Contract.Assert(NetworkBitConverter.TryGetBytes((ushort)bodyLength, buffer));
var contractRet = NetworkBitConverter.TryGetBytes((ushort)bodyLength, buffer);
Contract.Assert(contractRet);
}

Encoding.UTF8.GetBytes(str, buffer.Slice(headerLength));
Expand Down Expand Up @@ -145,7 +146,8 @@ public void WriteBytes(double val, SerializationContext context)
{
var buffer = bufferBackend.AsSpan(0, bytesNeed);
buffer[0] = (byte)Amf0Type.Number;
Contract.Assert(NetworkBitConverter.TryGetBytes(val, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)));
var contractRet = NetworkBitConverter.TryGetBytes(val, buffer.Slice(Amf0CommonValues.MARKER_LENGTH));
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer);
}
finally
Expand Down Expand Up @@ -185,7 +187,8 @@ private void WriteReferenceIndexBytes(ushort index, SerializationContext context
{
var buffer = backend.AsSpan(0, bytesNeed);
buffer[0] = (byte)Amf0Type.Reference;
Contract.Assert(NetworkBitConverter.TryGetBytes(index, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)));
var contractRet = NetworkBitConverter.TryGetBytes(index, buffer.Slice(Amf0CommonValues.MARKER_LENGTH));
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer);
}
finally
Expand Down Expand Up @@ -213,7 +216,8 @@ public void WriteBytes(DateTime dateTime, SerializationContext context)
buffer[0] = (byte)Amf0Type.Date;
var dof = new DateTimeOffset(dateTime);
var timestamp = (double)dof.ToUnixTimeMilliseconds();
Contract.Assert(NetworkBitConverter.TryGetBytes(timestamp, buffer.Slice(Amf0CommonValues.MARKER_LENGTH)));
var contractRet = NetworkBitConverter.TryGetBytes(timestamp, buffer.Slice(Amf0CommonValues.MARKER_LENGTH));
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer);
}
finally
Expand Down Expand Up @@ -279,7 +283,8 @@ public void WriteBytes(List<object> value, SerializationContext context)
var countBuffer = _arrayPool.Rent(sizeof(uint));
try
{
Contract.Assert(NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer));
var contractRet = NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(countBuffer.AsSpan(0, sizeof(uint)));
}
finally
Expand Down Expand Up @@ -314,7 +319,8 @@ public void WriteBytes(Dictionary<string, object> value, SerializationContext co
var countBuffer = _arrayPool.Rent(sizeof(uint));
try
{
Contract.Assert(NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer));
var contractRet = NetworkBitConverter.TryGetBytes((uint)value.Count, countBuffer);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(countBuffer.AsSpan(0, sizeof(uint)));
}
finally
Expand Down
21 changes: 14 additions & 7 deletions Harmonic/Networking/Amf/Serialization/Amf3/Amf3Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public Amf3Writer()
private void WrapVector(object value, SerializationContext context)
{
var valueType = value.GetType();
Contract.Assert(valueType.IsGenericType);
var contractRet = valueType.IsGenericType;
Contract.Assert(contractRet);
var defination = valueType.GetGenericTypeDefinition();
Contract.Assert(defination == typeof(Vector<>));
var vectorT = valueType.GetGenericArguments().First();
Expand All @@ -79,7 +80,8 @@ private void WrapVector(object value, SerializationContext context)
private void WrapDictionary(object value, SerializationContext context)
{
var valueType = value.GetType();
Contract.Assert(valueType.IsGenericType);
var contractRet = valueType.IsGenericType;
Contract.Assert(contractRet);
var defination = valueType.GetGenericTypeDefinition();
Contract.Assert(defination == typeof(Amf3Dictionary<,>));
var tKey = valueType.GetGenericArguments().First();
Expand Down Expand Up @@ -205,7 +207,8 @@ public void WriteBytes(double value, SerializationContext context)
var backend = _arrayPool.Rent(sizeof(double));
try
{
Contract.Assert(NetworkBitConverter.TryGetBytes(value, backend));
var contractRet = NetworkBitConverter.TryGetBytes(value, backend);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(backend.AsSpan(0, sizeof(double)));
}
finally
Expand Down Expand Up @@ -290,7 +293,8 @@ public void WriteBytes(DateTime dateTime, SerializationContext context)
var backend = _arrayPool.Rent(sizeof(double));
try
{
Contract.Assert(NetworkBitConverter.TryGetBytes(timestamp, backend));
var contractRet = NetworkBitConverter.TryGetBytes(timestamp, backend);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(backend.AsSpan(0, sizeof(double)));
}
finally
Expand Down Expand Up @@ -511,7 +515,8 @@ public void WriteBytes(Vector<uint> value, SerializationContext context)
{
foreach (var i in value)
{
Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer));
var contractRet = NetworkBitConverter.TryGetBytes(i, buffer);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(uint)));
}
}
Expand Down Expand Up @@ -545,7 +550,8 @@ public void WriteBytes(Vector<int> value, SerializationContext context)

foreach (var i in value)
{
Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer));
var contractRet = NetworkBitConverter.TryGetBytes(i, buffer);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(int)));
}
}
Expand Down Expand Up @@ -579,7 +585,8 @@ public void WriteBytes(Vector<double> value, SerializationContext context)
{
foreach (var i in value)
{
Contract.Assert(NetworkBitConverter.TryGetBytes(i, buffer));
var contractRet = NetworkBitConverter.TryGetBytes(i, buffer);
Contract.Assert(contractRet);
context.Buffer.WriteToBuffer(buffer.AsSpan(0, sizeof(double)));
}
}
Expand Down
Loading

0 comments on commit a209e7d

Please sign in to comment.