Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
a1q123456 committed Aug 4, 2019
2 parents 6b03f5f + a209e7d commit a64c5aa
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 162 deletions.
3 changes: 2 additions & 1 deletion Harmonic/Controllers/Record/RecordController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Harmonic.Rpc;
using Harmonic.Networking.Rtmp.Messages;
using Harmonic.Rpc;
using System;
using System.Collections.Generic;
using System.Text;
Expand Down
164 changes: 108 additions & 56 deletions Harmonic/Controllers/Record/RecordStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,57 @@ public class RecordStream : NetStream
private FileStream _recordFileData = null;
private RecordService _recordService = null;
private DataMessage _metaData = null;
private uint _maxTimestamp = 0;
private uint _currentTimestamp = 0;
private SemaphoreSlim _playLock = new SemaphoreSlim(1);
private int _playing = 0;
private AmfObject _keyframes = null;
private List<object> _keyframeTimes;
private List<object> _keyframeFilePositions;
private long _bufferMs = -1;

private RtmpChunkStream VideoChunkStream { get; set; } = null;
private RtmpChunkStream AudioChunkStream { get; set; } = null;
private bool _disposed = false;
protected override void Dispose(bool disposing)
private CancellationTokenSource _playCts;

protected override async void Dispose(bool disposing)
{
base.Dispose(disposing);
if (!_disposed)
{
_disposed = true;
if (_recordFileData != null)
{
var filePath = _recordFileData.Name;
using (var recordFile = new FileStream(filePath.Substring(0, filePath.Length - 5) + ".flv", FileMode.OpenOrCreate))
try
{
recordFile.SetLength(0);
recordFile.Seek(0, SeekOrigin.Begin);
recordFile.Write(FlvMuxer.MultiplexFlvHeader(true, true));
var metaData = _metaData.Data[1] as Dictionary<string, object>;
metaData["duration"] = ((double)_maxTimestamp) / 1000;
metaData["keyframes"] = _keyframes;
_metaData.MessageHeader.MessageLength = 0;
var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length;

var offset = recordFile.Position + dataTagLen;
for (int i = 0; i < _keyframeFilePositions.Count; i++)
var filePath = _recordFileData.Name;
using (var recordFile = new FileStream(filePath.Substring(0, filePath.Length - 5) + ".flv", FileMode.OpenOrCreate))
{
_keyframeFilePositions[i] = (double)_keyframeFilePositions[i] + offset;
recordFile.SetLength(0);
recordFile.Seek(0, SeekOrigin.Begin);
await recordFile.WriteAsync(FlvMuxer.MultiplexFlvHeader(true, true));
var metaData = _metaData.Data[1] as Dictionary<string, object>;
metaData["duration"] = ((double)_currentTimestamp) / 1000;
metaData["keyframes"] = _keyframes;
_metaData.MessageHeader.MessageLength = 0;
var dataTagLen = FlvMuxer.MultiplexFlv(_metaData).Length;

var offset = recordFile.Position + dataTagLen;
for (int i = 0; i < _keyframeFilePositions.Count; i++)
{
_keyframeFilePositions[i] = (double)_keyframeFilePositions[i] + offset;
}

await recordFile.WriteAsync(FlvMuxer.MultiplexFlv(_metaData));
_recordFileData.Seek(0, SeekOrigin.Begin);
await _recordFileData.CopyToAsync(recordFile);
_recordFileData.Dispose();
File.Delete(filePath);
}

recordFile.Write(FlvMuxer.MultiplexFlv(_metaData));
_recordFileData.Seek(0, SeekOrigin.Begin);
_recordFileData.CopyTo(recordFile);
_recordFileData.Dispose();
File.Delete(filePath);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
_recordFile?.Dispose();
Expand Down Expand Up @@ -102,6 +112,7 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona
MessageStream.RegisterMessageHandler<DataMessage>(HandleData);
MessageStream.RegisterMessageHandler<AudioMessage>(HandleAudioMessage);
MessageStream.RegisterMessageHandler<VideoMessage>(HandleVideoMessage);
MessageStream.RegisterMessageHandler<UserControlMessage>(HandleUserControlMessage);
onStatus.InfoObject = new AmfObject
{
{"level", "status" },
Expand All @@ -120,11 +131,19 @@ public async Task Publish([FromOptionalArgument] string streamName, [FromOptiona
_keyframes.Add("filepositions", _keyframeFilePositions);
}

private void HandleUserControlMessage(UserControlMessage msg)
{
if (msg.UserControlEventType == UserControlEventType.SetBufferLength)
{
_bufferMs = (msg as SetBufferLengthMessage).BufferMilliseconds;
}
}

private async void HandleAudioMessage(AudioMessage message)
{
try
{
_maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp);
_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);

await SaveMessage(message);
}
Expand All @@ -138,7 +157,7 @@ private async void HandleVideoMessage(VideoMessage message)
{
try
{
_maxTimestamp = Math.Max(_maxTimestamp, message.MessageHeader.Timestamp);
_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);

var head = message.Data.Span[0];

Expand Down Expand Up @@ -184,21 +203,16 @@ public async Task Seek([FromOptionalArgument] double milliSeconds)
resetStatus.InfoObject = resetData;
await MessageStream.SendMessageAsync(ChunkStream, resetStatus);

await SeekAndPlay(milliSeconds);
}

private async Task SeekAndPlay(double milliSeconds)
{
await _playLock.WaitAsync();
_recordFileData.Seek(0, SeekOrigin.Begin);

await FlvDemuxer.SeekAsync(milliSeconds);

_playLock.Release();
if (_playing == 0)
_playCts?.Cancel();
while (_playing == 1)
{
await StartPlay();
await Task.Yield();
}

var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
await SeekAndPlay(milliSeconds, cts.Token);
}

[RpcMethod("play")]
Expand Down Expand Up @@ -229,52 +243,88 @@ public async Task Play(
{"description", "Started playing." },
{"details", streamName }
};

var startStatus = RtmpSession.CreateCommandMessage<OnStatusCommandMessage>();
startStatus.InfoObject = startData;
await MessageStream.SendMessageAsync(ChunkStream, startStatus);

var bandwidthLimit = new WindowAcknowledgementSizeMessage()
{
WindowSize = 500 * 1024
};
await RtmpSession.ControlMessageStream.SendMessageAsync(RtmpSession.ControlChunkStream, bandwidthLimit);
VideoChunkStream = RtmpSession.CreateChunkStream();
AudioChunkStream = RtmpSession.CreateChunkStream();

var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
start = Math.Max(start, 0);
await SeekAndPlay(start / 1000);
await SeekAndPlay(start / 1000, cts.Token);
}

[RpcMethod("pause")]
public async Task Pause([FromOptionalArgument] bool isPause, [FromOptionalArgument] double milliseconds)
{
if (isPause)
{
await _playLock.WaitAsync();
_recordFile.Seek(0, SeekOrigin.End);
_playLock.Release();
_playCts?.Cancel();
while (_playing == 1)
{
await Task.Yield();
}
}
else
{
await SeekAndPlay(milliseconds);
var cts = new CancellationTokenSource();
_playCts?.Dispose();
_playCts = cts;
await SeekAndPlay(milliseconds, cts.Token);
}
}

private async Task StartPlay()
private async Task StartPlayNoLock(CancellationToken ct)
{
Interlocked.Exchange(ref _playing, 1);
while (_recordFile.Position < _recordFile.Length)
while (_recordFile.Position < _recordFile.Length && !ct.IsCancellationRequested)
{
await PlayRecordFile();
while (_bufferMs != -1 && _currentTimestamp >= _bufferMs)
{
await Task.Yield();
}

await PlayRecordFileNoLock(ct);
}
Interlocked.Exchange(ref _playing, 0);
}

private Task<Message> ReadMessage()
private Task<Message> ReadMessage(CancellationToken ct)
{
return FlvDemuxer.DemultiplexFlvAsync();
return FlvDemuxer.DemultiplexFlvAsync(ct);
}

private async Task PlayRecordFile()
private async Task SeekAndPlay(double milliSeconds, CancellationToken ct)
{
await _playLock.WaitAsync();
var message = await ReadMessage();
_playLock.Release();
Interlocked.Exchange(ref _playing, 1);
try
{

_recordFile.Seek(9, SeekOrigin.Begin);
FlvDemuxer.SeekNoLock(milliSeconds, _metaData == null ? null : _metaData.Data[2] as Dictionary<string, object>, ct);
await StartPlayNoLock(ct);
}
catch (Exception e)
{
Console.WriteLine(e);
}
finally
{
Interlocked.Exchange(ref _playing, 0);
_playLock.Release();
}
}

private async Task PlayRecordFileNoLock(CancellationToken ct)
{
var message = await ReadMessage(ct);
if (message is AudioMessage)
{
await MessageStream.SendMessageAsync(AudioChunkStream, message);
Expand All @@ -283,11 +333,13 @@ private async Task PlayRecordFile()
{
await MessageStream.SendMessageAsync(VideoChunkStream, message);
}
else if (message is DataMessage)
else if (message is DataMessage data)
{
await MessageStream.SendMessageAsync(ChunkStream, message);
data.Data.Insert(0, "@setDataFrame");
_metaData = data;
await MessageStream.SendMessageAsync(ChunkStream, data);
}

_currentTimestamp = Math.Max(_currentTimestamp, message.MessageHeader.Timestamp);
}

private async Task SaveMessage(Message message)
Expand Down
38 changes: 2 additions & 36 deletions Harmonic/Controllers/WebSocketPlayController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ namespace Harmonic.Controllers
{
public class WebSocketPlayController : WebSocketController, IDisposable
{
class SeekAction
{
[JsonProperty("action")]
public string Action { get; set; }
[JsonProperty("filePos")]
public double FilePos { get; set; }
}

private RecordService _recordService = null;
private PublisherSessionService _publisherSessionService = null;
private List<Action> _cleanupActions = new List<Action>();
Expand Down Expand Up @@ -102,7 +94,7 @@ private async Task PlayRecordFile()
{
Interlocked.Exchange(ref _playing, 1);
var buffer = new byte[512];
int bytesRead = 0;
int bytesRead;
do
{
await _playLock.WaitAsync();
Expand All @@ -127,34 +119,8 @@ private void SendAudio(AudioMessage message)
Session.SendMessageAsync(message);
}

public override async void OnMessage(string msg)
public override void OnMessage(string msg)
{
//var obj = JObject.Parse(msg);
//var action = obj["action"].Value<string>();

//if (action == "suspend")
//{
// await _playLock.WaitAsync();
//}
//else if (action == "seek")
//{
// if (_playLock.CurrentCount != 0)
// {
// return;
// }

// var pos = JsonConvert.DeserializeObject<SeekAction>(msg).FilePos;

// _recordFile.Seek((int)pos, SeekOrigin.Begin);

// _playLock.Release();

// if (_playing == 0)
// {
// await PlayRecordFile();
// }

//}
}

#region IDisposable Support
Expand Down
9 changes: 9 additions & 0 deletions Harmonic/Harmonic.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<Optimize>true</Optimize>
<DefineConstants>TRACE</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<Optimize>false</Optimize>
</PropertyGroup>

<ItemGroup>
<Compile Remove="Networking\IStreamSession.cs" />
<Compile Remove="Networking\Rtmp\Supervisor.cs" />
Expand Down
1 change: 1 addition & 0 deletions Harmonic/Networking/Amf/Serialization/Amf0/Amf0Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ public bool TryGetStrictArray(Span<byte> buffer, out List<object> array, out int
int consumed = Amf0CommonValues.MARKER_LENGTH + sizeof(uint);
var arrayBodyBuffer = buffer.Slice(consumed);
var elementBodyBuffer = arrayBodyBuffer;
System.Diagnostics.Debug.WriteLine(elementCount);
for (uint i = 0; i < elementCount; i++)
{
if (!TryGetValue(elementBodyBuffer, out _, out var element, out var bufferConsumed))
Expand Down
Loading

0 comments on commit a64c5aa

Please sign in to comment.