Skip to content

Commit

Permalink
Switch more components to leak bus implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
amacal committed Nov 12, 2016
1 parent 4ac2a59 commit e326999
Show file tree
Hide file tree
Showing 31 changed files with 317 additions and 80 deletions.
6 changes: 5 additions & 1 deletion sources/Leak.Core/Cando/CandoConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace Leak.Core.Cando
using Leak.Core.Core;

namespace Leak.Core.Cando
{
public class CandoConfiguration
{
public CandoBuilder Extensions { get; set; }

public CandoCallback Callback { get; set; }

public LeakBus Bus { get; set; }
}
}
8 changes: 7 additions & 1 deletion sources/Leak.Core/Cando/CandoContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Leak.Core.Core;
using System;

namespace Leak.Core.Cando
{
Expand Down Expand Up @@ -35,6 +36,11 @@ public CandoCallback Callback
get { return configuration.Callback; }
}

public LeakBus Bus
{
get { return configuration.Bus; }
}

public CandoCollection Collection
{
get { return collection; }
Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Core/Cando/CandoMap.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Leak.Core.Bencoding;
using System.Collections.Generic;
using System.Linq;

namespace Leak.Core.Cando
{
Expand All @@ -23,6 +24,11 @@ public void Add(byte value, string name)
}
}

public string[] All()
{
return byName.Keys.ToArray();
}

public string Translate(byte id)
{
string name;
Expand Down
7 changes: 7 additions & 0 deletions sources/Leak.Core/Cando/CandoService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Leak.Core.Bencoding;
using Leak.Core.Cando.Events;
using Leak.Core.Common;
using Leak.Core.Connector;
using Leak.Core.Listener;
Expand Down Expand Up @@ -120,6 +121,12 @@ private void SendHandshakeIfRequested(CandoEntry entry, Extended payload)

CallHandshakeOnEachHandler(entry, handshake);
CallHandshakeIfRequired(entry);

context.Bus.Publish("extension-exchanged", new ExtensionExchanged
{
Peer = entry.Session.Peer,
Extensions = entry.Remote.All()
});
}
}

Expand Down
11 changes: 11 additions & 0 deletions sources/Leak.Core/Cando/Events/ExtensionExchanged.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Leak.Core.Common;

namespace Leak.Core.Cando.Events
{
public class ExtensionExchanged
{
public PeerHash Peer;

public string[] Extensions;
}
}
7 changes: 4 additions & 3 deletions sources/Leak.Core/Client/PeerClientContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public PeerClientContext(Action<PeerClientConfiguration> configurer)

collector = new PeerCollector(with =>
{
with.Bus = bus;
with.Callback = new PeerClientToCollector(this);
with.Countries = configuration.Countries;
Expand Down Expand Up @@ -119,15 +120,15 @@ public PeerClientContext(Action<PeerClientConfiguration> configurer)
});
}

worker.Start();
pipeline.Start();

bus?.Start(pipeline);
network?.Start(pipeline);
connector?.Start(pipeline);
listener?.Start(pipeline);
telegraph?.Start(pipeline);
collector?.Start(pipeline);

worker.Start();
pipeline.Start();
}

/// <summary>
Expand Down
30 changes: 1 addition & 29 deletions sources/Leak.Core/Collector/Callbacks/PeerCollectorToListener.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Leak.Core.Collector.Events;
using Leak.Core.Common;
using Leak.Core.Common;
using Leak.Core.Listener;
using Leak.Core.Network;

Expand Down Expand Up @@ -28,33 +27,6 @@ public override void OnConnecting(PeerListenerConnecting connecting)
context.Callback.OnConnectingFrom(connecting.Connection.Remote);
}

public override void OnConnected(NetworkConnection connection)
{
int total = 0;
bool accepted = false;

lock (context.Synchronized)
{
if (context.Bouncer.AcceptRemote(connection))
{
accepted = true;
total = context.Bouncer.Count();
}
}

if (accepted)
{
PeerAddress peer = connection.Remote;
PeerCollectorConnected connected = new PeerCollectorConnected(peer, total);

context.Callback.OnConnectedFrom(connected);
}
else
{
connection.Terminate();
}
}

public override void OnRejected(NetworkConnection connection)
{
context.Callback.OnRejected(connection.Remote);
Expand Down
7 changes: 2 additions & 5 deletions sources/Leak.Core/Collector/Criterions/OrderByBitfield.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ public IEnumerable<PeerSession> Accept(IEnumerable<PeerSession> sessions, PeerCo
Completed = context.Battlefield.Get(session)?.Completed
};

if (peer.Completed != null)
{
result.Add(peer);
}
result.Add(peer);
}

return result.OrderByDescending(x => x.Completed.Value).Select(x => x.Session);
return result.OrderByDescending(x => x.Completed).Select(x => x.Session);
}

private struct Peer
Expand Down
6 changes: 5 additions & 1 deletion sources/Leak.Core/Collector/PeerCollectorConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
namespace Leak.Core.Collector
using Leak.Core.Core;

namespace Leak.Core.Collector
{
public class PeerCollectorConfiguration
{
public LeakBus Bus { get; set; }

public PeerCollectorCallback Callback { get; set; }

public PeerCollectorExtensionBuilder Extensions { get; set; }
Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Core/Collector/PeerCollectorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class PeerCollectorContext
private readonly RankingService ranking;
private readonly BattlefieldService battlefield;
private readonly PeerCollectorBlockFactory blockFactory;
private readonly PeerCollectorSubscriber subscriber;

public PeerCollectorContext(Action<PeerCollectorConfiguration> configurer)
{
Expand Down Expand Up @@ -68,7 +69,9 @@ public PeerCollectorContext(Action<PeerCollectorConfiguration> configurer)

cando = new CandoService(with =>
{
with.Bus = configuration.Bus;
with.Callback = new PeerCollectorToCando(this);
configuration.Extensions.Apply(with);
});

Expand All @@ -85,6 +88,9 @@ public PeerCollectorContext(Action<PeerCollectorConfiguration> configurer)

synchronized = new object();
blockFactory = new PeerCollectorBlockFactory();

subscriber = new PeerCollectorSubscriber(this);
configuration.Bus.Subscribe(subscriber.Handle);
}

public object Synchronized
Expand Down
53 changes: 53 additions & 0 deletions sources/Leak.Core/Collector/PeerCollectorSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Leak.Core.Collector.Events;
using Leak.Core.Common;
using Leak.Core.Network;

namespace Leak.Core.Collector
{
public class PeerCollectorSubscriber
{
private readonly PeerCollectorContext context;

public PeerCollectorSubscriber(PeerCollectorContext context)
{
this.context = context;
}

public void Handle(string name, dynamic payload)
{
switch (name)
{
case "listener-accepted":
HandleConnected(payload.Connection);
break;
}
}

private void HandleConnected(NetworkConnection connection)
{
int total = 0;
bool accepted = false;

lock (context.Synchronized)
{
if (context.Bouncer.AcceptRemote(connection))
{
accepted = true;
total = context.Bouncer.Count();
}
}

if (accepted)
{
PeerAddress peer = connection.Remote;
PeerCollectorConnected connected = new PeerCollectorConnected(peer, total);

context.Callback.OnConnectedFrom(connected);
}
else
{
connection.Terminate();
}
}
}
}
7 changes: 4 additions & 3 deletions sources/Leak.Core/Core/LeakBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private class Trigger : LeakPipelineTrigger
private readonly LeakBus bus;
private readonly ConcurrentQueue<Event> events;

private ManualResetEvent ready;
private ManualResetEvent onReady;

public Trigger(LeakBus bus)
{
Expand All @@ -83,7 +83,7 @@ public Trigger(LeakBus bus)

public void Register(ManualResetEvent watch)
{
ready = watch;
onReady = watch;
}

public void Publish(string name, object payload)
Expand All @@ -94,12 +94,13 @@ public void Publish(string name, object payload)
Payload = payload
});

ready.Set();
onReady.Set();
}

public void Execute()
{
Event data;
onReady.Reset();

while (events.TryDequeue(out data))
{
Expand Down
12 changes: 12 additions & 0 deletions sources/Leak.Core/Core/LeakPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public LeakPipeline()

public void Start()
{
Register(new Terminator());
worker.Start();
}

Expand Down Expand Up @@ -81,5 +82,16 @@ private void Execute()
}
}
}

private class Terminator : LeakPipelineTrigger
{
public void Register(ManualResetEvent watch)
{
}

public void Execute()
{
}
}
}
}
5 changes: 5 additions & 0 deletions sources/Leak.Core/Leak.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<Compile Include="Cando\CandoHandler.cs" />
<Compile Include="Cando\CandoMap.cs" />
<Compile Include="Cando\CandoService.cs" />
<Compile Include="Cando\Events\ExtensionExchanged.cs" />
<Compile Include="Cando\Metadata\MetadataSize.cs" />
<Compile Include="Client\Configuration\PeerClientDownloadBuilder.cs" />
<Compile Include="Client\Configuration\PeerClientMetadataBuilder.cs" />
Expand Down Expand Up @@ -98,6 +99,7 @@
<Compile Include="Collector\PeerCollectorExtension.cs" />
<Compile Include="Collector\PeerCollectorBlockFactory.cs" />
<Compile Include="Collector\PeerCollectorExtensionBuilder.cs" />
<Compile Include="Collector\PeerCollectorSubscriber.cs" />
<Compile Include="Common\BitfieldInfo.cs" />
<Compile Include="Common\PeerSession.cs" />
<Compile Include="Common\PieceInfo.cs" />
Expand All @@ -113,7 +115,10 @@
<Compile Include="Core\LeakPipelineTrigger.cs" />
<Compile Include="Core\LeakQueueBase.cs" />
<Compile Include="HashExtensions.cs" />
<Compile Include="Listener\Events\ListenerAccepted.cs" />
<Compile Include="Listener\Events\ListenerAccepting.cs" />
<Compile Include="Listener\Events\ListenerStarted.cs" />
<Compile Include="Loop\Events\MessageArrived.cs" />
<Compile Include="Messages\DataBlock.cs" />
<Compile Include="Messages\DataBlockFactory.cs" />
<Compile Include="Metaget\MetagetTaskVerify.cs" />
Expand Down
14 changes: 14 additions & 0 deletions sources/Leak.Core/Listener/Events/ListenerAccepted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Leak.Core.Common;
using Leak.Core.Network;

namespace Leak.Core.Listener.Events
{
public class ListenerAccepted
{
public PeerHash Local;

public PeerAddress Remote;

public NetworkConnection Connection;
}
}
11 changes: 11 additions & 0 deletions sources/Leak.Core/Listener/Events/ListenerAccepting.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Leak.Core.Common;

namespace Leak.Core.Listener.Events
{
public class ListenerAccepting
{
public PeerHash Local;

public PeerAddress Remote;
}
}
2 changes: 1 addition & 1 deletion sources/Leak.Core/Listener/Events/ListenerStarted.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Leak.Core.Listener.Events
{
public class ListenerStarted
{
public PeerHash Peer;
public PeerHash Local;

public int Port;
}
Expand Down
Loading

0 comments on commit e326999

Please sign in to comment.