Skip to content

Commit

Permalink
Expose the Progress interface through the HTTP/JSON API (#1092)
Browse files Browse the repository at this point in the history
Trello:
https://trello.com/c/2RvcZBFR/3564-5-expose-the-progress-api-over-http

This PR exposes the `Progress` API for the Manager and Software services
through the HTTP/JSON interface. It is implemented as a pair of
functions that can be reused in all services that want to implement such
an interface.

## A `/SERVICE/progress` endpoint

The `progress_router` function allows to add a `/SERVICE/progress` route
that exposes the current progress:

```json
{
  "current_step": 4,
  "max_steps": 4,
  "current_title": "Calculating the software proposal",
  "finished": false
}
```

## The events stream

The `progress_stream` builds an events stream that emits a new event
when the `current_step` changes:

```json
{
  "type": "Progress",
  "service": "org.opensuse.Agama.Software1",
  "current_step": 4,
  "max_steps": 4,
  "current_title": "Calculating the software proposal",
  "finished": false
}
```

## Enabling `ServiceStatus` and `Progress` for the Software service

Additionally, the PR enables the `ServiceStatus` (implemented
#1089) and the `Progress` for the
Software service too.
  • Loading branch information
imobachgs authored Mar 15, 2024
2 parents 3a6ce33 + 859617b commit 64893e6
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 122 deletions.
9 changes: 5 additions & 4 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions rust/agama-lib/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ impl Progress {
finished: finished?,
})
}

pub fn from_cached_proxy(proxy: &crate::proxies::ProgressProxy<'_>) -> Option<Progress> {
let (current_step, current_title) = proxy.cached_current_step().ok()??;
let max_steps = proxy.cached_total_steps().ok()??;
let finished = proxy.cached_finished().ok()??;

Some(Progress {
current_step,
current_title,
max_steps,
finished,
})
}
}

/// Monitorizes and reports the progress of Agama's current operation.
Expand Down
1 change: 1 addition & 0 deletions rust/agama-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ chrono = { version = "0.4.34", default-features = false, features = [
] }
pam = "0.8.0"
serde_with = "3.6.1"
pin-project = "1.1.5"

[[bin]]
name = "agama-dbus-server"
Expand Down
61 changes: 14 additions & 47 deletions rust/agama-server/src/manager/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! * `manager_service` which returns the Axum service.
//! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus.

use std::pin::Pin;

use agama_lib::{
error::ServiceError,
manager::{InstallationPhase, ManagerClient},
Expand All @@ -25,7 +27,7 @@ use tokio_stream::{Stream, StreamExt};
use crate::{
error::Error,
web::{
common::{service_status_router, service_status_stream},
common::{progress_router, service_status_router},
Event,
},
};
Expand Down Expand Up @@ -63,47 +65,12 @@ pub struct InstallerStatus {

/// Returns a stream that emits manager related events coming from D-Bus.
///
/// It emits the Event::BusyServicesChanged and Event::InstallationPhaseChanged events.
/// It emits the Event::InstallationPhaseChanged event.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn manager_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
StreamExt::merge(
busy_services_changed_stream(dbus.clone()).await?,
installation_phase_changed_stream(dbus.clone()).await?,
),
service_status_stream(
dbus,
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
))
}

pub async fn busy_services_changed_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_busy_services_changed()
.await
.then(|change| async move {
if let Ok(busy_services) = change.get().await {
Some(Event::BusyServicesChanged {
services: busy_services,
})
} else {
None
}
})
.filter_map(|e| e);
Ok(stream)
}

pub async fn installation_phase_changed_stream(
pub async fn manager_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_current_installation_phase_changed()
Expand All @@ -122,25 +89,25 @@ pub async fn installation_phase_changed_stream(
}
})
.filter_map(|e| e);
Ok(stream)
Ok(Box::pin(stream))
}

/// Sets up and returns the axum service for the manager module
pub async fn manager_service(dbus: zbus::Connection) -> Result<Router, ServiceError> {
let status_route = service_status_router(
&dbus,
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?;
const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Manager1";
const DBUS_PATH: &'static str = "/org/opensuse/Agama/Manager1";

let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let manager = ManagerClient::new(dbus).await?;
let state = ManagerState { manager };
Ok(Router::new()
.route("/probe", post(probe_action))
.route("/install", post(install_action))
.route("/finish", post(finish_action))
.route("/installer", get(installer_status))
.merge(status_route)
.merge(status_router)
.merge(progress_router)
.with_state(state))
}

Expand Down
27 changes: 22 additions & 5 deletions rust/agama-server/src/software/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
//! * `software_service` which returns the Axum service.
//! * `software_stream` which offers an stream that emits the software events coming from D-Bus.

use crate::{error::Error, web::Event};
use crate::{
error::Error,
web::{
common::{progress_router, service_status_router},
Event,
},
};
use agama_lib::{
error::ServiceError,
product::{Product, ProductClient},
Expand All @@ -23,7 +29,7 @@ use axum::{
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::{collections::HashMap, pin::Pin};
use thiserror::Error;
use tokio_stream::{Stream, StreamExt};

Expand Down Expand Up @@ -59,11 +65,14 @@ impl IntoResponse for SoftwareError {
/// It emits the Event::ProductChanged and Event::PatternsChanged events.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn software_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
pub async fn software_stream(
dbus: zbus::Connection,
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let stream = StreamExt::merge(
product_changed_stream(dbus.clone()).await?,
patterns_changed_stream(dbus.clone()).await?,
))
);
Ok(Box::pin(stream))
}

async fn product_changed_stream(
Expand Down Expand Up @@ -122,6 +131,12 @@ fn reason_to_selected_by(

/// Sets up and returns the axum service for the software module.
pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceError> {
const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Software1";
const DBUS_PATH: &'static str = "/org/opensuse/Agama/Software1";

let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;

let product = ProductClient::new(dbus.clone()).await?;
let software = SoftwareClient::new(dbus).await?;
let state = SoftwareState { product, software };
Expand All @@ -131,6 +146,8 @@ pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceE
.route("/proposal", get(proposal))
.route("/config", put(set_config).get(get_config))
.route("/probe", post(probe))
.merge(status_router)
.merge(progress_router)
.with_state(state);
Ok(router)
}
Expand Down
62 changes: 46 additions & 16 deletions rust/agama-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
//! * Emit relevant events via websocket.
//! * Serve the code for the web user interface (not implemented yet).

use self::progress::EventsProgressPresenter;
use crate::{
error::Error,
l10n::web::l10n_service,
manager::web::{manager_service, manager_stream},
software::web::{software_service, software_stream},
web::common::{progress_stream, service_status_stream},
};
use axum::Router;

Expand All @@ -19,19 +19,18 @@ mod config;
mod docs;
mod event;
mod http;
mod progress;
mod service;
mod state;
mod ws;

use agama_lib::{connection, error::ServiceError, progress::ProgressMonitor};
use agama_lib::{connection, error::ServiceError};
pub use auth::generate_token;
pub use config::ServiceConfig;
pub use docs::ApiDoc;
pub use event::{Event, EventsReceiver, EventsSender};
pub use service::MainServiceBuilder;
use std::path::Path;
use tokio_stream::StreamExt;
use tokio_stream::{StreamExt, StreamMap};

/// Returns a service that implements the web-based Agama API.
///
Expand Down Expand Up @@ -63,14 +62,7 @@ where
///
/// * `events`: channel to send the events to.
pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
let presenter = EventsProgressPresenter::new(events.clone());
let connection = connection().await?;
let mut monitor = ProgressMonitor::new(connection.clone()).await?;
tokio::spawn(async move {
if let Err(error) = monitor.run(presenter).await {
eprintln!("Could not monitor the D-Bus server: {}", error);
}
});
tokio::spawn(run_events_monitor(connection, events.clone()));

Ok(())
Expand All @@ -80,14 +72,52 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
///
/// * `connection`: D-Bus connection.
/// * `events`: channel to send the events to.
pub async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> {
let stream = StreamExt::merge(
manager_stream(dbus.clone()).await?,
software_stream(dbus).await?,
async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> {
let mut stream = StreamMap::new();

stream.insert("manager", manager_stream(dbus.clone()).await?);
stream.insert(
"manager-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
);
stream.insert(
"manager-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
);

stream.insert("software", software_stream(dbus.clone()).await?);
stream.insert(
"software-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
);
stream.insert(
"software-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
);

tokio::pin!(stream);
let e = events.clone();
while let Some(event) = stream.next().await {
while let Some((_, event)) = stream.next().await {
_ = e.send(event);
}
Ok(())
Expand Down
Loading

0 comments on commit 64893e6

Please sign in to comment.