Dynamic bulk copy SSIS package

Dynamic bulk copy SSIS package

Using the .NET SqlBulkCopy class as described in the blog ‘META driven SSIS and Bulk copy SSIS packages’, you can easily create a parameterized package to copy a bunch of tables at once. In most cases, the purpose of a process is copying a lot of tables and not just one. A set of related tables with consistent data is commonly the targeting object. Instead of creating multiple packages for this, we can create a generic package that covers the complete set of tables we want to copy. All we need to do is modify the bulk copy script using parameters, and create a list of tables that you want to copy. The following example explains it all.

Implementation example of a dynamic package

First, let’s create a parameter table to store the table names to copy.


CREATE TABLE [work].[Processing](
	[ID] [INT] IDENTITY(1,1) NOT NULL,
	[ObjectSequence] [INT] NOT NULL,
	[SourceObject] [VARCHAR](255) NOT NULL,
	[InsertDT] [DATETIME] NOT NULL DEFAULT (GETDATE()),
	[InsertedBy] [VARCHAR](255) NOT NULL DEFAULT (SUSER_SNAME()),
PRIMARY KEY CLUSTERED 
(
	[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90) ON [PRIMARY]
) ON [PRIMARY];

Now, we insert the desired tables.


INSERT INTO [work].[Processing](ObjectSequence,SourceObject)
	VALUES(1,'[dbo].[Feature]');
INSERT INTO [work].[Processing](ObjectSequence,SourceObject)
	VALUES(2,'[dbo].[FeatureDescription]');
INSERT INTO [work].[Processing](ObjectSequence,SourceObject)
	VALUES(3,'[dbo].[Product]');

Now we can select the input table list from this table.

SELECT SourceObject
FROM [work].[Processing]
ORDER BY ObjectSequence
;

For this example, I’ve created source and target tables with the same name on different servers. Determining the ‘workload’ for this execution of our package will be the first step.

161122_1

Sql task detail:
161122_2

Store the result:
161122_3

Next we can loop over this working set as follows:
161122_4

161122_5

Next, generate a SQL command to clear the target table and use it in the execute sql command task:
161122_6

161122_7

When we’re passing the source object name to the script, the real copy job can start:
161122_8

The script code is almost the same:

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
#endregion
namespace ST_b963b4dd93c54fbca6abf744e3be0dbf
{
///
/// ScriptMain is the entry point class of the script. Do not change the name, attributes,
/// or parent of this class.
///

[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{

bool fireAgain;

public void Main()
{

Dts.TaskResult = (int)ScriptResults.Failure; // initial state

// Open a sourceConnection to the database.
object rawSourceConnection = Dts.Connections["SourceConection.ADO"].AcquireConnection(Dts.Transaction);
SqlConnection sourceConnection = (SqlConnection)rawSourceConnection;

// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand((string)Dts.Variables["User::l_SourceObject"].Value, sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();

// Open the destination connection.
object rawTargetConnection = Dts.Connections["TargetConnection.ADO"].AcquireConnection(Dts.Transaction);
SqlConnection destinationConnection = (SqlConnection)rawTargetConnection;

// Set up the bulk copy object.
using (SqlBulkCopy bulkCopy =
new SqlBulkCopy(destinationConnection))
{
bulkCopy.DestinationTableName = (string)Dts.Variables["User::l_SourceObject"].Value;
bulkCopy.BatchSize = 10000;
bulkCopy.BulkCopyTimeout = 0;

// Set up the event handler to notify after 10000 rows.
bulkCopy.SqlRowsCopied +=
new SqlRowsCopiedEventHandler(OnSqlRowsCopied);
bulkCopy.NotifyAfter = 10000;

try
{
// Write from the source to the destination.
bulkCopy.WriteToServer(reader);
Dts.TaskResult = (int)ScriptResults.Success; // only success when everything works fine
}
catch (Exception ex)
{
Dts.Events.FireInformation(2, "Script failure for object:" + (string)Dts.Variables["User::l_SourceObject"].Value, "error:" + ex.Message, "", 0, ref fireAgain);
Dts.TaskResult = (int)ScriptResults.Failure;
}
finally
{
// Close the SqlDataReader. The SqlBulkCopy
// object is automatically closed at the end
// of the using block.
reader.Close();
}
}

}

void OnSqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
{
Dts.Events.FireInformation(1, "Copy data for object:" +(string)Dts.Variables["User::l_SourceObject"].Value, "Copied so far..." + e.RowsCopied.ToString(), "", 0, ref fireAgain);
}

#region ScriptResults declaration
///
/// This enum provides a convenient shorthand within the scope of this class for setting the
/// result of the script.
///
/// This code was generated automatically.
///

enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion

}
}
 

Further parameterizing of the script can be added when needed, for example to avoid hard coded connection managers. In this example I do a full table copy, but you can understand that it doesn’t requires much work passing a SQL selective data query to the script and creating a partial load mechanism.

Parallelism is in this example not implemented, and all tables are copied in a serialized way. Nevertheless, parallelism can be implemented in such a package. Different approaches are possible to do it. For example, copying he script task multiple time and prepare some split logic of the input table list, is one of them. Another alternative is required a little more .NET code and insights, but you can instantiate the same logic in the script and create a table categorization model for splitting the work into pieces.

What comes next

As an example of an implementation, next blog shows a full staging package were this approach is used. For details see ‘Dynamic Bulk copy Staging package’ blog.