Handling data retrieval in highly-threaded environments

In this recipe, we will be retrieving data from the database in a threaded environment, making sure that it is the most current data possible.

Getting ready

We will be using the NuGet Package Manager to install the Entity Framework 4.1 assemblies.

The package installer can be found at http://nuget.org.

We will also need a database to connect to, so that we can read and update the data.

Open the Improving Transaction Scope solution in the included source code examples.

How to do it...

  1. We start with a test that defines how threads and contexts should interact when pulling the most current data. The test lets us control the scope of the work and verify that we have met the goal:
    using System.Data.Entity;
    using System.Linq;
    using System.Threading;
    using BusinessLogic;
    using DataAccess;
    using DataAccess.Database;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    using Test.Properties;
    
    namespace Test
    {
      [TestClass]
      public class MultiThreadedObjectsQueriesTests
      {
        private IBlogRepository repo1 = null;
        private IBlogRepository repo2 = null;
        private string _changedInThread;
    
        [TestMethod]
        public void ShouldGetMostRecentDataFromDatabaseAfterAnUpdateFromAnotherThread()
        {
          //Arrange
          Database.SetInitializer(new Initializer());
          repo1 = RepositoryFactory.Get(Settings.Default.Connection);
    
          //Act
          Thread thread = new Thread(UpdateFirstBlog);
          var item = repo1.Set<Blog>().First();
          Assert.IsNotNull(item);
    
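          thread.Start();
          // Join blocks until the worker thread has finished. Polling ThreadState
          // for Running can exit early if the worker is momentarily in another
          // state, such as WaitSleepJoin while it waits on a lock.
          thread.Join();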
    
          item = repo1.CurrentSet<Blog>().First();
          //Assert
          Assert.AreEqual(_changedInThread, item.Title);
        }
    
        private void UpdateFirstBlog()
        {
          repo2 = RepositoryFactory.Get(Settings.Default.Connection);
          var item = repo2.Set<Blog>().First();
          _changedInThread = "Changed in Thread 2";
          item.Title = _changedInThread;
          repo2.SaveChanges();
        }
      }
    
    }
  2. We then want to add a new C# class named Blog to the BusinessLogic project with the following code:
    using System;
    
    namespace BusinessLogic
    {
      public class Blog
      {
        public int Id { get; set; }
        public DateTime Creationdate { get; set; }
        public string ShortDescription { get; set; }
        public string Title { get; set; }
        public double Rating { get; set; }
      }
    }
  3. Next, we add a new C# class named BlogMapping to the DataAccess project with the following code:
    using System.ComponentModel.DataAnnotations;
    using System.Data.Entity.ModelConfiguration;
    using BusinessLogic;
    
    namespace DataAccess.Mappings
    {
      public class BlogMapping : EntityTypeConfiguration<Blog>
      {
        public BlogMapping()
        {
          this.ToTable("Blogs");
          this.HasKey(x => x.Id);
          this.Property(x => x.Id).HasDatabaseGeneratedOption(DatabaseGeneratedOption.Identity).HasColumnName("BlogId");
    
          this.Property(x => x.Title).IsRequired().HasMaxLength(250);
          this.Property(x => x.Creationdate).HasColumnName("CreationDate").IsRequired();
          this.Property(x => x.ShortDescription).HasColumnType("Text").IsMaxLength().IsOptional().HasColumnName("Description");
        }
    
      }
    }
  4. We then want to add a new C# class named BlogContext to the DataAccess project with the following code:
    using System;
    using System.Data.Entity;
    using System.Data.Entity.Infrastructure;
    using System.Data.Objects;
    using System.Linq;
    using BusinessLogic;
    using DataAccess.Mappings;
    
    namespace DataAccess
    {
      public class BlogContext : DbContext, IUnitOfWork
      {
        public BlogContext(string connectionString) : base(connectionString)
        {
    
        }
    
        protected override void OnModelCreating(DbModelBuilder modelBuilder)
        {
          modelBuilder.Configurations.Add(new BlogMapping());
          base.OnModelCreating(modelBuilder);
        }
    
        public IQueryable<T> Find<T>() where T : class
        {
          return this.Set<T>();
        }
    
        public IQueryable<T> CurrentFind<T>() where T : class
        {
          var context = (IObjectContextAdapter) this;
          var query = context.ObjectContext.CreateObjectSet<T>();
          query.MergeOption = MergeOption.OverwriteChanges;
          return query;
        }
    
        public void Refresh()
        {
          this.ChangeTracker.Entries().ToList().ForEach(x=>x.Reload());
        }
    
        public void Commit()
        {
          this.SaveChanges();
        }
      }
    }
  5. We now want to add code to the repository, so that it can retrieve the current data, with the following code:
    using System;
    using System.Data.Entity;
    using System.Linq;
    using BusinessLogic;
    
    namespace DataAccess
    {
      public class BlogRepository : IBlogRepository
      {
        private readonly IUnitOfWork _context;
    
        public BlogRepository(IUnitOfWork context)
        {
          _context = context;
        }
    
        public IQueryable<T> Set<T>() where T : class
        {
          return _context.Find<T>();
        }
    
        public IQueryable<T> CurrentSet<T>() where T : class
        {
          return _context.CurrentFind<T>();
        }
    
        public void RollbackChanges()
        {
          _context.Refresh();
        }
    
        public void SaveChanges()
        {
          try
          {
            _context.Commit();
          }
          catch (Exception)
          {
            RollbackChanges();
            throw;
          }
    
        }
      }
    }
  6. We then want to add the repository factory into the DataAccess project with the following code:
    using System.Collections.Generic;
    using System.Configuration;
    using System.Threading;
    
    namespace DataAccess
    {
      public class RepositoryFactory
      {
        private static Dictionary<string, IBlogRepository> repositories = new Dictionary<string, IBlogRepository>();
    
        public static IBlogRepository Get(string connectionString)
        {
          var id = new RepositoryIdentifier(Thread.CurrentThread, connectionString);
    
          if (!repositories.ContainsKey(id))
          {
            // In practice, the repository would more likely be supplied by an IoC implementation than created here.
            repositories.Add(id, new BlogRepository(new BlogContext(connectionString)));
          }
          return repositories[id];
        }
    
        public static void Dispose(string connectionString)
        {
          var id = new RepositoryIdentifier(Thread.CurrentThread, connectionString);
          if (repositories.ContainsKey(id))
          {
            repositories.Remove(id);
          }
        }
    
        private class RepositoryIdentifier
        {
          private readonly Thread _currentThread;
          private readonly string _connectionString;
    
          public RepositoryIdentifier(Thread currentThread, string connectionString)
          {
            _currentThread = currentThread;
            _connectionString = connectionString;
          }
    
          public override string ToString()
          {
            return _currentThread.ManagedThreadId + _connectionString;
          }
    
          public static implicit operator string(RepositoryIdentifier r)
          {
            return r.ToString();
          }
        }
      }
    }
  7. Run all of our tests, and they should pass.
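
The factory in step 6 stores repositories in a static Dictionary, which is not safe to modify from several threads at the same time. The following is a minimal sketch of the same per-thread, per-connection-string lookup built on ConcurrentDictionary. PerThreadCache is a hypothetical helper that is not part of the included source code, and it is generic only so that the sketch compiles on its own:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (an assumption, not from the recipe's source): caches
// one instance per (managed thread id, connection string) pair.
public static class PerThreadCache<T> where T : class
{
    private static readonly ConcurrentDictionary<string, T> Items =
        new ConcurrentDictionary<string, T>();

    private static string Key(string connectionString)
    {
        // The separator keeps thread 1 + "2abc" distinct from thread 12 + "abc".
        return Thread.CurrentThread.ManagedThreadId + "|" + connectionString;
    }

    public static T Get(string connectionString, Func<string, T> create)
    {
        // The key contains the calling thread's id, so only that thread
        // ever adds or reads this entry; other threads use other keys.
        return Items.GetOrAdd(Key(connectionString), _ => create(connectionString));
    }

    public static bool Remove(string connectionString)
    {
        T removed;
        return Items.TryRemove(Key(connectionString), out removed);
    }
}
```

A call such as PerThreadCache<IBlogRepository>.Get(connectionString, cs => new BlogRepository(new BlogContext(cs))) would then take the place of RepositoryFactory.Get. Because managed thread IDs may be reused once a thread has exited, it is probably wise to call Remove before a worker thread finishes.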

How it works...

Our test defines the problem that we are trying to solve: we retrieve an object from the database into one context, update it from a second context on another thread, and then retrieve it again in the first context. The last retrieval should return the most current values possible.

We add our Blog object that we will be interacting with, the database-specific mappings, and the context, so we can retrieve and update data from the database.

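We then add the CurrentSet method to the repository and the CurrentFind method to the context and the unit of work interface. CurrentFind queries through the underlying ObjectContext with MergeOption.OverwriteChanges, so the values read from the database replace whatever the context already holds for each entity. This lets us explicitly bypass the entity cache and get the most current data, which is essential for concurrent processing without the data becoming stale. Note that overwriting also discards any unsaved local changes to those entities.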
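
To see why an ordinary Set<Blog>() query on the first context would return the stale title, a toy model can help. The sketch below is not Entity Framework code: Row, ToyStore, and ToyContext are hypothetical types that imitate a single behavior. A context that has already materialized an entity for a key keeps its tracked values (as the default AppendOnly merge option does) unless it is asked to overwrite them (as MergeOption.OverwriteChanges does).

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for an entity.
public class Row
{
    public int Id;
    public string Title;
}

// Hypothetical stand-in for the database table: key -> current title.
public class ToyStore
{
    public readonly Dictionary<int, string> Titles = new Dictionary<int, string>();
}

// Hypothetical stand-in for a context with an identity map.
public class ToyContext
{
    private readonly ToyStore _store;
    private readonly Dictionary<int, Row> _tracked = new Dictionary<int, Row>();

    public ToyContext(ToyStore store)
    {
        _store = store;
    }

    public Row Load(int id, bool overwrite)
    {
        // The query always reads the current value from the "database".
        string fresh = _store.Titles[id];

        Row row;
        if (!_tracked.TryGetValue(id, out row))
        {
            // First time this key is seen: materialize and track it.
            row = new Row { Id = id, Title = fresh };
            _tracked[id] = row;
        }
        else if (overwrite)
        {
            // Already tracked: replace the cached value with the fresh one.
            row.Title = fresh;
        }
        // Already tracked and overwrite is false: the fresh value is ignored.
        return row;
    }
}
```

In this model, Load(id, false) plays the role of Set<T>() and Load(id, true) plays the role of CurrentSet<T>(). In both cases the same tracked instance is returned; only its values differ.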

There's more...

The validity of the data in the system is the key to making any system function correctly. There are many ways to handle concurrency issues, and the following sections give details on those solutions:

Client wins

The Client wins concurrency conflict resolution says that the data changed by the client application is the best version of the truth and overwrites the changes in the data store to match the data in the client. This can be useful for an administrative tool that needs to fix data problems, but is dangerous if set for all applications.
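
As a hedged sketch, Client wins could look like the following with the DbContext API. It assumes that the model has a concurrency token configured (the BlogMapping in this recipe does not show one), so that SaveChanges can raise DbUpdateConcurrencyException. The ClientWins class and its Save method are hypothetical names used only for illustration:

```csharp
using System.Data.Entity;
using System.Data.Entity.Infrastructure;

// Hypothetical helper (an assumption, not from the recipe's source).
public static class ClientWins
{
    // Saves the context; on a concurrency conflict, keeps the client's
    // values and retries so that they overwrite what is in the store.
    public static void Save(DbContext context)
    {
        bool saved = false;
        while (!saved)
        {
            try
            {
                context.SaveChanges();
                saved = true;
            }
            catch (DbUpdateConcurrencyException ex)
            {
                foreach (var entry in ex.Entries)
                {
                    var databaseValues = entry.GetDatabaseValues();
                    if (databaseValues == null)
                    {
                        // The row no longer exists in the store, so there is
                        // nothing to overwrite; let the caller decide.
                        throw;
                    }

                    // Treat the store's current values as the original values,
                    // so the next SaveChanges passes the concurrency check
                    // and writes the client's current values.
                    entry.OriginalValues.SetValues(databaseValues);
                }
            }
        }
    }
}
```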

Store wins

The Store wins concurrency conflict resolution overwrites the data on the client entity with the data on the store. This not only ensures that the client entity is working on the most current data, but also requires the changes to the entity be made again. This may be a good solution for an application with a UI, where the user can redo changes or approve a programmatic redo, but doesn't work in automated service situations.
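
A minimal sketch of Store wins, under the same assumption that a concurrency token is configured so that SaveChanges can raise DbUpdateConcurrencyException. StoreWins and TrySave are hypothetical names, not part of the included source code:

```csharp
using System.Data.Entity;
using System.Data.Entity.Infrastructure;

// Hypothetical helper (an assumption, not from the recipe's source).
public static class StoreWins
{
    // Returns true if the save succeeded. Returns false if a conflict was
    // found, in which case the conflicting entities have been reloaded from
    // the store and the client's changes to them are gone.
    public static bool TrySave(DbContext context)
    {
        try
        {
            context.SaveChanges();
            return true;
        }
        catch (DbUpdateConcurrencyException ex)
        {
            foreach (var entry in ex.Entries)
            {
                // Overwrite the client's values with the store's values.
                entry.Reload();
            }
            return false;
        }
    }
}
```

A false return value is the signal for the caller, or the user, to make the changes again on top of the reloaded data.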

Custom determined

If neither of the previous two options fits your needs, you can create your own custom solution. It hooks in at the point where Entity Framework throws the concurrency violation exception, and from there you can handle the conflict in any way you like. This takes more management and more logic in the DataAccess layer, but it is far less likely to have the fit issues of the previous two options, because it can be tailored to your exact business needs.
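
One possible custom rule, shown here only as an illustration, is a property-by-property three-way merge: keep the client's value where only the client changed a property, take the store's value where only the store changed it, and fail where both changed it to different values. The sketch works on plain dictionaries so that it stands alone; ThreeWayMerge is a hypothetical helper. In Entity Framework the three inputs could presumably be read from the conflicting entry's OriginalValues, CurrentValues, and GetDatabaseValues().

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (an assumption, not from the recipe's source).
public static class ThreeWayMerge
{
    // original: values when the client loaded the entity
    // client:   values the client is trying to save
    // store:    values that are in the database now
    public static Dictionary<string, object> Merge(
        IDictionary<string, object> original,
        IDictionary<string, object> client,
        IDictionary<string, object> store)
    {
        var merged = new Dictionary<string, object>();
        foreach (var name in original.Keys)
        {
            bool clientChanged = !object.Equals(original[name], client[name]);
            bool storeChanged = !object.Equals(original[name], store[name]);

            if (clientChanged && storeChanged &&
                !object.Equals(client[name], store[name]))
            {
                // Both sides changed the same property to different values;
                // this rule refuses to guess which one should win.
                throw new InvalidOperationException(
                    "Both the client and the store changed '" + name + "'.");
            }

            merged[name] = clientChanged ? client[name] : store[name];
        }
        return merged;
    }
}
```

For example, if the client changed only Title and another process changed only Rating, the merged result carries both changes.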

See also

In this chapter:

  • Implementing optimistic concurrency