.NET - Apresentando Parallel LINQ (PLINQ)


A language-integrated Query - LINQ, foi introduzida com a .NET Framework 3.5 e apresenta um modelo unificado para requisitar qualquer fonte de dados de System.Collections.IEnumerable ou de System.Collections.Generic.IEnumerable<T> de uma maneira segura. LINQ para objetos é o nome para consultas LINQ que são executadas em coleções de memória como List<T> e matrizes.

Parallel LINQ também conhecido como PLINQ é uma implementação do processamento paralelo para LINQ to Objects.

Assim o PLINQ, disponível a partir do .NET Framework 4.0, tem a capacidade de realizar consultas usando a computação paralela. Dessa forma podemos realizar uma tarefa com PLINQ de forma que ela seja executada concorrentemente.

PLINQ implementa todos os métodos de extensão de uma consulta LINQ, tendo operadores adicionais para realizar as operações em paralelo. O grau de simultaneidade das consultas PLINQ esta baseado na capacidade do computador que executa a consulta.

Em muitos cenários (não todos) o PLINQ pode proporcionar um aumento significativo na velocidade usando todas as CPUs ou núcleos de CPU disponíveis.

Uma consulta PLINQ pode proporcionar um ganho de desempenho quanto ela realiza operações com uso intensivo da CPU.

Um problema frequente com aplicações Windows Forms ocorre quando tentamos atualizar um controle no formulário a partir de uma thread diferente da thread que criou o controle; nestes casos uma exceção InvalidOperationException é lançada com a mensagem :
 “Cross-thread operation not valid: Control ‘txtLog’ accessed from a thread other than the thread it was created on.”

A seguir vamos fornecer uma visão geral das recursos do PLINQ com exemplos.

Estou usando o Visual Studio 2012 Express for desktop para criar os exemplos em VB .NET e C#; e o template Console Application nos exemplos.

A classe ParallelEnumerable

A classe System.Linq.ParallelEnumerable expõe as principais funcionalidades do PLINQ e inclui implementações de todos os operadores de consulta padrão que o LINQ to Objects suporta; PLINQ também contém um conjunto de métodos que habilita os comportamentos específicos para a execução paralela. Vejamos alguns métodos dessa classe:

1 - O método de extensão AsParallel

O método de extensão AsParallel é o ponto de entrada para o PLINQ e especifica que a consulta deve ser processada em paralelo; ele divide o trabalho em cada processador ou núcleo do processador.

O exemplo a seguir inicia um inicia um cronômetro(stopwatch) no namespace System.Diagnostics para mostrar o tempo decorrido quando a tarefa for completada, e, em seguida, a classe Enumerable, produz uma sequência de números inteiros de 1 a 10.

A chamada do método AsParallel é adicionada na fonte. Isto faz com que as iterações sejam espalhadas pelos processadores e núcleos disponíveis.

A seguir uma consulta LINQ recupera todos os números pares, mas na consulta LINQ, a cláusula WHERE está chamando um método Calcular que tem um atraso de um segundo usando a classe Thread, que está no namespace System.Threading. Finalizando, temos um laço foreach/For each que exibe o resultado:

Imports System.Threading

Module Module1

    Sub Main()
        inicia()
        Console.ReadKey()
    End Sub

    Private Sub inicia()
        Dim sw As New Stopwatch
        sw.Start()
        Dim fonte = Enumerable.Range(1, 10).AsParallel()

        Dim evenNums = From num In fonte Where Calcular(num) Mod 2 = 0
                                  Select num

        For Each ev In evenNums
            Console.WriteLine("{0} na Thread {1}", New Object() {ev, Thread.CurrentThread.GetHashCode})
        Next
        sw.Stop()
        Console.WriteLine("Concluído {0}", New Object() {sw.Elapsed})
    End Sub

    Private Function Calcular(ByVal num As Integer) As Integer
        Console.WriteLine("Calculando {0} na Thread {1}", New Object() {num, Thread.CurrentThread.GetHashCode})
        Thread.Sleep(1000)
        Return num
    End Function

End Module

Executando o código acima teremos o seguinte resultado:

A seguir temos a versão na linguagem C# e o resultado obtido:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace PLINQ_CSharp
{
    class Program
    {
        static void Main(string[] args)
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            var fonte = Enumerable.Range(1, 10).AsParallel();
            
            var evenNums = from num in fonte
                                    where Calcular(num) % 2 == 0
                                    select num;

            foreach (var ev in evenNums)
            {
                Console.WriteLine("{0} na Thread {1}", ev,
                Thread.CurrentThread.GetHashCode());
            }
            sw.Stop();
            Console.WriteLine("Concluído {0}", sw.Elapsed);
            Console.ReadKey();
        }
        public static int Calcular(int num)
        {
            Console.WriteLine("Calculando {0} na Thread {1}", num,
            Thread.CurrentThread.GetHashCode());
            Thread.Sleep(1000);
            return num;
        }
    }
}

Podemos ver que o laço foreach/For each exibe os números pares usando a thread principal da aplicação que é a thread 9.

Como a thread 9 esta executando o laço foreach/For each, que é a interface com o usuário, ele exibe o resultado depois do método Calcular.

O método Calcular esta sendo executado em uma thread diferente, mas a thread varia de 10 a 13, pois o meu computador possui quatro processadores. Assim embora o método Calcular tenha 1 segundo de atraso ele leva poucos segundos para executar pois temos 4 threads alocadas.

2 - O método de extensão ForAll

O método ForAll é um método de enumeração multithreaded que, ao contrário de iterar sobre os resultados da consulta, permite que os resultados sejam processados paralelamente sem primeiro mesclarem-se de volta para thread consumidora.

Quando a consulta é iterada usando um laço foreach/For each cada iteração é sincronizada na mesma thread para ser tratada uma após a outra na ordem de sequência. Se você quiser apenas realizar realizar cada iteração em paralelo, sem qualquer ordem específica, use o método ForAll. Ele tem o mesmo feito que a realização de cada iteração em uma thread diferente. Verifique esta técnica para saber se você obtêm o ganho de desempenho esperado.

A seguir temo um exemplo de uso de ForAll:

Imports System.Threading

Module Module1

    Sub Main()

        Dim sw As New Stopwatch
        sw.Start()

        Dim source = Enumerable.Range(1, 10).AsParallel()

        Dim evenNums = From num In source
                                 Where Calcular(num) Mod 2 = 0
                                 Select num

        evenNums.ForAll(Sub(ev) Console.WriteLine(String.Format("{0} na Thread {1}", ev, Thread.CurrentThread.GetHashCode())))
        sw.Stop()

        Console.WriteLine((String.Format("Done {0}", New Object() {sw.Elapsed})))
        Console.ReadKey()

    End Sub

    Private Function Calcular(ByVal num As Integer) As Integer
        Console.WriteLine(String.Format("Calculando {0} na Thread {1}", num, Thread.CurrentThread.GetHashCode()))
        Thread.Sleep(1000)
        Return num
    End Function

End Module

A seguir temos o resultado da execução deste código:

Agora a versão CSharp:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace PLINQ_ForAll
{
    class Program
    {
        static void Main(string[] args)
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            
            var fonte = Enumerable.Range(1, 10).AsParallel();

            var evenNums = from num in fonte
                                    where Calcular(num) % 2 == 0
                                   select num;

            evenNums.ForAll(ev => Console.WriteLine(string.Format("{0} na Thread {1}", ev,Thread.CurrentThread.GetHashCode())));

            sw.Stop();
            Console.WriteLine(string.Format("Concluído {0}", sw.Elapsed));
            Console.ReadKey();
        }
        private static int Calcular(int num)
        {
            Console.WriteLine(string.Format("Calculando {0} na Thread {1}", num, Thread.CurrentThread.GetHashCode()));
            Thread.Sleep(1000);
            return num;
        }
    }
}
 
 

Em consultas LINQ seqüenciais, a execução é adiada até que a consulta é enumerada ou em um laço foreach (For Each em Visual Basic) ou invocando um método como ToList <TSource>, ToArray <TSource> ou ToDictionary. Em PLINQ, você também pode usar foreach para executar a consulta e percorrer os resultados. No entanto, foreach não executa em paralelo e, portanto, requer que a saída de todas as tarefas paralelas sejam mescladas de volta para o segmento em que o laço é executado.

Em PLINQ, você pode usar foreach quando você deve preservar a ordenação final dos resultados da consulta, e também sempre que você está processando os resultados de uma forma serial, por exemplo, quando você está chamando Console.WriteLine para cada elemento. Para executar a consulta mais rápido quando a preservação da ordem não é necessária e quando o processamento dos resultados podem ser paralelizados, use o método ForAll<TSource> para executar a consulta LINQ. O método ForAll <TSource> não executa este passo final de mesclagem.

A ilustração abaixo mostra a diferença entre foreach e ForAll<TSource> na execução da consulta:

3 - O método de extensão AsSequential

Algumas operações exigem que os dados de origem sejam entregues de forma seqüencial. Os operadores de consulta ParallelEnumerable revertem para o modo sequencial automaticamente quando isto é necessário. Para operadores de consulta definidas pelo usuário e delegados de usuários que requerem uma execução sequencial, o PLINQ fornece o método AsSequential<Tsource>/AsSequecntial(Of TSource). Quando você usa AsSequential<TSource>/(Of TSource), todos os operadores subseqüentes na consulta são executados sequencialmente até que AsParallel seja chamado novamente.

O método AsSequential especifica que o resto da consulta deve ser executada em seqüência, como uma consulta LINQ não paralela. Ele converte a ParallelQuery(Of TSource)/<TSource) em um IEnumerable(Of T)/<T> para forçar uma avaliação sequencial da consulta.

A cláusula AsSequential é o oposto de AsParallel pois serializar partes da sua consulta LINQ.

Use AsParallel e AsSequential como portões para execução paralela e seqüencial, como mostrado no diagrama a seguir. Embora não seja comum, uma única consulta PLINQ pode ter várias cláusulas AsParallel e AsSequential.

Semelhante à cláusula AsParallel, AsSequential pode ser usada para prefixar um método LINQ. A partir dessa posição de consulta para a frente, o restante do da consulta LINQ é executada sequencialmente, pelo menos até que ele encontra uma cláusula AsParallel.

O diagrama a seguir ilustra uma consulta PLINQ usando as cláusulas AsParallel e AsSequential. As cláusulas Select e GroupBy são executadas em paralelo enquanto que cláusula OrderBy é executada sequêncialmente:

numeros.AsParallel.Select("consulta")
        .AsSequential().Where("filtro").OrderBy("ordenaçao")
        .AsParallel.GroupBy("agrupamento")

4 - O método de extensão AsOrdered

Em algumas consultas, um operador de consulta deve produzir resultados que preservam a ordem da seqüência de origem. O PLINQ fornece o operador AsOrdered para esta finalidade. AsOrdered é diferente da AsSequential<TSource>/(Of TSource). Uma seqüência AsOrdered ainda é processado em paralelo, mas seus resultados são armazenados e classificados.

Como a preservação da ordem geralmente envolve trabalho extra, uma seqüência AsOrdered pode ser processada mais lentamente do que o sequencia padrão AsUnordered<TSource>/(Of TSource). Se uma operação paralela ordenada é mais rápida ou não que a versão seqüencial da operação depende de muitos fatores que devem ser analisados com cuidado para avalizar se o custo da execução vale a pena.

Veja como exemplo o resultado da execução do código VB .NET abaixo:

Imports System.Threading

Module Module1

    Sub Main()

        Dim numeros As Integer() = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}
        Dim resultado = From numero In numeros.AsParallel()
                                 Select numero * numero

        For Each valor In resultado
            Console.Write(valor.ToString + " ")
        Next
        Console.ReadKey()
    End Sub

End Module

Ficou surpreso por ver que o resultado não aparece na mesma ordem da lista de números de entrada ?

O PLINQ cria tarefas para a consulta, onde cada tarefa é agendada e colocada na fila do pool da thread. As tarefas são então agendadas no processador e executadas. Dessa forma o PLINQ não garante a ordem na qual as tarefas serão executadas.

Se você preferir um resultado ordenado pode usar a cláusula AsOrdered. O PLINQ ainda irá executar a operação no modo não ordenado para aumentar o desempenho e utilizar os recursos do processador, porém, os resultados são armazenados e então ordenados na conclusão da consulta.

A seguir o mesmo código usando a cláusula AsOrdered:

Imports System.Threading

Module Module1

    Sub Main()

        Dim numeros As Integer() = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}
        Dim resultado = From numero In numeros.AsParallel().AsOrdered()
                                 Select numero * numero

        For Each valor In resultado
            Console.Write(valor.ToString + " ")
        Next
        Console.ReadKey()
    End Sub

End Module

A versão CSharp do código :

using System;
using System.Linq;

namespace PLINQ_AsOrdered
{
    class Program
    {
        static void Main(string[] args)
        {
             int[] numeros = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17};
             var resultado = from numero in numeros.AsParallel().AsOrdered()
                                    select numero * numero;

             foreach (var valor in resultado) 
             {
                 Console.Write(valor.ToString() + " ");
             }
             Console.ReadKey();
        }
    }
}

O Tratamento de exceções

Exceções não tratadas levantadas em uma consulta PLINQ são propagadas para a thread vinculada.

Como pode haver uma ou mais tarefas em paralelo, várias exceções não tratadas podem ocorrer simultaneamente.

Por essa razão, as exceções não tratadas são levantadas como AggregateException. Podemos enumerar a propriedade AggregateException.InnerExceptions para recuperar as exceções originais levantadas.

Vejamos um exemplo, onde vamos iterar sobre um array de inteiros. Cada elemento é então utilizado como um divisor em um cálculo. Alguns dos valores na matriz são zero e isso gera uma exceção de divisão por zero. Vamos capturar e exibir estas exceções.

Exemplo CSharp e o resultado da execução:

using System;
using System.Linq;

namespace PLINQ_Exceptions
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] intArray = { 5, 1, 2, 7, 4, 0, 6, 2, 9, 0 };
            var resultados = intArray.AsParallel().Select(item => (int)1000 / (int)item);
            try
            {
                resultados.ForAll((item) => Console.WriteLine(item));
            }
            catch (AggregateException ex)
            {
                foreach (var inner in ex.InnerExceptions)
                {
                    Console.WriteLine(inner.Message);
                }
            }
            Console.WriteLine("Presse algo para sair");
            Console.ReadLine();
        }
    }
}

A versão VB .NET :

Module Module1

    Sub Main()

        Dim intArray As Integer() = {5, 1, 2, 7, 4, 0, 6, 2, 9, 0}
        Dim resultados = intArray.AsParallel().Select(Function(item) CInt(1000) \ CInt(item))
        Try
            resultados.ForAll(Function(item) Lista(item))
        Catch ex As AggregateException
            For Each excep In ex.InnerExceptions
                Console.WriteLine(excep.Message)
            Next
        End Try
        Console.WriteLine("Presse algo para sair")
        Console.ReadLine()

    End Sub

    Function Lista(item) As String
        Console.WriteLine(item)
        Return Nothing
    End Function
End Module
Note que foi preciso criar uma função para poder listar os valores usando o método
ForAll que não retorna um valor;

Se você não fazer isso vai obter a mensagem de erro: Expression does not produce a value

A versão C# mostrada abaixo funciona sem problemas:

resultados.ForAll((item) => Console.WriteLine(item));

mas se você usar os conversores a expressão acima será convertida para:

lista.ForEach(Function(item) Console.WriteLine(item))

não vai funcionar pois não retorna um valor.

A saída da aplicação mostra duas exceções. Porquê ?

Existe uma exceção para cada zero na lista de inteiros tratados. Cada um irá causar uma exceção em uma tarefa separada.

Cancelamento de tarefas

O PLINQ esta integrado com os tipos de cancelamento do .NET Framework 4, portanto, ao contrário das consultas LINQ to Objects seqüenciais as consultas PLINQ podem ser canceladas.

Para criar consultas PLINQ canceláveis, utilize o operador WithCancellation(Of TSource) na consulta e forneça uma instância de CancellationToken como argumento.

Quando a propriedade IsCancellationRequested no token é definida como true, O PLINQ vai perceber isso e parar o processamento de todas as threads lançando uma OperationCanceledException.

Nota:É possível que uma consulta PLINQ continue a processar alguns elementos depois que o token de cancelamento é definido.

Dessa forma podemos cancelar uma consulta PLINQ usando os objetos CancellationTokenSource e CancellationToken.

CancellationToken é uma propriedade da classe CancellationTokenSource. Usando a Cláusula WithCancellation, você fornece um token de cancelamento de uma consulta PLINQ.

Podemos , então, chamar CancellationToken.Cancel para cancelar a operação de consulta. Quando você cancelar a operação, ele lança uma exceção OperationCanceledException.

Aqui estão os passos básicos do modelo de cancelamento com PLINQ.

Abaixo temos um exemplo usando a linguagem C#:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace PLINQ_Cancel
{
    class Program
    {
        static void Main(string[] args)
        {
            IEnumerable<int> range = Enumerable.Range(1, 50000000);
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            new Thread(() =>
            {
                Console.WriteLine("Iniciando o processamento...");

                try
                {
                    // consulta 
                    List<double> sqrts = range
                                 .AsParallel()
                                 .WithCancellation(tokenSource.Token)
                                 .Select(n => Math.Sqrt(n))
                                 .ToList();

                                 Console.WriteLine("Processamento encerrado {0} raízes quadrada ", sqrts.Count);
                }
                catch (OperationCanceledException)
                {
                    // Operação cancelada
                    Console.WriteLine("Cancelada");
                }
                catch (AggregateException aex)
                {
                    // captura pelo menos uma exceção
                    foreach (var inner in aex.InnerExceptions)
                    {
                        Console.WriteLine(inner.Message);
                    }
                }
            }).Start();
            Console.WriteLine("Pressione Enter para Parar");
            Console.ReadLine();
            tokenSource.Cancel();
        }
    }
}

No código acima, usamos o método Enumerable.Range para criar uma grande seqüência de números inteiros. Esta é a seqüência de origem para a consulta paralela que será executada. Inicializamos o token de cancelamento que irá ser utilizado para gerar o token que sinaliza o pedido de cancelamento da consulta LINQ paralelo.

A seguir criamos e iniciamos uma nova thread que executa a consulta. A consulta e o código que sinaliza o cancelamento estão em threads separadas, pois os blocos de consulta PLINQ bloqueiam a thread durante a execução.

Observe que a consulta está dentro de um bloco try/catch.

Uma vez que a thread é iniciada, o código mostra uma mensagem e espera que o usuário pressione Enter. Quando Enter for pressionado, o cancelamento é solicitado. Se a consulta já tiver terminado, a chamada para Cancelar não tem nenhum efeito e a saída é como mostra a figura abaixo:

Pegue o projeto completo aqui: PLINQ_VBNET.zip PLINQ_CSharp.zip

João 6:49 Vossos pais comeram o maná no deserto e morreram.

João 6:50 Este é o pão que desce do céu, para que o que dele comer não morra.

João 6:51 Eu sou o pão vivo que desceu do céu; se alguém comer deste pão, viverá para sempre; e o pão que eu darei pela vida do mundo é a minha carne.

Referências:


José Carlos Macoratti