One of the interesting things about Mongrel2 is its ability to send output to multiple clients with a single handler message, which has a lot of potential for push applications. While I was investigating Mongrel2, a new version of iOS came out that included changes to Safari. Looking through the list of Safari changes in iOS 4.2, I noticed something called EventSource and went to investigate what it was.
As it turns out, EventSource is a newer way of doing browser push, currently supported by Chrome, Opera and Safari (mobile Safari as well). There is a good HTML5Rocks post on Server-Sent Events that goes into more detail on how it differs from something like WebSockets. One of the differences is that EventSource specifically addresses mobile device use with the ability to do a “connectionless push” through a proxy, so the end device can sleep but still receive push notifications.
Before reading on, check out my example Mongrel2 Ruby handler post if you haven’t already. The following examples are based on the code from that post. I’m also going to use Modernizr to detect support for EventSource, so check out my post on using Modernizr to detect browser support as well.
The first thing to do is configure Mongrel2. The following configuration tells Mongrel2 to serve static content from a directory named “html” and to connect the /esupdates path to a Ruby handler we will write later. Note that it listens on more than one host; make sure you put your own IP in place of the example IP:
[code language="text"]
root_dir = Dir(base='html/', index_file='index.html', default_ctype='text/plain')

esupdates = Handler(send_spec='tcp://127.0.0.1:9999', send_ident='54c6755b-9628-40a4-9a2d-cc82a816345e',
                    recv_spec='tcp://127.0.0.1:9998', recv_ident='')

routes = {
    '/esupdates': esupdates,
    '/': root_dir
}

main = Server(
    uuid="2f62bd5-9e59-49cd-993c-3b6013c28f05",
    access_log="/logs/access.log",
    error_log="/logs/error.log",
    chroot="./",
    pid_file="/run/mongrel2.pid",
    default_host="localhost",
    name="main",
    port=6767,
    hosts=[ Host(name="192.168.1.1", routes=routes),
            Host(name="localhost", routes=routes) ]
)

settings = {"zeromq.threads": 1}

servers = [main]
[/code]
The next step is to create the page that will set up the EventSource connection and display any push messages. In this example I’m using a modified version of Modernizr to detect EventSource support but you could do the test by hand or assume the user has support. The current version of Modernizr doesn’t support EventSource yet but you can grab my fork of Modernizr where I have added it.
[code language="html"]
<!doctype html>
<html class="no-js">
<head>
  <title>EventSource Test</title>
  <style type="text/css" media="screen">
    div.esno, div.esyes { display: none }
    .no-eventsource div.esno { display: block }
    .eventsource div.esyes { display: block }
  </style>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js" type="text/javascript"></script>
  <script src="modernizr-min.js" type="text/javascript"></script>
  <script type="text/javascript">
    function startUpdates()
    {
      var source = new EventSource('/esupdates');
      source.onmessage = function (event)
      {
        // append() is used because a jQuery object has no innerHTML property
        $("#events").append(event.data + '<br/>');
      };
      source.onerror = function (event)
      {
        $("#events").append('error<br/>');
      };
      source.onopen = function (event)
      {
        $("#events").append('open<br/>');
      };
    }
    $(document).ready(function()
    {
      if (Modernizr.eventsource)
      {
        setTimeout(startUpdates, 10);
      }
    });
  </script>
</head>
<body>
  <div class="esno">
    Your browser does not support EventSource.
  </div>
  <div class="esyes">
    Messages:<br/>
    <div id="events"></div>
  </div>
</body>
</html>
[/code]
The above creates the EventSource connection to the /esupdates URL, and whenever an event comes in it appends that event to the div with the id of “events”. Both this page and the Modernizr JavaScript file need to go in the “html” directory.
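For reference, each message that reaches `onmessage` arrives as a small block of text lines in the `text/event-stream` format, terminated by a blank line. Here is a minimal Ruby sketch of that framing, written in Ruby to match the handler. `format_sse_event` is a hypothetical helper name made up for this illustration, not part of any library:

```ruby
# Hypothetical helper: frames one Server-Sent Event as text/event-stream lines.
# Multi-line data is emitted as one "data:" line per line of input.
def format_sse_event(data, id: nil)
  data_lines = data.to_s.split("\n", -1)
  data_lines = [""] if data_lines.empty? # an event needs at least one data line

  lines = []
  lines << "id: #{id}" if id
  data_lines.each { |line| lines << "data: #{line}" }
  lines.join("\n") + "\n\n" # the blank line ends the event
end

puts format_sse_event("Timestamp #{Time.now}", id: 1)
```

The browser joins multiple `data:` lines with newlines before handing them to `event.data`.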
And last comes the handler code. I have modified the ruby handler example from my previous post by adding a new ZMQ queue called “eventsource_queue” as well as a beacon thread that pushes a message to the “eventsource_queue” every 5 seconds. Messages coming in on the “eventsource_queue” are sent to every connected client:
[code language="ruby"]
require 'zmq'
require 'json'

handler_thread = Thread.new do
  handler_ctx = ZMQ::Context.new(1)

  receive_queue = handler_ctx.socket(ZMQ::PULL)
  receive_queue.connect("tcp://127.0.0.1:9999")

  response_publisher = handler_ctx.socket(ZMQ::PUB)
  response_publisher.connect("tcp://127.0.0.1:9998")
  response_publisher.setsockopt(ZMQ::IDENTITY, "82209006-86FF-4982-B5EA-D1E29E55D481")

  stop_queue = handler_ctx.socket(ZMQ::PULL)
  stop_queue.connect("ipc://shutdown_queue")

  eventsource_queue = handler_ctx.socket(ZMQ::PULL)
  eventsource_queue.connect("ipc://eventsource_queue")

  connections = []
  stopped = false
  until stopped do
    selected_queue = ZMQ.select([receive_queue, stop_queue, eventsource_queue])
    if selected_queue[0][0] == stop_queue # Anything on the stop_queue ends processing
      stop_queue.close
      receive_queue.close
      eventsource_queue.close
      response_publisher.close
      handler_ctx.close
      stopped = true
    elsif selected_queue[0][0] == eventsource_queue # A message that goes out to all clients
      # Always read the message first; otherwise it stays queued and select
      # returns immediately on every pass while no clients are connected
      es_message = eventsource_queue.recv(0)
      next if connections.size == 0 # Nobody to send to, so the message is dropped
      # The following must be the send_ident value used in the Handler config file entry
      sender_uuid = '54c6755b-9628-40a4-9a2d-cc82a816345e'
      # Note: This can only handle 128 connections without modification
      client_str = connections.join(' ')
      response_value = "#{sender_uuid} #{client_str.length}:#{client_str}, #{es_message}"
      response_publisher.send(response_value, 0)
    else
      # Request comes in as "UUID ID PATH SIZE:HEADERS,SIZE:BODY,"
      sender_uuid, client_id, request_path, request_message = receive_queue.recv(0).split(' ', 4)
      len, rest = request_message.split(':', 2)
      headers = JSON.parse(rest[0...len.to_i])
      len, rest = rest[(len.to_i+1)..-1].split(':', 2)
      body = rest[0...len.to_i]

      if headers['METHOD'] == 'JSON' and JSON.parse(body)['type'] == 'disconnect'
        connections.delete(client_id) # A client has disconnected, remove them from the list
        next
      end

      # Respond with the opening EventSource information
      connections << client_id
      headers = {}
      headers['Content-Type'] = 'text/event-stream'
      headers['Expires'] = 'Fri, 01 Jan 1990 00:00:00 GMT'
      headers['Cache-Control'] = 'no-cache, no-store, max-age=0, must-revalidate'
      headers['Pragma'] = 'no-cache'
      headers_s = headers.map{|k, v| "%s: %s" % [k,v]}.join("\r\n")
      # A comment line plus a retry hint (in milliseconds) opens the stream
      content_body = ": time stream\nretry: 5000\n\n"
      response_value = "#{sender_uuid} #{client_id.to_s.length}:#{client_id}, HTTP/1.1 200 OK\r\n#{headers_s}\r\n\r\n#{content_body}"
      response_publisher.send(response_value, 0)
    end
  end
end

# This thread sends out a message every 5 seconds to all connected clients
beacon_thread = Thread.new do
  beacon_ctx = ZMQ::Context.new(1)
  eventsource_queue = beacon_ctx.socket(ZMQ::PUSH)
  eventsource_queue.bind("ipc://eventsource_queue")
  index = 1
  loop do
    eventsource_queue.send("id: #{index}\r\ndata: Timestamp #{Time.now}\r\n\r\n")
    sleep(5)
    index = index + 1
  end
end

ctx = ZMQ::Context.new(1)
stop_push_queue = ctx.socket(ZMQ::PUSH)
trap('INT') do # Send a message to shutdown on SIGINT
  stop_push_queue.bind("ipc://shutdown_queue")
  stop_push_queue.send("shutdown")
end

handler_thread.join
beacon_thread.kill
beacon_thread.join
stop_push_queue.close
[/code]
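The request parsing and response framing are inlined in the handler above. As a rough sketch only, they could be pulled out into small functions like the ones below. The names `read_netstring`, `parse_request` and `build_response` are made up for this illustration and are not part of Mongrel2 or the zmq gem. The sketch assumes JSON-encoded headers, as the handler does, and counts sizes in bytes:

```ruby
require 'json'

# Reads one netstring ("SIZE:DATA,") from the front of str.
# Returns the data and whatever follows the trailing comma.
def read_netstring(str)
  len, rest = str.split(':', 2)
  len = len.to_i
  unless rest && rest.byteslice(len, 1) == ','
    raise ArgumentError, 'netstring is missing its trailing comma'
  end
  [rest.byteslice(0, len), rest.byteslice((len + 1)..-1)]
end

# Splits a raw request of the form "UUID ID PATH SIZE:HEADERS,SIZE:BODY,"
def parse_request(raw)
  sender_uuid, client_id, path, payload = raw.split(' ', 4)
  headers_json, rest = read_netstring(payload)
  body, _remainder = read_netstring(rest)
  { sender: sender_uuid, client_id: client_id, path: path,
    headers: JSON.parse(headers_json), body: body }
end

# Frames a response for one or more connection IDs: "UUID SIZE:ID ID ID, BODY"
def build_response(sender_uuid, client_ids, body)
  ids = Array(client_ids).join(' ')
  "#{sender_uuid} #{ids.bytesize}:#{ids}, #{body}"
end
```

With helpers like these, the `else` branch of the handler would reduce to a `parse_request` call, the disconnect check, and a `build_response` call.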
The code above is limited to 128 client connections at one time. This could be fixed by splitting the set of connections into subsets of at most 128 each and sending the message once per subset. I left that out to reduce the complexity of this example.
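A minimal sketch of that batching, assuming the same message framing as the handler. `responses_for` and `MAX_IDS_PER_MESSAGE` are hypothetical names made up for this illustration; each returned string would be passed to `response_publisher.send` in turn:

```ruby
MAX_IDS_PER_MESSAGE = 128 # the per-message limit mentioned above

# Returns one framed response per group of at most `limit` connection IDs.
def responses_for(sender_uuid, connections, body, limit = MAX_IDS_PER_MESSAGE)
  connections.each_slice(limit).map do |group|
    ids = group.join(' ')
    "#{sender_uuid} #{ids.bytesize}:#{ids}, #{body}"
  end
end

# Example with 300 made-up connection IDs, which yields batches of 128, 128 and 44
ids = (1..300).map(&:to_s)
batches = responses_for('54c6755b-9628-40a4-9a2d-cc82a816345e', ids, "data: hi\n\n")
puts batches.size
```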
Although this example shows a single event going out to every client, there is no need to limit it to just that. You could just as easily send an event to any subset of the connected clients.
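One way to sketch that is a small in-memory registry that maps a channel name to the connection IDs subscribed to it. `ChannelRegistry` is a hypothetical class, not something from the handler above, and how a client chooses a channel (from the request path, for example) is left open here:

```ruby
require 'set'

# Hypothetical registry: channel name => set of Mongrel2 connection IDs.
class ChannelRegistry
  def initialize
    @channels = Hash.new { |hash, name| hash[name] = Set.new }
  end

  def subscribe(channel, client_id)
    @channels[channel] << client_id
  end

  # Call this when a disconnect message arrives for client_id.
  def remove(client_id)
    @channels.each_value { |ids| ids.delete(client_id) }
  end

  # Connection IDs currently subscribed to the channel (empty if unknown).
  def members(channel)
    @channels.key?(channel) ? @channels[channel].to_a : []
  end
end

registry = ChannelRegistry.new
registry.subscribe('news', '3')
registry.subscribe('news', '9')
registry.subscribe('sports', '9')
registry.remove('3') # connection 3 went away
p registry.members('news')
```

The IDs returned by `members` would take the place of `connections` when building the outgoing message.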
As a next step it might be interesting to see how well this setup could handle the C10k problem or even the C500k problem.