
Multithread processing of the SqlDataReader - Producer/Consumer design pattern

In today's post I want to describe how to optimize the use of the ADO.NET SqlDataReader class with multithreading. To do that, let me introduce the problem I will try to solve.

Scenario:
In a project we decided to move all data from multiple databases into one data warehouse. That means a good few terabytes of data, or even more. The transfer will be done by a custom importer program.

Problem:
After implementing database-agnostic logic for generating and executing a query, I realized that I can retrieve data from the source databases faster than I can upload it to the big data store through the HTTP client in the importer program. In other words, the data reader is capable of reading data faster than I can process it and upload it to my big data lake.

Solution:
To solve this problem I would like to propose one of the multithreading design patterns, called Producer/Consumer. In general this pattern consists of two main classes:
  • The Producer class is responsible for adding new items to a shared collection.
  • The Consumer class is responsible for retrieving items from the collection and processing them in a specific way.
There is also a third component in this pattern, which lets the consumer and the producer share data: a thread-safe collection. Of course there can be multiple consumers and multiple producers working concurrently. The most important part is that no matter how many threads have been created, they all share the same collection.

Using this solution will help me speed up my upload process: the producer will keep adding new records to the shared collection while, at the same time, multiple consumer threads read from it and process the items one by one.


Picture 1. Consumer producer design pattern basic schema.
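
Before going into my own classes, here is a minimal sketch of the bare pattern. It is only an illustration under a few assumptions: the names PatternSketch and Run are made up for this example, the items are plain integers instead of database rows, and the consumers use GetConsumingEnumerable(), which is a different draining technique than the one used by the classes in this post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PatternSketch
{
    // Pushes the numbers 1..itemCount through a bounded queue and
    // returns the total of the sums computed by the consumers.
    public static long Run(int itemCount, int consumerCount)
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: 25))
        {
            // Single producer: Add blocks while the queue is full.
            var producer = Task.Run(() =>
            {
                try
                {
                    for (var i = 1; i <= itemCount; i++)
                    {
                        queue.Add(i);
                    }
                }
                finally
                {
                    // Tell the consumers that no more items will arrive.
                    queue.CompleteAdding();
                }
            });

            // Several consumers share the same queue; each item is
            // handed to exactly one of them.
            var consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Run(() =>
                {
                    long partialSum = 0;

                    // The loop ends once adding is completed and the queue is empty.
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        partialSum += item;
                    }

                    return partialSum;
                }))
                .ToArray();

            Task.WaitAll(consumers);
            producer.Wait();

            return consumers.Sum(consumer => consumer.Result);
        }
    }
}
```

Calling PatternSketch.Run(100, 4) should return 5050 no matter how the items are distributed between the four consumers, because every item is taken exactly once.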

Implementation:
To implement this design pattern I created a really simple console application. Apart from Program.cs, which centralizes the program logic, it contains just a few more files. First I implemented a thread-safe collection because, as I mentioned, it is the place where producer and consumer instances share data. I chose the BlockingCollection<T> type from the System.Collections.Concurrent namespace, as it is designed specifically for Producer/Consumer implementations. It provides list-like functions for adding and retrieving items in a thread-safe way, and it exposes pattern-specific members such as the CompleteAdding() function and the IsAddingCompleted property. Their purpose is to let the producer notify one or more consumers that no more items will be added, so the consumer threads can finish. I wrapped this collection in a new class, so the code of my ItemCollection<T> class looks as follows:

   using System;
   using System.Collections.Concurrent;
 
   /// <summary>
   /// An items collection.
   /// </summary>
   /// <typeparam name="T">Type of the item.</typeparam>
   public class ItemCollection<T>
   {
       /// <summary>
       /// The internal collection of items.
       /// </summary>
       private BlockingCollection<T> collection;
 
       /// <summary>
       /// Gets the collection upper bound.
       /// </summary>
       /// <value>
       /// The upper bound.
       /// </value>
       public uint UpperBound { get; private set; }
 
       /// <summary>
       /// Gets a value indicating whether adding to the collection has been completed.
       /// </summary>
       public bool IsAddingCompleted
       {
           get
           {
               return this.collection.IsAddingCompleted;
           }
       }
 
       /// <summary>
       /// Initializes a new instance of the <see cref="ItemCollection{T}"/> class.
       /// </summary>
       /// <param name="upperBound">The collection upper bound.</param>
       public ItemCollection(uint upperBound = 25)
       {
           this.UpperBound = upperBound;
           this.collection = new BlockingCollection<T>((int)this.UpperBound);
       }
 
       /// <summary>
       /// Adds the specified item.
       /// </summary>
       /// <param name="item">The item.</param>
       /// <param name="timeoutMiliseconds">The timeout in milliseconds.</param>
       /// <returns>Adding result.</returns>
       public bool TryAdd(T item, int timeoutMiliseconds)
       {
           var addResult = this.collection.TryAdd(item, timeoutMiliseconds);
 
           if (!addResult)
           {
               throw new InvalidOperationException("Unable to add item to collection.");
           }
 
           return addResult;
       }
 
       /// <summary>
       /// Try to take an item from collection.
       /// </summary>
       /// <param name="timeoutMiliseconds">The timeout in milliseconds.</param>
       /// <returns>An instance of the item.</returns>
       public T TryTake(int timeoutMiliseconds)
       {
           var result = default(T);
 
           if (!this.collection.TryTake(out result, timeoutMiliseconds))
           {
               throw new InvalidOperationException("Unable to get item from collection.");
           }
 
           return result;
       }
 
       /// <summary>
       /// Completes the process of adding.
       /// </summary>
       public void CompleteAdding()
       {
           this.collection.CompleteAdding();
       }
   }
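
To make the behaviour of the underlying BlockingCollection<T> more tangible, the sketch below exercises a collection with an upper bound of 2. The class and method names (BoundedCollectionDemo, Describe) are made up for this illustration. Note the difference between IsAddingCompleted, which only says that the producer has finished, and IsCompleted, which additionally requires the collection to be empty.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedCollectionDemo
{
    // Returns the observed flags as a comma-separated string.
    public static string Describe()
    {
        using (var collection = new BlockingCollection<string>(boundedCapacity: 2))
        {
            var first = collection.TryAdd("a", 0);   // fits
            var second = collection.TryAdd("b", 0);  // fits, the collection is now full
            var third = collection.TryAdd("c", 0);   // rejected: upper bound reached

            collection.CompleteAdding();
            var addingCompleted = collection.IsAddingCompleted;

            // Two items are still queued, so the collection is not completed yet.
            var completedWhileFull = collection.IsCompleted;

            string item;
            collection.TryTake(out item, 0);
            collection.TryTake(out item, 0);

            // Adding is completed and the collection is empty.
            var completedWhenEmpty = collection.IsCompleted;

            return string.Join(
                ",",
                first,
                second,
                third,
                addingCompleted,
                completedWhileFull,
                completedWhenEmpty);
        }
    }
}
```

Describe() returns "True,True,False,True,False,True". The fifth value is the interesting one: a consumer that stops as soon as IsAddingCompleted becomes true can leave queued items unprocessed.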

When my collection is ready, the next step is to fill it with data. To do that I implemented a producer class which internally starts a new task and, in a while loop, keeps adding items to the collection until the producing function returns null. When that happens it calls the CompleteAdding() function on the ItemCollection<T> class to let the consumers know that no more items will be added. I mentioned a 'producing function' above, so just to make it clear: the producer class constructor expects a Func<T>. I did that because I want to keep the producer logic generic, so it can be used in multiple scenarios. In this case the function is passed in from the main thread and is responsible for retrieving data from a source database.

   using System;
   using System.Threading.Tasks;

   /// <summary>
   /// The items producer.
   /// </summary>
   /// <typeparam name="T">Type of the produced item.</typeparam>
   public class Producer<T>
       where T : class, new()
   {
       /// <summary>
       /// The collection.
       /// </summary>
       private readonly ItemCollection<T> collection;

       /// <summary>
       /// The producing function.
       /// </summary>
       private readonly Func<T> producingFunction;

       /// <summary>
       /// Initializes a new instance of the <see cref="Producer{T}"/> class.
       /// </summary>
       /// <param name="producingFunction">The producing function.</param>
       /// <param name="collection">The collection.</param>
       public Producer(Func<T> producingFunction, ItemCollection<T> collection)
       {
           if (producingFunction == null)
           {
               throw new ArgumentNullException("producingFunction");
           }

           if (collection == null)
           {
               throw new ArgumentNullException("collection");
           }

           this.collection = collection;
           this.producingFunction = producingFunction;
       }

       /// <summary>
       /// Starts producing items on a background task.
       /// </summary>
       /// <returns>The task that runs the producing loop.</returns>
       public Task Start()
       {
           return Task.Factory.StartNew(() =>
           {
               try
               {
                   while (this.Produce())
                   {
                   }
               }
               finally
               {
                   // Always signal completion, even if producing failed,
                   // so consumers do not wait for items that will never arrive.
                   this.collection.CompleteAdding();
               }
           });
       }

       /// <summary>
       /// Produces a single item.
       /// </summary>
       /// <returns>True if an item has been produced and added to the collection.</returns>
       public bool Produce()
       {
           var producingResult = this.producingFunction.Invoke();
           var result = false;

           // A null result means that the source has no more items.
           if (producingResult != null)
           {
               result = this.collection.TryAdd(producingResult, (int)TimeSpan.FromSeconds(10).TotalMilliseconds);
           }

           return result;
       }
   }
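
The contract of the producing function is simple: every call returns the next item, and null means that the source is exhausted. As an illustration of that contract, the helper below (ProducingFunctions.FromSequence, a hypothetical name that is not part of my importer) turns any in-memory sequence into such a function. It is a sketch that assumes a single producer thread and a sequence without null elements, since a null element would be mistaken for the end of the data.

```csharp
using System;
using System.Collections.Generic;

public static class ProducingFunctions
{
    // Wraps a sequence in a Func<T> that returns the next element on
    // each call and null once the sequence is exhausted.
    // Not thread-safe: intended to be called from one producer thread.
    public static Func<T> FromSequence<T>(IEnumerable<T> source)
        where T : class
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var enumerator = source.GetEnumerator();
        var finished = false;

        return () =>
        {
            if (finished)
            {
                return null;
            }

            if (enumerator.MoveNext())
            {
                return enumerator.Current;
            }

            // End of the sequence: release the enumerator and keep
            // returning null on every later call.
            finished = true;
            enumerator.Dispose();
            return null;
        };
    }
}
```

Such a function is handy for testing a producer without a database, for example by passing ProducingFunctions.FromSequence(new[] { "a", "b" }) where the data reader based function would normally go.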



The last part of the design pattern is the consumer. I implemented it similarly to the producer: the class constructor expects an Action<T>, which I invoke to consume each item taken from the shared collection.

using System;

/// <summary>
/// A consumer class.
/// </summary>
/// <typeparam name="T">Type of the object to process.</typeparam>
public class Consumer<T>
    where T : class, new()
{
    /// <summary>
    /// The collection.
    /// </summary>
    private readonly ItemCollection<T> collection;

    /// <summary>
    /// The consuming function.
    /// </summary>
    private readonly Action<T> consumingAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="Consumer{T}"/> class.
    /// </summary>
    /// <param name="collection">The collection.</param>
    /// <param name="consumingFunction">The consuming function.</param>
    public Consumer(ItemCollection<T> collection, Action<T> consumingFunction)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }

        if (consumingFunction == null)
        {
            throw new ArgumentNullException("consumingFunction");
        }

        this.collection = collection;
        this.consumingAction = consumingFunction;
    }

    /// <summary>
    /// Consumes items until adding is completed and the collection is empty.
    /// </summary>
    public void Consume()
    {
        var timeout = (int)TimeSpan.FromMinutes(1).TotalMilliseconds;

        while (true)
        {
            // Snapshot taken before the take: if adding was already completed
            // and the take still fails, the collection is empty for good.
            var completedBeforeTake = this.collection.IsAddingCompleted;
            T instance;

            try
            {
                instance = this.collection.TryTake(timeout);
            }
            catch (InvalidOperationException)
            {
                if (completedBeforeTake)
                {
                    break;
                }

                if (!this.collection.IsAddingCompleted)
                {
                    // The producer is still running, so this is a genuine timeout.
                    throw;
                }

                // Adding was completed while waiting; try once more to drain leftovers.
                continue;
            }

            this.consumingAction.Invoke(instance);
        }
    }
}
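
The exit condition of a consumer loop is easy to get wrong, so here is the same idea written directly against BlockingCollection<T>, without my wrapper class. This is a sketch with made-up names (DrainLoop, Drain). It relies on the IsCompleted property, which is true only when adding is completed and the collection is empty, and on the fact that TryTake simply returns false on a timeout.

```csharp
using System;
using System.Collections.Concurrent;

public static class DrainLoop
{
    // Takes items until the collection is completed (adding finished and
    // nothing left to take) and returns the number of processed items.
    public static int Drain<T>(
        BlockingCollection<T> collection,
        Action<T> consume,
        int timeoutMilliseconds)
    {
        var processed = 0;

        while (!collection.IsCompleted)
        {
            T item;

            if (collection.TryTake(out item, timeoutMilliseconds))
            {
                consume(item);
                processed++;
            }

            // A false result means either a timeout or a completed, empty
            // collection; the loop condition tells the two cases apart.
        }

        return processed;
    }
}
```

Unlike my Consumer<T> class, this loop keeps waiting after a timeout instead of throwing, so it suits sources where long pauses between items are expected.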

Lastly, in Program.cs I put all the logic required to tie the pieces together. As you may notice in the code below, the producing and consuming functions are defined in the body of this class. Moreover, instead of using just a single consumer thread I decided to start several, where the number of threads is equal to the number of logical cores on the host.


static void Main(string[] args)
       {
           var itemCollection = new ItemCollection<User>();
           var consumerTasks = new List<Task>();
           var connection = new SqlConnection(args[0]);
 
           connection.Open();
           var dataReader = GetDataReader(connection);
 
           // Producer initialization.
           var producer = new Producer<User>(() =>
           {
               User user = null;
 
               if (dataReader.Read())
               {
                   user = new User()
                   {
                       Id = dataReader.GetInt32(0),
                       Name = dataReader.GetString(1),
                       Surname = dataReader.GetString(2),
                       Email = dataReader.GetString(3)
                   };
               }
 
               return user;
           }, itemCollection);
 
           producer.Start();
 
           // One task per logical processor.
           Enumerable.Range(0, System.Environment.ProcessorCount)
               .ToList()
               .ForEach(i =>
           {
               var consumer = new Consumer<User>(itemCollection, user =>
               {
                   Program.InsertToBigData(user);
               });
 
               // Start consumption.
               consumerTasks.Add(Task.Factory.StartNew(() => { consumer.Consume(); }));
           });
 
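           // Block until all consumers have finished, then release the reader and the connection.
           // Without a blocking wait Main would return, and the process would exit,
           // while the background tasks were still running.
           try
           {
               Task.WaitAll(consumerTasks.ToArray());
           }
           finally
           {
               if (!dataReader.IsClosed)
               {
                   dataReader.Close();
               }

               connection.Close();
               connection.Dispose();
           }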
       }
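
The User class and the row mapping are not shown in this post, so the sketch below is only one possible shape for them. The property names come from the Main method above. The UserMapper class and the null handling are my assumptions for this illustration: GetString throws when a column contains a database NULL, so the helper checks IsDBNull first. The code is written against the IDataRecord interface, which SqlDataReader implements.

```csharp
using System;
using System.Data;

// Hypothetical DTO: property names taken from Main, types assumed
// from the reader calls (GetInt32 and GetString).
public class User
{
    public int Id { get; set; }

    public string Name { get; set; }

    public string Surname { get; set; }

    public string Email { get; set; }
}

public static class UserMapper
{
    // Maps the current row of a data reader to a User instance.
    // The column order (Id, Name, Surname, Email) follows Main.
    public static User FromRecord(IDataRecord record)
    {
        if (record == null)
        {
            throw new ArgumentNullException("record");
        }

        return new User
        {
            Id = record.GetInt32(0),
            Name = GetNullableString(record, 1),
            Surname = GetNullableString(record, 2),
            Email = GetNullableString(record, 3)
        };
    }

    // Returns null for a database NULL instead of letting GetString throw.
    private static string GetNullableString(IDataRecord record, int ordinal)
    {
        return record.IsDBNull(ordinal) ? null : record.GetString(ordinal);
    }
}
```

With this helper the body of the producing function could shrink to a single call, UserMapper.FromRecord(dataReader), made after dataReader.Read() returns true.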


Thank you
