4.3 Sending actor messages

At this point, ChatRoom is ready to process subscription messages, so let's send some messages to it. Scala's actors library supports both asynchronous and synchronous message sending.

Asynchronous message sending

You send a message asynchronously to an actor with the bang (!) symbol. In using ! to denote message sending, Scala follows the tradition of Erlang:

  val chatRoom = new ChatRoom
  chatRoom ! Subscribe(User("Bob"))

The ! method sends a message to chatRoom and returns immediately; it doesn't wait for any confirmation or reply from the target actor. In addition to the message, ! also sends an implicit reference to the sender to the target actor. That reference is always available inside the target actor via the sender variable.

Listing 4.6 illustrates how the target actor uses the sender reference to process a Post message.

    var session = Map.empty[User, Actor]

    while (true) {
      receive {
        case Subscribe(user) =>
          val subscriber = sender
          val sessionUser =
            actor {
              while (true) {
                self.receive {
                  case Post(msg) => subscriber ! Post(msg)
                }
              }
            }
          session = session + (user -> sessionUser)

        // handle UserPost message
        // handle Unsubscribe message
      }
    }
Listing 4.6 - Using the sender reference

Note that there are two actors in the above example: ChatRoom and sessionUser, the actor representing the user inside the chat room. When the chat room actor receives a Subscribe message, it assigns that message's sender to the subscriber variable. The closure passed to the actor method captures that variable, which allows sessionUser to receive Post messages and forward them to the subscriber. Once sessionUser is initialized to the actor representing the user, it is saved in the session map.
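The role of the implicit sender reference can be seen in isolation with a much smaller program. The following is a minimal sketch, assuming a Scala version that still ships the scala.actors library; the Ping message and the SenderDemo object are hypothetical names invented for this illustration and are not part of the chat room code.

```scala
import scala.actors.Actor._

// Hypothetical message type used only in this sketch.
case class Ping(text: String)

object SenderDemo {
  // Sends a Ping asynchronously and returns the reply that comes back.
  def roundTrip(text: String): String = {
    // The target actor answers whoever sent the message.
    val echo = actor {
      receive {
        case Ping(t) => sender ! ("pong: " + t)
      }
    }

    // `!` returns immediately; the calling thread is the implicit sender.
    echo ! Ping(text)

    // The reply lands in the caller's own mailbox, so the caller
    // has to receive it explicitly.
    receive {
      case answer: String => answer
    }
  }

  def main(args: Array[String]): Unit =
    println(roundTrip("hello"))
}
```

Because ! does not wait, the caller only sees the answer when it chooses to call receive itself. That is the main practical difference from the synchronous style described next.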

Synchronous messages

Scala also supports synchronous message sending via the !? operator:

  val chatRoom = new ChatRoom
  chatRoom !? Subscribe(User("Bob"))

Unlike with asynchronous message sending, !? blocks the calling thread until the message is sent and a reply received. Listing 4.7 shows how ChatRoom might return an acknowledgment when handling a subscription message with the reply method.

    var session = Map.empty[User, Actor]

    while (true) {
      receive {
        case Subscribe(user) =>
          val subscriber = sender
          val sessionUser =
            actor {
              while (true) {
                self.receive {
                  case Post(msg) => subscriber ! Post(msg)
                }
              }
            }
          session = session + (user -> sessionUser)
          reply("Subscribed " + user)
      }
    }
Listing 4.7 - Using the reply method

The client can capture and process the reply:

  chatRoom !? Subscribe(User("Bob")) match {
    case response: String => println(response)
  }
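To try the request and reply cycle end to end, the pieces can be reduced to a stand-alone sketch. It assumes the scala.actors library is available, and it uses simplified stand-ins for User and Subscribe (assumed here to be plain case classes) together with a hypothetical one-shot room actor rather than the full ChatRoom.

```scala
import scala.actors.Actor._

// Simplified stand-ins for the chat room's message types (assumptions).
case class User(name: String)
case class Subscribe(user: User)

object SyncDemo {
  // Sends a Subscribe message synchronously and returns the acknowledgment.
  def subscribe(user: User): String = {
    // A one-shot stand-in for ChatRoom that only acknowledges.
    val room = actor {
      receive {
        case Subscribe(u) => reply("Subscribed " + u)
      }
    }

    // `!?` blocks until the room calls reply; the result is typed Any,
    // so it is pattern matched before use.
    (room !? Subscribe(user)) match {
      case response: String => response
      case other            => "unexpected reply: " + other
    }
  }

  def main(args: Array[String]): Unit =
    println(subscribe(User("Bob")))
}
```

Since !? blocks indefinitely if the target never replies, the library also offers an overload that takes a timeout in milliseconds first, as in room !? (500, Subscribe(user)), and returns an Option[Any] that is None when the time runs out.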

Futures

In some cases, you want the calling thread to return immediately after sending a message, but you may also need access to the target actor's reply at a later time. For instance, you may want to quickly return from sending a subscription message, but also record the chat room's acknowledgment message in the future.

Scala actors provide the concept of futures for such a scenario. Future messages are sent with the !! method, which returns a Future without blocking the calling thread. The caller may or may not evaluate a future's value; if it does, and if the future value is not yet available, the calling thread will block:

  val future = chatRoom !! Subscribe(User("Bob"))

  // Do useful work
  println(future()) // Wait for the future
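A complete version of this pattern might look like the sketch below. It assumes the scala.actors library is available; User and Subscribe are simplified stand-ins, and the one-shot room actor and the FutureDemo object are hypothetical names used only for illustration.

```scala
import scala.actors.Actor._

// Simplified stand-ins for the chat room's message types (assumptions).
case class User(name: String)
case class Subscribe(user: User)

object FutureDemo {
  // Sends a Subscribe message and picks up the acknowledgment later.
  def subscribeLater(user: User): String = {
    // A one-shot stand-in for ChatRoom that only acknowledges.
    val room = actor {
      receive {
        case Subscribe(u) => reply("Subscribed " + u)
      }
    }

    // `!!` returns at once with a scala.actors.Future[Any].
    val future = room !! Subscribe(user)

    // ... the caller is free to do other work here ...

    // Applying the future blocks only if the reply has not arrived yet.
    future() match {
      case ack: String => ack
      case other       => "unexpected reply: " + other
    }
  }

  def main(args: Array[String]): Unit =
    println(subscribeLater(User("Bob")))
}
```

A caller that wants to avoid blocking altogether can first check the future's isSet method, which reports whether the reply is already available.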

Message timeouts

In the examples so far, receive blocks the actor's main thread until a matching message is found. In some cases, you may want to wait for suitable messages only for a certain period of time. The receiveWithin method allows you to specify a message timeout, and to be notified if no message was received within that time.

You can use the receiveWithin method to automatically unsubscribe a user if the user hasn't received a Post message within a specified amount of time. In Listing 4.8, TIMEOUT matches if no Post message is received within 30 minutes (1800 * 1000 milliseconds); the user is then unsubscribed from the chat room.

    val sessionUser = actor {
      while (true) {
        self.receiveWithin (1800 * 1000) {
          case Post(msg) => subscriber ! Post(msg)
          case TIMEOUT =>
            room ! Unsubscribe(user)
            self.exit()
        }
      }
    }
Listing 4.8 - Using message timeouts with receiveWithin
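A thirty-minute timeout is awkward to experiment with, so the same mechanism can be exercised with a much shorter one. The following is a minimal sketch, assuming the scala.actors library is available; Post is a simplified stand-in, and TimeoutDemo, its run method, and the result strings are hypothetical names chosen for this illustration.

```scala
import scala.actors.Actor._
import scala.actors.TIMEOUT

// Simplified stand-in for the chat room's Post message (assumption).
case class Post(msg: String)

object TimeoutDemo {
  // Starts a session actor that waits up to timeoutMs for a Post and
  // reports back to the caller what happened.
  def run(timeoutMs: Long, sendPost: Boolean): String = {
    val caller = self // the calling thread, viewed as an actor

    val session = actor {
      receiveWithin(timeoutMs) {
        case Post(msg) => caller ! ("delivered: " + msg)
        case TIMEOUT   => caller ! "timed out"
      }
    }

    if (sendPost) session ! Post("hi")

    // Wait for the session actor's report.
    receive {
      case result: String => result
    }
  }

  def main(args: Array[String]): Unit = {
    println(run(200, sendPost = false))
    println(run(2000, sendPost = true))
  }
}
```

TIMEOUT is an ordinary case object in the scala.actors package, so it is matched like any other message; the only special part is that the library delivers it when the waiting period expires.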

Processing user posts

All that remains for our chat room to be fully functional is to implement the processing of a user's post, as shown in Listing 4.9.

    var session = Map.empty[User, Actor]

    def act() {
      while (true) {
        receive {
          case UserPost(user, msg) =>
            for (key <- session.keys; if key != user) {
              session(key) ! msg
            }

          // Handle Subscribe message
          // Handle Unsubscribe message
        }
      }
    }
Listing 4.9 - Processing post messages