Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] NOTIFY Command #64

Closed
wants to merge 15 commits into from
171 changes: 164 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use nom::IResult;
use std::io::{self, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;
use std::collections::HashMap;

use super::authenticator::Authenticator;
use super::error::{Error, ParseError, Result, ValidateError};
use super::parse::{parse_authenticate_response, parse_capabilities, parse_fetches, parse_mailbox,
parse_names};
parse_names, parse_notify_status};
use super::types::*;

static TAG_PREFIX: &'static str = "a";
Expand Down Expand Up @@ -54,6 +55,46 @@ pub struct IdleHandle<'a, T: Read + Write + 'a> {
done: bool,
}

/// A single item that a `NOTIFY` command should register for
pub struct NotifyItem<'a> {
mailbox: &'a str,
events: &'a[&'a str],
}

/// A builder for a `NOTIFY` operation
pub struct NotifyOp<'a> {
items: Vec<NotifyItem<'a>>,
}

impl<'a> NotifyOp<'a> {
pub fn new() -> Self {
NotifyOp {
items: vec![],
}
}

/// Add a new mailbox specification to the command builder
pub fn add_mailbox(mut self, mailbox_spec: &'a str, events: &'a[&'a str]) -> NotifyOp<'a> {
self.items.push(NotifyItem {
mailbox: mailbox_spec,
events: events,
});
self
}

pub fn is_none(&self) -> bool {
self.items.is_empty()
}
}


#[derive(Debug)]
pub struct NotifyHandle<'a, T: Read + Write + 'a> {
client: &'a mut Client<T>,
keepalive: Duration,
}


/// Must be implemented for a transport in order for a `Client` using that transport to support
/// operations with timeouts.
///
Expand All @@ -68,6 +109,75 @@ pub trait SetReadTimeout {
fn set_read_timeout(&mut self, timeout: Option<Duration>) -> Result<()>;
}

impl<'a, T: Read + Write + 'a> NotifyHandle<'a, T> {
fn new(client: &'a mut Client<T>) -> Result<Self> {
Ok(NotifyHandle {
client: client,
keepalive: Duration::from_secs(29 * 60),
})
}

/// Replace the currently running `NOTIFY` operation or create a new one
pub fn set(&mut self, cmd: &NotifyOp) -> Result<()> {
// This command will end in a tagged OK but if the NOTIFY args contains
// a `STATUS` identifier, one status line for each specified mailbox
// will be emitted before the final OK.
// self.client.run_command_and_read_response(&format!("NOTIFY {}"))

let mut s = "NOTIFY ".to_owned();

if cmd.items.len() == 0 {
s += "NONE";
} else {
s += "SET ";
for i in &cmd.items {
s += &format!("(mailboxes {} ({}))",
i.mailbox,
i.events.iter().fold(String::new(), |sofar, cur| format!("{} {}", sofar, cur)));
}
};

self.client.run_command_and_check_ok(&s)
}

/// Replace the currently running `NOTIFY` operation or create a new one.
/// This version adds a `STATUS` flag that results in a list of all specified mailboxes' statuses being returned.
pub fn set_status(&mut self, cmd: &NotifyOp) -> Result<HashMap<String, Mailbox>> {
if cmd.items.is_empty() {
panic!("Cannot request a STATUS response without a mailbox specification!");
}

let mut s = "NOTIFY ".to_owned();

if cmd.items.is_empty() {
s += "NONE";
} else {
s += "SET STATUS ";
for i in &cmd.items {
s += &format!("(mailboxes {} ({}))",
i.mailbox,
i.events.iter().fold(String::new(), |sofar, cur| format!("{} {}", sofar, cur)));
}
};

let response: Vec<u8> = self.client.run_command_and_read_response(&s)?;
parse_notify_status(&response)
}

/// Waits for a registered `NOTIFY` event.
/// Returns the event's mailbox name and data.
pub fn wait(&mut self) -> Result<(String, Mailbox)> {
let mut response :Vec<u8> = Vec::new();
self.client.readline(&mut response).unwrap();
let mailboxes = parse_notify_status(&response);

// Exactly one mailbox per line.
// TODO might be a little expensive to create a new HashMap even if we definitely there is exactly one
let mailbox = mailboxes.unwrap().into_iter().next().unwrap();
Ok(mailbox)
}
}

impl<'a, T: Read + Write + 'a> IdleHandle<'a, T> {
fn new(client: &'a mut Client<T>) -> Result<Self> {
let mut h = IdleHandle {
Expand Down Expand Up @@ -97,8 +207,8 @@ impl<'a, T: Read + Write + 'a> IdleHandle<'a, T> {
}

self.client.read_response_onto(&mut v)?;
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
unreachable!();
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
unreachable!();
}

fn terminate(&mut self) -> Result<()> {
Expand Down Expand Up @@ -297,13 +407,13 @@ impl<T: Read + Write> Client<T> {

/// Selects a mailbox
pub fn select(&mut self, mailbox_name: &str) -> Result<Mailbox> {
self.run_command_and_read_response(&format!("SELECT {}", validate_str(mailbox_name)?))
self.run_command_and_read_response(&format!("SELECT {}", validate_str(mailbox_name)?))
.and_then(|lines| parse_mailbox(&lines[..]))
}

/// Examine is identical to Select, but the selected mailbox is identified as read-only
pub fn examine(&mut self, mailbox_name: &str) -> Result<Mailbox> {
self.run_command_and_read_response(&format!("EXAMINE {}", validate_str(mailbox_name)?))
self.run_command_and_read_response(&format!("EXAMINE {}", validate_str(mailbox_name)?))
.and_then(|lines| parse_mailbox(&lines[..]))
}

Expand Down Expand Up @@ -445,6 +555,17 @@ impl<T: Read + Write> Client<T> {
IdleHandle::new(self)
}

/// Returns a handle that can be used to issue NOTIFY requests to a server that supports it
pub fn notify(&mut self, op: &NotifyOp) -> Result<NotifyHandle<T>> {
NotifyHandle::new(self).and_then(|mut handle| handle.set(op).and_then(|_| Ok(handle)))
}

/// Returns a handle that can be used to issue NOTIFY requests to a server that supports it
/// and also immediately returns `STATUS` messages for all mailboxes specified
pub fn notify_status(&mut self, op: &NotifyOp) -> Result<(NotifyHandle<T>, HashMap<String,Mailbox>)> {
NotifyHandle::new(self).and_then(|mut handle| handle.set_status(op).and_then(|mailboxes| Ok((handle,mailboxes)) ) )
}

/// The APPEND command adds a mail to a mailbox.
pub fn append(&mut self, folder: &str, content: &[u8]) -> Result<()> {
try!(self.run_command(&format!("APPEND \"{}\" {{{}}}", folder, content.len())));
Expand Down Expand Up @@ -513,11 +634,11 @@ impl<T: Read + Write> Client<T> {
Some(match status {
Status::Bad | Status::No => {
Err((status, information.map(|s| s.to_string())))
}
}
Status::Ok => Ok(()),
status => Err((status, None)),
})
}
}
IResult::Done(..) => None,
IResult::Incomplete(..) => {
continue_from = Some(line_start);
Expand Down Expand Up @@ -939,6 +1060,42 @@ mod tests {
);
}

#[test]
fn notify_status() {
let response = b"* STATUS Spam (MESSAGES 42 UIDNEXT 1234 UIDVALIDITY 1224144230 UNSEEN 42)\r\n\
* STATUS Trash (MESSAGES 420 UIDNEXT 10234 UIDVALIDITY 1224144231 UNSEEN 23)\r\n\
* NO [SERVERBUG] Internal error occured. Refer to server log for more information.\r\n\
* STATUS RustNews (MESSAGES 1 UIDNEXT 2 UIDVALIDITY 1224144232 UNSEEN 1)\r\n\
a1 OK NOTIFY completed.\r\n"
.to_vec();

let expected_mailboxes: HashMap<String, Mailbox> =
[("Spam".to_owned(), Mailbox { exists: 42, uid_next: Some(1234), uid_validity: Some(1224144230), unseen: Some(42), flags: vec![], permanent_flags: vec![], recent: 0}),
("Trash".to_owned(), Mailbox { exists: 420, uid_next: Some(10234), uid_validity: Some(1224144231), unseen: Some(23), flags: vec![], permanent_flags: vec![], recent: 0}),
("RustNews".to_owned(), Mailbox { exists: 1, uid_next: Some(2), uid_validity: Some(1224144232), unseen: Some(1), flags: vec![], permanent_flags: vec![], recent: 0}),
].iter().cloned().collect();

let mock_stream = MockStream::new(response);
let mut client = Client::new(mock_stream);

let op = NotifyOp::new()
.add_mailbox("Spam", &["MessageNew"])
.add_mailbox("Trash", &["MessageNew", "MessageExpunge"])
.add_mailbox("SomeBug", &["MessageNew"])
.add_mailbox("RustNews", &["MessageNew", "MessageExpunge"]);

let (_notify_handle, mailboxes) = client.notify_status(&op).unwrap();

assert_eq!(mailboxes.len(), 3); // not 4
assert!(mailboxes.contains_key("Spam"));
assert!(mailboxes.contains_key("Trash"));
assert!(mailboxes.contains_key("RustNews"));

for (k, v) in mailboxes {
assert_eq!(v, *expected_mailboxes.get(&k).unwrap());
}
}

#[test]
fn store() {
generic_store(" ", |c, set, query| c.store(set, query));
Expand Down
80 changes: 70 additions & 10 deletions src/parse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use imap_proto::{self, Response};
use imap_proto::{self, Response, StatusAttribute};
use nom::IResult;
use regex::Regex;
use std::collections::HashMap;

//use super::mailbox::Mailbox;
use super::error::{Error, ParseError, Result};
use super::types::*;

Expand Down Expand Up @@ -35,11 +37,11 @@ where
match map(resp) {
MapOrNot::Map(t) => things.push(t),
MapOrNot::Not(resp) => break Err(resp.into()),
}
}

if lines.is_empty() {
break Ok(things);
}
}
}
_ => {
break Err(Error::Parse(ParseError::Invalid(lines.to_vec())));
Expand All @@ -51,6 +53,56 @@ where
ZeroCopy::new(lines, f)
}

pub fn parse_notify_status(mut lines: &[u8]) -> Result<HashMap<String, Mailbox>> {
let mut mailboxes = HashMap::new();
loop {

match imap_proto::parse_response(&lines) {
// Response contains data relating to a mailbox, exactly one mailbox per line
IResult::Done(rest, Response::MailboxData(m)) => {
lines = rest;

use imap_proto::MailboxDatum;
match m {
MailboxDatum::Status { mailbox: mailbox_name, status } => {
let mut mailbox = Mailbox::default();

for f in status.into_iter() {
match f {
StatusAttribute::Recent(r) => mailbox.recent = r,
StatusAttribute::Unseen(u) => mailbox.unseen = Some(u),
StatusAttribute::UidNext(u) => mailbox.uid_next = Some(u),
StatusAttribute::UidValidity(u) => mailbox.uid_validity = Some(u),
StatusAttribute::Messages(m) => mailbox.exists = m,
}
}

mailboxes.insert(mailbox_name.to_owned(), mailbox);
if lines.is_empty() {
break Ok(mailboxes)
}
},
_ => {}
}
},
IResult::Done(rest, Response::Data { ref status, ..}) if *status == imap_proto::Status::No => {
// It is okay to get a NO response in a data line. This happens when the IMAP server
// iterates over the mail folder and finds subfolders that are not mailboxes. This is
// not a client error but a server misconfiguration and happens only to single mailboxes.
// Other lines may contain valid mailbox statuses, so be a little more reluctant here.
lines = rest;
}
IResult::Done(_, resp) => {
break Err(resp.into());
}
_ => {
break Err(Error::Parse(ParseError::Invalid(lines.to_vec())));
}
}
}
}


pub fn parse_names(lines: Vec<u8>) -> ZeroCopyResult<Vec<Name>> {
use imap_proto::MailboxDatum;
let f = |resp| match resp {
Expand Down Expand Up @@ -84,7 +136,7 @@ pub fn parse_fetches(lines: Vec<u8>) -> ZeroCopyResult<Vec<Fetch>> {
uid: None,
rfc822_header: None,
rfc822: None,
};
};

for attr in attrs {
use imap_proto::AttributeValue;
Expand All @@ -95,12 +147,12 @@ pub fn parse_fetches(lines: Vec<u8>) -> ZeroCopyResult<Vec<Fetch>> {
AttributeValue::Uid(uid) => fetch.uid = Some(uid),
AttributeValue::Rfc822(rfc) => fetch.rfc822 = rfc,
AttributeValue::Rfc822Header(rfc) => fetch.rfc822_header = rfc,
_ => {}
}
}
_ => {}
}
}

MapOrNot::Map(fetch)
}
}
resp => MapOrNot::Not(resp),
};

Expand Down Expand Up @@ -172,8 +224,16 @@ pub fn parse_mailbox(mut lines: &[u8]) -> Result<Mailbox> {

use imap_proto::MailboxDatum;
match m {
MailboxDatum::Status { .. } => {
// TODO: we probably want to expose statuses too
MailboxDatum::Status { mailbox: _, status } => {
for f in status.into_iter() {
match f {
StatusAttribute::Recent(r) => mailbox.recent = r,
StatusAttribute::Unseen(u) => mailbox.unseen = Some(u),
StatusAttribute::UidNext(u) => mailbox.uid_next = Some(u),
StatusAttribute::UidValidity(u) => mailbox.uid_validity = Some(u),
StatusAttribute::Messages(m) => mailbox.exists = m,
}
}
}
MailboxDatum::Exists(e) => {
mailbox.exists = e;
Expand Down