Skip to content

Commit

Permalink
Use MQTT as well as HTTP host to determine possible tenant url
Browse files Browse the repository at this point in the history
Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 committed Jul 5, 2024
1 parent 9f1e2df commit 75084ac
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 30 deletions.
64 changes: 53 additions & 11 deletions crates/core/c8y_api/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ pub enum C8yEndPointError {
#[derive(Debug, Clone)]
pub struct C8yEndPoint {
c8y_host: String,
c8y_mqtt_host: String,
pub device_id: String,
pub token: Option<String>,
devices_internal_id: HashMap<String, String>,
}

impl C8yEndPoint {
pub fn new(c8y_host: &str, device_id: &str) -> C8yEndPoint {
pub fn new(c8y_host: &str, c8y_mqtt_host: &str, device_id: &str) -> C8yEndPoint {
C8yEndPoint {
c8y_host: c8y_host.into(),
c8y_mqtt_host: c8y_mqtt_host.into(),
device_id: device_id.into(),
token: None,
devices_internal_id: HashMap::new(),
Expand Down Expand Up @@ -101,20 +103,29 @@ impl C8yEndPoint {
// * <tenant_id>.<domain> eg: t12345.c8y.io
// These URLs may be both equivalent and point to the same tenant.
// We are going to remove that and only check if the domain is the same.
let (tenant_host, _port) = self
let (tenant_http_host, _port) = self
.c8y_host
.split_once(':')
.unwrap_or((&self.c8y_host, ""));
let (tenant_mqtt_host, _port) = self
.c8y_mqtt_host
.split_once(':')
.unwrap_or((&self.c8y_mqtt_host, ""));
let url = Url::parse(url).ok()?;
let url_host = url.domain()?;

let (_, host) = url_host.split_once('.').unwrap_or((url_host, ""));
let (_, c8y_host) = tenant_host.split_once('.').unwrap_or((tenant_host, ""));
let (_, c8y_http_host) = tenant_http_host
.split_once('.')
.unwrap_or((tenant_http_host, ""));
let (_, c8y_mqtt_host) = tenant_mqtt_host
.split_once('.')
.unwrap_or((tenant_mqtt_host, ""));

// The configured `c8y.http` setting may have a port value specified,
// but the incoming URL is less likely to have any port specified.
// Hence just matching the host prefix.
(host == c8y_host).then_some(url)
(host == c8y_http_host || host == c8y_mqtt_host).then_some(url)
}
}

Expand Down Expand Up @@ -208,7 +219,7 @@ mod tests {

#[test]
fn get_url_for_get_id_returns_correct_address() {
let c8y = C8yEndPoint::new("test_host", "test_device");
let c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
let res = c8y.get_url_for_internal_id("test_device");

assert_eq!(
Expand All @@ -219,7 +230,7 @@ mod tests {

#[test]
fn get_url_for_sw_list_returns_correct_address() {
let mut c8y = C8yEndPoint::new("test_host", "test_device");
let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
c8y.devices_internal_id
.insert("test_device".to_string(), "12345".to_string());
let internal_id = c8y.get_internal_id("test_device".to_string()).unwrap();
Expand All @@ -237,8 +248,39 @@ mod tests {
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test_device");
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[test_case("http://aaa.test.com")]
#[test_case("https://aaa.test.com")]
#[test_case("ftp://aaa.test.com")]
#[test_case("mqtt://aaa.test.com")]
#[test_case("https://t1124124.test.com")]
#[test_case("https://t1124124.test.com:12345")]
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls_with_http_port(url: &str) {
let c8y = C8yEndPoint::new("test.test.com:443", "test.mqtt-url.com", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[test_case("http://aaa.test.com")]
#[test_case("https://aaa.test.com")]
#[test_case("ftp://aaa.test.com")]
#[test_case("mqtt://aaa.test.com")]
#[test_case("https://t1124124.test.com")]
#[test_case("https://t1124124.test.com:12345")]
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls_with_mqtt_port(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com:8883", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

Expand All @@ -250,28 +292,28 @@ mod tests {
#[test_case("http://localhost")]
#[test_case("http://abc.com")]
fn url_is_my_tenant_incorrect_urls(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test_device");
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device");
assert!(c8y.maybe_tenant_url(url).is_none());
}

#[test]
fn url_is_my_tenant_with_hostname_without_commas() {
let c8y = C8yEndPoint::new("custom-domain", "test_device");
let c8y = C8yEndPoint::new("custom-domain", "non-custom-mqtt-domain", "test_device");
let url = "http://custom-domain/path";
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[ignore = "Until #2804 is fixed"]
#[test]
fn url_is_my_tenant_check_not_too_broad() {
let c8y = C8yEndPoint::new("abc.com", "test_device");
let c8y = C8yEndPoint::new("abc.com", "abc.com", "test_device");
dbg!(c8y.maybe_tenant_url("http://xyz.com"));
assert!(c8y.maybe_tenant_url("http://xyz.com").is_none());
}

#[test]
fn check_non_cached_internal_id_for_a_device() {
let mut c8y = C8yEndPoint::new("test_host", "test_device");
let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
c8y.devices_internal_id
.insert("test_device".to_string(), "12345".to_string());
let end_pt_err = c8y.get_internal_id("test_child".into()).unwrap_err();
Expand Down
5 changes: 4 additions & 1 deletion crates/extensions/c8y_firmware_manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl FirmwareManagerConfig {
data_dir: DataDir,
timeout_sec: Duration,
c8y_url: String,
c8y_mqtt: String,
c8y_prefix: TopicPrefix,
) -> Self {
let local_http_host = format!("{}:{}", local_http_host, local_http_port).into();
Expand All @@ -46,7 +47,7 @@ impl FirmwareManagerConfig {
let firmware_update_response_topics =
TopicFilter::new_unchecked(FIRMWARE_UPDATE_RESPONSE_TOPICS);

let c8y_end_point = C8yEndPoint::new(&c8y_url, &tedge_device_id);
let c8y_end_point = C8yEndPoint::new(&c8y_url, &c8y_mqtt, &tedge_device_id);

Self {
tedge_device_id,
Expand All @@ -72,6 +73,7 @@ impl FirmwareManagerConfig {
let timeout_sec = tedge_config.firmware.child.update.timeout.duration();

let c8y_url = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_mqtt = tedge_config.c8y.mqtt.or_config_not_set()?.to_string();
let c8y_prefix = tedge_config.c8y.bridge.topic_prefix.clone();

Ok(Self::new(
Expand All @@ -82,6 +84,7 @@ impl FirmwareManagerConfig {
data_dir,
timeout_sec,
c8y_url,
c8y_mqtt,
c8y_prefix,
))
}
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_firmware_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ async fn spawn_firmware_manager(
tmp_dir.utf8_path_buf().into(),
timeout_sec,
C8Y_HOST.into(),
C8Y_HOST.into(),
"c8y".try_into().unwrap(),
);

Expand Down
6 changes: 5 additions & 1 deletion crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl Actor for C8YHttpProxyActor {

impl C8YHttpProxyActor {
pub fn new(config: C8YHttpConfig, message_box: C8YHttpProxyMessageBox) -> Self {
let end_point = C8yEndPoint::new(&config.c8y_host, &config.device_id);
let end_point = C8yEndPoint::new(
&config.c8y_http_host,
&config.c8y_mqtt_host,
&config.device_id,
);
C8YHttpProxyActor {
config,
end_point,
Expand Down
9 changes: 6 additions & 3 deletions crates/extensions/c8y_http_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ mod tests;
/// Configuration of C8Y REST API
#[derive(Default, Clone)]
pub struct C8YHttpConfig {
pub c8y_host: String,
pub c8y_http_host: String,
pub c8y_mqtt_host: String,
pub device_id: String,
pub tmp_dir: PathBuf,
identity: Option<Identity>,
Expand All @@ -45,14 +46,16 @@ impl TryFrom<&TEdgeConfig> for C8YHttpConfig {
type Error = C8yHttpConfigBuildError;

fn try_from(tedge_config: &TEdgeConfig) -> Result<Self, Self::Error> {
let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_http_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_mqtt_host = tedge_config.c8y.mqtt.or_config_not_set()?.to_string();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let tmp_dir = tedge_config.tmp.path.as_std_path().to_path_buf();
let identity = tedge_config.http.client.auth.identity()?;
let retry_interval = Duration::from_secs(5);

Ok(Self {
c8y_host,
c8y_http_host,
c8y_mqtt_host,
device_id,
tmp_dir,
identity,
Expand Down
9 changes: 6 additions & 3 deletions crates/extensions/c8y_http_proxy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ async fn retry_internal_id_on_expired_jwt_with_mock() {
let mut http_actor = HttpActor::new().builder();

let config = C8YHttpConfig {
c8y_host: target_url.clone(),
c8y_http_host: target_url.clone(),
c8y_mqtt_host: target_url.clone(),
device_id: external_id.into(),
tmp_dir: tmp_dir.into(),
identity: None,
Expand Down Expand Up @@ -418,7 +419,8 @@ async fn retry_create_event_on_expired_jwt_with_mock() {
let mut http_actor = HttpActor::new().builder();

let config = C8YHttpConfig {
c8y_host: target_url.clone(),
c8y_http_host: target_url.clone(),
c8y_mqtt_host: target_url.clone(),
device_id: external_id.into(),
tmp_dir: tmp_dir.into(),
identity: None,
Expand Down Expand Up @@ -661,7 +663,8 @@ async fn spawn_c8y_http_proxy(
let mut http = FakeServerBox::builder();

let config = C8YHttpConfig {
c8y_host,
c8y_http_host: c8y_host.clone(),
c8y_mqtt_host: c8y_host,
device_id,
tmp_dir,
identity: None,
Expand Down
4 changes: 4 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct C8yMapperConfig {
pub device_type: String,
pub service: TEdgeConfigReaderService,
pub c8y_host: String,
pub c8y_mqtt: String,
pub tedge_http_host: Arc<str>,
pub topics: TopicFilter,
pub capabilities: Capabilities,
Expand Down Expand Up @@ -77,6 +78,7 @@ impl C8yMapperConfig {
device_type: String,
service: TEdgeConfigReaderService,
c8y_host: String,
c8y_mqtt: String,
tedge_http_host: Arc<str>,
topics: TopicFilter,
capabilities: Capabilities,
Expand Down Expand Up @@ -113,6 +115,7 @@ impl C8yMapperConfig {
device_type,
service,
c8y_host,
c8y_mqtt,
tedge_http_host,
topics,
capabilities,
Expand Down Expand Up @@ -232,6 +235,7 @@ impl C8yMapperConfig {
device_topic_id,
device_type,
service,
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down
22 changes: 12 additions & 10 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ impl CumulocityConverter {
config.service.ty.clone()
};

let c8y_host = config.c8y_host.clone();
let c8y_host = &config.c8y_host;
let c8y_mqtt = &config.c8y_mqtt;

let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);

Expand All @@ -272,7 +273,7 @@ impl CumulocityConverter {
let log_dir = config.logs_path.join(TEDGE_AGENT_LOG_DIR);
let operation_logs = OperationLogs::try_new(log_dir)?;

let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_id);
let c8y_endpoint = C8yEndPoint::new(c8y_host, c8y_mqtt, &device_id);

let mqtt_schema = config.mqtt_schema.clone();

Expand Down Expand Up @@ -2001,37 +2002,37 @@ pub(crate) mod tests {
}

#[test_case(
"m/env",
"m/env",
json!({ "temp": 1})
;"measurement"
)]
#[test_case(
"e/click",
"e/click",
json!({ "text": "Someone clicked" })
;"event"
)]
#[test_case(
"a/temp",
"a/temp",
json!({ "text": "Temperature too high" })
;"alarm"
)]
#[test_case(
"twin/custom",
"twin/custom",
json!({ "foo": "bar" })
;"twin"
)]
#[test_case(
"status/health",
"status/health",
json!({ "status": "up" })
;"health status"
)]
#[test_case(
"cmd/restart",
"cmd/restart",
json!({ })
;"command metadata"
)]
#[test_case(
"cmd/restart/123",
"cmd/restart/123",
json!({ "status": "init" })
;"command"
)]
Expand Down Expand Up @@ -3408,7 +3409,7 @@ pub(crate) mod tests {
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let tedge_config = TEdgeConfig::load_toml_str("service.ty = \"service\"");
let c8y_host = "test.c8y.io".into();
let c8y_host = "test.c8y.io".to_owned();
let tedge_http_host = "127.0.0.1".into();
let auth_proxy_addr = "127.0.0.1".into();
let auth_proxy_port = 8001;
Expand All @@ -3428,6 +3429,7 @@ pub(crate) mod tests {
device_topic_id,
device_type,
tedge_config.service.clone(),
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down
3 changes: 2 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2686,7 +2686,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let config = TEdgeConfig::load_toml_str("service.ty = \"service\"");
let c8y_host = "test.c8y.io".into();
let c8y_host = "test.c8y.io".to_owned();
let tedge_http_host = "localhost:8888".into();
let mqtt_schema = MqttSchema::default();
let auth_proxy_addr = "127.0.0.1".into();
Expand All @@ -2712,6 +2712,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
device_topic_id,
device_type,
config.service.clone(),
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down

0 comments on commit 75084ac

Please sign in to comment.