Skip to content

Commit

Permalink
Fix hanging when the buffer factory is exhausted.
Browse files Browse the repository at this point in the history
  • Loading branch information
amacal committed Sep 25, 2016
1 parent a2850cd commit 31b51ab
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
30 changes: 14 additions & 16 deletions sources/Leak.Core/Collector/PeerCollectorBlockFactory.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
using Leak.Core.Messages;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace Leak.Core.Collector
{
public class PeerCollectorBlockFactory : DataBlockFactory
{
private readonly byte[] buffer;
private readonly ConcurrentQueue<int> map;
private readonly int size;
private readonly ConcurrentQueue<byte[]> buffer;

public PeerCollectorBlockFactory()
{
buffer = new byte[40000 * 128];
map = new ConcurrentQueue<int>(Enumerable.Range(0, 128));
size = 40000;
buffer = new ConcurrentQueue<byte[]>();
}

public DataBlock Create(byte[] data, int offset, int count)
Expand All @@ -24,30 +22,30 @@ public DataBlock Create(byte[] data, int offset, int count)

public DataBlock New(int count, Action<byte[], int, int> callback)
{
int index = 0;
byte[] data;

while (map.TryDequeue(out index) == false)
if (buffer.TryDequeue(out data) == false)
{
Thread.Sleep(100);
data = new byte[size];
}

callback.Invoke(buffer, index * 40000, count);
return new DataBlockInstance(buffer, index * 40000, count, () => map.Enqueue(index));
callback.Invoke(data, 0, count);
return new DataBlockInstance(data, 0, count, buffer);
}

private class DataBlockInstance : DataBlock
{
private readonly byte[] data;
private readonly int start;
private readonly int count;
private readonly Action onCompleted;
private readonly ConcurrentQueue<byte[]> buffer;

public DataBlockInstance(byte[] data, int start, int count, Action onCompleted)
public DataBlockInstance(byte[] data, int start, int count, ConcurrentQueue<byte[]> buffer)
{
this.data = data;
this.start = start;
this.count = count;
this.onCompleted = onCompleted;
this.buffer = buffer;
}

public int Size
Expand All @@ -67,12 +65,12 @@ public void Write(Action<byte[], int, int> stream)

public DataBlock Scope(int offset)
{
return new DataBlockInstance(data, start + offset, count - offset, onCompleted);
return new DataBlockInstance(data, start + offset, count - offset, buffer);
}

public void Dispose()
{
onCompleted?.Invoke();
buffer.Enqueue(data);
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions sources/Leak.Core/Messages/PieceMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
{
public class PieceMessage
{
private readonly DataBlock block;
private readonly DataBlock data;

private readonly int piece;
private readonly int offset;

public PieceMessage(DataBlock block)
{
this.block = block;
this.data = block.Scope(8);

this.piece = block[3] + block[2] * 256 + block[1] * 256 * 256;
this.offset = block[7] + block[6] * 256 + block[5] * 256 * 256;
data = block.Scope(8);
piece = block[3] + block[2] * 256 + block[1] * 256 * 256;
offset = block[7] + block[6] * 256 + block[5] * 256 * 256;
}

public int Piece
Expand Down
17 changes: 17 additions & 0 deletions sources/Leak.Core/Omnibus/Components/OmnibusReservationComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Collections.Generic;

namespace Leak.Core.Omnibus.Components
{
public class OmnibusReservationComparer : IEqualityComparer<OmnibusBlock>
{
public int GetHashCode(OmnibusBlock obj)
{
return OmnibusBlock.GetHashCode(obj);
}

public bool Equals(OmnibusBlock x, OmnibusBlock y)
{
return OmnibusBlock.Equals(x, y);
}
}
}

0 comments on commit 31b51ab

Please sign in to comment.