Skip to content

Commit

Permalink
Add a notification endpoint to the json-delta protocol. (#863)
Browse files Browse the repository at this point in the history
The PR adds a new endpoint /json-delta/notify that waits with responding
until new data is available and then returns a JSON object with the session
ID and serial number.

---------

Co-authored-by: Alex Band <[email protected]>
  • Loading branch information
partim and AlexanderBand authored May 31, 2023
1 parent c7beccd commit 4e86a95
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 234 deletions.
333 changes: 135 additions & 198 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rand = "0.8.1"
reqwest = { version = "0.11.0", default-features = false, features = ["blocking", "rustls-tls" ] }
ring = "0.16.12"
routecore = "0.3.1"
rpki = { version = "0.16.1", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rpki = { git = "https://github.com/NLnetLabs/rpki-rs.git", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rustls-pemfile = "1"
serde = { version = "1.0.95", features = [ "derive" ] }
serde_json = "1.0.57"
Expand All @@ -54,9 +54,9 @@ syslog = "6"
[features]
default = [ "socks", "ui"]
arbitrary = [ "dep:arbitrary", "chrono/arbitrary", "rpki/arbitrary" ]
socks = [ "reqwest/socks" ]
rta = []
native-tls = [ "reqwest/native-tls", "tls" ]
rta = []
socks = [ "reqwest/socks" ]
tls = []
ui = [ "routinator-ui" ]

Expand Down
12 changes: 12 additions & 0 deletions doc/manual/source/api-endpoints.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ The HTTP service supports GET requests on the following paths:
provided session and serial. If *reset* is *true*, the *withdrawn*
member is not present.

``/json-delta/notify, /json-delta/notify?session=session&serial=serial``
Returns a JSON object with two members *session* and *serial* which
contain the session ID and serial number of the current data set.

If the *session* and *serial* query parameters are provided, and the
session ID and serial number of the current data set are identical to
the provided values, the request will not return until a new data set is
available. This can be used as a means to get notified when the data set
has been updated.

In addition, the ``/log`` endpoint returns :doc:`logging<logging>`
information and the ``/metrics``, ``/status`` and
``/version`` endpoints provide :doc:`monitoring<monitoring>` data.
Expand All @@ -40,3 +50,5 @@ information and the ``/metrics``, ``/status`` and
The ``/json-delta`` path
.. versionchanged:: 0.9.0
The ``/api/v1/status`` path
.. versionadded:: 0.13.0
The ``/json-delta/notify`` path
10 changes: 10 additions & 0 deletions doc/manual/source/manual-page.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,16 @@ The service only supports GET requests with the following paths:
provided session and serial. If *reset* is *true*, the *withdrawn*
member is not present.

/json-delta/notify, /json-delta/notify?session=session&serial=serial
Returns a JSON object with two members *session* and *serial* which
contain the session ID and serial number of the current data set.

If the *session* and *serial* query parameters are provided, and the
session ID and serial number of the current data set are identical
to the provided values, the request will not return until a new data
set is available. This can be used as a means to get notified when
the data set has been updated.

In addition, the current set of VRPs is available for each output format at a
path with the same name as the output format. E.g., the CSV output is
available at ``/csv``.
Expand Down
101 changes: 78 additions & 23 deletions src/http/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use futures::stream;
use hyper::{Body, Method, Request};
use rpki::rtr::Serial;
use rpki::rtr::payload::{Action, PayloadRef};
use rpki::rtr::server::PayloadDiff;
use rpki::rtr::server::{NotifySender, PayloadDiff};
use crate::payload::{
DeltaArcIter, PayloadDelta, PayloadSnapshot, SharedHistory, SnapshotArcIter
};
use crate::utils::fmt::WriteOrPanic;
use crate::utils::json::JsonBuilder;
use super::response::{ContentType, Response, ResponseBuilder};

//------------ handle_get ----------------------------------------------------
//------------ handle_get_or_head --------------------------------------------

pub fn handle_get_or_head(
req: &Request<Body>,
Expand Down Expand Up @@ -57,6 +58,81 @@ pub fn handle_get_or_head(
Some(handle_reset(history.session(), history.serial(), snapshot))
}

fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta)
.map(Result::<_, Infallible>::Ok)
)))
}

fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
SnapshotStream::new(session, to_serial, snapshot)
.map(Result::<_, Infallible>::Ok)
)))
}


//------------ handle_notify_get_or_head -------------------------------------

pub async fn handle_notify_get_or_head(
req: &Request<Body>,
history: &SharedHistory,
notify: &NotifySender,
) -> Option<Response> {
if req.uri().path() != "/json-delta/notify" {
return None
}

let wait = match need_wait(req, history) {
Ok(wait) => wait,
Err(resp) => return Some(resp),
};

if wait {
notify.subscribe().recv().await;
}

if *req.method() == Method::HEAD {
Some(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
}
else {
let (session, serial) = history.read().session_and_serial();
Some(
ResponseBuilder::ok().content_type(ContentType::JSON).body(
JsonBuilder::build(|json| {
json.member_raw("session", session);
json.member_raw("serial", serial);
})
)
)
}
}

fn need_wait(
req: &Request<Body>,
history: &SharedHistory,
) -> Result<bool, Response> {
let version = match version_from_query(req.uri().query())? {
Some(version) => version,
None => return Ok(false),
};

Ok(history.read().session_and_serial() == version)
}


//------------ Helpers -------------------------------------------------------

fn version_from_query(
query: Option<&str>
) -> Result<Option<(u64, Serial)>, Response> {
Expand Down Expand Up @@ -95,27 +171,6 @@ fn version_from_query(
}
}

fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta)
.map(Result::<_, Infallible>::Ok)
)))
}

fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
SnapshotStream::new(session, to_serial, snapshot)
.map(Result::<_, Infallible>::Ok)
)))
}


//------------ DeltaStream ---------------------------------------------------

Expand Down
13 changes: 11 additions & 2 deletions src/http/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use hyper::Server;
use hyper::server::accept::Accept;
use hyper::service::{make_service_fn, service_fn};
use log::error;
use rpki::rtr::server::NotifySender;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;
Expand All @@ -34,6 +35,7 @@ pub fn http_listener(
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
config: &Config,
notify: NotifySender,
) -> Result<impl Future<Output = ()>, ExitError> {
let metrics = Arc::new(HttpServerMetrics::default());

Expand All @@ -51,7 +53,7 @@ pub fn http_listener(
);
}
}
Ok(_http_listener(origins, metrics, rtr_metrics, log, listeners))
Ok(_http_listener(origins, metrics, rtr_metrics, log, notify, listeners))
}

fn create_tls_config(
Expand Down Expand Up @@ -79,6 +81,7 @@ async fn _http_listener(
metrics: Arc<HttpServerMetrics>,
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
notify: NotifySender,
listeners: Vec<(SocketAddr, Option<Arc<tls::ServerConfig>>, StdListener)>,
) {
// If there are no listeners, just never return.
Expand All @@ -93,6 +96,7 @@ async fn _http_listener(
addr, tls_config, listener,
origins.clone(), metrics.clone(),
rtr_metrics.clone(), log.clone(),
notify.clone(),
))
})
).await;
Expand All @@ -104,6 +108,7 @@ async fn _http_listener(
/// listener, in which case it will print an error and resolve the error case.
/// It will listen bind a Hyper server onto `addr` and produce any data
/// served from `origins`.
#[allow(clippy::too_many_arguments)]
async fn single_http_listener(
addr: SocketAddr,
tls_config: Option<Arc<tls::ServerConfig>>,
Expand All @@ -112,22 +117,26 @@ async fn single_http_listener(
metrics: Arc<HttpServerMetrics>,
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
notify: NotifySender,
) {
let make_service = make_service_fn(|_conn| {
let origins = origins.clone();
let metrics = metrics.clone();
let rtr_metrics = rtr_metrics.clone();
let log = log.clone();
let notify = notify.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let origins = origins.clone();
let metrics = metrics.clone();
let rtr_metrics = rtr_metrics.clone();
let log = log.clone();
let notify = notify.clone();
async move {
Ok::<_, Infallible>(handle_request(
req, &origins, &metrics, &rtr_metrics,
log.as_ref().map(|x| x.as_ref())
log.as_ref().map(|x| x.as_ref()),
&notify,
).await.into_hyper())
}
}))
Expand Down
7 changes: 7 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod validity;
//------------ handle_request ------------------------------------------------

use hyper::{Body, Method, Request};
use rpki::rtr::server::NotifySender;
use crate::metrics::{HttpServerMetrics, SharedRtrServerMetrics};
use crate::payload::SharedHistory;
use crate::process::LogOutput;
Expand All @@ -36,6 +37,7 @@ async fn handle_request(
metrics: &HttpServerMetrics,
rtr_metrics: &SharedRtrServerMetrics,
log: Option<&LogOutput>,
notify: &NotifySender,
) -> Response {
metrics.inc_requests();
if *req.method() != Method::GET && *req.method() != Method::HEAD {
Expand All @@ -45,6 +47,11 @@ async fn handle_request(
if let Some(response) = payload::handle_get_or_head(&req, origins) {
return response
}
if let Some(response) = delta::handle_notify_get_or_head(
&req, origins, notify,
).await {
return response
}
if let Some(response) = delta::handle_get_or_head(&req, origins) {
return response
}
Expand Down
8 changes: 5 additions & 3 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ impl Server {
);

let history = SharedHistory::from_config(process.config());
let (mut notify, rtr) = rtr_listener(
let mut notify = NotifySender::new();
let rtr = rtr_listener(
history.clone(), rtr_metrics.clone(), process.config(),
process.get_listen_fd()?
notify.clone(), process.get_listen_fd()?
)?;
let http = http_listener(
history.clone(), rtr_metrics, log.clone(), process.config()
history.clone(), rtr_metrics, log.clone(), process.config(),
notify.clone(),
)?;

process.drop_privileges()?;
Expand Down
5 changes: 5 additions & 0 deletions src/payload/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ impl PayloadHistory {
self.session
}

/// Returns the session and serial number of the current data set.
pub fn session_and_serial(&self) -> (u64, Serial) {
(self.session(), self.serial())
}

/// Returns the RTR version of the session ID.
///
/// This is the last 16 bits of the full session ID.
Expand Down
9 changes: 4 additions & 5 deletions src/rtr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ pub fn rtr_listener(
history: SharedHistory,
metrics: SharedRtrServerMetrics,
config: &Config,
sender: NotifySender,
extra_listener: Option<StdListener>,
) -> Result<(NotifySender, impl Future<Output = ()>), ExitError> {
let sender = NotifySender::new();

) -> Result<impl Future<Output = ()>, ExitError> {
// Binding needs to have happened before dropping privileges
// during detach. So we do this here synchronously.
let mut listeners = Vec::new();
Expand All @@ -54,9 +53,9 @@ pub fn rtr_listener(
));
}
}
Ok((sender.clone(), _rtr_listener(
Ok(_rtr_listener(
history, metrics, sender, listeners, config.rtr_tcp_keepalive,
)))
))
}

fn create_tls_config(
Expand Down

0 comments on commit 4e86a95

Please sign in to comment.