CSV export using Elasticsearch and Web API

This article demonstrates how to export data from Elasticsearch to a CSV file using Web API. The data is retrieved from Elasticsearch with a _search request that uses scan and scroll, which returns documents quickly because no sorting is done. The data is then exported to a CSV file using WebApiContrib.Formatting.Xlsx from Jordan Gray. The progress of the export is displayed in an HTML page (MVC Razor view) using SignalR.

Code: https://github.com/damienbod/WebApiCSVExportFromElasticsearch

Other Tutorials:

Part 1: ElasticsearchCRUD introduction
Part 2: MVC application search with simple documents using autocomplete, jQuery and jTable
Part 3: MVC Elasticsearch CRUD with nested documents
Part 4: Data Transfer from MS SQL Server using Entity Framework to Elasticsearch
Part 5: MVC Elasticsearch with child, parent documents
Part 6: MVC application with Entity Framework and Elasticsearch
Part 7: Live Reindex in Elasticsearch
Part 8: CSV export using Elasticsearch and Web API
Part 9: Elasticsearch Parent, Child, Grandchild Documents and Routing
Part 10: Elasticsearch Type mappings with ElasticsearchCRUD
Part 11: Elasticsearch Synonym Analyzer using ElasticsearchCRUD
Part 12: Using Elasticsearch German Analyzer
Part 13: MVC google maps search using Elasticsearch
Part 14: Search Queries and Filters with ElasticsearchCRUD
Part 15: Elasticsearch Bulk Insert
Part 16: Elasticsearch Aggregations With ElasticsearchCRUD
Part 17: Searching Multiple Indices and Types in Elasticsearch
Part 18: MVC searching with Elasticsearch Highlighting
Part 19: Index Warmers with ElasticsearchCRUD

Setup
The export uses the persons index created in the previous post. The index persons_v2 is accessed through the alias persons. Because the index holds relatively little data (about 20,000 documents), it can be exported as a single CSV file in a single chunk.

The Person class is used both to retrieve the data from Elasticsearch and to export it to the CSV file. Attributes are added as required; for example, the ExcelColumn attribute on ModifiedDate tells the export formatter to use the DisplayFormat string.

using System;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using WebApiContrib.Formatting.Xlsx.Attributes;

namespace WebApiCSVExportFromElasticsearch.Models
{
	public class Person
	{
		[Key]
		public int BusinessEntityID { get; set; }

		[Required]
		[StringLength(2)]
		public string PersonType { get; set; }

		public bool NameStyle { get; set; }

		[StringLength(8)]
		public string Title { get; set; }

		[Required]
		[StringLength(50)]
		public string FirstName { get; set; }

		[StringLength(50)]
		public string MiddleName { get; set; }

		[Required]
		[StringLength(50)]
		public string LastName { get; set; }

		[StringLength(10)]
		public string Suffix { get; set; }

		public int EmailPromotion { get; set; }

		[Column(TypeName = "xml")]
		public string AdditionalContactInfo { get; set; }

		[Column(TypeName = "xml")]
		public string Demographics { get; set; }

		public Guid rowguid { get; set; }

		// Exported using the long date format "{0:D}"
		[DisplayFormat(DataFormatString = "{0:D}")]
		[ExcelColumn(UseDisplayFormatString = true)]
		public DateTime ModifiedDate { get; set; }

		public bool Deleted { get; set; }
	}
}
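To see what the DisplayFormat string on ModifiedDate produces, the following sketch applies the same "{0:D}" format with string.Format. It assumes, as the attribute name suggests, that the Xlsx formatter uses this format string when UseDisplayFormatString is true; the ModifiedDateFormatDemo class is hypothetical and not part of the sample project. "D" is the .NET long date pattern, so the exported text depends on the culture.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: applies the DataFormatString "{0:D}" from the
// DisplayFormat attribute on Person.ModifiedDate to a DateTime value.
public static class ModifiedDateFormatDemo
{
    public const string DataFormatString = "{0:D}";

    // "D" is the standard long date format; its layout comes from the culture.
    public static string Format(DateTime value, CultureInfo culture)
    {
        return string.Format(culture, DataFormatString, value);
    }
}
```

With the invariant culture, formatting 5 October 2014 gives "Sunday, 05 October 2014"; on a server with a different culture the day and month names and their order change accordingly.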

Now that the data model is ready, the export can be implemented. The whole export is implemented in the PersonsCsvExportController class. The GetPersonsCsvExport method retrieves the data, exports it to a CSV file and sends diagnostic messages to an HTML page using SignalR.

using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http.Headers;
using System.Text;
using System.Web.Http;
using ElasticsearchCRUD;
using ElasticsearchCRUD.ContextSearch;
using Microsoft.AspNet.SignalR;
using WebApiCSVExportFromElasticsearch.Models;

namespace WebApiCSVExportFromElasticsearch.Controllers
{
    public class PersonsCsvExportController : ApiController
    {
		private readonly IHubContext _hubContext = GlobalHost.ConnectionManager.GetHubContext<DiagnosisEventSourceService>();

		[Route("api/PersonsCsvExport")]
		public IHttpActionResult GetPersonsCsvExport()
		{
			_hubContext.Clients.All.addDiagnosisMessage("Csv export starting");

			// Overwrite the Accept header so content negotiation always returns an Excel document.
			Request.Headers.Accept.Clear();
			Request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/vnd.ms-excel"));

			_hubContext.Clients.All.addDiagnosisMessage("ScanAndScrollConfiguration: 1s, 300 items per shard");
			_hubContext.Clients.All.addDiagnosisMessage("sending scan and scroll _search");
			_hubContext.Clients.All.addDiagnosisMessage(BuildSearchMatchAll());

			var result = new List<Person>(); 
			using (var context = new ElasticsearchContext("http://localhost:9200/", new ElasticsearchMappingResolver()))
			{
				context.TraceProvider = new SignalRTraceProvider(_hubContext, TraceEventType.Information);

				var scanScrollConfig = new ScanAndScrollConfiguration(new TimeUnitSecond(1), 300);
				var scrollIdResult = context.SearchCreateScanAndScroll<Person>(BuildSearchMatchAll(), scanScrollConfig);
				
				var scrollId = scrollIdResult.ScrollId;
				_hubContext.Clients.All.addDiagnosisMessage(string.Format("Total Hits: {0}", scrollIdResult.TotalHits));

				int processedResults = 0;
				while (scrollIdResult.TotalHits > processedResults)
				{
					var resultCollection = context.SearchScanAndScroll<Person>(scrollId, scanScrollConfig);
					scrollId = resultCollection.ScrollId;

					result.AddRange(resultCollection.PayloadResult);
					processedResults = result.Count;
					_hubContext.Clients.All.addDiagnosisMessage(string.Format("Total Hits: {0}, Processed: {1}", scrollIdResult.TotalHits, processedResults));
				}
			}

			_hubContext.Clients.All.addDiagnosisMessage("Elasticsearch processing finished, starting to serialize csv");
			return Ok(result);
		}

		// Builds the following query:
		//{
		//	"query" : {
		//		"match_all" : {}
		//	}
		//}
		private Search BuildSearchMatchAll()
		{
			return new Search()
			{
				Query = new Query(new MatchAllQuery())
			};
		}
	}
}
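The controller returns the Person list and leaves the serialization to WebApiContrib.Formatting.Xlsx, which is selected through the application/vnd.ms-excel Accept header. If a plain comma-separated text file were wanted instead, each field would have to be escaped by hand. The following sketch swaps in a hand-written writer that follows the RFC 4180 quoting rules; CsvWriterSketch and its column list are hypothetical and not part of the formatter or the sample project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper, not part of WebApiContrib.Formatting.Xlsx: writes rows
// as RFC 4180 style CSV (comma separator, CRLF line endings, fields quoted
// when they contain a comma, quote, CR or LF, embedded quotes doubled).
public static class CsvWriterSketch
{
    public static string Escape(string field)
    {
        if (field == null) return string.Empty;
        bool needsQuotes = field.IndexOfAny(new[] { ',', '"', '\r', '\n' }) >= 0;
        return needsQuotes ? "\"" + field.Replace("\"", "\"\"") + "\"" : field;
    }

    // Each column is a header plus a function that reads the value from a row.
    public static string Write<T>(IEnumerable<T> rows, IReadOnlyList<(string Header, Func<T, string> Value)> columns)
    {
        var sb = new StringBuilder();
        sb.Append(string.Join(",", columns.Select(c => Escape(c.Header)))).Append("\r\n");
        foreach (var row in rows)
        {
            sb.Append(string.Join(",", columns.Select(c => Escape(c.Value(row))))).Append("\r\n");
        }
        return sb.ToString();
    }
}
```

For the Person class, the column list could contain entries such as ("LastName", p => p.LastName); values such as AdditionalContactInfo, which may contain commas or line breaks, are then quoted automatically.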

Elasticsearch scan and scroll with ElasticsearchCRUD

The scan and scroll function selects data quickly because it does not sort the results, and sorting is an expensive operation. The first request, which uses the _search API (SearchCreateScanAndScroll), defines the query for the scan and returns the total number of hits for this query together with a scrollId.
This scrollId is then used to retrieve the next batch. The scan is configured with the ScanAndScrollConfiguration class, which defines how long Elasticsearch keeps the scroll context alive between requests and the maximum number of documents returned per shard in each scroll request. If the size is 300 and the index has 5 shards, up to 1500 documents are returned per scroll request.

Each subsequent scroll request returns a new scrollId, which is then used for the next request (n + 1). This is repeated until all documents from the scan have been retrieved.
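The request sequence can be tried without a cluster. The following sketch replaces Elasticsearch with a hypothetical in-memory index (FakeScanScrollIndex, not an ElasticsearchCRUD type) that only mimics the contract described above: the first call returns a scrollId and the total hits, and each scroll call returns up to the configured size from every shard plus a new scrollId. With 20,000 documents spread evenly over 5 shards and 300 documents per shard, each full batch holds 1500 documents, so 14 scroll requests are needed. The loop also stops on an empty batch, a defensive guard that the controller above does not have.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a sharded index; it only mimics the
// scan and scroll request/response pattern and is not part of ElasticsearchCRUD.
public sealed class FakeScanScrollIndex
{
    private readonly List<List<int>> _shards;
    private readonly int _sizePerShard;
    private readonly Dictionary<string, int> _cursors = new Dictionary<string, int>();
    private int _nextId;

    public FakeScanScrollIndex(IEnumerable<int> documents, int shardCount, int sizePerShard)
    {
        _sizePerShard = sizePerShard;
        _shards = Enumerable.Range(0, shardCount).Select(_ => new List<int>()).ToList();
        int i = 0;
        foreach (var doc in documents)
        {
            _shards[i++ % shardCount].Add(doc); // spread documents evenly over the shards
        }
    }

    // Initial scan request: only the scrollId and the total number of hits.
    public (string ScrollId, int TotalHits) CreateScanAndScroll()
    {
        return (NewScrollId(0), _shards.Sum(s => s.Count));
    }

    // Scroll request: up to sizePerShard documents from every shard and a new scrollId.
    public (string ScrollId, List<int> Hits) Scroll(string scrollId)
    {
        int offset = _cursors[scrollId];
        _cursors.Remove(scrollId); // in this sketch each scrollId is used once
        var hits = _shards.SelectMany(s => s.Skip(offset).Take(_sizePerShard)).ToList();
        return (NewScrollId(offset + _sizePerShard), hits);
    }

    private string NewScrollId(int offset)
    {
        var id = "scroll-" + _nextId++;
        _cursors[id] = offset;
        return id;
    }
}

public static class ScanScrollDemo
{
    // Same loop shape as the controller, plus a stop on an empty batch.
    public static (List<int> Documents, int Requests) ExportAll(FakeScanScrollIndex index)
    {
        var result = new List<int>();
        var (scrollId, totalHits) = index.CreateScanAndScroll();
        int requests = 0;
        while (totalHits > result.Count)
        {
            var (nextScrollId, hits) = index.Scroll(scrollId);
            requests++;
            if (hits.Count == 0) break;
            scrollId = nextScrollId;
            result.AddRange(hits);
        }
        return (result, requests);
    }
}
```

Calling ScanScrollDemo.ExportAll(new FakeScanScrollIndex(Enumerable.Range(0, 20000), 5, 300)) collects all 20,000 documents in 14 scroll requests; the last request returns only 100 documents per shard.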

var result = new List<Person>();
using (var context = new ElasticsearchContext("http://localhost:9200/", new ElasticsearchMappingResolver()))
{
	// Scroll context kept alive for 1 second, up to 300 documents per shard per request
	var scanScrollConfig = new ScanAndScrollConfiguration(new TimeUnitSecond(1), 300);
	// Initial _search request: returns the scrollId and the total number of hits
	var scrollIdResult = context.SearchCreateScanAndScroll<Person>(BuildSearchMatchAll(), scanScrollConfig);
	
	var scrollId = scrollIdResult.ScrollId;

	int processedResults = 0;
	while (scrollIdResult.TotalHits > processedResults)
	{
		var resultCollection = context.SearchScanAndScroll<Person>(scrollId, scanScrollConfig);
		// Each response contains the scrollId for the next request
		scrollId = resultCollection.ScrollId;

		// Use the data here: resultCollection.PayloadResult
		result.AddRange(resultCollection.PayloadResult);
		processedResults = result.Count;
	}
}

ElasticsearchCRUD TraceProvider using SignalR

The example also traces all ElasticsearchCRUD messages using SignalR. An IHubContext is created, and this is then used in the SignalRTraceProvider.

private readonly IHubContext _hubContext = GlobalHost.ConnectionManager.GetHubContext<DiagnosisEventSourceService>();

context.TraceProvider = new SignalRTraceProvider(_hubContext, TraceEventType.Information);

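The TraceProvider sends a message to all clients only if the event's TraceEventType value is less than or equal to the level passed to the constructor. Lower values are more severe (Critical = 1, Error = 2, Warning = 4, Information = 8, Verbose = 16), so with TraceEventType.Information all critical, error, warning and information messages are sent and verbose messages are dropped.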

using System;
using System.Diagnostics;
using System.Text;
using ElasticsearchCRUD.Tracing;
using Microsoft.AspNet.SignalR;

namespace WebApiCSVExportFromElasticsearch
{
	public class SignalRTraceProvider : ITraceProvider
	{
		private readonly TraceEventType _traceEventTypelogLevel;
		private readonly IHubContext _hubContext;

		public SignalRTraceProvider(IHubContext hubContext, TraceEventType traceEventTypelogLevel)
		{
			_traceEventTypelogLevel = traceEventTypelogLevel;
			_hubContext = hubContext;
		}

		public void Trace(TraceEventType level, string message, params object[] args)
		{
			if (_traceEventTypelogLevel >= level)
			{
				var sb = new StringBuilder();
				sb.AppendLine();
				sb.Append(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ": ");
				sb.Append(FormatMessage(message, args));

				_hubContext.Clients.All.addDiagnosisMessage(string.Format("{0}: {1}", level, sb.ToString()));
			}
		}

		public void Trace(TraceEventType level, Exception ex, string message, params object[] args)
		{
			if (_traceEventTypelogLevel >= level)
			{
				var sb = new StringBuilder();
				sb.AppendLine();
				sb.Append(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ": ");
				sb.Append(FormatMessage(message, args));
				sb.AppendLine();
				sb.AppendFormat("{0}: {1}, {2}", ex.GetType(), ex.Message, ex.StackTrace);
				_hubContext.Clients.All.addDiagnosisMessage(string.Format("{0}: {1}", level, sb.ToString()));
			}
		}

		// Only call string.Format when arguments are supplied, so messages that
		// contain braces (for example JSON) do not throw a FormatException.
		private static string FormatMessage(string message, object[] args)
		{
			return args != null && args.Length > 0 ? string.Format(message, args) : message;
		}
	}
}
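The level check in SignalRTraceProvider relies on the numeric values of TraceEventType. The following sketch isolates that comparison so the effect of the configured level can be checked without SignalR or Elasticsearch; TraceLevelFilter is a hypothetical helper, not part of ElasticsearchCRUD.

```csharp
using System;
using System.Diagnostics;
using System.Linq;

// Hypothetical helper that reproduces only the check used in
// SignalRTraceProvider: _traceEventTypelogLevel >= level.
public static class TraceLevelFilter
{
    public static bool IsSent(TraceEventType configuredLevel, TraceEventType eventLevel)
    {
        // Critical = 1, Error = 2, Warning = 4, Information = 8, Verbose = 16:
        // smaller values are more severe, so they pass more filters.
        return configuredLevel >= eventLevel;
    }

    // Lists which of the common levels would reach the SignalR clients.
    public static TraceEventType[] SentLevels(TraceEventType configuredLevel)
    {
        var all = new[]
        {
            TraceEventType.Critical, TraceEventType.Error, TraceEventType.Warning,
            TraceEventType.Information, TraceEventType.Verbose
        };
        return all.Where(level => IsSent(configuredLevel, level)).ToArray();
    }
}
```

SentLevels(TraceEventType.Information) returns Critical, Error, Warning and Information. Passing TraceEventType.Verbose to the SignalRTraceProvider constructor instead would also forward the Verbose messages.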

SignalR can be installed from NuGet and is set up as follows (see the ASP.NET SignalR documentation for more information):

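using Microsoft.AspNet.SignalR;
using Microsoft.Owin;
using Owin;

[assembly: OwinStartup(typeof(WebApiCSVExportFromElasticsearch.Startup))]
namespace WebApiCSVExportFromElasticsearch
{
	public class Startup
	{
		public void Configuration(IAppBuilder app)
		{
			// Maps the SignalR hubs to the default /signalr path
			app.MapSignalR();
		}
	}

	public class DiagnosisEventSourceService : Hub
	{
	}
}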

The SignalR HTML client is configured as follows:


<h4>Diagnosis</h4>
<div class="container">
    <ol id="discussion"></ol>
</div>

@section scripts {
    <!--Script references. -->
    <!--The jQuery library is required and is referenced by default in _Layout.cshtml. -->
    <!--Reference the SignalR library. -->

    <script src="~/Scripts/jquery.signalR-2.1.2.js"></script>
    <!--Reference the autogenerated SignalR hub script. -->
    <script src="~/signalr/hubs"></script>
    <!--SignalR script to update the page with the diagnosis messages.-->
    <script>
        $(function () {
            // Reference the auto-generated proxy for the hub.
            var signalRService = $.connection.diagnosisEventSourceService;

            // Create a function that the hub can call back to display messages.
            signalRService.client.addDiagnosisMessage = function (message) {
                // Add the message to the page.
                $('#discussion').append('<li>' + htmlEncode(message) + '</li>');
            };

            // Start the connection.
            $.connection.hub.start();
        });

        // This optional function html-encodes messages for display in the page.
        function htmlEncode(value) {
            var encodedValue = $('<div />').text(value).html();
            return encodedValue;
        }
    </script>
}

The download and diagnostics can be viewed or used in the Home/Index MVC view:
[Image: el_csv_export_01]

And the downloaded CSV with the Elasticsearch data export:
[Image: el_csv_export_02]

Scan and scroll is very useful when you need to select large amounts of data from Elasticsearch without sorting, for example for backups, reindexing or exporting data to other formats.
