Chapter 8. Mocks

As illustrated in Chapter 7, it’s desirable to replace data pipeline dependencies when unit testing. This reduces cloud costs, since tests don’t consume resources or quotas, and it makes tests easier to run in CI. It can also provide better test coverage than relying on live services.

With the different types of dependencies in data pipelines, creating mocks can feel like peeling an onion. Maybe you just created a mock for unit-testing code that acquires data from an API, and now you are back on Stack Overflow looking for advice on how to mock interactions with cloud storage. It’s not that mocking is difficult; it’s the variety of interfaces data pipelines interact with that can make this endeavor challenging.

This chapter eliminates the onion peeling by consolidating techniques for replacing common data pipeline dependencies in one place. Starting with advice on how to evaluate test double placement and efficacy, you’ll see how to build mocks for generic interfaces and cloud services using common Python modules and CSP client mock libraries. The last technique is to use test databases for situations where you need to test code that manipulates database objects.

The code examples in this chapter can be found in the testing directory in GitHub. To run these examples, follow the setup instructions in the README. The Spark dependencies are not required for running the examples in this chapter.

For a deeper dive into these techniques, “Further Exploration” has some derivative mocking techniques you can try out.

Considerations for Replacing Dependencies

A good test double can save time, lower costs, and improve test coverage. A bad test double can be ineffective or, worse, hide broken code. In this section, you’ll see how to assess where dependency replacement makes sense and how to position a test double to be most effective.

Placement

Test doubles trade away the live service: you can test without the dependency, but you lose the feedback a live service provides, such as HTTP error codes from an API. These behaviors need to be mimicked by the mock that replaces the service.

When creating a test double, apply it as close as possible to the interface with the dependency you are replacing. I’m pointing this out explicitly because it is often possible to place a test double at several places along the code path of a dependency. Let’s take a look at an example to see how this can occur and the consequences when it does.

The lat_long_to_pop method in geocoding.py gets the population of an area given a latitude and longitude. The lookup is performed by the methods get_zip and get_population, which make requests to a fictitious API:

def lat_long_to_pop(lat_long):
   zipcode = get_zip(lat_long)
   pop = get_population(zipcode)
   return {zipcode: pop}

To test geocoding.py without querying the API, you may be tempted to stub the get_zip and get_population methods. These stubs enable testing of lat_long_to_pop, as shown in test_geocoding.py, without hitting the API endpoint. Don’t get too caught up in the mock decorators in this code; you’ll see how to use these shortly. For now, just know that the get_zip and get_population methods are not actually called during the test. Instead, a fake response is provided, denoted by the return_value:

@mock.patch('geocoding.get_zip', mock.Mock(return_value='95472'))
@mock.patch('geocoding.get_population', mock.Mock(return_value='1000'))
def test_lat_long_to_pop():
   assert lat_long_to_pop((38.4021, 122.8239)) == {'95472':'1000'}

While this test validates that lat_long_to_pop returns the expected values in the expected format, you need more than this test to have adequate test coverage. There’s no coverage for error conditions that could occur when querying the API, nor for verifying that the API response is parsed as expected.

This leaves a gap between the stubs and the interface to the API, namely the requests.get calls in get_zip and get_population. Without additional testing, these stubs effectively hide the code in get_zip and get_population from testing.

A former colleague shared that a situation like this led to deploying a broken service, despite thousands of passing unit tests. It turned out that a gap between where a mock was applied and the interface to the dependency hid some broken code.

Dependency Stability

When working with a dependency that can change, such as a beta product feature, building a test double may not be feasible or reasonable. On a project for which my team was evaluating an experimental Druid feature, an engineer built the code in an integration test environment, where he ran a Druid instance locally instead of trying to mock the Druid interactions.

Because the Druid feature was experimental, it didn’t make sense to try to mock the interactions, as they were likely to change. This can also happen if dependencies roll out backward-incompatible changes. In this case, the mock would need to be updated to reflect the changed dependency. If the dependency was modeled with a client library, using autospec will alert you to this change (more on this later). This is another reason to mock close to a dependency interface.

Complexity Versus Criticality

It’s important to keep in mind that a test double is another piece of code that needs to be maintained. Similar to being judicious with corner cases in unit tests, consider the trade-off of creating and maintaining test doubles against the importance of the test coverage they provide.

In “Cloud Components”, you saw how to evaluate whether to test cloud service interactions. I mentioned that our team unit-tested DynamoDB interactions, which required setting up a few layers of cloud service mocks. This was a bit more complex on the mocking side, but it was worth the investment due to both the criticality of the code and the cost in cloud resources and time to test by running the entire pipeline.

On the other hand, the AWS Lambda function in the HoD pipeline was created to alert on the presence of night heron content. This is an important function of the system, but trying to mock a cloud event, the Lambda function invocation, and the Slack message would not be worth the time to run this as a unit test. This was not as critical as the DynamoDB process, and it was also something that could be checked with an integration test.

Reducing gaps between dependency interfaces and test doubles, assessing dependency stability, and evaluating the complexity versus criticality trade-off will pinpoint where test doubles make sense in your test suites. With this in mind, let’s move on to different techniques for replacing common data pipeline dependencies.

Mocking Generic Interfaces

To get started building mocks, consider the interfaces in Figure 8-1, which shows the “Validate data” and “Get zip code” steps of the HoD survey data pipeline.

Internal interfaces, such as the one between “Validate data” and “Get zip code,” can often be unit-tested by providing fake data, which is the topic of Chapter 9. For example, to test “Get zip code,” you need data that has passed the “Validate data” step.

Figure 8-1. Interfaces for “Validate data” and “Get zip code” steps

When it comes to external dependencies, such as the interface between the “Get zip code” step and the Geocoding service, mocks may be needed to test interactions. Interacting with external interfaces often involves the following actions:

  • Making a request

  • Handling a response

  • Detecting and handling connectivity issues

There are a few ways to replace a dependency with a mock. One approach is dependency injection, which was covered in Chapter 6. Instead of using the module that connects to the dependency, pass in a module that provides a mock response for testing.
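
As a minimal sketch of the injection approach (fetch_zipcode and FakeClient here are hypothetical, not part of the chapter’s example code):

class FakeClient:
    """Stand-in for the module that calls the geocoding API."""
    def get(self, url, params=None):
        return {"zipcode": "95472"}

def fetch_zipcode(lat_long, client):
    # The client is a parameter, so tests can inject a fake
    result = client.get("http://www.geocoding.com/get_zipcode",
                        params={"lat_long": lat_long})
    return result["zipcode"]

def test_fetch_zipcode():
    assert fetch_zipcode((38.4021, 122.8239), FakeClient()) == "95472"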

More often, I’ve seen codebases where the ability to inject dependencies is limited, either due to code design or because the method that needs to be mocked is in a library. In these cases, you can apply a patch to swap in a mock, as illustrated in the next section.

Responses

External interfaces often return something in response to a request, such as a job identifier, data, or the status of a running job. On the surface, unit testing the logic that handles this response is similar to testing data transformation logic: provide input data and test that the output is as expected. The difference is that often the data and the mechanism for acquiring it are tightly coupled, requiring you to mock the behavior of the interface in addition to providing fake data.

Recall the gap between the API interface and the stubs used in test_lat_long_to_pop in “Placement”. Let’s take a look at how to close this gap with a mock for the get_zip method in geocoding.py:

GEOCODING_API = "http://www.geocoding.com/get_zipcode"
def get_zip(lat_long):
   response = requests.get(GEOCODING_API, params={"lat_long": lat_long})
   if response.status_code != 200:
       raise GeocodingError(f"Unable to get zipcode for {lat_long}")
   result = response.json()
   return result["zipcode"]

Testing this code requires mocking the geocoding API interactions. This can be done with responses, as will be described in “Requests”. Rather than starting with responses, I’ll illustrate how to set up mocks from scratch. This will help you generalize the techniques to other types of interfaces besides APIs. As it turns out, responses uses some of the approaches described in this section.

The test_get_zip_404 test in test_geocoding.py is an example of mocking an API response by patching the requests module using unittest.mock:

@mock.patch('geocoding.requests', autospec=True)
def test_get_zip_404(mock_requests):
   mock_requests.get.return_value.status_code = 404
   mock_requests.get.return_value.json.return_value = {}

   with pytest.raises(GeocodingError):
       get_zip((38.4021, 122.8239))

In this test, the requests library is replaced with a mock, mock_requests. When requests.get(GEOCODING_API) is executed in the get_zip method during the test, the mock is called instead of the requests module. Rather than sending the request to the Geocoding service, the mock_requests patch is configured to return a 404 status code and an empty JSON value.

Notice that mock.patch is applied to geocoding.requests. This applies the mock only to requests methods called in the geocoding module. If instead the test was decorated with @mock.patch('requests', . . .), all calls to the requests library would use the mock. The unittest documentation has some helpful advice on where to apply patches.

Warning

Beware that misspellings in mocks can result in tests passing silently while the mocks are not getting called, as described in the autospeccing documentation. This occurs because any attribute can be added to a mock object, so something that looks to you like a typo looks to a mock like a new attribute. The recommended mitigation for this issue is to use autospec, which creates mock objects that have the same attributes and methods as the objects they’re replacing. An additional advantage of using autospec is that if the class being spec’d changes, your mocks will break instead of silently allowing code to pass while doing nothing.
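
To make the misspelling hazard concrete, here is a small illustration (not from the chapter’s example code). A plain Mock silently accepts a misspelled method, while an autospec’d mock raises an AttributeError:

from unittest import mock
import requests

loose = mock.Mock()
loose.gett("http://example.com")   # typo, but no error: Mock invents the attribute

strict = mock.create_autospec(requests)
strict.gett("http://example.com")  # same typo now raises AttributeError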

In addition, pay particular attention when working with semistructured formats like JSON and dynamically typed languages. It’s easier to make mistakes representing data and data structures in mocks without the benefit of type checking.

Using the same approach, here is a test to validate a successful API call. In this case, a successful response results in providing a JSON payload with the zip code:

@mock.patch('geocoding.requests', autospec=True)
def test_get_zip_ok(mock_requests):
   mock_requests.get.return_value.status_code = 200
   mock_requests.get.return_value.json.return_value = {"zipcode": "95472"}
  
   assert get_zip((38.4021, 122.8239)) == "95472"

Requests

Another aspect of testing interfaces is checking the information sent when issuing a request; the mock objects in unittest.mock provide a variety of assertion methods for performing this kind of validation.

As an example, you can assert that the Geocoding service is called exactly once with the expected parameters, as illustrated in test_get_zip_ok in test_geocoding.py:

@mock.patch('geocoding.requests', autospec=True)
def test_get_zip_ok(mock_requests):
   . . .
   assert get_zip((38.4021, 122.8239)) == "95472"
   mock_requests.get.assert_called_once_with(
               url=geocoding.GEOCODING_API,
               params={"lat_long": (38.4021, 122.8239)}
           )

Now that I’ve covered how to create a few API mocks from scratch, let’s take a look at how to validate request parameters with responses.

The test_get_zip_ok_resp method in test_geocoding.py performs the same test as test_get_zip_ok but uses responses to mock the API behavior:

@responses.activate()
def test_get_zip_ok_resp():
   zip_resp = responses.get(
       geocoding.GEOCODING_API, status=200,
       json={"zipcode": "95472"})
   assert get_zip((38.4021, 122.8239)) == "95472"
   assert zip_resp.call_count == 1

The line responses.get(. . .) creates the mock response, similar to the mock_requests object. Here, a requests.get call with the specified parameters is registered in the responses environment. When a call to requests.get occurs, responses checks to see whether a similar signature was registered, and if so, it returns the registered values. If not, the test will fail.

To see how this works, modify the responses.get signature, such as by replacing the URL:

def test_get_zip_ok_resp():
   zip_resp = responses.get(
       # geocoding.GEOCODING_API,
       "www.python.org",

Now run the test and observe what happens:

$ cd testing
$ pytest -v test_geocoding.py::test_get_zip_ok_resp
. . .
test_geocoding.py::test_get_zip_ok_resp FAILED
. . .
- GET www.python.org Next 'Response' in the order doesn't match due to the 
      following reason: URL does not match

Because the API call in get_zip was made with a different URL than what was registered with responses.get, the test fails: responses can’t find a get method with the corresponding URL in the registry. Comparing this code to test_get_zip_ok, you can appreciate the work responses saves when testing API interactions.

Connectivity

The test_get_zip_404 test validated how get_zip would handle an error response from the API. In addition to testing the handling of known error responses from an external interface, consider testing logic that deals with connectivity issues. For example, an external dependency could be offline or temporarily overloaded, or you may need to periodically check back on a job status. Often these scenarios can be handled with retry logic.

When thinking about how to test retry logic, consider the potential failure scenarios and how the code should respond. Thinking back to “Automatic Retries”, you wouldn’t want to retry a database query if the reason it failed was because the underlying table didn’t exist, but you would want to retry on a connectivity error that could be a temporary networking blip.

Another thing to consider when testing retries is limiting tests to the failure conditions that exercise specific code paths. For example, there are dozens of HTTP response status codes, but that doesn’t mean all of them should be tested. Logging errors that occur when interacting with interfaces will help track down error cases not handled in testing. If a particular error type starts showing up regularly in the logs, you can add logic to handle that case and add a unit test to cover it.
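
As a sketch of this pattern, the error branch of get_zip could log the status code before raising, so unhandled cases surface in the logs. The logging call is illustrative, not part of geocoding.py; requests, GEOCODING_API, and GeocodingError come from the existing module:

import logging

logger = logging.getLogger(__name__)

def get_zip_logged(lat_long):
    response = requests.get(GEOCODING_API, params={"lat_long": lat_long})
    if response.status_code != 200:
        # Record the exact status so unexpected error cases show up in the logs
        logger.error("Geocoding failed for %s with status %s",
                     lat_long, response.status_code)
        raise GeocodingError(f"Unable to get zipcode for {lat_long}")
    return response.json()["zipcode"]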

To see how to test connectivity issues, let’s add retries to get_zip using the tenacity library in this new method, get_zip_retry:

@tenacity.retry(retry=tenacity.retry_if_exception_type(GeocodingRetryException),
      stop=tenacity.stop_after_attempt(5),
      wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
      reraise=True)
def get_zip_retry(lat_long):
   response = requests.get(GEOCODING_API, {"lat_long": lat_long})
   if response.status_code == 429:
       raise GeocodingRetryException()
   . . .

The get_zip_retry method includes an additional check for status_code == 429 that typically is used to signify too many requests. In this case, retries are made using an exponential backoff defined by wait_exponential and are stopped after five retry attempts. If at this point the API returns another 429, the GeocodingRetryException will be reraised by tenacity. An important note for testing: using stop_after_attempt allows the retry delay to be eliminated in testing, as you’ll see shortly.

When retrying, multiple calls are being made to the same resource. To test retries you need to mock different responses every time requests.get is called. One way to approach this is to continue using mock.patch, using the side_effect parameter to supply different responses for every call to geocoding.requests:

@mock.patch('geocoding.requests', autospec=True)
def test_get_zip_retry_mock(mock_requests):
   . . .
   responses = [resp_429, resp_429, resp_200]
   mock_requests.get.side_effect = responses
   zip = get_zip_retry((38.4021, 122.8239))

The order of the responses list dictates the order in which the mock responses are provided. In this example, the mock will return two 429 responses followed by one 200 response.
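
The elided setup needs stand-ins that behave enough like requests responses for the code under test. Here is a minimal sketch of such a helper, consistent with the MockResponse used later in test_get_zip_retries_exhausted (the repo’s version may differ):

class MockResponse:
    """Bare-bones stand-in for requests.Response."""
    def __init__(self, json_data, status_code):
        self._json_data = json_data
        self.status_code = status_code

    def json(self):
        return self._json_data

resp_429 = MockResponse({}, 429)
resp_200 = MockResponse({"zipcode": "95472"}, 200)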

Another option is to use responses to handle the retries. The OrderedRegistry guarantees that the API calls will be performed in the order they are added to the registry with responses.get:

from responses.registries import OrderedRegistry

@responses.activate(registry=OrderedRegistry)
def test_get_zip_retry():
   responses.get(geocoding.GEOCODING_API, status=429, json={})
   responses.get(geocoding.GEOCODING_API, status=429, json={})
   responses.get(geocoding.GEOCODING_API, status=200, json={"zipcode": "95472"})
   zip = get_zip_retry((38.4021, 122.8239))
   assert zip == "95472"

One final line of code needed for both of the get_zip_retry tests is get_zip_retry.retry.sleep = mock.Mock(). This sets the tenacity delay between retries to zero. Because the retry logic uses stop_after_attempt, setting the delay to zero eliminates waiting for the retry loop.
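
In context, that line goes at the top of the test body. Here is the responses version from above with the sleep mock added (a sketch; the repo’s version may differ slightly):

@responses.activate(registry=OrderedRegistry)
def test_get_zip_retry():
    # Zero out tenacity's backoff so the test doesn't actually wait
    get_zip_retry.retry.sleep = mock.Mock()
    responses.get(geocoding.GEOCODING_API, status=429, json={})
    responses.get(geocoding.GEOCODING_API, status=429, json={})
    responses.get(geocoding.GEOCODING_API, status=200, json={"zipcode": "95472"})
    assert get_zip_retry((38.4021, 122.8239)) == "95472"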

Something else you can test is the number of retries using tenacity statistics, which can be accessed by the calling method via the get_zip_retry.retry.statistics property. After a run of test_get_zip_retry, get_zip_retry.retry.statistics returns the following:

{'attempt_number': 3,
 'delay_since_first_attempt': 0.00052,
 'idle_for': 8,
 'start_time': 1.546632987}

This can be handy to ensure that a specific number of retries are attempted. For example, this method tests that the code retries five times before raising a GeocodingRetryException:

def test_get_zip_retries_exhausted():
   resp_429 = MockResponse({}, 429)
   get_zip_retry.retry.sleep = mock.Mock()
   with mock.patch('geocoding.requests.get', side_effect=[resp_429]*6):
       with pytest.raises(GeocodingRetryException):
           get_zip_retry((38.4021, 122.8239))
   assert get_zip_retry.retry.statistics.get('attempt_number') == 5

Warning

Not only can it take a few attempts to connect to an external resource, it can also take a few tries to get retry logic right. This is especially true if you are using a CSP client library to handle retries, where there isn’t a lot of visibility or control over how the retries are done.

The techniques in this section illustrated how to use unittest.mock and responses to set up mocks at a dependency interface, helping to reduce the gap between a dependency and its mock. This enables you to test the responses, requests, and connectivity interactions with an external resource. I’ve used these approaches for testing API interactions and database reads, connectivity, and exception handling.

Mocking Cloud Services

Cloud service interactions can be a bit different than the API interactions described in the previous section. When working with major CSPs like Azure, AWS, or Google, you often have access to a client library to interact with services, instead of making an API request. In the case of AWS, there is also a mocking library, moto, that can be used with the official AWS Python client, boto3.

Building on “Mocking Generic Interfaces”, this section covers how to combine unittest mocks and patches with pytest fixtures to set up cloud service mocks, as well as some examples of moto for AWS.

Recall the retry mechanism for the HoD survey data pipeline, as shown in Figure 8-2. When “Extract species” completes, it writes data to the Temp Storage bucket to provide a state to retry from if “Enrich with social” fails. Once “Enrich with social” has succeeded, the Temp Storage data should be removed.

Figure 8-2. Temporary storage of “Extract species” data

This section describes a few ways to approach testing the code that deletes the temporary data by creating mocks for cloud storage.

Building Your Own Mocks

While AWS users have the luxury of the moto library, Google users presently have to create their own mocks, as a mock library is out of scope for the Google cloud storage library. Interestingly, the contributor responding to this issue recommends using mock with autospec, which should sound familiar.

Following is the delete_temp method from cloud_examples.py that performs the temporary data deletion using GCS. Given a bucket_name and prefix, the delete_temp method will remove all objects under gs://bucket_name/prefix:

from google.cloud import storage

def delete_temp(bucket_name, prefix):
   storage_client = storage.Client()
   bucket = storage_client.get_bucket(bucket_name)

   blobs = bucket.list_blobs(prefix)
   for blob in blobs:
       blob.delete()

To test that delete_temp removes all objects in the specified location, you can create a mock of the Google storage client and create some mock objects to delete, as in test_delete_temp from test_cloud_services.py:

@mock.patch('cloud_examples.storage', autospec=True)
def test_delete_temp(storage):
   blob = mock.Mock(Blob)
   blob.delete.return_value = None
   mock_bucket = storage.Client.return_value.get_bucket.return_value
   mock_bucket.list_blobs.return_value = [blob, blob]
   . . .

In this test, the GCS client imported in cloud_examples.py, cloud_examples.storage, uses the same mock.patch decorator described in “Mocking Generic Interfaces”. Similar to mocking the return_value of requests.get in the API example, the return_value of the various storage client methods called in delete_temp needs to be mocked as well.

In addition to setting autospec for the storage client, a spec can be set for the mock objects returned from the storage client, such as with the blob mock. The spec for blob is set to the cloud storage Blob class, so if you attempt to add a method that isn’t in the spec, an AttributeError will be raised:

from google.cloud.storage import Blob
def test_delete_temp(storage):
   blob = mock.Mock(Blob)

It can be a little hard to follow the chain of mocks from the storage client down to the list of blobs returned by the list_blobs method. I find it helpful to think backward when building mocks like this: start with what you need the mock to produce, in this case a blob that has a delete method, and create those mocks first. The next level up is a mock bucket whose list_blobs method returns the list of blob mocks.

To help you visualize this, here’s the delete_temp method commented with the corresponding mock for each storage client method call:

def delete_temp(bucket_name, prefix):
   storage_client = storage.Client() # @mock.patch('cloud_examples.storage' . . .)
   bucket = storage_client.get_bucket(bucket_name) 
            # storage...get_bucket.return_value

   blobs = bucket.list_blobs(prefix) # mock_bucket.list_blobs.return_value
   for blob in blobs: # [blob, blob]
       blob.delete() # blob = mock.Mock(Blob); blob.delete.return_value = None

With the mock in place, consider what cloud component operations to validate. You can start by validating that the blobs returned from the list_blobs have been deleted, which can be done by checking the call_count of the blob.delete mock. Since there are two blobs in the list, delete should be called twice:

@mock.patch('cloud_examples.storage', autospec=True)
def test_delete_temp(storage):
   . . . mock setup code . . .
   mock_bucket.list_blobs.return_value = [blob, blob]
   cloud_examples.delete_temp("fake_bucket", "fake_prefix")
   assert blob.delete.call_count == 2

In addition to verifying that something is getting deleted, check that the items getting deleted are the ones specified in the call to delete_temp:

cloud_examples.delete_temp("fake_bucket", "fake_prefix")
client_mock = storage.Client.return_value
client_mock.get_bucket.assert_called_with("fake_bucket")
client_mock.get_bucket.return_value.list_blobs.assert_called_with("fake_prefix")

If you run the test, it should pass, which is not terribly interesting:

$ cd testing
$ pytest -v test_cloud_services.py::test_delete_temp

test_cloud_services.py::test_delete_temp PASSED

To simulate a code bug, comment out the delete loop in delete_temp in cloud_examples.py:

   blobs = bucket.list_blobs(prefix)
   # for blob in blobs:
   #     blob.delete()

Now the test will fail because nothing is getting deleted:

test_cloud_services.py::test_delete_temp FAILED  
. . .
FAILED test_cloud_services.py::test_delete_temp - AssertionError: assert 0 == 2

You’ll notice that there are no GCS authentication credentials in place for this test—the mock enables this behavior to be tested without connecting to GCS. There’s no need to set up authentication and no risk that objects could be unintentionally deleted or modified, which could happen if you were testing while connected to GCS. This unit test is entirely self-contained.

Mocking with Moto

If you’re working in AWS, you have a leg up on this mocking business thanks to the moto library. Unlike the GCS scenario, it’s not necessary to create the storage client mock from scratch. To see how this works, let’s take a look at an AWS version of the delete_temp_aws function from cloud_examples.py, which uses the boto3 AWS client library:

def delete_temp_aws(bucket_name, prefix):
   s3 = boto3.client('s3', region_name='us-east-1')
   objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
   object_keys = [{'Key':item['Key']} for item in objects['Contents']]
   s3.delete_objects(Bucket=bucket_name, Delete={'Objects':object_keys})

Fundamentally it’s pretty similar to the GCS version: create a client, list the objects at the specified bucket/prefix, and delete the objects.

The difference between these approaches is more pronounced when you see how to set up mocks with moto. With the GCS client, the mock needs to be set up to return a list of objects so that delete_temp has objects to delete. With moto, the objects to be deleted need to be created. Keep in mind that these objects aren’t created in the cloud but rather in the mock environment created by moto.

Here, moto is mocking the environment, whereas the GCS mock in the previous section mocks the results of method calls. Where the GCS mock used mock.patch for the GCS client, moto provides a pytest fixture for the S3 client:

def test_delete_temp_aws(s3):
   s3.create_bucket(Bucket="fake_bucket")
   s3.put_object(Bucket="fake_bucket", Key="fake_prefix/something", 
                 Body=b'Some info')

With the bucket and object created in the mock environment, delete_temp_aws has something to delete. Rather than asserting that mock calls occurred as in the GCS case, you can check that there are no objects in the bucket when delete_temp_aws returns:

   delete_temp_aws("fake_bucket", "fake_prefix")
   obj_response = s3.list_objects_v2(Bucket="fake_bucket", Prefix="fake_prefix")
   assert obj_response['KeyCount'] == 0

Where does the mock environment for moto come from? The s3 and aws_credentials fixtures in conftest.py provide some clues:

import os

import boto3
import pytest
from moto import mock_s3

@pytest.fixture(scope="function")
def aws_credentials():
   os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
   . . .

@pytest.fixture(scope="function")
def s3(aws_credentials):
   with mock_s3():
       yield boto3.client('s3', region_name='us-east-1')

The moto library works by intercepting calls to the AWS API. Notice that the s3 mock is creating a boto S3 client, but within a moto mock_s3 context. The boto library expects to find credentials in the environment, which is why the aws_credentials fixture is present, but these credentials are bogus. They never get used because the boto client is created within the mock context.

Another thing to notice is that the scope on the moto fixtures is set to function. scope refers to how often a pytest fixture is invoked during a test run. function is the default scope, but I wanted to make it explicit here to talk about state.

Remember that in the unit test, you’re creating a bucket and some objects. These are created in the moto mock_s3 environment. If instead I used a session-scoped fixture, the objects created in test_delete_temp_aws would become part of the mock environment for other tests. It’s important to avoid accumulating state in this way with test fixtures, as it leads to inaccurate unit-test results.

Another thing about the s3 fixture is the use of yield. By yielding the s3 client, the fixture provides the test with access to the context where the s3 client has been set up with the credentials. If instead I returned the s3 client, the context would be closed and this setup information would be lost.

To see the moto mocks in action, first try commenting out the code that creates and tests the bucket and object:

def test_delete_temp_aws(s3):
   # s3.create_bucket(Bucket="fake_bucket")
   # s3.put_object(Bucket="fake_bucket", Key="fake_prefix/something", 
   #               Body=b'Some info')

   # obj_response = s3.list_objects_v2(Bucket="fake_bucket", 
   #                                   Prefix="fake_prefix")
   # assert len(obj_response['Contents']) == 1

You should see the following error when running the test:

pytest -v test_cloud_services.py::test_delete_temp_aws
. . . 
botocore.errorfactory.NoSuchBucket: An error occurred (NoSuchBucket)
when calling the ListObjectsV2 operation: The specified bucket does 
not exist

Because the bucket wasn’t created in the s3 mock environment, the list_objects_v2 call failed. You might want to write a unit test for this case explicitly; what should delete_temp_aws do if it can’t find the bucket?
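
One way to pin down the current behavior, assuming delete_temp_aws is left as is, is to assert that the boto error propagates; NoSuchBucket is a subclass of botocore’s ClientError (this test is a sketch, not from the chapter’s repo):

import botocore.exceptions
import pytest

from cloud_examples import delete_temp_aws

def test_delete_temp_aws_no_bucket(s3):
    # Nothing is created in the moto environment, so the bucket lookup fails
    with pytest.raises(botocore.exceptions.ClientError):
        delete_temp_aws("missing_bucket", "fake_prefix")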

While the next section moves on from mocking, you’ll notice a lot of similarities between setting up mocks and working with a test database. Although a test database is not a mock, it is still necessary to manage state across tests, using fixtures to set up and tear down between tests.

Testing with Databases

When testing code that stores, modifies, or deletes data in a database, it can be preferable to create a test database instead of attempting to mock these interactions. Using a test database also prevents accidental corruption of data in a live system, in addition to removing the database as a testing dependency. This helps reduce the cost of testing by using a small, local database instead of connecting to a hosted instance, where you would incur cloud costs.

Sometimes, using a test database is not an option given the database infrastructure. For example, one pipeline I worked on interacted with Druid, which was not conducive to using as a local test database. In this case, the database interactions were limited to storing and retrieving data, so our team used mocks and fake data instead. In “Further Exploration”, you’ll have an opportunity to try out this technique of using mocks with databases.

This section illustrates how to set up a test database and where this can be helpful. In this situation, a test database is a local database, either in a local development environment or in CI, as opposed to a database hosted in a test tier, as described in Chapter 6.

Tip

One caveat to consider with test databases is that, ideally, you want to use the same type of database for testing as is used in production. For example, if production uses an AWS RDS instance running Postgres, you would want to use Postgres for the test database.

If you choose to use a different database type, such as SQLite for a smaller footprint, be sure that the operations being tested behave consistently between the test database and production database types.
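
As a concrete illustration of the kind of divergence to watch for (the snippet is illustrative, not from the chapter’s repo): Postgres enforces column types, while SQLite’s type affinity does not, so a test that depends on type validation could pass against SQLite but fail in production:

# Postgres rejects this INSERT with a type error;
# SQLite's type affinity stores the string without complaint
conn.execute("CREATE TABLE doses (amount integer)")
conn.execute("INSERT INTO doses VALUES ('0.25mg')")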

Test Database Example

To set up the code examples and give you a sense of where a test database can be helpful, I’ll walk you through a project where I used a test database for testing a medical data management system. The input data had to be normalized for use by dashboards and search mechanisms. As a result, the process involved a lot of database interactions. A subset of the ETL process is depicted in Figure 8-3.

Figure 8-3. ETL process for medical data management system

Starting with the Patient Data table in the upper left, the ETL process parsed the Treatment Info field to find matches in the Treatment and Delivery Mechanism lookup tables. A Match Results table was updated with the results, setting OK to true if both treatment and delivery mechanism had a match.

You can see the results of this process in the Match Results table; for Patient ID 2, “Drug B” is not found in the Treatment table, but for Patient ID 1, both the Drug ID and Delivery ID are present.

A later step in the process was to separate out records that needed review, shown in the Dispose step in Figure 8-3. Any records with OK=False in the Match Results table would be set aside for review by a medical data steward in the Review table. Records that were matched successfully across all lookups were moved to the Ingest table.

Because this was a medical data system, our team had to follow strict requirements for retaining data at every step of the ETL process. Where you might perform a lot of these steps in memory, writing out only the Ingest and Review tables, our team had to persist the results for each step in the pipeline. This information was also helpful for the data steward to assess where issues occurred in the process. As a result, we needed to test that intermediate tables such as Match Results were getting populated correctly.

Throughout this process, the ETL pipeline was manipulating data in the database—adding and updating records in the Match Results table and creating new records in the Ingest and Review tables. To unit-test this logic, our team needed to verify that these changes were happening as expected. With a test database, we could populate the Delivery Mechanism and Treatment lookup tables and the Patient Data table with the cases we wanted to test, run the ETL steps, and set assertions for each case.

As an interesting aside, the ETL process was written using SQL stored procedures, meaning we didn’t have access to the kinds of mocking tools described in this chapter. For unit testing, I wrote a configurable test framework in Python to execute the stored procedures against a test database, which you can find on GitHub.

With this system in mind, let’s take a look at how to set up and use a test database.

Working with Test Databases

There are test database packages, such as the pytest-postgresql plug-in for pytest. Under the hood, these packages use a database installed locally or in a Docker container. Keep in mind these are not mocks of databases but rather are fakes that encapsulate the techniques illustrated in this section.

To run the code referenced in this section, you need a local Postgres installation, or you need to use containers as described in Chapter 5. If you have another local database that is supported by SQLAlchemy, you can change the connection string in the test_conn fixture in conftest.py.

Linking test database creation and destruction closely to the testing process will ensure that the test database is always starting from scratch, preventing state from accumulating and causing unreliable test results. This can be done with a similar approach to the AWS example in “Mocking with Moto” by using pytest fixtures, as shown with the test_db fixture in conftest.py:

@pytest.fixture(scope="session")
def test_db():
   engine = setup_test_db()
   yield engine
   teardown_test_db(engine)

Using yield in the test_db fixture ensures that the database engine will be torn down when tests that use this fixture have completed, which is the recommended approach for teardown in pytest. The teardown_test_db method deletes the test database. This is not strictly necessary, since setup_test_db removes the database if it already exists, but if a lot of data accumulates during testing, tearing the database down at the end of the session saves resources. On the other hand, to retain the test database for debugging test failures, you would not want to execute teardown_test_db in the test_db fixture.

If not much data is accumulating during the test session, omitting teardown_test_db from the test_db fixture is fine. setup_test_db will drop the database if it exists at the beginning of the next test session. To have the choice to do either, add a command-line flag to pytest to selectively persist the database:

def pytest_addoption(parser):
   parser.addoption(
       "--persist-db", action="store_true",
       help="Do not teardown the test db at the end of the session",
   )
@pytest.fixture(scope="session")
def test_db(request):
   engine = setup_test_db()
   yield engine
   if request.config.getoption("--persist-db"):
       return
   teardown_test_db(engine)

Use this flag when you want to keep the test database, such as when debugging a test failure:

pytest -v test_medical_etl.py --persist-db

Notice that the fixture scope for test_db is session. This means that, once created, the same database engine is reused for the entire testing session; the code after yield doesn’t run until the session ends. Creating a database can be a resource-intensive activity, similar to the spark_context fixture in Chapter 7, and session scoping amortizes this cost over the entire test session.

It’s important to note that while the session scope reduces overhead, it requires you to manage state more carefully than with a function scope. As with the cloud service mocks, the scope of fixtures for test databases is important for ensuring that tests are idempotent.

For example, in conftest.py, the lookup tables Treatment and Delivery Mechanism are created as part of the session-scoped fixture test_conn, but they are not torn down after the yield. This makes sense, as these are static tables that are not modified during the ETL process. The final teardown in the test_db fixture, which deletes the test database, will remove these tables:

def create_tables(conn):
   conn.execute("""
       CREATE TABLE treatment 
        . . .
       INSERT INTO treatment VALUES(1, 'Drug A');
        . . .
       CREATE TABLE delivery_mechanism 
        . . .
        INSERT INTO delivery_mechanism VALUES(1, 'tablet');
        . . .""")

@pytest.fixture(scope="session")
def test_conn(test_db):
   . . .
   test_engine = create_engine(f"postgresql://{creds}@{host}/test_db")
   test_conn = test_engine.connect()
   create_tables(test_conn)
   yield test_conn
   test_conn.close()
   test_engine.dispose()

On the other hand, the Patient Data and Match Results tables have a function scope and are cleaned up after each test. This makes sense because each test populates Patient Data with its own records, which produce their own Match Results:

@pytest.fixture(scope="function")
def patient_table(test_conn):
   test_conn.execute("""
       CREATE TABLE patient_data (
        . . . )""")
   yield
   test_conn.execute("DROP TABLE patient_data")

Notice that the patient_table fixture creates the patient_data table but does not populate it. This enables you to insert whatever data is relevant within the test method, as shown in test_medical_etl.py:

def test_match_success(test_conn, match_table, patient_table):
   test_conn.execute("""
       INSERT INTO patient_data VALUES (1, 'Drug A tablet 0.25mg')
   """)

Tip

When using a test database, it can be necessary to inspect the database state to debug a failing test. You’ve already seen one approach to doing this with the custom --persist-db command-line option.

When using a debugger, setting a breakpoint before and after database calls provides an opportunity to query the test database.

Another approach is the time-honored practice of adding print statements, as illustrated in the tests in test_medical_etl.py.

By default, pytest won’t print the print statements for passing tests, but it will if the test fails. To see print messages for passing tests as well, use the -s flag, which turns off the stdout and stderr capture mechanism in pytest.

In terms of how to use the test_db fixture in unit tests, there are a few options. Chapter 6 covered how to use abstractions to have different possibilities for object storage, including a MockStorage class that could be used for testing. Similarly, if the code accepts the database engine as a parameter, you could use a TestDatabase class to connect to the test database, or pass the engine as a parameter to a method. Another possibility is to use mock.patch for the database connection, where the engine or conn objects are patched.
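
As a sketch of the parameter-passing option (store_results and the ingest table are hypothetical):

from sqlalchemy import text

def store_results(engine, records):
    # Accepting the engine as a parameter makes it trivial to swap in
    # the test database engine in place of the production engine
    with engine.connect() as conn:
        for patient_id, treatment_info in records:
            conn.execute(
                text("INSERT INTO ingest VALUES (:id, :info)"),
                {"id": patient_id, "info": treatment_info})

def test_store_results(test_db):
    store_results(test_db, [(1, "Drug A tablet 0.25mg")])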

As mentioned earlier, test databases allow you to avoid the cost of testing with cloud databases. For example, the official Google Cloud SQL Python examples show how to use Google’s connector library with the sqlalchemy.create_engine method to create the database connection. When testing, substitute the test database engine for the Cloud SQL engine.

Using a test database doesn’t preclude you from running unit tests in a CI pipeline. The Docker Compose techniques covered in Chapter 5 can be used to include a test database container for CI testing. I’ve used this technique with Google Cloud Build by adding a test database container to the build config.

Summary

This chapter illustrated how to replace data pipeline dependencies for testing, reducing test time, complexity, and cost and improving test coverage.

It’s not always practical to replace a dependency with a test double. Dependencies that are stable, are well understood, and play a critical role in pipeline operation are good candidates for replacement. On the other hand, if dependencies are immature, complex, or not critical, it may not be worthwhile or even possible to create quality test doubles.

You’ve seen several different approaches to creating and working with mocks. The responses and moto libraries provide ready-to-use mocks for APIs and AWS services, respectively. Python’s standard unittest library (which is used in both responses and moto) provides tools to patch, mock, and validate a wide variety of conditions. Combining these approaches with pytest fixtures helps streamline test code and manage state across a test suite.

Across these different techniques, there are common threads for creating good test doubles:

  • Place test doubles at the interface between the dependency and your codebase.

  • Focus mock behavior on critical interactions and log unexpected or rare cases.

  • Pay attention to state accumulation in mocks and fakes shared across multiple tests.

  • Use specs, such as autospec, to ensure that mocks accurately represent dependencies.

  • When testing against a mock, validate request and response payloads, error handling, and retries.

When testing code that modifies database objects, a test database provides the ability to test without connecting to an external database. This enables CI testing, reduces complexity in managing shared test tier databases, and does not incur cloud costs or risk data corruption.

Test databases are another case where pytest fixtures are a great help, enabling you to create test databases at test time and manage database state throughout testing. Creating static tables once per session, while creating and tearing down dynamic entities on a per-function basis, will streamline test fixtures and limit state accumulation.

Peeling onions is an unpleasant experience. I hope the collection of techniques and code examples in this chapter have peeled back some of the layers of mocking, helping you quickly replace dependencies without too much crying.

Recall from Chapter 7 that cost-effective testing involves minimizing dependencies, one part of which is mocking interfaces and using test databases as you saw in this chapter. The second part is reducing data dependencies through judicious use of live resources and creating robust fake datasets, which we will discuss in the next chapter.

Further Exploration

To build on the techniques described in this chapter, the following examples provide an opportunity to try them out for yourself. Possible solutions are provided as a guide.

More Moto Mocks

In cloud_examples.py, the hod_has_night_heron_data method is the code used by the AWS Lambda function to check whether night heron content is available. This method is called with a prefix to a location of the latest ingested data in env.ENRICHED_DATA_BUCKET. For example, if the latest data is stored at s3://{ENRICHED_DATA_BUCKET}/20220530T112300, the method call would be hod_has_night_heron_data("20220530T112300").

Using what you’ve seen about creating mocks with moto, write unit tests that validate the case where data at the specified prefix has night heron content as well as the case where there is no night heron content.

You can find a possible solution to this exercise in test_exercises.py.

Mock Placement

Earlier I talked about the “Enrich with social” step of the survey data pipeline in Figure 8-2, where relevant data from the HoD social media database would be matched back to the survey data based on the extracted species and zip code information. I’ve provided an implementation of a few methods in database_example.py that look up this information.

Given a species and zip code, get_hod_matches will return a DataFrame of IDs and content from the HoD database that matches on both parameters. Without rewriting the SQL, come up with an approach for testing get_hod_matches in two cases: when none of the results from get_species_matches occur in the results of get_zipcode_matches, meaning get_hod_matches returns a DataFrame with no rows, and when get_hod_matches returns one or more results.

Another thing to keep in mind when you are building mocks is that the fewer assumptions you need to make, the less likely you will be to have mistakes in your mocks. There is one very simple way to solve this problem, and one more involved way that results in better test coverage. For a hint, review the content for the get_zip tests earlier in this chapter.

A possible solution to this exercise is provided in test_database.py.
