Chapter 6. Coordinating business processes

This chapter covers

  • Synchronizing the emission of several observables
  • Using observables as signaling devices
  • Building complex interactions from multiple inputs
  • Spawning streams simultaneously
  • Streamlining database storage operations using observables

The previous chapter examined how converting multiple observables into a single one can simplify their consumption and reduce the management overhead. This mechanism is important because it allows you to reuse a single subscription to handle data that’s being transformed or created by the composition of multiple tasks, such as AJAX requests, business logic transformations, timers, and others. The various strategies for how these different types of merging operations (merge(), -concat(), or switch()) occurred, as in whether we cared about the order of the events or cancelled others, was determined by the operator itself—each had a different flavor. We also showed examples like search and drag and drop that use the output of one observable to signal the start or completion of another.

In this chapter, we’ll continue with this theme and expand where we left off in chapter 5. You’ll learn that you don’t always have to care about the result of an observable if you simply want to leverage the semantics of when one emits to cause some other process to begin. Furthermore, we’ll explore scenarios where events from multiple streams can be aggregated and combined so that the resulting observable is emitting the sum of two observables: in other words, two streams cooperating with each other, working together toward a common goal. To illustrate this, in this chapter, we’ll tackle problems that involve authentication, data persistence, and stream parallelization. The interplay of using observables as a signaling device and the more interesting uses we can achieve through joining observables forms the foundation of the more complex logic you’re likely to see in the wild. In order for you to understand how observables can collaborate, you must first understand how to tap into their lifecycle.

6.1. Hooking into the observable lifecycle

The representational power of a single observable is limited. Although you can create a stream to represent just about any data type, a single stream can contain logic for only a single set of inputs and outputs, like the results of a series of keystrokes or an individual web request. Even using the combinational operators from the previous chapters like mergeMap(), there’s still only a single task to which a observable can be assigned without introducing side effects. Remember that in the last chapter you were able to combine mouse and touch events to support drag and drop. Trying to also support, say, free-form drawing using the same stream would be very difficult because it would no longer be clear which use case an observer should be expecting. It’s important to realize that, by design, a single stream can carry out only a single task; therefore, performing multiple actions whether serially or in parallel depends on how you combine streams.

By now, you’ve learned how to transform and filter data in flight, even coming from different sources. Separating tasks into loosely coupled streams is advantageous because you can compartmentalize their respective logic without bleeding state into other areas of the application—we call this upstream compartmentalization or conformance. You saw examples of this in the mouse and touch code when you needed to make two streams conformant to a single observer block. You could, alternatively, combine the stream data as is and group all your business logic into the observer, or downstream compartmentalization. We highly recommend the former over the latter.

But there are times when those operations are insufficient because you need -several streams to interact while also maintaining the same semantically easy-to--understand flow of code that you’ve come to expect from RxJS. So instead of creating separate streams and building the scaffolding to connect them yourself, you learned in the previous chapter how to combine and map observables to other observables. You did this for real-world tasks such as smart search, a stock widget, and others. In this chapter, you’ll continue building on those techniques and continue the theme of observables working in unison to achieve a certain goal.

6.1.1. Web hooks and the observer pattern

RxJS’s Observable type is comparable to an EventEmitter data type, which we briefly mentioned in chapter 1, in that both belong to a general class of objects known as event hooks. Event hooks are just a way of targeting certain points in an object’s lifecycle with the objective of triggering further actions. When an action associated with a hook is triggered, you say that the hook has fired. Event hooks can operate within or beyond the confines of a single application. For instance, GitHub, the most popular version control repository for hosting code, provides access to a whole slew of external hooks that allow multiple services to coordinate with events such as the creation of pull requests, new commits, or branches. Each time an action is performed, the associated event hooks will fire and any listeners will receive those events, as shown in figure 6.1.

Figure 6.1. A couple of well-known GitHub hooks that clients can plug logic into

In general, event hooks provide two main benefits:

  • They allow the developer of the application to retain control over what constitutes a hook, thereby maintaining final say over where and when events will be fired.
  • They allow third parties to execute arbitrary code without having to worry about detecting internal system implementation.

It’s not hard to realize that event hooks are just another manifestation of the observer pattern omnipresent in RxJS. Similarly, every observable also has a set of events, or hooks, in its lifecycle that can be plugged into, all of which should be familiar to you by now:

  • Observable start (subscription)
  • Observable stop (completion or error)
  • Observable next (normal event)

6.1.2. Hooked on observables

Let’s discuss each one a bit further and offer some operators that work during these stages. The first item, the observable start (or the onSubscribe hook), may not be as obvious as the other two, which you’ve seen in some form several times up to this point, but it’s also perhaps the easiest to understand. The goal of listening to when a subscription is created is to perform some action when an observable begins emitting events. Hence, the startWith() operator does something to this effect by prepending a value to the front of an observable each time it’s subscribed to by an observer. So in the following code, the number 0 will appear before any other events on the console:

Rx.Observable.range(1, 5)
  .startWith(0)
  .subscribe(console.log);
//-> 0
     1
     2
     3
     4
     5

The startWith() operator is a concat() (in reverse) of all of the values provided to it with the source stream following, in that order. It leverages the subscription behavior to inject events before others are received. This can be trivially implemented as a custom operator just like in chapter 3. The following listing shows how you might implement it.

Listing 6.1. Hooking into the start of a stream
function startWith(value) {
  return Rx.Observable.create(subscriber => {           1
    let source = this;
    try {
       subscriber.next(value);                          2
    }
    catch(err) {
       subscriber.error(err);
    }
    return source.subscribe(subscriber);                3
  });
};

Rx.Observable.prototype.startWith = startWith;

  • 1 Uses the factory method to create the stream
  • 2 Always emits the value before anything else
  • 3 Emits the rest of the stream
Code samples

Remember that all the code for this chapter can be found in the RxJSinAction GitHub repository: https://github.com/RxJSInAction/rxjs-in-action.

This operator makes sure that every time the stream is subscribed to, it produces that value first. Now, normally you wouldn’t reinvent your own startWith() because RxJS already implements the necessary hooks for you; all you have to do is inject any function you want. But this serves to show how extensible observables are.

On the opposite side of the spectrum, you can also think of the completion event as its own kind of event. It occurs when invoking the observer’s complete() method when some observable finishes and before the subscription is disposed of. As you’ve seen all along, this hook is used to perform all of your side effect logic, operations outside the scope of the observable (such as logging to the console, printing data to the screen, writing to a database), and others.

RxJS finally

RxJS’s error-handling mechanism also introduces the finally() operator. Semantically similar to the traditional finally block in JavaScript, this operator is the last step in the observable lifecycle, regardless of whether any errors occurred. The function passed to the finally operator will be executed when the observable is shutting down for any reason, so even if the observable terminates with an exception, the block will still be run. This gives you the opportunity to recover from errors and clean up any necessary resources. We’ll cover error handling and all of the wonderful things you can do to recover from errors in chapter 7.

In addition, you can combine logic that’s tied to the start and end of an observable. You can do this with a new operator, called using(). Figure 6.2 demonstrates how this operator works.

Figure 6.2. The using() operator controls the lifespan of a resource object (created via a resourceFactory()) through the lifespan of an observable (created via an observableFactory()).

This operator (using()) takes two parameters: a function that creates a disposable resource (like an object) and a function that creates an observable. The created object is called a disposable because it provides the mechanism to clean itself up—an unsubscribe() method. Both of these functions are known as factory functions in RxJS parlance. The resource is tied to the lifecycle of the observable created by this function, so that when the latter is disposed of, so is the resource. When an observer subscribes to the observable returned from using(), the first factory function is invoked to create an instance of the resource. The resource is then passed to the second factory function as a parameter, and that second factory function returns the actual observable that will be subscribed to. Disposing of the resource is as simple as disposing of the stream through normal means. When the subscription goes through the disposal process, it will also attempt to dispose of the resource that was created by the resource factory. The completion of the observable will also attempt to dispose of the resource, whichever comes first. Essentially, what you’re doing is linking the lifespan of an object using an observable.

Here’s an example that will help you understand how it works. Consider an arbitrary disposable resource object, called DisposableResource:

class DisposableResource {
  constructor(value) {
     this.value = value;
     this.disposed = false;
  }

  getValue() {
     if (this.disposed) {
        throw new Error('Object is disposed');
     }
     return this.value;
  }

  unsubscribe() {                        1
     if (!this.disposed) {
        this.disposed = true;
        this.value = null;
     }
     console.log('Disposed');
  }
}

  • 1 A disposable resource must provide an implementation for the unsubscribe() behavior.

You can tie the behavior of this object and its state with the lifespan of any observable with using(), as follows:

const source$ = Rx.Observable.using(              1
  ()=> new DisposableResource(42),
  resource => Rx.Observable.interval(1000)
);

const subscription = source$.subscribe(
    next => console.log(`Next: ${next}`),
    err  => console.log(`Error: ${err}`),
    ()   => console.log('Completed')
);

//...
subscription.unsubscribe();                       2

  • 1 using() receives two parameters: a resource object and an observable.
  • 2 Seconds later, you unsubscribe from the source, which will unsubscribe from the observable managing the resource as well as the resource itself.

Running this code will begin emitting one value every second. Seconds later (after subscription but before emission), you unsubscribe from the source. This cleans up the resource observable as well as the DisposableResource instance by calling its unsubscribe method.

The idea behind it is that you often have resources that are completely subject to the lifespan of the observable. In order to hook into this stage, the only requirement is that the object you plug in must be disposable-like, which is to say that it must declare an unsubscribe() method in order to be cleaned up properly if needed. As an example, suppose you want to manage the login session of the user through an observable. When the user logs in, you can create a session token that can be stored in the cookies that keep track of the authenticated user session. But when the user logs out or closes the window, the session needs to be deleted. The closing of the browser signals an event that you can listen for, so you can use observables for this.

First, you need to create an object that will manage the lifecycle of the session token, similar to the previous code sample. Upon construction, it will set the session to expire in 24 hours, as shown here.

Listing 6.2. SessionDisposable object implementing the dispose functionality
class SessionDisposable {
  constructor(sessionToken) {
    this.token = sessionToken;
    this.disposed = false;
    let expiration = moment().add(1, 'days').toDate();          1
    document.cookie = `session_token=${sessionToken};
        expires=${expiration.toUTCString()}`;                   2
    console.log('Session created: ' + this.token);
  }

  getToken() {
     return this.token;
  }

  unsubscribe() {                                               3
     if (!this.disposed) {
        this.disposed = true;
        this.token = null;
        document.cookie = 'session_token=; expires=Thu, 01 Jan 1970
            00:00:00 GMT';
        console.log('Ended session! This object has been disposed.');
     }
  }
}

  • 1 Creates a cookie with an expiration date 24 hours from now, using the popular moment.js library to manipulate dates easily (installation instructions found in appendix A)
  • 2 Adds the cookie
  • 3 Clears the cookie

The most important aspect to note from this class is the declaration of the unsubscribe() method, so that objects of this type conform to the disposable specification.

Dispose or unsubscribe

The terms dispose and unsubscribe are interchangeable. The notion of disposing was used predominantly in RxJS 4, so the terminology became part of the RxJS jargon. RxJS 5 changes this to unsubscribe, yet it’s still easier to say disposable than unsubscribable.

If you have a Java or C# background, this is analogous to saying that your class implements the Disposable interface. The logic here is simple; it only resets the value of the token and sets the time back to the epoch (01/01/1970) so that the browser deletes it before closing. Now, let’s tie the SessionDisposable object to a stream. For this example, you’ll be using the using() operator to construct an observable sequence that depends on an object whose lifetime is tied to the resulting observable sequence’s lifetime; in other words, you’ll make one stream dependent on another one. This using() operator takes a factory function that creates the SessionDisposable object and a second method that makes that object available to the stream.

Here’s how you can use using() to manage a session token available for the duration of a countdown.

Listing 6.3. Managing a temporary session token with using()
function generateSessionToken() {                          1
   return 'xyxyxyxy'.replace(/[xy]/g, c => {
      return Math.floor(Math.random() * 10);
   });
}

const $countDownSession = Rx.Observable.using(
   () => new SessionDisposable(generateSessionToken()),    2
   () => Rx.Observable.interval(1000)
     .startWith(10)
     .scan(val => val - 1)
     .take(10)
);

$countDownSession.subscribe(console.log);                  3

  • 1 Simple function to generate a random session token
  • 2 Attaches the session to the lifespan of this observable
  • 3 When this subscription completes, the subordinate session disposable token will be disposed of and the cookie deleted.

With this code, your user’s login state is now tied into the lifetime of the session, such that when the user closes the window or logs out (thereby unsubscribing from the -subscription), they’re also logged out of the application. This kind of pattern is used a lot in e-commerce sites where you need to perform some action within a certain time span. This kind of coordination takes advantage of the hooks to create side effects around the observable, but you can do much more by incorporating more streams into the mix.

About the Rx.Observable.using() operator

The using operator actually comes from C#, where a using(resource){ ... } block is a synchronous construct that manages the lifetime of a resource by invoking the garbage collector and cleaning up the resource once the inner block (between the curly braces) exits.

Using a disposable object allowed you to tap into RxJS’s unsubscription mechanism. It turns out that you can handle many different use cases when observables coordinate with others through signals.

Another form of coordination occurs when multiple streams join together to produce a result. You learned in the previous chapters about combination operators such as merge(), switch(), and concat(), and we briefly introduced combineLatest(). In the areas of signaling and coordination, combineLatest() can have many practical effects. Let’s spend more time exploring how important this operator is when it comes to parallel streams.

6.2. Joining parallel streams with combineLatest and forkJoin

Building asynchronous flows is a difficult endeavor, because each possible permutation of data arrival times—which you can never guarantee—must be accounted for. Using the browser’s multiple connections, you can retrieve some of the data in parallel. But when data is causally linked, it needs to be fetched serially. In this section, we’ll show you how to use RxJS to coordinate between observable streams that can originate from independent and dependent actions. That is to say, a stream’s data can come from events causally linked to other data sources (dependent), like a button click that kicks off an AJAX request, or it can be run in parallel with other streams (independent), like simultaneously invoking two AJAX requests. There are ways to do this with plain-vanilla JavaScript using our long-lost friends, callbacks and Promises. Consider writing an event handler for a button that, when clicked, queries two remote data sources for data using an ajax() function with a callback:

button.addEventListener('click', () => {
   let result1, result2 = {};
   ajax('/source1', data => {
      result1 = data;
   });

   ajax('/source2', data => {
      result2 = data;
   });

   setTimeout(() => {
     processResults(result1, result2);
   }, arbitraryWaitTimeMs);                      1
});

  • 1 Would need to be long enough so that both AJAX calls have enough time to finish

Of course, the chances of this working are slim to none because you can never predict how long both AJAX calls will take. Other options include writing custom waiting routines based on setInterval() and semaphores, yet they’re convoluted and invasive to the business logic. This code would allow both AJAX requests to happen in parallel, but waiting and combining the results later is difficult without proper wait semantics. Inserting sleep functions into a single-threaded JavaScript code is frowned on, because browsers may deem your scripts unresponsive. In order for this to work, you have to either sacrifice parallelism and nest your AJAX calls or create a variable visible at a higher scope. Neither of these solutions is terribly wieldy:

button.addEventListener('click', () => {
  ajax('/source1', result1 => {
    ajax('/source2', result2 => {
      processResults(result1, result2);
    });
  });
});

Or:

button.addEventListener('click', () => {
  let firstResult;
  ajax('/source1', result => {
    if (firstResult) processResults(result, firstResult);
    else firstResult = result;
  });
  ajax('/source2', result => {
    if (firstResult) processResults(firstResult, result);
    else firstResult = result;
  });
});

And just in case you’re thinking about this, we’ll mention that using observables this way goes against the RxJS philosophy—it’s an antipattern. What we mean to say is that you could think of nesting subscribe() blocks like this:

click$.subscribe(() => {
   source1$.subscribe(result1 => {
     source2$.subscribe(result2 => {
        processResults(result1, result2);
     });
   });
});

But just like the previous snippet, this would lead you to the very familiar callback hell you’re trying to stay away from in the first place. In addition, this solution would apply only if the inner observables were eagerly loaded, which is not the case. Consequently, the inner subscriptions would begin executing only after the outer subscription had produced its first value. This makes your code blocks dependent and tightly coupled to each other, not parallel, as shown in figure 6.3.

Figure 6.3. The issue with nested subscription blocks

In the previous chapter, you learned that you can project observables using operators such as mergeMap() and switchMap(), but these won’t work either, for two reasons:

  • Nesting observables implies causality: that a source observable dictates how the other executes.
  • You need to preserve the data from all the observables at the same time without cancelling any of them.

Note that nesting callbacks does eliminate the need for an arbitrary wait time. But in either case, you won’t be able to perform tasks in parallel, which is what you’re trying to do here. Given how frequently this occurs in JavaScript, other libraries address this problem, such as Async.js (https://github.com/caolan/async), which you can use to write code of the following form:

button.addEventListener('click', () => {
     async.parallel([                              1
        ajax('/source1'),
        ajax('/source2')
     ],
     (err, ([result1, result2])) => {              2
        processResults(result1, result1);
     });
});

  • 1 Runs all functions in parallel
  • 2 The final callback in async.parallel() receives an array with the outcome of both AJAX calls.

Luckily, you don’t need to include another library, because RxJS is the right tool for the job. In this section, we’ll explore an operator called combineLatest() that applies parallel semantics similar to Async.js but in line with the RxJS operator philosophy. To frame this problem more concretely, consider two streams that access two popular APIs to shorten a URL. One makes an AJAX request to Bitly (https://app.bitly.com), and the other to Google URL Shortener (https://goo.gl). For the sake of discussion, you’ll kick these streams off using another stream that monitors a URL text box field—debounced for efficiency, of course.

You want both to be able to run in parallel, and you might be able to use operators that you learned in the previous chapter, like merge() and concat(), depending on whether you care about order preservation. But because both streams are not causally linked (as in you can’t just chain them together one after the next), you can’t use these operators. For example, concat() would force each result to emit in order, one after another rather than in parallel, whereas merge() would only allow you to consider a single emission at a time downstream, instead of collectively.

Rather, we stipulated that both tasks must run in parallel but emit results only when all of them have emitted, which may be at any point in the future. You tried using callbacks; let’s see if Promises fair better.

6.2.1. Limitations of using Promises

Certainly, you need a new pattern or set of operators to accomplish your goal, such that you can execute all the statements in parallel while also being able to gather them collectively when they’ve all completed. Promise libraries that follow the Promise/A+ protocol include a collection operation called Promise.all(), which creates a new Promise that awaits the completion of all the Promises or rejects with the error of the first one to reject. Let’s use this method here, but instead of using callbacks, you’ll use a version of ajax() that returns Promises that wrap the HTTP request:

button.addEventListener('click', () => {
   Promise.all(ajax('/source1'), ajax('/source2'))          1
       .then(([result1, result2]) => {                      2
          processResults(result1, result2);
     });
});

  • 1 Executes all Promises and waits for all to complete
  • 2 Processes the joined value, an array, returned from the call to .all(), which is destructured and passed into a method that knows how to render account details

Already, you see that the use of Promises helps your code not only in indentation and readability but also in parallel, effectively, because you’re using a mechanism that knows to emit the value only when all have arrived. But the more these types have to be mixed and matched, the more difficult this becomes, because each variation will require a more intricate solution. In the previous code, you mix two fundamentally different paradigms: event-driven listeners with the more functional Promises. Nevertheless, it does achieve parallelism and moves the needle in the right direction.

You already know the desired traits of the new operator you need. It should provide the fluent API design of Promises plus the parallelism semantics of Async.js. This operator should take multiple sources, like the static merge() operator, but at the same time, it should be able to combine and emit the collective result from all inputs as an event of its own. Let’s take a look at combineLatest().

6.2.2. Combining parallel streams

Whereas operators such as merge(), concat(), and switch() combine a series of observables (or an array of them) to output a single observable, combineLatest() gives you a way to emit and capture events from multiple sources at the same time. This operator creates an observable whose values are calculated from the latest values of each of its input observables. combineLatest() is ideal for situations where you need to spawn to long-running processes in parallel and then use the combined result. For example, suppose you want to use third-party services to shorten URLs. Because both streams act independently, you can use both services in parallel and then present the user with both outputs. This is the task we’ll tackle in this section.

Before we begin developing the solution to this problem, we’ll briefly introduce you to combineLatest() with a simple example. The data emitted is similar to how buffering worked in chapter 3. In other words, the output is an array that combines the latest data from all of the input observables—the same semantics as Promise.all() or async.parallel(). Here’s a quick example to showcase how this operator works. You’ll combine the output of two streams: one emits letters every second, and the other emits numbers every second.

Listing 6.4. Synchronizing streams with combineLatest()
const letter$ =
   Rx.Observable.interval(1000)                       1
   .map(num => String.fromCharCode(65 + num))
   .map(letter => `Source 1 -> ${letter}`);

const number$ = Rx.Observable.interval(1000)          2
   .map(num => `Source 2 -> ${num}`);

Rx.Observable.combineLatest(letter$, number$)
   .take(5)
   .subscribe(console.log);

  • 1 Emits A, B, C, ... every second
  • 2 Emits 0, 1, 2, 3, ... every second

Running this code prints the following:

["Source 1 -> A", "Source 2 -> 0"]
["Source 1 -> B", "Source 2 -> 0"]          1
["Source 1 -> B", "Source 2 -> 1"]          2
["Source 1 -> C", "Source 2 -> 1"]
["Source 1 -> C", "Source 2 -> 2"]

  • 1 Source 1 emits “B” with the latest value in source 2, “0.”
  • 2 Source 2 emits “1” with the latest value in source 1, “B.”

Here, you have two independent streams that emit every second: one, letters starting with A, and the other, numbers starting at zero. Each emission will cause a collective emission of the latest value present in the stream. So after the first emission, A -> 0, each result alternates emitting the latest from the other stream. In other words, when Source 1 emits B, it sends the result with the latest value in Source 2 at that time, 0. Then, when Source 2 emits the next value, 1, it sends the result with the latest value in the stream at that time, 0. In summary, an emission from any stream in the combination causes all of them to publish their latest value, all sent to the observer via an array.

In this simple case, both data sources are asynchronous intervals. With synchronous data sources, you have to be careful because RxJS will immediately run through the events of the first source stream and combine its latest value with the latest value of the combined stream instead of pairing each number with a letter.

const letter$ = Rx.Observable.from(['a', 'b', 'c']);
const number$ = Rx.Observable.from([1, 2, 3]);
Rx.Observable.combineLatest(letter$, number$).subscribe(console.log);

Running this code will output a very different result:

   ["c", 1]
   ["c", 2]
   ["c", 3]

Now that you understand how this operator works, let’s jump into your task. Again, you want to spawn parallel AJAX calls to shorten some URL. The user is expected to type a valid URL into a text box; when the user removes focus from it, it will kick off these independent streams. So you’re mixing one causally linked stream with two parallel streams, which should suggest the use or mergeMap() (or switchMap()) and combine-Latest(), respectively.

Causality

Generally, causal streams (one depends on the other) are combined using mergeMap() or switchMap(), whereas independent streams are combined using combineLatest() and others you’ll learn about shortly.

Reasoning about this problem this way—thinking in streams—we came up with the following program for a URL shortener stream that uses both Bitly and Google.

Listing 6.5. Combining multiple URL shortener streams
const urlField = document.querySelector('#url');

const url$ = Rx.Observable.fromEvent(urlField, 'blur')
   .pluck('target', 'value')
   .filter(isUrl)                                                  1
   .switchMap(input =>                                             2
       Rx.Observable.combineLatest(bitly$(input), goog$(input)))   3
   .subscribe(([bitly, goog]) => {
     console.log(`From Bitly: ${bitly}`);
     console.log(`From Google: ${goog}`)
  });

  • 1 Checks using a regex that the input provided matches a valid URL (omitted for brevity)
  • 2 Projects an observable that will emit results when both subordinate streams emit
  • 3 Combines the latest events of both services

To run this program, type any URL into the input field; we’ll use these providers to shorten this URL. So, for https://www.manning.com/books/rxjs-in-action, the output is

From Bitly: http://bit.ly/2dkHUau
From Google: https://goo.gl/plTbDG

These all resolve to the original link (so feel free to share it on your favorite social media!). Of course, you don’t understand exactly how bitly$ and goog$ work, but the abstraction provided by RxJS means you can still reason about this code as is, from its declarative nature. Figure 6.4 is a simple graph to show you what’s happening.

Figure 6.4. The workings of combineLatest(). This operator outputs an array containing the latest values from all of its input observables.

Fortunately, combineLatest() allows you to provide a selector function that makes the stream more conformant, so that you can avoid the direct array access, which can be tedious and error prone when you need only one of the results. This selector function receives as arguments the data emitted from each subordinate observable. So, by using a selector function that measures the length, you can compute the shorter of the URLs:

const url$ = Rx.Observable.fromEvent(urlField, 'blur')
   .pluck('target', 'value')
   .filter(isUrl)
   .switchMap(input =>
         Rx.Observable.combineLatest(bitly$(input), goog$(input),
           (b, g) => b.length > g.length ? b : g))                        1
    .subscribe(shortUrl => {
      console.log(`The shorter URL is: ${shortUrl}`);
    });

  • 1 Using a selector function to pick the data from the stream that emits the shorted URL

For the sake of completing this example, let’s finish implementing each individual stream, because they pack another interesting technique used to deal with third-party APIs that work with callbacks. You’ll implement both services as functions that accept a URL and return a stream used to shorten it. You’ll start with bitly$. When you open a Bitly account, you’ll need to find the following information in order to make remote web API requests:

const API = 'https://api-ssl.bitly.com';          1
const LOGIN = '<YOUR LOGIN>';                     2
const KEY = '<YOUR GENERATED KEY>';               2

  • 1 Bitly’s Web API URL
  • 2 You can obtain these fields from your profile settings.

The next listing shows the observable function used to shorten this URL.

Listing 6.6. Bitly URL shortener stream
const ajaxAsObservable = Rx.Observable.bindCallback(ajax);                 1

const bitly$ = url => Rx.Observable.of(url)
   .filter(R.compose(R.not, R.isEmpty))
   .map(encodeURIComponent)
   .map(encodedUrl =>                                                      2
    `${API}/v3/shorten?longUrl=${encodedUrl}&login=${LOGIN}&apiKey=${KEY}`)
   .switchMap(url => ajaxAsObservable(url).map(R.head))                    3
   .filter(obj => obj.status_code === 200 && obj.status_txt === 'OK')
   .pluck('data', 'url');                                                  4

  • 1 Binds the function’s callback internally to the observer’s next function
  • 2 Builds the API path
  • 3 Invokes an AJAX call against Bitly with the longUrl to shorten
  • 4 Extracts the URL property

For starters, we need to explain the first line in listing 6.6, which you haven’t encountered before. It’s a fact that many JavaScript APIs, particularly Node.js, still use callback functions heavily. Just as RxJS works well with Promises, it’s important to be able to adapt callback-based APIs to RxJS. The way to do this is by internally binding the callback as the observer’s next() method and publishing that value as an observable to continue the chain, as shown in figure 6.5.

Figure 6.5. bindCallback transforms any function f(x, callback) into a function g, such that calling g(x) outputs an observable with the result of the callback.

This way, when the bound ajax() function is invoked with the URL argument, it will execute and the result intended for the callback will be proxied into a new observable. Because you’re returning an observable, you use switchMap() to project it and replace the source stream. This is the only new part; everything else should be straightforward.

Furthermore, working with Google’s URL shortener is similar, except that for reasons of security and authentication, it’s best to use their JavaScript client APIs instead of making a raw request (details about installing this library can be found in appendix A). Just like Bitly, Google’s service expects you to have a Google account, have this particular API enabled, and have generated a security OAuth2 token. This client API library gapi gives you access to many of Google’s web APIs, and it works partially with callbacks and Promises. So integrating it into RxJS involves wrapping those promisified method calls to configure the library and pushing it downstream as you set up to make the shorten call; see the following listing.

Listing 6.7. Google URL shortener stream
const GKEY = '<YOUR-GENERATED-OAUTH-KEY>';                                 1

const gAPILoadAsObservable = Rx.Observable.bindCallback(gapi.load);        2

const goog$ = url => Rx.Observable.of(url)
   .filter(R.compose(R.not, R.isEmpty))
   .map(encodeURIComponent)
   .switchMap(() => gAPILoadAsObservable('client'))                        3
   .do(() => gapi.client.setApiKey(GKEY))                                  4
   .switchMap(() =>                                                        5
       Rx.Observable.fromPromise(gapi.client.load('urlshortener', 'v1')))
   .switchMap(() =>                                                        6
       Rx.Observable.fromPromise(gapi.client.urlshortener.url.insert(
         {'longUrl': example_url}))
   )
   .filter(obj => obj.status === 200)
   .pluck('result', 'id');

  • 1 Uses your OAuth2 token generated through the Google APIs console
  • 2 Binds the callback into the load method so that you can integrate it into the observable
  • 3 Loads the client library
  • 4 Passes the generated token
  • 5 Loads the URL shortener API
  • 6 Shortens the URL and inserts it into your personal list of URLs

As you can see, you can compartmentalize both services as individual observables, only to embed them into an orchestrating observable using combineLatest() to run those services in parallel in reaction to the URL field changing. Here’s that code once more:

const url$ = Rx.Observable.fromEvent(urlField, 'blur')
   .pluck('target', 'value')
   .filter(isUrl)
   .switchMap(input =>
       Rx.Observable.combineLatest(bitly$(input), goog$(input)))
   .subscribe(([bitly, goog]) => {
     console.log(`From Bitly: ${bitly}`);
     console.log(`From Google: ${goog}`)
  });

This code reveals that spawning and joining streams gets first-class citizen treatment in RxJS. To nail this point home, let’s look at an operator called forkJoin().

6.2.3. More coordination with forkJoin

RxJS provides an operator called forkJoin(), in many ways similar to combine-Latest(), in charge of running multiple observable sequences in parallel and collecting their last element. In contrast to combineLatest(), forkJoin() will emit only the last value of each forked stream. This is important, so we’ll come back to it to make this really clear. At the time of writing, most modern browsers allow you to make up to 10 requests for data simultaneously, and forkJoin() takes advantage of this to maximize throughput. For the stock ticker widget, this operator is a plus because you can look up multiple stock symbols simultaneously and then add them all up to reflect the grand total of the user’s entire stock portfolio. Take a look at this example in figure 6.6. Here’s an outline of the steps:

  1. Create a function that uses an observable to fetch the stock data for a company’s symbol with price.
  2. Iterate through the user’s preferred stock symbols: FB (Facebook), AAPL (Apple), and CTXS (Citrix).
  3. Use forkJoin() to spawn these simultaneous processes and join the result.
  4. Add the final result.
Figure 6.6. The fork operation spawns several requests, waits for them to complete, and emits when all streams have completed. The result is an array mapping to the output of each stream.

To implement our widget, the first thing you’ll do is reuse the function that fetches a stock symbol’s price requestQuote$() from our stock ticker widget in chapter 5:

const requestQuote$ = symbol =>
     Rx.Observable.fromPromise(
       ajax(webservice.replace(/$symbol/, symbol)))
     .map(response => response.replace(/"/g, ''))
     .map(csv);

There are so many things you can do, and it all depends on your needs. In this case, you’re optimizing for parallelism. One of the things you did in chapter 5 was make the stream conformant in that it returns only the price property of the fetched company symbol as a numerical float.

Remember from previous chapters that the user has chosen to fetch stock information for three companies:

const symbols = ['FB', 'AAPL', 'CTXS'];

To compute the total price, you need to query for each of these symbols in parallel and add up the joined result. For this, you’ll use forkJoin(). You could pass each request observable one by one:

Rx.Observable.forkJoin(
   requestQuote$('FB'),
   requestQuote$('AAPL'),
   requestQuote$('CTXS')
);

This is clean and declarative. Preferably, use your FP skills to map this function over the symbols array, as shown here.

Listing 6.8. Using forkJoin to fetch multiple stock symbols simultaneously
Rx.Observable.forkJoin(symbols.map(requestQuote$))
 .map(data => data.map(arr => parseInt(arr[1])))                      1
 .subscribe(allPrices => {
    console.log('Total Portfolio Value: ' +
        new USDMoney(allPrices.reduce(add).toLocaleString()));
 });

  • 1 Reads the price amount only

Just like combineLatest(), forkJoin() will return an array with all stock prices all at once. The subscriber receives the array and reduces it with a simple const add = (x, y) => x + y; function to produce the result, which at the time of this run is

"Total Portfolio Value: USD 293.25"               1

  • 1 Total value subject to change depending on market conditions

As you can see, this flow is declarative, immutable, and uses functional expressions to obtain the final answer. A simple look at the browser’s console, shown in figure 6.7, reveals that all simultaneous processes began at the same time:

Figure 6.7. The browser’s view of network traffic shows the remote HTTP requests all start at the same time. The forkJoin() operator spawns these requests and waits for all to emit before emitting its result.

forkJoin() and combineLatest() are similar, yet each imparts its own flavor. Aside from the former being strictly a static factory method and the latter used interchangeably as a factory and instance operator, they differ in the criteria with which they emit their values. forkJoin() emits only the latest values from each of the input observables. So if a sequence emits five values, it will sit there and wait for the last one (certainly expect some level of in-memory caching occurring here):

Rx.Observable.forkJoin(
  Rx.Observable.of(42),
  Rx.Observable.interval(1000).take(5))               1
.subscribe(console.log);  //-> [42,4]

  • 1 It will hold on to 42 for about 5 seconds and then emit the last value seen from all streams.

On the other hand, combineLatest() is closer to a merge in the sense that it will emit values for the latest values when any of its input observables emits, namely:

Rx.Observable.combineLatest(
  Rx.Observable.of(42),
  Rx.Observable.interval(1000).take(5))
.subscribe(console.log);

//-> [42, 0]
     [42, 1]
     [42, 2]
     [42, 3]
     [42, 4]

As you saw in these examples, asynchronous data may arrive at any time, which makes coordination difficult to implement without a tool like RxJS. This is particularly important when synchronizing data operations into a database, for instance. Let’s see how RxJS fares with these kinds of problems.

Pitfalls of combinatorial operators

For many operators that combine streams, even those like combineLatest(), which emits on any change, each observable is expected to emit at least once before the combining operator emits. So, don’t try to do this,

Observable.combineLatest(
  Observable.empty(),
  Observable.range(1, 3)
)

and expect to get any values.

6.3. Building a reactive database

When data sources are expected to arrive at different times or are tied to different source events, it can become difficult to properly coordinate them. As you saw earlier, operators like combineLatest() and forkJoin() implement a joining pattern that one way or another waits for input observables to complete before emitting a value. This is incredibly powerful and the sort of behavior you’ll find in sophisticated concurrency frameworks. You can also find plenty of uses cases of this pattern in backend systems, especially when dealing with data persistence.

The use case you’ll tackle here is a simple banking transaction system that keeps track of all transactions as a user withdraws money from their account. Thinking reactively here, you should recognize instances of join patterns because reacting to some action triggers another to occur. In this case, you’ll need to join together or sequence a set of database calls to reflect a withdraw action and a transaction record being created. Around this problem domain, you’ll implement a few tasks such as loading all of a user’s transactions from the database.

A common problem with sophisticated client-side applications is loading all the data from the backend into the browser, an environment restricted to a limited amount of memory. Some architectures load the data as needed; this is called progressive loading. But this doesn’t work well if an application has high demands for performance or needs to work without an internet connection. Most modern applications are expected to work this way. Another approach is to bypass the browser’s memory and load the data into persistent storage. Let’s go over the technology you’ll be using.

IndexedDB is a great and relatively underutilized web standard for client-side databases. It takes what was traditionally a server-side process of storing data efficiently in some structured manner and allows those same types of operations for the web. Unfortunately, the standard has a less-than-straightforward interface. So for this example, you’ll use an abstracted library modeled after the more popular CouchDB library, called PouchDB, which is more readable and handles browser differences (please visit appendix A for installation instructions).

The benefit of using PouchDB, like most modern asynchronous JavaScript APIs you interacted with earlier, is that it uses Promises to model all of its asynchronous operations, which means you can use Rx.Observable.fromPromise() to adapt all the API calls if you want to use observables, which is exactly what you’ll do because you’re smarter about preferring observables to regular Promises. For instance, the output of PouchDB.put(), a Promise method, can be converted to an observable, as in figure 6.8.

Figure 6.8. Adapting the callback-based API into an observable

You can use RxJS to move this static, persistent data into flows of asynchronous operations that compose or cascade the outcome of one into the next seamlessly. Hence, RxJS becomes your query language, treating data as constantly moving and changing infinitely. Keep in mind that PouchDB is a schemaless document store, so this means you don’t need to define and create schema before writing data to its tables. You’ll start with a simple example that loads a set of banking transactions into the document store. Constructing an instance of the database is as simple as this:

const txDb = new PouchDB('transactions');

This database stores transaction documents in JSON form. A transaction has the following structure.

Listing 6.9. Transaction class
class Transaction {
  constructor(name, type, amount, from, to = null) {
     this.name = name;
     this.type = type;
     this.from = from;
     this.to   = to;
     this.amount = amount;
  }

}

Next, you’ll populate your database with a few transaction records that represent a user transferring money from one account to another.

6.3.1. Populating a database reactively

The code to create and store several transactions involves looping through Transaction objects (whether they come from a locally stored array or from a remote HTTP call), date-stamping each transaction with an RxJS timestamp, and posting it to the database, as shown in figure 6.9.

Figure 6.9. Steps to populate data into local storage using streams

You’ll start by artificially populating the database with this dataset:

function getTransactionsArray() {
  return [
   new Transaction('Brendan Eich', 'withdraw', 500, 'checking'),
   new Transaction('George Lucas', 'deposit',  800, 'savings'),
   new Transaction('Emmet Brown', 'transfer', 2000, 'checking', 'savings'),
   new Transaction('Bjarne Stroustrup', 'transfer', 1000, 'savings', 'CD'),
  ];
}

The next listing shows this in action. You’ll create two streams, one in charge of performing the database operation and the other for processing the input.

Listing 6.10. Populating the database
const writeTx$ = tx => Rx.Observable.of(tx)
  .timestamp()                                                            1
  .map(obj => Object.assign({}, obj.value, {                              2
                date: obj.timestamp
              })
   )
  .do(tx => console.log(`Processing transaction for: ${tx.name}`))
  .mergeMap(datedTx => Rx.Observable.fromPromise(txDb.post(datedTx)));    3

Rx.Observable.from(getTransactionsArray())                                4
  .concatMap(writeTx$)                                                    5
  .subscribe(
    rec => console.log(`New record created: ${rec.id}`),
    err => console.log('Error: ' + err),
    ()  => console.log('Database populated!')
  );

  • 1 Attaches a timestamp to each emitted item that indicates when it was emitted. The resulting object has two properties, obj.value, which points to the emitted object (transaction), and obj.timestamp, which contains the time the event was emitted.
  • 2 Uses ES6 Object.assign() to create a copy of the transaction object with the additional date property. This preserves immutability. You could also use the ES6 spread operator (partially supported in some JavaScript environments).
  • 3 Posts the object into the database by wrapping the PouchDB.post() operation with an observable. This assigns the stored document a unique _id.
  • 4 Reads the transaction objects from a local array
  • 5 Joins the stream to process and creates the new transaction document

Before we get into the details of this code, it’s important to note that you were able to process and manipulate a set of objects and store them in a database, all in an immutable manner; this is compelling and reduces the probability of bugs. Listing 6.10 involves multiple steps and new concepts:

  1. You know you’ll need to modify the transaction objects to include the date when the transaction was processed. This is typical of any banking application because most transactions are sorted based on date. Because functional programs are immutable, instead of mapping a function to the transactions array and modifying the object’s internal structure directly, you can use JavaScript’s ES6 Object.assign() to immutably create or set a new property into the object, leaving the original intact—you want your code to be as stateless as possible.
  2. Next, you retrieve the transaction data into an array. Given RxJS’s unifying model of computation, you could easily retrieve data from a local array, or you could just as easily fetch it with a remote HTTP call, such as this:
       Rx.Observable.fromPromise(ajax('/transactions'))
          .timestamp()
          ...
  3. You use the Object.assign() function to add a date to the transaction object iterated over by passing the generated RxJS timestamp() operator. This operator creates an object with a timestamp and a value property, containing the original object’s data, as shown in figure 6.10.
    Figure 6.10. RxJS timestamp operator

  4. You create each transaction object using the post() method of the PouchDB object. This object also sets a randomly generated key in the database table. Although this method call inevitably creates a side effect in your application (writing to a database), it’s one that’s managed by RxJS and isolated to its own function—the rest of the code remains pure. As we said earlier, because PouchDB exposes a thenable API, you can wrap observables over it, creating your reactive database.
  5. Finally, because the call to post() returns a Promise, which you convert to an observable, you use mergeMap() to flatten the projected observable.
  6. Running this code prints the following:
    "Processing transaction for: Brendan Eich"
    "New record created: 4F7404AF-10D2-8438-AEAB-CC21CDC23810"
    "Processing transaction for: George Lucas"
    "New record created: A9ACE7FE-85DB-484E-AA74-B47A7F4D32B1"
    "Processing transaction for: Emmet Brown"
    "New record created: DD469ACA-BC5C-A5C6-8E4A-0FB544C62231"
    "Processing transaction for: Bjarne Stroustrup"
    "New record created: B5C8B8C7-127B-11C7-A90E-64D79C8315E2"
    "Database populated!"

Another benefit of wrapping observables over the database API is that all side effects are pushed downstream to observers instead of each Promise.then() call. It’s nice to keep your business logic pure as much as possible and side effects isolated.

Depending on the size of the transaction objects, when storing thousands of them in an array, you could end up with very large memory footprints. Of course, you’d like to avoid keeping all of that data directly in memory, which is why you leverage the browser’s database to store this data within it but persisted out of memory. To make this example simple, you use a small array. Most likely you’ll also want to keep transactions created locally as well as data coming in remotely. Can you guess which operator you need? Correct! You can use RxJS’s merge() to plug in all of the data from multiple sources:

Rx.Observable.merge(                                       1
    getTransactionsArray(),
    Rx.Observable.fromPromise(ajax('/transactions')))
  .concatMap(writeTx$)
  .subscribe(
    rec => console.log(`New record created: ${rec.id}`),
    err => console.log('Error: ' + err),
    ()  => console.log('Database populated!')
  );

  • 1 Merging the output from local and remote streams

The rest of the code continues to work exactly the same way. Brilliant! The asynchronicity of code is seamless in reactive programming!

And in the event that the remote HTTP call response is not an array, remember that you can make the observable conformant, just as we discussed earlier, and push some logic upstream like this. It’s typical of remote calls to return an object with a status and a payload. So if you’re response object is something like

{
   status: 'OK',
   payload: [{name: 'Brendan Eich', ...}]
}

you can make it conformant as you inject it into the stream:

Rx.Observable.merge(
    getTransactionsArray(),
    Rx.Observable.fromPromise(ajax('/transactions'))
      .mergeMap(response => Rx.Observable.from(response.payload))     1
  )
  .concatMap(writeTx$)
  ...

  • 1 Converts the JSON response object into an array that gets combined with the other transaction records and pushed through the stream

Moreover, databases are full of optimizations to improve read and write speed. You can further help these optimizations by performing bulk operations whenever possible.

6.3.2. Writing bulk data

The previous code samples create single bank transaction records at a time. You can optimize this process with bulk operations. Bulk operations write an entire set of records with a single post request. Naturally, the PouchDB operation bulkDocs() takes an array. Earlier, we talked about how much memory was used to build this set, and this is completely in your control using RxJS buffers.

The buffer() operator that you saw back in chapter 4 would come in handy here when you’re processing not just a handful of transactions but hundreds of them. Let’s optimize listing 6.10 with the following listing.

Listing 6.11. Optimizing write operations using bulk writes
Rx.Observable.from(getTransactionsArray())
  .bufferCount(20)                                                  1
  .timestamp()                                                      2
  .map(obj => {
      return obj.value.map(tx => {                                  3
        return Object.assign({}, tx, {
            date: obj.timestamp
          })
      })
  })
  .do(txs => console.log(`Processing ${txs.length} transactions`))
  .mergeMap(datedTxs =>
        Rx.Observable.fromPromise(txDb.bulkDocs(datedTxs)))         4
  .subscribe(
    rec => console.log('New records created'),
    err => console.log('Error: ' + err),
    ()  => console.log('Database populated!')
  );

  • 1 Buffers 20 transactions at a time
  • 2 Timestamps the entire set of items at once
  • 3 Loops within each set and assigns a date to each transaction object
  • 4 Performs bulk operation upon passing the entire buffer

To support this optimization, you had to make a few adjustments. After collecting 20 objects with bufferCount(20), the data passing through the stream is now an array instead of a single record, as shown in figure 6.11.

Figure 6.11. Flow followed to add items in bulk (in this case, 20 at a time)

Alternatively, you could have also buffered for a certain period of time with -buffer(Rx.Observable.interval(500)); this decision depends on the amount of data your application will process. In this case, each record will be kept in a buffer for 500 milliseconds, at which point it will be released and all the records can be written in bulk to the database.

But there’s a problem with just using a count- or time-based buffer. If the user attempts to navigate away from the page while the data is being cached, you could potentially lose anything waiting in the buffer, up to 20 transactions in this case, which will never get saved. To fix this, let’s introduce another observable to trigger a buffer write. Buffers also support signaling, so that the emission can occur in response to the execution of some browser hook, such as the closing of the window. To implement this you can use the bufferWhen() operator with an observable that’s smart enough to support both use cases: to cache the results for a specific period of time or emit before the browser closes:

Rx.Observable.from(getTransactionsArray())
  .bufferWhen(() =>                                       1
    Rx.Observable.race(                                   2
      Rx.Observable.interval(500),
      Rx.Observable.fromEvent(window, 'beforeunload'))    3
   )
   ...

  • 1 Buffers events from the source observable until the provided observable emits
  • 2 Creates an observable that mirrors the first observable to emit a value of the ones provided to it. In this case, it will emit after half a second or when the window closes, whichever comes first.
  • 3 Hooking into the browser closing event. Because the contents within the buffer storage are emitted as a single array object and processed synchronously, there’s no danger of the browser shutting down before the buffer gets processed.

bufferWhen(), instead of taking an observable to trigger the start of each new buffer, accepts a closing selector method that’s re-invoked every time the buffer is closed, and the resulting observable is used to determine when the next buffer should close. Using this, you can create a signal observable that has a host of possible constraint states. Now that you know how to get data into the database, let’s join with a query that can count the total number of records.

6.3.3. Joining related database operations

All of the local store operations, whether you’re using IndexedDB directly or PouchDB, happen asynchronously, but with RxJS you can treat your operations almost as if they were synchronous because of the abstraction that it poses over the latency involved in database calls. To illustrate this, you’ll chain together an operation to insert a record, followed by an operation that retrieves the total count.

PouchDB is a map/reduce database, so in order to query the data, you must first define how the projection or the mapping function works. This object is called a design document, which you need to include as part of the query. For your purposes, you’ll keep it simple and count the number of transactions performed. So your design document—you’ll call it count—looks like this:

const count = {
  map: function (doc) {
    emit(doc.name);           1
  },
  reduce: '_count'            2
};

  • 1 Counts the number of users
  • 2 Uses the reduce PouchDB aggregate operator _count

The next listing shows how you can join two queries with a single stream declaration.

Listing 6.12. Two queries in a single stream declaration
Rx.Observable.from(getTransactionsArray())
  .switchMap(writeTx$)                                      1
  .mergeMap(() => Rx.Observable.fromPromise(
     txDb.query(count, {reduce: true})))                    2
  .subscribe(
    recs  => console.log('Total: ' + recs.rows[0].value),   3
    error => console.log('Error: ' + error),
    ()    => console.log('Query completed!')
  );

  • 1 Posts a single transaction
  • 2 Runs a reduction query to count the total number of documents in the table
  • 3 Prints the total value

PouchDB also has some reduction operations of its own, and you understand what a reduction is because you’re an experienced functional programmer by now. Aside from count, you can perform other reductions such as sum and stats. Let’s go over another example that combines all of what you’ve learned thus far. It performs a withdraw from the account database and creates a new transaction document, as shown in figure 6.12.

Figure 6.12. The backend workflow that takes place when a withdraw operation occurs. First, you find the account by ID, and if it results in a valid object, you subtract the withdraw amount and update the account.

First, you’ll need to seed a set of user accounts with the following structure (again, you’ll keep your domain simple).

Listing 6.13. The Account class
class Account {
  constructor(id, name, type, balance) {
     this._id = id;                          1
     this.name = name;
     this.type = type;
     this.balance = balance;
  }

  get id() {
    return this._id;
  }
}

  • 1 The _id field is used to tell PouchDB’s put() method to use your provided ID instead of generating a new one. You can use PouchDB’s get() method to query by this ID.

Similarly, you’ll create a few different types of accounts for your user Emmet Brown—

const accounts = [
     new Account('1', 'Emmet Brown', 'savings', 1000),
     new Account('2', 'Emmet Brown', 'checking', 2000),
     new Account('3', 'Emmet Brown', 'CD', 20000),
  ];

—to populate a new document store:

const accountsDb = new PouchDB('accounts');

Because you’re already familiar with creating databases and populating them from the earlier example, you’ll jump right into the withdraw() function, which returns an observable responsible for creating the flow to query and update multiple databases, as shown in the next listing.

Listing 6.14. withdraw function
function withdraw$({name, accountId, type, amount}) {                      1
   return Rx.Observable.fromPromise(accountsDb.get(accountId))             2
     .do(doc => console.log(
         doc.balance < amount ?
          'WARN: This operation will cause an overdraft!' :
          'Sufficient funds'
      ))
     .mergeMap(doc => Rx.Observable.fromPromise(                           3
        accountsDb.put({
         _id: doc._id,
         _rev: doc._rev,
         balance: doc.balance - amount
        }))
     )
     .filter(response => response.ok)                                      4
     .do(() =>
         console.log('Withdraw succeeded. Creating transaction document'))
     .concatMap(() => writeTx$(
           new Transaction(name, 'withdraw', amount, type)));             5
}

  • 1 Unpacks the input into the parameters needed for the transaction
  • 2 Retrieves the existing account info
  • 3 Updates the user balance
  • 4 Continues only if the DB update was successful
  • 5 Creates the transaction record and return it

You can run this code by passing it an operation object literal:

withdraw$({
  name: 'Emmet Brown',
  accountId: '3',
  type: 'checking',
  amount: 1000
})
.subscribe(
   tx    => console.log(`Transaction number: ${tx.id}`),
   error => console.log('Error: ' + error),
   ()    => console.log('Operation completed!!')
);

This will generate the following output:

"Sufficient funds"
"Withdraw succeeded. Creating transaction document"
"Processing transaction for: Emmet Brown"
"Transaction number: DB6FF825-C703-0F1A-B860-DA6B1138F723"
"Operation completed!!"

As you can see, because the API of PouchDB uses Promises, it’s easy to integrate your database code with your business logic, all wrapped and coordinated via the observable operators. Although database calls are a form of side effect, it’s one you’re willing to take in practice and rely on the unidirectional flow of streams to streamline the use of this shared state. But wrapping API calls is not the only thing you can do with PouchDB. In addition, you can build support for a reactive database.

6.3.4. Reactive databases

PouchDB is an event emitter, which means it exposes a set of events or hooks for you to use to plug logic into certain phases of its lifecycle. Just as GitHub exposes hooks to tap into when branches are created, you can add event listeners that fire when databases are created and destroyed.

This is important in browser storage where the lifespan of a database is temporary because it can be destroyed and re-created at any time. So before you begin adding any documents, it will be good to do so in the context of a database-created hook.

Using the Rx.Observable.fromEvent() operator, you can transform any event emitter into an observable sequence. Hooking into the database-creation event looks like the following:

Rx.Observable.fromEvent(txDb, 'created')
  .subscribe(
    () => console.log('Database to accept data!')
  );

Adding this check in your streams is easy. All you need to do is key off of that hook to perform all your logic. This is somewhat similar to waiting for the document to be ready before executing any of your JavaScript code. The withdraw operation would look like this:

Rx.Observable.fromEvent(txDb, 'created')                 1
  .switchMap(() =>
            withdraw$({
             name: 'Charlie Brown',
             accountId: '1',
             type: 'checking',
             amount: 1000
            })
 )
 ...

  • 1 Reacts to the ‘created’ event

In this chapter, you saw how you can bring together multiple distinct subsystems and build coherent state machines from them. Each example brought out a small piece of functionality that could be independently attached to and handled. The combinatorial operators allow you to join each stream while maintaining the same separation of concerns that you achieved with single streams. Notice that, so far, none of the code you’ve written has accounted for errors or exceptions. What if there’s an error inserting a record into a database? What if there’s an exception happening when you call some third-party function? In the next chapter, we’ll take these same concepts and show you how to make your applications more fault tolerant against the unexpected.

6.4. Summary

  • You joined parallel URL shortening services with combineLatest() and spawned multiple observable sequences with forkJoin().
  • You used buffering to improve the performance database queries.
  • You used observables to control the lifespans of non-observables like user -sessions.
  • You saw how reactive databases allow you to orchestrate business flows involving permanent storage.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset