From cf84552bc2bc89dc2e2fd2b75cbb87896fa2785c Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 10:56:44 -0600 Subject: [PATCH 1/9] Remove message queuing --- elm-package.json | 2 +- src/WebSocket.elm | 74 ++++++++++------------------------------------- 2 files changed, 17 insertions(+), 59 deletions(-) diff --git a/elm-package.json b/elm-package.json index 5ff5ae2..caa118d 100644 --- a/elm-package.json +++ b/elm-package.json @@ -1,5 +1,5 @@ { - "version": "1.0.2", + "version": "2.0.0", "summary": "Persistent network connections, making client/server communication faster.", "repository": "http://github.com/elm-lang/websocket.git", "license": "BSD3", diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 5aa256d..ca4e4d9 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -27,7 +27,6 @@ many unique connections to the same endpoint, you need a different library. import Dict import Process import Task exposing (Task) -import Time exposing (Time) import WebSocket.LowLevel as WS @@ -115,7 +114,6 @@ subMap func sub = type alias State msg = { sockets : SocketsDict - , queues : QueuesDict , subs : SubsDict msg } @@ -124,10 +122,6 @@ type alias SocketsDict = Dict.Dict String Connection -type alias QueuesDict = - Dict.Dict String (List String) - - type alias SubsDict msg = Dict.Dict String (List (String -> msg)) @@ -139,7 +133,7 @@ type Connection init : Task Never (State msg) init = - Task.succeed (State Dict.empty Dict.empty Dict.empty) + Task.succeed (State Dict.empty Dict.empty) @@ -158,52 +152,29 @@ onEffects -> Task Never (State msg) onEffects router cmds subs state = let - sendMessagesGetNewQueues = - sendMessagesHelp cmds state.sockets state.queues - newSubs = buildSubDict subs Dict.empty - cleanup newQueues = - let - newEntries = - Dict.union newQueues (Dict.map (\k v -> []) newSubs) + newEntries = + Dict.map (\k v -> []) newSubs - leftStep name _ getNewSockets = - getNewSockets - |> Task.andThen (\newSockets -> attemptOpen router 0 name - |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets))) + leftStep name _ getNewSockets = + getNewSockets + |> Task.andThen (\newSockets -> attemptOpen router 0 name + |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets))) - bothStep name _ connection getNewSockets = - Task.map (Dict.insert name connection) getNewSockets + bothStep name _ connection getNewSockets = + Task.map (Dict.insert name connection) getNewSockets - rightStep name connection getNewSockets = - closeConnection connection &> getNewSockets + rightStep name connection getNewSockets = + closeConnection connection &> getNewSockets - collectNewSockets = - Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) - in - collectNewSockets - |> Task.andThen (\newSockets -> Task.succeed (State newSockets newQueues newSubs)) + collectNewSockets = + Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) in - sendMessagesGetNewQueues - |> Task.andThen cleanup - + collectNewSockets + |> Task.andThen (\newSockets -> Task.succeed (State newSockets newSubs)) -sendMessagesHelp : List (MyCmd msg) -> SocketsDict -> QueuesDict -> Task x QueuesDict -sendMessagesHelp cmds socketsDict queuesDict = - case cmds of - [] -> - Task.succeed queuesDict - - Send name msg :: rest -> - case Dict.get name socketsDict of - Just (Connected socket) -> - WS.send socket msg - &> sendMessagesHelp rest socketsDict queuesDict - - _ -> - sendMessagesHelp rest socketsDict (Dict.update name (add msg) queuesDict) buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg @@ -262,15 +233,7 @@ onSelfMsg router selfMsg state = |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) GoodOpen name socket -> - case Dict.get name state.queues of - Nothing -> - Task.succeed (updateSocket name (Connected socket) state) - - Just messages -> - List.foldl - (\msg task -> WS.send socket msg &> task) - (Task.succeed (removeQueue name (updateSocket name (Connected socket) state))) - messages + Task.succeed state BadOpen name -> case Dict.get name state.sockets of @@ -290,11 +253,6 @@ updateSocket name connection state = { state | sockets = Dict.insert name connection state.sockets } -removeQueue : String -> State msg -> State msg -removeQueue name state = - { state | queues = Dict.remove name state.queues } - - -- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF From 55f46b8d5fe70f4ef57b18fff3c321d53328f906 Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 11:40:39 -0600 Subject: [PATCH 2/9] Remove KeepAlive functionality --- src/WebSocket.elm | 26 +------------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index ca4e4d9..5693f2c 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -1,7 +1,6 @@ effect module WebSocket where { command = MyCmd, subscription = MySub } exposing ( send , listen - , keepAlive ) {-| Web sockets make it cheaper to talk to your servers. @@ -20,7 +19,7 @@ The API here attempts to cover the typical usage scenarios, but if you need many unique connections to the same endpoint, you need a different library. # Web Sockets -@docs listen, keepAlive, send +@docs listen, send -} @@ -62,7 +61,6 @@ cmdMap _ (Send url msg) = type MySub msg = Listen String (String -> msg) - | KeepAlive String {-| Subscribe to any incoming messages on a websocket. You might say something @@ -82,31 +80,12 @@ listen url tagger = subscription (Listen url tagger) -{-| Keep a connection alive, but do not report any messages. This is useful -for keeping a connection open for when you only need to `send` messages. So -you might say something like this: - - subscriptions model = - keepAlive "ws://echo.websocket.org" - -**Note:** If the connection goes down, the effect manager tries to reconnect -with an exponential backoff strategy. Any messages you try to `send` while the -connection is down are queued and will be sent as soon as possible. --} -keepAlive : String -> Sub msg -keepAlive url = - subscription (KeepAlive url) - - subMap : (a -> b) -> MySub a -> MySub b subMap func sub = case sub of Listen url tagger -> Listen url (tagger >> func) - KeepAlive url -> - KeepAlive url - -- MANAGER @@ -186,9 +165,6 @@ buildSubDict subs dict = Listen name tagger :: rest -> buildSubDict rest (Dict.update name (add tagger) dict) - KeepAlive name :: rest -> - buildSubDict rest (Dict.update name (Just << Maybe.withDefault []) dict) - add : a -> Maybe (List a) -> Maybe (List a) add value maybeList = From fb2d5d857ac5766cffddf465b9532defaa615bd1 Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 12:25:30 -0600 Subject: [PATCH 3/9] Refactor to category-based subscriptions --- src/WebSocket.elm | 60 ++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 5693f2c..d3152f7 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -60,7 +60,7 @@ cmdMap _ (Send url msg) = type MySub msg - = Listen String (String -> msg) + = MySub String String (String -> msg) {-| Subscribe to any incoming messages on a websocket. You might say something @@ -77,14 +77,14 @@ connection is down are queued and will be sent as soon as possible. -} listen : String -> (String -> msg) -> Sub msg listen url tagger = - subscription (Listen url tagger) + subscription (MySub "listen" url tagger) subMap : (a -> b) -> MySub a -> MySub b subMap func sub = case sub of - Listen url tagger -> - Listen url (tagger >> func) + MySub category url tagger -> + MySub category url (tagger >> func) @@ -102,7 +102,7 @@ type alias SocketsDict = type alias SubsDict msg = - Dict.Dict String (List (String -> msg)) + Dict.Dict String (Dict.Dict String (String -> msg)) type Connection @@ -135,17 +135,17 @@ onEffects router cmds subs state = buildSubDict subs Dict.empty newEntries = - Dict.map (\k v -> []) newSubs + buildEntriesDict subs Dict.empty - leftStep name _ getNewSockets = + leftStep category _ getNewSockets = getNewSockets - |> Task.andThen (\newSockets -> attemptOpen router 0 name - |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets))) + |> Task.andThen (\newSockets -> attemptOpen router 0 category + |> Task.andThen (\pid -> Task.succeed (Dict.insert category (Opening 0 pid) newSockets))) - bothStep name _ connection getNewSockets = - Task.map (Dict.insert name connection) getNewSockets + bothStep category _ connection getNewSockets = + Task.map (Dict.insert category connection) getNewSockets - rightStep name connection getNewSockets = + rightStep category connection getNewSockets = closeConnection connection &> getNewSockets collectNewSockets = @@ -162,18 +162,33 @@ buildSubDict subs dict = [] -> dict - Listen name tagger :: rest -> - buildSubDict rest (Dict.update name (add tagger) dict) + MySub category name tagger :: rest -> + buildSubDict rest (Dict.update category (set (name, tagger)) dict) -add : a -> Maybe (List a) -> Maybe (List a) -add value maybeList = - case maybeList of +buildEntriesDict : List (MySub msg) -> Dict.Dict String (List a) -> Dict.Dict String (List a) +buildEntriesDict subs dict = + case subs of + [] -> + dict + + MySub category name tagger :: rest -> + case category of + "listen" -> + buildEntriesDict rest (Dict.update name (Just << Maybe.withDefault []) dict) + + _ -> + buildEntriesDict rest dict + + +set : (comparable, b) -> Maybe (Dict.Dict comparable b) -> Maybe (Dict.Dict comparable b) +set value maybeDict = + case maybeDict of Nothing -> - Just [value] + Just (Dict.fromList [value]) Just list -> - Just (value :: list) + Just (Dict.fromList [value]) @@ -193,9 +208,10 @@ onSelfMsg router selfMsg state = Receive name str -> let sends = - Dict.get name state.subs - |> Maybe.withDefault [] - |> List.map (\tagger -> Platform.sendToApp router (tagger str)) + Dict.get "listen" state.subs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger str)) in Task.sequence sends &> Task.succeed state From dbebe20e9b72cd3b1c0b88497d7d7ae1a773ccd6 Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 12:26:57 -0600 Subject: [PATCH 4/9] Update documentation --- src/WebSocket.elm | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index d3152f7..7b1976a 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -42,8 +42,8 @@ type MyCmd msg send "ws://echo.websocket.org" "Hello!" **Note:** It is important that you are also subscribed to this address with -`listen` or `keepAlive`. If you are not, the web socket will be created to -send one message and then closed. Not good! +`listen`. If you are not, the web socket will be created to send one message +and then closed. Not good! -} send : String -> String -> Cmd msg send url message = @@ -72,8 +72,7 @@ like this: listen "ws://echo.websocket.org" Echo **Note:** If the connection goes down, the effect manager tries to reconnect -with an exponential backoff strategy. Any messages you try to `send` while the -connection is down are queued and will be sent as soon as possible. +with an exponential backoff strategy. -} listen : String -> (String -> msg) -> Sub msg listen url tagger = From bd46dd2a4ce3c17751e2b38064a97f2f02758267 Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 12:38:59 -0600 Subject: [PATCH 5/9] Add onOpen/onClose subscriptions --- src/WebSocket.elm | 51 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 7b1976a..67b2548 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -1,6 +1,8 @@ effect module WebSocket where { command = MyCmd, subscription = MySub } exposing ( send , listen + , onOpen + , onClose ) {-| Web sockets make it cheaper to talk to your servers. @@ -19,7 +21,7 @@ The API here attempts to cover the typical usage scenarios, but if you need many unique connections to the same endpoint, you need a different library. # Web Sockets -@docs listen, send +@docs listen, onOpen, onClose, send -} @@ -79,6 +81,32 @@ listen url tagger = subscription (MySub "listen" url tagger) +{-| Subscribe to websocket open events. You might say something +like this: + + type Msg = WsOpened String | ... + + subscriptions model = + onOpen WsOpened +-} +onOpen : (String -> msg) -> Sub msg +onOpen tagger = + subscription (MySub "onOpen" "" tagger) + + +{-| Subscribe to websocket close events. You might say something +like this: + + type Msg = WsClosed String | ... + + subscriptions model = + onClose WsClosed +-} +onClose : (String -> msg) -> Sub msg +onClose tagger = + subscription (MySub "onClose" "" tagger) + + subMap : (a -> b) -> MySub a -> MySub b subMap func sub = case sub of @@ -220,11 +248,26 @@ onSelfMsg router selfMsg state = Task.succeed state Just _ -> - attemptOpen router 0 name - |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) + let + sends = + Dict.get "onClose" state.subs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) + in + Task.sequence sends + |> Task.andThen (\_ -> attemptOpen router 0 name) + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) GoodOpen name socket -> - Task.succeed state + let + sends = + Dict.get "onOpen" state.subs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) + in + Task.sequence sends &> Task.succeed state BadOpen name -> case Dict.get name state.sockets of From c4994fe53584d0b1b7317a73fd02985a2f0c169c Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 17:23:19 -0600 Subject: [PATCH 6/9] Add connect command --- src/WebSocket.elm | 148 ++++++++++++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 56 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 67b2548..06e75bc 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -1,8 +1,10 @@ effect module WebSocket where { command = MyCmd, subscription = MySub } exposing ( send , listen + , connect , onOpen , onClose + , Error(..) ) {-| Web sockets make it cheaper to talk to your servers. @@ -21,7 +23,7 @@ The API here attempts to cover the typical usage scenarios, but if you need many unique connections to the same endpoint, you need a different library. # Web Sockets -@docs listen, onOpen, onClose, send +@docs connect, listen, send, onOpen, onClose, Error -} @@ -37,6 +39,19 @@ import WebSocket.LowLevel as WS type MyCmd msg = Send String String + | Connect String + + +-- ERRORS + + +{-| The `connect` and `send` functions may fail for a variety of reasons. +In each case, the browser will provide a string with additional information. +-} +type Error + = ConnectFailed + | SendFailed + {-| Send a message to a particular address. You might say something like this: @@ -52,10 +67,26 @@ send url message = command (Send url message) +{-| Attempt to connect to a particular address. You might say something like this: + + connect "ws://echo.websocket.org" + +**Note:** It is important that you are also subscribed to this address with +`listen` if you want to handle messages from the connection! +-} +connect : String -> Cmd msg +connect url = + command (Connect url) + + cmdMap : (a -> b) -> MyCmd a -> MyCmd b -cmdMap _ (Send url msg) = - Send url msg +cmdMap _ cmd = + case cmd of + Send url msg -> + Send url msg + Connect url -> + Connect url -- SUBSCRIPTIONS @@ -73,8 +104,6 @@ like this: subscriptions model = listen "ws://echo.websocket.org" Echo -**Note:** If the connection goes down, the effect manager tries to reconnect -with an exponential backoff strategy. -} listen : String -> (String -> msg) -> Sub msg listen url tagger = @@ -133,7 +162,7 @@ type alias SubsDict msg = type Connection - = Opening Int Process.Id + = Opening Process.Id | Connected WS.WebSocket @@ -164,24 +193,47 @@ onEffects router cmds subs state = newEntries = buildEntriesDict subs Dict.empty - leftStep category _ getNewSockets = + leftStep name _ getNewSockets = getNewSockets - |> Task.andThen (\newSockets -> attemptOpen router 0 category - |> Task.andThen (\pid -> Task.succeed (Dict.insert category (Opening 0 pid) newSockets))) - bothStep category _ connection getNewSockets = - Task.map (Dict.insert category connection) getNewSockets + bothStep name _ connection getNewSockets = + Task.map (Dict.insert name connection) getNewSockets - rightStep category connection getNewSockets = + rightStep name connection getNewSockets = closeConnection connection &> getNewSockets - - collectNewSockets = - Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) in - collectNewSockets + Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) + |> Task.andThen (\newSockets -> cmdHelp router cmds newSockets) |> Task.andThen (\newSockets -> Task.succeed (State newSockets newSubs)) +cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict +cmdHelp router cmds socketsDict = + case cmds of + [] -> + Task.succeed socketsDict + + Send name msg :: rest -> + case Dict.get name socketsDict of + Just (Connected socket) -> + WS.send socket msg + &> cmdHelp router rest socketsDict + + _ -> + Task.succeed socketsDict + --Task.fail SendFailed -- Not connected + + Connect name :: rest -> + case Dict.get name socketsDict of + Just (Connected _) -> + Task.succeed socketsDict + --Task.fail ConnectFailed -- already connected + + _ -> + attemptOpen router name + |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening pid) socketsDict)) + + buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg buildSubDict subs dict = @@ -225,8 +277,7 @@ set value maybeDict = type Msg = Receive String String | Die String - | GoodOpen String WS.WebSocket - | BadOpen String + | Open String WS.WebSocket onSelfMsg : Platform.Router msg Msg -> Msg -> State msg -> Task Never (State msg) @@ -247,19 +298,23 @@ onSelfMsg router selfMsg state = Nothing -> Task.succeed state - Just _ -> + Just conn -> let sends = - Dict.get "onClose" state.subs - |> Maybe.withDefault Dict.empty - |> Dict.toList - |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) + case conn of + Connected _ -> + Dict.get "onClose" state.subs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) + + Opening _ -> -- Don't report close events if we never actually connected + [] in - Task.sequence sends - |> Task.andThen (\_ -> attemptOpen router 0 name) - |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) + Task.sequence sends &> Task.succeed state + |> Task.andThen (\_ -> Task.succeed (removeSocket name state)) - GoodOpen name socket -> + Open name socket -> let sends = Dict.get "onOpen" state.subs @@ -268,18 +323,12 @@ onSelfMsg router selfMsg state = |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) in Task.sequence sends &> Task.succeed state + |> Task.andThen (\_ -> Task.succeed (updateSocket name (Connected socket) state)) - BadOpen name -> - case Dict.get name state.sockets of - Nothing -> - Task.succeed state - Just (Opening n _) -> - attemptOpen router (n + 1) name - |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state)) - - Just (Connected _) -> - Task.succeed state +removeSocket : String -> State msg -> State msg +removeSocket name state = + { state | sockets = Dict.remove name state.sockets } updateSocket : String -> Connection -> State msg -> State msg @@ -287,25 +336,21 @@ updateSocket name connection state = { state | sockets = Dict.insert name connection state.sockets } - --- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF - - -attemptOpen : Platform.Router msg Msg -> Int -> String -> Task x Process.Id -attemptOpen router backoff name = +attemptOpen : Platform.Router msg Msg -> String -> Task x Process.Id +attemptOpen router name = let goodOpen ws = - Platform.sendToSelf router (GoodOpen name ws) + Platform.sendToSelf router (Open name ws) badOpen _ = - Platform.sendToSelf router (BadOpen name) + Platform.sendToSelf router (Die name) actuallyAttemptOpen = open name router |> Task.andThen goodOpen |> Task.onError badOpen in - Process.spawn (after backoff &> actuallyAttemptOpen) + Process.spawn actuallyAttemptOpen open : String -> Platform.Router msg Msg -> Task WS.BadOpen WS.WebSocket @@ -316,15 +361,6 @@ open name router = } -after : Int -> Task x () -after backoff = - if backoff < 1 then - Task.succeed () - - else - Process.sleep (toFloat (10 * 2 ^ backoff)) - - -- CLOSE CONNECTIONS @@ -332,7 +368,7 @@ after backoff = closeConnection : Connection -> Task x () closeConnection connection = case connection of - Opening _ pid -> + Opening pid -> Process.kill pid Connected socket -> From 4f735d9f775b46a3e531d1aa7f097bf3f0df807e Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 17:44:21 -0600 Subject: [PATCH 7/9] Revert "Add connect command" This reverts commit c4994fe53584d0b1b7317a73fd02985a2f0c169c. --- src/WebSocket.elm | 148 ++++++++++++++++++---------------------------- 1 file changed, 56 insertions(+), 92 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 06e75bc..67b2548 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -1,10 +1,8 @@ effect module WebSocket where { command = MyCmd, subscription = MySub } exposing ( send , listen - , connect , onOpen , onClose - , Error(..) ) {-| Web sockets make it cheaper to talk to your servers. @@ -23,7 +21,7 @@ The API here attempts to cover the typical usage scenarios, but if you need many unique connections to the same endpoint, you need a different library. # Web Sockets -@docs connect, listen, send, onOpen, onClose, Error +@docs listen, onOpen, onClose, send -} @@ -39,19 +37,6 @@ import WebSocket.LowLevel as WS type MyCmd msg = Send String String - | Connect String - - --- ERRORS - - -{-| The `connect` and `send` functions may fail for a variety of reasons. -In each case, the browser will provide a string with additional information. --} -type Error - = ConnectFailed - | SendFailed - {-| Send a message to a particular address. You might say something like this: @@ -67,26 +52,10 @@ send url message = command (Send url message) -{-| Attempt to connect to a particular address. You might say something like this: - - connect "ws://echo.websocket.org" - -**Note:** It is important that you are also subscribed to this address with -`listen` if you want to handle messages from the connection! --} -connect : String -> Cmd msg -connect url = - command (Connect url) - - cmdMap : (a -> b) -> MyCmd a -> MyCmd b -cmdMap _ cmd = - case cmd of - Send url msg -> - Send url msg +cmdMap _ (Send url msg) = + Send url msg - Connect url -> - Connect url -- SUBSCRIPTIONS @@ -104,6 +73,8 @@ like this: subscriptions model = listen "ws://echo.websocket.org" Echo +**Note:** If the connection goes down, the effect manager tries to reconnect +with an exponential backoff strategy. -} listen : String -> (String -> msg) -> Sub msg listen url tagger = @@ -162,7 +133,7 @@ type alias SubsDict msg = type Connection - = Opening Process.Id + = Opening Int Process.Id | Connected WS.WebSocket @@ -193,47 +164,24 @@ onEffects router cmds subs state = newEntries = buildEntriesDict subs Dict.empty - leftStep name _ getNewSockets = + leftStep category _ getNewSockets = getNewSockets + |> Task.andThen (\newSockets -> attemptOpen router 0 category + |> Task.andThen (\pid -> Task.succeed (Dict.insert category (Opening 0 pid) newSockets))) - bothStep name _ connection getNewSockets = - Task.map (Dict.insert name connection) getNewSockets + bothStep category _ connection getNewSockets = + Task.map (Dict.insert category connection) getNewSockets - rightStep name connection getNewSockets = + rightStep category connection getNewSockets = closeConnection connection &> getNewSockets + + collectNewSockets = + Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) in - Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) - |> Task.andThen (\newSockets -> cmdHelp router cmds newSockets) + collectNewSockets |> Task.andThen (\newSockets -> Task.succeed (State newSockets newSubs)) -cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict -cmdHelp router cmds socketsDict = - case cmds of - [] -> - Task.succeed socketsDict - - Send name msg :: rest -> - case Dict.get name socketsDict of - Just (Connected socket) -> - WS.send socket msg - &> cmdHelp router rest socketsDict - - _ -> - Task.succeed socketsDict - --Task.fail SendFailed -- Not connected - - Connect name :: rest -> - case Dict.get name socketsDict of - Just (Connected _) -> - Task.succeed socketsDict - --Task.fail ConnectFailed -- already connected - - _ -> - attemptOpen router name - |> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening pid) socketsDict)) - - buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg buildSubDict subs dict = @@ -277,7 +225,8 @@ set value maybeDict = type Msg = Receive String String | Die String - | Open String WS.WebSocket + | GoodOpen String WS.WebSocket + | BadOpen String onSelfMsg : Platform.Router msg Msg -> Msg -> State msg -> Task Never (State msg) @@ -298,23 +247,19 @@ onSelfMsg router selfMsg state = Nothing -> Task.succeed state - Just conn -> + Just _ -> let sends = - case conn of - Connected _ -> - Dict.get "onClose" state.subs - |> Maybe.withDefault Dict.empty - |> Dict.toList - |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) - - Opening _ -> -- Don't report close events if we never actually connected - [] + Dict.get "onClose" state.subs + |> Maybe.withDefault Dict.empty + |> Dict.toList + |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) in - Task.sequence sends &> Task.succeed state - |> Task.andThen (\_ -> Task.succeed (removeSocket name state)) + Task.sequence sends + |> Task.andThen (\_ -> attemptOpen router 0 name) + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) - Open name socket -> + GoodOpen name socket -> let sends = Dict.get "onOpen" state.subs @@ -323,12 +268,18 @@ onSelfMsg router selfMsg state = |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) in Task.sequence sends &> Task.succeed state - |> Task.andThen (\_ -> Task.succeed (updateSocket name (Connected socket) state)) + BadOpen name -> + case Dict.get name state.sockets of + Nothing -> + Task.succeed state -removeSocket : String -> State msg -> State msg -removeSocket name state = - { state | sockets = Dict.remove name state.sockets } + Just (Opening n _) -> + attemptOpen router (n + 1) name + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state)) + + Just (Connected _) -> + Task.succeed state updateSocket : String -> Connection -> State msg -> State msg @@ -336,21 +287,25 @@ updateSocket name connection state = { state | sockets = Dict.insert name connection state.sockets } -attemptOpen : Platform.Router msg Msg -> String -> Task x Process.Id -attemptOpen router name = + +-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF + + +attemptOpen : Platform.Router msg Msg -> Int -> String -> Task x Process.Id +attemptOpen router backoff name = let goodOpen ws = - Platform.sendToSelf router (Open name ws) + Platform.sendToSelf router (GoodOpen name ws) badOpen _ = - Platform.sendToSelf router (Die name) + Platform.sendToSelf router (BadOpen name) actuallyAttemptOpen = open name router |> Task.andThen goodOpen |> Task.onError badOpen in - Process.spawn actuallyAttemptOpen + Process.spawn (after backoff &> actuallyAttemptOpen) open : String -> Platform.Router msg Msg -> Task WS.BadOpen WS.WebSocket @@ -361,6 +316,15 @@ open name router = } +after : Int -> Task x () +after backoff = + if backoff < 1 then + Task.succeed () + + else + Process.sleep (toFloat (10 * 2 ^ backoff)) + + -- CLOSE CONNECTIONS @@ -368,7 +332,7 @@ open name router = closeConnection : Connection -> Task x () closeConnection connection = case connection of - Opening pid -> + Opening _ pid -> Process.kill pid Connected socket -> From 2ca4dba64841dd30f1673a6af2393401ec1ff34b Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 18:18:04 -0600 Subject: [PATCH 8/9] Fix backoff --- src/WebSocket.elm | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index 67b2548..ce130fe 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -247,7 +247,7 @@ onSelfMsg router selfMsg state = Nothing -> Task.succeed state - Just _ -> + Just (Connected _) -> let sends = Dict.get "onClose" state.subs @@ -256,9 +256,12 @@ onSelfMsg router selfMsg state = |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) in Task.sequence sends - |> Task.andThen (\_ -> attemptOpen router 0 name) + &> attemptOpen router 0 name |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state)) + Just (Opening n _) -> + retryConnection router n name state + GoodOpen name socket -> let sends = @@ -267,7 +270,8 @@ onSelfMsg router selfMsg state = |> Dict.toList |> List.map (\(_, tagger) -> Platform.sendToApp router (tagger name)) in - Task.sequence sends &> Task.succeed state + Task.sequence sends + &> Task.succeed (updateSocket name (Connected socket) state) BadOpen name -> case Dict.get name state.sockets of @@ -275,13 +279,23 @@ onSelfMsg router selfMsg state = Task.succeed state Just (Opening n _) -> - attemptOpen router (n + 1) name - |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state)) + retryConnection router n name state Just (Connected _) -> Task.succeed state +retryConnection + : Platform.Router msg Msg + -> Int + -> String + -> State msg + -> Task x (State msg) +retryConnection router n name state = + attemptOpen router (n + 1) name + |> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state)) + + updateSocket : String -> Connection -> State msg -> State msg updateSocket name connection state = { state | sockets = Dict.insert name connection state.sockets } From fd4a03ff7cd5376aa9b3880d4420720f7b7f9790 Mon Sep 17 00:00:00 2001 From: "W. Brian Gourlie" <bgourlie@gmail.com> Date: Sun, 12 Feb 2017 18:27:24 -0600 Subject: [PATCH 9/9] Fix sending --- src/WebSocket.elm | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/WebSocket.elm b/src/WebSocket.elm index ce130fe..c7edc93 100644 --- a/src/WebSocket.elm +++ b/src/WebSocket.elm @@ -178,10 +178,27 @@ onEffects router cmds subs state = collectNewSockets = Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty) in - collectNewSockets + cmdHelp router cmds state.sockets + &> collectNewSockets |> Task.andThen (\newSockets -> Task.succeed (State newSockets newSubs)) +cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict +cmdHelp router cmds socketsDict = + case cmds of + [] -> + Task.succeed socketsDict + + Send name msg :: rest -> + case Dict.get name socketsDict of + Just (Connected socket) -> + WS.send socket msg + &> cmdHelp router rest socketsDict + + _ -> + -- TODO: Since messages are no longer queued, this probably shouldn't just succeed + Task.succeed socketsDict + buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg buildSubDict subs dict =