Skip to content

Commit

Permalink
feat: support cache for http response
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Apr 20, 2024
1 parent 05f615f commit 21c0f5c
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ path-absolutize = "3.1.1"
pingora = { version = "0.1.1", default-features = false, features = [
"lb",
"openssl",
"cache",
] }
pingora-limits = "0.1.1"
regex = "1.10.4"
Expand Down
1 change: 1 addition & 0 deletions src/config/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum ProxyPluginCategory {
IpLimit,
KeyAuth,
BasicAuth,
Cache,
}

#[derive(PartialEq, Debug, Default, Deserialize_repr, Clone, Copy, Serialize_repr)]
Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ fn run() -> Result<(), Box<dyn Error>> {
..Default::default()
});
}
let _ = plugin::init_proxy_plugins(proxy_plugin_confs);
if let Err(e) = plugin::init_proxy_plugins(proxy_plugin_confs) {
error!("init proxy plugins fail, {e}");
}
for server_conf in server_conf_list {
let ps = Server::new(server_conf)?;
let services = ps.run(&my_server.configuration)?;
Expand Down
125 changes: 125 additions & 0 deletions src/plugin/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2024 Tree xie.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::ProxyPlugin;
use super::{Error, Result};
use crate::config::ProxyPluginCategory;
use crate::config::ProxyPluginStep;
use crate::state::State;
use async_trait::async_trait;
use bytesize::ByteSize;
use http::Method;
use log::debug;
use once_cell::sync::Lazy;
use pingora::cache::eviction::simple_lru::Manager;
use pingora::cache::eviction::EvictionManager;
use pingora::cache::lock::CacheLock;
use pingora::cache::predictor::Predictor;
use pingora::cache::{MemCache, Storage};
use pingora::proxy::Session;
use std::str::FromStr;
use url::Url;

static MEM_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);
static PREDICTOR: Lazy<Predictor<32>> = Lazy::new(|| Predictor::new(5, None));
static EVICTION_MANAGER: Lazy<Manager> = Lazy::new(|| Manager::new(8192));
static CACHE_LOCK_ONE_SECOND: Lazy<CacheLock> =
Lazy::new(|| CacheLock::new(std::time::Duration::from_secs(1)));
static CACHE_LOCK_TWO_SECONDS: Lazy<CacheLock> =
Lazy::new(|| CacheLock::new(std::time::Duration::from_secs(2)));
static CACHE_LOCK_THREE_SECONDS: Lazy<CacheLock> =
Lazy::new(|| CacheLock::new(std::time::Duration::from_secs(3)));

pub struct Cache {
proxy_step: ProxyPluginStep,
eviction: bool,
lock: u8,
storage: &'static (dyn Storage + Sync),
max_file_size: usize,
}

impl Cache {
pub fn new(value: &str, proxy_step: ProxyPluginStep) -> Result<Self> {
debug!("new cache storage proxy plugin, {value}, {proxy_step:?}");
let url_info = Url::parse(value).map_err(|e| Error::Invalid {
message: e.to_string(),
})?;
let mut lock = 0;
let mut eviction = false;
let mut max_file_size = 30 * 1024;
for (key, value) in url_info.query_pairs().into_iter() {
match key.as_ref() {
"lock" => {
if let Ok(d) = value.parse::<u8>() {
lock = d;
}
}
"max_file_size" => {
if let Ok(v) = ByteSize::from_str(&value) {
max_file_size = v.0 as usize;
}
}
"eviction" => eviction = true,
_ => {}
}
}

Ok(Self {
storage: &*MEM_BACKEND,
proxy_step,
eviction,
lock,
max_file_size,
})
}
}

#[async_trait]
impl ProxyPlugin for Cache {
#[inline]
fn step(&self) -> ProxyPluginStep {
self.proxy_step
}
#[inline]
fn category(&self) -> ProxyPluginCategory {
ProxyPluginCategory::Cache
}
#[inline]
async fn handle(&self, session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
if ![Method::GET, Method::HEAD].contains(&session.req_header().method) {
return Ok(false);
}
let eviction = if self.eviction {
None
} else {
Some(&*EVICTION_MANAGER as &'static (dyn EvictionManager + Sync))
};

let lock = match self.lock {
1 => Some(&*CACHE_LOCK_ONE_SECOND),
2 => Some(&*CACHE_LOCK_TWO_SECONDS),
3 => Some(&*CACHE_LOCK_THREE_SECONDS),
_ => None,
};

session
.cache
.enable(self.storage, eviction, Some(&*PREDICTOR), lock);
if self.max_file_size > 0 {
session.cache.set_max_file_size_bytes(self.max_file_size);
}

Ok(false)
}
}
5 changes: 5 additions & 0 deletions src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::num::ParseIntError;

mod admin;
mod basic_auth;
mod cache;
mod compression;
mod directory;
mod ip_limit;
Expand Down Expand Up @@ -138,6 +139,10 @@ pub fn init_proxy_plugins(confs: Vec<(String, ProxyPluginConf)>) -> Result<()> {
let b = basic_auth::BasicAuth::new(&conf.value, step)?;
plguins.insert(name, Box::new(b));
}
ProxyPluginCategory::Cache => {
let c = cache::Cache::new(&conf.value, step)?;
plguins.insert(name, Box::new(c));
}
};
}

Expand Down
44 changes: 38 additions & 6 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ use base64::{engine::general_purpose::STANDARD, Engine};
use bytes::Bytes;
use http::StatusCode;
use log::{error, info};
use pingora::cache::cache_control::CacheControl;
use pingora::cache::filters::resp_cacheable;
use pingora::cache::{CacheMetaDefaults, NoCacheReason, RespCacheable};
use pingora::http::{RequestHeader, ResponseHeader};
use pingora::listeners::TlsSettings;
use pingora::protocols::http::error_resp;
use pingora::protocols::Digest;
use pingora::proxy::{http_proxy_service, HttpProxy};
use pingora::proxy::{ProxyHttp, Session};
use pingora::server::configuration;
use pingora::services::background::GenBackgroundService;
use pingora::services::listening::Service;
use pingora::services::Service as IService;
use pingora::upstreams::peer::Peer;
use pingora::{
proxy::{ProxyHttp, Session},
upstreams::peer::HttpPeer,
};
use pingora::upstreams::peer::{HttpPeer, Peer};
use snafu::{ResultExt, Snafu};
use std::fs;
use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
Expand Down Expand Up @@ -168,6 +168,8 @@ pub struct ServerServices {
pub bg_services: Vec<Box<dyn IService>>,
}

const META_DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(1), 1, 1);

impl Server {
pub fn new(conf: ServerConf) -> Result<Self> {
let mut upstreams = vec![];
Expand Down Expand Up @@ -299,7 +301,6 @@ impl ProxyHttp for Server {
self.serve_admin(session, ctx).await?;
return Ok(true);
}
// session.cache.enable(storage, eviction, predictor, cache_lock)

let header = session.req_header_mut();
let path = header.uri.path();
Expand Down Expand Up @@ -337,6 +338,37 @@ impl ProxyHttp for Server {

Ok(false)
}

fn response_cache_filter(
&self,
session: &Session,
resp: &ResponseHeader,
_ctx: &mut Self::CTX,
) -> pingora::Result<RespCacheable> {
if !session.cache.enabled() {
return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom("default")));
}
let cc = CacheControl::from_resp_headers(resp);
Ok(resp_cacheable(cc.as_ref(), resp, false, &META_DEFAULTS))
}

async fn response_filter(
&self,
session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> pingora::Result<()>
where
Self::CTX: Send + Sync,
{
upstream_response.insert_header("x-cache-status", session.cache.phase().as_str())?;
if let Some(d) = session.cache.lock_duration() {
upstream_response.insert_header("x-cache-lock-time-ms", format!("{}", d.as_millis()))?
}

Ok(())
}

async fn proxy_upstream_filter(
&self,
session: &mut Session,
Expand Down
11 changes: 11 additions & 0 deletions web/src/components/form-editor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export enum ProxyPluginCategory {
IP_LIMIT = 7,
KEY_AUTH = 8,
BASIC_AUTH = 9,
CACHE = 10,
}

export function formatProxyPluginCategory(value: ProxyPluginCategory) {
Expand Down Expand Up @@ -92,6 +93,9 @@ export function formatProxyPluginCategory(value: ProxyPluginCategory) {
case ProxyPluginCategory.BASIC_AUTH: {
return "basicAuth";
}
case ProxyPluginCategory.CACHE: {
return "cache";
}
}
}

Expand Down Expand Up @@ -258,6 +262,13 @@ function FormProxyPluginField({
}
break;
}
case ProxyPluginCategory.CACHE: {
arr.push(value);
fields.push({
label: "The cache storage url",
});
break;
}
default: {
arr.push(value);
fields.push({
Expand Down
5 changes: 5 additions & 0 deletions web/src/pages/proxy-plugin-info.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ export default function ProxyPluginInfo() {
option: 9,
value: ProxyPluginCategory.BASIC_AUTH,
},
{
label: "Cache",
option: 10,
value: ProxyPluginCategory.CACHE,
},
],
},
{
Expand Down

0 comments on commit 21c0f5c

Please sign in to comment.