This post demonstrates how to set up a very basic message queue using SignalR with SQLite. It could be used to collect data from a measurement system, or from a client that is not always online, where no messages may be lost and the order of the messages must be preserved.
Code: https://github.com/damienbod/SignalRMessagingBuffering
Only the client contains a spool (buffer). If the client is online, messages are sent directly to the server. When the client is disconnected, messages are buffered in the SQLite database in a FIFO structure.
The server controls when the buffered messages are sent. Nothing is sent from the spool until the server requests it. This preserves the order of the messages.
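The rules above can be summarised in a small sketch. It is not code from the repository: `ITransport`, `ListTransport` and `SpoolingSender` are hypothetical names, and an in-memory `Queue<string>` stands in for the SQLite table, so that only the decision logic is shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over the hub connection (not part of SignalR).
public interface ITransport
{
    bool IsConnected { get; }
    void Send(string message);
}

// In-memory transport, only for trying the sketch out.
public class ListTransport : ITransport
{
    public readonly List<string> Sent = new List<string>();
    public bool IsConnected { get; set; }
    public void Send(string message) { Sent.Add(message); }
}

// Sketch of the client-side rule: send directly when online and the spool is
// empty, otherwise append to the FIFO; flush only when the server asks.
public class SpoolingSender
{
    private readonly ITransport _transport;
    private readonly Queue<string> _spool = new Queue<string>(); // stands in for the SQLite table

    public SpoolingSender(ITransport transport)
    {
        _transport = transport;
    }

    public int SpoolCount { get { return _spool.Count; } }

    public void Send(string message)
    {
        // Sending directly while older messages are still buffered would break the order.
        if (_transport.IsConnected && _spool.Count == 0)
        {
            _transport.Send(message);
        }
        else
        {
            _spool.Enqueue(message);
        }
    }

    // Called when the server requests the spool.
    public void Flush()
    {
        while (_transport.IsConnected && _spool.Count > 0)
        {
            // Peek first and dequeue only after the send returned without throwing.
            _transport.Send(_spool.Peek());
            _spool.Dequeue();
        }
    }
}
```

With this shape, a message created while the client is connected but the spool still holds older entries is queued behind them, so the server receives everything in the order it was produced once it calls for the spool.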
Of course, this is totally inadequate for a proper data or measurement system. The buffer needs more configuration options, limits and so on. For example, what happens if the buffer is full, and can the server turn off the spool automatically?
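One possible answer to the "buffer is full" question is to give the spool a fixed capacity and an explicit overflow policy. The sketch below is an assumption about how that could look, not something the example project implements; `BoundedSpool<T>` and `OverflowPolicy` are hypothetical names, and the queue is kept in memory for brevity.

```csharp
using System;
using System.Collections.Generic;

// What to do when the spool has reached its capacity.
public enum OverflowPolicy { DropOldest, RejectNew }

public class BoundedSpool<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;
    private readonly OverflowPolicy _policy;

    public BoundedSpool(int capacity, OverflowPolicy policy)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
        _policy = policy;
    }

    public int Count { get { return _items.Count; } }

    // Returns false when the item was not stored (RejectNew and the spool is full).
    public bool TryAdd(T item)
    {
        if (_items.Count >= _capacity)
        {
            if (_policy == OverflowPolicy.RejectNew) return false;
            _items.Dequeue(); // DropOldest: make room by discarding the head
        }
        _items.Enqueue(item);
        return true;
    }

    // Returns false when the spool is empty.
    public bool TryTake(out T item)
    {
        if (_items.Count == 0)
        {
            item = default(T);
            return false;
        }
        item = _items.Dequeue();
        return true;
    }
}
```

Both policies lose data in some form, which conflicts with the "no messages lost" goal, so a real system would probably also need to report the overflow to the server or to an operator.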
The server
The server component provides a simple Hub with two methods: one for sending the data and one for requesting the spool.
HubSync:
using System;
using System.Threading.Tasks;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Damienbod.Slab;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;

namespace Damienbod.SignalR.Host.Hubs
{
    public class HubSync : Hub
    {
        private readonly IHubLogger _slabLogger;

        public HubSync(IHubLogger slabLogger)
        {
            _slabLogger = slabLogger;
        }

        // Called by a client; the message is forwarded to all other clients.
        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            Console.WriteLine("Server Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync Sending SendSignalRMessageDto " + message.String1 + " " + message.String2);
            Clients.Others.sendSignalRMessageDto(message);
        }

        // Asks every connected client to send its spool.
        public void RequestSpool()
        {
            Console.WriteLine("Server RequestSpool");
            _slabLogger.Log(HubType.HubServerVerbose, "Server RequestSpool");
            Clients.All.requestSpool();
        }

        public override Task OnConnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnConnected " + Context.ConnectionId);
            return (base.OnConnected());
        }

        public override Task OnDisconnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnDisconnected " + Context.ConnectionId);
            return (base.OnDisconnected());
        }

        public override Task OnReconnected()
        {
            _slabLogger.Log(HubType.HubServerVerbose, "HubSync OnReconnected " + Context.ConnectionId);
            return (base.OnReconnected());
        }
    }
}
SendHubSync:
using Damienbod.SignalR.Host.Hubs;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Damienbod.Slab;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;

namespace Damienbod.SignalR.Host.Service
{
    public class SendHubSync : ISendHubSync
    {
        private readonly IHubLogger _slabLogger;
        private readonly IHubContext _hubContext;

        public SendHubSync(IHubLogger slabLogger)
        {
            _slabLogger = slabLogger;
            _hubContext = GlobalHost.ConnectionManager.GetHubContext<HubSync>();
        }

        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            _hubContext.Clients.All.SendSignalRMessageDto(message);
            _slabLogger.Log(HubType.HubServerVerbose, "MyHub Sending SendSignalRMessageDto");
        }

        public void RequestSpool()
        {
            _hubContext.Clients.All.RequestSpool();
            _slabLogger.Log(HubType.HubServerVerbose, "MyHub Sending RequestSpool");
        }
    }
}
The Startup class provides the startup logic and handles the input events for the console interface.
using System;
using System.Diagnostics;
using Damienbod.SignalR.Host.Service;
using Damienbod.SignalR.Host.Unity;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Owin.Cors;
using Microsoft.Owin.Hosting;
using Microsoft.Owin;
using Microsoft.Practices.Unity;
using Owin;

[assembly: OwinStartup(typeof(Damienbod.SignalR.Host.Startup))]

namespace Damienbod.SignalR.Host
{
    public class Startup
    {
        private static ISendHubSync _myHub;

        public static void Start()
        {
            GlobalHost.DependencyResolver.Register(typeof(IHubActivator), () => new UnityHubActivator(UnityConfiguration.GetConfiguredContainer()));
            GlobalHost.HubPipeline.AddModule(new LoggingPipelineModule());
            GlobalHost.HubPipeline.AddModule(new ErrorHandlingPipelineModule());

            var url = MyConfiguration.GetInstance().MyHubServiceUrl();
            _myHub = UnityConfiguration.GetConfiguredContainer().Resolve<ISendHubSync>();

            using (WebApp.Start(url))
            {
                Console.WriteLine("Server running on {0}", url);
                PrintHelp();

                while (true)
                {
                    var key = Console.ReadLine();
                    if (key.ToUpper() == "S")
                    {
                        var message = new SignalRMessageDto { String1 = "String1 from Server", String2 = "String2 from Server" };
                        _myHub.SendSignalRMessageDto(message);
                    }
                    if (key.ToUpper() == "R")
                    {
                        _myHub.RequestSpool();
                    }
                    if (key.ToUpper() == "C")
                    {
                        break;
                    }
                    if (key.ToUpper() == "H")
                    {
                        PrintHelp();
                    }
                }

                Console.ReadLine();
            }
        }

        private static void PrintHelp()
        {
            Console.WriteLine("----------------------");
            Console.WriteLine("H - Help");
            Console.WriteLine("S - Send message or add message to spool");
            Console.WriteLine("R - request spool");
            Console.WriteLine("C - close application");
            Console.WriteLine("----------------------");
        }

        public void Configuration(IAppBuilder app)
        {
            // Branch the pipeline here for requests that start with "/signalr"
            app.Map("/signalr", map =>
            {
                // Setup the CORS middleware to run before SignalR.
                // By default this will allow all origins. You can
                // configure the set of origins and/or http verbs by
                // providing a cors options with a different policy.
                map.UseCors(CorsOptions.AllowAll);

                var hubConfiguration = new HubConfiguration
                {
                    EnableDetailedErrors = true
                    // You can enable JSONP by uncommenting line below.
                    // JSONP requests are insecure but some older browsers (and some
                    // versions of IE) require JSONP to work cross domain
                    // EnableJSONP = true
                };

                // Run the SignalR pipeline. We're not using MapSignalR
                // since this branch already runs under the "/signalr"
                // path.
                map.RunSignalR(hubConfiguration);
            });
        }
    }
}
The Hub is initialised using Unity 3 in the Start method above. The UnityConfiguration class provides the Unity 3 registrations.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Damienbod.SignalR.Host.Hubs;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.Slab.Services;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Practices.Unity;

namespace Damienbod.SignalR.Host.Unity
{
    public class UnityConfiguration
    {
        #region Unity Container
        private static Lazy<IUnityContainer> container = new Lazy<IUnityContainer>(() =>
        {
            var container = new UnityContainer();
            RegisterTypes(container);
            return container;
        });

        public static IUnityContainer GetConfiguredContainer()
        {
            return container.Value;
        }
        #endregion

        public static IEnumerable<Type> GetTypesWithCustomAttribute<T>(Assembly[] assemblies)
        {
            foreach (var assembly in assemblies)
            {
                foreach (Type type in assembly.GetTypes())
                {
                    if (type.GetCustomAttributes(typeof(T), true).Length > 0)
                    {
                        yield return type;
                    }
                }
            }
        }

        public static void RegisterTypes(IUnityContainer container)
        {
            var myAssemblies = AppDomain.CurrentDomain.GetAssemblies().Where(a => a.FullName.StartsWith("Damienbod")).ToArray();

            container.RegisterType<IHubLogger, HubLogger>(new ContainerControlledLifetimeManager());
            container.RegisterType<ISendHubSync, Service.SendHubSync>(new ContainerControlledLifetimeManager());

            // Hub must be transient see signalr docs
            container.RegisterType<HubSync, HubSync>(new TransientLifetimeManager());
            container.RegisterType<Hub, Hub>(new TransientLifetimeManager());
            container.RegisterType<IHubActivator, UnityHubActivator>(new ContainerControlledLifetimeManager());
        }
    }
}
The Damienbod.SignalR.IHubSync.Client assembly is shared between the client and the Hub. This is not required, but it makes it easy to implement a client. The assembly provides interfaces for the client, which describe the Hub methods that can be called and the events that can be received. The DTOs are also defined in this assembly. This example has only one data DTO.
namespace Damienbod.SignalR.IHubSync.Client.Dto
{
    public class SignalRMessageDto
    {
        public string String1 { get; set; }
        public string String2 { get; set; }
        public int Int1 { get; set; }
        public int Int2 { get; set; }
        public int Id { get; set; }
    }
}
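The shared interfaces themselves are not listed in this post. Judging only from the members that SendHubSync and MyHubClient implement, they could look roughly like the sketch below. This is a reconstruction under that assumption, and the definitions in the repository may differ. The DTO is repeated in shortened form only so that the sketch compiles on its own.

```csharp
namespace Damienbod.SignalR.IHubSync.Client.Dto
{
    // Shortened copy of the DTO so the sketch is self-contained.
    public class SignalRMessageDto
    {
        public string String1 { get; set; }
        public string String2 { get; set; }
        public int Id { get; set; }
    }
}

namespace Damienbod.SignalR.IHubSync.Client
{
    using Damienbod.SignalR.IHubSync.Client.Dto;

    // Assumed shape: the Hub methods that can be invoked.
    public interface ISendHubSync
    {
        void SendSignalRMessageDto(SignalRMessageDto message);
        void RequestSpool();
    }

    // Assumed shape: the events a client can receive from the Hub.
    // The spelling "Recieve" follows the identifiers used in the client code.
    public interface IRecieveHubSync
    {
        void Recieve_SendSignalRMessageDto(SignalRMessageDto message);
        void Recieve_RequestSpool();
    }
}
```

Keeping both directions in one shared assembly means that a renamed method shows up as a compile error on both sides instead of as a silently ignored SignalR call.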
The client
The client is made up of a simple SignalR client and a SQLite database used for the buffer/spool FIFO. The SQLite database has just one table, MessageDto.
CREATE TABLE "MessageDto" (
    "String1" TEXT NOT NULL DEFAULT none,
    "String2" TEXT NOT NULL DEFAULT none,
    "Int1" INTEGER NOT NULL DEFAULT 0,
    "Int2" INTEGER NOT NULL DEFAULT 0,
    "Id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
)
This database is accessed using the SpoolDataRepository class.
using System;
using System.Data.SQLite;
using Damienbod.SignalR.IHubSync.Client.Dto;

namespace SignalRClientConsole.Spool
{
    public class SpoolDataRepository
    {
        private readonly SQLiteConnection _mDbConnection;

        public SpoolDataRepository()
        {
            _mDbConnection = new SQLiteConnection("Data Source=SignalRSpool.sqlite;Version=3;");
            _mDbConnection.Open();
        }

        // Appends a message to the end of the FIFO.
        public void AddSignalRMessageDto(SignalRMessageDto message)
        {
            // Parameters are used instead of string formatting so that a quote
            // in String1 or String2 cannot break the statement.
            const string sql = "insert into MessageDto (String1, String2, Int1, Int2) values (@String1, @String2, @Int1, @Int2)";
            using (var command = new SQLiteCommand(sql, _mDbConnection))
            {
                command.Parameters.AddWithValue("@String1", message.String1 ?? string.Empty);
                command.Parameters.AddWithValue("@String2", message.String2 ?? string.Empty);
                command.Parameters.AddWithValue("@Int1", message.Int1);
                command.Parameters.AddWithValue("@Int2", message.Int2);
                command.ExecuteNonQuery();
            }
        }

        public void RemoveSignalRMessageDto(int id)
        {
            const string sql = "DELETE FROM MessageDto WHERE Id = @Id";
            using (var command = new SQLiteCommand(sql, _mDbConnection))
            {
                command.Parameters.AddWithValue("@Id", id);
                command.ExecuteNonQuery();
            }
        }

        // Returns the oldest message (lowest Id), or null if the spool is empty.
        public SignalRMessageDto GetNextSignalRMessageDto()
        {
            const string sql = "select * from MessageDto order by Id LIMIT 1";
            using (var command = new SQLiteCommand(sql, _mDbConnection))
            using (SQLiteDataReader reader = command.ExecuteReader())
            {
                if (reader.Read())
                {
                    return new SignalRMessageDto
                    {
                        String1 = reader["String1"].ToString(),
                        String2 = reader["String2"].ToString(),
                        Int1 = Convert.ToInt32(reader["Int1"]),
                        Int2 = Convert.ToInt32(reader["Int2"]),
                        Id = Convert.ToInt32(reader["Id"])
                    };
                }
            }
            return null;
        }
    }
}
The MyHubClient class is used to set up the SignalR Hub. This class inherits from the BaseHubClient which sets up the standard SignalR settings. The class also implements the shared assembly interfaces.
using System;
using Damienbod.SignalR.IHubSync.Client;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR.Client;
using SignalRClientConsole.Logging;

namespace SignalRClientConsole.HubClients
{
    public class MyHubClient : BaseHubClient, ISendHubSync, IRecieveHubSync
    {
        // Set when the server requests the spool; read by the timer in Program.
        public static bool SendSpool = false;
        public static int SpoolCount = 0;

        public MyHubClient()
        {
            Init();
        }

        public new void Init()
        {
            HubConnectionUrl = "http://localhost:8089/";
            HubProxyName = "Hubsync";
            HubTraceLevel = TraceLevels.None;
            HubTraceWriter = Console.Out;

            base.Init();

            _myHubProxy.On<SignalRMessageDto>("SendSignalRMessageDto", Recieve_SendSignalRMessageDto);
            _myHubProxy.On("RequestSpool", Recieve_RequestSpool);

            StartHubInternal();
        }

        public override void StartHub()
        {
            _hubConnection.Dispose();
            Init();
        }

        public void Recieve_SendSignalRMessageDto(SignalRMessageDto message)
        {
            Console.WriteLine("Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
            HubClientEvents.Log.Informational("Received SendSignalRMessageDto " + message.String1 + ", " + message.String2);
        }

        public void Recieve_RequestSpool()
        {
            SendSpool = true;
        }

        public void SendSignalRMessageDto(SignalRMessageDto message)
        {
            // The hub method returns void, so the non-generic Invoke is used.
            _myHubProxy.Invoke("SendSignalRMessageDto", message).ContinueWith(task =>
            {
                if (task.IsFaulted)
                {
                    HubClientEvents.Log.Error("There was an error sending the message:" + task.Exception.GetBaseException());
                }
            }).Wait();

            HubClientEvents.Log.Informational("Client SendSignalRMessageDto sent to server");
        }

        public void RequestSpool()
        {
            // only used on the server side
            throw new NotImplementedException();
        }
    }
}
The Program class contains the user interface logic and a timer, running on a separate thread, that sends the spool if the server has requested it.
using System;
using System.Timers;
using Damienbod.SignalR.IHubSync.Client.Dto;
using Microsoft.AspNet.SignalR.Client;
using SignalRClientConsole.HubClients;
using SignalRClientConsole.Logging;
using SignalRClientConsole.Spool;

namespace SignalRClientConsole
{
    public class Program
    {
        private static Timer _aTimer;

        static readonly MyHubClient myHubClient = new MyHubClient();
        static readonly SpoolDataRepository spoolDataRepository = new SpoolDataRepository();

        private static void OnTimedEvent(object source, ElapsedEventArgs e)
        {
            if (MyHubClient.SendSpool)
            {
                Console.WriteLine("Items to be spooled: {0}", MyHubClient.SpoolCount);
                if (myHubClient.State == ConnectionState.Connected)
                {
                    // Fetch the oldest message once per iteration, send it, then delete it.
                    SignalRMessageDto spoolMessage;
                    while ((spoolMessage = spoolDataRepository.GetNextSignalRMessageDto()) != null)
                    {
                        myHubClient.SendSignalRMessageDto(spoolMessage);
                        spoolDataRepository.RemoveSignalRMessageDto(spoolMessage.Id);
                    }
                    MyHubClient.SpoolCount = 0;
                }
                MyHubClient.SendSpool = false;
            }
        }

        private static void PrintHelp()
        {
            Console.WriteLine("----------------------");
            Console.WriteLine("H - Help");
            Console.WriteLine("S - Send message or add message to spool");
            Console.WriteLine("C - Close hub connection");
            Console.WriteLine("N - Start new hub connection");
            Console.WriteLine("X - close application");
            Console.WriteLine("----------------------");
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Starting client http://localhost:8089");
            PrintHelp();

            // Check every 2 seconds (2000 milliseconds) whether the server requested the spool.
            _aTimer = new Timer(2000);
            _aTimer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
            _aTimer.Enabled = true;

            while (true)
            {
                string key = Console.ReadLine();
                if (key.ToUpper() == "S")
                {
                    if (myHubClient.State == ConnectionState.Connected && MyHubClient.SpoolCount <= 0)
                    {
                        var message = new SignalRMessageDto { String1 = "clientMessage String1", String2 = " ,String2" };
                        myHubClient.SendSignalRMessageDto(message);
                    }
                    else
                    {
                        Console.WriteLine(" :no connection or spool not empty, adding message to spool");
                        spoolDataRepository.AddSignalRMessageDto(new SignalRMessageDto { String1 = "client message: String1", String2 = " ,String2", Int1 = 3, Int2 = 3 });
                        HubClientEvents.Log.Warning("Can't send message, connectionState= " + myHubClient.State);
                        MyHubClient.SpoolCount++;
                    }
                }
                if (key.ToUpper() == "C")
                {
                    myHubClient.CloseHub();
                    Console.WriteLine(" :closing hub if opened");
                    HubClientEvents.Log.Informational("Closed Hub");
                }
                if (key.ToUpper() == "N")
                {
                    myHubClient.StartHub();
                    Console.WriteLine(" :starting a new hub if server exists");
                    HubClientEvents.Log.Informational("Started the Hub");
                }
                if (key.ToUpper() == "X")
                {
                    break;
                }
                if (key.ToUpper() == "H")
                {
                    PrintHelp();
                }
            }
        }
    }
}
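A System.Timers.Timer raises Elapsed on a thread-pool thread, so if sending the spool takes longer than the timer interval, a second callback can start while the first is still running. The following sketch shows one way to guard against that. `SpoolDrainer<T>` is a hypothetical helper, not part of the example project, and the three delegates stand in for GetNextSignalRMessageDto, SendSignalRMessageDto and RemoveSignalRMessageDto.

```csharp
using System;
using System.Threading;

// Hypothetical helper: drains a FIFO one item at a time and refuses to run
// twice concurrently, for example when a timer fires again mid-drain.
public class SpoolDrainer<T> where T : class
{
    private readonly Func<T> _peek;     // returns the oldest item, or null if empty
    private readonly Action<T> _send;   // may throw if the connection drops
    private readonly Action<T> _remove; // deletes the item after a successful send
    private int _running;               // 0 = idle, 1 = draining

    public SpoolDrainer(Func<T> peek, Action<T> send, Action<T> remove)
    {
        _peek = peek;
        _send = send;
        _remove = remove;
    }

    // Returns the number of items sent, or -1 if another drain is in progress.
    public int Drain()
    {
        // Atomically switch from idle to draining; give up if already draining.
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
        {
            return -1;
        }
        try
        {
            int sent = 0;
            T item;
            while ((item = _peek()) != null)
            {
                _send(item);   // if this throws, the item stays in the spool
                _remove(item);
                sent++;
            }
            return sent;
        }
        finally
        {
            Interlocked.Exchange(ref _running, 0);
        }
    }
}
```

In the timer callback, `Drain()` would replace the inner while loop. Because an item is removed only after the send returned, a failure in the middle leaves the remaining messages in the database in their original order.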
When running without a server, the messages are saved to the SQLite database.
Once the client is connected to the server and the spool is requested, the messages are read from the database and sent to the server. (They go only to the server and are not echoed back to the sending client.)
Conclusion
This post shows how simple it is to set up a full-duplex, point-to-point IPC system using SignalR. It would of course require much better error handling and thread synchronization if it were to be used in a production environment. It could also be expanded and used to implement a bus system. The possibilities are endless…
Links:
http://www.asp.net/signalr/overview/signalr-20/hubs-api/handling-connection-lifetime-events
http://system.data.sqlite.org
https://addons.mozilla.org/de/firefox/addon/sqlite-manager/
http://sqliteadmin.orbmu2k.de/
http://blog.tigrangasparian.com/2012/02/09/getting-started-with-sqlite-in-c-part-one/
http://brice-lambson.blogspot.ch/2012/10/entity-framework-on-sqlite.html
http://brice-lambson.blogspot.ch/2013/06/systemdatasqlite-on-entity-framework-6.html
http://www.connectionstrings.com/sqlite/
http://www.codeproject.com/Articles/236918/Using-SQLite-embedded-database-with-entity-framewo
http://www.thomasbelser.net/2009/01/25/c-sharp-und-sqlite-eine-kleine-einfuhrung/
http://cplus.about.com/od/howtodothingsinc/ss/How-To-Use-Sqlite-From-Csharp.htm
http://stackoverflow.com/questions/11591002/how-can-i-use-sqlite-in-a-c-sharp-project
http://schimpf.es/sqlite-vs-mysql/
http://chinookdatabase.codeplex.com/
http://stackoverflow.com/questions/14510096/entity-framework-6-sqlite
http://code.msdn.microsoft.com/windowsapps/Sqlite-For-Windows-8-Metro-2ec7a882