Au fil des flows

2juin/140

Implementer ContinueWith pour les Rx Extension .Net

En ce moment, j'utilise beaucoup la librairie Rx Extension pour ma mission actuelle. Il s'agit d'un client lourd en WPF et .NET 4. Pour les requetes vers notre backend, nous avons choisi un schéma orienté flux pour requêter de manière plus naturelle des données sur un backend exposant certains de nos services en WCF.

Il nous a fallu à un moment synchroniser 2 requêtes. Je n'ai pas trouvé dans les Rx extensions de méthode permettant de rendre séquentiel le post traitement de 2 flux. L'opérateur Join s'en rapproche mais s'appuie sur une fenêtre temporelle.

Pour résoudre ce cas, j'ai décidé de coder un opérateur WaitOnceFor. Pour déclencher l'envoi d'un evenement (et d'un seul) du flux 1, j'attends de recevoir celui du flux 2.

var observable1 = Observable.CreateAsync<...>(...);
var observable2 = Observable.CreateAsync<...>(...);

observable1.WaitOnceFor(observable2).Subscribe(v => { "J'ai reçu mon signal" });

Pourquoi WaitOnceFor ?

J'aurai pu l'appeller ContinueWith ou ContinueAfter. Je cherchai effectivement un comportement proche du ContinueWith qu'offre les Tasks.

La nature des Rx Extension est cependant très différent du système de Task. Un IObservable représente pas une valeur comme le ferait une tache mais une suite de valeurs. Les opérateurs doivent tenir compte de cette nature et justement l'opérateur que je propose va à contre courant.

Si je l'avais appellé ContinueWith or ContinueAfter, j'aurai du implémenter un opérateur fonctionnant sur une suite d'événement. Dans mon cas, c'est un appel asynchrone qui fonctionne exactement sur le modèle d'une Task, à part que je veux profiter de l'Async Pattern. N'etant pas en .Net 4.5, je n'avais pas la possibilité de m'appuyer sur le mécanisme async/await

Le code

Voici la solution que je vous propose. C'est une méthode d'extension. Si other est de valeur null, ça signifie que je n'ai pas besoin de l'attendre, donc je forward directement l'observable source.

Elle est loin d'être parfaite. Voici le snippet.WaitOnceFor

/// Send a single signal when events of both IObservable where sent at least once
/// case 1 :
/// source : --- 1 --- 2 --- 3 --- 4
/// other  : ------------------ 1 ---
/// Result : ------------------ 1 ---
///
/// case 2:
/// source : ------------------ 1 ---
/// other  : --- 1 --- 2 --- 3 --- 4-
/// Result : ------------------ 1 ---
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source"></param>
/// <param name="other"></param>
/// <returns></returns>
public static IObservable<T> WaitOnceFor<T, K>(this IObservable<T> source, IObservable<K> other)
{
    if (other == null)
        return source;

    return Observable.Create<T>((IObserver<T> observer) =>
    {
        bool sourceDone = false;
        bool otherDone = false;
        T value = default(T);
        Action<T> trycomplete = (T val) =>
        {
            if (otherDone == true && sourceDone == true)
            {
                observer.OnNext(val);
                observer.OnCompleted();
            }
        };

        var d2 = source.Take(1).Subscribe(r =>
        {
            sourceDone = true;
            value = r;
            trycomplete(value);
        });

        var d1 = other.Take(1).Subscribe(r =>
        {
            otherDone = true;
            trycomplete(value);
        });

        return Disposable.Create(() =>
        {
            d1.Dispose();
            d2.Dispose();
        });
    });
}

Posted by Fabien Arcellier

Remplis sous: Non classé Laisser un commentaire
Commentaires (0) Trackbacks (0)

Aucun commentaire pour l'instant


Leave a comment

Aucun trackbacks pour l'instant