Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client-daemon communication via gRPC #62

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ notify-rust = "4"
reqwest = { version = "0.11.11", features = ["json"] }
common = { path = "../common", features = ["rocket"] }
anyhow = "1.0.61"
tokio = "1.20.1"
tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1"
log = "0.4.14"
env_logger = "0.9.0"
serde_json = "1.0"
tonic = "0.8.2"
prost = "0.11"

[build-dependencies]
tonic-build = "0.8"
5 changes: 5 additions & 0 deletions daemon/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/client_daemon.proto")?;
println!("cargo:rerun-if-changed=migrations");
Ok(())
}
72 changes: 72 additions & 0 deletions daemon/src/client_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use client_daemon::client_daemon_server::ClientDaemon;
use client_daemon::{
ChatHistoryRequest, ChatHistoryResponse, DeleteMessageRequest, DeleteMessageResponse,
EditMessageRequest, EditMessageResponse, Event, SendMessageRequest, SendMessageResponse,
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

pub mod client_daemon {
tonic::include_proto!("clientdaemon");
}

#[derive(Debug)]
// FIXME: name this something better
pub struct ClientDaemonService {}

#[tonic::async_trait]
impl ClientDaemon for ClientDaemonService {
async fn get_chat_history(
&self,
request: Request<ChatHistoryRequest>,
) -> Result<Response<ChatHistoryResponse>, Status> {
println!("Got chat history request: {:?}", request);

// TODO: Implement returning chat history with database, etc.
return Ok(Response::new(ChatHistoryResponse::default()));
}

type SubscribeToEventsStream = ReceiverStream<Result<Event, Status>>;

async fn subscribe_to_events(
&self,
request: Request<()>,
) -> Result<Response<Self::SubscribeToEventsStream>, Status> {
println!("Received subscribe to events request: {:?}", request);
// FIXME: Implement this
let (_tx, rx) = tokio::sync::mpsc::channel(4);

// TODO: send events as they come
return Ok(Response::new(ReceiverStream::new(rx)));
}

async fn send_message(
&self,
request: Request<SendMessageRequest>,
) -> Result<Response<SendMessageResponse>, Status> {
println!("Received send message request: {:?}", request);

// TODO: implement sending messages
return Ok(Response::new(SendMessageResponse::default()));
}

async fn edit_message(
&self,
request: Request<EditMessageRequest>,
) -> Result<Response<EditMessageResponse>, Status> {
println!("Received edit message request: {:?}", request);

// TODO: implement editing messages
return Ok(Response::new(EditMessageResponse::default()));
}

async fn delete_message(
&self,
request: Request<DeleteMessageRequest>,
) -> Result<Response<DeleteMessageResponse>, Status> {
println!("Received delete message request: {:?}", request);

// TODO: implement deleting messages
return Ok(Response::new(DeleteMessageResponse::default()));
}
}
14 changes: 13 additions & 1 deletion daemon/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use reqwest;

use client_server::client_daemon::client_daemon_server::ClientDaemonServer;
use common::structures::{DiscoveryRequest, DiscoveryResponse, InfoResponse};
use discovery::{discover, discover_info, discover_root, DiscoveryServerConfig};
use std::net::{IpAddr, Ipv4Addr};
use tonic::transport::Server;

pub mod client_server;
pub mod contact;
pub mod discovery;
pub mod notif;

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
println!("Hello, world!");

Expand Down Expand Up @@ -68,4 +71,13 @@ async fn main() {
}
Err(_) => eprintln!("Could not connect to server. Possible it does not exist yet."),
}

// tonic gRPC server for the communication with the client
let addr = "0.0.0.0:5768".parse()?;
let cd_svc = client_server::ClientDaemonService {};
let cd_srv = ClientDaemonServer::new(cd_svc);
println!("Client gRPC server on {}", addr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you implement the connection here, is it work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • working

Copy link
Member Author

@Sophon96 Sophon96 Nov 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! If you run it, you can use a gRPC client like BloomRPC and import proto/client_daemon.proto and fire off RPC calls, though all of them are just blank for now.

Server::builder().add_service(cd_srv).serve(addr).await?;

Ok(())
}
104 changes: 104 additions & 0 deletions proto/client_daemon.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
syntax = "proto3";
package clientdaemon;

import "google/protobuf/empty.proto";

// Service for talking between the client and the daemon
// TODO: Come up with a better name
service ClientDaemon {
// Get the chat history with some user
rpc GetChatHistory(ChatHistoryRequest) returns (ChatHistoryResponse);

// Subscribe to events emitted by the daemon
rpc SubscribeToEvents(google.protobuf.Empty) returns (stream Event);

// Send a message
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);
// Edit a message
rpc EditMessage(EditMessageRequest) returns (EditMessageResponse);
// Delete a message
rpc DeleteMessage(DeleteMessageRequest) returns (DeleteMessageResponse);
}

// request to get chat history
message ChatHistoryRequest {
// user to get chat history with
string username = 1;
// id of last message received
string last_id = 2;
}

// chat history with a user
message ChatHistoryResponse {
// all of the messages
repeated Message messages = 1;
}


// An event emitted by the daemon
message Event {
// New or edited message
message MessageEvent {
// edited message
bool edit = 1;
// who sent it
string sender = 2;
// the message
Message message = 3;
}

// Deleted message
message MessageDeleteEvent {
// who
string sender = 1;
// message id
string id = 2;
}

// Other user request to talk
message ChatRequestEvent {
// user name of the user
string username = 1;
}

oneof event {
MessageEvent message = 1;
MessageDeleteEvent message_delete = 2;
ChatRequestEvent chat_request = 3;
}
}


message SendMessageRequest {
string recipient = 1;
Message message = 2;
}

// Currently blank, just here for the future
message SendMessageResponse {}


message EditMessageRequest {
string recipient = 1;
Message message = 2;
}

// Currently blank, just here for the future
message EditMessageResponse {}

message DeleteMessageRequest {
string recipient = 1;
Message message = 2;
}

// Currently blank, just here for the future
message DeleteMessageResponse {}

message Message {
string id = 1;
uint64 timestamp = 2;
string user = 3;
string content = 4;
// TODO: figure out how to do attachments
// repeated bytes attachments = 5;
}