Création de sorties asynchrones sur le composant script de SSIS

Cet article est un article invité du blog de Jean-Pierre.

Cet article va expliquer de manière détaillée comment générer un flux asynchrone à partir du composant Script de SSIS. C’est-à-dire générer une sortie déconnectée du flux d’entré.

Nous allons prendre un exemple trivial. En entré, nous avons un flux composé de 2 colonnes : NS et Reste. Ce flux provient d’un fichier avec un séparateur de colonne point-virgule. La colonne Reste contient 2 valeurs séparées par des virgules. Nous voulons obtenir 1 ligne en sortie pour chaque valeur de Reste et en conservant le même NS.

Fichier en entré :

NS;Reste
A;Hello,World
B;Salut,Monde

En sortie, nous souhaitons avoir 2 lignes par ligne d’entrée :

A Hello
A World
B Salut
B Monde

Il s’agit de « dépivoter » les données en entrée. Ce qui correspond à un UNPIVOT en SQL.

Mise en place de l’asynchronisme

Une fois le composant script branché sur une entrée, allez dans l’onglet Inputs and Outputs et passer la sortie en mode asynchrone (Propriété SynchronousInputID = None).

async_ssis1

Figure 1: Propriétés Inputs and Outputs du script

A la sortie MaSortie, ajoutez les deux colonnes du flux souhaitées en sortie. Pour des raisons de simplification, elles seront de type DT_STR et de taille 50. Cette étape se fait à la main et s’il y a beaucoup de colonnes, peut être fastidieuse.

async_ssis2

Figure 2: Ajout des colonnes de sortie

Il faut indiquer maintenant au script de traiter les lignes une par une. Cela est effectué en ajoutant dans le code, la méthode suivante :

public override void Input0_ProcessInput(Input0Buffer Buffer)
    {        
        //Tant qu'il y a des lignes à traiter
        while (Buffer.NextRow())
        {
            //Appel du traitement d'une ligne
            Input0_ProcessInputRow(Buffer);
        }

        //quand il n'y a plus de lignes
        if (Buffer.EndOfRowset())
        {
            //Fermeture du flux de sortie
            MaSortieBuffer.SetEndOfRowset();
        }
    }

Le nom de méthode est suffixé par le nom du flux d’entrée du composant. Dans notre exemple Input0. Pensez à le changer si besoin. Par défaut, le VSTA de SSIS 2008 ne donne pas le prototype de cette méthode, il faut l’écrire à la main.

Ce code renvoie chaque ligne (Row) lue du tampon (Buffer) d’entré vers une méthode de traitement.

Une fois le tampon d’entré entièrement lu, le tampon de sortie est fermé.

Le composant est prêt à être utilisé en mode asynchrone. Il ne reste qu’à écrire le traitement métier.

Traitement métier

Le code est très simple. Il s’agit de la méthode classique XXX_ProcessInputRow dont le prototype est présent dans le code (avec XXX le nom du flux d’entré). Dans notre exemple : Input0_ProcessInputRow.

Il suffit de maintenant de séparer les valeurs de la colonne Reste et en faire deux lignes de sortie (en utilisant le tampon MaSortieBuffer).

Le code de notre exemple est le suivant.

public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        //Traitement d'une ligne

        //Séparation de la valeur de la colonne Reste par la virgule
        var valeurs = Row.Reste.Split(',');

        //Pour chaque valeur, il faut créer une ligne dans le flux de sortie et l'associer au NS
        foreach (var valeur in valeurs)
        {
            MaSortieBuffer.AddRow();
            MaSortieBuffer.NS = Row.NS;
            MaSortieBuffer.MaValeur = valeur;
        }
    }

Notons que c’est à ce niveau que la méthode AddRow est employée sur le tampon de sortie pour créer une ligne par valeur séparée.

Pour des raisons de simplification, aucun contrôle (taille, …) n’est effectué.

Résultat

En exécutant notre Data Flow, nous obtenons la séparation désirée, chaque valeur séparée par une virgule donne lieu à une ligne de sortie. Dans notre exemple, cela donne pour une ligne d’entré, deux lignes en sortie.

async_ssis3

Figure 3: Résultat de l’exécution de notre script

Il ne reste qu’à poursuivre le traitement SSIS avec nos données « dépivotées ».

Le code complet du script se trouve ci-dessous en annexe.

A quoi ça sert ?

Bonne question !

Dans une mission, il m’est arrivé de recevoir une colonne contenant une règle mathématique sous forme d’une liste de code A + B +C = D (bon, les règles étaient un peu plus complexes, il y avait des moins). La demande client était d’effectuer des agrégations de valeurs suivant ces règles. La valeur de code D est obtenue par la somme des valeurs des codes A, B et C. Un développeur .NET s’est chargé de développer un parseur syntaxique, me permettant d’obtenir un tableau des codes à sommer.

L’utilisation du code ci-dessus a simplement permis de normaliser la règle en deux colonnes code_parent et code_a_sommer. Et ensuite … c’est du SQL à base de SUM et GROUP BY.

Annexe : code complet du script

/* Microsoft SQL Server Integration Services Script Component
* Write scripts using Microsoft Visual C# 2008.
* ScriptMain is the entry point class of the script.*/

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{

public override void PreExecute()
{
base.PreExecute();
/*
Add your code here for preprocessing or remove if not needed
*/
}

public override void PostExecute()
{
base.PostExecute();
/*
Add your code here for postprocessing or remove if not needed
You can set read/write variables here, for example:
Variables.MyIntVar = 100
*/
}

/// <summary>
/// Code de traitement asynchrone de l'entrée
/// Code à ajouter
/// </summary>
/// <param name="Buffer"></param>
public override void Input0_ProcessInput(Input0Buffer Buffer)
{
//Tant qu'il y a des lignes à traiter
while (Buffer.NextRow())
{
//Appel du traitement d'une ligne
Input0_ProcessInputRow(Buffer);
}

//quand il n'y a plus de lignes
if (Buffer.EndOfRowset())
{
//Fermeture du flux de sortie
MaSortieBuffer.SetEndOfRowset();
}
}

public override void Input0_ProcessInputRow(Input0Buffer Row)
{
//Traitement d'une ligne

//Séparation de la valeur de la colonne Reste par la virgule
var valeurs = Row.Reste.Split(',');

//Pour chaque valeur, il faut créer une ligne dans le flux de sortie et l'associer au NS
foreach (var valeur in valeurs)
{
MaSortieBuffer.AddRow();
MaSortieBuffer.NS = Row.NS;
MaSortieBuffer.MaValeur = valeur;
}
}

public override void CreateNewOutputRows()
{
/*
Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
*/
}

}

Merci à JP pour mettre à disposition cet article en le publiant sur son blog.

Laisser un commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Image Twitter

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Photo Google+

Vous commentez à l'aide de votre compte Google+. Déconnexion / Changer )

Connexion à %s