This chapter covers
Until now, we’ve explored only happy-path examples involving RxJS for tackling many different use cases. We suspect that at some point you’ve probably asked yourself, “What would happen if a remote HTTP call failed while fetching data for my stock quote widget?” Observers see the outcome of combining and transforming the streams onto which you map your business logic. But if an exception occurs midstream, what will the observer see at that point? These are valid and important questions, but you first needed to learn to think reactively with ideal scenarios. Now, we’re going to sprinkle a dose of the real world onto your code. The brutal reality is that software will likely fail at some point during its execution.
Many issues can arise in software where data inadvertently becomes null or undefined, exceptions are thrown, network connectivity is lost, and so on. Your code needs to account for the potential occurrence of these issues, which unavoidably creates complexity. In other words, you can’t escape errors, but you can learn how to deal with them. One strategy that developers often use is to scatter error-handling code around every function call. We do this to make our code more robust and fault tolerant, but it has the detrimental effect of making it even more complex and harder to read.
In this chapter, you’ll learn that the key to elegant error handling in RxJS lies partly in the effective use of observables and partly in following proper FP principles, as you’ve seen all along. Given a properly constructed observable stream, the next step is to learn about the different RxJS observable operators that you can plug in to respond to any adversity. Before we get started, it’s important for you to understand that you need to put aside the imperative error-handling techniques you’re accustomed to, like try/catch, in favor of a functional approach as implemented in RxJS.
JavaScript errors can occur in many situations, especially when an application fails to communicate with a server when invoking an AJAX call. Also, third-party libraries that you load into your project can have functions that throw exceptions to signal special error conditions. Hence, you always need to be prepared for the worst and design with failure in mind, instead of letting it become an afterthought and a source of regret.
In the imperative world, exceptions are typically handled with the common try/catch idiom, which occurs frequently with synchronous code. Conversely, in the asynchronous world—remote HTTP calls and event emitters—you’re required to interface with functions that delegate failures to callback functions. And recently with JavaScript ES6, many libraries have switched to using Promises to wrap their asynchronous computations. Let’s examine each of these cases individually.
JavaScript’s default exception-handling mechanism is geared toward throwing and catching exceptions through the popular try/catch block, which is also pervasive in most modern programming languages. Here’s a sample:
try {
  someDangerousFunction();
} catch (error) {
  // statements to handle any exceptions
  console.log(error.message);
}
As you know, the purpose of this structure is to surround a piece of code that you deem to be unsafe. Upon throwing an exception, the JavaScript runtime abruptly halts the program’s execution and creates a stack trace of all the function calls leading up to the problematic instruction. Specific details about the error, such as the message, line number, and filename, are populated into an object of type Error and passed into the catch block.
RxJS 5 has a number of improvements over its previous version. One of them involves the simplification of the internal mechanisms of RxJS, resulting in a stack trace that’s much easier to parse.
The catch block becomes a safe haven so that you can potentially recover your program. But with your knowledge about observables, you can see how this imperative style of dealing with exceptions is structurally very different from what you’ve done so far. So, adding try/catch to your RxJS code to provide error-handling logic to a stream would look like the following:
try {
  const data$ = Rx.Observable.fromPromise(ajax('/data'))
    .subscribe(console.log);
} catch (error) {
  console.log(`Error processing stream: ${error.message}`);
}
Now, imagine having to merge multiple streams, each with its own type of failure, and you can see how this pattern couldn’t possibly be effective if you need to wrap each stream with its own try/catch. With asynchronous functions, the common JavaScript pattern is to provide the error callback alongside the success callback.
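Beyond verbosity, there is a second reason the pattern breaks down: a try block only guards code that runs while the block itself is executing. The following is a minimal sketch of that behavior, not RxJS code. The names `pending` and `defer` are hypothetical; they form a hand-rolled task queue that stands in for the event loop so the example runs deterministically.

```javascript
// Hypothetical stand-in for the event loop: callbacks are queued now
// and run later, after the scheduling code has already returned.
const pending = [];
const defer = callback => pending.push(callback);

const log = [];

try {
  // The callback is only registered here; it does not run yet.
  defer(() => {
    throw new Error('Failed later');
  });
  log.push('try block finished without an error');
} catch (error) {
  // Never reached: nothing threw while the try block was executing.
  log.push(`Caught in original try: ${error.message}`);
}

// Later, the "event loop" runs the queued callbacks. The original
// try/catch is no longer on the call stack, so the failure has to be
// handled wherever the callback is actually invoked.
for (const callback of pending) {
  try {
    callback();
  } catch (error) {
    log.push(`Caught by the scheduler: ${error.message}`);
  }
}

console.log(log.join('\n'));
```

The error surfaces in the code that invokes the callback, not in the code that scheduled it, which is why asynchronous APIs need a separate channel for reporting failures.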
As is common with asynchronous functions in many JavaScript libraries, there’s typically a function that responds to the success case and one that handles errors. This is necessary because asynchronous functions are unpredictable in terms of if and when they return, and if errors occur. Until now, we purposely avoided talking about error cases when using an asynchronous function like ajax(). You’ve been using this function all along, as a kind of black box that always ran correctly. You could use it in two different ways: with callbacks or with Promises. Let’s peek under the hood of this function using callbacks.
const ajax = function (url, success, error) {
  let req = new XMLHttpRequest();       // Create the request object
  req.responseType = 'json';
  req.open('GET', url);
  req.onload = function () {
    if (req.status == 200) {
      // With responseType 'json' the parsed body is in req.response;
      // reading req.responseText would throw
      let data = req.response;
      success(data);                    // Invoke the success callback
    } else {
      req.onerror();
    }
  };
  req.onerror = function () {
    if (error) {
      error(new Error('IO Error'));     // Invoke the error callback
    }
  };
  req.send();
};
Remember that the code for this chapter can be found in the RxJSinAction GitHub repository, https://github.com/RxJSInAction/rxjs-in-action.
Using this function, code that would require multiple nested sequences of HTTP calls, such as when mashing up different sources of data, would look like the next listing.
ajax('/data',
  data => {
    for (let item of data) {
      ajax(`/data/${item.getId()}/info`,
        dataInfo => {
          ajax(`/data/images/${dataInfo.img}`,
            showImage,
            error => {                  // Error handler for the image request
              console.log(`Error image: ${error.message}`);
            });
        },
        error => {                      // Error handler for each item request
          console.log(`Error each data item: ${error.message}`);
        });
    }
  },
  error => {                            // Error handler for the initial request
    console.log(`Error fetching data: ${error.message}`);
  });
Looking at this code from just a structural point of view, you can picture it as nested code blocks, such as the ones shown in figure 7.1.
Indeed, although our code is more fault tolerant, all we’ve done here is exacerbate the problem of having to parse this nested “pyramid of doom,” which we spoke about in chapter 1. Because of this type of situation, the JavaScript ES6 specification introduced Promises, which elegantly streamline the invocation of a sequence of asynchronous functions.
The Promise.then() function acts as the mapping function (similar to Rx.Observable.map()) used to project (or map) another Promise to a source Promise. This is the reason why we decided to “promisify” ajax(), and it’s what we’ve been using for most of the examples as a form much superior to its callback counterpart. Here’s the code for that.
const ajax = function (url) {
  return new Promise(function (resolve, reject) {   // Wrap the request in a Promise
    let req = new XMLHttpRequest();
    req.responseType = 'json';
    req.open('GET', url);
    req.onload = function () {
      if (req.status == 200) {
        // With responseType 'json' the parsed body is in req.response
        let data = req.response;
        resolve(data);                              // Fulfill with the data
      } else {
        reject(new Error(req.statusText));          // Reject on a bad status
      }
    };
    req.onerror = function () {
      reject(new Error('IO Error'));                // Reject on a network error
    };
    req.send();
  });
};
Much as you can with observables, you can chain multiple asynchronous calls by mapping new Promises to a source Promise. Then, you can use the Promise.catch() operator to implement an error-handling strategy that answers to any of the rejected Promises or ones that throw exceptions, as such:
ajax('/data')
  .then(...)
  .catch(error => console.log(`Error fetching data: ${error.message}`))
Because catch() itself returns a Promise, you can handle specific errors by chaining multiple calls to catch() in series, like this:
ajax('/data')
  .then(item => ajax(`/data/${item.getId()}/info`))
  .catch(error => console.log(`Error fetching data: ${error.message}`))
  .then(dataInfo => ajax(`/data/images/${dataInfo.img}`))
  .catch(error => console.log(`Error each data item: ${error.message}`))
  .then(showImg)
  .catch(error => console.log(`Error image: ${error.message}`));
Arguably, in comparison to figure 7.1, the statement in figure 7.2 has a structure that’s much easier to parse.
If the first ajax() fails, the first catch() operator runs before jumping into the next Promise in the chain. Each catch can be thought of as a recovery block for the previous Promise; the Promise allows you to resume processing in some known state.
The previous code example introduces a small bug that we ignored to prevent cluttering up the code. Because catches are also part of the continuation, the handler method can return either a value or another Promise; if no value is returned, then an undefined value will be passed to the next continuation block.
Just like with a synchronous try/catch, you can either continue by recovering from the error, in this case by returning a non-error value, or you can rethrow the error. In the catch block, that’s done by either returning a Promise.reject() or throwing within the callback method. But because you’re basically just transferring control from one Promise to the next, it’s more typical to just implement a single, global catch() operator (this is essentially equivalent to placing an overarching try/catch block over your entire function body). In this case, when any Promise fails, the catch() operator is run and the entire body of code is exited:
ajax('/data')
  .then(item => ajax(`/data/${item.getId()}/info`))
  .then(dataInfo => ajax(`/data/images/${dataInfo.img}`))
  .then(showImg)
  .catch(error => console.log(error.message));
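The three behaviors of a catch() handler described above (recovering with a value, returning nothing, and rethrowing) can be compared side by side. This is a small sketch using only standard Promises; `failingRequest` is a hypothetical stand-in for a request that always fails, and the file name is made up for illustration.

```javascript
// Hypothetical stand-in for a request that always fails.
const failingRequest = () => Promise.reject(new Error('IO Error'));

// 1. Recover: the value returned from catch() feeds the next then().
const recovered = failingRequest()
  .catch(() => ({ img: 'default.png' }))
  .then(dataInfo => dataInfo.img);

// 2. Log only: the handler returns nothing, so the next then()
//    receives undefined instead of real data.
const loggedOnly = failingRequest()
  .catch(error => { console.log(`Logged: ${error.message}`); })
  .then(value => typeof value);

// 3. Rethrow: the chain stays rejected and skips ahead to the next catch().
const rethrown = failingRequest()
  .catch(error => { throw new Error(`Wrapped: ${error.message}`); })
  .then(() => 'never reached')
  .catch(error => error.message);

const results = Promise.all([recovered, loggedOnly, rethrown]);
results.then(values => console.log(values));
// [ 'default.png', 'undefined', 'Wrapped: IO Error' ]
```

The second case is the subtle one: a handler that only logs silently converts the failure into a successful undefined, which the next step in the chain then has to cope with.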
Certainly, Promises get you closer to where you want to be. Unfortunately, all these approaches limit your ability to make your code responsive and reactive; in other words, you can’t easily return a default value in case a request failed or perhaps retry a rejected Promise. You can get around passing default values down the chain by introducing side effects in your code. And you can implement retries with the help of third-party libraries, such as Q.js (https://github.com/kriskowal/q). But more importantly, recall from our earlier discussions that Promises model single asynchronous values, not a deluge of them, which are the type of problems you solve when combining functional and reactive programming—and to make matters worse, Promises can swallow exceptions if no error handler is provided. (This is true only for older browser versions. Newer browsers and NodeJS bubble up errors thrown from unhandled promises.) Let’s examine in more detail the reasons why these imperative error-handling mechanisms are incompatible with a reactive application.
The structured mechanism of throwing and catching exceptions in imperative JavaScript code has many drawbacks when used in a functional or reactive style. In general, functions that throw exceptions
try {
  let record = findRecordById('123');
  // ... potentially many lines of code in between
} catch (e) {
  console.log('ERROR: Record not found!');
  // Handle error here
}
let record = null;
try {
  record = findRecordByName('RecordA');
} catch (e) {
  console.log('ERROR: Cannot locate record by name');
  try {
    record = findRecordById('123');
  } catch (e) {
    console.log('ERROR: Record is nowhere to be found!');
  }
}
After reading all these statements, you’re probably asking yourself, “Is throwing exceptions completely off the table?” We certainly don’t believe so. In practice, they can never be off the table because there are many factors outside your control that you may need to account for, like system or environmental errors or calls to third-party code.
We’re not recommending you don’t use exceptions at all, because they do serve a purpose—just use them for truly exceptional conditions. When you need to use exceptions or deal with errors, the functional approach is to allow functional data types to abstract them away from your main business logic; this prevents you from creating side effects or code that becomes hard to maintain.
The functional approach to error handling is quite simple. As we mentioned before, we won’t get too deep into any functional topics in this book, so we’ll provide a simplistic view of this approach that will serve to help you better understand the design of RxJS’s error-handling mechanism. The goal here is to reify, or make a first-class citizen, the idea of a wrapper around a function or body of code that has the potential of throwing an exception. If you think about it, that’s what you’ve been doing all along when you use a try/catch block. The function findRecordById() can throw an exception in the event that a database record is not found, as illustrated in figure 7.3.
The try block creates an invisible enclosure around the function call so that you can implement all your error-handling logic inside the catch block. In the functional world, you’ll reify this container with a data type called Try.
The Try data type is a common pattern in FP that we introduce here merely as a theoretical construct. This will help you later, when we discuss how observables implement this pattern.
Figure 7.4 shows how this data type would work.
You can use this type to apply or map a function to a certain value. This is equivalent to invoking the function with that parameter. With this extra plumbing, Try allows you to provide the necessary abstraction to return an object of type Success if a record object is found; otherwise, an object of type Failure, signaling that something unexpected occurred. Notice that this requires the input to be a function so that it can properly capture a thrown exception:
Try.of(() => findRecordById('123')); //-> Success(Record)

Try.of(() => findRecordById('456')); //-> Failure

Try.of(() => findRecordById('xxxxx'))
  .getOrElse(new Record(...)); //-> Default value
Now, just like any functional data type, suppose Try also had a map() operator, which you can use to perform any action on the resolved object, if one is found:
Try.of(() => findRecordById('123')).map(processRecord);
Using Try as the return type of your functions is quite handy, because not only do you protect the value it returns from a possible null access, but also you let your users know that this particular function might produce an invalid result—it’s self-documenting. This is why other languages such as Scala, Java, and Haskell one way or another provide native APIs for this data type.
For the purpose of our discussion, we show some of the pieces of Try in the next listing, as well as its derived types Success and Failure.
class Try {
  constructor(val) {              // Wraps a value (or, for Failure, an error)
    this._val = val;
  }

  static of(fn) {                 // Runs fn inside try/catch and wraps the outcome
    try {
      return new Success(fn());
    } catch (error) {
      return new Failure(error);
    }
  }

  map(fn) {                       // Applies fn to the value, guarding against exceptions
    return Try.of(() => fn(this._val));
  }
}

class Success extends Try {       // Holds the result of a successful computation
  getOrElse(anotherVal) {
    return this._val;
  }

  getOrElseThrow() {
    return this._val;
  }
}

class Failure extends Try {       // Holds the error; mapped functions are skipped
  map(fn) {
    return this;
  }

  getOrElse(anotherVal) {
    return anotherVal;
  }

  getOrElseThrow() {
    if (this._val !== null) {
      throw this._val;
    }
  }
}
Listing 7.4 uses the class syntax in ES6 to model the Try data type. We use classes only because they’re syntactically shorter than using functions and object prototypes. As you probably know by now, classes are nothing more than syntactic sugar over JavaScript’s existing prototype-based inheritance. Whether you decide to implement this using function or class syntax is entirely up to you.
Listing 7.4 shows just a few of the key details of this functional data type. Try models two scenarios:
What you accomplish with this is a simple data type that allows you to pipeline, or chain operations on objects, catching exceptions along the way, without impacting your business logic and hiding the imperative try/catch structure. Here’s how you can use it. Suppose you execute a function processRecord() that works on a record fetched from a database. If the record is not found, processRecord() will throw an exception:
let record = Try.of(() => findRecordById('123'))
  .map(processRecord)
  .getOrElse(new Record('123', 'RecordA'));
This code works by lifting a value into the Try context and then mapping a function to it. map() is where the try/catch logic lives, consolidated in one place. Arguably, this code is much more readable and pure compared to the following:
let record;
try {
  record = findRecordById('123');
  processRecord(record);
} catch (e) {
  record = new Record('123', 'RecordA');
}
In the functional case, if the process operation were to fail, nothing in this logic would actually change because the error would be propagated internally via Failure instances, finally resulting in the getOrElse() function that creates and returns a default record object. This simple design pattern is really powerful, because it abstracts error handling completely from your business logic so that your functions worry only about writing code to solve your task at hand, while remaining side effect–free. You can see the workings of this in the diagram in figure 7.5.
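To watch the propagation happen, the pipeline can be run end to end. The sketch below repeats a condensed version of the Try type from listing 7.4 so that it is self-contained. The in-memory `records` object and the `toLabel` and `brokenStep` helpers are hypothetical, introduced only for illustration.

```javascript
// Condensed version of the Try type from listing 7.4.
class Try {
  constructor(val) { this._val = val; }
  static of(fn) {
    try {
      return new Success(fn());
    } catch (error) {
      return new Failure(error);
    }
  }
  map(fn) { return Try.of(() => fn(this._val)); }
}
class Success extends Try {
  getOrElse() { return this._val; }
}
class Failure extends Try {
  map() { return this; }                        // Skip the mapped function
  getOrElse(anotherVal) { return anotherVal; }
}

// Hypothetical in-memory "database" and helpers for illustration.
const records = { '123': { id: '123', name: 'RecordA' } };
const findRecordById = id => {
  if (!(id in records)) {
    throw new Error(`Record ${id} not found`);
  }
  return records[id];
};
const toLabel = record => `${record.id}:${record.name.toUpperCase()}`;
const brokenStep = () => { throw new Error('Processing failed'); };

// Lookup and mapping both succeed.
const found = Try.of(() => findRecordById('123'))
  .map(toLabel)
  .getOrElse('no record');

// Lookup fails: toLabel never runs, the Failure passes straight through.
const missing = Try.of(() => findRecordById('456'))
  .map(toLabel)
  .getOrElse('no record');

// Lookup succeeds but a later step throws: map() turns it into a Failure.
const failedLater = Try.of(() => findRecordById('123'))
  .map(brokenStep)
  .map(toLabel)
  .getOrElse('no record');

console.log(found);       // 123:RECORDA
console.log(missing);     // no record
console.log(failedLater); // no record
```

In all three cases the calling code has the same shape; only the final getOrElse() decides what a failure means.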
Does this discussion about propagation of change and the mapping of functions ring a bell? That’s right! The Observable data type works exactly the same way, and now you’ll see how it implements its own exception-handling operators.
Just as observables abstract data flow and processing, they also abstract errors and exception handling. RxJS’s Observable type provides several strategies for you to manage the errors that could arise midstream. In this section, you’ll learn about these strategies:
In chapter 2, we mentioned that at the end of the observable stream is a subscriber waiting to pounce on the next event to occur. This subscriber implements the Observer interface, consisting of three methods: next(), error(), and complete().
In general, errors don’t escape the observable pipeline. They are contained and guarded to prevent side effects from happening—much like Try, as shown in figure 7.6.
Errors that occur at the beginning of the stream or in the middle are propagated down to any observers, finally resulting in a call to error(). Here’s a quick example to illustrate this concept.
const computeHalf = x => Math.floor(x / 2);

Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);   // Thrown midstream
    }
    return num;
  })
  .map(computeHalf)
  .subscribe(
    val => console.log(val),
    err => console.log(`Caught: ${err}`),                 // Receives the error
    () => console.log('All done!')
  );
Running this code prints the following:
1
2
"Caught: Error: Unexpected odd number: 5"
You can consider this approach similar in structure to an overarching try/catch block. The important aspect to note from this example is how the Observable data type acts like a Try by disallowing the exception to leak from the stream’s context. Because there’s no way to recover, the first exception that fires will result in the entire stream being cancelled. Think of parsing data from a network call; you’d obviously want to skip parsing the object if the network call was unsuccessful. The error is pushed down to any subscribers so that they can perform any side effects, such as showing an alert pop-up or a modal dialog. Most of the time, though, you’ll want to catch and recover from the error that occurred. To make understanding the different recovery strategies easier, we’ll continue using this simple numerical example from listing 7.5 as our theme.
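One way to picture how an operator can keep an exception inside the stream is a toy observable whose map() wraps the user function in try/catch and forwards the error to the observer. The `MiniObservable` class below is purely an illustrative assumption. It is synchronous, has no unsubscription, and is not how RxJS is implemented.

```javascript
// A deliberately tiny, synchronous observable (hypothetical, not RxJS).
class MiniObservable {
  constructor(producer) {
    this._producer = producer;
  }

  static of(...values) {
    return new MiniObservable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }

  map(fn) {
    return new MiniObservable(observer => {
      this.subscribe(
        value => {
          if (observer.stopped) { return; }   // Stream already terminated
          let result;
          try {
            result = fn(value);               // User code may throw here
          } catch (error) {
            observer.error(error);            // Routed to error(), not rethrown
            return;
          }
          observer.next(result);
        },
        error => observer.error(error),
        () => observer.complete()
      );
    });
  }

  subscribe(next, error, complete) {
    // After error() or complete(), no further notifications are delivered.
    const observer = {
      stopped: false,
      next(value) { if (!this.stopped) { next(value); } },
      error(err) { if (!this.stopped) { this.stopped = true; error(err); } },
      complete() { if (!this.stopped) { this.stopped = true; complete(); } }
    };
    this._producer(observer);
  }
}

const output = [];
MiniObservable.of(2, 4, 5, 8, 10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .map(x => Math.floor(x / 2))
  .subscribe(
    val => output.push(val),
    err => output.push(`Caught: ${err}`),
    () => output.push('All done!')
  );

console.log(output);
// [ 1, 2, 'Caught: Error: Unexpected odd number: 5' ]
```

The exception never reaches the code that called subscribe(); it arrives as a notification, and nothing after it (including the completion message) is delivered.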
Catching and recovering from errors keeps your application responsive and resilient, and staying responsive is one of the main requirements of being reactive.
One of the main principles of reactive systems is the notion of resiliency, which states that systems should stay responsive in the face of failure. Reacting to errors using RxJS operators is one way to work toward this goal.
The basic error-handling mechanism that RxJS provides is the catch() operator, which intercepts any error in the Observable and gives you the option to handle it by returning a new Observable or, when there’s no recovery path, by propagating it down to observers, as shown in figure 7.7.
Just like with regular try/catch usage, you want to place the catch() operator close to the segment of code that might fail. catch() allows you to insert a default value in place of the event that caused the error; any subsequent operators in the chain will never know that an exception occurred. Imagine if you experienced a login error to the server or had a problem accessing your local DB. The catch could be used to capture that error and inject a default or in-memory value into the stream without the downstream being any the wiser!
You can use marble diagrams to show error handling in a stream as well, just like with any other operator. Figure 7.8 shows an example of a stream that rejects odd numbers and returns evens instead.
Here’s the code for figure 7.8.
Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .catch(err => Rx.Observable.of(6))        // Recover with a default value
  .map(n => n / 2)
  .subscribe(
    val => console.log(val),
    err => console.log(`Caught: ${err}`),   // Not called: the error was handled
    () => console.log('All done!')
  );
Running this code now prints the following:
1
2
3
"All done!"
As you can see, the source stream is still cancelled when the exception occurs (8 and 10 are never emitted), but now you’re at least able to recover. Some errors, however, might be intermittent and shouldn’t halt the stream. For instance, a server is unavailable for a short period of time because of a planned outage. In cases like this, you may want to retry your failed operations.
The catch() operator is passed a function that takes an error argument (shown in listing 7.6) as well as the source observable that was caught, which you can return to tell the source observable to retry from the beginning. Let’s take a look:
Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .catch((err, source) => source)   // Returns the source to retry from the beginning
  ...
This operation can be dangerous when the exception is unavoidable or not transient because you’ve now entered an infinite loop; there’s no condition in the business logic that will change for the error to disappear. Figure 7.9 shows what’s occurring.
Another place looping can occur is when using Promises. A Promise can emit two types of errors: either an unexpected exception is thrown during the body of the computation, or the Promise becomes unfulfilled and gets rejected. Because Promises are not retriable artifacts, dereferencing the value of a Promise will always return its fulfilled value or error, as the case may be. The following code creates a big problem:
const requestQuote$ = symbol =>
  Rx.Observable.fromPromise(
    ajax(webservice.replace(/\$symbol/, symbol)))   // \$ matches a literal "$"
  .catch((err$, promise$) => promise$)              // Returns the same failed source
  .map(response => response.replace(/"/g, ''))
  .map(csv);
Just like in figure 7.9, if the server you’re trying to access is offline, the exception thrown would also create an infinite loop and exhaust the main thread, because you would be retrying the same exception (failed Promise) over and over again. You’ll see how to solve this problem in a bit.
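The reason returning the same Promise cannot help is that a settled Promise replays its stored outcome, whereas calling the function that creates the Promise starts a new attempt. The following sketch uses only standard Promises. `makeFlakyRequest` and `retry` are hypothetical helpers written for this illustration; they are not part of RxJS or of the chapter's ajax() function.

```javascript
// Hypothetical flaky operation: the returned function rejects for the
// first `failures` calls and resolves afterward.
const makeFlakyRequest = failures => {
  let calls = 0;
  return () => {
    calls += 1;
    return calls <= failures
      ? Promise.reject(new Error(`Attempt ${calls} failed`))
      : Promise.resolve(`Attempt ${calls} succeeded`);
  };
};

// 1. Reusing one settled Promise: every "retry" sees the same rejection.
const requestA = makeFlakyRequest(1);
const settled = requestA();
const reused = settled
  .catch(() => settled)
  .catch(() => settled)
  .catch(error => error.message);

// 2. Re-invoking the factory: each retry is a genuinely new attempt.
const retry = (factory, retries) =>
  factory().catch(error =>
    retries > 0 ? retry(factory, retries - 1) : Promise.reject(error));

const requestB = makeFlakyRequest(1);
const reinvoked = retry(requestB, 3);

const results = Promise.all([reused, reinvoked]);
results.then(([a, b]) => {
  console.log(a); // Attempt 1 failed
  console.log(b); // Attempt 2 succeeded
});
```

Whatever does the retrying therefore needs access to the function that creates the Promise, not just to the Promise itself.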
RxJS provides more-intuitive ways of retrying via the retry() operator, which combines this notion of catching and reexecuting the source observable into one function. Here’s a simple example:
Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .retry(3)                         // Retry up to three more times
  .subscribe(
    num => console.log(num),
    err => console.log(err.message)
  );
Running this code will print a sequence of numbers 2 and 4 a total of four times before printing “Unexpected odd number: 5.” So unless you’re dealing with a transient failure that you know will resolve itself somehow, avoid catching and returning the same sequence or the equivalent retry operation with an empty argument. In order to ensure you don’t lock up the UI or cause infinite loops to occur, you should always use retry() with a fixed number. You could also elegantly combine the two approaches. You can reattempt the operation three more times and then catch the exception, to fall back to a default value:
Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .retry(3)
  .catch(err$ => Rx.Observable.of(6))   // Fall back to a default value
  .subscribe(
    num => console.log(num),
    err => console.log(err.message));
Again the effect of this is that the sequence would be tried a total of four times before the catch block executes, emitting the default value 6 and then completing the sequence. Notice that using a default value with catch doesn’t simply replace the value in the sequence and allow it to continue. After an exception occurs, the Observable is terminated at that point.
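The attempt count and the "terminated at that point" behavior can be checked with a plain synchronous analogue that involves no RxJS at all. The helper name `retryThenFallback` is hypothetical, and the loop is only a rough model of retry(3) followed by catch().

```javascript
// Hypothetical synchronous analogue of retry(n) followed by catch():
// run the operation, re-run it up to `retries` more times on failure,
// then give up and return the fallback value.
const retryThenFallback = (operation, retries, fallback) => {
  let attempts = 0;
  for (;;) {
    attempts += 1;
    try {
      return { value: operation(), attempts };
    } catch (error) {
      if (attempts > retries) {
        return { value: fallback, attempts };
      }
    }
  }
};

const processed = [];
const processAll = () => {
  for (const num of [2, 4, 5, 8, 10]) {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    processed.push(num);
  }
  return 'done';
};

const result = retryThenFallback(processAll, 3, 6);

console.log(result.attempts);     // 4 (one initial run plus three retries)
console.log(result.value);        // 6 (the fallback)
console.log(processed.join(' ')); // 2 4 2 4 2 4 2 4
```

Note that 8 and 10 are never processed: every run starts over from the beginning and stops at 5, and the fallback replaces the rest of the sequence rather than the single bad value.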
Now that we’ve talked about catch/retry, you’re probably thinking it would be appropriate to embed retry into your stock ticker code, so that if the server were to fail due to a restart or a small outage, you could at least retry to fetch stock information:
const requestQuote$ = symbol =>
  Rx.Observable.fromPromise(
    ajax(webservice.replace(/\$symbol/, symbol)))   // \$ matches a literal "$"
  .retry(3)
  .map(response => response.replace(/"/g, ''))
  .map(csv);
But there’s a small caveat here. Recall from our previous discussions that Promises have no retry capability (you don’t get second chances with Promises). Unlike Promises, streams are retriable artifacts, so you can easily get around this limitation by wrapping the Promise observable into another stream that is retriable—again creating a higher-order observable. Effectively, what you want to do is apply the retry function to an outer observable that wraps the inner Promise. You can use mergeMap() to flatten it back into a single stream, so placing the retry at fetchDataInterval$ solves this problem:
const fetchDataInterval$ = symbol => twoSecond$
  .mergeMap(() => requestQuote$(symbol)   // Creates a fresh Promise on each subscription
    .distinctUntilChanged((previous, next) => { ... }))
  .retry(3);
This code will cause the Promise internally to reinstantiate and retry three more times if it encounters an exception or a rejection, which is really nice. Keep in mind that it will become a single observable layer once mergeMap() projects requestQuote$(symbol) onto the source. The fact that streams can reemit or replay events upon multiple subscriptions is important, but there’s a bit more you need to understand that’s happening behind the scenes. We’ll come back to this solution in the next chapter in the context of hot observables. Another way of implementing retries effectively is to add a backoff strategy, which introduces some wait time in between retry actions.
Using retries with backoff is an effective way to retry more times without overloading the server. Examples of a backoff strategy are constant, linear, exponential, and random (also known as jitter). The exponential and linear types are more commonly used, but in any case, the goal is to use progressively longer waits between retries for consecutive periods of time. RxJS allows you to accomplish this using the retryWhen() operator. retryWhen() takes a notifier function that receives an Observable of the errors raised during the execution of the stream (the same errors catch() would receive) and returns a notifier observable; the source observable is resubscribed each time this notifier emits a value. For instance, you can say “retry after 3 seconds,” as shown in figure 7.10.
In other words, if the provided error observable emits a value, the retry action is executed. So you can use this observable to control when and how retries should take place; it’s common to use timer observables to accomplish this. Let’s go back to our numbers example to see this clearly:
Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .retryWhen(errors$ => errors$.delay(3000))   // Retry 3 seconds after each error
  ...
This resubscribes to the source 3 seconds after each error, replaying the numbers from the start indefinitely or until the operation that threw the exception succeeds:
1
2
// 3 seconds wait...
1
2
// 3 seconds wait...
... // and so on
You can also use retryWhen() to implement a fixed number of retries by keeping track of the number of times the source observable has been retried. Remember, you can use scan() to emit values at every accumulated interval:
const maxRetries = 3;

Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .retryWhen(errors$ =>
    errors$.scan((errorCount, err) => {
      if (errorCount >= maxRetries) {
        throw err;
      }
      return errorCount + 1;
    }, 0)
  )
  ...
Running this code prints the same result as previously, with the difference that instead of running indefinitely, it will retry up to the maxRetries limit and then error, calling the error() method on the observers. A more effective retry strategy used in cases where remote requests are being made is a linear backoff, which alleviates the overall load on the server. This technique is readily implemented in most major modern websites; the first retry action occurs immediately, and subsequent actions occur after a certain lag time, which increases linearly, as shown in figure 7.11.
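The backoff strategies named earlier (constant, linear, exponential, and jitter) differ only in how they turn a retry index into a wait time, so they can be sketched as pure functions. The formulas below are common choices and are assumptions of this sketch, not definitions taken from RxJS. The function names and the injectable `random` parameter are hypothetical.

```javascript
// Each strategy maps a zero-based retry index to a wait time in milliseconds.
const constant = base => () => base;
const linear = base => attempt => attempt * base;          // 0, base, 2*base, ...
const exponential = base => attempt => base * 2 ** attempt;

// One possible form of jitter: a random wait between 0 and the exponential
// delay. The random source is injectable so the result can be reproduced.
const jitter = (base, random = Math.random) => attempt =>
  Math.floor(random() * exponential(base)(attempt));

// Lists the waits a strategy would produce for a given number of retries.
const schedule = (strategy, retries) =>
  Array.from({ length: retries }, (_, attempt) => strategy(attempt));

console.log(schedule(constant(1000), 4));    // [ 1000, 1000, 1000, 1000 ]
console.log(schedule(linear(1000), 4));      // [ 0, 1000, 2000, 3000 ]
console.log(schedule(exponential(1000), 4)); // [ 1000, 2000, 4000, 8000 ]
console.log(schedule(jitter(1000, () => 0.5), 4)); // [ 500, 1000, 2000, 4000 ]
```

With the linear strategy, the first retry happens immediately and each later one waits one base interval longer, matching the description above. In a stream, a value from such a schedule would typically be fed to a timer inside the retryWhen() notifier.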
Before we get into the code that implements this, we’ll introduce a new operator called zip(). This operator merges the specified observable sequence into one by using a selector function (a function that you provide to instruct zip() how to format the events emitted) whenever all of the observable sequences have emitted values at a corresponding index. This operator is frequently used in FP to merge two corresponding arrays; for instance, zip() is implemented in Ramda.js:
const records = R.zip(
  ['RecordA', 'RecordB', 'RecordC'],   // Paired by index with the array below
  ['123', '456', '789']
);
//=> [['RecordA', '123'], ['RecordB', '456'], ['RecordC', '789']]
This works with streams just as well, as shown in the marble diagram in figure 7.12.
In some ways, zip() works like combineLatest(), except that the former matches the index of the corresponding events one-to-one, as shown in figure 7.12, whereas the latter just combines the latest values when any of the observables emits a value. Here’s a simple numerical example illustrating this difference:
const s1$ = Rx.Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
const s2$ = Rx.Observable.of('a', 'b', 'c', 'd', 'e');

Rx.Observable.zip(s1$, s2$).subscribe(console.log);
//-> [1, "a"] [2, "b"] [3, "c"] [4, "d"] [5, "e"]

Rx.Observable.combineLatest(s1$, s2$).subscribe(console.log);
//-> [9, "a"] [9, "b"] [9, "c"] [9, "d"] [9, "e"]
As you can see, zip() sticks to the array definition, merges both events, and matches the corresponding indexes between the streams. In this case, s1$ continues to emit more values, but because s2$ doesn’t, zip() ignores them—both have to emit events. On the other hand, combineLatest() just merges the latest event of s1$ with whatever is the latest value emitted by s2$.
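The index-matching rule is easy to see on plain arrays. The following sketch is a hand-written array version of zip with an optional selector; it is neither the Ramda nor the RxJS implementation, and the last expression only mimics what combineLatest() produced above for two synchronous sources.

```javascript
// Hypothetical array zip: pairs elements by index and stops at the
// shorter input, optionally formatting each pair with a selector.
const zip = (left, right, selector = (a, b) => [a, b]) => {
  const length = Math.min(left.length, right.length);
  const result = [];
  for (let i = 0; i < length; i++) {
    result.push(selector(left[i], right[i]));
  }
  return result;
};

const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9];
const letters = ['a', 'b', 'c', 'd', 'e'];

const pairs = zip(numbers, letters);
const labels = zip(numbers, letters, (n, l) => `${n}${l}`);

// For contrast: pairing only the latest number with every letter, which
// mirrors the combineLatest() output when the first source has already
// emitted all of its values.
const latestWithEach = letters.map(l => [numbers[numbers.length - 1], l]);

console.log(pairs);          // [ [1,'a'], [2,'b'], [3,'c'], [4,'d'], [5,'e'] ]
console.log(labels);         // [ '1a', '2b', '3c', '4d', '5e' ]
console.log(latestWithEach); // [ [9,'a'], [9,'b'], [9,'c'], [9,'d'], [9,'e'] ]
```

The numbers 6 through 9 have no partner, so zip drops them. This is the property the backoff code relies on: the shorter sequence bounds how many pairs, and therefore how many retries, are produced.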
Now that you know how to use zip(), you’ll implement a linear backoff that retries the first time after 1 second, the next time after 2 seconds, and so on, as shown in the following listing.
const maxRetries = 3; Rx.Observable.of(2,4,5,8,10) .map(num => { if(num % 2 !== 0) { throw new Error(`Unexpected odd number: ${num}`); } return num; }) .retryWhen(errors$ => Rx.Observable.range(0, maxRetries) 1 .zip(errors$, val => val) 2 .mergeMap(i => 3 Rx.Observable.timer(i * 1000) .do(() => console.log(`Retrying after ${i} second(s)...`))) ) .subscribe(console.log);
With this retry strategy, the stream will attempt to run for a fixed number of times (given by maxRetries), with a linearly incrementing time of 1 second between retries, before finally giving up. Because you’re not throwing the exception, the stream halts execution on every retry, generating the following output:
2
4
Retrying after 0 second(s)...
2
4
Retrying after 1 second(s)...
2
4
Retrying after 2 second(s)...
2
4
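The repeated 2 and 4 in this output follow from the fact that every retry resubscribes to the source and replays it from the beginning, so 8 and 10 are never reached. A rough plain-JavaScript model, with no RxJS and no real waiting, is sketched below; runSource and simulateLinearBackoff are hypothetical names, and the delays are only logged rather than awaited.

```javascript
// Stand-in for the source plus map(): emits each value until an odd one throws.
function runSource(values, emit) {
  for (const num of values) {
    if (num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    emit(num);
  }
}

// Hypothetical helper: replays the source up to maxRetries extra times and
// records what would be printed. Delays are logged, not actually waited for.
function simulateLinearBackoff(values, maxRetries) {
  const log = [];
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      runSource(values, num => log.push(String(num)));
      return log; // the source completed without an error
    } catch (err) {
      if (attempt === maxRetries) {
        break; // out of retries: give up without reporting the error
      }
      log.push(`Retrying after ${attempt} second(s)...`);
    }
  }
  return log;
}

console.log(simulateLinearBackoff([2, 4, 5, 8, 10], 3).join('\n'));
```

Each pass through the loop corresponds to one subscription to the source: the initial attempt plus three retries.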
Although it makes the code somewhat more advanced, you can also rethrow the exception if your goal is to signal an unrecoverable condition once the last retry has failed. You do this by implementing some conditional logic within the projected observable returned from mergeMap(). If you've reached the last retry, you project an observable that throws the exception; otherwise, you project the timer, just as before. The next listing shows this approach.
const maxRetries = 3;

Rx.Observable.of(2,4,5,8,10)
  .map(num => {
    if(num % 2 !== 0) {
      throw new Error(`Unexpected odd number: ${num}`);
    }
    return num;
  })
  .retryWhen(errors$ =>
    Rx.Observable.range(0, maxRetries + 1)
      // Keep both the counter and the error so the error can be rethrown
      .zip(errors$, (i, err) => ({'i': i, 'err': err}))
      .mergeMap(({i, err}) => {
        if(i === maxRetries) {
          // Last attempt failed: propagate the original error
          return Rx.Observable.throw(err);
        }
        return Rx.Observable.timer(i * 1000)
          .do(() => console.log(`Retrying after ${i} second(s)...`));
      })
  )
  .subscribe(
    console.log,
    error => console.log(error.message)
  );
Running this code prints the following:
2
4
Retrying after 0 second(s)...
2
4
Retrying after 1 second(s)...
2
4
Retrying after 2 second(s)...
2
4
Unexpected odd number: 5
Using if/else here is not the most functional way of writing code, but it’s acceptable in practice given that the scope is internal to the pipeline. But if you’re looking for a purer approach that uses more lambda expressions and keeps the FP spirit high, RxJS provides Rx.Observable.if(condition, then$, else$), which evaluates a given condition function and either returns the then$ observable or the else$, respectively. You’ll use this to refactor just that segment of code:
...
  .retryWhen(errors$ =>
    Rx.Observable.range(1, maxRetries)
      .zip(errors$, (i, err) => ({'i': i, 'err': err}))
      .mergeMap(({i, err}) =>
        // Condition function: retry while attempts remain
        Rx.Observable.if(() => i <= maxRetries - 1,
          // then$: wait i seconds before retrying
          Rx.Observable.timer(i * 1000)
            .do(() => console.log(`Retrying after ${i} second(s)...`)),
          // else$: propagate the original error
          Rx.Observable.throw(err))
      )
  )
...
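One thing to check when comparing the two versions is the counter range. As written, the refactored segment counts from 1 rather than 0, so its retry schedule may not be identical to the if/else listing. The sketch below tabulates both schedules in plain JavaScript. It assumes that Rx.Observable.range(start, count) emits count consecutive integers beginning at start; planRetries is a hypothetical helper, not part of RxJS, and the result is worth confirming against an actual run.

```javascript
// Hypothetical helper: lists what the retryWhen() notifier would do for each
// successive error, given the counter values it zips with and a retry rule.
function planRetries(start, count, shouldRetry) {
  const plan = [];
  for (let i = start; i < start + count; i++) {
    if (shouldRetry(i)) {
      plan.push(`retry after ${i} second(s)`);
    } else {
      plan.push('throw');
      break; // an error ends the notifier, so later counter values go unused
    }
  }
  return plan;
}

const maxRetries = 3;

// if/else listing: range(0, maxRetries + 1), throws when i === maxRetries.
const ifElsePlan = planRetries(0, maxRetries + 1, i => i !== maxRetries);
console.log(ifElsePlan);
// ['retry after 0 second(s)', 'retry after 1 second(s)',
//  'retry after 2 second(s)', 'throw']

// Rx.Observable.if() segment: range(1, maxRetries), retries while
// i <= maxRetries - 1.
const observableIfPlan = planRetries(1, maxRetries, i => i <= maxRetries - 1);
console.log(observableIfPlan);
// ['retry after 1 second(s)', 'retry after 2 second(s)', 'throw']
```

Under these assumptions, the refactored segment waits 1 and 2 seconds and then throws on the third error, one retry fewer than the if/else version. Using range(0, maxRetries + 1) with the condition i < maxRetries would reproduce the earlier schedule.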
The zip() operator can be very useful in cases when you need to spread out a stream synchronously over time, just as you did in the previous code samples. It's not recommended for coordinating asynchronous streams that emit at different rates; combineLatest() is the operator of choice in these cases. The reason is that zip() pairs events one-to-one, so it's effective only when the asynchronous streams it operates over emit values at similar intervals, which you can't always control. If you pair a mouse-move observable that emits rapidly with, for example, an AJAX call that emits every few seconds, the unpaired events accumulate in zip()'s internal, unbounded buffer, which can eventually exhaust memory and crash your application.
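The buffering problem can be made concrete with a small model. The following sketch does not use RxJS: unpairedAfter is a hypothetical function that imitates one-to-one pairing between a source that emits on every tick and one that emits only every slowEvery ticks, and reports how many fast events are still waiting for a partner.

```javascript
// Hypothetical model of zip()'s buffering: the fast source emits on every
// tick, the slow source once every `slowEvery` ticks. Returns how many fast
// events are still unpaired after `ticks` ticks.
function unpairedAfter(ticks, slowEvery) {
  const fastBuffer = [];
  const slowBuffer = [];
  for (let tick = 1; tick <= ticks; tick++) {
    fastBuffer.push(`move-${tick}`);
    if (tick % slowEvery === 0) {
      slowBuffer.push(`response-${tick / slowEvery}`);
    }
    // A pair is released only when both buffers hold at least one value.
    while (fastBuffer.length > 0 && slowBuffer.length > 0) {
      fastBuffer.shift();
      slowBuffer.shift();
    }
  }
  return fastBuffer.length;
}

console.log(unpairedAfter(1000, 100));  // 990
console.log(unpairedAfter(10000, 100)); // 9900
```

The backlog grows in proportion to how long the streams run, which is why mismatched emission rates are a poor fit for one-to-one pairing.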
Finally, in order to be at feature parity with the imperative world of try/catch/finally, RxJS provides the finally() operator. Just like the do() operator, this operator mirrors the source observable and invokes a specified void function after the source observable terminates by invoking the observer’s complete() or error() methods. So the expectation is that finally() could perform some kind of side effect, if need be, such as cleanup actions. This is perfect for our stock ticker widget, which shows a counter of the last time the stock quotes were updated. In this case, you can add another subscription to the twoSecond$ observable for updating the last updated date:
const lastUpdated = document.querySelector('#last-updated');
const updateSubscription = twoSecond$.subscribe(() => {
  lastUpdated.innerHTML = new Date().toLocaleTimeString();
});
Remember that you can have a list of subscribers for the same event, so separating the logic for updating different portions of the site keeps the code under the observer nice and simple. If you had, say, three components that needed to change as a result of a stream emitting events, you could attach three observers and update the different portions of the site accordingly. So you have two subscribers: the one we just showed you and another used to fetch the stock data, to which you’ll add error-handling code. If the web service call made in the fetchDataInterval$ observable were to fail (returning a 500 HTTP response code, for example), the catch() operator would react and return a default value for that stock quote section, as shown in the next listing.
const requestQuote$ = symbol =>
  Rx.Observable.fromPromise(
      ajax(webservice.replace(/\$symbol/, symbol)))
    .map(response => response.replace(/"/g, ''))
    .map(csv)
    // On failure, emit a default value for this stock quote instead of an error
    .catch(() =>
      Rx.Observable.of([new Error('Check again later...'), 0]))
    // Runs on completion or error: stop updating the last-updated time
    .finally(() => {
      updateSubscription.unsubscribe();
    });
The other code you added was the finally() operator, which fires when a stream completes or when it errors. Because you’re running a two-second interval, you don’t expect a completion, but in the event of an error, you should also clean up the interval and cancel the subscription, so that the updated time shown reflects the last quoted update received before the error occurred. You can see this process in the graph in figure 7.13.
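The ordering guarantee described here (cleanup runs after the stream terminates, whether by completion or by error) can be illustrated with a small stand-in written in plain JavaScript. This is only a sketch of the idea, not how RxJS implements finally(): subscribeWithFinally and trace are hypothetical names, and the stream is a simple synchronous loop.

```javascript
// Hypothetical mini-stream: pushes each value to the observer, fails if it
// meets `failAt`, and always runs `cleanup` once the stream has terminated.
function subscribeWithFinally(values, failAt, observer, cleanup) {
  try {
    for (const value of values) {
      if (value === failAt) {
        throw new Error(`Failed at ${value}`);
      }
      observer.next(value);
    }
    observer.complete();
  } catch (err) {
    observer.error(err);
  } finally {
    cleanup(); // runs after complete() and after error() alike
  }
}

// Records the order in which the callbacks fire for one run.
function trace(values, failAt) {
  const log = [];
  subscribeWithFinally(values, failAt, {
    next: value => log.push(`next ${value}`),
    error: err => log.push(`error ${err.message}`),
    complete: () => log.push('complete')
  }, () => log.push('cleanup'));
  return log;
}

console.log(trace([1, 2, 3], null));
// ['next 1', 'next 2', 'next 3', 'complete', 'cleanup']
console.log(trace([1, 2, 3], 2));
// ['next 1', 'error Failed at 2', 'cleanup']
```

In the stock ticker, the cleanup step plays the role of updateSubscription.unsubscribe(): it is the one action that must happen regardless of how the quote stream ends.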
Now you need to make a small adjustment to the ticks$ observable so that it knows how to handle an error. You can use our Try functional data type for this: if a failure does occur, it delegates the exception to the error callback of the observer. Here's that code once more with the new addition:
ticks$
  // Rethrow a failure so that it reaches the observer's error callback
  .map(([symbol, price]) =>
    [Try.of(symbol).getOrElseThrow(), price])
  .subscribe(
    ([symbol, price]) => {
      let id = 'row-' + symbol.toLowerCase();
      let row = document.querySelector(`#${id}`);
      if(!row) {
        addRow(id, symbol, price);
      }
      else {
        updateRow(row, symbol, price);
      }
    },
    error => console.log(error.message));
If this service were to fail (or your internet connection were to drop), you'd see "Check again later..." printed in the console.
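The message reaches the console because the fallback value carries an Error object in the slot where the symbol would normally be, and unwrapping it throws. A throw inside map() is routed to the observer's error callback. The sketch below uses a deliberately simplified stand-in for Try, which is an assumption and not the chapter's actual implementation; the 'FB' symbol and its price are made-up sample data.

```javascript
// Simplified stand-in for a Try type (an assumption, not the chapter's
// implementation): wraps a value and treats Error instances as failures.
class Try {
  constructor(value) {
    this.value = value;
  }
  static of(value) {
    return new Try(value);
  }
  isFailure() {
    return this.value instanceof Error;
  }
  getOrElseThrow() {
    if (this.isFailure()) {
      throw this.value; // surfaces the wrapped error to the caller
    }
    return this.value;
  }
}

// Mirrors the map() step: unwrap the symbol slot, pass the price through.
const unwrap = ([symbol, price]) => [Try.of(symbol).getOrElseThrow(), price];

console.log(unwrap(['FB', 135.5])); // ['FB', 135.5]

try {
  // The shape of the default value produced by the catch() fallback
  unwrap([new Error('Check again later...'), 0]);
} catch (err) {
  console.log(err.message); // Check again later...
}
```

With a valid quote the tuple passes through untouched, so the happy path of the subscriber is unaffected.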
As you can see, RxJS provides a comprehensive set of error-handling operations that allows you to easily retry an entire observable sequence when an error is detected in the pipeline. But we made a huge assumption about the nature of the observable sequences. That is, the observables that we created and retried in this chapter belong to a category known as cold observables. Cold observables are passive (dormant) and emit values only when subscribed to: an array of numbers, a Promise, intervals, and the like. In other words, retrying a cold observable basically resubscribes to it and requests that it emit its values again. In the next chapter, you’ll learn to create and handle the different types of observables: cold and hot.