Skip to content

Commit

Permalink
fix: relay node fills in the connection details of the requesting node
Browse files Browse the repository at this point in the history
Part 1 of #489.

Related #489
  • Loading branch information
tegefaulkes committed Oct 25, 2022
1 parent d0ffca0 commit 5d92f6e
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 89 deletions.
24 changes: 18 additions & 6 deletions src/agent/service/nodesHolePunchMessageSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import type NodeManager from '../../nodes/NodeManager';
import type NodeConnectionManager from '../../nodes/NodeConnectionManager';
import type KeyManager from '../../keys/KeyManager';
import type { NodeId } from '../../ids/types';
import type * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
import type Logger from '@matrixai/logger';
import type { ConnectionInfoGet } from 'agent/types';
import type * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
import * as networkUtils from '../../network/utils';
import * as grpcUtils from '../../grpc/utils';
import { validateSync } from '../../validation';
Expand All @@ -19,12 +20,14 @@ function nodesHolePunchMessageSend({
nodeManager,
nodeConnectionManager,
db,
connectionInfoGet,
logger,
}: {
keyManager: KeyManager;
nodeManager: NodeManager;
nodeConnectionManager: NodeConnectionManager;
db: DB;
connectionInfoGet: ConnectionInfoGet;
logger: Logger;
}) {
return async (
Expand Down Expand Up @@ -55,19 +58,28 @@ function nodesHolePunchMessageSend({
sourceId: call.request.getSrcId(),
},
);
const connectionInfo = connectionInfoGet(call);
// Firstly, check if this node is the desired node
// If so, then we want to make this node start sending hole punching packets
// back to the source node.
await db.withTransactionF(async (tran) => {
if (keyManager.getNodeId().equals(targetId)) {
const [host, port] = networkUtils.parseAddress(
call.request.getProxyAddress(),
);
await nodeConnectionManager.holePunchReverse(host, port);
if (call.request.getProxyAddress() !== '') {
const [host, port] = networkUtils.parseAddress(
call.request.getProxyAddress(),
);
await nodeConnectionManager.holePunchReverse(host, port);
}
// Otherwise, find if node in table
// If so, ask the nodeManager to relay to the node
} else if (await nodeManager.knowsNode(sourceId, tran)) {
await nodeConnectionManager.relayHolePunchMessage(call.request);
// Const newHolePunchMessage = new nodesPB.Relay();
const proxyAddress = networkUtils.buildAddress(
connectionInfo!.remoteHost,
connectionInfo!.remotePort,
);
call.request.setProxyAddress(proxyAddress);
await nodeConnectionManager.relaySignallingMessage(call.request);
}
});
callback(null, response);
Expand Down
19 changes: 5 additions & 14 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,7 @@ class NodeConnection<T extends GRPCClient> {
port: proxy.getForwardPort(),
authToken: proxy.authToken,
};
// 1. Get the proxy port of the fwdProxy (used for hole punching)
const proxyAddress = networkUtils.buildAddress(
proxy.getProxyHost(),
proxy.getProxyPort(),
);
const signature = await keyManager.signWithRootKeyPair(
Buffer.from(proxyAddress),
);
// 2. Ask fwdProxy for connection to target (the revProxy of other node)
// 1. Ask fwdProxy for connection to target (the revProxy of other node)
// 2. Start sending hole-punching packets to the target (via the client start -
// this establishes a HTTP CONNECT request with the forward proxy)
// 3. Relay the proxy port to the broker/s (such that they can inform the other node)
Expand All @@ -132,13 +124,12 @@ class NodeConnection<T extends GRPCClient> {
if (!isSeedNode) {
// FIXME: this needs to be cancellable.
// It needs to timeout as well as abort for cleanup
void Array.from(seedNodes, (nodeId) => {
return nodeConnectionManager.sendHolePunchMessage(
nodeId,
void Array.from(seedNodes, (seedNodeId) => {
return nodeConnectionManager.sendSignallingMessage(
seedNodeId,
keyManager.getNodeId(),
targetNodeId,
proxyAddress,
signature,
undefined,
ctx,
);
});
Expand Down
35 changes: 9 additions & 26 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,16 +679,13 @@ class NodeConnectionManager {
* @param sourceNodeId node ID of the current node (i.e. the sender)
* @param targetNodeId node ID of the target node to hole punch
* @param proxyAddress string of address in the form `proxyHost:proxyPort`
* @param signature signature to verify source node is sender (signature based
* on proxyAddress as message)
* @param ctx
*/
public sendHolePunchMessage(
public sendSignallingMessage(
relayNodeId: NodeId,
sourceNodeId: NodeId,
targetNodeId: NodeId,
proxyAddress: string,
signature: Buffer,
proxyAddress?: string,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
Expand All @@ -697,19 +694,17 @@ class NodeConnectionManager {
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connConnectTime,
)
public async sendHolePunchMessage(
public async sendSignallingMessage(
relayNodeId: NodeId,
sourceNodeId: NodeId,
targetNodeId: NodeId,
proxyAddress: string,
signature: Buffer,
proxyAddress: string | undefined,
@context ctx: ContextTimed,
): Promise<void> {
const relayMsg = new nodesPB.Relay();
relayMsg.setSrcId(nodesUtils.encodeNodeId(sourceNodeId));
relayMsg.setTargetId(nodesUtils.encodeNodeId(targetNodeId));
relayMsg.setProxyAddress(proxyAddress);
relayMsg.setSignature(signature.toString());
if (proxyAddress != null) relayMsg.setProxyAddress(proxyAddress);
await this.withConnF(
relayNodeId,
async (connection) => {
Expand All @@ -729,7 +724,7 @@ class NodeConnectionManager {
* nodeConnection.start())
* @param ctx
*/
public relayHolePunchMessage(
public relaySignallingMessage(
message: nodesPB.Relay,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
Expand All @@ -739,7 +734,7 @@ class NodeConnectionManager {
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connConnectTime,
)
public async relayHolePunchMessage(
public async relaySignallingMessage(
message: nodesPB.Relay,
@context ctx: ContextTimed,
): Promise<void> {
Expand All @@ -755,12 +750,11 @@ class NodeConnectionManager {
knownAddress.port,
);
}
await this.sendHolePunchMessage(
await this.sendSignallingMessage(
validationUtils.parseNodeId(message.getTargetId()),
sourceNode,
validationUtils.parseNodeId(message.getTargetId()),
proxyAddress,
Buffer.from(message.getSignature()),
ctx,
);
}
Expand Down Expand Up @@ -805,23 +799,12 @@ class NodeConnectionManager {
@context ctx: ContextTimed,
): Promise<boolean> {
host = await networkUtils.resolveHost(host);
// If we can create a connection then we have punched though the NAT,
// authenticated and confirmed the nodeId matches
const proxyAddress = networkUtils.buildAddress(
this.proxy.getProxyHost(),
this.proxy.getProxyPort(),
);
const signature = await this.keyManager.signWithRootKeyPair(
Buffer.from(proxyAddress),
);
// FIXME: this needs to handle aborting
void Array.from(this.getSeedNodes(), (seedNodeId) => {
return this.sendHolePunchMessage(
return this.sendSignallingMessage(
seedNodeId,
this.keyManager.getNodeId(),
nodeId,
proxyAddress,
signature,
);
});
try {
Expand Down
3 changes: 0 additions & 3 deletions src/proto/js/polykey/v1/nodes/nodes_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ export class Relay extends jspb.Message {
setTargetId(value: string): Relay;
getProxyAddress(): string;
setProxyAddress(value: string): Relay;
getSignature(): string;
setSignature(value: string): Relay;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): Relay.AsObject;
Expand All @@ -206,7 +204,6 @@ export namespace Relay {
srcId: string,
targetId: string,
proxyAddress: string,
signature: string,
}
}

Expand Down
32 changes: 1 addition & 31 deletions src/proto/js/polykey/v1/nodes/nodes_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -1626,8 +1626,7 @@ proto.polykey.v1.nodes.Relay.toObject = function(includeInstance, msg) {
var f, obj = {
srcId: jspb.Message.getFieldWithDefault(msg, 1, ""),
targetId: jspb.Message.getFieldWithDefault(msg, 2, ""),
proxyAddress: jspb.Message.getFieldWithDefault(msg, 3, ""),
signature: jspb.Message.getFieldWithDefault(msg, 4, "")
proxyAddress: jspb.Message.getFieldWithDefault(msg, 3, "")
};

if (includeInstance) {
Expand Down Expand Up @@ -1676,10 +1675,6 @@ proto.polykey.v1.nodes.Relay.deserializeBinaryFromReader = function(msg, reader)
var value = /** @type {string} */ (reader.readString());
msg.setProxyAddress(value);
break;
case 4:
var value = /** @type {string} */ (reader.readString());
msg.setSignature(value);
break;
default:
reader.skipField();
break;
Expand Down Expand Up @@ -1730,13 +1725,6 @@ proto.polykey.v1.nodes.Relay.serializeBinaryToWriter = function(message, writer)
f
);
}
f = message.getSignature();
if (f.length > 0) {
writer.writeString(
4,
f
);
}
};


Expand Down Expand Up @@ -1794,24 +1782,6 @@ proto.polykey.v1.nodes.Relay.prototype.setProxyAddress = function(value) {
};


/**
* optional string signature = 4;
* @return {string}
*/
proto.polykey.v1.nodes.Relay.prototype.getSignature = function() {
return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 4, ""));
};


/**
* @param {string} value
* @return {!proto.polykey.v1.nodes.Relay} returns this
*/
proto.polykey.v1.nodes.Relay.prototype.setSignature = function(value) {
return jspb.Message.setProto3StringField(this, 4, value);
};





Expand Down
1 change: 0 additions & 1 deletion src/proto/schemas/polykey/v1/nodes/nodes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ message Relay {
string src_id = 1;
string target_id = 2;
string proxy_address = 3;
string signature = 4;
}

message NodeTable {
Expand Down
6 changes: 2 additions & 4 deletions tests/agent/service/nodesHolePunchMessage.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Host, Port } from '@/network/types';
import type { ConnectionInfo } from '@/network/types';
import fs from 'fs';
import path from 'path';
import os from 'os';
Expand Down Expand Up @@ -46,6 +47,7 @@ describe('nodesHolePunchMessage', () => {
nodeConnectionManager: pkAgent.nodeConnectionManager,
nodeManager: pkAgent.nodeManager,
db: pkAgent.db,
connectionInfoGet: () => ({} as ConnectionInfo),
logger,
}),
};
Expand Down Expand Up @@ -78,14 +80,10 @@ describe('nodesHolePunchMessage', () => {
pkAgent.proxy.getProxyHost(),
pkAgent.proxy.getProxyPort(),
);
const signature = await pkAgent.keyManager.signWithRootKeyPair(
Buffer.from(proxyAddress),
);
const relayMessage = new nodesPB.Relay();
relayMessage
.setTargetId(nodeId)
.setSrcId(nodeId)
.setSignature(signature.toString())
.setProxyAddress(proxyAddress);
await grpcClient.nodesHolePunchMessageSend(relayMessage);
// TODO: check if the ping was sent
Expand Down
6 changes: 2 additions & 4 deletions tests/nodes/NodeConnectionManager.general.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,11 @@ describe(`${NodeConnectionManager.name} general test`, () => {
// 3. check that the relevant call was made.
const sourceNodeId = testNodesUtils.generateRandomNodeId();
const targetNodeId = testNodesUtils.generateRandomNodeId();
await nodeConnectionManager.sendHolePunchMessage(
await nodeConnectionManager.sendSignallingMessage(
remoteNodeId1,
sourceNodeId,
targetNodeId,
'',
Buffer.alloc(0),
);

expect(mockedNodesHolePunchMessageSend).toHaveBeenCalled();
Expand Down Expand Up @@ -516,9 +515,8 @@ describe(`${NodeConnectionManager.name} general test`, () => {
const relayMessage = new nodesPB.Relay();
relayMessage.setSrcId(nodesUtils.encodeNodeId(sourceNodeId));
relayMessage.setTargetId(nodesUtils.encodeNodeId(remoteNodeId1));
relayMessage.setSignature('');
relayMessage.setProxyAddress('');
await nodeConnectionManager.relayHolePunchMessage(relayMessage);
await nodeConnectionManager.relaySignallingMessage(relayMessage);

expect(mockedNodesHolePunchMessageSend).toHaveBeenCalled();
} finally {
Expand Down

0 comments on commit 5d92f6e

Please sign in to comment.