Import CSV Files into SQL using a dynamic table schema

I recently had a perfect scenario that called for a bit of automation. In short, we needed to complete some weird performance monitoring across an entire Hyper-V cluster that consisted of a large number of nodes. To get the data into a platform where we could analyse and report on it within a reasonable amount of time, we decided to use SQL Server as the database. For a number of reasons there were specific limitations on (1) the ways we could get to the data and (2) the types (or at least the schema) of data we would receive. As such, the requirements were simple:
  • Create a method to read multiple CSV files and dump the data in SQL Server
  • The import mechanism needs to be able to adapt to changing import data (we would want to be able to add various metrics, which means the schema of the CSV files would be dynamic). We could, however, assume that all the files in a given import would have the same schema.
 

Enter PowerShell…

The script below provides a mechanism for:
  1. Reading a collection of files in a target directory
  2. Extracting the schema from the CSV files
  3. Dropping any existing table in the target DB
  4. Creating a dynamic schema based on the import files
  5. Creating a table with the new schema
  6. Populating all the data from the various CSV files into the newly created table
 
# Start from a clean console
Clear-Host

# Target database details - the values below are placeholders
$dbServer       = "xxxxxxx"
$dbDatabaseName = "xxxxxxxx"
$dbTable        = "xxxxxxxx"

# SQL Server PowerShell module (presumably the source of Invoke-Sqlcmd used later in the script)
Import-Module -Name SQLPs

# SQL management assemblies: each one is first requested by its fully
# qualified (version-pinned) name; if that load fails, the short name is
# tried instead so whichever installed version resolves gets loaded.
$smoAssemblies = @(
    @{
        Pinned   = "Microsoft.SqlServer.ConnectionInfo, Version=10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91"
        Fallback = "Microsoft.SqlServer.ConnectionInfo"
    },
    @{
        Pinned   = "Microsoft.SqlServer.Smo, Version=13.0.1601.5, Culture=neutral, PublicKeyToken=89845dcd8080cc91"
        Fallback = "Microsoft.SqlServer.Smo"
    }
)
foreach ($assembly in $smoAssemblies)
{
    try
    {
        Add-Type -AssemblyName $assembly.Pinned -ErrorAction Stop
    }
    catch
    {
        Add-Type -AssemblyName $assembly.Fallback
    }
}
#######################
function Get-Type
{
    <#
    .SYNOPSIS
    Normalises a .NET type name to one that is accepted as a DataTable column type by this script.
    .PARAMETER type
    Full .NET type name, e.g. 'System.Int32'.
    .OUTPUTS
    The supplied name (as a string) when it is in the supported list,
    otherwise 'System.String'. The lookup is case-insensitive.
    #>
    param($type)

    $supported = 'System.Boolean', 'System.Byte[]', 'System.Byte', 'System.Char',
                 'System.Datetime', 'System.Decimal', 'System.Double', 'System.Guid',
                 'System.Int16', 'System.Int32', 'System.Int64', 'System.Single',
                 'System.UInt16', 'System.UInt32', 'System.UInt64'

    # Anything outside the list is stored as text
    if ($supported -notcontains $type)
    {
        return 'System.String'
    }

    return "$type"
} #Get-Type
#######################
function Out-DataTable
{
    <#
    .SYNOPSIS
    Builds a System.Data.DataTable from the objects received on the pipeline.
    .DESCRIPTION
    The columns are created from the properties of the FIRST object only.
    A column takes the .NET type of that first object's property value
    (filtered through Get-Type); when the first value is empty/falsy the
    column keeps the DataColumn default type. Every object then becomes
    one row.
    .PARAMETER InputObject
    The objects to convert. Accepts pipeline input.
    .OUTPUTS
    System.Data.DataTable, emitted as a single object (not unrolled into rows).
    #>
    [CmdletBinding()]
    param([Parameter(Position=0, Mandatory=$true, ValueFromPipeline = $true)] [PSObject[]]$InputObject)

    Begin
    {
        $dt = New-Object Data.DataTable
        $First = $true
    }
    Process
    {
        foreach ($object in $InputObject)
        {
            $DR = $dt.NewRow()
            foreach ($property in $object.PsObject.Properties)
            {
                if ($First)
                {
                    # Schema discovery happens once, on the first object
                    $Col = New-Object Data.DataColumn
                    $Col.ColumnName = $property.Name.ToString()
                    if ($property.Value)
                    {
                        if ($property.Value -isnot [System.DBNull]) {
                            $Col.DataType = [System.Type]::GetType("$(Get-Type $property.TypeNameOfValue)")
                        }
                    }
                    $dt.Columns.Add($Col)
                }

                $value = $property.Value
                if ($null -eq $value) {
                    # A DataRow rejects $null for value-type columns; DBNull is
                    # the representation of "no value" it expects.
                    $DR.Item($property.Name) = [System.DBNull]::Value
                }
                elseif ($value -is [array] -and $dt.Columns[$property.Name].DataType -ne [byte[]]) {
                    # Test the VALUE for being an array (the property wrapper
                    # object itself never is). Arrays are flattened to an XML
                    # string; byte[] destined for a byte[] column is kept raw.
                    $DR.Item($property.Name) = $value | ConvertTo-Xml -As String -NoTypeInformation -Depth 1
                }
                else {
                    $DR.Item($property.Name) = $value
                }
            }
            $dt.Rows.Add($DR)
            $First = $false
        }
    }
    End
    {
        # The unary comma wraps the table so the pipeline does not enumerate
        # it into individual DataRow objects.
        Write-Output @(,($dt))
    }
} #Out-DataTable
#######################
function Get-SqlType
{
    <#
    .SYNOPSIS
    Maps a short .NET type name to the SQL Server type used for the column.
    .PARAMETER TypeName
    Short .NET type name such as 'Int32' or 'Byte[]' (matched case-insensitively).
    .OUTPUTS
    System.Data.SqlDbType. Unrecognised names map to VarChar.
    #>
    param([string]$TypeName)
    switch ($TypeName)
    {
        'Boolean'  {[Data.SqlDbType]::Bit}
        'Byte[]'   {[Data.SqlDbType]::VarBinary}
        # A single byte is a tinyint, not a binary blob
        'Byte'     {[Data.SqlDbType]::TinyInt}
        'Datetime' {[Data.SqlDbType]::DateTime}
        'Decimal'  {[Data.SqlDbType]::Decimal}
        'Double'   {[Data.SqlDbType]::Float}
        'Guid'     {[Data.SqlDbType]::UniqueIdentifier}
        'Int16'    {[Data.SqlDbType]::SmallInt}
        'Int32'    {[Data.SqlDbType]::Int}
        'Int64'    {[Data.SqlDbType]::BigInt}
        # Unsigned values are widened to the next signed type so their upper
        # half does not overflow the column.
        'UInt16'   {[Data.SqlDbType]::Int}
        'UInt32'   {[Data.SqlDbType]::BigInt}
        # NOTE: no wider integer type exists; values above Int64.MaxValue
        # will not fit in this column.
        'UInt64'   {[Data.SqlDbType]::BigInt}
        # Single-precision float corresponds to SQL real
        'Single'   {[Data.SqlDbType]::Real}
        default    {[Data.SqlDbType]::VarChar}
    }
} #Get-SqlType
#######################
function Add-SqlTable
{
    <#
    .SYNOPSIS
    Creates (or scripts) a SQL Server table whose columns mirror a DataTable.
    .PARAMETER ServerInstance
    SQL Server instance to connect to.
    .PARAMETER Database
    Name of the database that will hold the table.
    .PARAMETER TableName
    Name of the table to create.
    .PARAMETER DataTable
    DataTable whose columns (name, type, nullability) define the new table.
    .PARAMETER Username
    SQL login name. When omitted, the connection is made without explicit credentials.
    .PARAMETER Password
    Password for -Username.
    .PARAMETER MaxLength
    Length used for VarChar/VarBinary columns; 0 selects the ...Max variant.
    .PARAMETER AsScript
    Emit the CREATE script instead of creating the table.
    .NOTES
    Failures are reported through Write-Error (non-terminating).
    #>
    [CmdletBinding()]
    param(
    [Parameter(Position=0, Mandatory=$true)] [string]$ServerInstance,
    [Parameter(Position=1, Mandatory=$true)] [string]$Database,
    [Parameter(Position=2, Mandatory=$true)] [String]$TableName,
    [Parameter(Position=3, Mandatory=$true)] [System.Data.DataTable]$DataTable,
    [Parameter(Position=4, Mandatory=$false)] [string]$Username,
    [Parameter(Position=5, Mandatory=$false)] [string]$Password,
    [ValidateRange(0,8000)]
    [Parameter(Position=6, Mandatory=$false)] [Int32]$MaxLength=1000,
    [Parameter(Position=7, Mandatory=$false)] [switch]$AsScript
    )

    $con = $null
    try {
        if ($Username)
        { $con = New-Object ("Microsoft.SqlServer.Management.Common.ServerConnection") $ServerInstance,$Username,$Password }
        else
        { $con = New-Object ("Microsoft.SqlServer.Management.Common.ServerConnection") $ServerInstance }
        $con.Connect()

        $server = New-Object ("Microsoft.SqlServer.Management.Smo.Server") $con
        $db = $server.Databases[$Database]
        if ($null -eq $db) {
            # Without this check the failure surfaces later as an opaque SMO error
            throw "Database '$Database' was not found on server '$ServerInstance'."
        }

        $table = New-Object ("Microsoft.SqlServer.Management.Smo.Table") $db, $TableName
        foreach ($column in $DataTable.Columns)
        {
            $sqlTypeName = "$(Get-SqlType $column.DataType.Name)"
            $sqlDbType = [Microsoft.SqlServer.Management.Smo.SqlDataType]$sqlTypeName

            if ($sqlDbType -eq 'VarBinary' -or $sqlDbType -eq 'VarChar')
            {
                if ($MaxLength -gt 0)
                {
                    $dataType = New-Object ("Microsoft.SqlServer.Management.Smo.DataType") $sqlDbType, $MaxLength
                }
                else
                {
                    # MaxLength 0 means "unbounded": VarCharMax / VarBinaryMax
                    $sqlDbType = [Microsoft.SqlServer.Management.Smo.SqlDataType]"${sqlTypeName}Max"
                    $dataType = New-Object ("Microsoft.SqlServer.Management.Smo.DataType") $sqlDbType
                }
            }
            else
            {
                $dataType = New-Object ("Microsoft.SqlServer.Management.Smo.DataType") $sqlDbType
            }

            $col = New-Object ("Microsoft.SqlServer.Management.Smo.Column") $table, $column.ColumnName, $dataType
            $col.Nullable = $column.AllowDBNull
            $table.Columns.Add($col)
        }

        if ($AsScript) {
            $table.Script()
        }
        else {
            $table.Create()
        }
    }
    catch {
        $message = $_.Exception.GetBaseException().Message
        Write-Error $message
    }
    finally {
        # Release the server connection on every path, success or failure
        if ($null -ne $con -and $con.IsOpen) {
            $con.Disconnect()
        }
    }
} #Add-SqlTable
#######################
function Write-DataTable
{
    <#
    .SYNOPSIS
    Bulk-copies data into an existing SQL Server table.
    .PARAMETER ServerInstance
    SQL Server instance to connect to.
    .PARAMETER Database
    Database containing the destination table.
    .PARAMETER TableName
    Destination table name (wrapped in [ ] as-is, so pass the bare table name).
    .PARAMETER Data
    The data to write; handed unchanged to SqlBulkCopy.WriteToServer.
    .PARAMETER Username
    SQL login name. When omitted, integrated security is used.
    .PARAMETER Password
    Password for -Username.
    .PARAMETER BatchSize
    Rows per bulk-copy batch.
    .PARAMETER QueryTimeout
    Bulk-copy timeout in seconds (0 = no limit).
    .PARAMETER ConnectionTimeout
    Connection timeout in seconds.
    .NOTES
    Failures are reported through Write-Error (non-terminating); the caller
    keeps running.
    #>
    [CmdletBinding()]
    param(
    [Parameter(Position=0, Mandatory=$true)] [string]$ServerInstance,
    [Parameter(Position=1, Mandatory=$true)] [string]$Database,
    [Parameter(Position=2, Mandatory=$true)] [string]$TableName,
    [Parameter(Position=3, Mandatory=$true)] $Data,
    [Parameter(Position=4, Mandatory=$false)] [string]$Username,
    [Parameter(Position=5, Mandatory=$false)] [string]$Password,
    [Parameter(Position=6, Mandatory=$false)] [Int32]$BatchSize=50000,
    [Parameter(Position=7, Mandatory=$false)] [Int32]$QueryTimeout=0,
    [Parameter(Position=8, Mandatory=$false)] [Int32]$ConnectionTimeout=15
    )

    # The builder escapes values, so a password or server name containing
    # ';' or '=' cannot corrupt the connection string. Keys are set through
    # the indexer using the connection-string keywords.
    $builder = New-Object System.Data.SqlClient.SqlConnectionStringBuilder
    $builder['Data Source'] = $ServerInstance
    $builder['Initial Catalog'] = $Database
    $builder['Connect Timeout'] = $ConnectionTimeout
    if ($Username)
    {
        $builder['Integrated Security'] = $false
        $builder['User ID'] = $Username
        $builder['Password'] = $Password
    }
    else
    {
        $builder['Integrated Security'] = $true
    }

    $conn = New-Object System.Data.SqlClient.SqlConnection
    $conn.ConnectionString = $builder.ToString()
    $bulkCopy = $null
    try
    {
        $conn.Open()
        # Reuse the connection opened above rather than letting SqlBulkCopy
        # open a second one from the connection string.
        $bulkCopy = New-Object ("Data.SqlClient.SqlBulkCopy") $conn
        $bulkCopy.DestinationTableName = "[$TableName]"
        $bulkCopy.BatchSize = $BatchSize
        $bulkCopy.BulkCopyTimeout = $QueryTimeout
        $bulkCopy.WriteToServer($Data)
    }
    catch
    {
        # Report the message only. No loop-control statement here: a
        # 'continue' outside a loop would unwind into the caller.
        Write-Error $_.Exception.Message
    }
    finally
    {
        if ($null -ne $bulkCopy) {
            $bulkCopy.Close()
        }
        $conn.Close()
    }
} #Write-DataTable
#######################
# listing stats files
Write-Host ("Reading files in Stats folder") -ForegroundColor Green
# Sub-directories are skipped: only files can be handed to Import-Csv
$files = @(Get-ChildItem -Path C:\Hyper-VStatistics | Where-Object { -not $_.PSIsContainer })
#######################
# iterate through all files and collect the records from each CSV file.
# The loop output is gathered in one pass; wrapping each import in @() keeps
# single-row files as arrays so the per-file count and the combined dataset
# are correct whatever the file size.
$dataset = @(
    foreach ($file in $files)
    {
        $data = @(Import-Csv $file.FullName)
        Write-Host ("Importing file: " + $file.name + " – " + $data.count + " records") -ForegroundColor Yellow
        $data
    }
)
####################### – SQL Population
Write-Host ("Total records: " + $dataset.count) -ForegroundColor Green
if ($dataset.Count -eq 0)
{
    # Stop before the DROP TABLE below: with nothing to import there is no
    # schema to build, and the existing table should not be destroyed.
    Write-Warning "No records were imported; the existing table has been left untouched."
    return
}
$dtDataSet = $dataset | Out-DataTable
Write-Host ("`nConnecting to SQL Server") -ForegroundColor Green
Write-Host ("`nDropping SQL table $dbTable if present") -ForegroundColor Green
$dbQuery1 = "IF OBJECT_ID('dbo.$dbTable', 'U') IS NOT NULL DROP TABLE dbo.[$dbTable]"
Invoke-Sqlcmd -Query $dbQuery1 -Database $dbDatabaseName -ServerInstance $dbServer
# Create the table from the schema discovered in the CSV files
Write-Host ("`nAdding data to table $dbTable") -ForegroundColor Green
Add-SqlTable -ServerInstance $dbServer -Database $dbDatabaseName -TableName $dbTable -DataTable $dtDataSet
# Write table to database
Write-Host ("`nWriting data to table $dbTable") -ForegroundColor Green
Write-DataTable -ServerInstance $dbServer -Database $dbDatabaseName -TableName $dbTable -Data $dtDataSet
# validate all data in table
Write-Host ("`nCounting SQL table $dbTable rows") -ForegroundColor Green
# The result column is aliased RowTotal (not Count) so reading it from the
# returned row cannot be confused with the .Count member of the result.
$dbQuery2 = "SELECT COUNT(*) as RowTotal from dbo.[$dbTable]"
$dbCount = Invoke-Sqlcmd -Query $dbQuery2 -Database $dbDatabaseName -ServerInstance $dbServer
if ($dbCount.RowTotal -eq $dataset.Count)
{
    Write-Host ("`nSTATUS – SQL table rows matches FILE import count") -ForegroundColor Green
}
else
{
    Write-Host ("`nSTATUS – SQL table rows DOES NOT matches FILE import count") -ForegroundColor Red
}