Skip to content

Commit

Permalink
logger: store span names (#1166)
Browse files Browse the repository at this point in the history
* logger: account for span name information

* logger/tests: updated tests to account for spans

* logger: get logs result should be ordered by timestamp

* logger: fix tests

* address O nit feedback
  • Loading branch information
iulianbarbu committed Aug 22, 2023
1 parent 4c83051 commit f04245f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 49 deletions.
77 changes: 45 additions & 32 deletions logger/src/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::{path::Path, str::FromStr, time::SystemTime};
use async_broadcast::{broadcast, Sender};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use opentelemetry_proto::tonic::{
common::v1::{any_value, KeyValue},
trace::v1::{ResourceSpans, ScopeSpans, Span},
};
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};
use prost_types::Timestamp;
use serde_json::Value;
use shuttle_common::{
Expand Down Expand Up @@ -116,10 +113,11 @@ impl Sqlite {
#[async_trait]
impl Dal for Sqlite {
async fn get_logs(&self, deployment_id: String) -> Result<Vec<Log>, DalError> {
let result = sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ?")
.bind(deployment_id)
.fetch_all(&self.pool)
.await?;
let result =
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ? ORDER BY timestamp")
.bind(deployment_id)
.fetch_all(&self.pool)
.await?;

Ok(result)
}
Expand Down Expand Up @@ -179,14 +177,16 @@ impl Log {
schema_url: _,
} = resource_spans;

// TODO: we should get both of these attributes in the same function and avoid this clone.
let resource = resource?;
let shuttle_service_name = get_attribute(resource.clone().attributes, "service.name")?;

// Try to get the deployment_id from the resource attributes, this will be the case for the runtimes,
// they add the deployment_id to the otlp tracer config.
let fields = from_any_value_kv_to_serde_json_map(resource?.attributes);
let shuttle_service_name = fields.get("service.name")?.as_str()?.to_string();
// TODO: should this be named "deployment.id" to conform to otlp standard?
let deployment_id = get_attribute(resource.attributes, "deployment_id");
let deployment_id = fields
.get("deployment_id")
.map(|v| {
v.as_str()
.expect("expected to have a string value for deployment_id key")
})
.map(|inner| inner.to_string());

let logs = scope_spans
.into_iter()
Expand Down Expand Up @@ -220,9 +220,12 @@ impl Log {
deployment_id: Option<String>,
) -> Option<Vec<Self>> {
// If we didn't find the id in the resource span, check the inner spans.
let deployment_id = deployment_id.or(get_attribute(span.attributes, "deployment_id"))?;

let logs = span
let mut span_fields = from_any_value_kv_to_serde_json_map(span.attributes);
let deployment_id = deployment_id.or(span_fields
.get("deployment_id")?
.as_str()
.map(|inner| inner.to_string()))?;
let mut logs: Vec<Self> = span
.events
.into_iter()
.flat_map(|event| {
Expand All @@ -246,14 +249,37 @@ impl Log {

Some(Log {
shuttle_service_name: shuttle_service_name.to_string(),
deployment_id: deployment_id.to_string(),
deployment_id: deployment_id.clone(),
timestamp: DateTime::from_utc(naive, Utc),
level: level.as_str()?.parse().ok()?,
fields: Value::Object(fields),
})
})
.collect();

span_fields.insert(
MESSAGE_KEY.to_string(),
format!("[span] {}", span.name).into(),
);

logs.push(Log {
shuttle_service_name: shuttle_service_name.to_string(),
deployment_id,
timestamp: DateTime::from_utc(
NaiveDateTime::from_timestamp_opt(
(span.start_time_unix_nano / 1_000_000_000)
.try_into()
.unwrap_or_default(),
(span.start_time_unix_nano % 1_000_000_000) as u32,
)
.unwrap_or_default(),
Utc,
),
// Span level doesn't exist for opentelemetry spans, so this info is not relevant.
level: LogLevel::Info,
fields: Value::Object(span_fields),
});

Some(logs)
}
}
Expand All @@ -268,16 +294,3 @@ impl From<Log> for LogItem {
}
}
}

/// Get an attribute with the given key
fn get_attribute(attributes: Vec<KeyValue>, key: &str) -> Option<String> {
match attributes
.into_iter()
.find(|kv| kv.key == key)?
.value?
.value?
{
any_value::Value::StringValue(s) => Some(s),
_ => None,
}
}
85 changes: 68 additions & 17 deletions logger/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use shuttle_common::{
claims::Scope,
tracing::{FILEPATH_KEY, LINENO_KEY, NAMESPACE_KEY, TARGET_KEY},
tracing::{FILEPATH_KEY, LINENO_KEY, MESSAGE_KEY, NAMESPACE_KEY, TARGET_KEY},
};
use shuttle_common_tests::JwtScopesLayer;
use shuttle_logger::{Service, ShuttleLogsOtlp, Sqlite};
Expand Down Expand Up @@ -63,10 +63,11 @@ async fn generate_and_get_runtime_logs() {
.unwrap()
.into_inner();

let quoted_deployment_id = format!("\"{DEPLOYMENT_ID}\"");
let expected = vec![
MinLogItem {
level: LogLevel::Trace,
fields: json!({"message": "foo"}),
level: LogLevel::Info,
fields: json!({"message": "[span] deploy", "deployment_id": quoted_deployment_id }),
},
MinLogItem {
level: LogLevel::Error,
Expand All @@ -88,6 +89,14 @@ async fn generate_and_get_runtime_logs() {
level: LogLevel::Trace,
fields: json!({"message": "trace"}),
},
MinLogItem {
level: LogLevel::Info,
fields: json!({"message": "[span] span_name1", "deployment_id": quoted_deployment_id }),
},
MinLogItem {
level: LogLevel::Trace,
fields: json!({"message": "inside span 1 event"}),
},
];

assert_eq!(
Expand Down Expand Up @@ -149,7 +158,7 @@ async fn generate_and_get_service_logs() {

// Start a subscriber and generate some logs using an instrumented deploy function.
generate_service_logs(port, DEPLOYMENT_ID.into(), deploy_instrumented);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

let dst = format!("http://localhost:{port}");

Expand All @@ -165,6 +174,10 @@ async fn generate_and_get_service_logs() {
.into_inner();

let expected = vec![
MinLogItem {
level: LogLevel::Info,
fields: json!({"message": "[span] deploy_instrumented", "deployment_id": DEPLOYMENT_ID.to_string() }),
},
MinLogItem {
level: LogLevel::Error,
fields: json!({"message": "error"}),
Expand Down Expand Up @@ -243,7 +256,7 @@ async fn generate_and_stream_logs() {
tokio::time::sleep(std::time::Duration::from_millis(300)).await;

// Start a subscriber and generate some logs.
generate_runtime_logs(port, DEPLOYMENT_ID.into(), foo);
generate_runtime_logs(port, DEPLOYMENT_ID.into(), span_name1);

// Connect to the logger server so we can fetch logs.
let dst = format!("http://localhost:{port}");
Expand All @@ -264,28 +277,54 @@ async fn generate_and_stream_logs() {
.unwrap()
.unwrap();

let quoted_deployment_id = format!("\"{DEPLOYMENT_ID}\"");
assert_eq!(
MinLogItem::from(log),
MinLogItem {
level: LogLevel::Info,
fields: json!({"message": "[span] span_name1", "deployment_id": quoted_deployment_id}),
},
);

let log = timeout(std::time::Duration::from_millis(500), response.message())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(
MinLogItem::from(log),
MinLogItem {
level: LogLevel::Trace,
fields: json!({"message": "foo"}),
fields: json!({"message": "inside span 1 event"}),
},
);

// Start a subscriber and generate some more logs.
generate_runtime_logs(port, DEPLOYMENT_ID.into(), bar);
generate_runtime_logs(port, DEPLOYMENT_ID.into(), span_name2);

let log = timeout(std::time::Duration::from_millis(500), response.message())
.await
.unwrap()
.unwrap()
.unwrap();

assert_eq!(
MinLogItem::from(log),
MinLogItem {
level: LogLevel::Trace,
fields: json!({"message": "bar"}),
fields: json!({"message": "inside span 2 event"}),
},
);

let log = timeout(std::time::Duration::from_millis(500), response.message())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(
MinLogItem::from(log),
MinLogItem {
level: LogLevel::Info,
fields: json!({"message": "[span] span_name2", "deployment_id": quoted_deployment_id}),
},
);
}
Expand Down Expand Up @@ -356,7 +395,7 @@ fn deploy(deployment_id: String) {
debug!("debug");
trace!("trace");
// This tests that we handle nested spans.
foo(deployment_id);
span_name1(deployment_id);
}

#[instrument(fields(%deployment_id))]
Expand All @@ -369,13 +408,13 @@ fn deploy_instrumented(deployment_id: String) {
}

#[instrument]
fn foo(deployment_id: String) {
trace!("foo");
fn span_name1(deployment_id: String) {
trace!("inside span 1 event");
}

#[instrument]
fn bar(deployment_id: String) {
trace!("bar");
fn span_name2(deployment_id: String) {
trace!("inside span 2 event");
}

#[derive(Debug, Eq, PartialEq)]
Expand All @@ -394,10 +433,22 @@ impl From<LogItem> for MinLogItem {
let mut fields: Value = serde_json::from_slice(&log.fields).unwrap();

let map = fields.as_object_mut().unwrap();
let target = map.remove(TARGET_KEY).unwrap();
let filepath = map.remove(FILEPATH_KEY).unwrap();

assert_eq!(target, "integration_tests");
let message = map.get(MESSAGE_KEY).unwrap();
// Span logs don't contain a target field
if !message.as_str().unwrap().starts_with("[span] ") {
let target = map.remove(TARGET_KEY).unwrap();
assert_eq!(target, "integration_tests");
} else {
// We want to remove what's not of interest for checking
// the spans are containing the right information.
let _ = map.remove("busy_ns").unwrap();
let _ = map.remove("idle_ns").unwrap();
let _ = map.remove("thread.id").unwrap();
let _ = map.remove("thread.name").unwrap();
}

let filepath = map.remove(FILEPATH_KEY).unwrap();
assert_eq!(filepath, "logger/tests/integration_tests.rs");

map.remove(LINENO_KEY).unwrap();
Expand Down

0 comments on commit f04245f

Please sign in to comment.