Skip to content

Commit

Permalink
Add last N message cache and broadcast on join
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Mar 8, 2020
1 parent 6405c78 commit 33c3c94
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 21 deletions.
2 changes: 1 addition & 1 deletion internal/hub/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (p *Peer) writeWSControl(control int, payload []byte) error {

// processMessage processes incoming messages from peers.
func (p *Peer) processMessage(b []byte) {
var m msgWrap
var m payloadMsgWrap

if err := json.Unmarshal(b, &m); err != nil {
// TODO: Respond
Expand Down
61 changes: 42 additions & 19 deletions internal/hub/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
"github.com/gorilla/websocket"
)

type msgWrap struct {
type payloadMsgWrap struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data interface{} `json:"data"`
}

type msgPeer struct {
type payloadMsgPeer struct {
ID string `json:"id"`
Handle string `json:"handle"`
}

type msgChat struct {
type payloadMsgChat struct {
PeerID string `json:"peer_id"`
PeerHandle string `json:"peer_handle"`
Msg string `json:"message"`
Expand Down Expand Up @@ -55,22 +55,22 @@ type Room struct {
disposeSig chan bool
closed bool

// Counter for auto generated peer handles.
counter int
// Message / payload cache.
payloadCache [][]byte
}

// NewRoom returns a new instance of Room.
func NewRoom(id, name string, password []byte, h *Hub) *Room {
return &Room{
ID: id,
Name: name,
Password: password,
hub: h,
peers: make(map[*Peer]bool, 100),
broadcastQ: make(chan []byte, 100),
peerQ: make(chan peerReq, 100),
disposeSig: make(chan bool),
counter: 1,
ID: id,
Name: name,
Password: password,
hub: h,
peers: make(map[*Peer]bool, 100),
broadcastQ: make(chan []byte, 100),
peerQ: make(chan peerReq, 100),
disposeSig: make(chan bool),
payloadCache: make([][]byte, 0, h.cfg.MaxCachedMessages),
}
}

Expand All @@ -88,6 +88,7 @@ func (r *Room) Dispose() {

// Broadcast broadcasts a message to all connected peers.
func (r *Room) Broadcast(data []byte) {
r.recordMsgPayload(data)
r.broadcastQ <- data

// Extend the room's expiry.
Expand Down Expand Up @@ -131,6 +132,13 @@ loop:
// Send the peer its info.
req.peer.SendData(r.makePeerUpdatePayload(req.peer, TypePeerInfo))

// Send the peer last N message.
if r.hub.cfg.MaxCachedMessages > 0 {
for _, b := range r.payloadCache {
req.peer.SendData(b)
}
}

// Notify all peers of the new addition.
r.Broadcast(r.makePeerUpdatePayload(req.peer, TypePeerJoin))
r.hub.log.Printf("%s@%s joined %s", req.peer.Handle, req.peer.ID, r.ID)
Expand Down Expand Up @@ -183,6 +191,21 @@ func (r *Room) remove() {
r.hub.removeRoom(r.ID)
}

// recordMsgPayload records message payloads (events) sent out. It maintains last
// N messages to be sent to new users when they join.
func (r *Room) recordMsgPayload(b []byte) {
if r.hub.cfg.MaxCachedMessages == 0 {
return
}

n := len(r.payloadCache)
if n >= r.hub.cfg.MaxCachedMessages {
r.payloadCache = r.payloadCache[1:]
}

r.payloadCache = append(r.payloadCache, b)
}

// queuePeerReq queues a peer addition / removal request to the room.
func (r *Room) queuePeerReq(reqType string, p *Peer) {
if r.closed {
Expand All @@ -208,17 +231,17 @@ func (r *Room) sendPeerList(p *Peer) {

// makePeerListPayload prepares a message payload with the list of peers.
func (r *Room) makePeerListPayload() []byte {
peers := make([]msgPeer, 0, len(r.peers))
peers := make([]payloadMsgPeer, 0, len(r.peers))
for p := range r.peers {
peers = append(peers, msgPeer{ID: p.ID, Handle: p.Handle})
peers = append(peers, payloadMsgPeer{ID: p.ID, Handle: p.Handle})
}
return r.makePayload(peers, TypePeerList)
}

// makePeerUpdatePayload prepares a message payload representing a peer
// join / leave event.
func (r *Room) makePeerUpdatePayload(p *Peer, peerUpdateType string) []byte {
d := msgPeer{
d := payloadMsgPeer{
ID: p.ID,
Handle: p.Handle,
}
Expand All @@ -227,7 +250,7 @@ func (r *Room) makePeerUpdatePayload(p *Peer, peerUpdateType string) []byte {

// makeMessagePayload prepares a chat message.
func (r *Room) makeMessagePayload(msg string, p *Peer) []byte {
d := msgChat{
d := payloadMsgChat{
PeerID: p.ID,
PeerHandle: p.Handle,
Msg: msg,
Expand All @@ -237,7 +260,7 @@ func (r *Room) makeMessagePayload(msg string, p *Peer) []byte {

// makePayload prepares a message payload.
func (r *Room) makePayload(data interface{}, typ string) []byte {
m := msgWrap{
m := payloadMsgWrap{
Timestamp: time.Now(),
Type: typ,
Data: data,
Expand Down
2 changes: 1 addition & 1 deletion theme/static/style.css
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ form .help {

/* Peer list sidebar */
.chat .sidebar {
background: #fff;
width: 25%;
}
.chat .peers {
Expand Down Expand Up @@ -349,7 +350,6 @@ form .help {
}
.chat .sidebar {
width: 25%;
background: #fafafa;
position: fixed;
top: 0;
right: 0;
Expand Down

0 comments on commit 33c3c94

Please sign in to comment.