Surrogate WebSockets Alongside Rails
ActionCable is coming to Rails 5 and brings with it the promise of using WebSockets directly in Rails.
Ruby has a notoriously bad concurrency story, and that certainly extends into the realm of WebSockets and pubsub. ActionCable may be suitable for a small number of authenticated sessions, but scaling persistent connections to thousands or tens of thousands won't be easy. That's all right; the beauty of the internet is that you can mix and match technologies as you see fit!
There are plenty of other languages that your application can lean on to achieve real-time communication over WebSockets. In the recent past, you may have reached for Node.js for this kind of work, but there is a better option.
Pubsub is by its nature a distributed system, and it’s no secret that Erlang is a superstar in the distributed systems realm. Rather than reach for Erlang directly, we’ll leverage Elixir’s productive tooling to build out a WebSocket microservice.
By using a surrogate microservice, your Rails application remains the authority on business logic. It still handles all database interactions, sending email, and plugs away at the other work Rails excels at — it just doesn’t hold thousands of connections.
Gluing PubSub Together
You may recall from my previous article on service communication with pubsub that it is very easy to publish messages from Ruby. As a refresher, here is the simple Publisher class:
require 'redis'
class Publisher
def publish(channel, message)
Redis.current.publish(channel, message)
end
end
The Rails application will publish score updates whenever data changes:
# from a controller or a background worker
publisher.publish('teams-123-scores', score.to_json)
Publishing is the easy part though! So long as you have a consistent naming scheme for channels and serialize your messages, it is very straightforward. What about the Elixir side?
Getting Started With Compatibility
In the Elixir world, Phoenix is an obvious choice for hosting surrogate WebSocket connections. There are two pubsub implementations broadly available for Phoenix: Redis and PG2 (using Erlang’s distributed named process groups).
As it turns out, both modules rely on Erlang-encoded terms, so neither is conducive to communication outside the Erlang ecosystem. Developing an alternate pubsub module for Phoenix is beyond the scope of this article. Instead, we'll build a simple system that allows Ruby and Elixir to communicate over pubsub with a minimal API.
Setting Up Elixir and Redis PubSub
From here on, I’ll assume you are familiar with Elixir, the mix build tool, and testing with ExUnit. Our first step is to create a surrogate project:
mix new surrogate && cd surrogate
Now we’ll install the Redis client. There are a few different Redis clients in the Erlang world, but we’ll be using the native Elixir client Redix. It is easy to configure, easy to test, and wonderfully written. Open the generated mix.exs and modify the deps:
defp deps do
[{:redix, "~> 0.3"}]
end
Now retrieve the dependencies:
mix deps.get
That’s enough to get the project set up and explore pubsub with a small test. The only functions our module needs are subscribe/1, unsubscribe/1, and possibly broadcast/2. That makes it consistent with the Phoenix.PubSub API. Our test gets started by testing subscribe/1:
defmodule SurrogateTest do
use ExUnit.Case
alias Surrogate.PubSub
test "subscribe/1 links to a topic" do
{:ok, _pid} = PubSub.start_link
:ok = PubSub.subscribe("topic.1")
:ok = PubSub.subscribe("topic.2")
{:ok, other_conn} = Redix.start_link
{:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello))
{:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.2 world))
assert_receive "hello"
assert_receive "world"
end
end
The test first establishes a link to a new surrogate server and then uses the subscribe/1 API to register the test process for published messages. Next it opens another connection directly through Redix and publishes messages on both of the subscribed topics. Finally, it verifies that both of the simple hello and world messages were received by the test process.
Actually defining the Surrogate.PubSub module is a bit more involved. Retaining a list of subscribed processes is essential, as every connection from the web is a separate process. In order to retain state, we must use an Agent or a GenServer. As our module combines state and functions to manipulate that state, we will use the GenServer behaviour.
defmodule Surrogate.PubSub do
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, [name: __MODULE__])
end
## Callbacks
def init(opts) do
{:ok, conn} = Redix.PubSub.start_link(opts)
{:ok, %{conn: conn, subs: %{}}}
end
end
The GenServer behaviour is mixed in via use, and we then define the start_link/1 convenience function. As soon as the process is started, it links itself to Redix.PubSub and retains a connection to Redis. Now we're ready to add the subscribe/1 API method.
def subscribe(topic) do
GenServer.call(__MODULE__, {:subscribe, topic})
end
## Callbacks
def handle_call({:subscribe, topic}, {pid, _}, %{conn: conn, subs: subs} = state) do
:ok = Redix.PubSub.subscribe(conn, topic, self())
tops = Map.get(subs, pid, MapSet.new)
subs = Map.put(subs, pid, MapSet.put(tops, topic))
{:reply, :ok, %{state | subs: subs}}
end
The subscribe/1 function issues a GenServer.call/2, handled by the handle_call/3 callback that is part of the GenServer behaviour, to synchronously subscribe the calling process to a topic. The callback also records the calling process (seen here as {pid, _}) in the server's state.
Every process is a key in the subs map, and its value is a set of all the topics the process is subscribed to. This mapping is critical for receiving published messages and managing unsubscribes later.
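To make the shape of that state concrete, here is a small illustrative snippet (not part of the module; the pid and topic names are placeholders) that mirrors what the subscribe handler does to the subs map:
# Illustrative only: how the subs map grows as a process subscribes
# to topics. Run in iex; self() stands in for a socket process.
pid  = self()
subs = %{}
subs = Map.put(subs, pid, MapSet.put(Map.get(subs, pid, MapSet.new), "topic.1"))
subs = Map.put(subs, pid, MapSet.put(Map.get(subs, pid, MapSet.new), "topic.2"))
# subs now maps the pid to a MapSet containing "topic.1" and "topic.2"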
Running the tests now doesn’t quite work, because the Redix.PubSub module sends out-of-band messages back to the pubsub process, and they aren’t being handled. To handle these out-of-band messages, we have to use the handle_info/2 callback:
def handle_info({:redix_pubsub, :message, message, topic}, %{subs: subs} = state) do
for {pid, topics} <- subs do
if MapSet.member?(topics, topic), do: send pid, message
end
{:noreply, state}
end
def handle_info(_msg, state) do
{:noreply, state}
end
There! Now the module can properly handle subscriptions and forward messages to the subscribed processes.
mix test test/surrogate_test.exs
Compiled lib/surrogate.ex
.

Finished in 0.1 seconds (0.06s on load, 0.04s on tests)
1 test, 0 failures

Randomized with seed 117850
What Else Does PubSub Need?
The subscribe/1 function is almost enough for one-way communication between the Rails app and the Surrogate. There isn’t any need to support broadcast/2 (which would send messages back upstream), leaving unsubscribe/1.
When the current process is unsubscribed, it should no longer receive any forwarded messages. The implementation of unsubscribe/1 is simple, effectively subscribe/1 in reverse. First, add a new test case:
test "unsubscribe/1 unlinks a topic" do
{:ok, _pid} = PubSub.start_link
:ok = PubSub.subscribe("topic.1")
:ok = PubSub.unsubscribe("topic.1")
{:ok, other_conn} = Redix.start_link
{:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello))
refute_receive "hello"
end
The test refutes that any message is forwarded after unsubscribing from "topic.1". Of course this initially fails — now we make it pass:
def unsubscribe(topic) do
GenServer.call(__MODULE__, {:unsubscribe, topic})
end
## Callbacks
def handle_call({:unsubscribe, topic}, {pid, _}, %{subs: subs} = state) do
tops = Map.get(subs, pid, MapSet.new)
subs = Map.put(subs, pid, MapSet.delete(tops, topic))
{:reply, :ok, %{state | subs: subs}}
end
Again we rely on the power of call, which includes the calling process's pid in the second argument. The handler finds the entry for the calling pid and removes the specified topic from its set. When future messages are published, that topic won't be found and the message won't be forwarded.
This approach is simple for illustration purposes, but it is quite naive and leaves behind dangling references to processes. Production-grade implementations, such as Phoenix.PubSub, run periodic GC to clean up after unsubscribes.
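A lightweight alternative, sketched here as an assumption about how you might extend the module rather than as part of the original implementation, is to monitor each subscriber and prune its entries when it exits:
# Sketch only: assumes the subscribe handler also calls
# Process.monitor(pid) on the caller. When that process exits, the
# GenServer receives a :DOWN message and drops its subscriptions.
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{subs: subs} = state) do
  {:noreply, %{state | subs: Map.delete(subs, pid)}}
end
Note that this clause would have to sit above the catch-all handle_info/2 clause defined earlier, or it would never match.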
Stitching into WebSockets
Our connections are built on top of Plug, which comes with an adapter for the Cowboy server. Every HTTP connection that comes to Cowboy is wrapped in a separate process. That includes WebSocket connections, which are initiated as HTTP connections and then upgraded to WebSockets.
That’s perfect for our pubsub setup! Once a connection is upgraded, the server simply waits for topic subscriptions and forwards them on to the Surrogate module.
Add both plug and cowboy to deps in mix.exs, ensure they are started as applications, and define Surrogate as the application to start.
def application do
[mod: {Surrogate, []},
applications: [:cowboy, :plug, :logger]]
end
defp deps do
[{:redix, "~> 0.3"},
{:plug, "~> 1.0"},
{:cowboy, "~> 1.0"}]
end
In order to treat Surrogate as an application, it needs a base module that uses the Application behaviour. Forgive me if what follows is a little like the "how to draw an owl" process. There are a lot of details in application setup that are outside the scope of this article, but they are necessary to get a real server running.
defmodule Surrogate do
use Application
alias Plug.Adapters.Cowboy
def start(_, _) do
import Supervisor.Spec
children = [
Cowboy.child_spec(:http, Surrogate.Server, [], [dispatch: dispatch]),
worker(Surrogate.PubSub, [])
]
opts = [strategy: :one_for_one, name: Surrogate.Supervisor]
Supervisor.start_link(children, opts)
end
defp dispatch do
[{:_, [{"/ws", Surrogate.Socket, []},
{:_, Cowboy.Handler, {Surrogate.Server, []}}]}]
end
end
The Surrogate module is defined as a proper Erlang app, which supervises the pubsub and cowboy workers. The supervision ensures that any time a worker crashes a new one is started up in its place. Notice that the number of modules has expanded; now there are also Surrogate.Server and Surrogate.Socket modules. These will handle our incoming HTTP and WebSocket connections, respectively.
First, the Surrogate.Server module is defined as a simple plug that always returns an “OK” text response.
defmodule Surrogate.Server do
import Plug.Conn
def init(opts), do: opts
def call(conn, _opts) do
conn
|> put_resp_content_type("text/plain")
|> send_resp(200, "OK")
end
end
We can verify that the server is working by starting the application with iex -S mix and visiting localhost:4000. If everything went according to plan, you'll see a friendly "OK" in your browser. Now for the final module needed to relay messages, Surrogate.Socket:
defmodule Surrogate.Socket do
@behaviour :cowboy_websocket_handler
def init(_, _req, _opts) do
{:upgrade, :protocol, :cowboy_websocket}
end
## Callbacks
def websocket_init(_type, req, _opts) do
{:ok, req, %{}, 60_000}
end
end
The module uses the cowboy_websocket_handler behaviour, which requires callback handlers just like a GenServer. Whenever a new connection comes in to localhost:4000/ws, init/3 signals the upgrade to a WebSocket, and websocket_init/3 then initializes the connection state (with a 60 second idle timeout). Once connected, we can begin sending and receiving messages through websocket_handle/3.
alias Surrogate.PubSub
def websocket_handle({:text, "ping"}, req, state) do
{:reply, {:text, "pong"}, req, state}
end
def websocket_handle({:text, "subscribe|" <> topic}, req, state) do
PubSub.subscribe(topic)
{:ok, req, state}
end
def websocket_handle({:text, "unsubscribe|" <> topic}, req, state) do
PubSub.unsubscribe(topic)
{:ok, req, state}
end
def websocket_handle({:text, _}, req, state) do
{:ok, req, state}
end
Each handler pattern matches on the incoming message, allowing the server to handle subscribe and unsubscribe messages selectively. At this point, the server can accept incoming messages, but it is unable to relay subscribed messages coming back from pubsub. As with all out-of-band messages to a server, those are handled with a websocket_info/3 callback:
def websocket_info(message, req, state) do
{:reply, {:text, message}, req, state}
end
Here the message is the exact value being published to the channel, and it is passed along to the WebSocket connection directly. Finally, we can test the round trip between Ruby, Elixir, and a web browser. Restart the surrogate server interactively with iex -S mix, reload the page, and open up the JavaScript console:
let ws = new WebSocket("ws://localhost:4000/ws")
ws.onmessage = (message) => console.log(message.data)
ws.send("subscribe|topic.1")All messages published to topic. are fed to the console, whether they come from Ruby, a Redis CLI instance, or the Elixir shell. For example, when a ‘score’ object is serialized and published from Ruby, this is logged as data in the console:
'{"id":12345,"value":"new score"}'Monolith This Isn’t
Monolith This Isn't
The system we've built is more complex than a monolith, but it will scale simply and perform reliably. It will easily handle thousands of topics and thousands of connections, even on the lightest weight hardware. Line for line, it may outweigh a naive Node.js implementation, but it is entirely fault tolerant right out of the box. That's the benefit we get from building on top of Erlang.
This system is real-time only; there aren’t any mechanisms to rewind or catch up to missed messages after temporary disconnections. That is simply the nature of pubsub. Paired with a lack of channel authorization, it is perfectly suited to pushing updates over broadly available public channels. Think real-time sports updates, leaderboards, stock tickers, etc.
This is not the “beautiful monolith” approach, because that simply doesn’t exist when you enter the realm of distributed systems. It relies on an external database to broker communication, which is a service in itself. When external dependencies are already at play, it is wise to leverage languages and frameworks that are perfectly suited to the task.
Reference: Surrogate WebSockets Alongside Rails from our WCG partner Florian Motlik at the Codeship Blog.