Skip to content

Commit

Permalink
Merge pull request #491 from MatrixAI/feature-dns-multi
Browse files Browse the repository at this point in the history
Multi-Host DNS and Multi NodeID resolution - for network entry and general usage
  • Loading branch information
tegefaulkes authored Nov 8, 2022
2 parents 854ef56 + ec5d32f commit 1890801
Show file tree
Hide file tree
Showing 27 changed files with 904 additions and 374 deletions.
1 change: 1 addition & 0 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ class PolykeyAgent {
PolykeyAgent.eventSymbols.Proxy,
async (data: ConnectionData) => {
if (data.type === 'reverse') {
if (this.keyManager.getNodeId().equals(data.remoteNodeId)) return;
const address = networkUtils.buildAddress(
data.remoteHost,
data.remotePort,
Expand Down
8 changes: 6 additions & 2 deletions src/agent/GRPCClientAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
return grpcClientAgent;
}

public async destroy() {
await super.destroy();
public async destroy({
timeout,
}: {
timeout?: number;
} = {}) {
await super.destroy({ timeout });
}

@ready(new agentErrors.ErrorAgentClientDestroyed())
Expand Down
15 changes: 11 additions & 4 deletions src/agent/service/nodesHolePunchMessageSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function nodesHolePunchMessageSend({
call.request.getProxyAddress(),
);
logger.debug(
`Received signalling message to target ${call.request.getSrcId()}@${host}:${port}`,
`Received signaling message to target ${call.request.getSrcId()}@${host}:${port}`,
);
// Ignore failure
try {
Expand All @@ -81,7 +81,7 @@ function nodesHolePunchMessageSend({
}
} else {
logger.error(
'Received signalling message, target information was missing, skipping reverse hole punch',
'Received signaling message, target information was missing, skipping reverse hole punch',
);
}
} else if (await nodeManager.knowsNode(sourceId, tran)) {
Expand All @@ -92,15 +92,22 @@ function nodesHolePunchMessageSend({
connectionInfo!.remoteHost,
connectionInfo!.remotePort,
);
// Checking if the source and destination are the same
if (sourceId?.equals(targetId)) {
// Logging and silently dropping operation
logger.warn('Signaling relay message requested signal to itself');
callback(null, response);
return;
}
call.request.setProxyAddress(proxyAddress);
logger.debug(
`Relaying signalling message from ${srcNodeId}@${
`Relaying signaling message from ${srcNodeId}@${
connectionInfo!.remoteHost
}:${
connectionInfo!.remotePort
} to ${targetNodeId} with information ${proxyAddress}`,
);
await nodeConnectionManager.relaySignallingMessage(call.request, {
await nodeConnectionManager.relaySignalingMessage(call.request, {
host: connectionInfo!.remoteHost,
port: connectionInfo!.remotePort,
});
Expand Down
22 changes: 20 additions & 2 deletions src/bin/CommandPolykey.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { FileSystem } from '../types';
import commander from 'commander';
import Logger, { StreamHandler, formatting } from '@matrixai/logger';
import Logger, {
StreamHandler,
formatting,
levelToString,
evalLogDataValue,
} from '@matrixai/logger';
import * as binUtils from './utils';
import * as binOptions from './utils/options';
import * as binErrors from './errors';
Expand Down Expand Up @@ -68,8 +73,21 @@ class CommandPolykey extends commander.Command {
// Set the logger formatter according to the format
if (opts.format === 'json') {
this.logger.handlers.forEach((handler) =>
handler.setFormatter(formatting.jsonFormatter),
handler.setFormatter((record) => {
return JSON.stringify(
{
level: levelToString(record.level),
keys: record.keys,
msg: record.msg,
...record.data,
},
evalLogDataValue,
);
}),
);
} else {
const format = formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`;
this.logger.handlers.forEach((handler) => handler.setFormatter(format));
}
// Set the global upstream GRPC logger
grpcSetLogger(this.logger.getChild('grpc'));
Expand Down
26 changes: 24 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
import type { Host, Port } from './network/types';
import type { NodeAddress } from 'nodes/types';
import { getDefaultNodePath } from './utils';
// @ts-ignore package.json is outside rootDir
import { version } from '../package.json';

/**
* Configuration for testnet node addresses.
* Extracted here to enforce types properly.
*/
const testnet: Record<string, NodeAddress> = {
vg9a9e957878s2qgtbdmu2atvli8ms7muukb1dk4dpbm4llkki3h0: {
host: 'testnet.polykey.io' as Host,
port: 1314 as Port,
},
vh9oqtvct10eaiv3cl4ebm0ko33sl0qqpvb59vud8cngfvqs4p4ng: {
host: 'testnet.polykey.io' as Host,
port: 1314 as Port,
},
};

/**
* Configuration for main net node addresses.
* Extracted here to enforce types properly.
*/
const mainnet: Record<string, NodeAddress> = {};

/**
* Polykey static configuration
* This is intended only for static properties
Expand Down Expand Up @@ -96,8 +118,8 @@ const config = {
},
// This is not used by the `PolykeyAgent` which defaults to `{}`
network: {
mainnet: {},
testnet: {},
mainnet: mainnet,
testnet: testnet,
},
},
};
Expand Down
2 changes: 2 additions & 0 deletions src/contexts/functions/timedCancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ function setupTimedCancellable<C extends ContextTimed, P extends Array<any>, R>(
);
ctx.signal = abortController.signal;
teardownContext = () => {
// The timer is not cancelled here because
// it was not created in this scope
finished = true;
};
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/grpc/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ abstract class GRPCClient<T extends Client = Client> {
const socket = session.socket as TLSSocket;
serverCertChain = networkUtils.getCertificateChain(socket);
try {
networkUtils.verifyServerCertificateChain(nodeId, serverCertChain);
networkUtils.verifyServerCertificateChain([nodeId], serverCertChain);
} catch (e) {
const e_ = e;
if (e instanceof networkErrors.ErrorCertChain) {
Expand Down
24 changes: 18 additions & 6 deletions src/network/ConnectionForward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ConnectionsForward = {
interface ConnectionForward extends StartStop {}
@StartStop()
class ConnectionForward extends Connection {
public readonly nodeId: NodeId;

protected nodeId_: NodeId;
protected nodeIds: Array<NodeId>;
protected connections: ConnectionsForward;
protected pingInterval: ReturnType<typeof setInterval>;
protected utpConn: UTPConnection;
Expand Down Expand Up @@ -98,15 +98,15 @@ class ConnectionForward extends Connection {
};

public constructor({
nodeId,
nodeIds,
connections,
...rest
}: {
nodeId: NodeId;
nodeIds: Array<NodeId>;
connections: ConnectionsForward;
} & AbstractConstructorParameters<typeof Connection>[0]) {
super(rest);
this.nodeId = nodeId;
this.nodeIds = nodeIds;
this.connections = connections;
}

Expand Down Expand Up @@ -137,6 +137,10 @@ class ConnectionForward extends Connection {
} else {
ctx.signal.addEventListener('abort', () => resolveAbortedP());
}
void ctx.timer.then(
() => resolveAbortedP(),
() => {},
);
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
const handleStartError = (e) => {
Expand Down Expand Up @@ -211,7 +215,10 @@ class ConnectionForward extends Connection {
}
const serverCertChain = networkUtils.getCertificateChain(this.tlsSocket);
try {
networkUtils.verifyServerCertificateChain(this.nodeId, serverCertChain);
this.nodeId_ = networkUtils.verifyServerCertificateChain(
this.nodeIds,
serverCertChain,
);
} catch (e) {
this.logger.debug(
`Failed to start Connection Forward: verification failed`,
Expand Down Expand Up @@ -259,6 +266,11 @@ class ConnectionForward extends Connection {
this.logger.info('Stopped Connection Forward');
}

@ready(new networkErrors.ErrorConnectionNotRunning())
get nodeId() {
return this.nodeId_;
}

@ready(new networkErrors.ErrorConnectionNotRunning())
public compose(clientSocket: Socket): void {
try {
Expand Down
59 changes: 42 additions & 17 deletions src/network/Proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import * as nodesUtils from '../nodes/utils';
import { promisify } from '../utils';
import { timedCancellable, context } from '../contexts';

const clientConnectionClosedReason = Symbol('clientConnectionClosedReason');

interface Proxy extends StartStop {}
@StartStop()
class Proxy {
Expand Down Expand Up @@ -316,23 +318,32 @@ class Proxy {
* Set timer to `null` explicitly to wait forever
*/
public openConnectionForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
): PromiseCancellable<NodeId>;
@ready(new networkErrors.ErrorProxyNotRunning(), true)
@timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime)
public async openConnectionForward(
nodeId: NodeId,
nodeId: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
@context ctx: ContextTimed,
): Promise<void> {
): Promise<NodeId> {
const proxyAddress = networkUtils.buildAddress(proxyHost, proxyPort);
await this.connectionLocksForward.withF([proxyAddress, Lock], async () => {
await this.establishConnectionForward(nodeId, proxyHost, proxyPort, ctx);
});
return await this.connectionLocksForward.withF(
[proxyAddress, Lock],
async () => {
const connectionForward = await this.establishConnectionForward(
nodeId,
proxyHost,
proxyPort,
ctx,
);
return connectionForward.nodeId;
},
);
}

@ready(new networkErrors.ErrorProxyNotRunning(), true)
Expand Down Expand Up @@ -409,10 +420,22 @@ class Proxy {
}
await this.connectionLocksForward.withF([proxyAddress, Lock], async () => {
const timer = new Timer({ delay: this.connConnectTime });
let cleanUpConnectionListener = () => {};
try {
await this.connectForward(nodeId, proxyHost, proxyPort, clientSocket, {
timer,
});
const connectForwardProm = this.connectForward(
[nodeId],
proxyHost,
proxyPort,
clientSocket,
{
timer,
},
);
cleanUpConnectionListener = () => {
connectForwardProm.cancel(clientConnectionClosedReason);
};
clientSocket.addListener('close', cleanUpConnectionListener);
await connectForwardProm;
} catch (e) {
if (e instanceof networkErrors.ErrorProxyConnectInvalidUrl) {
if (!clientSocket.destroyed) {
Expand Down Expand Up @@ -471,6 +494,7 @@ class Proxy {
return;
} finally {
timer.cancel();
clientSocket.removeListener('close', cleanUpConnectionListener);
}
// After composing, switch off this error handler
clientSocket.off('error', handleConnectError);
Expand All @@ -482,22 +506,22 @@ class Proxy {
};

protected connectForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
clientSocket: Socket,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
): PromiseCancellable<NodeId>;
@timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime)
protected async connectForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
clientSocket: Socket,
@context ctx: ContextTimed,
): Promise<void> {
): Promise<NodeId> {
const conn = await this.establishConnectionForward(
nodeId,
nodeIds,
proxyHost,
proxyPort,
ctx,
Expand All @@ -511,10 +535,11 @@ class Proxy {
remotePort: conn.port,
type: 'forward',
});
return conn.nodeId;
}

protected async establishConnectionForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
ctx: ContextTimed,
Expand All @@ -530,7 +555,7 @@ class Proxy {
return conn;
}
conn = new ConnectionForward({
nodeId,
nodeIds,
connections: this.connectionsForward,
utpSocket: this.utpSocket,
host: proxyHost,
Expand Down
14 changes: 10 additions & 4 deletions src/network/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class ErrorConnectionEndTimeout<T> extends ErrorConnection<T> {
exitCode = sysexits.UNAVAILABLE;
}

class ErrorConnectionNodesEmpty<T> extends ErrorConnection<T> {
static description = 'Nodes list to verify against was empty';
exitCode = sysexits.USAGE;
}

/**
* Used by ConnectionForward and ConnectionReverse
*/
Expand Down Expand Up @@ -129,9 +134,9 @@ class ErrorCertChainSignatureInvalid<T> extends ErrorCertChain<T> {
exitCode = sysexits.PROTOCOL;
}

class ErrorHostnameResolutionFailed<T> extends ErrorNetwork<T> {
static description = 'Unable to resolve hostname';
exitCode = sysexits.USAGE;
class ErrorDNSResolver<T> extends ErrorNetwork<T> {
static description = 'DNS resolution failed';
exitCode = sysexits.SOFTWARE;
}

export {
Expand All @@ -148,6 +153,7 @@ export {
ErrorConnectionMessageParse,
ErrorConnectionTimeout,
ErrorConnectionEndTimeout,
ErrorConnectionNodesEmpty,
ErrorConnectionStart,
ErrorConnectionStartTimeout,
ErrorConnectionStartTimeoutMax,
Expand All @@ -161,5 +167,5 @@ export {
ErrorCertChainNameInvalid,
ErrorCertChainKeyInvalid,
ErrorCertChainSignatureInvalid,
ErrorHostnameResolutionFailed,
ErrorDNSResolver,
};
Loading

0 comments on commit 1890801

Please sign in to comment.