SignalR Message Buffers with SQLite

This post demonstrates how to set up a very basic message queue using SignalR and SQLite. This could be used to collect data from a measurement system, or from a client which is not always online, where no messages may be lost and the sequence of the messages must be preserved.

Code: https://github.com/damienbod/SignalRMessagingBuffering

Only the client contains a spool or buffer. If the client is online and the spool is empty, messages are sent directly to the server. When the client is disconnected, messages are buffered in the SQLite database in a FIFO structure.

[Image: SignalRSqlite01]

The server controls when the client sends its buffered messages. No buffered messages are sent until the server requests the spool/buffer. This preserves the sequence of the messages.

[Image: SignalRSqlite02]

Of course this is totally inadequate for a proper data or measurement system. The buffer requires more configuration possibilities, limits and so on. For example, what happens when the buffer is full, and can the server automatically turn off the spool/buffer?
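One possible answer to the "buffer is full" question is a fixed capacity with a drop-oldest policy. The following is only a minimal sketch: BoundedSpool is a hypothetical in-memory helper that is not part of the original project, and it stands in for the SQLite table so that the policy itself is easy to see.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original project: a FIFO buffer with a
// fixed capacity that discards the oldest entry when it is full.
public class BoundedSpool<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedSpool(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    public int Count { get { return _items.Count; } }

    // Adds an item; returns true if an older item had to be dropped to make room.
    public bool Add(T item)
    {
        var dropped = false;
        if (_items.Count == _capacity)
        {
            _items.Dequeue();
            dropped = true;
        }
        _items.Enqueue(item);
        return dropped;
    }

    // Returns false when the spool is empty, otherwise removes and returns the oldest item.
    public bool TryTake(out T item)
    {
        if (_items.Count == 0)
        {
            item = default(T);
            return false;
        }
        item = _items.Dequeue();
        return true;
    }
}
```

Dropping the oldest entry keeps the newest measurements. A system which must never lose data would probably reject new messages or raise an alarm instead, so the right policy depends on the application.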

The server

The server component provides a simple Hub with two methods, one for sending the data and another for requesting the spool.

HubSync:

using System;
using System.Threading.Tasks;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Damienbod.Slab;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;

namespace Damienbod.SignalR.Host.Hubs
{
    public class HubSync : Hub
    {
        private readonly IHubLogger _slabLogger;

        public HubSync(IHubLogger slabLogger)
        {
            _slabLogger = slabLogger;
        }

        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            Console.WriteLine("Server Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync Sending SendSignalRMessageDto " + message.String1 + " " + message.String2);
            Clients.Others.sendSignalRMessageDto(message);
        }

        public void RequestSpool()
        {
            Console.WriteLine("Server RequestSpool");
            _slabLogger.Log(HubType.HubServerVerbose, "Server RequestSpool");
            Clients.All.requestSpool();
        }

        public override Task OnConnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnConnected" + Context.ConnectionId);
            return (base.OnConnected());
        }

        public override Task OnDisconnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnDisconnected" + Context.ConnectionId);
            return (base.OnDisconnected());
        }

        public override Task OnReconnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnReconnected" + Context.ConnectionId);
            return (base.OnReconnected());
        }
    }
}

SendHubSync:


using Damienbod.SignalR.Host.Hubs;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Damienbod.Slab;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;

namespace Damienbod.SignalR.Host.Service
{
    public class SendHubSync : ISendHubSync
    {
        private readonly IHubLogger _slabLogger;
        private readonly IHubContext _hubContext;

        public SendHubSync(IHubLogger slabLogger)
        {
            _slabLogger = slabLogger;
            _hubContext = GlobalHost.ConnectionManager.GetHubContext<HubSync>(); 
        }

        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            _hubContext.Clients.All.SendSignalRMessageDto(message);
            _slabLogger.Log(HubType.HubServerVerbose, "MyHub Sending SendSignalRMessageDto");
        }

        public void RequestSpool()
        {
            _hubContext.Clients.All.RequestSpool();
            _slabLogger.Log(HubType.HubServerVerbose, "MyHub Sending RequestSpool");
        }
    }
}

The Startup class provides the start-up logic and handles the input events for the console interface.

using System;
using System.Diagnostics;
using Damienbod.SignalR.Host.Service;
using Damienbod.SignalR.Host.Unity;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Owin.Cors;
using Microsoft.Owin.Hosting;
using Microsoft.Owin;
using Microsoft.Practices.Unity;
using Owin;

[assembly: OwinStartup(typeof(Damienbod.SignalR.Host.Startup))]
namespace Damienbod.SignalR.Host
{
    public class Startup
    {
        private static ISendHubSync _myHub;

        public static void Start()
        {
            GlobalHost.DependencyResolver.Register(typeof(IHubActivator), () => new UnityHubActivator(UnityConfiguration.GetConfiguredContainer())); 
            GlobalHost.HubPipeline.AddModule(new LoggingPipelineModule());
            GlobalHost.HubPipeline.AddModule(new ErrorHandlingPipelineModule());
            
            var url = MyConfiguration.GetInstance().MyHubServiceUrl();
            _myHub = UnityConfiguration.GetConfiguredContainer().Resolve<ISendHubSync>();
            
            using (WebApp.Start(url))
            {
                Console.WriteLine("Server running on {0}", url);
                Console.WriteLine("----------------------");
                Console.WriteLine("H - Help");
                Console.WriteLine("S - Send message or add message to spool");
                Console.WriteLine("R - request spool");
                Console.WriteLine("C - close application");
                Console.WriteLine("----------------------");
                while (true)
                {
                    var key = Console.ReadLine();
                    if (key.ToUpper() == "S")
                    {
                        var message = new SignalRMessageDto
                        {
                            String1 = "String1 from Server",
                            String2 = "String2 from Server"
                        };
                        _myHub.SendSignalRMessageDto(message);
                    }
                    if (key.ToUpper() == "R")
                    {
                        _myHub.RequestSpool();
                    }
                    if (key.ToUpper() == "C")
                    {
                        break;
                    }
                    if (key.ToUpper() == "H")
                    {
                        Console.WriteLine("----------------------");
                        Console.WriteLine("H - Help");
                        Console.WriteLine("S - Send message or add message to spool");
                        Console.WriteLine("R - request spool");
                        Console.WriteLine("C - close application");
                        Console.WriteLine("----------------------");
                    }
                }

                Console.ReadLine();
            }
        }

        public void Configuration(IAppBuilder app)
        {
            // Branch the pipeline here for requests that start with "/signalr"
            app.Map("/signalr", map =>
            {
                // Setup the CORS middleware to run before SignalR.
                // By default this will allow all origins. You can 
                // configure the set of origins and/or http verbs by
                // providing a cors options with a different policy.
                map.UseCors(CorsOptions.AllowAll);
                var hubConfiguration = new HubConfiguration
                {
                    EnableDetailedErrors = true 
                    // You can enable JSONP by uncommenting line below.
                    // JSONP requests are insecure but some older browsers (and some
                    // versions of IE) require JSONP to work cross domain
                    // EnableJSONP = true
                };
                // Run the SignalR pipeline. We're not using MapSignalR
                // since this branch already runs under the "/signalr"
                // path.

                map.RunSignalR(hubConfiguration);
            });
        }
    }
}

The Hub is initialised using Unity 3 in the Start method above. The UnityConfiguration class provides the Unity 3 registration.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Damienbod.SignalR.Host.Hubs;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Practices.Unity;

namespace Damienbod.SignalR.Host.Unity
{
    public class UnityConfiguration
    {
        #region Unity Container
        private static Lazy<IUnityContainer> container = new Lazy<IUnityContainer>(() =>
        {
            var container = new UnityContainer();
            RegisterTypes(container);
            return container;
        });

        public static IUnityContainer GetConfiguredContainer()
        {
            return container.Value;
        }
        #endregion

        public static IEnumerable<Type> GetTypesWithCustomAttribute<T>(Assembly[] assemblies)
        {
            foreach (var assembly in assemblies)
            {
                foreach (Type type in assembly.GetTypes())
                {
                    if (type.GetCustomAttributes(typeof(T), true).Length > 0)
                    {
                        yield return type;
                    }
                }
            }
        }

        public static void RegisterTypes(IUnityContainer container)
        {
            var myAssemblies = AppDomain.CurrentDomain.GetAssemblies().Where(a => a.FullName.StartsWith("Damienbod")).ToArray();

            container.RegisterType<IHubLogger, HubLogger>(new ContainerControlledLifetimeManager());
            container.RegisterType<ISendHubSync, Service.SendHubSync>(new ContainerControlledLifetimeManager());

            // Hub must be transient see signalr docs
            container.RegisterType<HubSync, HubSync>(new TransientLifetimeManager());
            container.RegisterType<Hub, Hub>(new TransientLifetimeManager());
            container.RegisterType<IHubActivator, UnityHubActivator>(new ContainerControlledLifetimeManager());
        }
    }
}

The Damienbod.SignalR.IHubSync.Client assembly is shared by the client and the Hub. This is not required, but it makes it easy to implement a client. The assembly provides interfaces for the client, which define the Hub methods that can be called and the events that can be received. The DTOs are also defined in this assembly. This example has only one data DTO.

namespace Damienbod.SignalR.IHubSync.Client.Dto
{
    public class SignalRMessageDto
    {
        public string String1 { get; set; }

        public string String2 { get; set; }

        public int Int1 { get; set; }

        public int Int2 { get; set; }

        public int Id { get; set; }
    }
}

The client

The client is made up of a simple SignalR client and a SQLite database used for the buffer/spool FIFO. The SQLite database has just one table, MessageDto.

CREATE TABLE "MessageDto" (
"String1" TEXT NOT NULL  DEFAULT none, 
"String2" TEXT NOT NULL  DEFAULT none, 
"Int1" INTEGER NOT NULL  DEFAULT 0, 
"Int2" INTEGER NOT NULL  DEFAULT 0, 
"Id" INTEGER PRIMARY KEY  AUTOINCREMENT  NOT NULL 
)

This database is accessed using the SpoolDataRepository class.

using System;
using System.Collections.Generic;
using System.Data.SQLite;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Damienbod.SignalR.IHubSync.Client.Dto;

namespace SignalRClientConsole.Spool
{
    public class SpoolDataRepository
    {
        static SQLiteConnection _mDbConnection;

        public SpoolDataRepository()
        {
            _mDbConnection = new SQLiteConnection("Data Source=SignalRSpool.sqlite;Version=3;");
            _mDbConnection.Open();
        }

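        public void AddSignalRMessageDto(SignalRMessageDto message)
        {
            // Use parameters instead of string formatting so that quotes in the
            // message text cannot break (or inject into) the SQL statement.
            const string sql = "insert into MessageDto (String1, String2, Int1, Int2) values (@String1, @String2, @Int1, @Int2)";

            using (var command = new SQLiteCommand(sql, _mDbConnection))
            {
                // The columns are NOT NULL, so store an empty string for missing text.
                command.Parameters.AddWithValue("@String1", message.String1 ?? string.Empty);
                command.Parameters.AddWithValue("@String2", message.String2 ?? string.Empty);
                command.Parameters.AddWithValue("@Int1", message.Int1);
                command.Parameters.AddWithValue("@Int2", message.Int2);
                command.ExecuteNonQuery();
            }
        }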

        public void RemoveSignalRMessageDto(int id)
        {
            var sql = new StringBuilder();
            sql.AppendFormat("DELETE FROM MessageDto WHERE Id = {0}",  id);
            var command = new SQLiteCommand(sql.ToString(), _mDbConnection);
            command.ExecuteNonQuery();
        }

        public SignalRMessageDto GetNextSignalRMessageDto()
        {
            // The lowest Id is the oldest message, which gives the FIFO order.
            const string sql = "select * from MessageDto order by Id LIMIT 1";
            using (var command = new SQLiteCommand(sql, _mDbConnection))
            using (SQLiteDataReader reader = command.ExecuteReader())
            {
                if (reader.Read())
                {
                    return new SignalRMessageDto
                    {
                        String1 = reader["String1"].ToString(),
                        String2 = reader["String2"].ToString(),
                        Int1 = Convert.ToInt32(reader["Int1"]),
                        Int2 = Convert.ToInt32(reader["Int2"]),
                        Id = Convert.ToInt32(reader["Id"])
                    };
                }
            }

            // The spool is empty.
            return null;
        }
    }
}
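In this example the number of spooled messages is tracked only in memory (the SpoolCount field of the client shown further down), so it starts at zero after a client restart even if rows are still waiting in the database. A minimal sketch of how the count could be read back from the table: SpoolCounter is a hypothetical helper that is not part of the original project, and it assumes the MessageDto table above and the System.Data.SQLite package.

```csharp
using System;
using System.Data.SQLite;

// Hypothetical helper, not part of the original project.
public static class SpoolCounter
{
    // Returns the number of rows currently waiting in the MessageDto table.
    public static int CountSpooledMessages(SQLiteConnection connection)
    {
        using (var command = new SQLiteCommand("select count(*) from MessageDto", connection))
        {
            // ExecuteScalar returns the count as a boxed 64-bit integer.
            return Convert.ToInt32(command.ExecuteScalar());
        }
    }
}
```

The repository could expose such a method and the client could call it once at start-up, so that new messages are queued behind any old ones instead of overtaking them.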

The MyHubClient class is used to set up the client side of the SignalR Hub connection. This class inherits from BaseHubClient, which sets up the standard SignalR settings. The class also implements the interfaces from the shared assembly.

using System;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR.Client;
using SignalRClientConsole.Logging;

namespace SignalRClientConsole.HubClients
{
    public class MyHubClient : BaseHubClient, ISendHubSync, IRecieveHubSync
    {
        public static bool SendSpool = false;
        public static int SpoolCount = 0;

        public MyHubClient()
        {
            Init();
        }

        public new void Init()
        {
            HubConnectionUrl = "http://localhost:8089/";
            HubProxyName = "Hubsync";
            HubTraceLevel = TraceLevels.None;
            HubTraceWriter = Console.Out;

            base.Init();

            _myHubProxy.On<SignalRMessageDto>("SendSignalRMessageDto", Recieve_SendSignalRMessageDto);
            _myHubProxy.On("RequestSpool", Recieve_RequestSpool);

            StartHubInternal();
        }

        public override void StartHub()
        {
            _hubConnection.Dispose();
            Init();
        }

        public void Recieve_SendSignalRMessageDto(SignalRMessageDto message)
        {
            Console.WriteLine("Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
            HubClientEvents.Log.Informational("Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
        }

        public void Recieve_RequestSpool()
        {
            SendSpool = true;                       
        }

        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            _myHubProxy.Invoke<SignalRMessageDto>("SendSignalRMessageDto", message).ContinueWith(task =>
            {
                if (task.IsFaulted)
                {
                    HubClientEvents.Log.Error("There was an error sending the message:" + task.Exception.GetBaseException());
                }
                else
                {
                    // Only report success when the invocation did not fail.
                    HubClientEvents.Log.Informational("Client SendSignalRMessageDto sent to server");
                }

            }).Wait();
        }

        public void RequestSpool()
        {
            // only used on the server side
            throw new NotImplementedException();
        }
    }
}

The Program class contains the user interface logic and a timer, which runs on a separate thread and sends the spool when the server has requested it.

using System;
using System.Timers;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR.Client;
using SignalRClientConsole.HubClients;
using SignalRClientConsole.Logging;
using SignalRClientConsole.Spool;

namespace SignalRClientConsole
{
    public class Program
    {
        private static Timer _aTimer;
        private static void OnTimedEvent(object source, ElapsedEventArgs e)
        {
            if (MyHubClient.SendSpool)
            {
                Console.WriteLine("Items to be spooled: {0}", MyHubClient.SpoolCount);
                if (myHubClient.State == ConnectionState.Connected)
                {
                    // Read each message once, oldest first, and delete it after the send call returns.
                    SignalRMessageDto spoolMessage;
                    while ((spoolMessage = spoolDataRepository.GetNextSignalRMessageDto()) != null)
                    {
                        myHubClient.SendSignalRMessageDto(spoolMessage);
                        spoolDataRepository.RemoveSignalRMessageDto(spoolMessage.Id);
                    }

                    MyHubClient.SpoolCount = 0;
                }

                MyHubClient.SendSpool = false;
            }
        }

        static readonly MyHubClient myHubClient = new MyHubClient();
        static readonly SpoolDataRepository spoolDataRepository = new SpoolDataRepository();

        static void Main(string[] args)
        {
            Console.WriteLine("Starting client  http://localhost:8089");
            Console.WriteLine("----------------------");
            Console.WriteLine("H - Help");
            Console.WriteLine("S - Send message or add message to spool");
            Console.WriteLine("C - Close hub connection");
            Console.WriteLine("N - Start new hub connection");
            Console.WriteLine("X - close application");
            Console.WriteLine("----------------------");
            
            // Check every 2 seconds (2000 milliseconds) whether the server has requested the spool.
            _aTimer = new Timer(2000);
            _aTimer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
            _aTimer.Enabled = true;

            while (true)
            {
                string key = Console.ReadLine();

                if (key.ToUpper() == "S")
                {
                    if (myHubClient.State == ConnectionState.Connected && MyHubClient.SpoolCount <= 0)
                    {
                        var message = new SignalRMessageDto {String1 = "clientMessage String1", String2 = " ,String2"};
                        myHubClient.SendSignalRMessageDto(message);
                    }
                    else
                    {
                        Console.WriteLine(" :no connection or spool not empty, adding message to spool");
                        spoolDataRepository.AddSignalRMessageDto(new SignalRMessageDto()
                        {
                            String1 = "client message: String1",
                            String2 = " ,String2",
                            Int1 = 3,
                            Int2 = 3
                        });
                        HubClientEvents.Log.Warning("Can't send message, connectionState= " + myHubClient.State);
                        MyHubClient.SpoolCount++;
                    }

                }
                if (key.ToUpper() == "C")
                {
                    myHubClient.CloseHub();
                    Console.WriteLine(" :closing hub if opened");
                    HubClientEvents.Log.Informational("Closed Hub");
                }
                if (key.ToUpper() == "N")
                {
                    myHubClient.StartHub();
                    Console.WriteLine(" :starting a new  hub if server exists");
                    HubClientEvents.Log.Informational("Started the Hub");
                }
                if (key.ToUpper() == "X")
                {
                    break;
                }
                if (key.ToUpper() == "H")
                {
                    Console.WriteLine("----------------------");
                    Console.WriteLine("H - Help");
                    Console.WriteLine("S - Send message or add message to spool");
                    Console.WriteLine("C - Close hub connection");
                    Console.WriteLine("N - Start new hub connection");
                    Console.WriteLine("X - close application");
                    Console.WriteLine("----------------------");
                }
            }
        }
    }
}

When running without a server, the messages are saved to the SQLite database.

[Image: SignalRSqlite03]

Now if the server is connected and requests the spool, the messages are read from the database and sent to the server. (They go only to the server and are not echoed back to the sending client.)

[Image: SignalRSqlite04]
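If no messages may be lost, a buffered message should only be deleted once it has really been sent. The following is a minimal sketch of such a drain loop: SpoolDrainer and the trySend delegate are hypothetical names that are not part of the original project, and an in-memory Queue stands in for the SQLite table.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original project.
public static class SpoolDrainer
{
    // Sends queued items oldest-first. An item is removed only after trySend
    // reports success; on the first failure the loop stops so that the
    // remaining items keep their order for the next attempt.
    public static int Drain<T>(Queue<T> spool, Func<T, bool> trySend)
    {
        var sent = 0;
        while (spool.Count > 0)
        {
            var next = spool.Peek();
            if (!trySend(next))
            {
                break;
            }

            spool.Dequeue();
            sent++;
        }

        return sent;
    }
}
```

With the SQLite repository, Peek would correspond to GetNextSignalRMessageDto and Dequeue to RemoveSignalRMessageDto. The send method would then have to report failures to its caller, which the client in this example does not do.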

Conclusion
This post shows how simple it is to set up a full-duplex, point-to-point IPC system using SignalR. It would of course require much better error handling and thread synchronization if it were to be used in a production environment. It could also be expanded and used to implement a bus system. The possibilities are endless…
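As one example of the missing thread synchronization: the SendSpool flag and the SpoolCount counter are plain static fields which are touched by the console loop, the timer and the SignalR callback. A minimal sketch of a safer variant using Interlocked follows. SpoolState is a hypothetical class that is not part of the original project, and it is only one of several possible approaches.

```csharp
using System.Threading;

// Hypothetical replacement for the static SendSpool / SpoolCount fields.
public class SpoolState
{
    private int _spoolRequested; // 0 = no request pending, 1 = the server asked for the spool
    private int _spoolCount;

    public int SpoolCount { get { return Volatile.Read(ref _spoolCount); } }

    // Called from the SignalR callback when the server requests the spool.
    public void RequestSpool() { Interlocked.Exchange(ref _spoolRequested, 1); }

    // Returns true if a request was pending and clears the flag in the same atomic step.
    public bool TryConsumeRequest()
    {
        return Interlocked.Exchange(ref _spoolRequested, 0) == 1;
    }

    public void MessageSpooled() { Interlocked.Increment(ref _spoolCount); }

    public void MessageSent() { Interlocked.Decrement(ref _spoolCount); }
}
```

The timer handler would call TryConsumeRequest instead of reading and resetting a flag in two separate steps, so a request that arrives while the spool is being sent is not overwritten.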

Links:

http://www.asp.net/signalr

http://www.asp.net/signalr/overview/signalr-20/getting-started-with-signalr-20/introduction-to-signalr

http://signalr.net

http://www.asp.net/signalr/overview/signalr-20/hubs-api/handling-connection-lifetime-events
http://system.data.sqlite.org

https://addons.mozilla.org/de/firefox/addon/sqlite-manager/

http://sqliteadmin.orbmu2k.de/

http://blog.tigrangasparian.com/2012/02/09/getting-started-with-sqlite-in-c-part-one/

http://brice-lambson.blogspot.ch/2012/10/entity-framework-on-sqlite.html
http://brice-lambson.blogspot.ch/2013/06/systemdatasqlite-on-entity-framework-6.html

http://www.connectionstrings.com/sqlite/

http://www.codeproject.com/Articles/236918/Using-SQLite-embedded-database-with-entity-framewo

http://stackoverflow.com/questions/2514785/how-to-create-an-entity-framework-model-from-an-existing-sqlite-database-in-visu

http://www.thomasbelser.net/2009/01/25/c-sharp-und-sqlite-eine-kleine-einfuhrung/

http://cplus.about.com/od/howtodothingsinc/ss/How-To-Use-Sqlite-From-Csharp.htm

http://stackoverflow.com/questions/11591002/how-can-i-use-sqlite-in-a-c-sharp-project

http://schimpf.es/sqlite-vs-mysql/

http://chinookdatabase.codeplex.com/

http://stackoverflow.com/questions/14510096/entity-framework-6-sqlite

http://blogs.msdn.com/b/mim/archive/2013/06/18/sync-framework-with-sqlite-for-windows-store-apps-winrt-and-windows-phone-8.aspx

http://code.msdn.microsoft.com/windowsapps/Sqlite-For-Windows-8-Metro-2ec7a882

http://blogs.msdn.com/b/andy_wigley/archive/2013/06/06/sqlite-winrt-database-programming-on-windows-phone-and-windows-8.aspx
