Skip to content

Commit

Permalink
Merge pull request #5 from zapnito/onDie-callback
Browse files Browse the repository at this point in the history
Add socket.onDie callback, closes #4.
  • Loading branch information
saschatimme authored Nov 19, 2016
2 parents 0477a71 + e6c1f8f commit d38d6ff
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 61 deletions.
34 changes: 23 additions & 11 deletions example/web/elm/src/Chat.elm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type alias Model =
, state : State
, messages : List Message
, composedMessage : String
, accessToken : Int
}


Expand All @@ -53,6 +54,7 @@ initModel =
, messages = []
, state = LeftLobby
, composedMessage = ""
, accessToken = 1
}


Expand All @@ -73,6 +75,7 @@ type Msg
| NewMsg JD.Value
| UserJoinedMsg JD.Value
| SendComposedMessage
| RefreshAccessToken


update : Msg -> Model -> ( Model, Cmd Msg )
Expand Down Expand Up @@ -114,6 +117,9 @@ update message model =
Err err ->
model ! []

RefreshAccessToken ->
{ model | accessToken = model.accessToken + 1 } ! []



-- Decoder
Expand Down Expand Up @@ -143,9 +149,11 @@ lobbySocket =

{-| Initialize a socket with the default heartbeat intervall of 30 seconds
-}
socket : Socket
socket =
socket : Int -> Socket Msg
socket accessToken =
Socket.init lobbySocket
|> Socket.withParams [ ( "accessToken", toString accessToken ) ]
|> Socket.onDie RefreshAccessToken


lobby : String -> Channel Msg
Expand All @@ -162,16 +170,20 @@ lobby userName =

subscriptions : Model -> Sub Msg
subscriptions model =
case model.state of
JoiningLobby ->
Phoenix.connect socket [ lobby model.userName ]
let
connect =
Phoenix.connect (socket model.accessToken)
in
case model.state of
JoiningLobby ->
connect [ lobby model.userName ]

JoinedLobby ->
Phoenix.connect socket [ lobby model.userName ]
JoinedLobby ->
connect [ lobby model.userName ]

-- we already open the socket connection so that we can faster join the lobby
_ ->
Phoenix.connect socket []
-- we already open the socket connection so that we can faster join the lobby
_ ->
connect []



Expand Down Expand Up @@ -235,7 +247,7 @@ button model =
Html.button [ buttonClass False, Events.onClick (UpdateState LeavingLobby) ] [ Html.text "Leave lobby" ]


chatMessages : List (Message) -> Html Msg
chatMessages : List Message -> Html Msg
chatMessages messages =
Html.div [ Attr.class "chat-messages" ]
(List.map chatMessage messages)
Expand Down
61 changes: 38 additions & 23 deletions src/Phoenix.elm
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Phoenix.Push as Push exposing (Push)


type MySub msg
= Connect Socket (List (Channel msg))
= Connect (Socket msg) (List (Channel msg))


{-| Declare a socket you want to connect to and the channels you want to join. The effect manager will open the socket connection, join the channels. See `Phoenix.Socket` and `Phoenix.Channel` for more configuration and behaviour details.
Expand All @@ -54,7 +54,7 @@ type MySub msg
**Note**: An empty channel list keeps the socket connection open.
-}
connect : Socket -> List (Channel msg) -> Sub msg
connect : Socket msg -> List (Channel msg) -> Sub msg
connect socket channels =
subscription (Connect socket channels)

Expand Down Expand Up @@ -101,7 +101,9 @@ subMap : (a -> b) -> MySub a -> MySub b
subMap func sub =
case sub of
Connect socket channels ->
Connect socket (List.map (InternalChannel.channelMap func) channels)
Connect
(socket |> InternalSocket.socketMap func)
(channels |> List.map (InternalChannel.channelMap func))


type alias Message =
Expand All @@ -113,15 +115,15 @@ type alias Message =


type alias State msg =
{ sockets : InternalSocketsDict
{ sockets : InternalSocketsDict msg
, channels : InternalChannelsDict msg
, selfCallbacks : Dict Ref (SelfCallback msg)
, channelQueues : ChannelQueuesDict msg
}


type alias InternalSocketsDict =
Dict Endpoint InternalSocket
type alias InternalSocketsDict msg =
Dict Endpoint (InternalSocket msg)


type alias InternalChannelsDict msg =
Expand Down Expand Up @@ -210,7 +212,7 @@ onEffects router cmds subs state =
-- BUILD SOCKETS


buildSocketsDict : List (MySub msg) -> Dict String Socket
buildSocketsDict : List (MySub msg) -> Dict String (Socket msg)
buildSocketsDict subs =
let
insert sub dict =
Expand Down Expand Up @@ -257,29 +259,29 @@ sendPushsHelp cmds state =
<&> sendPushsHelp rest


handleSocketsUpdate : Platform.Router msg (Msg msg) -> Dict String Socket -> InternalSocketsDict -> Task Never InternalSocketsDict
handleSocketsUpdate : Platform.Router msg (Msg msg) -> Dict String (Socket msg) -> InternalSocketsDict msg -> Task Never (InternalSocketsDict msg)
handleSocketsUpdate router definedSockets stateSockets =
let
-- leftStep: endpoints where we have to open a new socket connection
leftStep endpoint definedSocket getNewSockets =
-- addedSocketsStep: endpoints where we have to open a new socket connection
addedSocketsStep endpoint definedSocket taskChain =
let
socket =
(InternalSocket.internalSocket definedSocket)
in
getNewSockets
<&> \newSockets ->
taskChain
<&> \addedSockets ->
attemptOpen router 0 socket
<&> \pid ->
Task.succeed (Dict.insert endpoint (InternalSocket.opening 0 pid socket) newSockets)
Task.succeed (Dict.insert endpoint (InternalSocket.opening 0 pid socket) addedSockets)

-- we update the authentication parameters
bothStep endpoint definedSocket stateSocket getNewSockets =
Task.map (Dict.insert endpoint <| InternalSocket.update definedSocket stateSocket) getNewSockets
retainedSocketsStep endpoint definedSocket stateSocket taskChain =
taskChain |> Task.map (Dict.insert endpoint <| InternalSocket.update definedSocket stateSocket)

rightStep endpoint stateSocket getNewSockets =
InternalSocket.close stateSocket &> getNewSockets
removedSocketsStep endpoint stateSocket taskChain =
InternalSocket.close stateSocket &> taskChain
in
Dict.merge leftStep bothStep rightStep definedSockets stateSockets (Task.succeed Dict.empty)
Dict.merge addedSocketsStep retainedSocketsStep removedSocketsStep definedSockets stateSockets (Task.succeed Dict.empty)


handleChannelsUpdate : Platform.Router msg (Msg msg) -> InternalChannelsDict msg -> InternalChannelsDict msg -> Task Never (InternalChannelsDict msg)
Expand Down Expand Up @@ -358,7 +360,7 @@ sendJoinChannel router endpoint internalChannel =
-- STATE UPDATE HELPERS


updateSocket : Endpoint -> InternalSocket -> State msg -> State msg
updateSocket : Endpoint -> InternalSocket msg -> State msg -> State msg
updateSocket endpoint socket state =
{ state | sockets = Dict.insert endpoint socket state.sockets }

Expand All @@ -378,7 +380,7 @@ removeChannelQueue endpoint topic state =
{ state | channelQueues = Helpers.removeIn endpoint topic state.channelQueues }


insertSocket : Endpoint -> InternalSocket -> State msg -> State msg
insertSocket : Endpoint -> InternalSocket msg -> State msg -> State msg
insertSocket endpoint socket state =
{ state
| sockets = Dict.insert endpoint socket state.sockets
Expand Down Expand Up @@ -498,8 +500,21 @@ onSelfMsg router selfMsg state =

finalNewState pid =
Task.map (updateSocket endpoint (InternalSocket.opening backoffIteration pid internalSocket)) getNewState

notifyApp =
case
( internalSocket.socket.onDie
, internalSocket |> InternalSocket.isOpening
)
of
( Just onDie, False ) ->
Platform.sendToApp router onDie &> Task.succeed ()

_ ->
Task.succeed ()
in
attemptOpen router backoff internalSocket
notifyApp
&> attemptOpen router backoff internalSocket
|> Task.andThen finalNewState

Receive endpoint message ->
Expand Down Expand Up @@ -902,7 +917,7 @@ pushSocket endpoint message selfCb state =
-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF


attemptOpen : Platform.Router msg (Msg msg) -> Float -> InternalSocket -> Task x Process.Id
attemptOpen : Platform.Router msg (Msg msg) -> Float -> InternalSocket msg -> Task x Process.Id
attemptOpen router backoff ({ connection, socket } as internalSocket) =
let
goodOpen ws =
Expand All @@ -918,7 +933,7 @@ attemptOpen router backoff ({ connection, socket } as internalSocket) =
Process.spawn (after backoff &> actuallyAttemptOpen)


open : InternalSocket -> Platform.Router msg (Msg msg) -> Task WS.BadOpen WS.WebSocket
open : InternalSocket msg -> Platform.Router msg (Msg msg) -> Task WS.BadOpen WS.WebSocket
open socket router =
let
onMessage _ msg =
Expand Down
68 changes: 52 additions & 16 deletions src/Phoenix/Internal/Socket.elm
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,49 @@ type Connection
| Connected WS.WebSocket Int


type alias InternalSocket =
{ connection : Connection, socket : Socket.Socket }
type alias InternalSocket msg =
{ connection : Connection, socket : Socket.Socket msg }


internalSocket : Socket.Socket -> InternalSocket
internalSocket : Socket.Socket msg -> InternalSocket msg
internalSocket socket =
{ connection = Closed, socket = socket }



-- INSPECT


isOpening : InternalSocket msg -> Bool
isOpening internalSocket =
case internalSocket.connection of
Opening _ _ ->
True

_ ->
False



-- MODIFY


opening : Int -> Process.Id -> InternalSocket -> InternalSocket
socketMap : (a -> b) -> Socket.Socket a -> Socket.Socket b
socketMap func socket =
{ socket | onDie = Maybe.map func socket.onDie }


opening : Int -> Process.Id -> InternalSocket msg -> InternalSocket msg
opening backoff pid socket =
{ socket | connection = (Opening backoff pid) }


connected : WS.WebSocket -> InternalSocket -> InternalSocket
connected : WS.WebSocket -> InternalSocket msg -> InternalSocket msg
connected ws socket =
{ socket | connection = (Connected ws 0) }


increaseRef : InternalSocket -> InternalSocket
increaseRef : InternalSocket msg -> InternalSocket msg
increaseRef socket =
case socket.connection of
Connected ws ref ->
Expand All @@ -58,16 +77,33 @@ increaseRef socket =
socket


update : Socket.Socket -> InternalSocket -> InternalSocket
update socket { connection } =
InternalSocket connection socket
update : Socket.Socket msg -> InternalSocket msg -> InternalSocket msg
update nextSocket { connection, socket } =
let
updatedConnection =
if nextSocket.params /= socket.params then
resetBackoff connection
else
connection
in
InternalSocket updatedConnection nextSocket


resetBackoff : Connection -> Connection
resetBackoff connection =
case connection of
Opening backoff pid ->
Opening 0 pid

_ ->
connection



-- PUSH


push : Message -> InternalSocket -> Task x (Maybe Ref)
push : Message -> InternalSocket msg -> Task x (Maybe Ref)
push message { connection, socket } =
case connection of
Connected ws ref ->
Expand Down Expand Up @@ -105,7 +141,7 @@ push message { connection, socket } =
-- OPEN CONNECTIONs


open : InternalSocket -> WS.Settings -> Task WS.BadOpen WS.WebSocket
open : InternalSocket msg -> WS.Settings -> Task WS.BadOpen WS.WebSocket
open { socket } settings =
let
query =
Expand Down Expand Up @@ -134,7 +170,7 @@ after backoff =
-- CLOSE CONNECTIONS


close : InternalSocket -> Task x ()
close : InternalSocket msg -> Task x ()
close { connection } =
case connection of
Opening _ pid ->
Expand All @@ -151,17 +187,17 @@ close { connection } =
-- HELPERS


get : Endpoint -> Dict Endpoint InternalSocket -> Maybe InternalSocket
get : Endpoint -> Dict Endpoint (InternalSocket msg) -> Maybe (InternalSocket msg)
get endpoint dict =
Dict.get endpoint dict


getRef : Endpoint -> Dict Endpoint InternalSocket -> Maybe Ref
getRef : Endpoint -> Dict Endpoint (InternalSocket msg) -> Maybe Ref
getRef endpoint dict =
get endpoint dict |> Maybe.andThen ref


ref : InternalSocket -> Maybe Ref
ref : InternalSocket msg -> Maybe Ref
ref { connection } =
case connection of
Connected _ ref_ ->
Expand All @@ -171,7 +207,7 @@ ref { connection } =
Nothing


debugLogMessage : InternalSocket -> a -> a
debugLogMessage : InternalSocket msg -> a -> a
debugLogMessage { socket } msg =
if socket.debug then
Debug.log "Received" msg
Expand Down
Loading

0 comments on commit d38d6ff

Please sign in to comment.