2

I have a situation where in I have million records in CSV file, what I need to do is to apply some business validation on certain columns of CSV and load data to DB only if particular record is valid.

I tried with per record strategy but that could turn into nightmare if the DB is hit millions of times.

I am using MongoDB to store metaData and Redshift for storing actual data.

Erik
  • 4,722
  • 4
  • 23
  • 56
Vipin
  • 21
  • 4
  • 1
    Is there a reason that you need to do the validation before loading? Normally, you'd stage your load. So you'd load data into a staging area in the database however it exists in the file then a separate step would cleanse the data. – Justin Cave Oct 29 '15 at 03:42
  • Possible duplicate of [Benefits of using Staging Database while designing Data Warehouse](http://dba.stackexchange.com/questions/56320/benefits-of-using-staging-database-while-designing-data-warehouse) – Michael Green Oct 29 '15 at 04:58
  • Are you asking how to validate in the application layer and send the valid data to the database, how to validate the data in the database layer, what should be the overall approach, or something else? – Erik Oct 29 '15 at 21:43
  • @Erik He is saying that he is validating the data in the app layer and sending only valid data to the DB, but doing so with so many records is taking a long time, so he wants to know how to make that more efficient. As far as I can tell, the only thing that is unclear is what the target RDBMS is (which, of course, is a fairly major issue since my suggestion only works with one of them ;-). – Solomon Rutzky Oct 30 '15 at 02:29
  • 1
    @srutzky So the validation is basically a non-sequitur and the OP just wants efficient bulk inserts? – Erik Oct 30 '15 at 14:08
  • @Erik My interpretation of this request is that it is mostly yes to your summation ;-), with the possible caveat that perhaps the issue is more than just efficient bulk insert, but also better strategy if validating each row as it is read and possibly inserted complicates this whereas breaking that validation out to a separate preliminary step so the process is merely doing the bulk insert has maybe not been considered and/or attempted. I think I understand your initial question better now :-). Either way, I think I need to move my answer to a new question now that we know what DB this is ;-) – Solomon Rutzky Oct 30 '15 at 15:13

1 Answers1

2

Assuming that the RDBMS being used is Microsoft SQL Server (and version 2008 or newer), then Table-Valued Parameters (TVPs) are a great fit for this exact scenario. The idea is that you will read the file line by line in your .NET code but stream the data either all at once, or if that is too much for one transaction, broken up into batches, into SQL Server using a TVP. The TVP is essentially a table variable that is an input parameter to a stored procedure so this will be a set-based operation (as far as SQL Server is concerned), not row-by-row. Technically a stored procedure is not required as a TVP can be sent as a parameter to an ad hoc query, but working with a stored procedure is just a better approach anyway.

There are two main patterns for using a TVP while reading from a file (and each of them relies upon passing in a method that returns IEnumerable<SqlDataRecord> rather than a DataTable):

  1. Execute "import" Stored Procedure, that in turn initiates the file open and reading, all rows are read (and can be validated at this point) and streamed into the Stored Procedure. In this approach, the Stored Procedure is executed only once, and all rows are sent in as a single set. This is the easier approach, but for larger data sets (i.e. millions of rows) it might not perform the best if the operation is to merge the data directly into a live table rather than simply loading into a staging table. The memory required for this approach is the size of 1 record.

  2. Create a variable for int _BatchSize, open the file, and either:

    1. Create a collection to hold the batch of records
      1. Loop for _BatchSize or until no more lines to read from the file
        1. Read a line
        2. Validate
        3. Store valid entry in the collection
      2. Execute the Stored Procedure at the end of each loop, streaming in the collection.
      3. The memory required for this approach is the size of 1 record * _BatchSize.
      4. The benefit is that the transaction in the DB does not depend on any disk I/O latency or business logic latency.
    2. Loop to execute the Stored Procedure until no more lines to read from the file
      1. Execute the Stored Procedure
        1. Loop for _BatchSize or until no more lines to read from the file
          1. Read a line
          2. Validate
          3. Stream record into SQL Server
      2. The memory required for this approach is the size of 1 record.
      3. The downside is that the transaction in the DB depends on disk I/O latency and/or business logic latency, hence could be open longer, hence more potential for blocking.

I have a full example of pattern #1 over on StackOverflow in the following answer: How can I insert 10 million records in the shortest time possible?

For pattern #2.1 (a highly scalable approach), I have a partial example below (it is too late to finish so I will post the C# portion in the morning)...

Required database objects (using a contrived structure):

FIRST, you need a User-Defined Table Type.
Please note the use of UNIQUE, DEFAULT, and CHECK constraints to enforce data integrity before the records even hit SQL Server.
The UNIQUE Constraint is also how you create an index on a Table Variable :)

CREATE TYPE [ImportStructure] AS TABLE
(
    BatchRecordID INT IDENTITY(1, 1) NOT NULL,
    Name NVARCHAR(200) NOT NULL,
    SKU VARCHAR(50) NOT NULL UNIQUE,
    LaunchDate DATETIME NULL,
    Quantity INT NOT NULL DEFAULT (0),
    CHECK ([Quantity] >= 0)
);
GO

SECOND, use the UDTT as an input param to an import stored procedure.
Hence "Tabled-Valued Parameter" (TVP).

CREATE PROCEDURE dbo.ImportData (
   @CustomerID     INT,
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

UPDATE prod
SET    prod.[Name] = imp.[Name],
       prod.[LaunchDate] = imp.[LaunchDate],
       prod.[Quantity] = imp.[Quantity]
FROM   [Inventory].[Products] prod
INNER JOIN @ImportTable imp
        ON imp.[SKU] = prod.[SKU]
WHERE  prod.CustomerID = @CustomerID;

INSERT INTO [Inventory].[Products] ([CustomerID], [SKU], [Name], [LaunchDate], [Quantity])
    SELECT  @CustomerID, [SKU], [Name], [LaunchDate], [Quantity]
    FROM    @ImportTable imp
    WHERE   NOT EXISTS (SELECT prod.[SKU]
                        FROM   [Inventory].[Products] prod
                        WHERE  prod.[SKU] = imp.[SKU]
                       );
GO

App Code:

First we will define the class that will be used to store the batch of records:

using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private class ImportBatch
{
  string Name;
  string SKU;
  DateTime LaunchDate;
  int Quantity;
}

Next we define the method used to stream the data from the collection into SQL Server. Please note:

  • The SqlMetaData entry for BatchRecordID is defined even though it is an IDENTITY field. However, it is defined in such a way as to indicate that the value is server-generated.
  • The yield return returns the record but then control comes right back to the next line (which just goes back to the top of the loop).
private static IEnumerable<SqlDataRecord> SendImportBatch(List<ImportBatch> RecordsToSend)
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("BatchRecordID", SqlDbType.Int, true, false,
                      SortOrder.Unspecified, -1),
      new SqlMetaData("Name", SqlDbType.NVarChar, 200),
      new SqlMetaData("SKU", SqlDbType.VarChar, 50),
      new SqlMetaData("LaunchDate", SqlDbType.DateTime),
      new SqlMetaData("Quantity", SqlDbType.Int)
   };

   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);

   // Stream the collection into SQL Server without first
   // copying it into a DataTable.
   foreach (ImportBatch _RecordToSend in RecordsToSend)
   {
      // we don't set field 0 as that is the IDENTITY field
      _DataRecord.SetString(1, _RecordToSend.Name);
      _DataRecord.SetString(2, _RecordToSend.SKU);
      _DataRecord.SetDateTime(3, _RecordToSend.LaunchDate);
      _DataRecord.SetInt32(4, _RecordToSend.Quantity);

      yield return _DataRecord;
   }
}

Finally, we define the overall import processing operation. It opens the Connection to SQL Server and the File, then it cycles through the file reading and validating the _BatchSize number of records for each cycle. The stored procedure parameters are only defined once as they do not change: the CustomerID value doesn't change, and the TVP parameter value is merely a reference to a method -- SendImportBatch -- that only gets invoked when the stored procedure is executed via ExecuteNonQuery. The input parameter to the method passed in as the TVP value is a reference type so it should always reflect the current value of that variable / object.

public static void ProcessImport(int CustomerID)
{
   int _BatchSize = GetBatchSize();
   string _ImportFilePath = GetImportFileForCustomer(CustomerID);

   List<ImportBatch> _CurrentBatch = new List<ImportBatch>();
   ImportBatch _CurrentRecord;

   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   // Parameters do not require leading "@" when using CommandType.StoredProcedure
   SqlParameter _ParamCustomerID = new SqlParameter("CustomerID", SqlDbType.Int);
   _ParamCustomerID.Value = CustomerID;
   _Command.Parameters.Add(_ParamCustomerID);

   SqlParameter _ParamImportTbl = new SqlParameter("ImportTable", SqlDbType.Structured);
   // TypeName is not needed when using CommandType.StoredProcedure
   //_ParamImportTbl.TypeName = "dbo.ImportStructure";
   // Parameter value is method that returns streamed data (IEnumerable)
   _ParamImportTbl.Value = SendImportBatch(_CurrentBatch);
   _Command.Parameters.Add(_ParamImportTbl);

   StreamReader _FileReader = null;

   try
   {
      int _RecordCount;
      string[] _InputLine = new string[4];

      _Connection.Open();

      _FileReader = new StreamReader(_ImportFilePath);

       // process the file
       while (!_FileReader.EndOfStream)
       {
          _RecordCount = 1;

          // process a batch
          while (_RecordCount <= _BatchSize
                  && !_FileReader.EndOfStream)
          {
             _CurrentRecord = new ImportBatch();

             _InputLine = _FileReader.ReadLine().Split(new char[]{','});

             _CurrentRecord.Name = _InputLine[0];
             _CurrentRecord.SKU = _InputLine[1];
             _CurrentRecord.LaunchDate = DateTime.Parse(_InputLine[2]);
             _CurrentRecord.Quantity = Int32.Parse(_InputLine[3]);

             // Do validations, transformations, etc
             if (record is not valid)
             {
                _CurrentRecord = null;
                continue; // skip to next line in the file
             }

             _CurrentBatch.Add(_CurrentRecord);
             _RecordCount++; // only increment for valid records
          }

          _Command.ExecuteNonQuery(); // send batch to SQL Server

          _CurrentBatch.Clear();
       }
   }
   finally
   {
      _FileReader.Close();

      _Connection.Close();
   }

   return;
}

UPDATE:

Since it might not be apparently obvious to everyone reading the code, I should point out, especially with regards to comments on the Question suggesting to use a staging table or even staging database, that the method I outlined above is really just a variation of that approach. How so? Well, the table variable that is the TVP is the staging table. And in fact, as opposed to a typical staging table which is a permanent table, being temporary and not shared means that you do not need to worry about:

  • cleaning up the data after the process is done
  • separating multiple concurrent import processes

If the READONLY nature of the TVP prohibits some validation and/or transformation that is needed before being merged into the destination table, then the data in the TVP can be easily transferred to a local temporary table at the beginning of the Stored Procedure.

Solomon Rutzky
  • 65,256
  • 6
  • 135
  • 267