Catégories
Talend

Talend et MongoDB : Une Connexion Performante, Flexible et Evolutive

Lors d’un récent projet de mise en place d’un PIM (Product Information Management) visant à centraliser une grande quantité de données, l’utilisation de MongoDB s’est avérée nécessaire. Pour répondre à ce besoin, j’ai adopté une approche particulière axée sur l’évolutivité, et les performances. Dans cette démarche, j’ai exploité les routines Java de Talend Integration Suite pour établir une connexion à MongoDB sans nécessiter la licence BigData. Les résultats ont été positifs, et dans cet article, je partage en détail cette approche. Bien que je me concentre ici sur la lecture d’une collection pour des raisons de simplicité, il est essentiel de souligner que cette approche peut tout aussi bien être étendue à l’insertion, à la mise à jour des données, ainsi qu’à l’extraction sous forme de flux des modifications du journal d’historisation (OpLog), une fonctionnalité native de MongoDB.

Télécharger MongoDB Java Drivers

La première étape essentielle est de télécharger MongoDB Java Drivers pour commencer à utiliser MongoDB.

Les librairies citées ci-dessous seront ajoutées comme dépendances à votre routine Talend.

Assurez-vous de choisir les versions compatibles avec votre version de MongoDB (Dans cet exemple, nous utilisons le pilote Java v4.5.1 pour se connecter à MongoDB v5.0).

Créer une nouvelle routine

Maintenant que nous avons téléchargé les librairies Java, nous allons créer une routine Talend nommée MongoDBConnection pour encapsuler notre logique de connexion à la base de données.

package routines;

import java.util.Arrays;
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;

/**
 * Classe permettant la connexion à une base de données MongoDB et l'exécution
 * d'opérations.
 * 
 * @author Votre Nom
 */
public class MongoDBConnection {

    private final String databaseName;
    private final String connectionString;
    private boolean isProcessFinished = false;

    private MongoCursor<Document> mongoCursor;

    /**
     * Constructeur de la classe MongoDBConnection.
     * 
     * @param connectionString L'URL de connexion à MongoDB.
     * @param databaseName     Le nom de la base de données MongoDB.
     */
    public MongoDBConnection(String connectionString, String databaseName) {
        this.connectionString = isEmpty(connectionString) ? "mongodb://127.0.0.1:27017" : connectionString;
        this.databaseName = requireNonEmpty(databaseName, "Database name cannot be null or empty.");
    }

    /**
     * Établit une connexion à la base de données MongoDB.
     * 
     * @return La base de données MongoDB connectée.
     * @throws IllegalArgumentException Si la connexion échoue.
     */
    public MongoDatabase connect() {
        try {
            return MongoClients.create(new ConnectionString(connectionString)).getDatabase(databaseName);
        } catch (Exception ex) {
            throw new IllegalArgumentException(ex.getMessage());
        }
    }

    /**
     * Exécute une requête sur une collection MongoDB avec des options de
     * filtrage et de tri facultatives.
     * 
     * @param collectionName Le nom de la collection à interroger.
     * @param queryString    La chaîne de filtrage de la requête.
     * @param sortString      La chaîne de tri de la requête.
     * @throws IllegalArgumentException Si le nom de la collection est nul ou vide.
     */
    public void queryCollection(String collectionName, String queryString, String sortString) {
        if (isEmpty(collectionName)) {
            throw new IllegalArgumentException("Collection name cannot be null or empty.");
        }

        Bson filter = getAggregationOperatorFromString(queryString);

        List<Bson> operators = (isEmpty(sortString))
                ? Arrays.asList(Aggregates.match(filter))
                : Arrays.asList(Aggregates.match(filter), Aggregates.sort(getAggregationOperatorFromString(sortString)));

        mongoCursor = connect().getCollection(collectionName).aggregate(operators).iterator();
    }

    /**
     * Convertit une chaîne en opérateur d'agrégation MongoDB (Bson Document).
     * 
     * @param operator La chaîne représentant l'opérateur.
     * @return L'opérateur d'agrégation MongoDB sous forme de Document Bson.
     */
    public Document getAggregationOperatorFromString(String operator) {
        if (isEmpty(operator)) {
            return Document.parse("{}");
        }

        return Document.parse(operator);
    }

    /**
     * Récupère le document suivant du curseur.
     * 
     * @return Le document suivant ou null si le curseur est vide.
     */
    public Document getNextDocument() {
        if (mongoCursor != null) {
            Document next = mongoCursor.tryNext();
            if (next != null) {
                return next;
            } else {
                mongoCursor.close();
                this.isProcessFinished = true;
            }
        } else {
            this.isProcessFinished = true;
        }
        return null;
    }

    /**
     * Récupère le document suivant sous forme de chaîne JSON.
     * 
     * @return Le document suivant au format JSON ou null si le curseur est vide.
     */
    public String getNextDocumentToJSON() {
        Document doc = getNextDocument();
        return (doc != null) ? doc.toJson() : null;
    }

    /**
     * Vérifie si le processus est terminé.
     * 
     * @return true si le processus est terminé, false sinon.
     */
    public boolean isProcessFinished() {
        return this.isProcessFinished;
    }

    /**
     * Vérifie si la chaîne est vide ou contient "null".
     * 
     * @param s La chaîne à vérifier.
     * @return true si la chaîne est vide ou contient "null", false sinon.
     */
    public static boolean isEmpty(String s) {
        if (s == null) {
            return true;
        }
        if (s.trim().isEmpty()) {
            return true;
        }
        if (s.trim().equals("null")) {
            return true;
        }
        return false;
    }

    /**
     * Exige une valeur non vide, sinon lève une exception avec le message
     * spécifié.
     * 
     * @param value   La valeur à vérifier.
     * @param message Le message à inclure dans l'exception.
     * @return La valeur si elle n'est pas vide.
     * @throws IllegalArgumentException Si la valeur est vide.
     */
    private static String requireNonEmpty(String value, String message) {
        if (isEmpty(value)) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }
}

Ajouter les dépendances à la routine

Créer un nouveau job

Créez un nouveau job Talend pour tester la routine MongoDBConnection, que vous pouvez nommer job_Read_Mongo_Collection. Ajoutez deux composants, tJavaFlex et tLogRow, puis reliez-les en renommant le lien comme NEXT_DOCUMENT.

Adaptez le code suivant et modifiez la connexion et le nom de la base passés en paramètres du constructeur de MongoDBConnection.

Modifiez également le nom de la collection et le filtre (optionnel) à appliquer sur la collection.

MongoDBConnection mongoConnection = new MongoDBConnection("mongodb://127.0.0.1:27017", "PIMSOG");
mongoConnection.queryCollection("H308", "{ EAN13: '3324976120027' }", null);

while (true) {
  String sRecord = mongoConnection.getNextDocumentToJSON();

  if (mongoConnection.isProcessFinished() == Boolean.TRUE) {

    break;
  } else {

    // Flux de sortie
    if (sRecord != null) {
      NEXT_DOCUMENT.Record = sRecord.trim();
    }
  }
}

En conclusion, l’intégration de MongoDB dans Talend peut s’avérer à la fois puissante et flexible. En adoptant une approche basée sur des routines Java personnalisées, nous avons démontré comment établir une connexion à une base de données MongoDB sans nécessiter la licence BigData, offrant ainsi une solution évolutive et performante.

La simplicité de l’exemple présenté ici, se limitant à la lecture d’une collection MongoDB, ne reflète qu’une fraction des capacités offertes. J’ai étendu cette approche pour gérer l’insertion, la mise à jour de données, ainsi que l’extraction du journal des modifications (OpLog) de MongoDB pour un mode delta.

Cette approche procure une maîtrise totale de votre code, une évolutivité garantie et des par expérience des performances optimales. L’intégration de MongoDB avec Talend s’érige comme une solution robuste.

J’espère que cet article vous a fourni des insights utiles pour tirer parti de la synergie entre MongoDB et Talend, et je vous encourage à explorer davantage cette approche pour répondre à vos besoins spécifiques. N’hésitez pas à partager vos expériences et à poser des questions dans les commentaires.

Bonne intégration !

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *