niedziela, 8 lutego 2015

Using Hortonworks Hive in .NET

A few months ago I decided to learn a big data. This sounds very complex and of course it is. All these strange names which actually tells nothing to person who is new in these area combined with different way of looking at data storage makes entire topic even more complex. However after reading N blogs and watching many, many tutorials today I finally had a chance to try to write some code. As in last week I managed to setup a Hortonworks distribution of Hadoop today I decided to connect to it from my .NET based application and this is what I will describe in this post.

First things first I didn`t setup entire Hortonworks ecosystem from scratch - I`d love to but for now it`s far beyond my knowledge thus I decided to use a sandbox environment provided by Hortonworks. There are multiple different VMs available to download but in my case I`ve choose a Hyper-V. More about setting this environment up you can read here.

Picture 1. Up and running sandbox environment.

Now when I have my big data store ready I need to be able to establish a connection to it and start exchanging data. The problem is that Hadoop is not a database rather that that it`s a distributed file system (HDFS) and map reduce engine so  non of these fits my need directly. The tools that I`m looking for is called Hive which is data warehouse infrastructure built on top of Hadoop. Hive provides a SQL like syntax (called HiveQL) which I can use as in the case of normal database. To learn more about Hive syntax, data types and concept I`d stronly recommend this tutorial.

OK, so far so good. I decided that I`m going to use Hive as my warehouse that I will be connecting to from my .NET based application. The next step is to take care about a data provider which is a something like a driver that allow my code to interact with Hive. Of course there is no native .NET provider for Hive but this is where ODBC middle-ware API plays a part. The end-to-end tutorial how to download and setup ODBC drivers for Hortonworks Hive allowed me to set it up pretty easily and fast so I could focus on the last part which is a C# code.

Picture 2. Hortonworks Hive ODBC Driver setup.

For all developers who have at least some experience with ADO.NET or ODBC programming writing code for communicating with Hive should be very straightforward as overall concept as well as classes are exactly the same. First of all I need to have a connection string to my instance of Hive and I can build it very easily in two ways:
  • Just by specifying a predefined (Picture 2.) DNS name: dsn=Hadoop ODBC
  • By specifying all properties directly in the connection string:
    var connectionString = @"DRIVER={Hortonworks Hive ODBC Driver};                      
                                            Host=192.168.56.101;
                                            Port=10000;
                                            Schema=default;
                                            HiveServerType=2;
                                            ApplySSPWithQueries=1;
                                            AsyncExecPollInterval=100;
                                            HS2AuthMech=2;
                                            UserName=sandbox;";

One way or another after setting up connection string the following steps in my code are:
1) Opening a connection to Hive.
2) Creating a simple Hive table called 'Searches' which contains 3 columns plus one partition column called searchTime.
3) Inserting data to 'Searches' table.
4) Retrieving data from 'Searches' table by using a simple Hive SELECT query and OdbcDataReader class.
5) Dropping a 'Searches' table.

The full code for my solution is presented below.

namespace Hadoopclient
{
    using System;
    using System.Data.Odbc;
 
    class Program
    {
        static void Main(string[] args)
        {
            // @"dsn=Hadoop ODBC"
            var connectionString = @"DRIVER={Hortonworks Hive ODBC Driver};                                        
                                        Host=192.168.56.101;
                                        Port=10000;
                                        Schema=default;
                                        HiveServerType=2;
                                        ApplySSPWithQueries=1;
                                        AsyncExecPollInterval=100;
                                        HS2AuthMech=2;
                                        UserName=sandbox;";
 
            var createTableCommandText = "CREATE TABLE Searches(searchTerm STRING, userid BIGINT,userIp STRING) " +
                                            "COMMENT 'Stores all searches for data' " +
                                            "PARTITIONED BY(searchTime DATE) " +
                                            "STORED AS SEQUENCEFILE;";
 
            using (var connection = new OdbcConnection(connectionString))
            {
                using (var command = new OdbcCommand(createTableCommandText, connection))
                {
                    try
                    {
 
                        connection.Open();
 
                        // Create a table.
                        command.ExecuteNonQuery();
 
                        // Insert row of data.
                        command.CommandText = "INSERT INTO TABLE Searches PARTITION (searchTime = '2015-02-08') " +
                                               "VALUES ('search term', 1, '127.0.0.1')";
 
                        command.ExecuteNonQuery();
 
                        // Reading data from Hadoop.
                        command.CommandText = "SELECT * FROM Searches";
                        using (var reader = command.ExecuteReader())
                        {
                            while (reader.Read())
                            {
                                for (var i = 0; i < reader.FieldCount; i++)
                                {
                                    Console.WriteLine(reader[i]);
                                }
                            }
                        }
                    }
                    catch (OdbcException ex)
                    {
                        Console.WriteLine(ex.Message);
                        throw;
                    }
                    finally
                    {
                        // Drop table
                        command.CommandText = "DROP TABLE Searches";
                        command.ExecuteNonQuery();
                    }
                }
            }
        }
    }
}


Thank you.

środa, 3 grudnia 2014

Multithread processing of the SqlDataReader - Producer/Consumer design pattern

In today post I want to describe how to optimize usage of a ADO.NET SqlDataReader class by using multi-threading. To present that lets me introduce a problem that I will try to solve. 

Scenario:
In a project we decided to move all data from a multiple databases to one data warehouse. It will be a good few terabytes of data or even more. Data transfer will be done by using a custom importer program.

Problem:
After implementing a database agnostic logic of generating and executing a query I realized that I can retrieve data from source databases faster that I can upload them to big data store through HTTP client -importer program. In other words, data reader is capable of reading data faster then I can process it an upload to my big data lake.

Solution:
As a solution for solving this problem I would like to propose one of a multi-thread design pattern called Producer/Consumer. In general this pattern consists of a two main classes where:
  • Producer class is responsible for adding a new items to a shared collection
  • Consumer class is responsible for retrieving items from the collection and processing them in a specific way.
There is also a third component in this pattern which allows to share data between both consumer and producer - it`s a thread safe collection. Of course there can be multiple consumers and multiple producers working in the same time concurrently however the most important part is that no matter how many threads have been created all share the same collection. 

Using this solution will help me to speedup my upload process because a producer class will be adding a new records the shared collection and at the same time multiple consumers treads will be reading from it and processing items one by one.


Picture 1. Consumer producer design pattern basic schema.

Implementation:
To implement this design pattern I created a really simple console application and apart from the Program.cs class, which centralizes a program logic, I put there just a few more files. Firstly I implemented a thread-safe collection because as I mentioned it will be a place to share data between instances of producer and consumer(s). As my collection I`ve chosen a BlockinCollection<T> type from the System.Collections.Concurrent namespace as it`s purely designed to be used in the consumer producer design pattern implementation. At the same time it provides both a set of list like functions for adding and retrieving items in a thread-safe way and exposes a design pattern specific elements such a CompleteAdding() function or IsAddingCompleted property (the purpose of this elements is to allow producer class notify one or more consumers that items adding has been finished and consumer thread(s) should be completed). Therefore I put this collection in a new class and wrap it so then my code for my ItemCollection<T> class looks as follow:

   using System;
   using System.Collections.Concurrent;
 
   /// <summary>
   /// An items collection.
   /// </summary>
   /// <typeparam name="T">Type of the item.</typeparam>
   public class ItemCollection<T>
   {
       /// <summary>
       /// The internal collection of items.
       /// </summary>
       private BlockingCollection<T> collection;
 
       /// <summary>
       /// Gets the collection upper bound.
       /// </summary>
       /// <value>
       /// The upper bound.
       /// </value>
       public uint UpperBound { getprivate set; }
 
       /// <summary>
       /// Gets a value indicating whether this adding to the collection has been completed.
       /// </summary>
       public bool IsAddingCompleted
       {
           get
           {
               return this.collection.IsAddingCompleted;
           }
       }
 
       /// <summary>
       /// Initializes a new instance of the <see cref="ItemCollection{T}"/> class.
       /// </summary>
       /// <param name="upperBound">The collection upper bound.</param>
       public ItemCollection(uint upperBound = 25)
       {
           this.UpperBound = upperBound;
           this.collection = new BlockingCollection<T>((int)this.UpperBound);
       }
 
       /// <summary>
       /// Adds the specified item.
       /// </summary>
       /// <param name="item">The item.</param>
       /// <param name="timeoutMiliseconds">The timeout miliseconds.</param>
       /// <returns>Adding result.</returns>
       public bool TryAdd(T item, int timeoutMiliseconds)
       {
           var addResult = this.collection.TryAdd(item, timeoutMiliseconds);
 
           if (!addResult)
           {
               throw new InvalidOperationException("Unable to add item to collection.");
           }
 
           return addResult;
       }
 
       /// <summary>
       /// Try to take an item from collection.
       /// </summary>
       /// <param name="timeoutMiliseconds">The timeout miliseconds.</param>
       /// <returns>An instance of the item.</returns>
       public T TryTake(int timeoutMiliseconds)
       {
           var result = default(T);
 
           if (!this.collection.TryTake(out result, timeoutMiliseconds))
           {
               throw new InvalidOperationException("Unable to get item from collection.");
           }
 
           return result;
       }
 
       /// <summary>
       /// Completes the process of adding.
       /// </summary>
       public void CompleteAdding()
       {
           this.collection.CompleteAdding();
       }
   }

When my collection is ready a next step is to fill it with data. To do that I implemented a producer class which internally starts a new thread and within it in while loop it`s keeps adding items to the collection as long as evaluated producing function returned false. In such case it`s calling the
CompleteAdding() function on ItemCollection<T> class to let consumers know that there will be no more items added to it. I mentioned about a 'producing function' above so just to make it clear in my producer class constructor I expect a Func<T> definition. I done that as I want to keep producer logic generic so it can be use in multiple scenarios - in this case passed function comes from main thread and it`s responsible for retrieving data from a source database.

   /// <summary>
   /// The items producer.
   /// </summary>
   public class Producer<T>
       where T : classnew()
   {
       /// <summary>
       /// The collection.
       /// </summary>
       private readonly ItemCollection<T> collection;
 
       /// <summary>
       /// The producing function.
       /// </summary>
       private readonly Func<T> producingFunction;
 
       /// <summary>
       /// Initializes a new instance of the <see cref="Producer{T}"/> class.
       /// </summary>
       /// <param name="producingFunction">The producing function.</param>
       /// <param name="collection">The collection.</param>
       public Producer(Func<T> producingFunction, ItemCollection<T> collection)
       {
           if (producingFunction == null)
           {
               throw new ArgumentNullException("producingFunction");
           }
 
           if (collection == null)
           {
               throw new ArgumentNullException("collection");
           }
 
           this.collection = collection;
           this.producingFunction = producingFunction;
       }
 
       /// <summary>
       /// Starts producing items.
       /// </summary>
       public void Start()
       {
           Task.Factory.StartNew(() =>
           {
               while (this.Produce())
               {
                   continue;
               }
 
               this.collection.CompleteAdding();
           });
       }
 
       /// <summary>
       /// Produces this item.
       /// </summary>
       /// <returns>True is item has been produced and added to collection.</returns>
       public bool Produce()
       {
           var producingResult = this.producingFunction.Invoke();
           var result = false;
 
           if (producingFunction != default(T))
           {
               result = this.collection.TryAdd(producingResult, (int)TimeSpan.FromSeconds(10).TotalMilliseconds);
           }
 
           return result;
       }
   }



The last part of the design pattern is a consumer. I implemented it similarly to the producer and in the class constructor I expect a definition of an Action<T> which I invoke to consume the item from the shared collection.

/// <summary>
/// A consumer class.
/// </summary>
/// <typeparam name="T">Type of the object to process.</typeparam>
public class Consumer<T>
    where T : classnew()
{
    /// <summary>
    /// The collection.
    /// </summary>
    private readonly ItemCollection<T> collection;
 
    /// <summary>
    /// The consuming function.
    /// </summary>
    private readonly Action<T> consumingAction;
 
    /// <summary>
    /// Initializes a new instance of the <see cref="Consumer{T}"/> class.
    /// </summary>
    /// <param name="collection">The collection.</param>
    /// <param name="consumingFunction">The consuming function.</param>
    public Consumer(ItemCollection<T> collection, Action<T> consumingFunction)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
 
        if (collection == null)
        {
            throw new ArgumentNullException("consumingFunction");
        }
 
        this.collection = collection;
        this.consumingAction = consumingFunction;
    }
 
    /// <summary>
    /// Consumes this item from collection.
    /// </summary>
    public void Consume()
    {
        var instance = default(T);
 
        while (!this.collection.IsAddingCompleted)
        {
            instance = this.collection.TryTake((int)TimeSpan.FromMinutes(1).TotalMilliseconds);
 
            if (instance != null)
            {
                this.consumingAction.Invoke(instance);
            }
            else
            {
                throw new InvalidOperationException("Unable to get item from collection.");
            }
        }
    }
}

Lastly, in the Program.cs I  put all the logic required to married all pieces together. As you may notice in the code below I put a producing and consuming function definitions in this class body. Moreover instead of using just a single thread for consumer I decided to start multiple - where the number of threads is equal to number of cores on the host.


static void Main(string[] args)
       {
           var itemCollection = new ItemCollection<User>();
           var consumerTasks = new List<Task>();
           var connection = new SqlConnection(args[0]);
 
           connection.Open();
           var dataReader = GetDataReader(connection);
 
           // Producer initialization.
           var producer = new Producer<User>(() =>
           {
               User user = null;
 
               if (dataReader.Read())
               {
                   user = new User()
                   {
                       Id = dataReader.GetInt32(0),
                       Name = dataReader.GetString(1),
                       Surname = dataReader.GetString(2),
                       Email = dataReader.GetString(3)
                   };
               }
 
               return user;
           }, itemCollection);
 
           producer.Start();
 
           // One task perc logical processor.
           Enumerable.Range(0, System.Environment.ProcessorCount)
               .ToList()
               .ForEach(i =>
           {
               var consumer = new Consumer<User>(itemCollection, user =>
               {
                   Program.InsertToBigData(user);
               });
 
               // Start consumption.
               consumerTasks.Add(Task.Factory.StartNew(() => { consumer.Consume(); }));
           });
 
           // Waiting for all tasks to complete.
           Task.WhenAll(consumerTasks.ToArray())
               .ContinueWith((task) =>
                   {
                       if (!dataReader.IsClosed)
                       {
                           dataReader.Close();
                       }
 
                       connection.Close();
                       connection.Dispose();
                   });
       }


Thank you

niedziela, 5 października 2014

Asynchronous WebApi2 HTTP client example

Today I want to cover a very common scenario about how to create a HTTP client for a WebAPI 2 service. To present my implementation I will be using one of a Task<T> extension method that I described recently on my blog.

Let`s start from defining an API service. In this example it udes a REST based WebApi service with the following implementation.

   [AllowAnonymous]
   public class CityController
   {
       /// <summary>
       /// The city repository.
       /// </summary>
       private readonly ICityRepository cityRepository;
 
       /// <summary>
       /// Initializes a new instance of the <see cref="CityController" /> class.
       /// </summary>
       /// <param name="cityRepository">The city repository.</param>
       public CityController(ICityRepository cityRepository)
       {
           this.cityRepository = cityRepository;
       }
 
       /// <summary>
       /// Return a list of cities.
       /// </summary>
       /// <param name="query">The query.</param>
       /// <returns>
       /// A list of <see cref="Users" />.
       /// </returns>
       [HttpGet]
       [Route("api/" + ApiVersion.CurrentVersion + "/cities")]
       public async Task<HttpResponseMessage> Get(string query)
       {
           return await this.cityRepository.AutocompleteCityAsync(query)
               .Continue<IQueryable<City>, HttpResponseMessage>(entities =>
               {
                   var cities = entities.MapCollection<TCity>();
                   return this.CreateListResponse<TCity>(cities);
               });
       }
    }

As you can see there is no any big deal there. I created a CityController class which is decorated with an attribute AllowAnonymous. This will tell a WebApi engine that all functions inside this controller may be accessed by anonymous user - not logged in. Next, the controller class initialization accepts an one parameter of ICityRepository which is nothing else than a simple implementation of a Repository Pattern for Entity Framework. A fact that I passing an instance of this repository as a constructor parameter means that I used in my solution an Inversion of Control (IoC) and Dependency Injection (I recommend here using  Unity as it`s the simplest one as far as I check) - this post doesn`t cover an implementation details for both patterns however I will try to create post about it soon and link it here. Now, coming back to CityController... it has one public function which is a WebApi service call for a GET (decorated by the attribute HttpGet) request for the URL GET/ http://{url}/api/v1/cities?query={query} (Route attribute). This function is also an asynchronous one, which means that it`s returning a Task<T> type which in this case is a Task<HttpResponseMessage>. The body of it uses a task chaining implementation (with my custom Continue<T,R> extension method) to call the AutocompleteCityAsync repository method and map results returned by it to a TCity data transfer object by using the AutoMapper.

using BookingWorld.Common.Interfaces.Entities;
using System.Runtime.Serialization;
 
[DataContract]
public class TCity
{
    public TCity()
    { }
 
    [DataMember]
    public string Code { getset; }
 
    [DataMember]
    public decimal Latitude { getset; }
 
    [DataMember]
    public decimal Longitude { getset; }
 
    [DataMember]
    public decimal? Scoring { getset; }
 
    [DataMember]
    public bool CapitalCity { getset; }
 
    [DataMember]
    public int QueryCount { getset; }
 
    [DataMember]
    public string Name { getset; }
 
    [DataMember]
    public int Id { getset; }
 
    [DataMember]
    public bool Active { getset; }
}


By nature the WebApi2 service serializes all responses objects to JSON format. This is a very important part that developer has to know to implement working client. In this example my GET request will return a serialized collection on TCiti objects. Now I described a service logic so it`s time to start writing more about a client itself.

There is no strict rules how to consume a WebApi service. As it`s build at the top of standard HTTP request methods and uses a JSON format of data it`s completely platform agnostic. It`s means that you can send request to it from non-.NET platform like iOS or Android mobile device and as long as you`re using the same standards you will get an response.  However in this example the client of this API is a MVC5 based website (this really doesn`t matter what is the version, the only this which is important is that it`s based on .NET Framework 4.5.1). First of all let`s introduce a service proxy base class - this is the code of the implementation and later calling a specific API function is just a matter of changing an input parameters.

public class Proxy
    {
        public IDictionary<stringobject> Parameters { getset; }
 
        /// <summary>
        /// The <see cref="HttpClient"/> used for communicating with the API.
        /// </summary>
        private HttpClient httpClient;
 
        public Uri ServiceUrl
        {
            get;
            private set;
        }
 
        public string ControllerName
        {
            get;
            private set;
        }
 
        protected Proxy(string serviceBase, string controllerName)
        {
            this.ServiceUrl = new Uri(serviceBase);
            this.httpClient = new HttpClient();
            this.ControllerName = controllerName;
            this.Parameters = new Dictionary<stringobject>();
        }
 
        /// <summary>
        /// Sends a GET request to the API.
        /// </summary>
        /// <typeparam name="TReturnType">The type of the return type.</typeparam>
        /// <param name="getUri">The GET URI.</param>
        /// <returns>A task which results in a {TReturnType} object.</returns>
        protected async Task<TReturnType> GetAsync<TReturnType>(Uri getUri)
        where TReturnType : classnew()
        {
            return await this.httpClient.GetAsync(getUri)
                .Continue<HttpResponseMessagestring>(
                httpResponseMessage =>
                {
 
                    if (httpResponseMessage.StatusCode.IsServerErrorCode())
                    {
                        throw new Exception(httpResponseMessage.ReasonPhrase);
                    }
 
                    if (httpResponseMessage.StatusCode.IsSuccessCode())
                    {
                        if (typeof(TReturnType) == typeof(string))
                        {
                            return this.httpClient.GetStringAsync(getUri) as Task<string>;
                        }
 
                        return httpResponseMessage.Content.ReadAsStringAsync();
                    }
 
                    return Task.FromResult(string.Empty);
 
                }).Continue<string, TReturnType>(resultJson =>
                {
                    var result = default(TReturnType);
 
                    if (!string.IsNullOrWhiteSpace(resultJson))
                    {
                        result = Newtonsoft.Json.JsonConvert
                                .DeserializeObject<TReturnType>(resultJson);
                    }
 
                    return result;
                });
        }



As you can see my proxy is not a very sophisticated implementation. It contains three very import parts:

1) It uses a HttpClient.GetAsync function which is a .NET native code which will start first asynchronous task.

2) Continue<string,TReturnType> - this is my own implementation of a wrapper around ContinueWith native .NET function which simplify accessing a request results. In that case this function has two generic types: a string (JSON string returned from the API) and TReturnType  type that JSON will be deserialized to.

3)  Object deserialization - this is crucial step when a JSON will be transformed into a TReturnType.

Presented class in a very generic one. It`s not related to any API as it`s configured by the parameters of derived classes (please note that it`s has a protected constructor). Now when we have it implemented it`s a piece of cake to implement a client for a City API.

However before I present an API specific proxy class I want to mention one more thing. In my option it`s obvious thing that most of an API calls passes some parameters to it. In the case the GET request these parameters are part of the query string (part of URL). To centralized implementation of constructing a request URL for a particular API call in the Proxy base class I implemented a function presented below which build a request URL based on the parameters stored in the base class Parameters property of dictionary type.

protected Uri BuildRequestUri()
        {
            var stringBuilder = new StringBuilder(this.ServiceUrl.ToString());
            stringBuilder.Append(this.ControllerName);
 
            if (this.Parameters != null && this.Parameters.Any())
            {
                stringBuilder.Append("?");
                foreach (var parameter in this.Parameters)
                {
                    stringBuilder.AppendFormat("{0}={1}", parameter.Key, parameter.Value);
 
                    if (parameter.Key != this.Parameters.Last().Key)
                    {
                        stringBuilder.Append("&");
                    }
                }
            }
 
            this.Parameters.Clear();
            return new Uri(stringBuilder.ToString());
        }

Finally it`s time to present a API specific version of the client. It`s very tiny, don`t you think? Yes, it`s is because most of the logic I put in the base class already and this derived class is only responsible for passing proper set of parameters to it parent. In that case CityProxy  accepts an API top level URL i.e. http://www.api.com/api/v1/. Also in the constructor it passes a controller specific name - in that case it`s 'cities' as I specified in the Route attribute in service definition. Now, it the GetCities function I can pass query to look-up for a city and next it will be converted to the query string parameter by the BuildRequestUri function from base. Lastly I await a GetAsync function and pass a type of response (List<TCity>)that response JSON will be deserialized to.

   using BookingWorld.Common.Proxy;
   using BookingWorld.Common.Types;
   using System.Collections.Generic;
   using System.Collections.ObjectModel;
   using System.Threading.Tasks;
 
   public class CityProxy : Proxy
   {
       public CityProxy(string baseUrl)
           : base(baseUrl, "cities")
       {
       }
 
       public async Task<List<TCity>> GetCities(string query)
       {
           this.Parameters.Add("query", query);
           var requestUri = this.BuildRequestUri();
 
           return await this.GetAsync<List<TCity>>(requestUri);
       }
   }

For those who are looking for more, I also implemented a POST.

       /// <summary>
       /// Performs an asynchronous POST of the supplied object to the given URI.
       /// </summary>
       /// <typeparam name="TPostObjectType">Type of the object to POST.</typeparam>
       /// <typeparam name="TReturnType">Type of the object to read from the returned POST operation.</typeparam>
       /// <param name="postUri">The POST uri.</param>
       /// <param name="objectToPost">The actual object to POST.</param>
       /// <returns>
       /// A <see cref="Task{TReturnType}" /> from posting the object.
       /// </returns>
       /// <exception cref="System.ArgumentNullException">postUri</exception>
       /// <remarks>
       /// Use this version of POST if the type of object returned in the
       /// POST response is different from the object that was posted.
       /// </remarks>
       protected async Task<TReturnType> PostAsync<TPostObjectType, TReturnType>(
           Uri postUri,
           TPostObjectType objectToPost)
           where TReturnType : class
       {
           if (postUri == null)
           {
               throw new ArgumentNullException("postUri");
           }
 
           var jsonToPost = Newtonsoft.Json.JsonConvert.SerializeObject(objectToPost);
           var content = new StringContent(jsonToPost);
 
           return await this.httpClient.PostAsync(postUri.AbsoluteUri, content)
                .Continue<HttpResponseMessagestring>(
                httpResponseMessage =>
                {
                    if (httpResponseMessage.StatusCode.IsServerErrorCode())
                    {
                        throw new Exception(httpResponseMessage.ReasonPhrase);
                    }
 
 
                    if (httpResponseMessage.StatusCode.IsSuccessCode())
                    {
                        return httpResponseMessage.Content.ReadAsStringAsync();
                    }
 
                    return TaskCreator.Completed<string>(string.Empty);
                })
               .Continue<string, TReturnType>(resultJson =>
                {
                    var result = default(TReturnType);
 
                    if (!string.IsNullOrWhiteSpace(resultJson))
                    {
                        result = Newtonsoft.Json.JsonConvert
                       .DeserializeObject<TReturnType>(resultJson);
                    }
 
                    return result;
                });
       }