feat(momento-proxy): add delete support (#102)
Adds support for delete/del commands for the memcache and redis
protocols.
brayniac authored Oct 19, 2023
1 parent 8bf7964 commit 1cf5f56
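
For context, a minimal client-side sketch (not part of this commit) of exercising the new commands against a running proxy; the listener addresses and the key name are assumptions that depend on the proxy configuration:

use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut buf = [0u8; 64];

    // memcache listener (address assumed): plain-text delete command
    let mut memcache = TcpStream::connect("127.0.0.1:11211")?;
    memcache.write_all(b"delete some_key\r\n")?;
    let n = memcache.read(&mut buf)?;
    assert_eq!(&buf[..n], b"DELETED\r\n");

    // RESP listener (address assumed): DEL encoded as a RESP array
    let mut resp = TcpStream::connect("127.0.0.1:6379")?;
    resp.write_all(b"*2\r\n$3\r\nDEL\r\n$8\r\nsome_key\r\n")?;
    let n = resp.read(&mut buf)?;
    assert_eq!(&buf[..n], b":1\r\n");

    Ok(())
}
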
Showing 7 changed files with 203 additions and 3 deletions.
53 changes: 51 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/proxy/momento/Cargo.toml
@@ -18,7 +18,7 @@ config = { path = "../../config" }
libc = { workspace = true }
logger = { path = "../../logger" }
metriken = { workspace = true }
momento = "0.30.0"
momento = "0.32.0"
net = { path = "../../net" }
protocol-admin = { path = "../../protocol/admin" }
protocol-memcache = { path = "../../protocol/memcache" }
11 changes: 11 additions & 0 deletions src/proxy/momento/src/frontend.rs
@@ -32,6 +32,14 @@ pub(crate) async fn handle_memcache_client(
let request = request.into_inner();

match request {
memcache::Request::Delete(r) => {
if memcache::delete(&mut client, &cache_name, &mut socket, &r)
.await
.is_err()
{
break;
}
}
memcache::Request::Get(r) => {
if memcache::get(&mut client, &cache_name, &mut socket, r.keys())
.await
@@ -108,6 +116,9 @@ pub(crate) async fn handle_resp_client(

let result: ProxyResult = async {
match &request {
resp::Request::Del(r) => {
resp::del(&mut client, &cache_name, &mut response_buf, r).await?
}
resp::Request::Get(r) => {
resp::get(&mut client, &cache_name, &mut response_buf, r.key()).await?
}
90 changes: 90 additions & 0 deletions src/proxy/momento/src/protocol/memcache/delete.rs
@@ -0,0 +1,90 @@
use crate::klog::{klog_1, Status};
use crate::{Error, *};
use ::net::*;
use protocol_memcache::*;

pub async fn delete(
client: &mut SimpleCacheClient,
cache_name: &str,
socket: &mut tokio::net::TcpStream,
request: &protocol_memcache::Delete,
) -> Result<(), Error> {
let key = request.key();

// check if the key is invalid before sending the requests to the backend
if std::str::from_utf8(key).is_err() {
DELETE_EX.increment();

// invalid key
let _ = socket.write_all(b"ERROR\r\n").await;
return Err(Error::from(ErrorKind::InvalidInput));
}

BACKEND_REQUEST.increment();

match timeout(Duration::from_millis(200), client.delete(cache_name, key)).await {
Ok(Ok(_result)) => {
// it appears we can't tell deleted from not found in the momento
// protocol, so we treat all non-error responses as if the key has
// been deleted

DELETE_DELETED.increment();

if request.noreply() {
klog_1(&"delete", &key, Status::Deleted, 0);
} else {
klog_1(&"delete", &key, Status::Deleted, 8);
SESSION_SEND.increment();
SESSION_SEND_BYTE.add(8);
TCP_SEND_BYTE.add(8);
if let Err(e) = socket.write_all(b"DELETED\r\n").await {
SESSION_SEND_EX.increment();
// hangup if we can't send a response back
return Err(e);
}
}
}
Ok(Err(e)) => {
BACKEND_EX.increment();

DELETE_EX.increment();
SESSION_SEND.increment();

klog_1(&"delete", &key, Status::ServerError, 0);

let message = format!("SERVER_ERROR {e}\r\n");

SESSION_SEND_BYTE.add(message.len() as _);
TCP_SEND_BYTE.add(message.len() as _);

if let Err(e) = socket.write_all(message.as_bytes()).await {
SESSION_SEND_EX.increment();
// hangup if we can't send a response back
return Err(e);
}
}
Err(_) => {
// timeout
BACKEND_EX.increment();
BACKEND_EX_TIMEOUT.increment();

DELETE_EX.increment();
SESSION_SEND.increment();

klog_1(&"delete", &key, Status::Timeout, 0);

let message = "SERVER_ERROR backend timeout\r\n";

SESSION_SEND_BYTE.add(message.len() as _);
TCP_SEND_BYTE.add(message.len() as _);

if let Err(e) = socket.write_all(message.as_bytes()).await {
SESSION_SEND_EX.increment();
// hangup if we can't send a response back
return Err(e);
}
}
}

Ok(())
}
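
The handler above collapses the backend call into three outcomes: a reply within the 200ms budget (always reported as DELETED, since Momento does not say whether the key existed), a backend error, and a timeout. A standalone sketch of that pattern, assuming the tokio crate with the rt, macros, and time features and a stand-in backend call:

use std::time::Duration;
use tokio::time::timeout;

// stand-in for client.delete(cache_name, key)
async fn backend_delete() -> Result<(), std::io::Error> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let reply = match timeout(Duration::from_millis(200), backend_delete()).await {
        // the backend answered: report DELETED whether or not the key existed
        Ok(Ok(())) => "DELETED\r\n".to_string(),
        // the backend answered with an error
        Ok(Err(e)) => format!("SERVER_ERROR {e}\r\n"),
        // the 200ms budget elapsed first
        Err(_) => "SERVER_ERROR backend timeout\r\n".to_string(),
    };
    print!("{reply}");
}
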
2 changes: 2 additions & 0 deletions src/proxy/momento/src/protocol/memcache/mod.rs
@@ -4,8 +4,10 @@

pub use protocol_memcache::{Request, RequestParser};

mod delete;
mod get;
mod set;

pub use delete::*;
pub use get::*;
pub use set::*;
46 changes: 46 additions & 0 deletions src/proxy/momento/src/protocol/resp/del.rs
@@ -0,0 +1,46 @@
use crate::klog::*;
use crate::*;
use protocol_resp::*;
use std::io::Write;

use super::update_method_metrics;

pub async fn del(
client: &mut SimpleCacheClient,
cache_name: &str,
response_buf: &mut Vec<u8>,
req: &Del,
) -> ProxyResult {
let keys: Vec<&[u8]> = req.keys().iter().map(|k| &**k).collect();

for key in keys {
let mut client = client.clone();

update_method_metrics(&DEL, &DEL_EX, async move {
match timeout(Duration::from_millis(200), client.delete(cache_name, key)).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
klog_1(&"hdel", &key, Status::ServerError, 0);
return Err(ProxyError::from(e));
}
Err(e) => {
klog_1(&"hdel", &key, Status::Timeout, 0);
return Err(ProxyError::from(e));
}
}

Ok(())
})
.await?;
}

// NOTE: the Momento protocol does not inform us of how many keys are
// deleted. We lie to the client and say that they all were deleted.
write!(response_buf, ":{}\r\n", req.keys().len())?;

for key in req.keys() {
klog_1(&"del", &key, Status::Deleted, 0);
}

Ok(())
}
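
As the NOTE in the handler says, the integer reply is the number of keys in the request rather than the number that actually existed; a stock Redis server would return only the count it removed. A hedged sketch (address and keys are assumptions) of what a client observes:

use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut resp = TcpStream::connect("127.0.0.1:6379")?;
    // DEL a b missing -- suppose only "a" and "b" exist in the cache
    resp.write_all(b"*4\r\n$3\r\nDEL\r\n$1\r\na\r\n$1\r\nb\r\n$7\r\nmissing\r\n")?;
    let mut buf = [0u8; 16];
    let n = resp.read(&mut buf)?;
    // the proxy still reports all three keys as deleted
    assert_eq!(&buf[..n], b":3\r\n");
    Ok(())
}
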
2 changes: 2 additions & 0 deletions src/proxy/momento/src/protocol/resp/mod.rs
@@ -7,6 +7,7 @@ use std::future::Future;
use momento::response::MomentoError;
pub use protocol_resp::{Request, RequestParser};

mod del;
mod get;
mod hdel;
mod hexists;
@@ -49,6 +50,7 @@ pub use self::sismember::*;
pub use self::smembers::*;
pub use self::srem::*;
pub use self::sunion::*;
pub use del::*;
pub use get::*;
pub use hdel::*;
pub use hexists::*;
