Многопоточное программирование

Запуск задач с помощью java.util.concurrent.ExecutorService

Облегчив с помощью интерфейса Callable создание задач для параллельного выполнения, пакет java.util.concurrent также берет на себя работу по запуску и остановке потоков. Вместо объекта Thread предлагается использовать объект типа ExecutorService, с помощью которого пользователь может просто поместить задачу в очередь на выполнение и ждать получения результата. Можно сказать, что ExecutorService – это значительно усовершенствованная реализация шаблона WorkerThread.

ExecutorService – это интерфейс, поэтому для выполнения задач используются его конкретные потомки, адаптированные под требования разрабатываемого приложения. Однако программисту нет необходимости создавать собственную реализацию ExecutorService, так как в пакете java.util.concurrent уже присутствуют различные варианты реализации ExecutorService. Доступ к ним можно получить через статические методы служебного класса Executors, метод которого newFixedThreadPool возвращает объект типа ExecutorService со встроенной поддержкой шаблона ThreadPool. Также в классе Executors есть и другие методы для создания объектов ExecutorService с различными свойствами.

Наибольший интерес в ExecutorService представляет метод submit, через который задача ставится в очередь на выполнение. На вход этот метод принимает объект типа Callable или Runnable, а возвращает некий параметризованный объект типа Future. Этот объект можно использовать для доступа к результату выполнения задачи, который будет возвращен из метода call соответствующего Callable-объекта. При этом через объект Future можно проверить, закончено ли уже выполнение задачи – с помощью метода isDone и через метод get получить доступ к результату или исключительной ситуации, если в процессе выполнения задачи произошла ошибка.

Таким образом, при запуске задач с помощью классов из пакета java.util.concurrent не требуется прибегать к низкоуровневой поточной функциональности класса Thread, достаточно создать объект типа ExecutorService с нужными свойствами и передать ему на исполнение задачу типа Callable. Впоследствии можно легко просмотреть результат выполнения этой задачи с помощью объекта Future, как показано в листинге 4.

Листинг 4. Запуск задачи с помощью классов пакета java.util.concurrent
1 public class ExecutorServiceSample {
2     public static void main(String[] args) {
3         //создать ExecutorService на базе пула из пяти потоков
4         ExecutorService es1 = Executors.newFixedThreadPool(5);
5         //поместить задачу в очередь на выполнение
6         Future<String> f1 = es1.submit(new CallableSample());        
7         while(!f1.isDone()) {
8             //подождать пока задача не выполнится
9         }
10        try {
11            //получить результат выполнения задачи
12            System.out.println("task has been completed : " + f1.get());
13        } catch (InterruptedException ie) {           
14            ie.printStackTrace(System.err);
15        } catch (ExecutionException ee) {
16            ee.printStackTrace(System.err);
17        }
18        es1.shutdown();
19    }
20}

Стоит обратить внимание на строку 18, где происходит остановка объекта ExecutorService с помощью метода shutdown. Дело в том, что потоки в объекте ExecutorService не останавливаются сами, как обычно, поэтому их необходимо явно остановить с помощью этого метода, при этом если в ExecutorService находятся невыполненные задачи, то потоки будут остановлены только, когда завершится последняя задача

Создание потока данных

Последнее обновление: 02.05.2018

Для создания потока данных можно применять различные методы. В качестве источника потока мы можем использовать коллекции. В частности, в JDK 8 в интерфейс
Collection, который реализуется всеми классами коллекций, были добавлены два метода для работы с потоками:

  • : возвращается поток данных из коллекции

  • : возвращается параллельный поток данных из коллекции

Так, рассмотрим пример с ArrayList:

import java.util.stream.Stream;
import java.util.*;
public class Program {

    public static void main(String[] args) {
		
		ArrayList<String> cities = new ArrayList<String>();
        Collections.addAll(cities, "Париж", "Лондон", "Мадрид");
		cities.stream() // получаем поток
			.filter(s->s.length()==6) // применяем фильтрацию по длине строки
			.forEach(s->System.out.println(s)); // выводим отфильтрованные строки на консоль
	}
}

Здесь с помощью вызова получаем поток, который использует данные из списка cities. С помощью каждой промежуточной операции,
которая применяется к потоку, мы также можем получить поток с учетом модификаций. Например, мы можем изменить предыдущий пример следующим образом:

ArrayList<String> cities = new ArrayList<String>();
Collections.addAll(cities, "Париж", "Лондон", "Мадрид");

Stream<String> citiesStream = cities.stream(); // получаем поток
citiesStream = citiesStream.filter(s->s.length()==6); // применяем фильтрацию по длине строки
citiesStream.forEach(s->System.out.println(s)); // выводим отфильтрованные строки на консоль

Важно, что после использования терминальных операций другие терминальные или промежуточные операции к этому же потоку не могут быть применены, поток уже употреблен. Например, в следующем случае мы получим ошибку:

citiesStream.forEach(s->System.out.println(s)); // терминальная операция употребляет поток
long number = citiesStream.count(); // здесь ошибка, так как поток уже употреблен
System.out.println(number);
citiesStream = citiesStream.filter(s->s.length()>5); // тоже нельзя, так как поток уже употреблен

Фактически жизненный цикл потока проходит следующие три стадии:

  1. Создание потока

  2. Применение к потоку ряда промежуточных операций

  3. Применение к потоку терминальной операции и получение результата

Кроме вышерассмотренных методов мы можем использовать еще ряд способов для создания потока данных. Один из таких способов представляет метод
Arrays.stream(T[] array), который создает поток данных из массива:

Stream<String> citiesStream = Arrays.stream(new String[]{"Париж", "Лондон", "Мадрид"}) ;
citiesStream.forEach(s->System.out.println(s)); // выводим все элементы массива

Для создания потоков IntStream, DoubleStream, LongStream можно использовать соответствующие перегруженные версии этого метода:

IntStream intStream = Arrays.stream(new int[]{1,2,4,5,7});
intStream.forEach(i->System.out.println(i));

LongStream longStream = Arrays.stream(new long[]{100,250,400,5843787,237});
longStream.forEach(l->System.out.println(l));

DoubleStream doubleStream = Arrays.stream(new double[] {3.4, 6.7, 9.5, 8.2345, 121});
doubleStream.forEach(d->System.out.println(d));

И еще один способ создания потока представляет статический метод of(T..values) класса Stream:

Stream<String> citiesStream =Stream.of("Париж", "Лондон", "Мадрид");
citiesStream.forEach(s->System.out.println(s));

// можно передать массив
String[] cities = {"Париж", "Лондон", "Мадрид"};
Stream<String> citiesStream2 =Stream.of(cities);
       
IntStream intStream = IntStream.of(1,2,4,5,7);
intStream.forEach(i->System.out.println(i));

LongStream longStream = LongStream.of(100,250,400,5843787,237);
longStream.forEach(l->System.out.println(l));

DoubleStream doubleStream = DoubleStream.of(3.4, 6.7, 9.5, 8.2345, 121);
doubleStream.forEach(d->System.out.println(d));

НазадВперед

Остались вопросы? Бесплатная консультация по телефону:

2 Класс Writer

Класс — это полный аналог класса , и снова только с одним отличием: он работает с символами, , вместо байт.

Это абстрактный класс: объекты класса создать нельзя. Его основная цель — быть единым классом-родителем для сотен классов-наследников и задать для них общие методы работы с символьными потоками.

Методы класса (и всех его классов-наследников):

Методы Описание
Записывает один символ (не ) в поток.
Записывает массив символов в поток
Записывает часть массива символов в поток
Записывает строку в поток
Записывает часть строки в поток
Записывает в поток все данные, которые хранятся в буфере
Закрывает поток

Методы очень похожи на методы класса , только работают с символами вместо байт.

Краткое описание методов:

Метод

Этот метод записывает в поток вывода один символ (не ). Переданное значение приводится к типу , два первых байта отбрасываются.

Метод

Записывает в поток вывода переданный массив символов.

Метод

Записывает в поток вывода часть переданного массива символов. Переменная задает номер первого элемента массива, — длина записываемого фрагмента.

Метод

Записывает в поток вывода переданную строку.

Метод

Записывает в поток вывода часть переданной строки: строку преобразуют в массив символов. Переменная задает номер первого элемента массива, — длина записываемого фрагмента.

Метод

Метод используется, чтобы принудительно записать в целевой поток данные, которые могут кэшироваться в текущем потоке. Актуально при использовании буферизации и/или нескольких объектах потоках, организованных в цепочку.

Метод

Записывает в целевой объект все незаписанные данные. Метод можно не вызывать, если вы используете -with-resources.

Пример программы, которая копирует текстовый файл:

Код Примечание
для чтения из файла для записи в файл
Буфер, в который будем считывать данные
Пока данные есть в потоке
Читаем данные в буфер
Записываем данные из буфера во второй поток

Класс

Есть еще один интересный класс-наследник от класса — это . В нем находится изменяемая строка — объект . И каждый раз, когда вы что-то «пишете» в объект , текст просто добавляется во внутренний буфер.

Пример:

Код Примечание
Создается целевой символьный поток
Строка пишется в буфер внутри
Строка пишется в буфер внутри
Преобразовываем содержимое объекта к строке

В данном случае класс — это, по сути, обертка над классом , однако класс — это наследник класса-потока , и он может использоваться в цепочках из объектов-потоков. Довольно полезное свойство на практике.

Выбор между интерфейсом java.lang.Runnable и классом java.lang.Thread

Как было показано ранее, при необходимости обеспечить параллельное выполнение нескольких задач у программиста есть возможность выбрать, как именно реализовать эти задачи: с помощью класса Thread или интерфейса Runnable. У каждого подхода есть свои преимущества и недостатки.

В качестве основного преимущества при наследовании класса Thread заявляется полный доступ ко всем функциональным возможностям потока на платформе Java. Главным недостатком же считается как раз сам факт наследования, так как в силу того, что в Java применяется только одиночное наследование, то наследование классу Thread автоматически закрывает возможность наследовать другим классам. Для классов, отвечающих за доменную область или бизнес-логику, это может стать серьезной проблемой. Правда негативное воздействие, возникающее из-за невозможности наследования, можно ослабить, если вместо него применить прием делегирования или соответствующие шаблоны проектирования.

Использование интерфейса Runnable по умолчанию лишено этого недостатка, но если реализовать задачу таким способом, то придется потратить дополнительные усилия на ее запуск. Как было показано в листинге 2, для запуска Runnable-задачи все равно потребуется объект Thread, также в этом случае исчезнет возможность прямого управления потоком из задачи. Хотя последнее ограничение можно обойти с помощью статических методов класса Thread (например, метод currentThread() возвращает ссылку на текущий поток).

Поэтому сделать однозначный вывод о превосходстве какого-либо подхода довольно сложно, и чаще всего в приложениях одновременно используются оба варианта, но для решения задач различной направленности. Считается, что наследование класса Thread следует применять только тогда, когда действительно необходимо создать «новый вид потока, который должен дополнить функциональность класса java.lang.Thread», и подобное решение применяется при разработке системного ПО, например, серверов приложений или инфраструктур. Использование интерфейса Runnable показано в случаях, когда просто «необходимо одновременно выполнить несколько задач» и не требуется вносить изменений в сам механизм многопоточности, поэтому в бизнес-ориентированных приложениях в основном используется вариант с интерфейсом Runnable.

Как выбирать производителя твердотельных накопителей

Что такое многопоточность в Java?

MULTITHREADING или многопоточность в Java — это процесс одновременного выполнения двух или более потоков с максимальной загрузкой ЦП. Многопоточные приложения выполняют одновременно два или более потоков. Следовательно, он также известен как параллелизм в Java. Каждый поток проходит параллельно друг другу.

Несколько потоков не выделяют отдельную область памяти, следовательно, они экономят память. Кроме того, переключение контекста между потоками занимает меньше времени.

Пример:

package demotest;

public class GuruThread1 implements Runnable
{
       public static void main(String[] args) {
        Thread guruThread1 = new Thread("Guru1");
        Thread guruThread2 = new Thread("Guru2");
        guruThread1.start();
        guruThread2.start();
        System.out.println("Thread names are following:");
        System.out.println(guruThread1.getName());
        System.out.println(guruThread2.getName());
    }
    @Override
    public void run() {
    }

}

Преимущества многопоточности:

  • Пользователи не заблокированы, потому что потоки независимы, и мы можем выполнять несколько операций за раз
  • Поскольку такие потоки независимы, другие потоки не будут затронуты, если один поток встретит исключение.

Что такое Single Thread?

Одиночная нить — это в основном легкая и самая маленькая единица обработки. Java использует потоки, используя «класс потоков».

Существует два типа потока — пользовательский поток и поток демона (потоки демона используются, когда мы хотим очистить приложение, и используются в фоновом режиме).

Когда приложение только начинается, создается пользовательский поток. Опубликовать это, мы можем создать много пользовательских потоков.

Пример с одной нитью:

package demotest;

public class GuruThread
{
       public static void main(String[] args) {
              System.out.println("Single Thread");
       }
}

Преимущества одной нити:

  • Уменьшает накладные расходы в приложении при выполнении одного потока в системе
  • Кроме того, это снижает стоимость обслуживания приложения.

Новые возможности пакета java.uti.concurrent

Платформа Java постоянно развивается, и поэтому к существующей функциональности все время добавляются новые возможности. Иногда новая функциональность берется из уже существующих сторонних библиотек, при этом речь не идет о банальном копировании, а скорее о переосмыслении и доработке уже существующих решений. Подобным способом в версию Java 5 был добавлен пакет java.util.concurrent, включающий в себя множество уже проверенных и хорошо зарекомендовавших себя приемов для параллельного выполнения задач (этот пакет — только одно из множества важных нововведений, представленных в Java 5).

В рамках этой статьи интерес представляют уже готовые к использованию реализации шаблонов WorkerThread и ThreadPool, а также еще один способ реализации задач для параллельного выполнения, кроме упоминавшихся класса Thread и интерфейса Runnable. Ещё в пакете java.util.concurrent находятся два подпакета: java.util.concurrent.locks и java.util.concurrent.atomic, с которыми тоже стоит ознакомиться, так как они значительно упрощают организацию взаимодействия между потоками и параллельного доступа к данным.

Настройка размера пула

Настраивая размер пула потоков, важно избежать двух ошибок: слишком мало потоков или слишком много потоков. К счастью, для большинства приложений спектр между слишком большим и слишком малым количеством потоков довольно широк

Если вы помните, есть два основных преимущества в организации поточной обработки сообщений в приложениях: возможность продолжения процесса во время ожидания медленных операций, таких, как I/O (ввод — вывод), и использование возможностей нескольких процессоров. В приложениях с ограничением по скорости вычислений, функционирующих на N-процессорной машине, добавление дополнительных потоков может улучшить пропускную способность, по мере того как количество потоков подходит к N, но добавление дополнительных потоков свыше N не оправдано. Действительно, слишком много потоков разрушают качество функционирования из-за дополнительных издержек переключения процессов

Оптимальный размер пула потоков зависит от количества доступных процессоров и природы задач в рабочей очереди. На N-процессорной системе для рабочей очереди, которая будет выполнять исключительно задачи с ограничением по скорости вычислений, вы достигните максимального использования CPU с пулом потоков, в котором содержится N или N+1 поток.

Для задач, которые могут ждать осуществления I/O (ввода — вывода) — например, задачи, считывающей HTTP-запрос из сокета – вам может понадобиться увеличение размера пула свыше количества доступных процессоров, потому, что не все потоки будут работать все время. Используя профилирование, вы можете оценить отношение времени ожидания (WT) ко времени обработки (ST) для типичного запроса. Если назвать это соотношение WT/ST, для N-процессорной системе вам понадобится примерно N*(1+WT/ST) потоков для полной загруженности процессоров.

Использование процессора – не единственный фактор, важный при настройке размера пула потоков. По мере возрастания пула потоков, можно столкнуться с ограничениями планировщика, доступной памяти, или других системных ресурсов, таких, как количество сокетов, дескрипторы открытого файла, или каналы связи базы данных

Нет необходимости писать свое

Даг Ли создал отличную открытую библиотеку утилит параллельности, , которая включает объекты-мьютексы, семафоры, коллекции, такие как очереди и хэш-таблицы, хорошо работающие при параллельном доступе, и несколько реализаций рабочей очереди. Класс из этого пакета — эффективная, широко использующаяся, правильная реализация пула потоков, основанного на рабочей очереди. Прежде чем пытаться писать собственное программное обеспечение, которое вполне может оказаться неправильным, вы можете рассмотреть использование некоторых утилит в . Ссылки и дополнительную информацию смотрите в разделе .

Библиотека также служит вдохновителем для JSR 166, рабочей группы Java Community Process (JCP), которая будет производить набор параллельных утилит для включения в библиотеку классов Java в пакете , и которая готовит выпуск Java Development Kit 1.5.

Закрытие потоков

Последнее обновление: 25.04.2018

При завершении работы с потоком его надо закрыть с помощью метода close(), который определен в интерфейсе
Closeable. Метод close имеет следующее определение:

void close() throws IOException

Этот интерфейс уже реализуется в классах InputStream и OutputStream, а через них и во всех классах потоков.

При закрытии потока освобождаются все выделенные для него ресурсы, например, файл. В случае, если поток окажется не закрыт, может происходить утечка памяти.

Есть два способа закрытия файла. Первый традиционный заключается в использовании блока . Например, считаем данные из файла:

import java.io.*;

public class Program {

    public static void main(String[] args) {
        
        FileInputStream fin=null;
        try
        {
            fin = new FileInputStream("C://SomeDir//notes.txt");
            
            int i=-1;
            while((i=fin.read())!=-1){
            
                System.out.print((char)i);
            }
        }
        catch(IOException ex){
            
            System.out.println(ex.getMessage());
        } 
        finally{
            
            try{
            
                if(fin!=null)
                    fin.close();
            }
            catch(IOException ex){
            
                System.out.println(ex.getMessage());
            }
        }  
    } 
}

Поскольку при открытии или считывании файла может произойти ошибка ввода-вывода, то код считывания помещается в блок try. И чтобы быть уверенным, что
поток в любом случае закроется, даже если при работе с ним возникнет ошибка, вызов метода помещается в блок .
И, так как метод также в случае ошибки может генерировать исключение IOException, то его вызов также помещается во вложенный блок

Начиная с Java 7 можно использовать еще один способ, который автоматически вызывает метод close. Этот способ заключается в использовании конструкции
try-with-resources (try-с-ресурсами). Данная конструкция работает с объектами, которые реализуют интерфейс .
Так как все классы потоков реализуют интерфейс , который в свою очередь наследуется от , то их также можно использовать в данной
конструкции

Итак, перепишем предыдущий пример с использованием конструкции try-with-resources:

import java.io.*;

public class Program {

    public static void main(String[] args) {
        
        try(FileInputStream fin=new FileInputStream("C://SomeDir//notes.txt"))
        {
            int i=-1;
            while((i=fin.read())!=-1){
            
                System.out.print((char)i);
            }   
        }
        catch(IOException ex){
            
            System.out.println(ex.getMessage());
        } 
    } 
}

Синтаксис конструкции следующий: . Данная конструкция также не исключает
использования блоков .

После окончания работы в блоке try у ресурса (в данном случае у объекта ) автоматически вызывается метод close().

Если нам надо использовать несколько потоков, которые после выполнения надо закрыть, то мы можем указать объекты потоков через точку с запятой:

try(FileInputStream fin=new FileInputStream("C://SomeDir//Hello.txt"); 
        FileOutputStream fos = new FileOutputStream("C://SomeDir//Hello2.txt"))
{
	//..................
}

НазадВперед

Пример создания потока. Наследуем класс Thread

Мы можем наследовать класс  для создания собственного класса Thread и переопределить метод . Тогда мы можем создать экземпляр этого класса и вызвать метод  для того, чтобы выполнить метод .

Вот простой пример того, как наследоваться от класса Thread:

Java

package ua.com.prologistic;

public class MyThread extends Thread {

public MyThread(String name) {
super(name);
}

@Override
public void run() {
System.out.println(«Стартуем наш поток » + Thread.currentThread().getName());
try {
Thread.sleep(1000);
// для примера будем выполнять обработку базы данных
doDBProcessing();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(«Заканчиваем наш поток » + Thread.currentThread().getName());
}
// метод псевдообработки базы данных
private void doDBProcessing() throws InterruptedException {
Thread.sleep(5000);
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

packageua.com.prologistic;

publicclassMyThreadextendsThread{

publicMyThread(Stringname){

super(name);

}

@Override

publicvoidrun(){

System.out.println(«Стартуем наш поток «+Thread.currentThread().getName());

try{

Thread.sleep(1000);

// для примера будем выполнять обработку базы данных

doDBProcessing();

}catch(InterruptedExceptione){

e.printStackTrace();

}

System.out.println(«Заканчиваем наш поток «+Thread.currentThread().getName());

}

// метод псевдообработки базы данных

privatevoiddoDBProcessing()throwsInterruptedException{

Thread.sleep(5000);

}

}

Вот тестовая программа, показывающая наш поток в работе:

Java

package ua.com.prologistic;

public class ThreadRunExample {

public static void main(String[] args){
Thread t1 = new Thread(new HeavyWorkRunnable(), «t1»);
Thread t2 = new Thread(new HeavyWorkRunnable(), «t2»);
System.out.println(«Стартуем runnable потоки»);
t1.start();
t2.start();
System.out.println(«Runnable потоки в работе»);
Thread t3 = new MyThread(«t3»);
Thread t4 = new MyThread(«t4»);
System.out.println(«Стартуем наши кастомные потоки»);
t3.start();
t4.start();
System.out.println(«Кастомные потоки в работе»);

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

packageua.com.prologistic;

publicclassThreadRunExample{

publicstaticvoidmain(Stringargs){

Thread t1=newThread(newHeavyWorkRunnable(),»t1″);

Thread t2=newThread(newHeavyWorkRunnable(),»t2″);

System.out.println(«Стартуем runnable потоки»);

t1.start();

t2.start();

System.out.println(«Runnable потоки в работе»);

Thread t3=newMyThread(«t3»);

Thread t4=newMyThread(«t4»);

System.out.println(«Стартуем наши кастомные потоки»);

t3.start();

t4.start();

System.out.println(«Кастомные потоки в работе»);

}

}

Заключение

Пул потока – полезный инструмент для организации серверов приложений. Он довольно простой по сути, но есть некоторые моменты, с которыми следует быть осторожными во время применения и использования, такие как взаимоблокировка, пробуксовка ресурсов, и сложности, связанные с и . Если вам потребуется пул потоков для вашего приложения, рассмотрите использование одного из классов из , такой как , вместо создания нового с нуля. Если вам нужно создать потоки для решения краткосрочных задач, вам определенно следует рассмотреть использование вместо этого пула потоков.

Похожие темы

  • Оригинал статьи: Thread pools and work queues.
  • Даг Ли, Параллельное программирование в Java: принципы дизайна и модели, второе издание — умело написанная книга о проблемных вопросах, связанных с многопоточным программированием в Java-приложениях.
  • Изучите пакет Дага Ли , который содержит множество полезных классов для построения эффективных параллельных приложений.
  • Формируется пакет на основе Java Community Process JSR 166, для включения в версию 1.5 JDK (комплект разработчика для Java).
  • Книга Аллена Холуб Укрощение потоков Java — интересное знакомство с проблемами программирования Java-потоков.
  • Конечно, есть некоторые проблемы в Java Thread API; прочитайте, что сделал бы Аллен Холуб, если бы был королем (developerWorks, октябрь 2000 г.)
  • Алекс Ройтер предлагает некоторые рекомендации для написания классов с многопоточной поддержкой (developerWorks, февраль 2001 г.)
  • Другие ресурсы Java можно найти в разделе технологий Java developerWorks.
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Adblock
detector