Java può parallelizzare le operazioni di flusso per sfruttare i sistemi multi-core. Questo articolo fornisce una prospettiva e mostra come il flusso parallelo può migliorare le prestazioni con esempi appropriati.
Stream in Java
Uno stream in Java è una sequenza di oggetti rappresentata come un condotto di dati. Di solito ha una fonte dove si trovano i dati e una destinazione dove viene trasmesso. Nota che uno stream non è un repository; invece, opera su un'origine dati come un array o una raccolta. I bit intermedi nel passaggio sono in realtà chiamati stream. Durante il processo di trasmissione, il flusso di solito subisce una o più possibili trasformazioni, come il filtraggio o l'ordinamento, oppure può essere qualsiasi altro processo che opera sui dati. Questo personalizza i dati originali in una forma diversa, in genere, in base alle esigenze del programmatore. Pertanto, viene creato un nuovo flusso in base all'operazione applicata su di esso. Ad esempio, quando un flusso viene ordinato, ne risulta un nuovo flusso che produce un risultato che viene quindi ordinato. Ciò significa che i nuovi dati sono una copia trasformata dell'originale anziché essere nella forma originale.
Stream sequenziale
Qualsiasi operazione di flusso in Java, a meno che non sia esplicitamente specificata come parallela, viene elaborata in sequenza. Sono fondamentalmente flussi non paralleli che utilizzano un singolo thread per elaborare la loro pipeline. I flussi sequenziali non traggono mai vantaggio dal sistema multicore anche se il sistema sottostante può supportare l'esecuzione parallela. Cosa succede, ad esempio, quando applichiamo il multithreading per elaborare il flusso? Anche allora, opera su un singolo core alla volta. Tuttavia, può passare da un core all'altro a meno che non sia esplicitamente bloccato su un core specifico. Ad esempio, l'elaborazione in quattro thread diversi rispetto a quattro core diversi è ovviamente diversa laddove il primo non corrisponde al secondo. È del tutto possibile eseguire più thread in un unico ambiente core, ma l'elaborazione parallela è un genere completamente diverso. Un programma deve essere progettato per la programmazione parallela oltre all'esecuzione in un ambiente che lo supporti. Questo è il motivo per cui la programmazione parallela è un'arena complessa.
Proviamo un esempio per illustrare ulteriormente l'idea.
package org.mano.example; import java.util.Arrays; import java.util.List; public class Main2 { public static oid main(String[] args) { List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().forEach(System.out::println); System.out.println(); list.parallelStream().forEach(System.out::println); } }
Uscita
123456789 685973214
Questo esempio è un'illustrazione di q flusso sequenziale e q flusso parallelo in funzione. Il list.stream() funziona in sequenza su un singolo thread con println() operazione. list.parallelStream() , invece, viene elaborato in parallelo, sfruttando appieno l'ambiente multicore sottostante. L'aspetto interessante è nell'output del programma precedente. Nel caso di un flusso sequenziale, il contenuto dell'elenco viene stampato in una sequenza ordinata. L'output del flusso parallelo, invece, non è ordinato e la sequenza cambia ad ogni esecuzione del programma. Questo significa almeno una cosa:quell'invocazione di list.parallelStream() il metodo rende println istruzione operano in più thread, qualcosa che list.stream() fa in un unico thread.
Streaming parallelo
La motivazione principale alla base dell'utilizzo di un flusso parallelo è rendere l'elaborazione del flusso una parte della programmazione parallela, anche se l'intero programma potrebbe non essere parallelizzato. Il flusso parallelo sfrutta i processori multicore, con conseguente aumento sostanziale delle prestazioni. A differenza di qualsiasi programmazione parallela, sono complessi e soggetti a errori. Tuttavia, la libreria di flussi Java offre la possibilità di farlo facilmente e in modo affidabile. L'intero programma potrebbe non essere parallelizzato. ma almeno la parte che gestisce il flusso può essere parallelizzata. In realtà sono abbastanza semplici, nel senso che possiamo invocare alcuni metodi e il resto è curato. Ci sono un paio di modi per farlo. Uno di questi è ottenere un flusso parallelo invocando parallelStream() metodo definito da Raccolta . Un altro modo è invocare il parallel() metodo definito da BaseStream su un flusso sequenziale. Il flusso sequenziale è parallelizzato dall'invocazione. Si noti che la piattaforma sottostante deve supportare la programmazione parallela, ad esempio con un sistema multicore. Altrimenti, non ha senso nell'invocazione. Il flusso verrebbe elaborato in sequenza in tal caso, anche se abbiamo effettuato l'invocazione. Se l'invocazione viene effettuata su un flusso già parallelo, non fa nulla e restituisce semplicemente il flusso.
Per garantire che il risultato dell'elaborazione parallela applicata allo stream sia lo stesso ottenuto tramite l'elaborazione sequenziale, gli stream paralleli devono essere stateless, non interferenti e associativi.
Un rapido esempio
package org.mano.example; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List<Employee> employees = Arrays.asList( new Employee(1276, "FFF",2000.00), new Employee(7865, "AAA",1200.00), new Employee(4975, "DDD",3000.00), new Employee(4499, "CCC",1500.00), new Employee(9937, "GGG",2800.00), new Employee(5634, "HHH",1100.00), new Employee(9276, "BBB",3200.00), new Employee(6852, "EEE",3400.00)); System.out.println("Original List"); printList(employees); // Using sequential stream long start = System.currentTimeMillis(); List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.println("sorted using sequential stream"); printList(sortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); // Using parallel stream start = System.currentTimeMillis(); List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); end = System.currentTimeMillis(); System.out.println("sorted using parallel stream"); printList(anotherSortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); double totsal=employees.parallelStream() .map(e->e.getSalary()) .reduce(0.00,(a1,a2)->a1+a2); System.out.println("Total Salary expense: "+totsal); Optional<Employee> maxSal=employees.parallelStream() .reduce((Employee e1, Employee e2)-> e1.getSalary()<e2.getSalary()?e2:e1); if(maxSal.isPresent()) System.out.println(maxSal.get().toString()); } public static void printList(List<Employee> list) { for (Employee e : list) System.out.println(e.toString()); } } package org.mano.example; public class Employee { private int empid; private String name; private double salary; public Employee() { super(); } public Employee(int empid, String name, double salary) { super(); this.empid = empid; this.name = name; this.salary = salary; } public int getEmpid() { return empid; } public void setEmpid(int empid) { this.empid = empid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } @Override public String toString() { return "Employee [empid=" + empid + ", name=" + name + ", salary=" + salary + "]"; } }
Nel codice precedente, nota come abbiamo applicato l'ordinamento su uno stream utilizzando l'esecuzione sequenziale.
List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
e l'esecuzione parallela si ottiene modificando leggermente il codice.
List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
Confronteremo anche il tempo di sistema per avere un'idea di quale parte del codice richiede più tempo. L'operazione parallela inizia una volta che il flusso parallelo viene ottenuto esplicitamente da parallelStream() metodo. C'è un altro metodo interessante, chiamato reduce() . Quando applichiamo questo metodo a un flusso parallelo, l'operazione può verificarsi in thread diversi.
Tuttavia, possiamo sempre passare da parallelo a sequenziale secondo necessità. Se vogliamo cambiare il flusso parallelo in sequenziale, possiamo farlo invocando il sequenziale() metodo specificato da BaseStream . Come abbiamo visto nel nostro primo programma, l'operazione eseguita sullo stream può essere ordinata o disordinata in base all'ordine degli elementi. Ciò significa che l'ordine dipende dall'origine dati. Questa, tuttavia, non è la situazione nel caso di flussi paralleli. Per aumentare le prestazioni, vengono elaborati in parallelo. Poiché ciò avviene senza alcuna sequenza, in cui ogni partizione del flusso viene elaborata indipendentemente dalle altre partizioni senza alcun coordinamento, la conseguenza è imprevedibilmente disordinata. Ma, se vogliamo eseguire specificamente un'operazione su ogni elemento nel flusso parallelo da ordinare, possiamo considerare il forEachOrdered() metodo, che è un'alternativa a forEach() metodo.
Conclusione
Le API di flusso fanno parte di Java da molto tempo, ma l'aggiunta del tweak dell'elaborazione parallela è molto gradita e allo stesso tempo una caratteristica piuttosto intrigante. Ciò è particolarmente vero perché le macchine moderne sono multicore e c'è uno stigma sul fatto che la progettazione della programmazione parallela sia complessa. Le API fornite da Java forniscono la capacità di incorporare una sfumatura di modifiche alla programmazione parallela in un programma Java che ha il design generale dell'esecuzione sequenziale. Questa è forse la parte migliore di questa funzione.