Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
Use cloudflare-rs to call tail endpoint, move enter tokio runtime clo…
Browse files Browse the repository at this point in the history
…ser to async logic
  • Loading branch information
nataliescottdavidson committed Jul 21, 2021
1 parent dfc333f commit 1275aa2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 43 deletions.
10 changes: 1 addition & 9 deletions src/cli/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,5 @@ pub fn tail(cli_params: &Cli) -> Result<()> {
let manifest = Manifest::new(&cli_params.config)?;
let target = manifest.get_target(cli_params.environment.as_deref(), false)?;
let user = GlobalUser::new()?;

let start_tail_future =
async move { commands::tail::start(target.clone(), user.clone()).await };

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(start_tail_future)
commands::tail::start(target, user)
}
61 changes: 27 additions & 34 deletions src/commands/tail.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::http;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;

Expand All @@ -7,8 +8,9 @@ use crate::settings::toml::Target;
/// The API returns the address of the log forwarder.
/// 3. wrangler connects to the log forwarder with a Websocket.
/// 5. Upon receipt, wrangler prints log events to STDOUT.
use anyhow::{anyhow, Result};
use cloudflare::framework::response;
use anyhow::Result;
use cloudflare::endpoints::workers::{CreateTail, CreateTailParams};
use cloudflare::framework::apiclient::ApiClient;
use futures::stream::StreamExt;

#[derive(serde::Deserialize, Debug)]
Expand All @@ -17,11 +19,6 @@ struct TailResult {
expires_at: String,
}

async fn get_tail_tag(resp: reqwest::Response) -> Result<String, reqwest::Error> {
let body: response::ApiSuccess<TailResult> = resp.json().await?;
Ok(body.result.id)
}

// Main loop, listen for websocket messages or interrupt
async fn listen_tail(tail_tag: String) -> Result<(), ()> {
// ws listener setup
Expand All @@ -30,7 +27,7 @@ async fn listen_tail(tail_tag: String) -> Result<(), ()> {
tokio_tungstenite::connect_async(url::Url::parse(&listen_tail_endpoint).unwrap())
.await
.expect("Can't connect");
eprintln!(
log::debug!(
"Connected to the log forwarding server at {}",
listen_tail_endpoint
);
Expand All @@ -50,34 +47,30 @@ async fn listen_tail(tail_tag: String) -> Result<(), ()> {
}
}

pub async fn start(target: Target, user: GlobalUser) -> anyhow::Result<()> {
// Tell API to start tail. For now, do client logic here. TODO add endpoint to cloudflare-rs
let start_tail_endpoint = format!(
"https://api.cloudflare.com/client/v4/accounts/{}/workers/scripts/{}/tails",
target.account_id, target.name
);
let client = reqwest::Client::new().post(start_tail_endpoint);
let req_builder: reqwest::RequestBuilder = match user {
GlobalUser::GlobalKeyAuth { email, api_key } => {
let mut headers = reqwest::header::HeaderMap::new();
let email_hdr = reqwest::header::HeaderName::from_static("x-auth-email");
let api_key_hdr = reqwest::header::HeaderName::from_static("x-auth-key");
headers.insert(email_hdr, email.parse().unwrap());
headers.insert(api_key_hdr, api_key.parse().unwrap());
client.headers(headers)
}
GlobalUser::TokenAuth { api_token } => client.bearer_auth(api_token),
};
let res = req_builder.send().await;
pub fn start(target: Target, user: GlobalUser) -> Result<()> {
// Tell API to start tail
let client = http::cf_v4_client(&user)?;

let res = client.request(&CreateTail {
account_identifier: &target.account_id,
script_name: &target.name,
params: CreateTailParams { url: None },
});

match res {
Ok(resp) => match get_tail_tag(resp).await {
Err(e) => Err(anyhow!("Getting body failed: {:?}", e)),
Ok(tag) => match listen_tail(tag).await {
Err(e) => Err(anyhow!("Websocket err: {:?}", e)),
_ => Ok(()),
},
},
Ok(resp) => {
let start_tail_future = async move { listen_tail(resp.result.id).await };

match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(start_tail_future)
{
Ok(_) => Ok(()),
Err(e) => anyhow::bail!("Websocket listening failed with err {:?}", e),
}
}
Err(e) => anyhow::bail!("POST tail failed with err {:?}", e),
}
}

0 comments on commit 1275aa2

Please sign in to comment.