10.6. Communication Channel

Our last piece of the user distributed data structure is a communication channel, which is a consumer channel like those we discussed in Chapter 5. This channel is used to collect all messages sent to the user. When new messages arrive they are appended to the tail of the channel, and as they're displayed on the user's messenger console, they're deleted from the head of the channel.

The channel—like the user's account and friends list—persists in the space as the user logs in and out of the service. If the user happens to log out just as new messages are being appended to the channel, but before they are displayed to the console, the messages aren't lost: The next time the user logs back into the messenger, the unread messages are displayed. Let's take a look at the code that implements the communication channel.

10.6.1. Message Entries and Indices

Each message in the channel is represented by a ChannelMessageEntry, defined as follows:

public class ChannelMessageEntry implements Entry {
  public String username;
  public String from;
  public Date time;
  public String content;
  public Integer position;

  public ChannelMessageEntry() {
  }

  public ChannelMessageEntry(String username, int position) {
    this.username = username;
    this.position = new Integer(position);
  }

  public ChannelMessageEntry(String to, String from,
    String content)
  {
    this.username = to;
    this.from = from;
    this.content = content;
    this.time = new Date();
  }
}

Each channel message entry has five fields: the username of the recipient, a from field holding the username of the person who sent the message, the time and content of the message, and a position field giving the sequence number of the message in the channel. The ChannelMessageEntry class also supplies a few convenience constructors.

Recall that a consumer channel uses two index entries to represent the head and tail of the channel. Here we reuse the Index class we introduced in Section 5.6 to implement the head and the tail:

public class Index implements Entry {
  public String type;        // head or tail
  public String channel;
  public Integer position;

  public Index() {
  }

  public Index(String type, String channel) {
    this.type = type;
    this.channel = channel;
  }

  public Index(String type, String channel, Integer position) {
    this.type = type;
    this.channel = channel;
    this.position = position;
  }

  public Integer getPosition() {
    return position;
  }

  public void increment() {
    position = new Integer(position.intValue() + 1);
  }
}

10.6.2. The Channel Object

To encapsulate operations on the channel distributed data structure—such as creating the channel, appending messages to it, and consuming messages from it—we define a Channel class:

public class Channel {
  private String username;
  private JavaSpace space;
  private TransactionManager mgr;
  private final long MESSAGE_LEASE_TIME =
     604800000;  // one week

  public Channel(Messenger messenger, String username) {
    this.username = username;
    this.space = messenger.space();
    this.mgr = messenger.mgr();
  }

  public void create(Transaction txn)
    throws RemoteException, TransactionException
  {
    Index head =
      new Index("head", username, new Integer(1));
    Index tail =
      new Index("tail", username, new Integer(0));

    space.write(head, txn, Lease.FOREVER);
    space.write(tail, txn, Lease.FOREVER);
  }

  public void append(ChannelMessageEntry message) {
    try {
       Transaction txn = createTransaction();

       Index tail = takeTail(message.username, txn);
       tail.increment();
       message.position = tail.getPosition();
       writeTail(tail, txn);
       space.write(message, txn, MESSAGE_LEASE_TIME);

       txn.commit();
     } catch (Exception e) {
        e.printStackTrace();
     }
  }

  private Transaction createTransaction()
     throws RemoteException, LeaseDeniedException
  {
     Transaction.Created trc =
       TransactionFactory.create(mgr, 3000);
     Transaction txn = trc.transaction;
     return txn;
  }

  //... tail and head helper methods go here

  //... code for consuming messages goes here
}

A Channel object has three fields: the space the channel resides in, a transaction manager mgr that we use to create transactions involving operations on the channel, and the username of the user who “owns” the channel. The Channel constructor takes a handle to the messenger applet and the username as parameters. It obtains references to the space and the transaction manager by calling the applet's space and mgr methods, respectively, and assigns the value of the username parameter to the username field.

The create method sets up a new channel in the space. As we saw in Chapter 5, this involves creating initial head and tail Index entries and writing them to the space under the supplied transaction. The head starts off with an index of 1, while the tail starts off with an index of 0 (recall that the tail being less than the head indicates an empty channel).

The append method adds a new message to the tail end of a channel. We've implemented append much as we did in Chapter 5, where we remove the tail entry of the channel from the space, increment the position of the tail by one, write the updated tail back to the space, and then write the new message to the channel at the new tail position. We write the message to the space with a lease time of one week so that at some point, if messages aren't read by the user, they are expired and removed. Note the head and tail entries of the channel are always written to the space with a lease time of Lease.Forever, so they are never expired by the space. The problem with the append code from Chapter 5, however, is that it does nothing to protect against partial failure. If failure occurs anywhere within the append sequence—say, the tail is removed from the space but can't be written back, or the message can't be written to the new tail position—the space may be left in an inconsistent state, and the application breaks. Now that we've learned how to create and use transactions, we've redesigned our append method to make it robust.

The revised append method first creates a transaction (by calling a createTransaction helper method we've defined). We take the tail from the space, write the tail back to the space, and write the message to the space—all under the transaction—and then commit the transaction. If partial failure occurs, the space will be left in the same consistent state it was in prior to the transaction.

You'll notice here and in the next section that we use several convenient helper methods for operating on the head and tail entries of the user's channel: readTail, takeTail, writeTail, takeHead, and writeHead (which you'll find defined in the complete source code). The readTail method reads the tail index of the channel and returns the tail's position. The takeTail method removes and returns the tail entry for the channel, while writeTail writes a tail entry for the channel into the space. The takeHead and writeHead methods are analogous, but work on head entries instead. Besides readTail, all the methods take a transaction parameter and perform their space operation under that transaction.

10.6.3. Retrieving Messages from the Channel

The Channel object provides two methods for consuming messages from a channel. One method, getMessage, removes a message with a particular sequence number from the channel. The other method, getMessages, removes all messages currently in the channel, up to and including the one at the tail.

Here is how the getMessage method is defined:

public ChannelMessageEntry getMessage(int num, Transaction txn) {
  ChannelMessageEntry template =
    new ChannelMessageEntry(username, num);
  try {
     return (ChannelMessageEntry)
        space.take(template, txn, 500);
  } catch (Exception e) {
     e.printStackTrace();
     return null;
  }
}

The method takes two parameters: the position number of the message to retrieve, and the transaction under which the operations should take place. We first instantiate a message template, specifying the username (which uniquely identifies the channel) and the position of the message we want. Then we use the template to take the matching message entry from the space.

Note that the take is performed under the transaction that's been passed to the method and with a time-out of 500 milliseconds (if the take hasn't found a matching entry within half a second, it will return null). We put a time limit on the retrieval, because there is a chance that messages may have been expired and removed from the space if they have been in the space longer than a week (recall that we wrote messages to the space with a lease time of one week). In this case, we don't want to block on a message that will never appear in the space, so after half a second we skip it and continue on to the next message.

The getMessages method is used when we want to remove all the messages currently in the channel, from the head to the tail:

public Enumeration getMessages() {
  Transaction txn = null;

  try {
     txn = createTransaction();

     // read tail & remove head
     int end = readTail();
     Index head = takeHead(txn);
     int start = head.position.intValue();
     if (end < start) {
        return null;
     }

     // retrieve messages from head to tail
     Vector messages = new Vector();
     for (int i = start; i <= end; i++) {
        ChannelMessageEntry message = getMessage(i, txn);
        if (message != null) {
           messages.addElement(message);
        }
        head.increment();
     }

     writeHead(head, txn);

     txn.commit();
     return messages.elements();
  } catch (Exception e) {
    if (txn != null) {
       try {
          txn.abort();
       } catch (Exception e2) {
         ; // failed to abort
       }
    }
    System.err.println("Failed reading messages");
    e.printStackTrace();
    return null;
  }
}

The method first creates a transaction, under which it will perform all of its space operations. This ensures that the channel will be left in a consistent state even if partial failure occurs.

Next, getMessages calls readTail, to determine the sequence number of the last message currently in the channel, and takeHead, which removes the head index from the space. You may be wondering why we remove the head from the space but just read the tail. By removing the head, we're ensuring that this is the only process removing messages from this channel for the time being. We don't need to remove the tail, because there's no reason other processes can't append new messages to the channel even as we're removing and displaying older messages.

Next we compare the position numbers of the head and tail; if the tail is less than the head, then the channel is empty and we return null. Otherwise, we iterate through the sequence numbers from the head to the tail, calling getMessage to remove each message from the channel and adding it to a vector called messages. As we consume each message, we increment the head index. Once we've consumed them all, we write the head back into the space; its position will be one greater than the tail we recorded (which means the channel will be empty, unless other processes have appended messages to it as we've been busy removing messages). Finally we commit the transaction and return the vector of messages (which will be displayed to the messenger console, as we'll see shortly).

Note that the whole sequence of operations to remove channel messages is performed within a try / catch clause. If any exceptions occur, we check to see if a transaction has already been created and, if so, we explicitly abort it.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset