The example in the previous chapter was admittedly quite simple. Let’s build on that foundation (pun intended) to do something a bit more interesting. Suppose we have a workflow on our site like the following:
Enter some information on page X, and submit.
Submission starts a background job, and the user is redirected to a page to view the status of that job.
That second page will subscribe to updates from the background job and display them to the user.
The core principle here is the ability to let one thread publish updates, and
have another thread subscribe to receive those updates. This is known generally
as pub/sub, and fortunately is very easy to achieve in Haskell via STM
(the Software Transactional Memory library).
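To make the idea concrete outside of a web application, here is a minimal sketch of pub/sub with a broadcast channel from the stm package. The names collect and demo are made up for this illustration, and String stands in for Text to keep the example dependency-free.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Read n messages from a subscriber's private copy of the channel.
-- (collect is a hypothetical helper for this sketch.)
collect :: Int -> TChan String -> IO [String]
collect n sub = replicateM n (atomically (readTChan sub))

-- One publisher thread, two subscribers; each subscriber sees every message.
demo :: IO ([String], [String])
demo = do
    chan <- newBroadcastTChanIO
    -- Subscribe before publishing: a duplicate only sees later writes.
    sub1 <- atomically (dupTChan chan)
    sub2 <- atomically (dupTChan chan)
    _ <- forkIO $ mapM_ (atomically . writeTChan chan) ["one", "two", "three"]
    xs <- collect 3 sub1
    ys <- collect 3 sub2
    return (xs, ys)

main :: IO ()
main = demo >>= print
```

Both subscribers receive "one", "two" and "three" in order. The broadcast channel itself is write-only, so every reader has to go through dupTChan.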
As in the previous chapter, let me start off with the following caveat: this technique only works properly if you have a single web application process, because the channels live in that process’s memory. If you have two different servers and a load balancer, you’ll either need sticky sessions or some other solution to make sure that the requests from a single user go to the same machine. In those situations, you may want to consider using an external pub/sub solution, such as Redis.
With that caveat out of the way, let’s get started.
We’ll need two different mutable references in our foundation. The first will keep track of the next “job ID” we’ll hand out. Each of these background jobs will be represented by a unique identifier that will be used in our URLs. The second piece of data will be a map from the job ID to the broadcast channel used for publishing updates. In code:
data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }
Notice that our TChan contains Maybe Text values. The reason for the Maybe wrapper is so that we can indicate that the channel is complete, by providing a Nothing value.
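As a small illustration of the sentinel idea, the sketch below drains a channel until it sees Nothing. It uses an ordinary TChan (not a broadcast channel) and String instead of Text purely to stay short; drain and demo are names invented for this example.

```haskell
import Control.Concurrent.STM

-- Read from the channel until the Nothing sentinel, collecting payloads.
drain :: TChan (Maybe String) -> IO [String]
drain chan = do
    next <- atomically (readTChan chan)
    case next of
        Nothing  -> return []
        Just msg -> (msg :) <$> drain chan

demo :: IO [String]
demo = do
    chan <- newTChanIO
    atomically $ do
        writeTChan chan (Just "first")
        writeTChan chan (Just "second")
        writeTChan chan Nothing            -- end-of-stream marker
        writeTChan chan (Just "never read")
    drain chan

main :: IO ()
main = demo >>= print
```

The reader stops at the first Nothing, so anything written after it is never consumed.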
In order to allocate a job, we need to:
Get a job ID.
Create a new broadcast channel.
Add the channel to the channel map.
Due to the beauty of STM, this is pretty easy:
(jobId, chan) <- liftIO $ atomically $ do
    jobId <- readTVar nextJob
    writeTVar nextJob $! jobId + 1
    chan <- newBroadcastTChan
    m <- readTVar jobs
    writeTVar jobs $ IntMap.insert jobId chan m
    return (jobId, chan)
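The same three steps can also be packaged as a standalone STM action, which makes them easy to try outside of a handler. This is only a sketch: allocateJob and the Jobs alias are hypothetical names, and String replaces Text so the example needs nothing beyond stm and containers.

```haskell
import Control.Concurrent.STM
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap

type Jobs = TVar (IntMap (TChan (Maybe String)))

-- Hypothetical helper: the three allocation steps as one STM action.
-- Because it runs inside a single transaction, two concurrent callers
-- can never be handed the same job ID.
allocateJob :: TVar Int -> Jobs -> STM (Int, TChan (Maybe String))
allocateJob nextJob jobs = do
    jobId <- readTVar nextJob
    writeTVar nextJob $! jobId + 1
    chan <- newBroadcastTChan
    modifyTVar' jobs (IntMap.insert jobId chan)
    return (jobId, chan)

demo :: IO ([Int], [Int])
demo = do
    nextJob <- newTVarIO 1
    jobs <- newTVarIO IntMap.empty
    (a, _) <- atomically (allocateJob nextJob jobs)
    (b, _) <- atomically (allocateJob nextJob jobs)
    (c, _) <- atomically (allocateJob nextJob jobs)
    keys <- IntMap.keys <$> readTVarIO jobs
    return ([a, b, c], keys)

main :: IO ()
main = demo >>= print
```

Three allocations hand out IDs 1, 2 and 3, and the map ends up with exactly those keys.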
There are many different ways we could fork the background job, and they depend entirely on what the job is going to do. Here’s a minimal example of a background job that publishes a few messages, with a one-second delay between each message. Note how after our final message, we broadcast a Nothing value and remove our channel from the map of channels:
liftIO $ forkIO $ do
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something\n"
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something else\n"
    threadDelay 1000000
    atomically $ do
        writeTChan chan $ Just "All done\n"
        writeTChan chan Nothing
        m <- readTVar jobs
        writeTVar jobs $ IntMap.delete jobId m
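One thing the job above does not handle is failure: if the job threw an exception before its final step, subscribers would never see the Nothing value and the channel would stay in the map. A possible way to guard against that, sketched here under the assumption that you are willing to wrap the job body, is to run the cleanup with finally from Control.Exception. runJob and the Jobs alias are hypothetical names, String stands in for Text, and the demo runs the job synchronously (in the application you would still fork it).

```haskell
import Control.Concurrent.STM
import Control.Exception (SomeException, finally, try)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap

type Jobs = TVar (IntMap (TChan (Maybe String)))

-- Hypothetical wrapper: run the job body, then always signal completion
-- and unregister the channel, even if the body throws.
runJob :: Jobs -> Int -> TChan (Maybe String) -> ((String -> IO ()) -> IO ()) -> IO ()
runJob jobs jobId chan body =
    body publish `finally` cleanup
  where
    publish = atomically . writeTChan chan . Just
    cleanup = atomically $ do
        writeTChan chan Nothing
        modifyTVar' jobs (IntMap.delete jobId)

demo :: IO ([Maybe String], Bool)
demo = do
    chan <- newBroadcastTChanIO
    jobs <- newTVarIO (IntMap.singleton 1 chan)
    sub <- atomically (dupTChan chan)
    -- A job that fails halfway through.
    _ <- try (runJob jobs 1 chan $ \publish -> do
            publish "Did something"
            ioError (userError "boom")) :: IO (Either SomeException ())
    first <- atomically (readTChan sub)
    second <- atomically (readTChan sub)
    stillRegistered <- IntMap.member 1 <$> readTVarIO jobs
    return ([first, second], stillRegistered)

main :: IO ()
main = demo >>= print
```

Even though the body fails, the subscriber receives the one published message followed by Nothing, and the job is no longer in the map.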
For this demonstration, I’ve opted for a very simple way to view progress: a plain text page with a streaming response. There are a few other possibilities here: an HTML page that autorefreshes every X seconds, or using EventSource or WebSockets. I encourage you to give those a shot as well, but here’s the simplest implementation I can think of:
getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop
We start off by looking up the channel in the map. If we can’t find it, it means the job either never existed, or has already been completed. In either event, we return a 404. (Another possible enhancement would be to store some information on all previously completed jobs and let the user know if the job is done.)
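The enhancement mentioned in parentheses could be approached by storing a status per job instead of just a channel. The following is a rough, pure sketch of that idea, not part of the application in this chapter: JobStatus, Lookup, lookupJob and finishJob are all hypothetical names, and the channel type is left as a type parameter so the example can use plain strings in place of real channels.

```haskell
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap

-- Hypothetical status type; the chapter's map stores channels only.
data JobStatus chan = Running chan | Finished
    deriving (Eq, Show)

-- The three outcomes a progress handler would need to distinguish.
data Lookup chan = NoSuchJob | AlreadyDone | Subscribe chan
    deriving (Eq, Show)

lookupJob :: Int -> IntMap (JobStatus chan) -> Lookup chan
lookupJob jobId m =
    case IntMap.lookup jobId m of
        Nothing             -> NoSuchJob
        Just Finished       -> AlreadyDone
        Just (Running chan) -> Subscribe chan

-- Mark a job as finished instead of deleting it from the map.
finishJob :: Int -> IntMap (JobStatus chan) -> IntMap (JobStatus chan)
finishJob jobId = IntMap.insert jobId Finished

demo :: [Lookup String]
demo =
    let m0 = IntMap.fromList [(1, Running "chan-1"), (2, Running "chan-2")]
        m1 = finishJob 1 m0
    in map (`lookupJob` m1) [1, 2, 3]

main :: IO ()
main = print demo
```

Job 1 is reported as already done, job 2 can still be subscribed to, and job 3 never existed. A real version would need some policy for eventually discarding finished entries, since this map only grows.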
Assuming the channel exists, we use respondSource to start a streaming response. We then repeatedly call readTChan until we get a Nothing value, at which point we exit (via return ()). Notice that on each iteration, we call both sendChunkText and sendFlush. Without that second call, the user won’t receive any updates until the output buffer completely fills up, which is not acceptable for a real-time update system.
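One property of this loop worth keeping in mind: a channel obtained from dupTChan only delivers messages written after the duplicate was created, so a viewer who opens the progress page partway through a job will not see earlier updates. The sketch below shows that behavior in isolation, using String instead of Text; demo is just an illustrative name.

```haskell
import Control.Concurrent.STM

demo :: IO [Maybe String]
demo = do
    chan <- newBroadcastTChanIO
    -- Published before anyone has subscribed, so it is simply dropped.
    atomically $ writeTChan chan (Just "early")
    sub <- atomically (dupTChan chan)
    atomically $ do
        writeTChan chan (Just "late")
        writeTChan chan Nothing
    a <- atomically (readTChan sub)
    b <- atomically (readTChan sub)
    return [a, b]

main :: IO ()
main = demo >>= print
```

The subscriber sees only the "late" message and the closing Nothing. If late viewers need the full history, the job would have to record past messages somewhere in addition to broadcasting them.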
For completeness, here’s the full source code for this application:
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
import           Control.Concurrent     (forkIO, threadDelay)
import           Control.Concurrent.STM
import           Data.IntMap            (IntMap)
import qualified Data.IntMap            as IntMap
import           Data.Text              (Text)
import           Yesod

data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }

mkYesod "App" [parseRoutes|
/ HomeR GET POST
/view-progress/#Int ViewProgressR GET
|]

instance Yesod App

getHomeR :: Handler Html
getHomeR = defaultLayout $ do
    setTitle "PubSub example"
    [whamlet|
        <form method=post>
            <button>Start new background job
    |]

postHomeR :: Handler ()
postHomeR = do
    App {..} <- getYesod
    (jobId, chan) <- liftIO $ atomically $ do
        jobId <- readTVar nextJob
        writeTVar nextJob $! jobId + 1
        chan <- newBroadcastTChan
        m <- readTVar jobs
        writeTVar jobs $ IntMap.insert jobId chan m
        return (jobId, chan)
    liftIO $ forkIO $ do
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something\n"
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something else\n"
        threadDelay 1000000
        atomically $ do
            writeTChan chan $ Just "All done\n"
            writeTChan chan Nothing
            m <- readTVar jobs
            writeTVar jobs $ IntMap.delete jobId m
    redirect $ ViewProgressR jobId

getViewProgressR :: Int -> Handler TypedContent
getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop

main :: IO ()
main = do
    jobs <- newTVarIO IntMap.empty
    nextJob <- newTVarIO 1
    warp 3000 App {..}