Skip to content
This repository has been archived by the owner on Oct 20, 2023. It is now read-only.

Commit

Permalink
BTStream support for Ethereum family
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Weichhold committed Oct 24, 2018
1 parent fbfccee commit c7c7789
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ portions of the Software.
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

using Miningcore.Configuration;

namespace Miningcore.Blockchain.Ethereum.Configuration
{
public class EthereumPoolConfigExtra
Expand All @@ -36,5 +38,10 @@ public class EthereumPoolConfigExtra
/// Useful to specify the real chain type when running geth
/// </summary>
public string ChainTypeOverride { get; set; }

/// <summary>
/// getWork stream published via ZMQ
/// </summary>
public ZmqPubSubEndpointConfig BtStream { get; set; }
}
}
173 changes: 97 additions & 76 deletions src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ private async Task<EthereumBlockTemplate> GetBlockTemplateGethAsync()

private EthereumBlockTemplate AssembleBlockTemplate(string[] work)
{
// only parity returns the 4th element (block height)
if (work.Length < 4)
{
logger.Error(() => $"Error(s) refreshing blocktemplate: getWork did not return blockheight. Are you really connected to a Parity daemon?");
Expand Down Expand Up @@ -652,99 +651,120 @@ private void ConfigureRewards()

protected virtual async Task SetupJobUpdatesAsync()
{
var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true;

if (enableStreaming && !poolConfig.Daemons.Any(x =>
x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>()?.PortWs.HasValue == true))
if (extraPoolConfig?.BtStream == null)
{
logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling.");
enableStreaming = false;
}

if (enableStreaming)
{
// collect ports
var wsDaemons = poolConfig.Daemons
.Where(x => x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>()?.PortWs.HasValue == true)
.ToDictionary(x => x, x =>
{
var extra = x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>();
var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true;

return (extra.PortWs.Value, extra.HttpPathWs, extra.SslWs);
});

logger.Info(() => $"Subscribing to WebSocket(s) {string.Join(", ", wsDaemons.Keys.Select(x => $"{(wsDaemons[x].SslWs ? "wss" : "ws")}://{x.Host}:{wsDaemons[x].Value}").Distinct())}");
if (enableStreaming && !poolConfig.Daemons.Any(x =>
x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>()?.PortWs.HasValue == true))
{
logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling.");
enableStreaming = false;
}

if (isParity)
if (enableStreaming)
{
// stream work updates
var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.ParitySubscribe, new[] { (object) EC.GetWork })
.Select(data =>
// collect ports
var wsDaemons = poolConfig.Daemons
.Where(x => x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>()?.PortWs.HasValue == true)
.ToDictionary(x => x, x =>
{
try
{
var psp = DeserializeRequest(data).ParamsAs<PubSubParams<string[]>>();
return psp?.Result;
}
var extra = x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>();
catch(Exception ex)
{
logger.Info(() => $"Error deserializing pending block: {ex.Message}");
}
return null;
return (extra.PortWs.Value, extra.HttpPathWs, extra.SslWs);
});

Jobs = getWorkObs.Where(x => x != null)
.Select(AssembleBlockTemplate)
.Select(UpdateJob)
.Do(isNew =>
{
if (isNew)
logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected");
})
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
.Publish()
.RefCount();
}
logger.Info(() => $"Subscribing to WebSocket(s) {string.Join(", ", wsDaemons.Keys.Select(x => $"{(wsDaemons[x].SslWs ? "wss" : "ws")}://{x.Host}:{wsDaemons[x].Value}").Distinct())}");

else
{
var wsSubscription = "newHeads";
var isRetry = false;
retry:
if (isParity)
{
// stream work updates
var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.ParitySubscribe, new[] { (object)EC.GetWork })
.Select(data =>
{
try
{
var psp = DeserializeRequest(data).ParamsAs<PubSubParams<string[]>>();
return psp?.Result;
}
catch (Exception ex)
{
logger.Info(() => $"Error deserializing pending block: {ex.Message}");
}
return null;
});

Jobs = getWorkObs.Where(x => x != null)
.Select(AssembleBlockTemplate)
.Select(UpdateJob)
.Do(isNew =>
{
if (isNew)
logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected");
})
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
.Publish()
.RefCount();
}

// stream work updates
var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.Subscribe, new[] { (object) wsSubscription, new object() });
else
{
var wsSubscription = "newHeads";
var isRetry = false;
retry:

// test subscription
var subcriptionResponse = await getWorkObs
.Take(1)
.Select(x=> JsonConvert.DeserializeObject<JsonRpcResponse<string>>(Encoding.UTF8.GetString(x)))
.ToTask();
// stream work updates
var getWorkObs = daemon.WebsocketSubscribe(logger, wsDaemons, EC.Subscribe, new[] { (object)wsSubscription, new object() });

if(subcriptionResponse.Error != null)
{
// older versions of geth only support subscriptions to "newBlocks"
if(!isRetry && subcriptionResponse.Error.Code == (int) BitcoinRPCErrorCode.RPC_METHOD_NOT_FOUND)
// test subscription
var subcriptionResponse = await getWorkObs
.Take(1)
.Select(x => JsonConvert.DeserializeObject<JsonRpcResponse<string>>(Encoding.UTF8.GetString(x)))
.ToTask();

if (subcriptionResponse.Error != null)
{
wsSubscription = "newBlocks";
// older versions of geth only support subscriptions to "newBlocks"
if (!isRetry && subcriptionResponse.Error.Code == (int)BitcoinRPCErrorCode.RPC_METHOD_NOT_FOUND)
{
wsSubscription = "newBlocks";

isRetry = true;
goto retry;
isRetry = true;
goto retry;
}

logger.ThrowLogPoolStartupException($"Unable to subscribe to geth websocket '{wsSubscription}': {subcriptionResponse.Error.Message} [{subcriptionResponse.Error.Code}]");
}

logger.ThrowLogPoolStartupException($"Unable to subscribe to geth websocket '{wsSubscription}': {subcriptionResponse.Error.Message} [{subcriptionResponse.Error.Code}]");
Jobs = getWorkObs.Where(x => x != null)
.Select(_ => Observable.FromAsync(UpdateJobAsync))
.Concat()
.Do(isNew =>
{
if (isNew)
logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via WebSocket");
})
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
.Publish()
.RefCount();
}
}

else
{
var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000;

Jobs = getWorkObs.Skip(1).Where(x => x != null)
Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval))
.Select(_ => Observable.FromAsync(UpdateJobAsync))
.Concat()
.Do(isNew =>
{
if (isNew)
logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected");
logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via RPC Polling");
})
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
Expand All @@ -755,15 +775,16 @@ protected virtual async Task SetupJobUpdatesAsync()

else
{
var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000;
var btStream = BtStreamSubscribe(extraPoolConfig.BtStream);

Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval))
.Select(_ => Observable.FromAsync(UpdateJobAsync))
.Concat()
Jobs = btStream.Where(x => x != null)
.Select(JsonConvert.DeserializeObject<string[]>)
.Select(AssembleBlockTemplate)
.Select(UpdateJob)
.Do(isNew =>
{
if (isNew)
logger.Info(() => $"New block {currentJob.BlockTemplate.Height} detected");
logger.Info(() => $"Detected new block {currentJob.BlockTemplate.Height} via BT-Stream");
})
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
Expand Down

0 comments on commit c7c7789

Please sign in to comment.