Chapter 7. Error handling with RxJS

This chapter covers

  • The issues with imperative error–handling schemes
  • Using functional data types to abstract exception handling
  • Using observable operators to handle exceptions
  • Different strategies for retrying observable sequences

Until now, we’ve explored only happy-path examples involving RxJS for tackling many different use cases. We suspect that at some point you’ve probably asked yourself, “What would happen if a remote HTTP call failed while fetching data for my stock quote widget?” Observers see the outcome of combining and transforming sequences of streams that you use to map your business logic to. But if an exception occurs midstream, what will the observer see at that point? These are some valid and important questions, but it was important that you first understand and learn to think reactively with ideal scenarios. Now, we’re going to sprinkle a dose of the real world onto your code. The brutal reality is that software will likely fail at some point during its execution.

Many issues can arise in software where data inadvertently becomes null or undefined, exceptions are thrown, network connectivity is lost, and so on. Your code needs to account for the potential occurrence of these issues, which unavoidably creates complexity. In other words, you can’t escape errors, but you can learn how to deal with them. One strategy that developers often use is to scatter error-handling code around every function call. We do this to make our code more robust and fault tolerant, but it has the detrimental effect of making it even more complex and harder to read.

In this chapter, you’ll learn that the key to elegant error handling in RxJS is done in part by the effective use of observables and by following proper FP principles, as you’ve seen all along. Given a properly constructed observable stream, the next step is to learn about the different RxJS observable operators that you can plug in to respond to any adversity. Before we get started, it’s important for you to understand that you need to put aside the imperative error-handling techniques you’re accustomed to, like try/catch, in favor of a functional approach as implemented in RxJS.

7.1. Common error-handling techniques

JavaScript errors can occur in many situations, especially when an application fails to communicate with a server when invoking an AJAX call. Also, third-party libraries that you load into your project can have functions that throw exceptions to signal special error conditions. Hence, you always need to be prepared for the worst and design with failure in mind, instead of letting it become an afterthought and a source of regret.

In the imperative world, exceptions are typically handled with the common try/catch idiom, which occurs frequently with synchronous code. Conversely, in the asynchronous world—remote HTTP calls and event emitters—you’re required to interface with functions that delegate failures to callback functions. And recently with JavaScript ES6, many libraries have switched to using Promises to wrap their asynchronous computations. Let’s examine each of these cases individually.

7.1.1. Error handling with try/catch

JavaScript’s default exception-handling mechanism is geared toward throwing and catching exceptions through the popular try/catch block, which is also pervasive in most modern programming languages. Here’s a sample:

try {
   someDangerousFunction();
}
catch (error) {
   // statements to handle any exceptions
   console.log(error.message);
}

As you know, the purpose of this structure is to surround a piece of code that you deem to be unsafe. Upon throwing an exception, the JavaScript runtime abruptly halts the program’s execution and creates a stack trace of all the function calls leading up to the problematic instruction. Specific details about the error, such as the message, line number, and filename, are populated into an object of type Error and passed into the catch block.

RxJS 5 exceptions

RxJS 5 has a number of improvements over its previous version. One of them involves the simplification of the internal mechanisms of RxJS, resulting in a stack trace that’s much easier to parse.

The catch block becomes a safe haven so that you can potentially recover your program. But with your knowledge about observables, you can see how this imperative style of dealing with exceptions is structurally very different from what you’ve done so far. So, adding try/catch to your RxJS code to provide error-handling logic to a stream would look like the following:

try {
   const data$ = Rx.Observable.fromPromise(ajax('/data'))
     .subscribe(console.log);
}
catch(error) {
   console.log(`Error processing stream: ${error.message}`);
}

Now, imagine having to merge multiple streams, each with its own type of failure, and you can see how this pattern couldn’t possibly be effective if you need to wrap each stream with its own try/catch. With asynchronous functions, the common JavaScript pattern is to provide the error callback alongside the success callback.

7.1.2. Delegating errors to callbacks

As is common with asynchronous functions in many JavaScript libraries, there’s typically a function that responds to the success case and one that handles errors. This is necessary because asynchronous functions are unpredictable in terms of if and when they return, and if errors occur. Until now, we purposely avoided talking about error cases when using an asynchronous function like ajax(). You’ve been using this function all along, as a kind of black box that always ran correctly. You could use it in two different ways: with callbacks or with Promises. Let’s peek under the hood of this function using callbacks.

Listing 7.1. Function ajax() with success and error callbacks
const ajax = function (url, success, error) {
   let req = new XMLHttpRequest();                        1
   req.responseType = 'json';
   req.open('GET', url);
   req.onload = function() {
      if(req.status == 200) {
         let data = JSON.parse(req.responseText);
         success(data);                                   2
      }
      else {
         req.onerror();
      }
   }
   req.onerror = function () {
      if(error) {
         error(new Error('IO Error'));                 3
      }
   };
   req.send();
};

  • 1 Initializes an XmlHttpRequest object used to fetch data remotely
  • 2 On success, parses the data as JSON and invokes the success() callback
  • 3 On error, converts the error message into an exception object
Code samples

Remember that the code for this chapter can be found in the RxJSinAction GitHub repository, https://github.com/RxJSInAction/rxjs-in-action.

Using this function, code that would require multiple nested sequences of HTTP calls, such as when mashing up different sources of data, would look like the next listing.

Listing 7.2. Imperative error handling with asynchronous code
ajax('/data',
  data => {
    for (let item of data) {
      ajax(`/data/${item.getId()}/info`,
        dataInfo => {
          ajax(`/data/images/${dataInfo.img}`,
            showImage,
            error => {                                            1
              console.log(`Error image: ${error.message}`);
            });
        },
        error => {                                                2
          console.log(`Error each data item: ${error.message}`);
        });
    }
  },
  error => {                                                      3
    console.log(`Error fetching data: ${error.message}`);
  }
});

  • 1 Handles the innermost HTTP call
  • 2 Handles second-level HTTP call
  • 3 Handles the outermost HTTP call

Looking at this code from just a structural point of view, you can picture it as nested code blocks, such as the ones shown in figure 7.1.

Figure 7.1. Imperative asynchronous error handling tends to nest when processing a series of asynchronous calls.

Indeed, although our code is more fault tolerant, all we’ve done here is exacerbate the problem of having to parse this nested “pyramid of doom,” which we spoke about in chapter 1. Because of this type of situation, the JavaScript ES6 specification introduced Promises, which elegantly streamline the invocation of a sequence of asynchronous functions.

7.1.3. Errors and Promises

The Promise.then() function acts as the mapping function (similar to Rx.Observable.map()) used to project (or map) another Promise to a source Promise. This is the reason why we decided to “promisify” ajax(), and it’s what we’ve been using for most of the examples as a form much superior to its callback counterpart. Here’s the code for that.

Listing 7.3. Promisified ajax()
const ajax = function (url) {
    return new Promise(function(resolve, reject) {          1
        let req = new XMLHttpRequest();
        req.responseType = 'json';
        req.open('GET', url);
        req.onload = function() {
            if(req.status == 200) {
               let data = JSON.parse(req.responseText);
               resolve(data);                               2
            }
            else {
               reject(new Error(req.statusText));           3
            }
        };
        req.onerror = function () {
           reject(new Error('IO Error'));                   3
        };
        req.send();
    });
 };

  • 1 Creates and returns the HTTP call wrapped in a Promise
  • 2 Promise is resolved if data is fetched successfully
  • 3 Promise is rejected in case failure occurs while performing the remote request

Much as you can with observables, you can chain multiple asynchronous calls by mapping new Promises to a source Promise. Then, you can use the Promise.catch() operator to implement an error-handling strategy that answers to any of the rejected Promises or ones that throw exceptions, as such:

ajax('/data')
   .then(...)
   .catch(error => console.log(`Error fetching data: ${error.message}`))

Because catch() itself returns a Promise, you can implement specific errors by inserting multiple asynchronous calls to catch() in series, like this:

ajax('/data')
   .then(item => ajax(`/data/${item.getId()}/info`))
   .catch(error => console.log(`Error fetching data: ${error.message}`))

     .then(dataInfo => ajax(`/data/images/${dataInfo.img}`))
     .catch(error => console.log(`Error each data item: ${error.message}`))

     .then(showImg);
     .catch(error => console.log(`Error image: ${error.message}`))

Arguably, in comparison to figure 7.1, the statement in figure 7.2 resembles a much easier structure to parse.

Figure 7.2. Promises allow you to chain subsequent asynchronous calls, each with its own success and error (catch) callbacks.

If the first ajax() fails, the first catch() operator runs before jumping into the next Promise in the chain. Each catch can be thought of as a recovery block for the previous Promise; the Promise allows you to resume processing in some known state.

Continuous catch

The previous code example introduces a small bug that we ignored to prevent cluttering up the code. Because catches are also part of the continuation, the handler method can return either a value or another Promise; if no value is returned, then an undefined value will be passed to the next continuation block.

Just like with a synchronous try/catch, you can either continue by recovering from the error, in this case by returning a non-error value, or you can rethrow the error. In the catch block, that’s done by either returning a Promise.reject() or throwing within the callback method. But because you’re basically just transferring control from one Promise to the next, it’s more typical to just implement a single, global catch() operator (this is essentially equivalent to placing an overarching try/catch block over your entire function body). In this case, when any Promise fails, the catch() operator is run and the entire body of code is exited:

ajax('/data')
   .then(item => ajax(`/data/${item.getId()}/info`))
   .then(dataInfo => ajax(`/data/images/${dataInfo.img}`))
   .then(showImg)
   .catch(error => console.log(error.message));

Certainly, Promises get you closer to where you want to be. Unfortunately, all these approaches limit your ability to make your code responsive and reactive; in other words, you can’t easily return a default value in case a request failed or perhaps retry a rejected Promise. You can get around passing default values down the chain by introducing side effects in your code. And you can implement retries with the help of third-party libraries, such as Q.js (https://github.com/kriskowal/q). But more importantly, recall from our earlier discussions that Promises model single asynchronous values, not a deluge of them, which are the type of problems you solve when combining functional and reactive programming—and to make matters worse, Promises can swallow exceptions if no error handler is provided. (This is true only for older browser versions. Newer browsers and NodeJS bubble up errors thrown from unhandled promises.) Let’s examine in more detail the reasons why these imperative error-handling mechanisms are incompatible with a reactive application.

7.2. Incompatibilities between imperative error-handling techniques an- nd functional and reactive code bases

The structured mechanism of throwing and catching exceptions in imperative Java-Script code has many drawbacks when used in a functional or reactive style. In general, functions that throw exceptions

  • Can’t be composed or chained like other functional artifacts.
  • Violate the principle of pure functions that advocates a single, predictable value because throwing exceptions constitutes another exit path from your function calls.
  • Cause side effects to occur because an unanticipated unwinding of the stack impacts the entire system beyond just the function call or the stream declaration.
  • Violate the principle of non-locality because the code used to recover from the error is distanced from the originating function call. When an error is thrown, a function leaves the local stack and environment, for instance:
    try {
         let record = findRecordById('123');
         ... potentially many lines of code in between
    }
    catch (e) {
         console.log('ERROR: Record not found!');
    
         // Handle error here
    }
  • Put a great deal of responsibility on the caller to declare matching catch blocks to manage specific exceptions instead of just worrying about a function’s single return value.
  • Are hard to use asynchronously. The try/catch idiom is effective when enclosing synchronous code, where errors are syntactically bounded by the enclosing try blocks. This code is predictable and not affected by time and latency. Asynchronous functions, on the other hand, are unpredictable and typically provide an error callback mechanism to give control of the program back to the user.
  • Are hard to use when multiple error conditions create nested levels of exception-handling blocks:
    let record = null;
    try {
       record = findRecordByName('RecordA');
    }
    catch (e) {
       console.log('ERROR: Cannot locate record by name');
    
       try {
          record = findRecordById('123');
       }
       catch (e) {
          console.log('ERROR: Record is nowhere to be found!');
       }
    }

After reading all these statements, you’re probably asking yourself, “Is throwing exceptions completely off the table?” We certainly don’t believe so. In practice, they can never be off the table because there are many factors outside your control that you may need to account for, like system or environmental errors or calls to third-party code.

We’re not recommending you don’t use exceptions at all, because they do serve a purpose—just use them for truly exceptional conditions. When you need to use exceptions or deal with errors, the functional approach is to allow functional data types to abstract them away from your main business logic; this prevents you from creating side effects or code that becomes hard to maintain.

7.3. Understanding the functional error-handling approach

The functional approach to error handling is quite simple. As we mentioned before, we won’t get too deep into any functional topics in this book, so we’ll provide a simplistic view of this approach that will serve to help you better understand the design of RxJS’s error-handling mechanism. The goal here is to reify, or make a first-class citizen, the idea of a wrapper around a function or body of code that has the potential of throwing an exception. If you think about it, that’s what you’ve been doing all along when you use a try/catch block. The function findRecordById() can throw an exception in the event that a database record is not found, as illustrated in figure 7.3.

Figure 7.3. A try/catch block creates an invisible section that protects any section of code.

The try block creates an invisible enclosure around the function call so that you can implement all your error-handling logic inside the catch block. In the functional world, you’ll reify this container with a data type called Try.

Note

The Try data type is a common pattern in FP that we introduce here merely as a theoretical construct. This will help you later, when we discuss how observables implement this pattern.

Figure 7.4 shows how this data type would work.

Figure 7.4. Use a data type called Try to make errors first-class citizens of your application. This can be used to wrap any value and then safely apply or map functions to it. If a function invocation is successful (no exceptions produced), a data type called Success is returned; otherwise, an object of Failure is returned.

You can use this type to apply or map a function to a certain value. This is equivalent to invoking the function with that parameter. With this extra plumbing, Try allows you to provide the necessary abstraction to return an object of type Success if a record object is found; otherwise, an object of type Failure, signaling that something unexpected occurred. Notice that this requires the input to be a function so that it can properly capture a thrown exception:

Try.of(() => findRecordById('123')); //-> Success(Record)
Try.of(() => findRecordById('456')); //-> Failure
Try.of(() => findRecordById('xxxxx'))
   .getOrElse(new Record(...));   //-> Default value

Now, just like any functional data type, suppose Try also had a map() operator, which you can use to perform any action on the resolved object, if one is found:

Try.of(() => findRecordById('123')).map(processRecord);

Using Try as the return type of your functions is quite handy, because not only do you protect the value it returns from a possible null access, but also you let your users know that this particular function might produce an invalid result—it’s self-documenting. This is why other languages such as Scala, Java, and Haskell one way or another provide native APIs for this data type.

For the purpose of our discussion, we show some of the pieces of Try in the next listing, as well as its derived types Success and Failure.

Listing 7.4. Internals of the Try functional data type
class Try {
  constructor(val) {                         1
    this._val = val;
  }

  static of(fn) {                            2
    try {
      return new Success(fn());
    } catch (error) {
      return new Failure(error);
    }
  }

  map(fn) {                                  3
    return Try.of(() => fn(this._val));
  }
}

class Success extends Try {                  4

   getOrElse(anotherVal) {
     return this._val;
   }

   getOrElseThrow() {
      return this._val;
   }
}

class Failure extends Try {                  5

  map(fn) {
    return this;
  }

  getOrElse(anotherVal) {
     return anotherVal;
  }

  getOrElseThrow() {
    if(this._val !== null) {
       throw this._val;
    }
  }
}

  • 1 Creates a new instance of this data type
  • 2 If Try yields a successful computation, wraps the result in a Success; otherwise, wraps the result in a Failure
  • 3 Map applies a function to a value with internal try/catch logic and returns an instance of Try to continue chaining more operations (this is analogous to Rx.Observable.map()).
  • 4 Success represents a successful computation, with a method to get the value.
  • 5 Failure represents a function that resulted in an exception being thrown. Any subsequent mapping operations are skipped.
Syntax

Listing 7.4 uses the class syntax in ES6 to model the Try data type. We use classes only because they’re syntactically shorter than using functions and object prototypes. As you probably know by now, classes are nothing more than syntactic sugar over JavaScript’s existing prototype-based inheritance. Whether you decide to implement this using function or class syntax is entirely up to you.

Listing 7.4 shows just a few of the key details of this functional data type. Try models two scenarios:

  • If an instance of Try<Record> represents a successful computation, it’s an instance of Success<Record> internally that’s used to continue the chain.
  • If, on the other hand, it represents a computation in which an error has occurred, it’s an instance of Failure<Error>, wrapping an Error object or an exception.

What you accomplish with this is a simple data type that allows you to pipeline, or chain operations on objects, catching exceptions along the way, without impacting your business logic and hiding the imperative try/catch structure. Here’s how you can use it. Suppose you execute a function processRecord() that works on a record fetched from a database. If the record is not found, processRecord() will throw an exception:

let record = Try.of(() => findRecordById('123')
   .map(processRecord)
   .getOrElse(new Record('123', 'RecordA'));

This code works by lifting a value into the Try context and then mapping a function to it. map() is where the try/catch logic lives, consolidated in one place. Arguably, this code is much more readable and pure compared to the following:

let record;
try {
   record = findRecordById('123');
   processRecord(record);
}

catch (e) {
   record = new Record('123', 'RecordA');
}

In the functional case, if the process operation were to fail, nothing in this logic would actually change because the error would be propagated internally via Failure instances, finally resulting in the getOrElse() function that creates and returns a default record object. This simple design pattern is really powerful, because it abstracts error handling completely from your business logic so that your functions worry only about writing code to solve your task at hand, while remaining side effect–free. You can see the workings of this in the diagram in figure 7.5.

Figure 7.5. Mapping a function to a Try returns the result wrapped in a Success type or an exception wrapped in a Failure.

Does this discussion about propagation of change and the mapping of functions ring a bell? That’s right! The Observable data type works exactly the same way, and now you’ll see how it implements its own exception-handling operators.

7.4. The RxJS way of dealing with failure

Just as observables abstract data flow and processing, they also abstract errors and exception handling. RxJS’s Observable type provides several strategies for you to manage the errors that could arise midstream. In this section, you’ll learn about these strategies:

  • Propagating errors to observers
  • Catching errors and reacting accordingly
  • Retrying a failed operation for a fixed number of times
  • Reacting to failed retries

7.4.1. Errors propagated downstream to observers

In chapter 2, we mentioned that at the end of the observable stream is a subscriber waiting to pounce on the next event to occur. This subscriber implements the Observer interface, consisting of three methods: next(), error(), and complete().

In general, errors don’t escape the observable pipeline. They are contained and guarded to prevent side effects from happening—much like Try, as shown in figure 7.6.

Figure 7.6. Errors that occur within an operator are not allowed to escape the context of the observable. Rather, errors can be handled within the pipeline (as you’ll see later in section 7.4); otherwise, the observer’s error() function is called.

Errors that occur at the beginning of the stream or in the middle are propagated down to any observers, finally resulting in a call to error(). Here’s a quick example to illustrate this concept.

Listing 7.5. Calling the error method on observers when an exception is thrown
const computeHalf = x => Math.floor(x / 2);
Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);       1
      }
      return num;
   })
   .map(computeHalf)
   .subscribe(
     (next) => console.log(val),
     (error) => console.log(`Caught: ${err}`),                  2
     () => console.log('All done!');

    );

  • 1 The business logic spits out an exception somewhere midstream.
  • 2 Without any exception handlers (discussed later in section 7.4), any errors are automatically propagated down to the observers.

Running this code prints the following:

1
2
"Caught: Error: Unexpected odd number: 5"

You can consider this approach similar in structure to an overarching try/catch block. The important aspect to note from this example is how the Observable data type acts like a Try by disallowing the exception to leak from the stream’s context. Because there’s no way to recover, the first exception that fires will result in the entire stream being cancelled. Think of parsing data from a network call; you’d obviously want to skip parsing the object if the network call was unsuccessful. The error is pushed down to any subscribers so that they can perform any side effects, such as showing an alert pop-up or a modal dialog. Most of the time, though, you’ll want to catch and recover from the error that occurred. To make understanding the different recovery strategies easier, we’ll continue using this simple numerical example from listing 7.5 as our theme.

7.4.2. Catching and reacting to errors

Most of the time, you’ll want to catch and recover from any errors so that your application is always responsive and resilient—one of the main requirements of being reactive is always being responsive.

Reactive Manifesto

One of the main principles of reactive systems is the notion of resiliency, which states that systems should stay responsive in the face of failure. Reacting to errors using RxJS operators is one way to work toward this goal.

The basic error-handling mechanism that RxJS provides is the catch() operator, used to intercept any error in the Observable and give you the option to handle by returning a new Observable or, again, by propagating it down to observers in case there’s a recoverability path, as shown in figure 7.7.

Figure 7.7. Exception caught in an operator upstream by using the catch() operator

Just like with regular try/catch usage, you want to place the catch() operator close to the segment of code that might fail. catch() allows you to insert a default value in place of the event that caused the error; any subsequent operators in the chain will never know that an exception occurred. Imagine if you experienced a login error to the server or had a problem accessing your local DB. The catch could be used to capture that error and inject a default or in-memory value into the stream without the downstream being any the wiser!

You can use marble diagrams to show error handling in a stream as well, just like with any other operator. Figure 7.8 shows an example of a stream that rejects odd numbers and returns evens instead.

Figure 7.8. Error handling using marble diagrams

Here’s the code for figure 7.8.

Listing 7.6. Recovering from an exception using catch()
Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .catch(err => Rx.Observable.of(6))                    1
   .map(n => n / 2)
   .subscribe(
     (next) => console.log(val),
     (error) => console.log(`Caught: ${err}`),           2
     () => console.log('All done!');
    );

  • 1 Catches or intercepts the error and returns an observable in its place
  • 2 In this case, because the exception is caught and handled, the error method on the observer is never executed.

Running this code now prints the following:

1
2
3
"All done!"

As you can see, the stream continues to be cancelled when the exception occurs, but now you’re at least able to recover. Some errors, however, might be intermittent and shouldn’t halt the stream. For instance, a server is unavailable for a short period of time because of a planned outage. In cases like this, you may want to retry your failed operations.

7.4.3. Retrying failed streams for a fixed number of times

The catch() operator is passed a function that takes an error argument (shown in listing 7.6) as well as the source observable that was caught, which you can return to tell the source observable to retry from the beginning. Let’s take a look:

Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .catch((err, source) => source)             1
   ...

  • 1 Returning the original observable, which will begin to emit the entire observable sequence, starting with the first value, 2

This operation can be dangerous when the exception is unavoidable or not transient because you’ve now entered an infinite loop; there’s no condition in the business logic that will change for the error to disappear. Figure 7.9 shows what’s occurring.

Figure 7.9. The catch() operator is provided an Observable sequence populated with any errors that occurred as well as the source observable, which you can use to retry the sequence from the beginning.

Another place looping can occur is when using Promises. A Promise can emit two types of errors: either an unexpected exception is thrown during the body of the computation, or the Promise becomes unfulfilled and gets rejected. Because Promises are not retriable artifacts, dereferencing the value of a Promise will always return its fulfilled value or error, as the case may be. The following code creates a big problem:

const requestQuote$ = symbol =>
     Rx.Observable.fromPromise(
       ajax(webservice.replace(/$symbol/, symbol)))
     .catch((err$, promise$)=> promise$)                      1
     .map(response => response.replace(/"/g, ''))
     .map(csv);

  • 1 Using the selector function to reiterate the execution of this Promise stream. Bad idea!

Just like in figure 7.9, if the server you’re trying to access is offline, the exception thrown would also create an infinite loop and exhaust the main thread, because you would be retrying the same exception (failed Promise) over and over again. You’ll see how to solve this problem in a bit.

RxJS provides more-intuitive ways of retrying via the retry() operator, which combines this notion of catching and reexecuting the source observable into one function. Here’s a simple example:

Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retry(3)                                      1
   .subscribe(
     num => console.log(num),
     err => console.log(err.message)
   );

  • 1 Repeats this sequence three more times (a total of four) if there’s an error before giving up and letting the exception propagate down to the observer

Running this code will print a sequence of numbers 2 and 4 a total of four times before printing “Unexpected odd number: 5.” So unless you’re dealing with a transient failure that you know will resolve itself somehow, avoid catching and returning the same sequence or the equivalent retry operation with an empty argument. In order to ensure you don’t lock up the UI or cause infinite loops to occur, you should always use retry() with a fixed number. You could also elegantly combine the two approaches. You can reattempt the operation three more times and then catch the exception, to fall back to a default value:

Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retry(3)
   .catch(err$ => Rx.Observable.of(6))         1
   .subscribe(
     num => console.log(num),
     err => console.log(err.message));

  • 1 Instead of propagating the error down, you can use this placeholder value.

Again the effect of this is that the sequence would be tried a total of four times before the catch block executes, emitting the default value 6 and then completing the sequence. Notice that using a default value with catch doesn’t simply replace the value in the sequence and allow it to continue. After an exception occurs, the Observable is terminated at that point.

Now that we’ve talked about catch/retry, you’re probably thinking it would be appropriate to embed retry into your stock ticker code, so that if the server were to fail due to a restart or a small outage, you could at least retry to fetch stock information:

const requestQuote$ = symbol =>
     Rx.Observable.fromPromise(
       ajax(webservice.replace(/$symbol/, symbol)))
     .retry(3)
     .map(response => response.replace(/"/g, ''))
     .map(csv);

But there’s a small caveat here. Recall from our previous discussions that Promises have no retry capability (you don’t get second chances with Promises). Unlike Promises, streams are retriable artifacts, so you can easily get around this limitation by wrapping the Promise observable into another stream that is retriable—again creating a higher-order observable. Effectively, what you want to do is apply the retry function to an outer observable that wraps the inner Promise. You can use mergeMap() to flatten it back into a single stream, so placing the retry at fetchDataInterval$ solves this problem:

const fetchDataInterval$ = symbol => twoSecond$
     .mergeMap(() => requestQuote$(symbol)                        1
          .distinctUntilChanged((previous, next) => {
              ...
          }))
     .retry(3);

  • 1 requestQuote$ invokes the Promise. This source observable is an outer observable that you can use to make the Promise observable retry three more times.

This code will cause the Promise internally to reinstantiate and retry three more times if it encounters an exception or a rejection, which is really nice. Keep in mind that it will become a single observable layer once mergeMap() projects requestQuote$(-symbol) onto the source. The fact that streams can reemit or replay events upon multiple subscriptions is important, but there’s a bit more you need to understand that’s happening behind the scenes. We’ll come back to this solution in the next chapter in the context of hot observables. Another way of implementing retries effectively is to add a backoff strategy, which introduces some wait time in between retry actions.

7.4.4. Reacting to failed retries

Using retries with backoff is an effective way to retry more times without overloading the server. Examples of a backoff strategy are constant, linear, exponential, and random (also known as jitter). The exponential and linear types are more commonly used, but in any case, the goal is to use progressively longer waits between retries for consecutive periods of time. RxJS allows you to accomplish this using the retryWhen() operator. retryWhen() takes a notifier observable argument (an internal Observable object that contains any errors that occurred during the execution of the stream, just like with catch()) and repeats the source observable that errors at the pace of when this notifier emits values. For instance, you can say “retry after 3 seconds,” as shown in figure 7.10.

Figure 7.10. Implementing retries with a constant wait of 3 seconds between retries

In other words, if the provided error observable emits a value, the retry action is executed. So you can use this observable to control when and how retries should take place; it’s common to use timer observables to accomplish this. Let’s go back to our numbers example to see this clearly:

Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retryWhen(errors$ => errors$.delay(3000))                 1
   ...

  • 1 Using the delay operator to plug in a three-second delay between when each error value is emitted

This will retry the observable sequence from the start and every 3 seconds thereafter and repeat the numbers indefinitely or until the operation that threw the exception becomes successful:

1
2
// 3 seconds wait...
1
2
// 3 seconds wait...
...
// and so on

You can also use retryWhen() to implement a fixed number of retries by keeping track of the number of times the source observable has been retried. Remember, you can use scan() to emit values at every accumulated interval:

const maxRetries = 3;
Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retryWhen(errors$ =>
        errors$.scan((errorCount, err) => {
            if(errorCount >= maxRetries) {
                throw err;
            }
            return errorCount + 1;
        }, 0)
    )
    ...

Running this code prints the same result as previously, with the difference that instead of running indefinitely, it will retry up to the maxRetries limit and then error, calling the error() method on the observers. A more effective retry strategy used in cases where remote requests are being made is a linear backoff, which alleviates the overall load on the server. This technique is readily implemented in most major modern websites; the first retry action occurs immediately, and subsequent actions occur after a certain lag time, which increases linearly, as shown in figure 7.11.

Figure 7.11. After each retry, the time between retries grows linearly. It starts with a two-second wait, and then invokes the next retry call after 4 seconds, and then after 6 seconds, and so on.

Before we get into the code that implements this, we’ll introduce a new operator called zip(). This operator merges the specified observable sequence into one by using a selector function (a function that you provide to instruct zip() how to format the events emitted) whenever all of the observable sequences have emitted values at a corresponding index. This operator is frequently used in FP to merge two corresponding arrays; for instance, zip() is implemented in Ramda.js:

const records = R.zip(
   ['RecordA', 'RecordB', 'RecordC'],            1
   ['123', '456', '789']
);
//=> [['RecordA', '123'],
      ['RecordB', '456'],
      ['RecordC', '789']
     ]

  • 1 zip combines both arrays into a multidimensional array, associating each value at the corresponding key.

This works with streams just as well, as shown in the marble diagram in figure 7.12.

Figure 7.12. Internal workings of zip with streams. Both stream events are combined at each index irrespective of the time either event occurs.

In some ways, zip() works like combineLatest(), except that the former matches the index of the corresponding events one-to-one, as shown in figure 7.12, whereas the latter just combines the latest values when any of the observables emits a value. Here’s a simple numerical example illustrating this difference:

const s1$ = Rx.Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
const s2$ = Rx.Observable.of('a', 'b', 'c', 'd', 'e');

Rx.Observable.zip(s1$, s2$).subscribe(console.log);
//-> [1, "a"]
     [2, "b"]
     [3, "c"]
     [4, "d"]
     [5, "e"]

   Rx.Observable.combineLatest(s1$, s2$).subscribe(console.log);
//-> [9, "a"]
     [9, "b"]
     [9, "c"]
     [9, "d"]
     [9, "e"]

As you can see, zip() sticks to the array definition, merges both events, and matches the corresponding indexes between the streams. In this case, s1$ continues to emit more values, but because s2$ doesn’t, zip() ignores them—both have to emit events. On the other hand, combineLatest() just merges the latest event of s1$ with whatever is the latest value emitted by s2$.

Now that you know how to use zip(), you’ll implement a linear backoff that retries the first time after 1 second, the next time after 2 seconds, and so on, as shown in the following listing.

Listing 7.7. Implementing a linear backoff retry for our stock ticker stream
const maxRetries = 3;
Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retryWhen(errors$ =>
       Rx.Observable.range(0, maxRetries)                                  1
         .zip(errors$, val => val)                                         2
         .mergeMap(i =>                                                    3
           Rx.Observable.timer(i * 1000)
             .do(() => console.log(`Retrying after ${i} second(s)...`)))
    )
   .subscribe(console.log);

  • 1 Returns an observable that will emit a maxRetries number of events. So, if maxRetries is 3, it will emit events 3 - 0 = 3 times.
  • 2 zip is used to combine one-to-one values from the source observable (range) with the error observable. You pass a selector function, known in FP as the identity function, that returns the value of the first argument passed to it.
  • 3 Merges map with a timer observable based on the number of attempts. This is what allows you to emulate the backoff mechanism.

With this retry strategy, the stream will attempt to run for a fixed number of times (given by maxRetries), with a linearly incrementing time of 1 second between retries, before finally giving up. Because you’re not throwing the exception, the stream halts execution on every retry, generating the following output:

2
4
Retrying after 0 second(s)...
2
4
Retrying after 1 second(s)...
2
4
Retrying after 2 second(s)...
2
4

Although the following listing is pretty advanced, you can also bundle throwing the exception if your goal is to signal the unrecoverable condition on your last retry action, as shown in the next listing. You can do this by implementing some conditional logic within the projected observable returned from mergeMap(). In this case, if you’ve reached the last retry, you’ll project an observable with an exception; otherwise, you’ll project the timer, just as before.

Listing 7.8. Fixed count, linear backoff, and throwing exception if error persists
const maxRetries = 3;

Rx.Observable.of(2,4,5,8,10)
   .map(num => {
      if(num % 2 !== 0) {
        throw new Error(`Unexpected odd number: ${num}`);
      }
      return num;
   })
   .retryWhen(errors$ =>
       Rx.Observable.range(0, maxRetries + 1)
         .zip(errors$, (i, err) => ({'i': i, 'err': err}))                1
         .mergeMap( ({i, err}) => {                                       2
             if(i === maxRetries) {
               return Rx.Observable.throw(err);                           3
             }
             return Rx.Observable.timer(i * 1000)
               .do(() =>
                    console.log(`Retrying after ${i} second(s)...`));
        })
    )
   .subscribe(
     console.log,
     error => console.log(error.message)
   );

  • 1 Uses a selector function that combines events from both zipped streams into a single object
  • 2 Destructures the parameter to extract the attempt count and the last error object that occurred
  • 3 Because this code is inside a mergeMap() operator, it expects you to return an observable object. You can use the throw() operator to create an observable that safely wraps an exception object (throwing the error would also work, but this approach is more elegant).

Running this code prints the following:

2
4
Retrying after 0 second(s)...
2
4
Retrying after 1 second(s)...
2
4
Retrying after 2 second(s)...
2
4
Unexpected odd number: 5

Using if/else here is not the most functional way of writing code, but it’s acceptable in practice given that the scope is internal to the pipeline. But if you’re looking for a purer approach that uses more lambda expressions and keeps the FP spirit high, RxJS provides Rx.Observable.if(condition, then$, else$), which evaluates a given condition function and either returns the then$ observable or the else$, respectively. You’ll use this to refactor just that segment of code:

...
.retryWhen(errors$ =>
     Rx.Observable.range(1, maxRetries)
       .zip(errors$, (i, err) => ({'i': i, 'err': err}))
       .mergeMap(({i, err}) =>
          Rx.Observable.if(() => i <= maxRetries - 1,                      1
             Rx.Observable.timer(i * 1000)                                 2
               .do(() => console.log(`Retrying after ${i} second(s)...`)),
             Rx.Observable.throw(err))                                     3
         )
    )
    ...

  • 1 Uses the if() operator (also called the functional combinator) to select between two streams, depending on the evaluation of the condition function
  • 2 If the condition returns true, project this observable; otherwise, project the observable created in the else block.
  • 3 Otherwise, use throw() to propagate an exception downstream to subscribers.
Best Practice

The zip() operator can be very useful in cases when you need to spread out a stream synchronously over time, just as you did in the previous code samples. It’s not recommended when coordinating asynchronous streams that emit at different times—combineLatest() is the operator of choice in these cases. The reason for this is that zip() pairs the events one-to-one, so it’s effective when the asynchronous streams it’s operating over emit values with similar time intervals, which you can’t control all of the time. So if you’re pairing a mouse-move observable that emits rapidly, for example, with an AJAX call that emits every few seconds, you can easily cause its internal, unbounded buffer to overflow and your application to crash.

Finally, in order to be at feature parity with the imperative world of try/catch/finally, RxJS provides the finally() operator. Just like the do() operator, this operator mirrors the source observable and invokes a specified void function after the source observable terminates by invoking the observer’s complete() or error() methods. So the expectation is that finally() could perform some kind of side effect, if need be, such as cleanup actions. This is perfect for our stock ticker widget, which shows a counter of the last time the stock quotes were updated. In this case, you can add another subscription to the twoSecond$ observable for updating the last updated date:

const lastUpdated = document.querySelector('#last-updated');

const updateSubscription = twoSecond$.subscribe(() => {
   lastUpdated.innerHTML = new Date().toLocaleTimeString();
});

Remember that you can have a list of subscribers for the same event, so separating the logic for updating different portions of the site keeps the code under the observer nice and simple. If you had, say, three components that needed to change as a result of a stream emitting events, you could attach three observers and update the different portions of the site accordingly. So you have two subscribers: the one we just showed you and another used to fetch the stock data, to which you’ll add error-handling code. If the web service call made in the fetchDataInterval$ observable were to fail (returning a 500 HTTP response code, for example), the catch() operator would react and return a default value for that stock quote section, as shown in the next listing.

Listing 7.9. Stock ticker with error handling
const requestQuote$ = symbol =>
      Rx.Observable.fromPromise(
        ajax(webservice.replace(/$symbol/, symbol)))
      .map(response => response.replace(/"/g, ''))
      .map(csv)
      .catch(() =>                                                     1
         Rx.Observable.of([new Error('Check again later...'), 0]))     1
      .finally(() => {
         updateSubscription.unsubscribe();                             2
      });

  • 1 Adds catch() to handle the exception potentially thrown from requestQuote$
  • 2 In the event an error occurs, cancels the twoSecond$ interval observable through its subscription object.

The other code you added was the finally() operator, which fires when a stream completes or when it errors. Because you’re running a two-second interval, you don’t expect a completion, but in the event of an error, you should also clean up the interval and cancel the subscription, so that the updated time shown reflects the last quoted update received before the error occurred. You can see this process in the graph in figure 7.13.

Figure 7.13. Using finally to clean up and cancel any outstanding streams

And now you need to make a small adjustment to the tick$ observable, so that it knows how to handle an error. You can use our Try functional data type to handle this, and if a failure does occur, delegate the exception to the error callback of the observer. Here’s that code once more with the new addition:

ticks$
 .map(([symbol, price]) => [Try.of(symbol).getOrElseThrow(), price])      1
 .subscribe(
   ([symbol, price]) => {

      let id = 'row-' + symbol.toLowerCase();
      let row = document.querySelector(`#${id}`);
      if(!row) {
         addRow(id, symbol, price);
      }
      else {
         updateRow(row, symbol, price);
      }
   },
   error => console.log(error.message));

  • 1 Before the data is handed down to the subscriber, Try can inspect it and decide if the data flowing in is an exception that needs to be thrown.

If this service were to fail (or your internet disconnect), you’d see “Check again later...” printed in the console.

As you can see, RxJS provides a comprehensive set of error-handling operations that allows you to easily retry an entire observable sequence when an error is detected in the pipeline. But we made a huge assumption about the nature of the observable sequences. That is, the observables that we created and retried in this chapter belong to a category known as cold observables. Cold observables are passive (dormant) and emit values only when subscribed to: an array of numbers, a Promise, intervals, and the like. In other words, retrying a cold observable basically resubscribes to it and requests that it emit its values again. In the next chapter, you’ll learn to create and handle the different types of observables: cold and hot.

7.5. Summary

  • Imperative error handling has many drawbacks that make it incompatible with FP.
  • Value containers, like Try, provide a fluent, expressive mechanism for transforming values immutably.
  • The Try wrapper is a functional data type used to consolidate and abstract exception handling so that you can sequentially map functions to values.
  • RxJS implements many useful and powerful operators that allow you to catch and retry failed operations in a way that doesn’t break the flow of the stream and the declarative nature of an RxJS stream declaration.
  • RxJS provides operators such as catch(), retry(), retryWhen(), and finally() that you can combine to create sophisticated error-handling schemes.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset