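Merely logging the API response is not very useful. To traverse the follower graph, we must check that the response is valid, parse it as JSON, and extract the followers' logins so that they can be fetched in turn.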
You learned how to do all these things in Chapter 7, Web APIs, but not in the context of actors.
We could just add the additional processing steps to the receive method of our Fetcher actor: we could add further transformations to the API response by future composition. However, having actors do several different things, and possibly fail in several different ways, is an anti-pattern: when we learn about managing the actor life cycle, we will see that it becomes much more difficult to reason about our actor systems if the actors contain several bits of logic.
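To make the rejected alternative concrete, here is a minimal sketch of chaining processing steps onto a future inside a single component. It uses only the Scala standard library; `fetch` and `parseFollowers` are hypothetical stand-ins for the real HTTP call and JSON parsing, not code from the book.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureComposition {
  // Hypothetical stand-in for the HTTP request: returns a fake response body.
  def fetch(login: String): String =
    s"$login-follower1,$login-follower2"

  // Hypothetical stand-in for JSON parsing and follower extraction.
  def parseFollowers(body: String): List[String] =
    body.split(",").toList

  // Every step is chained onto the same future, so a failure in any
  // step surfaces in the same place and is harder to attribute.
  def followersOf(login: String): Future[List[String]] =
    Future { fetch(login) }.map(parseFollowers)

  def main(args: Array[String]): Unit =
    println(Await.result(followersOf("odersky"), 5.seconds))
}
```

With this design, the component that issues the request is also responsible for every failure mode of the later steps, which is the coupling that the actor pipeline avoids.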
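We will therefore use a pipeline of three different actors: the fetchers, which query the GitHub API; the response interpreter, which checks the response and parses it to JSON; and the follower extractor, which extracts the followers' logins from the parsed JSON.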
We have already built the fetchers, though we will need to modify them to forward the API response to the response interpreter rather than just logging it.
You can find the code examples for this section in the chap09/all_workers directory in the sample code provided with this book (https://github.com/pbugnion/s4ds).

The first step is to modify the fetchers so that, instead of logging the response, they forward it to the response interpreter. To do this, the fetchers need a reference to the response interpreter. We will pass this reference through the fetcher constructor, which is now:
// Fetcher.scala
class Fetcher(
  val token:Option[String],
  val responseInterpreter:ActorRef)
extends Actor with ActorLogging {
  ...
}
We must also modify the Props factory method in the companion object:
// Fetcher.scala
def props(
  token:Option[String], responseInterpreter:ActorRef
):Props = Props(classOf[Fetcher], token, responseInterpreter)
We must also modify the receive method to forward the HTTP response to the interpreter rather than just logging it:
// Fetcher.scala
class Fetcher(...) extends Actor with ActorLogging {
  ...
  def receive = {
    case Fetch(login) => fetchFollowers(login)
  }

  private def fetchFollowers(login:String):Unit = {
    val unauthorizedRequest = Http(
      s"https://api.github.com/users/$login/followers")
    val authorizedRequest = token.map { t =>
      unauthorizedRequest.header("Authorization", s"token $t")
    }
    val request = authorizedRequest.getOrElse(unauthorizedRequest)
    val response = Future { request.asString }

    // Wrap the response in an InterpretResponse message and
    // forward it to the interpreter.
    response.onComplete { r =>
      responseInterpreter !
        ResponseInterpreter.InterpretResponse(login, r)
    }
  }
}
The response interpreter takes the response, decides if it is valid, parses it to JSON, and forwards it to a follower extractor. The response interpreter will need a reference to the follower extractor, which we will pass in the constructor.
Let's start by defining the ResponseInterpreter companion. It will just contain the definition of the messages that the response interpreter can receive and a factory to create a Props object to help with instantiation:
// ResponseInterpreter.scala
import akka.actor._
import scala.util._

import scalaj.http._
import org.json4s._
import org.json4s.native.JsonMethods._

object ResponseInterpreter {

  // Messages
  case class InterpretResponse(
    login:String, response:Try[HttpResponse[String]]
  )

  // Props factory
  def props(followerExtractor:ActorRef) =
    Props(classOf[ResponseInterpreter], followerExtractor)
}
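The response field of InterpretResponse is a Try because the callback passed to onComplete on a Scala Future receives either a Success wrapping the result or a Failure wrapping the exception. The following sketch illustrates this with the standard library only; `outcomeOf` is a hypothetical helper written for this illustration, and the plain String result stands in for the real HttpResponse[String].

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object OnCompleteDemo {
  // Runs `work` in a future and returns whatever value the
  // onComplete callback was given: a Success or a Failure.
  def outcomeOf(work: => String): Try[String] = {
    val received = Promise[Try[String]]()
    Future(work).onComplete { r => received.success(r) }
    Await.result(received.future, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(outcomeOf("[]"))
    println(outcomeOf(throw new RuntimeException("connection refused")))
  }
}
```

Because the failure travels inside the message, the receiving actor decides how to handle it, rather than the component that issued the request.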
The body of ResponseInterpreter should feel familiar: when the actor receives a message giving it a response to interpret, it parses it to JSON using the techniques that you learned in Chapter 7, Web APIs. If we parse the response successfully, we forward the parsed JSON to the follower extractor. If we fail to parse the response (possibly because it was badly formatted), we just log the error. We could recover from this in other ways, for instance, by re-adding this login to the queue manager to be fetched again:
// ResponseInterpreter.scala
class ResponseInterpreter(followerExtractor:ActorRef)
extends Actor with ActorLogging {

  // Import the message definitions
  import ResponseInterpreter._

  def receive = {
    case InterpretResponse(login, r) => interpret(login, r)
  }

  // If the query was successful, extract the JSON response
  // and pass it on to the follower extractor.
  // If the query failed, or the response is badly formatted,
  // log an error.
  // We should also be checking error codes here.
  private def interpret(
    login:String, response:Try[HttpResponse[String]]
  ) = response match {
    case Success(r) => responseToJson(r.body) match {
      case Success(jsonResponse) =>
        followerExtractor ! FollowerExtractor.Extract(
          login, jsonResponse)
      case Failure(e) =>
        log.error(
          s"Error parsing response to JSON for $login: $e")
    }
    case Failure(e) => log.error(
      s"Error fetching URL for $login: $e")
  }

  // Try to parse the response body as JSON.
  // If successful, coerce the `JValue` to a `JArray`.
  private def responseToJson(responseBody:String):Try[JArray] = {
    val jvalue = Try { parse(responseBody) }
    jvalue.flatMap {
      case a:JArray => Success(a)
      case _ => Failure(new IllegalStateException(
        "Incorrectly formatted JSON: not an array"))
    }
  }
}
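The comment in the code above notes that error codes should also be checked. One possible way to do so is sketched below with the standard library only. The `Response` case class is a hypothetical stand-in for scalaj's HttpResponse[String], and `checkStatus` is a name introduced for this illustration; neither comes from the book's sample code.

```scala
import scala.util.{Failure, Success, Try}

object StatusCheck {
  // Hypothetical stand-in for an HTTP response: status code and body.
  final case class Response(code: Int, body: String)

  // Accept only 2xx responses; turn anything else into a Failure
  // so that it can be handled like a parsing error.
  def checkStatus(r: Response): Try[String] =
    if (r.code >= 200 && r.code < 300) Success(r.body)
    else Failure(new IllegalStateException(
      s"Unexpected status code: ${r.code}"))

  def main(args: Array[String]): Unit = {
    println(checkStatus(Response(200, "[]")))
    println(checkStatus(Response(404, "Not Found")))
  }
}
```

Since the check returns a Try, it could be chained with flatMap ahead of the JSON parsing step, so that a bad status code and a badly formatted body end up in the same Failure branch.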
We now have two-thirds of our worker actors. The last link is the follower extractor. This actor's job is simple: it takes the JArray passed to it by the response interpreter and converts it to a list of followers. For now, we will just log this list, but when we build our fetcher manager, the follower extractor will send messages asking the manager to add the followers to its queue of logins to fetch.
As before, the companion just defines the messages that this actor can receive and a Props factory method:
// FollowerExtractor.scala
import akka.actor._

import org.json4s._
import org.json4s.native.JsonMethods._

object FollowerExtractor {

  // Messages
  case class Extract(login:String, jsonResponse:JArray)

  // Props factory method
  def props = Props[FollowerExtractor]
}
The FollowerExtractor class receives Extract messages containing a JArray of JSON objects, each representing a follower. It extracts the login field from each object and logs the resulting list:
class FollowerExtractor extends Actor with ActorLogging {
  import FollowerExtractor._

  def receive = {
    case Extract(login, followerArray) => {
      val followers = extractFollowers(followerArray)
      log.info(s"$login -> ${followers.mkString(", ")}")
    }
  }

  def extractFollowers(followerArray:JArray) = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login
}
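To see what the for-comprehension in extractFollowers does outside of an actor, the sketch below applies the same pattern to a small, made-up JSON array. It assumes json4s with the native backend is on the classpath, as in the rest of the chapter; the sample logins are invented for illustration.

```scala
import org.json4s._
import org.json4s.native.JsonMethods._

object ExtractLogins {
  // Same pattern as extractFollowers: match each JSON object,
  // then keep only the string-valued "login" field.
  def logins(json: JValue): List[String] = for {
    JObject(follower) <- json
    JField("login", JString(login)) <- follower
  } yield login

  def main(args: Array[String]): Unit = {
    // Made-up sample data, not a real API response.
    val sample = parse(
      """[{"login": "alice", "id": 1}, {"login": "bob", "id": 2}]""")
    println(logins(sample))
  }
}
```

Objects that lack a string-valued login field simply fail the pattern match and are skipped, so the extraction does not throw on partially malformed entries.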
Let's write a new main method to exercise all our actors:
// FetchNetwork.scala
import akka.actor._
import akka.routing._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FetchNetwork extends App {

  import Fetcher._ // Import messages and factory method

  // Get token if exists
  val token = sys.env.get("GHTOKEN")

  val system = ActorSystem("fetchers")

  // Instantiate actors
  val followerExtractor = system.actorOf(FollowerExtractor.props)
  val responseInterpreter =
    system.actorOf(ResponseInterpreter.props(followerExtractor))

  val router = system.actorOf(RoundRobinPool(4).props(
    Fetcher.props(token, responseInterpreter))
  )

  List("odersky", "derekwyatt", "rkuhn", "tototoshi") foreach {
    login => router ! Fetch(login)
  }

  // schedule a shutdown
  system.scheduler.scheduleOnce(5.seconds) { system.shutdown }
}
$ GHTOKEN="2502761d..." sbt run
[INFO] [11/05/2015 20:09:37.048] [fetchers-akka.actor.default-dispatcher-3] [akka://fetchers/user/$a] derekwyatt -> adulteratedjedi, joonas, Psycojoker, trapd00r, tyru, ...
[INFO] [11/05/2015 20:09:37.050] [fetchers-akka.actor.default-dispatcher-3] [akka://fetchers/user/$a] tototoshi -> akr4, yuroyoro, seratch, yyuu, ...
[INFO] [11/05/2015 20:09:37.051] [fetchers-akka.actor.default-dispatcher-3] [akka://fetchers/user/$a] odersky -> misto, gkossakowski, mushtaq, ...
[INFO] [11/05/2015 20:09:37.052] [fetchers-akka.actor.default-dispatcher-3] [akka://fetchers/user/$a] rkuhn -> arnbak, uzoice, jond3k, TimothyKlim, relrod, ...