Skip to content

Commit

Permalink
feat: support compression for downupstream
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Apr 9, 2024
1 parent 3c16387 commit 8998c40
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 76 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ bench:
dev:
RUST_LOG=INFO cargo watch -w src -x 'run -- -c=~/github/pingap/conf/pingap.toml'

devtest:
RUST_LOG=INFO cargo watch -w src -x 'run -- -c=~/tmp/pingap.toml'


udeps:
cargo +nightly udeps

Expand Down
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@
- [x] sentry uri config
- [x] charset for static file
- [x] web hook for wecom, dingtalk robot
- [ ] verify_cert option for http peer
- [ ] compression: zstd, br, gzip
4 changes: 4 additions & 0 deletions src/config/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct UpstreamConf {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub write_timeout: Option<Duration>,
pub verify_cert: Option<bool>,
pub remark: Option<String>,
}
impl UpstreamConf {
Expand Down Expand Up @@ -118,6 +119,9 @@ pub struct LocationConf {
pub headers: Option<Vec<String>>,
pub rewrite: Option<String>,
pub weight: Option<u16>,
pub gzip_level: Option<u32>,
pub br_level: Option<u32>,
pub zstd_level: Option<u32>,
pub remark: Option<String>,
}

Expand Down
9 changes: 6 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ fn run() -> Result<(), Box<dyn Error>> {

builder
.format(move |buf, record| {
if !webhook_url.is_empty() && record.level() == Level::Warn {
let msg = format!("{}", record.args());
if !webhook_url.is_empty()
&& record.level() == Level::Warn
&& msg.contains("becomes unhealthy")
{
webhook::send(webhook::WebhookSendParams {
url: webhook_url.clone(),
category: webhook_type.clone(),
Expand All @@ -124,10 +128,9 @@ fn run() -> Result<(), Box<dyn Error>> {

writeln!(
buf,
"{} {} {}",
"{} {} {msg}",
record.level(),
chrono::Local::now().to_rfc3339(),
record.args()
)
})
.try_init()?;
Expand Down
48 changes: 48 additions & 0 deletions src/proxy/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use super::Upstream;
use crate::config::LocationConf;
use crate::http_extra::{convert_headers, HttpHeader};
use http::header::HeaderValue;
use once_cell::sync::Lazy;
use pingora::http::{RequestHeader, ResponseHeader};
use regex::Regex;
use snafu::{ResultExt, Snafu};
Expand All @@ -30,6 +32,10 @@ pub enum Error {
}
type Result<T, E = Error> = std::result::Result<T, E>;

static ZSTD_ENCODING: Lazy<HeaderValue> = Lazy::new(|| "zstd".try_into().unwrap());
static BR_ENCODING: Lazy<HeaderValue> = Lazy::new(|| "br".try_into().unwrap());
static GZIP_ENCODING: Lazy<HeaderValue> = Lazy::new(|| "gzip".try_into().unwrap());

struct RegexPath {
value: Regex,
}
Expand Down Expand Up @@ -80,6 +86,10 @@ pub struct Location {
reg_rewrite: Option<(Regex, String)>,
headers: Option<Vec<HttpHeader>>,
proxy_headers: Option<Vec<HttpHeader>>,
gzip_level: u32,
br_level: u32,
zstd_level: u32,
pub support_compression: bool,
pub upstream: Arc<Upstream>,
}

Expand Down Expand Up @@ -123,6 +133,10 @@ impl Location {
}
}

let gzip_level = conf.gzip_level.unwrap_or_default();
let br_level = conf.br_level.unwrap_or_default();
let zstd_level = conf.zstd_level.unwrap_or_default();
let support_compression = gzip_level + br_level + zstd_level > 0;
let path = conf.path.clone().unwrap_or_default();
Ok(Location {
// name: conf.name.clone(),
Expand All @@ -133,6 +147,10 @@ impl Location {
reg_rewrite,
headers: format_headers(&conf.headers)?,
proxy_headers: format_headers(&conf.proxy_headers)?,
gzip_level,
br_level,
zstd_level,
support_compression,
})
}
/// Returns `true` if the host and path match location.
Expand Down Expand Up @@ -185,6 +203,36 @@ impl Location {
}
}
}
/// Modify accpet encoding for choose compression
#[inline]
pub fn modify_accept_encoding(&self, header: &mut RequestHeader) -> Option<u32> {
if !self.support_compression {
return None;
}
// TODO wait for the feature
// pingora_cor:compression:decide_action
// TODO: support to configure preferred encoding
if let Some(accept_encoding) = header.headers.get(http::header::ACCEPT_ENCODING) {
let accept_encoding = accept_encoding.to_str().unwrap_or_default();
if accept_encoding.is_empty() {
return None;
}
// zstd first
return if self.zstd_level > 0 && accept_encoding.contains("zstd") {
let _ = header.insert_header(http::header::ACCEPT_ENCODING, ZSTD_ENCODING.clone());
Some(self.zstd_level)
} else if self.br_level > 0 && accept_encoding.contains("br") {
let _ = header.insert_header(http::header::ACCEPT_ENCODING, BR_ENCODING.clone());
Some(self.br_level)
} else if self.gzip_level > 0 && accept_encoding.contains("gzip") {
let _ = header.insert_header(http::header::ACCEPT_ENCODING, GZIP_ENCODING.clone());
Some(self.gzip_level)
} else {
None
};
}
None
}
}

#[cfg(test)]
Expand Down
14 changes: 7 additions & 7 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,12 @@ impl ProxyHttp for Server {
// TODO parse error
let _ = new_path.parse::<http::Uri>().map(|uri| header.set_uri(uri));
}
// TODO find a way for compressing for static file
if let Some(level) = lo.modify_accept_encoding(header) {
session.downstream_compression.adjust_decompression(true);
session.downstream_compression.adjust_level(level);
}

ctx.location_index = Some(location_index);
if let Some(dir) = lo.upstream.as_directory() {
let result = dir.handle(session, ctx).await?;
Expand All @@ -418,13 +424,7 @@ impl ProxyHttp for Server {
session: &mut Session,
ctx: &mut State,
) -> pingora::Result<Box<HttpPeer>> {
let location_index = ctx
.location_index
.ok_or_else(|| pingora::Error::new_str(LOCATION_NOT_FOUND))?;
let lo = self
.locations
.get(location_index)
.ok_or_else(|| pingora::Error::new_str(LOCATION_NOT_FOUND))?;
let lo = &self.locations[ctx.location_index.unwrap_or_default()];

let peer = lo
.upstream
Expand Down
5 changes: 5 additions & 0 deletions src/proxy/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct Upstream {
read_timeout: Option<Duration>,
idle_timeout: Option<Duration>,
write_timeout: Option<Duration>,
verify_cert: Option<bool>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -302,6 +303,7 @@ impl Upstream {
read_timeout: conf.read_timeout,
idle_timeout: conf.idle_timeout,
write_timeout: conf.write_timeout,
verify_cert: conf.verify_cert,
})
}
/// Returns a new http peer, if there is no healthy backend, it will return `None`.
Expand Down Expand Up @@ -333,6 +335,9 @@ impl Upstream {
p.options.read_timeout = self.read_timeout;
p.options.idle_timeout = self.idle_timeout;
p.options.write_timeout = self.write_timeout;
if let Some(verify_cert) = self.verify_cert {
p.options.verify_cert = verify_cert;
}
p
})
}
Expand Down
5 changes: 1 addition & 4 deletions src/webhook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ pub struct WebhookSendParams {
}

pub fn send(params: WebhookSendParams) {
if !params.msg.contains("becomes unhealthy") {
return;
}
std::thread::spawn(move || {
if let Ok(rt) = tokio::runtime::Runtime::new() {
let category = "backend_unhealthy";
Expand All @@ -35,7 +32,7 @@ pub fn send(params: WebhookSendParams) {
let mut data = serde_json::Map::new();
let hostname = state::get_hostname().clone();
let content = format!(
r###"Pingap Error
r###"Pingap Warnning
>hostname: {hostname}
>category: {category}
>message: {}
Expand Down
122 changes: 61 additions & 61 deletions web/src/components/form-editor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,45 @@ function getStyles(name: string, selectItems: string[], theme: Theme) {
};
}

function FormSelectField({
options,
label,
value,
onUpdate,
}: {
options?: string[];
label: string;
value: string;
onUpdate: (data: string) => void;
}) {
const [newValue, setNewValue] = React.useState(value);

const opts = options || [];

return (
<React.Fragment>
<InputLabel id={`{item.id}-label`}>{label}</InputLabel>
<Select
labelId={`{item.id}-label`}
id={label}
value={newValue}
onChange={(e) => {
const { value } = e.target;
setNewValue(value);
onUpdate(value);
}}
input={<OutlinedInput label={label} />}
>
{opts.map((name) => (
<MenuItem key={name} value={name}>
{name}
</MenuItem>
))}
</Select>
</React.Fragment>
);
}

function FormTwoInputFields({
id,
divide,
Expand All @@ -83,7 +122,11 @@ function FormTwoInputFields({
addLabel: string;
onUpdate: (data: string[]) => void;
}) {
const [newValues, setNewValues] = React.useState(values || []);
const arr = values || [];
if (arr.length === 0) {
arr.push("");
}
const [newValues, setNewValues] = React.useState(arr);
const divideToTwoValues = (value: string) => {
let arr = value.split(divide);
if (arr.length < 2) {
Expand All @@ -108,7 +151,14 @@ function FormTwoInputFields({
}
cloneValues[index] = arr.join(divide).trim();
setNewValues(cloneValues);
onUpdate(cloneValues);
const updateValues: string[] = [];
cloneValues.forEach((item) => {
const v = item.trim();
if (v) {
updateValues.push(v);
}
});
onUpdate(updateValues);
};

const list = newValues.map((item, index) => {
Expand Down Expand Up @@ -212,8 +262,6 @@ export default function FormEditor({
const theme = useTheme();
const [data, setData] = React.useState(getDefaultValues(items));
const defaultLocations: string[] = [];
let defaultUpstream = "";
let defaultWebhookType = "";
items.forEach((item) => {
switch (item.category) {
case FormItemCategory.LOCATION: {
Expand All @@ -223,20 +271,10 @@ export default function FormEditor({
});
break;
}
case FormItemCategory.UPSTREAM: {
defaultUpstream = item.defaultValue as string;
break;
}
case FormItemCategory.WEBHOOK_TYPE: {
defaultWebhookType = item.defaultValue as string;
break;
}
}
});

const [locations, setLocations] = React.useState<string[]>(defaultLocations);
const [upstream, setUpstream] = React.useState(defaultUpstream);
const [webhookType, setWebhookType] = React.useState(defaultWebhookType);

const [updated, setUpdated] = React.useState(false);
const [processing, setProcessing] = React.useState(false);
Expand Down Expand Up @@ -327,55 +365,17 @@ export default function FormEditor({
);
break;
}
case FormItemCategory.UPSTREAM: {
const options = item.options || [];
formItem = (
<React.Fragment>
<InputLabel id={`{item.id}-label`}>{item.label}</InputLabel>
<Select
labelId={`{item.id}-label`}
id={item.label}
value={upstream}
onChange={(e) => {
const { value } = e.target;
setUpstream(value);
updateValue(item.id, value);
}}
input={<OutlinedInput label={item.label} />}
>
{options.map((name) => (
<MenuItem key={name} value={name}>
{name}
</MenuItem>
))}
</Select>
</React.Fragment>
);
break;
}
case FormItemCategory.UPSTREAM:
case FormItemCategory.WEBHOOK_TYPE: {
const options = item.options || [];
formItem = (
<React.Fragment>
<InputLabel id={`{item.id}-label`}>{item.label}</InputLabel>
<Select
labelId={`{item.id}-label`}
id={item.label}
value={webhookType}
onChange={(e) => {
const { value } = e.target;
setWebhookType(value);
updateValue(item.id, value);
}}
input={<OutlinedInput label={item.label} />}
>
{options.map((name) => (
<MenuItem key={name} value={name}>
{name}
</MenuItem>
))}
</Select>
</React.Fragment>
<FormSelectField
label={item.label}
options={item.options}
value={item.defaultValue as string}
onUpdate={(value) => {
updateValue(item.id, value);
}}
/>
);
break;
}
Expand Down
Loading

0 comments on commit 8998c40

Please sign in to comment.