Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use MQTT as well as HTTP host to determine possible tenant url #2977

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions crates/core/c8y_api/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ pub enum C8yEndPointError {
#[derive(Debug, Clone)]
pub struct C8yEndPoint {
c8y_host: String,
c8y_mqtt_host: String,
pub device_id: String,
pub token: Option<String>,
devices_internal_id: HashMap<String, String>,
}

impl C8yEndPoint {
pub fn new(c8y_host: &str, device_id: &str) -> C8yEndPoint {
pub fn new(c8y_host: &str, c8y_mqtt_host: &str, device_id: &str) -> C8yEndPoint {
C8yEndPoint {
c8y_host: c8y_host.into(),
c8y_mqtt_host: c8y_mqtt_host.into(),
device_id: device_id.into(),
token: None,
devices_internal_id: HashMap::new(),
Expand Down Expand Up @@ -101,20 +103,29 @@ impl C8yEndPoint {
// * <tenant_id>.<domain> eg: t12345.c8y.io
// These URLs may be both equivalent and point to the same tenant.
// We are going to remove that and only check if the domain is the same.
let (tenant_host, _port) = self
let (tenant_http_host, _port) = self
.c8y_host
.split_once(':')
.unwrap_or((&self.c8y_host, ""));
let (tenant_mqtt_host, _port) = self
.c8y_mqtt_host
.split_once(':')
.unwrap_or((&self.c8y_mqtt_host, ""));
let url = Url::parse(url).ok()?;
let url_host = url.domain()?;

let (_, host) = url_host.split_once('.').unwrap_or((url_host, ""));
let (_, c8y_host) = tenant_host.split_once('.').unwrap_or((tenant_host, ""));
let (_, c8y_http_host) = tenant_http_host
.split_once('.')
.unwrap_or((tenant_http_host, ""));
let (_, c8y_mqtt_host) = tenant_mqtt_host
.split_once('.')
.unwrap_or((tenant_mqtt_host, ""));

// The configured `c8y.http` setting may have a port value specified,
// but the incoming URL is less likely to have any port specified.
// Hence just matching the host prefix.
(host == c8y_host).then_some(url)
(host == c8y_http_host || host == c8y_mqtt_host).then_some(url)
}
}

Expand Down Expand Up @@ -208,7 +219,7 @@ mod tests {

#[test]
fn get_url_for_get_id_returns_correct_address() {
let c8y = C8yEndPoint::new("test_host", "test_device");
let c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
let res = c8y.get_url_for_internal_id("test_device");

assert_eq!(
Expand All @@ -219,7 +230,7 @@ mod tests {

#[test]
fn get_url_for_sw_list_returns_correct_address() {
let mut c8y = C8yEndPoint::new("test_host", "test_device");
let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
c8y.devices_internal_id
.insert("test_device".to_string(), "12345".to_string());
let internal_id = c8y.get_internal_id("test_device".to_string()).unwrap();
Expand All @@ -237,8 +248,39 @@ mod tests {
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test_device");
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[test_case("http://aaa.test.com")]
#[test_case("https://aaa.test.com")]
#[test_case("ftp://aaa.test.com")]
#[test_case("mqtt://aaa.test.com")]
#[test_case("https://t1124124.test.com")]
#[test_case("https://t1124124.test.com:12345")]
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls_with_http_port(url: &str) {
let c8y = C8yEndPoint::new("test.test.com:443", "test.mqtt-url.com", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[test_case("http://aaa.test.com")]
#[test_case("https://aaa.test.com")]
#[test_case("ftp://aaa.test.com")]
#[test_case("mqtt://aaa.test.com")]
#[test_case("https://t1124124.test.com")]
#[test_case("https://t1124124.test.com:12345")]
#[test_case("https://t1124124.test.com/path")]
#[test_case("https://t1124124.test.com/path/to/file.test")]
#[test_case("https://t1124124.test.com/path/to/file")]
#[test_case("https://t1124124.mqtt-url.com/path/to/file")]
fn url_is_my_tenant_correct_urls_with_mqtt_port(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com:8883", "test_device");
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

Expand All @@ -250,28 +292,28 @@ mod tests {
#[test_case("http://localhost")]
#[test_case("http://abc.com")]
fn url_is_my_tenant_incorrect_urls(url: &str) {
let c8y = C8yEndPoint::new("test.test.com", "test_device");
let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device");
assert!(c8y.maybe_tenant_url(url).is_none());
}

#[test]
fn url_is_my_tenant_with_hostname_without_commas() {
let c8y = C8yEndPoint::new("custom-domain", "test_device");
let c8y = C8yEndPoint::new("custom-domain", "non-custom-mqtt-domain", "test_device");
let url = "http://custom-domain/path";
assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap()));
}

#[ignore = "Until #2804 is fixed"]
#[test]
fn url_is_my_tenant_check_not_too_broad() {
let c8y = C8yEndPoint::new("abc.com", "test_device");
let c8y = C8yEndPoint::new("abc.com", "abc.com", "test_device");
dbg!(c8y.maybe_tenant_url("http://xyz.com"));
assert!(c8y.maybe_tenant_url("http://xyz.com").is_none());
}

#[test]
fn check_non_cached_internal_id_for_a_device() {
let mut c8y = C8yEndPoint::new("test_host", "test_device");
let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device");
c8y.devices_internal_id
.insert("test_device".to_string(), "12345".to_string());
let end_pt_err = c8y.get_internal_id("test_child".into()).unwrap_err();
Expand Down
5 changes: 4 additions & 1 deletion crates/extensions/c8y_firmware_manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl FirmwareManagerConfig {
data_dir: DataDir,
timeout_sec: Duration,
c8y_url: String,
c8y_mqtt: String,
c8y_prefix: TopicPrefix,
) -> Self {
let local_http_host = format!("{}:{}", local_http_host, local_http_port).into();
Expand All @@ -46,7 +47,7 @@ impl FirmwareManagerConfig {
let firmware_update_response_topics =
TopicFilter::new_unchecked(FIRMWARE_UPDATE_RESPONSE_TOPICS);

let c8y_end_point = C8yEndPoint::new(&c8y_url, &tedge_device_id);
let c8y_end_point = C8yEndPoint::new(&c8y_url, &c8y_mqtt, &tedge_device_id);

Self {
tedge_device_id,
Expand All @@ -72,6 +73,7 @@ impl FirmwareManagerConfig {
let timeout_sec = tedge_config.firmware.child.update.timeout.duration();

let c8y_url = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_mqtt = tedge_config.c8y.mqtt.or_config_not_set()?.to_string();
let c8y_prefix = tedge_config.c8y.bridge.topic_prefix.clone();

Ok(Self::new(
Expand All @@ -82,6 +84,7 @@ impl FirmwareManagerConfig {
data_dir,
timeout_sec,
c8y_url,
c8y_mqtt,
c8y_prefix,
))
}
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_firmware_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ async fn spawn_firmware_manager(
tmp_dir.utf8_path_buf().into(),
timeout_sec,
C8Y_HOST.into(),
C8Y_HOST.into(),
"c8y".try_into().unwrap(),
);

Expand Down
6 changes: 5 additions & 1 deletion crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl Actor for C8YHttpProxyActor {

impl C8YHttpProxyActor {
pub fn new(config: C8YHttpConfig, message_box: C8YHttpProxyMessageBox) -> Self {
let end_point = C8yEndPoint::new(&config.c8y_host, &config.device_id);
let end_point = C8yEndPoint::new(
&config.c8y_http_host,
&config.c8y_mqtt_host,
&config.device_id,
);
C8YHttpProxyActor {
config,
end_point,
Expand Down
9 changes: 6 additions & 3 deletions crates/extensions/c8y_http_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ mod tests;
/// Configuration of C8Y REST API
#[derive(Default, Clone)]
pub struct C8YHttpConfig {
pub c8y_host: String,
pub c8y_http_host: String,
pub c8y_mqtt_host: String,
pub device_id: String,
pub tmp_dir: PathBuf,
identity: Option<Identity>,
Expand All @@ -45,14 +46,16 @@ impl TryFrom<&TEdgeConfig> for C8YHttpConfig {
type Error = C8yHttpConfigBuildError;

fn try_from(tedge_config: &TEdgeConfig) -> Result<Self, Self::Error> {
let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_http_host = tedge_config.c8y.http.or_config_not_set()?.to_string();
let c8y_mqtt_host = tedge_config.c8y.mqtt.or_config_not_set()?.to_string();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let tmp_dir = tedge_config.tmp.path.as_std_path().to_path_buf();
let identity = tedge_config.http.client.auth.identity()?;
let retry_interval = Duration::from_secs(5);

Ok(Self {
c8y_host,
c8y_http_host,
c8y_mqtt_host,
device_id,
tmp_dir,
identity,
Expand Down
9 changes: 6 additions & 3 deletions crates/extensions/c8y_http_proxy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ async fn retry_internal_id_on_expired_jwt_with_mock() {
let mut http_actor = HttpActor::new().builder();

let config = C8YHttpConfig {
c8y_host: target_url.clone(),
c8y_http_host: target_url.clone(),
c8y_mqtt_host: target_url.clone(),
device_id: external_id.into(),
tmp_dir: tmp_dir.into(),
identity: None,
Expand Down Expand Up @@ -418,7 +419,8 @@ async fn retry_create_event_on_expired_jwt_with_mock() {
let mut http_actor = HttpActor::new().builder();

let config = C8YHttpConfig {
c8y_host: target_url.clone(),
c8y_http_host: target_url.clone(),
c8y_mqtt_host: target_url.clone(),
device_id: external_id.into(),
tmp_dir: tmp_dir.into(),
identity: None,
Expand Down Expand Up @@ -661,7 +663,8 @@ async fn spawn_c8y_http_proxy(
let mut http = FakeServerBox::builder();

let config = C8YHttpConfig {
c8y_host,
c8y_http_host: c8y_host.clone(),
c8y_mqtt_host: c8y_host,
device_id,
tmp_dir,
identity: None,
Expand Down
4 changes: 4 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct C8yMapperConfig {
pub device_type: String,
pub service: TEdgeConfigReaderService,
pub c8y_host: String,
pub c8y_mqtt: String,
pub tedge_http_host: Arc<str>,
pub topics: TopicFilter,
pub capabilities: Capabilities,
Expand Down Expand Up @@ -77,6 +78,7 @@ impl C8yMapperConfig {
device_type: String,
service: TEdgeConfigReaderService,
c8y_host: String,
c8y_mqtt: String,
tedge_http_host: Arc<str>,
topics: TopicFilter,
capabilities: Capabilities,
Expand Down Expand Up @@ -113,6 +115,7 @@ impl C8yMapperConfig {
device_type,
service,
c8y_host,
c8y_mqtt,
tedge_http_host,
topics,
capabilities,
Expand Down Expand Up @@ -232,6 +235,7 @@ impl C8yMapperConfig {
device_topic_id,
device_type,
service,
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down
22 changes: 12 additions & 10 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ impl CumulocityConverter {
config.service.ty.clone()
};

let c8y_host = config.c8y_host.clone();
let c8y_host = &config.c8y_host;
let c8y_mqtt = &config.c8y_mqtt;

let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);

Expand All @@ -220,7 +221,7 @@ impl CumulocityConverter {
let log_dir = config.logs_path.join(TEDGE_AGENT_LOG_DIR);
let operation_logs = OperationLogs::try_new(log_dir)?;

let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_id);
let c8y_endpoint = C8yEndPoint::new(c8y_host, c8y_mqtt, &device_id);

let mqtt_schema = config.mqtt_schema.clone();

Expand Down Expand Up @@ -1643,37 +1644,37 @@ pub(crate) mod tests {
}

#[test_case(
"m/env",
"m/env",
json!({ "temp": 1})
;"measurement"
)]
#[test_case(
"e/click",
"e/click",
json!({ "text": "Someone clicked" })
;"event"
)]
#[test_case(
"a/temp",
"a/temp",
json!({ "text": "Temperature too high" })
;"alarm"
)]
#[test_case(
"twin/custom",
"twin/custom",
json!({ "foo": "bar" })
;"twin"
)]
#[test_case(
"status/health",
"status/health",
json!({ "status": "up" })
;"health status"
)]
#[test_case(
"cmd/restart",
"cmd/restart",
json!({ })
;"command metadata"
)]
#[test_case(
"cmd/restart/123",
"cmd/restart/123",
json!({ "status": "init" })
;"command"
)]
Expand Down Expand Up @@ -3050,7 +3051,7 @@ pub(crate) mod tests {
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let tedge_config = TEdgeConfig::load_toml_str("service.ty = \"service\"");
let c8y_host = "test.c8y.io".into();
let c8y_host = "test.c8y.io".to_owned();
let tedge_http_host = "127.0.0.1".into();
let auth_proxy_addr = "127.0.0.1".into();
let auth_proxy_port = 8001;
Expand All @@ -3070,6 +3071,7 @@ pub(crate) mod tests {
device_topic_id,
device_type,
tedge_config.service.clone(),
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl OperationHandler {

c8y_endpoint: C8yEndPoint::new(
&c8y_mapper_config.c8y_host,
&c8y_mapper_config.c8y_mqtt,
&c8y_mapper_config.device_id,
),
http_proxy: http_proxy.clone(),
Expand Down
3 changes: 2 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2761,7 +2761,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
let device_topic_id = EntityTopicId::default_main_device();
let device_type = "test-device-type".into();
let config = TEdgeConfig::load_toml_str("service.ty = \"service\"");
let c8y_host = "test.c8y.io".into();
let c8y_host = "test.c8y.io".to_owned();
let tedge_http_host = "localhost:8888".into();
let mqtt_schema = MqttSchema::default();
let auth_proxy_addr = "127.0.0.1".into();
Expand All @@ -2787,6 +2787,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
device_topic_id,
device_type,
config.service.clone(),
c8y_host.clone(),
c8y_host,
tedge_http_host,
topics,
Expand Down