Google Analytics

воскресенье, 13 мая 2012 г.

О поддержке параллелизма в Java


Хотя статья и задумывалась как краткое описание основных возможностей Java в области параллелизма, но тема настолько обширна, что в результате получилось достаточно объёмное произведение. Целью статьи является описание общей картины поддержки параллелизма в Java, проблем которые эта поддержка призвана решить и некоторых деталей реализации. Думаю, эта информации будет полезна как новичкам, которые смогут сформировать целостное представление о данной теме, так и более опытным разработчикам незнакомым с деталями реализации механизмов связанных с параллелизмом в JVM HotSpot и JDK. Большинство из приведенной информации актуально для Java 6 и 7. Там где это неверно будет указанна требуемая версия. Все примеры взяты из, либо выполнялись на Debian GNU/Linux 6.0.4 amd64, Java(TM) SE Runtime Environment (build 1.7.0_04-b20), Java HotSpot(TM) 64-Bit Server VM (build 23.0-b21, mixed mode).

1 Проблемы порождаемые параллелизмом

Перед изучением механизмов работы с параллелизмом, которые предоставляет Java, необходимо обсудить проблемы характерные для подавляющего большинства современных платформ. Несмотря на существенные различия в платформах, проблемы в области параллелизма в большинстве своём общие. Главная причина существования этих проблем - разделение данных (memory sharing). Из этого следует, что если бы параллельно выполняющиеся потоки (под потоками подразумевается параллельно выполняющийся код) не взаимодействовали через общие участки памяти, то не было бы и подобных проблем. Но без взаимодействия с окружением сложно говорить о полезности таких программ. Необходимо заметить, что часть проблем актуальна не только для параллельно выполняющегося кода на физически разных процессорах или ядрах, но и для кода, который выполняется в среде вытесняющей многозадачности на однопроцессорных системах. Большинство проблем, о которых пойдёт речь в данном подразделе, являются следствием низкоуровневых операций. Это означает, что причины возникновения проблем лежат не на уровне семантики кода языка программирования, а на уровне компилятора, операционной системы или физического процессора. Это как раз то, что Joel Spolsky называет "текущей" абстракции (leaky abstraction [5]) - для понимания механизмов работы на уровне языка программирования необходимо спускаться ниже, вплоть до уровня процессора.

1.1 Атомарность операций (atomicity)

Атомарность есть неделимость. Ошибочное восприятие атомарности возникает преимущественно из-за некорректного понимания всех уровней абстракций и суждение об операции только поверхностно. Классический пример - инкремент. Во многих языках существует оператор инкремента, который выглядит как атомарная операция (если судить на уровне абстракции данного языка программирования). Действительно, на уровне языка, реализующего поддержку оператора инкремента, мы не имеем возможность разбить его на более мелкие операции (например в C или Java оператор ++). Но с точки зрения процессора это несколько операций. Прочитать данные из памяти, прибавить единицу к прочитанному значению, сохранить новое значение обратно. Проблема заключается в том, что когда несколько потоков выполнения пытаются инкрементировать переменную, возможен такой неудачный момент, при котором несколько потоков одновременно прочитают одно и тоже значение памяти, увеличат его на единицу и сохранят результат. Как следствие вычисления будут содержать ошибку - не учтены некоторые операции инкремента (смотри 1.2 Состояние гонки).

Для дальнейшего обсуждение нам понадобится ещё одно понятие тесно связанное с атомарностью. Действия содержащие несколько подопераций, для которых нужно обеспечивать атомарность, будем называть составными (compound actions). Типичный пример составного действия прочитать-изменить-записать (read-modify-rite) или проверить-затем-действовать (check-then-act).

1.2 Состояние гонки (race condition)

Ситуация состояние гонки возникает когда корректность вычислений зависит от порядка выполнения потоков во времени. Например как в случае с неатомарным инкрементированием итоговое значение зависит от того, как выполнялись потоки относительно друг друга. На загруженной системе можно получить корректный результат при небольшом количестве конкурирующих потоков выполняющих инкрементирование, но такой гарантии нет. Такая ситуация характерна для любых составных действий, которые не обеспечены должной атомарностью.

1.3 Видимость (visibility)

Если записать в переменную значение, то все последующие чтения из неё должны давать один и тот-же результат, при условии, что между операциями чтения не будут производится дополнительные операции записи. Это утверждение верно только для однопоточного выполнения. В случае многопоточного выполнения, потоки в которых производятся операции чтения могут видеть разные значения, не смотря на то, что операция записи уже выполнена (в другом потоке). При этом не существует никаких гарантий когда читающим потокам станет видно новое значение. Из этого вытекает проблема видимости.

Наиболее распространённой причиной проявления проблемы видимости является наличия нескольких ядер/процессоров с собственными кэшами в системе на которых выполняется код. Допустим первый поток на первом ядре/процессоре произвёл вычисления и сохранил значение, которое не попадает сразу же в общую память, а задерживается в кэше до момента его сброса. При этом второй поток, выполняющийся на втором ядре/процессоре, читает значение переменной из общей памяти и видит старое значение. Почему в процессорах не реализована обязательная синхронизация кэшей и памяти, которая бы не допускала подобных ситуаций? Потому что это привело бы к существенным проблемам с производительностью. Большую часть времени ядра/процессоры выполняют работу с различными участками памяти и поэтому отсутствует взаимовлияние. Процессоры поддерживают механизмы управления способом взаимодействия с памятью при выполнении операции, но они реализованы в виде отдельных инструкций.

1.4 Переупорядочивание команд (reordering)

Причиной возникновения проблем с видимостью могут служить не только наличие кэшей, но и переупорядочивание операций. Большинство компиляторов языков и процессоры гарантируют только сохранение порядка выполнения операций, которые влияют на результат выполнения текущего потока. Для параллельных потоков такая гарантия отсутствует. То есть если в одном потоке выполняется код который поочерёдно присваивает значения двум переменным a1 и a2, то второй поток может видеть старое значение a1 и новое a2. Это может произойти по причине изменения порядка операций присваивания с целью оптимизации (как на уровне компилятора, так и на уровне процессора). Именно для таких случаев в процессорах реализованы инструкции - барьеры памяти (memory barrier)(для x86 процессоров инструкции lfence, sfence, mfence [4]). Эти инструкции препятствуют изменению порядка следования операций через такие барьеры. Аналогичные механизмы есть на уровне компиляторов (например в gcc механизм __sync_synchronize, но в некоторых случаях явных барьеров не требуется, механизмы синхронизации могут обеспечивают подобную гарантию порядка).

2 Параллелизм в Java

2.1 Модель памяти Java (Java Memory Model)

Java разрабатывался как кроссплатформенный язык программирования. Следовательно, его реализации должны поддерживать множество платформ: windows и linux на x86 процессорах, solaris на sparc и прочие. Эти платформы достаточно сильно отличаются, начиная от механизмов работы с памятью на уровне операционных систем и заканчивая инструкциями которые поддерживают процессоры. По этой причине одна из задач, стоявших перед разработчиками Java, была создание абстракции памяти которая могла быть реализована на всех поддерживаемых платформах. Результатом этой работы и стала модель памяти Java.

Фундаментальной аксиомой модели памяти Java является правило происходит-прежде (happens-before). Данное правило гласит о том, что если два действия связанны отношением происходит-прежде, то результат действия происходящего прежде будет виден действию происходящему позже (независимо от того, в разных ли потоках выполняются эти действия). Для того чтобы дальнейшее повествование было лаконичным введём нотацию hb(x,y), которая обозначает, что действия x и y связанны отношением происходит-прежде. Далее представлены правила происходит-прежде гарантируемые моделью памяти Java:

  • Если действия x и y выполняются в одном потоке и x находится перед y, то для них верно hb(x,y);
  • Если действие x это выполнение конструктора класса, а действие y метода finalize, то для них верно hb(x,y);
  • Если действие x синхронизируется (либо имеем volatile запись, затем чтение) с последующим действием y, то для них верно hb(x,y);
  • Если hb(x,y) и hb(y,z), то hb(x,z) (Транзитивность).
2.2 Процессы и потоки

Все операционный системы, для которых имеется реализация OpenJDK, поддерживают механизмы процессов и потоков. Процесс это среда выполнения программы, которая создается операционной системой и включает в себя сегмент кода, сегмент данных и прочие ресурсы созданные во время выполнения (дескрипторы открытых файлов или сокетов и т.д.). Процессы могут взаимодействовать только через специальные механизмы предоставляемые операционной системой или реализованных в пространстве пользователя на базе предоставляемых. Например Unix-подобные операционные системы обеспечивают межпроцессное взаимодействие (IPC) через такие механизмы как: дескрипторы файлов, каналы (в том числе именованные), очереди (System V и POSIX) сообщений, разделяемая память, сетевые и локальные сокеты и другие [12]. Поток, подобно процессу, тоже является средой выполнение, но ключевыми отличиями служат общие сегмент данных и текущие ресурсы, разделяемые несколькими потоками одновременно. Как правило, эти различия обеспечивают потокам определённую легковесность относительно процессов. Тем самым уменьшая количество ресурсов необходимых на поддержания жизненного цикла потоков и упрощая их взаимодействие (общие сегмент данных и ресурсы). В языке Java, а именно в стандартной библиотеке, поддерживаются обе этих абстракции.

Процесс представлен в Java классом java.lang.Process, который соответствует исполняемому процессу операционной системы. Для запуска процесса необходимо выполнить команду операционной системы, которая запускает определенное приложение, аналогично тому, как это делается в командном интерпретаторе (консоли). Создать процесс, то есть запустить на выполнение внешнюю программу, можно несколькими способами java.lang.ProcessBuilder.start и java.lang.Runtime.exec. Для процесса потомка необходимо перенаправить ввод/вывод. По-умолчанию, потомок не подключается к текущему терминалу и может блокироваться при исчерпании буферов ввода/вывода предоставленных операционной системой. Представления процессов используются в Java для запуска внешних программ, отслеживания их состояния и прочего взаимодействия. Хотя процессы являлись одним из первых инструментом привносящим параллелизм, как правило, они плохо подходят для параллельных вычислений (имеются в виду тесно-связанные параллельные алгоритмы) в силу своей тяжеловесности и отсутствия простых механизмов взаимодействия.

Поддержка потоков в Java реализована в классе java.lang.Thread. Thread является основным инструментом в Java для реализации параллельных вычислений. Для создания нового потока необходимо инстанцировать объект класса Thread. Конструктор этого класса может принимать, в различных комбинациях, аргументы со следующими типами и названиями: java.lang.Runnable target, java.lang.String name, java.lang.ThreadGroup group и только в одном случае long stackSize. В качестве аргумента target передаётся объект класса реализующего интерфейс Runnable, содержащий метод run() с кодом, который будет запущен в отдельном потоке. Строковый аргумент name может указывать на имя потока (удобно при отладке, профилировании и логировании). Аргумент group содержит ссылку на объект группы потоков, который может использоваться для получения информации об и воздействия на потоки принадлежащие группе (например максимальный приоритет). И последний аргумент stackSize это размер стека потока. Рассмотрим пример запуска нескольких потоков:
public class Task implements Runnable {
    
    public static void main(String[] args) {
        Thread t1 = new Thread(new Task(), "Thread1");
        Thread t2 = new Thread(new Task(), "Thread2");
        
        t1.start();
        t2.start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println(
                    Thread.currentThread().getName() 
                    + " - " + i);
            System.out.flush();
        }
    }
}
В представленном выше коде запускаются на выполнения два потока с именами "Thread1" и "Thread2" соответственно. Каждый из них выполняет вывод на экран чисел от 0 до 99 с указанием имени потока. При выполнении этого кода на экран будет выводиться информация подобная следующей:
Thread2 - 0
Thread1 - 0
Thread1 - 1
Thread2 - 1
Thread2 - 2
Thread2 - 3
Thread2 - 4
, при этом порядок вывода цифр не детерминирован. Это связанно с тем, что не представляется возможным определить точные периоды, когда код потоков выполняется на процессоре. На этот порядок влияют как алгоритм и реализация планировщика задач операционной системы, так и текущая загруженность системы в купе с множеством других факторов.

Рассмотрим наиболее интересные и интенсивно используемые методы API потоков (методы deprecated не рассматриваются). Методы int Thread.getPriority() и void setPriority(int newPriority) возвращают и устанавливают приоритет потока соответственно. С допустимыми значениями приоритета можно определиться по статическим атрибутам Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, Thread.MAX_PRIORITY для текущей реализации jdk7u4 это 1, 5 и 10. Потоку с большим приоритетом будет предоставляться больше процессорного времени. Для установки приоритета необходимо проверять допустимые права с помощью метода checkAccess. Как видно из примера, для запуска потока используется метода void Thread.start(). После вызова данного метода стартует новый поток и в нём запускается на выполнение метод run(). Любой кто заглянет в код класса Thread может увидеть, что он реализует интерфейс Runnable, метод run() которого вызывает метод run() объекта переданного в аргументе target. Возможно переопределить метод run в потомке класса Thread и использовать новый класс для создания потоков, но такой способ не будет являться идиоматическим. Механизм прерывания работы потока был переработан ещё во времена выхода Java 1.3, метод stop() помечен как deprecated. Причиной этому послужил неудачная архитектура с прерыванием потока извне. Такое прерывание могло привести к непредсказуемому поведению в случае когда прерываемый поток находился в состоянии удержания блокировок. После завершения потока все удерживаемые им блокировки освобождаются, но так как прерывание может произойти в любой момент, состояние защищаемое блокировками может остаться несогласованным. Поэтому работу по управлению прерыванием потока было решено переложить на плечи разработчика потока, который должен использовать следующие методы. boolean Thread.interrupted() - возвращает признак ожидания прерывания потоком, при этом сбрасывая его значение. boolean Thread.isInterrupted() - аналогично предыдущему методу возвращает признак ожидания прерывания, но не сбрасывает его. void Thread.interrupt() - устанавливает флаг прерывания потока, при этом если он находится в прерываемом блокирующемся методе одно из классов стандартной библиотеки, то такой метод должен бросить исключение InterruptedException. В остальных случаях разработчик сам должен заботиться о проверке состояния прерывания потока в долгоиграющих алгоритмах. Отзывчивость потока на сигнал прерывания будет зависеть от периодов через которые выполняется проверка состояния прерывания. Например:
@Override
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        //Какие-то недолгие вычисления 1
        if (Thread.currentThread().isInterrupted()) break;
        //Какие-то недолгие вычисления 2
    }
}
Если об этом не позаботиться приложение не сможет корректно завершаться, или невозможно будет реализовать отмену долгого действия, которое пользователь хочет прервать. Семейство переопределённых методов join блокирует текущий поток в ожидании завершения ожидаемого потока (данные методы являются блокирующимися с возможностью задания таймаута). Ещё один метод влияющий на выполнения потока - это void Thread.yield(). Он подсказывает планировщику, что можно освободить текущий квант процессорного времени. Поток, освобождающий текущий квант, получит процессорное время в следующий раз в соответствии с поведением планировщика задач. Где это может потребоваться? Например мы реализуем краткое ожидание какого то события: проверяем событие, если не наступило, то вызываем yield и так по кругу. При длительном ожидание такой подход не является эффективным, так как постоянно пробуждает поток ожидания. В таких случаях лучше использовать механизмы ожидания реализованные на базе механизмов операционной системы (например ожидание событий файловой системы в linux системные вызовы inotify). По поводу блокирующихся методов можно добавить, что все вызовы таких методов прерывают выполнение потока. В отличие от yield блокирующиеся методы паркуют поток делая его недоступным для планирования на выполнение до вызова обратного действия. И последние методы, которые мы обсудим, это boolean Thread.isDaemon() и void Thread.setDaemon(boolean on). Как видно из названия, данные методы взаимодействуют с признаком того, что поток является демоном. Установить этот признак можно только до запуска потока. Виртуальная машина Java завершает выполнение приложения когда остаются только потоки демоны. Такой признак можно установить для второстепенных потоков, которые выполняют сервисные или вспомогательные функции. Например поток опрашивает состояние сервера и получает информацию о его работе, которая выводится на экран пользователя (уже в другом потоке). Мы не можем выполнять эти действия в основном потоке событий, так как это приведет к существенным задержкам в отклике системы. В тоже время обособленное существование этого вспомогательного потока бесполезно.

Рассмотрим пример отслеживания потоков на уровне операционной системы (в нашем случае linux) и с помощью утилиты, входящей в состав jdk, jvisualvm. Будем отслеживать поведение потоков на основе выполнения класса Task, описанного выше. Для этого необходимо сделать небольшое изменение. Добавим следующий код в основной цикл в методе run().
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    //Восстанавливаем статус прерывания потока
    Thread.currentThread().interrupt();
    break;
}
Этот код обеспечит задержку в одну секунду на каждой итерации цикла и позволит отследить наличие и состояние потоков. В противном случае приложение будет слишком быстро завершаться, что не позволит отследить его состояние. Запустим этот пример на выполнение и введём в командном интерпретаторе строку $ ps -eLo pid,lwp,nlwp,state,cmd | grep -E "(CMD)|(test.Task)"
  PID   LWP NLWP S CMD
 9866  9866   16 S /usr/bin/java -cp /путь test.Task
 9866  9868   16 S /usr/bin/java -cp /путь test.Task
 9866  9870   16 S /usr/bin/java -cp /путь test.Task
 ...Пропушено несколько строк
 9866  9885   16 S /usr/bin/java -cp /путь test.Task
 9866  9886   16 S /usr/bin/java -cp /путь test.Task
Из вывода видно: все потоки принадлежат одному процессу с идентификатором 9866; идентификаторы потоков указаны в столбце lwp (light weight process, в некоторых операционных системах потоки принято называть легковесными процессами, что достаточно точно передаёт их суть); в столбце S указано текущее состояние потока (в данном случае S - sleep, неудивительно ведь большую часть времени поток находится в ожидании); cmd это команда запуска процесса; умышленно пропущенное описание столбца nlwp указывает на количество потоков принадлежащих процессу. Почему же количество потоков равно 16? Ведь в примере запускается только 3 потока (2 явно, с помощью объектов типа Thread и 1 неявно, основной поток выполнения). Дело в том, что при запуске Java приложения запускается несколько служебных потоков. Каких? Будет видно чуть позже. По примеру использования утилиты ps для детектирования потоков можно заключить, что реализация поддержки потоков Java в linux опирается на механизмы потоков самой операционной системы.

Ещё один способ, с помощью которого можно получить представление о потоках запущенных в Java-приложении, утилита jvisualvm. Этот инструмент очень удобен и может быть использован для разных целей (профилирование, получение информации о состоянии jvm). После запуска утилиты необходимо подключиться к интересующему нас процессу Java-приложения и перейти на вкладку Threads. Одно из возможных отображений в виде временной линии представлено на изображении ниже.

Потоки Java-приложения в jvisualvm

Из изображения видно, что два потока (Thread1 и Thread2) запущенные из приложения находятся в состоянии Sleeping. Помимо них отображены служебные потоки: RMI, JMX, Finalizer и прочие. Из названия видно какие механизмы они обслуживают. 

2.3 Модификаторы final и volatile

Причины проблем видимости достаточно "просты" по своей природе (наличие кэшей и регистров процессора и др.), но наверное наименее интуитивно понимаемы для новичков. В Java существуют механизмы обеспечивающие гарантию видимости. В этой секции речь пойдёт о модификаторах полей класса final и volatile. Для затравки предлагаю взглянуть на классический пример нарушение видимости поля класса.
public class InfiniteLoop implements Runnable {

    private boolean flag = true;
        
    @Override
    public void run() {
        
        while (flag) {
            
        }
        System.out.println("finished");
        
    }
    
    public static void main(String[] args) 
            throws InterruptedException {
        
        InfiniteLoop loop = new InfiniteLoop();
        Thread t = new Thread(loop);
        t.start();
        Thread.sleep(1000);
        loop.flag = false;   
    }
}
Как долго будет выполняться метод main данного класса? Наивное предположение о продолжительности чуть более секунды не оправдается. Метод может выполняться "бесконечно". Согласно спецификации языка нет никаких гарантий на завершение выполнения данного кода. Причина этому проблема с видимостью поля flag. После того как дополнительный поток с циклом while запускается, происходит считывание из памяти значение поля flag. Значение этого поля может храниться в кэше и даже в регистре процессора, всё зависит от интерпретатора и JIT-компилятора виртуальной машины на которой выполняется код. При каждой итерации используется значение не из памяти, а значение из кэша или регистра. Поэтому когда основной поток изменяет состояние данного поля, новое значение не используется для проверки условия выхода из цикла. Как решить эту проблему - использовать модификатор volatile. Использование данного модификатора гарантирует, что после произведения записи в такое поле, все последующие (во времени) чтения из любых потоков будут видеть новое значение. Если пометить поле flag как volatile, то выполнение кода будет прекращаться через секунду с небольшим.

Декларирование поля как final дает гарантию, что после полной инициализации (завершение конструктора класса) все такие поля будут видимы всем потокам. Такая гарантия может существовать благодаря неизменности значений таких полей после их инициализации. Это позволяет создавать потокобезопасные немутабельные объекты. Но и здесь присутствует пара подводных камней. Первый - необходимо обратить внимание на то, что гарантия видимости даётся только после окончания полной инициализации. Если ссылка на объект будет опубликована из его конструктора, то такой гарантии не будет. Типичны пример некорректной публикации объекта:
public class Task implements Runnable {
    
    public final int value1;
    public final int value2;
    
    public Task(Evaluator evaluator) {
        value1 = 10;
        value2 = 20;
        
        evaluator.evaluate(this);
    }
}
Если объект evaluator обращается к полям value1 и value2 в другом потоке, то могут возникнуть проблемы с видимостью и он будет видеть значения по-умолчанию для данных полей. Второй - объявление поля со ссылкой на объект как final гарантирует неизменность только ссылки, а не самого объекта. Такой объект должен самостоятельно гарантировать потокобезопасность.

В заключении данной секции предлагаю рассмотреть пример работы модификатора volatile на уровне команд процессора. Во-первых это позволит опуститься на самый нижний уровень и увидеть реальные команды процессора стоящие за механизмом языка программирования. Во-вторых будет показана работа с дизассемблером jvm, существующего в виде отдельного плагина hsdis. Механизм volatile выбран в качестве примера не случайно, а по причине своей простоты. Фактически данный механизм реализуется путём добавляет одной команды процессора (на тестовой архитектуре x86_64). В качестве примера возьмём следующий код:
public class VolatileDisassemblerExample {    
    
    private volatile int value = 0;
    
    public void test() {
        value = 3;
    }

    public static void main(String[] args) {
        VolatileDisassemblerExample example = 
                new VolatileDisassemblerExample();
        for (int i = 0; i < 100000; i++) {
            example.test();
        }
    }
}
Метод, который мы будем дизассемблировать, называется test. В нём осуществляется присваивание константы 3 полю value. В методе main повторяется вызов метода тест 100000 раз для запуска механизма компиляции (механизм дизассемблера отказывается работать при наличии опции CompileThreshold). Будем запускать на исполнения два варианта этого класса, в первом модификатор volatile отсутствует, во втором присутствует.

Перед непосредственным запуском необходимо выполнить дополнительные действия. Плагин hsdis не поставляется вместе с бинарной сборкой JVM HotSpot. Поэтому необходимо скачать исходные тексты и скомпилировать его на вашей платформе. Так как плагин hsdis является частью OpenJDK, то и исходные тексты можно скачать из mercurial репозитория проекта [10]. В README файле плагина даны рекомендации по сборке. Необходимо будет загрузить дополнительные исходные тексты проекта binutils, от которого зависит данный плагин. При сборке плагина из последних версий исходников у меня возникла проблема с отсутствующей зависимостью на заголовочный файл sysdep.h (решение [9]). После сборки плагина необходимо поместить его в директорию, которая включена в пути поиска для динамически подключаемых библиотек [8]. На моей системе я поместил его в дерево файлов jdk в директорию с libjvm.so (/opt/jdk1.7.0_03/jre/lib/amd64/server/). При запуске на выполнения данного класса необходимо добавить следующие параметры jvm:
-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=print,*.test -XX:+DebugNonSafepoints
, параметр CompileCommand=print,*.test устанавливает фильтр для кода который необходимо дизассемблировать [8]. В данном случае будет выводиться ассемблерный код для всех методов test. Если не установить этот фильтр, то будет очень сложно разобраться в большом количестве информации на экране. При дизассемблировании двух вариантов тестового класса вывод будет почти полностью идентичным за исключением адресов памяти и участка выполнения присваивания полю value.
Вариант без volatile
...опущена часть вывода (несущественная в контексте осуждения)
  0x00007fbda30b8525: nop                       ;*synchronization entry
                                                ; - test.Task::test@-1 (line 8)
  0x00007fbda30b8526: movl   $0x3,0xc(%rsi)     ;*putfield value
                                                ; - test.Task::test@2 (line 8)
...опущена часть вывода (несущественная в контексте осуждения)

Вариант c volatile
...опущена часть вывода (несущественная в контексте осуждения)
  0x00007fa54580b525: nop
  0x00007fa54580b526: movl   $0x3,0xc(%rsi)
  0x00007fa54580b52d: lock addl $0x0,(%rsp)     ;*synchronization entry
                                                ; - test.Task::test@-1 (line 8)
...опущена часть вывода (несущественная в контексте осуждения)
Рассмотрим этот код подробно. Привыкшим к Intel-синтаксу ассемблера данный листинг покажется необычным, поэтому рекомендую прочитать отличия AT&T синтаксиса в документации по GAS (GNU Assembler) в binutils [11]. movl - перемещает константу 3 в участок памяти на который ссылается указатель в регистре rsi (64-битный расширение регистра esi или si). И вот оно различие, вариант с volatile имеет команду lock addl. addl добавляет 0 к значению типа long на которое ссылается указатель стека - регистр rsp. Данная инструкция была бы полностью бесполезна если не инструкция lock. Она обращает addl в атомарную инструкцию во время выполнение которой процессор имеет эксклюзивный доступ к разделяемой памяти посылая сигнал Lock (на самом деле не всегда так смотри [4]). Этот сигнал обеспечивает согласованную видимость нового значения в памяти для всех процессоров/ядер включая их кэши. Таким образом с помощью механизма дизассемблирования вы можете исследовать работу кода в jvm на уровне команд процессора. Нужно признать, что это достаточно увлекательное и познавательное занятие.

2.4 Блокировки

Мы рассмотрели механизм Java позволяющий гарантировать видимость операции записи, но проблема видимости не единственная в многопоточной среде. Как быть с атомарностью составных действий? В реальных приложениях, в том числе и многопоточных, очень часто необходимо выполнять изменение значения на основе его текущего состояния, либо обеспечивать корректность работы объекта в инвариант которого включены множество значений (состояние объекта заключено в множестве полей). Механизм блокировок позволяет обеспечивать атомарность составных действий, решать проблему видимости, возникающую как по причине кэширования, так и по причине переупорядочивания команд (не для всех видов блокировок). В Java поддержка блокировок обеспечена двумя видами реализаций: встроенные в синтаксис языка средства и блокировки реализованные в стандартной библиотеке классов. Эти механизмы блокировок имеют различные реализацию и поддерживаемую функциональность о них и пойдёт речь в этом разделе.

2.4.1 synchronized

Самый простой способ использования блокировок в Java - это так называемые мониторы или внутренние блокировки (monitor, intrinsic lock). Существует два варианта синтаксических форм обеспечения синхронизации выполняемого участка кода. В каждом из них используется ключевое слово synchronized. В первом, ключевое слово используется в качестве модификатора в объявлении метода, во втором в выделении блока кода. Оба варианта представлены ниже.
public synchronized void methodName() {
//Защищаемый код
}

synchronized (object) {
//Защищаемый код
}
Функциональность этих синтаксических форм практически идентична. Данная блокировка работает как mutex (mutual exclusion - взаимное исключение) гарантирующий, что только один поток в каждый момент времени может находиться внутри защищаемого блока. Остальные потоки при этом блокируются при попытки захвата блокировки. В случае если поток захвативший блокировку будет удерживать её вечно, то и ожидающие потоки будут заблокированы вечно. При этом для критических секций синхронизируемыми одним монитором работает правило происходит-прежде. Блокировка освобождается при любом выходе из блока, будь-то завершение всех инструкции или выброс исключения. Ключом идентифицирующим блокировку может выступать любой объект чей тип наследуется от Object, то есть любой объект Java за исключением простых типов данных. Для методов с модификатором synchronized в качестве ключа монитора выступает объект класса, которому принадлежит метод, или объект типа Class, в случае статического метода. Поддержка synchronized интегрирована не только на уровне синтаксиса языка, но и на уровне виртуальной машины. JVM поддерживает инструкции monitorenter и monitorexit, а также флаги ACC_SYNCHRONIZED для методов [2]. Рассмотрим класс Test.
public class Test {
    
    private int a = 0;

    public synchronized void synchronizedMethod() {
        a = 255;
    }
    
    public synchronized void methodWithSynchronizedBlock() {
        synchronized (this) {
            a = 255;
        }
    }
}
Байт-код jvm этого класса выглядит следующим образом (получен с помощью утилиты javap).
public class test.Test

//опущен вывод несущественный в контексте обсуждения

  public synchronized void synchronizedMethod();
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
//опущен вывод несущественный в контексте обсуждения
    Code:
      stack=2, locals=1, args_size=1
         0: aload_0       
         1: sipush        255
         4: putfield      #12         // Field a:I
         7: return        
//опущен вывод несущественный в контексте обсуждения

  public synchronized void methodWithSynchronizedBlock();
    flags: ACC_PUBLIC
//опущен вывод несущественный в контексте обсуждения
    Code:
      stack=2, locals=2, args_size=1
         0: aload_0       
         1: dup           
         2: astore_1      
         3: monitorenter  
         4: aload_0       
         5: sipush        255
         8: putfield      #12          // Field a:I
        11: aload_1       
        12: monitorexit   
        13: goto          19
        16: aload_1       
        17: monitorexit   
        18: athrow        
        19: return        
//опущен вывод несущественный в контексте обсуждения
}

Присваивание константы полю класса выполняется инструкциями sipush помещает 255 в вершину стека операндов, putfield присваивает значение из вершины стека полю #12 (ссылка на a). Этот код у обоих методов идентичен. Различие же заключается в том, что в первом методе установлен флаг ACC_SYNCHRONIZED, а во втором находятся явные инструкции захвата и освобождения блокировки. Первая инструкция monitorexit освобождают блокировку в случае последовательного выполнения, вторая в случае появления исключения. Несмотря на разный байт-код, функциональность выполняемых методов полностью идентична. В первом случае (флаг ACC_SYNCHRONIZED) - все действия обработки захвата и освобождения блокировки выполняются инструкциями вызова методов (invokedynamic, invokeinterface, invokespecial, invokestatic, invokevirtual), а во втором - весь код аналогичной функциональности помещается в тело метод.

С механизмом синхронизации мониторы тесно связан механизм условных переменных или переменных состояния (condition variables). Аналогичный механизм можно увидеть в библиотеки POSIX Threads на Unix-системах [12]. Предназначение данного механизма легко объяснить на примере следующей ситуации. Допустим у нас имеется алгоритм предназначенный для работы в многопоточной среде, который описывает выполнение двух видов затратных операция. При этом каждая операция второго вида должна выполняться на основе вычислений какой-то операции первого вида. Эффективная реализация такого алгоритма подразумевает использование пары видов потоков: первый - обеспечивает выполнения операций одного вида, второй - операций другого. Но как второй вид потоков будет узнавать о возможности запуска операций второго вида на основе результата вычисления операции первого вида? Возможным решением будет следующий порядок операций:
  1. Захватить блокировку на область разделяемой памяти в которой хранятся данные о результатах выполнения операций первого вида;
  2. В случае наличия необходимых данных получить один из результатов в неразделяемую память;
  3. Освободить блокировку на область разделяемой памяти;
  4. При наличии результата выполнения задачи первого вида запустить выполнение задачи второго вида;
  5. Вернуться к действию 1.
Соответственно потоки первого вида будут аналогично захватывать блокировку на разделяемую память и помещать в неё результаты вычислений. В данном подходе присутствует существенная проблема. Цикл действий из 5 в 1 может потреблять существенное количество ресурсов. Если время выполнение операций первого вида превышает время выполнения операций второго вида и одинаковом количестве потоков каждого вида, то цикл будет осуществлять бесполезный захват блокировок и потреблять ресурсы процессора. В такой ситуации хорошем решением будет использование переменных состояния. С их помощью возможно посылать сигнал потоку который заблокирован на его ожидании. Тем самым обеспечить требуемый механизм синхронизации потоков выполнения. При этом поток заблокированный на ожидании сигнала находится в пассивном состоянии. Любой объект чей тип наследуется от Object, обладает поддержкой переменных состояния, аналогично ситуации с мониторами. Для помещения текущего потока в состояние ожидания сигнала или для посылки сигнала ожидающим потокам необходимо чтобы поток выполняющий эти действия являлся владельцем монитора. Посылка сигнала осуществляется через методы notify и notifyAll различие которых заключается в количестве адресатов получателей сигнала. Блокировка текущего потока на ожидании сигнала выполняется вызовом метода из семейства методов wait, которые различаются способами установки максимального времени ожидания.

2.4.2 java.util.concurrent.locks

До версии Java 1.5 мониторы были единственным механизмом блокировок. С этой версии появился новый механизм - блокировки стандартной библиотеки, реализованные в виде набора классов в пакете java.util.concurrent.locks. В отличие от мониторов они не обладают специфичным синтаксисом. Их использование осуществляется аналогично любым другим объектам Java. Но они обеспечивают несравненную гибкость в отличие от старых методов синхронизации. К тому же они обладают уникальной реализацией не связанной с реализацией мониторов. То есть их стоит рассматривать как обособленный механизм, предлагающий альтернативный способ решения тех же проблем синхронизации. При этом они обеспечивают туже семантику операций с памятью, что и мониторы. Базовые операции с блокировками описывает интерфейс java.util.concurrent.locks.Lock. В отличие от мониторов функциональность блокировок из стандартной библиотеки не ограничивается методами безусловного захвата блокировки, которые блокируется на бесконечное время. Они могут похвастаться наличием методов обеспечивающих условные попытки захвата, в то числе и с указанием таймаута. Типичный шаблон (boilerplate) использования представлен следующим кодом:
lock.lock();
try {
    //Синхронизируемые действия
}
finally {
    lock.unlock();
}
,где lock это объект класса реализующего интерфейс Lock. Основной реализацией этого интерфейса является класс ReentrantLock. Этот класс представляет реентрантную блокировку с возможностью честного выбора следующего владельца (подробнее далее). Реентрантный (reentrant) означает, что текущий владелец (поток) блокировки не будет блокироваться при многократной попытки захвата, а наоборот такая попытка всегда будет удачной. При это необходимо на каждую операцию захвата выполнять операции освобождения. Кстати, монитор тоже является реентрантной блокировкой. Монитор и ReentrantLock обеспечивают одноранговый доступ к разделяемым данным. Все потоки пытающиеся войти в защищаемую область равны и внутри неё может находиться только один. Такое поведение не всегда эффективно. Допустим у нас есть несколько потоков изменения данных и несколько чтения с соотношением операций изменения/чтения равному 1/10. В случае использования общей блокировки все конкурирующие потоки будут сериализоваться (выполняться последовательно) на участке доступа к разделяемой памяти. Но потоки чтения вполне могут иметь одновременный доступ к данным, что позволит им выполняться параллельно. При этом поток изменяющий данные должен быть единственным владельцем защищаемого участка памяти. Такое поведение поддерживается классом ReentrantReadWriteLock реализующим интерфейс ReadWriteLock. Для пользователя объект данного класс выглядит как контейнер с двумя методами возвращающими ссылки на две блокировки чтения и записи. Использование этих блокировок осуществляется в соответствие с контрактом Lock. Блокировка записи ведёт себя аналогично ReentrantLock, а блокировка чтения позволяет нескольким потокам становиться её владельцем. Таким образом допускается для потоков одновременное владение несколькими блокировками чтения, но только одной блокировкой записи. Классы блокировок реализующих интерфейс Lock имеют ещё один метод newCondition. Он возвращает объект класса Condition. Это реализация механизма переменных состояния для блокировок пакета java.util.concurrent.locks. Применение его аналогично подобному механизму базирующемуся на мониторах.

2.4.3 Сравнение двух видов блокировок

В секции посвящённой механизму synchronized мы рассмотрели байт-код использования мониторов. Реализация этого механизма выполнена на стороне компилятора и виртуальной машины. В случае с классом ReentrantLock большая часть реализации выполнена на Java. Помимо технологий реализации, данные виды блокировок отличаются режимами работы. Это несомненно должно сказываться на их поведении. Давайте сравним эти механизмы.

Поддержка synchronized на уровне компилятора позволяет реализовать объединение блокировок (lock coarsening), что способствует улучшению производительности объединяя расположенные последовательно критические секции защищённые монитором. В добавок к этому HotSpot поддерживает еще одну оптимизацию мониторов - разные режимы работы, которые отражены на схеме.


Режимы работы монитора

Каждый объект в Java имеет заголовок (на схеме представлены прямоугольниками). При обычном режиме работы (справа на схеме), объект на котором не захвачена блокировка содержит 01 в последних двух битах заголовка. При попытке захвата блокировки на объекте слово заголовка и указатель на объект сохраняются в lock-записи в текущем стеке. Далее виртуальная машина пытается установить указатель на lock-запись в заголовок с помощью операции compare-and-swap. При достижении этого текущий поток будет являться владельцем блокировки. Последние два бита заголовка становятся 00. Если compare-and-swap операция проваливается из-за того, что блокировка уже захвачена, то виртуальная машина проверяет по lock-записи владельца блокировки. В случае совпадения владельца блокировки с текущим потоком при последней проверки выполнение безопасно продолжается. Для такого рекурсивно заблокированного объекта lock-запись инициализируется 0. Только если два различных потока пытаются захватить одну блокировку  thin-блокировка должна преобразоваться в тяжеловесную inflated-блокировку с управлением ожидающими потоками.

Thin-блокировка намного дешевле inflated-блокировки, но и она имеет существенный недостаток из-за compare-and-swap операции на многопроцессорных системах. В Java 6 и 7 этот недостаток решается с помощью biased-блокировок (слева на схеме). В biased-блокировках только первая попытка захвата использует compare-and-swap операцию, которая устанавливает id потока владельца в заголовок объекта, после чего объект становится "пристрастный" к конкретному потоку. Последующие захваты и освобождения biased-блокировки не требуют compare-and-swap операций. В случае появления дополнительного потока пытающегося захватить biased-блокировку её привязка к потоку пропадает и в действие вступает обычный алгоритм с thin и inflated блокировками.

Описанный механизм позволяет значительно ускорить выполнение критических участков при отсутствие конкуренции нескольких мониторов. Более подробно о реализации этого механизма можно узнать из комментариев в исходных кодах HotSpot (смотри директория hotspot/src/share/vm/runtime/ файлы synchronizer.cpp, synchronizer.hpp, biasedLocking.cpp, biasedLocking.hpp, а также в [13], [14])

В отличие от мониторов ReentrantLock - это библиотечный класс, который полностью реализован на Java. Справедливо будет заметить, что эта реализация всё-же опирается на нативные механизм платформы, но об этом позже. Ключевыми элементами в реализации ReentrantLock являются семейство внутренних классов Sync, которые в свою очередь наследуют существенную часть функционала от класса AbstractQueuedSynchronizer. Этот абстрактный класс служит в качестве фреймворка для реализации механизмов синхронизации с FIFO очередью ожидания. Рассмотрим реализацию нечестного режима ReentrantLock (рассмотрение честного режима оставим любознательному читателю в качестве самостоятельной работы). Вернее рассмотрим только реализацию методов lock и unlock (полное рассмотрение реализации ReentrantLock тянет на отдельную статью).

Вызов метода ReentrantLock.lock() делегирует вызов к ReentrantLock.Sync.lock(). В рассматриваемом случае поле ReentrantLock.sync является инстансом ReentrantLock.NonfairSync. Метод ReentrantLock.NonfairSync.lock пытается установить состояния объекта в 1 с помощью атомарной операции AbstractQueuedSynchronizer.compareAndSetState, если это удаётся значит блокировка захвачена и остаётся установить только владельца - текущий поток.
static final class NonfairSync extends Sync {
//опущен код несущественный в контексте обсуждения
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
//опущен код несущественный в контексте обсуждения
}
В случае неудачной попытки захвата блокировки вызывается метод AbstractQueuedSynchronizer.acquire(1), это происходит когда блокировка уже захвачена либо другим потоком, либо текущим. В этом методе сначала вызывается метод tryAcquire, который в итоге приводит к вызову ReentrantLock.Sync.nonfairTryAcquire. В случае когда вызову tryAcquire не удаётся захватить блокировку, текущий поток ставится в очередь на ожидание вызовом acquireQueued(addWaiter(Node.EXCLUSIVE), arg)).
public abstract class AbstractQueuedSynchronizer {
//опущен код несущественный в контексте обсуждения
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
//опущен код несущественный в контексте обсуждения
}
Метод ReentrantLock.Sync.nonfairTryAcquire в случае когда состояние равно 0 пытается захватить блокировку способом идентичным тому, как это происходит в утвердительной части оператора if в методе ReentrantLock.NonfairSync.lock. Таким образом происходит две идентичные попытки захвата блокировки через небольшой интервал на случай если блокировка уже освободилась после первой проверки. Когда блокировка уже захвачена, то осуществляется проверка владельца. Если текущий поток является владельцем блокировки, то увеличивается значение состояния, отражающее количество захватов блокировки текущим владельцем. При успешной попытки захвата блокировки или когда блокировка уже захвачена текущим потоком данный метод возвращает true, в обратном случае false.
abstract static class Sync extends AbstractQueuedSynchronizer {
//опущен код несущественный в контексте обсуждения
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
//опущен код несущественный в контексте обсуждения
}
Когда ReentrantLock.Sync.nonfairTryAcquir возвращает false это означает, что уже захваченную блокировку пытается захватить другой поток. В этом случае управление переходит уже упомянутому коду acquireQueued(addWaiter(Node.EXCLUSIVE), arg)). Метод AbstractQueuedSynchronizer.addWaiter создает Node для текущего потока. Далее вызывается метод AbstractQueuedSynchronizer.acquireQueued с только что созданным узлом. В for-цикле без условия происходит проверка предшествующего узла на соответствие его головному. Если эта проверка успешна, то происходит попытка захвата блокировки уже знакомым нам методом tryAcquire. В случае успешного захвата блокировки происходит установка нового головного узла и возвращение из метода со значением false. Это в итоге формирует ложное значения для условного оператора в AbstractQueuedSynchronizer.acquire.
public abstract class AbstractQueuedSynchronizer {
//опущен код несущественный в контексте обсуждения
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//опущен код несущественный в контексте обсуждения
}
Если же блокировка не была захвачена, то после проверки на допустимость парковки текущего потока выполняется это действие. Допустимость парковки зависит от состояния предыдущих узлов. Запаркованный поток недоступен для планировщика пока другой поток не вызовет для него unpark, либо прерывание потока. В итоге блокировка будет захвачена когда подойдёт очередь соответствующего потока.

Освобождение блокировки начинается с метода ReentrantLock.unlock. Далее управление переходит методу AbstractQueuedSynchronizer.release, который пытается освободить блокировку вызовом ReentrantLock.Sync.tryRelease, освобождающим блокировку если текущий поток является её владельцем и состояние с учётом освобождение станет равно 0. Затем происходит распарковка следующего ожидающего потока.
abstract static class Sync extends AbstractQueuedSynchronizer {
//опущен код несущественный в контексте обсуждения
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
//опущен код несущественный в контексте обсуждения
}

public abstract class AbstractQueuedSynchronizer {
//опущен код несущественный в контексте обсуждения
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
//опущен код несущественный в контексте обсуждения
}
Необходимо заметить, что реализация большого количества библиотечных классов требующая плотное взаимодействие с нижележащими уровнями программно-аппаратной платформы опирается на класс sun.misc.Unsafe, который не входит в стандартное Java API, но тем не менее хорошо документирован и иногда используется не только в реализации библиотеки (например он позволяет прямой доступ к адресному пространству). В описанном выше функционале Unsafe используется для реализации compare-and-swap операций и управлением парковкой потоков.

Из описание реализаций мониторов и ReentrantLock видно, что первый вид имеет преимущество в виде режима bias, который исключает дополнительные накладные расходы на compare-and-swap операции при захвате блокировки в отсутствие конкуренции. Тем не менее ReentrantLock обеспечивает сравнимую производительность в конкурентной среде.

Настало время "на практике" проверить производительность разных видов блокировок (на практике выделено кавычками потому как тест грубо измеряет средние накладные расходы на синхронизацию, смотри пояснение ниже). Наш примитивный тестовый фреймворк базируется на основе двух классов:
public class CompetingLockingTest {
    
    volatile boolean active;
    long value;    
    private Thread[] threads;    
    
    public CompetingLockingTest(
            Class<?> clazz, 
            int numberOfThreads) {        
        value = 0l;
        threads = new Thread[numberOfThreads];
        
        for (int i = 0; i < numberOfThreads; i++) {
            try {
                TestTask task = 
                        (TestTask) clazz
                        .newInstance();
                task.initialize(this);
                threads[i] = new Thread(task);                
            } catch (ReflectiveOperationException e) {
                e.printStackTrace();
            }
        }
    }

    public void startTest() {
        active = false;
        for (Thread t: threads) {
            t.start();
        }
        active = true;
    }
    
    public void stopTest() {
        active = false;
        for (Thread t: threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
            }
        }
    }
    
    public void printResult() {
        System.out.println(
                "Number of threads: " + threads.length 
                + "; value: " + value);
    }
    
    public static void main(String[] args) {
        Class<?>[] classes = {
                NonBlocking.class,
                MonitorLock.class, 
                ExplicitLock.class, 
                ExplicitFairLock.class
                };
                
        //Выполнение набора тестов 
        //над каждой реализацией потокобезопасного действия
        for (Class<?> clazz: classes) {
            System.out.println(clazz.getName());
            for (int n = 1; n <= 50; n = (n == 1)? 10 : n + 10) {
                for (int j = 0; j < 10; j++) {
                    CompetingLockingTest test = 
                            new CompetingLockingTest(clazz, n);
                    test.startTest();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    test.stopTest();
                    test.printResult();
                    test = null;
                    Runtime.getRuntime().gc();
                }
            }        
        }
    }
}

public abstract class TestTask implements Runnable {

    private CompetingLockingTest test;
    
    protected void initialize(
            CompetingLockingTest competingLockingTest
            ) {
        this.test = competingLockingTest;
    }

    protected abstract void doThreadSafeAction();
    
    protected void afterTest() {
        
    }
    
    protected void addValue(long argument) {
        test.value += argument;
    }
    
    protected void act() {
        test.value++;
    }
    
    @Override
    public void run() {
        //Синхронизация начала теста 
        // для всех потоков 
        while (!test.active) {}
        
        //Основной цикл действия
        while (test.active) {
            doThreadSafeAction();
        }
        afterTest();
    }
}
Класс CompetingLockingTest реализует функционал выполнения тестов на основе заданий. Каждое задание запускается на выполнение в несколько потоков от 1 до 50. Каждый тест в разрезе задание - количество потоков запускается 10 раз. Заданием является класс наследуемый от TestTask. Класс реализующий задание определяет абстрактный метод doThreadSafeAction, который должен вызывать метод act гарантируя потокобезопасность. Рассмотрим классы заданий защищающие выполнение метода act в многопоточной среде.
public class NonBlocking extends TestTask {

 private final static Object monitor =
    new Object();
 private long value = 0;
 
 @Override
 protected void doThreadSafeAction() {
  value++;
 }

 @Override
 protected void afterTest() {
 //сохраняем счетчик каждого потока 
 //в общем результате
  synchronized (monitor) {
   addValue(value);
  }
 }
}

public class MonitorLock extends TestTask {

 private final static Object monitor = 
    new Object(); 
 
 @Override
 protected void doThreadSafeAction() {
  synchronized (monitor) {
   act();
  }  
 }
}

public class ExplicitLock extends TestTask {

 private final static Lock reentrantLock = 
    new ReentrantLock();
 
 @Override
 protected void doThreadSafeAction() {
  reentrantLock.lock();
  try {
   act();
  }
  finally {
   reentrantLock.unlock();
  }
 }
}

public class ExplicitFairLock extends TestTask {

 private final static Lock reentrantLock = 
    new ReentrantLock(true);
 
 @Override
 protected void doThreadSafeAction() {
  reentrantLock.lock();
  try {
   act();
  }
  finally {
   reentrantLock.unlock();
  }
 }
}
Все классы выполняют одинаковую работу - инкрементирование счётчика. Класс NonBlocking выполняет эту работу без использование блокировок. Цель включение в тестирование задания реализующего неблокирующий подход показать сравнительную производительность на том же оборудовании. Остальные классы реализуют защиту операции инкрементирования общего поля посредством монитора, ReentrantLock в несправедливом и справедливом режимах. Результаты выполнения тестов представлены на графиках.
Результаты тестирования производительности разных видов блокировок (графики сглажены, ключевые точки 1, 10, 20, 30, 40, 50 потоков)

Прежде, чем мы обсудим результаты, я вынужден сделать некоторое замечание. Точность тестирования оставляет желать лучшего. Это связанно как с аппаратным обеспечением (например intel turbo boost), так и с реализацией теста (используются только те механизмы, которые уже описаны выше). К тому же тесты проводились на моей рабочей машине, со всеми вытекающими последствиями. В тестировании отдельно не учитываются различные режимы работы мониторов. По причине всего вышесказанного к результатам тестирования стоит относиться как достаточно грубому общему случаю накладных расходов от простейшей синхронизации. В различных частных случаях результат сравнения может сильно отличаться.

Рассмотрим график отражающий выполнение задания с неблокирующим алгоритмом. По нему видно, что производительность растёт до момента когда задание выполняется в 20 потоков. Что вполне оправдывает ожидание: чем больше загружен процессор, тем больше выполненная работа (в данном случае алгоритм полностью загружает процессор операцией инкремента). Далее производительность немного снижается по причине накладных расходов на планирование выполнения (scheduling) потоков и переключение между ними. Теперь рассмотрим графики заданий реализующих блокирующий алгоритм. Видно, что картина почти противоположная - максимальная производительность достигается при выполнение задания в одном потоке. Всему виной операции синхронизации, которые потребляют достаточно много ресурсов. Выполнение задания с неблокирующим алгоритмов производительнее от одного до двух порядков. Такое поведение характерно для всех видов блокировок. ReentrantLock в честном режиме безнадёжно уступает монитору и ReentrantLock в нечестном режиме. Использование же монитора и ReentrantLock в нечестном режиме достаточно близки по производительности, хотя первый имеет немного меньшие накладные расходы. Данный тест является в большей мере синтетическим, потому как он измеряет только ресурсы затрачиваемые на выполнение синхронизации, а фактическая работа (операция инкремента) практически нулевая. В реальности такое бывает не всегда. То есть накладные расходы на операции синхронизации могут играть меньшую роль по сравнению с временем затрачиваемым на основную работу. В таких случаях куда важнее проблема производительности возникающая при сериализации выполнения параллельных задач на синхронизируемых участках. Результат тестирование иллюстрирует выводы сделанные на основе исследования механизмов реализации и работы разных видов блокировок, но не стоит его рассматривать как истину в последней инстанции. Помните о недостатках тестирования описанных в предыдущем абзаце.

2.5 Атомарные операции

Использование блокирующихся алгоритмов не единственный способ справится с состоянием гонки при параллелизме. Более масштабируемый подход - неблокирующиеся алгоритмы, которые не синхронизируются блокировкой при выполнении критической секции, а, например, выполняют её ещё раз в случае изменения исходных данных (оптимистический контроль параллелизма). Плата за масштабируемость в таких алгоритмах - повышенная сложность. Для их эффективной реализации используются атомарные compare-and-swap операции поддерживаемые аппаратным обеспечением.

В Java поддержка таких операций добавлена начиная с версии 1.5. Эта поддержка реализована в виде классов в пакете java.util.concurrent.atomic. Классы из этого пакета реализуют compare-and-swap операцию над одним из типов: boolean, int, int[], long, long[], Object. Простейший пример неблокирующегося алгоритма можно посмотреть в реализации операции инкрементирования класса AtomicInteger.
public class AtomicInteger 
  extends Number implements java.io.Serializable {
//опущен код несущественный в контексте обсуждения
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
//опущен код несущественный в контексте обсуждения
}
В приведенном коде происходит считывание текущего значения, увеличение его на единицу, попытка атомарной compare-and-swap операции. Если compare-and-swap операция не достигает успеха, потому что текущее значение уже изменилось, то происходит повторение всех действий. Почему это работает? Во-первых для значения атомарного класса гарантируется volatile видимость записи (убедиться в этом можно по реализации любого из классов Atomic). Во-вторых обеспечивается атомарность compareAndSet.

Реализация AtomicInteger.compareAndSet использует JNI метод Unsafe.compareAndSwapInt, которые реализован по разному в зависимости от платформы. Например для linux на x86 используется ассемблерная инструкция сравнения с lock префиксом, что в итоге выполняется на процессоре в виде lock cmpxchg [4].
AtomicInteger.java

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}


Unsafe.java

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

hotspot/src/share/vm/prims/usafe.cpp

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(
    JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

inline jint     Atomic::cmpxchg(
      jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
Подобную реализацию имеют все compare-and-swap методы в классах Atomic*.

2.6 Галопом по Европам или обзор высокоуровневых инструментов

Описание работы с параллелизмом в Java останется неполным если не упомянуть более высокоуровневые инструменты. Они позволяют решать проблемы синхронизации гораздо "проще" нежели базовые механизмы (если вообще уместно говорить о простоте в контексте параллелизма). Например в реализации тестового мини-фреймворка в секции 2.4.3 присутствует код отвечающий за синхронизацию запуска выполнения тестовых потоков, который можно было бы реализовать с использованием механизмов описываемых далее.

2.6.1 Коллекции безопасные для параллельного доступа

Коллекции одна из наиболее часто используемых частей Java библиотеки. Но обычные классы коллекций небезопасны при использовании в многопоточной среде. При решении этой проблемы можно действовать несколькими путями: самостоятельно поддерживать потокобезопасность с помощью механизмов синхронизации; использовать synchronized коллекции, что мало чем отличается от первого варианта; использовать специальные потокобезопасные реализации из пакета java.util.concurrent. Рассмотрим эти варианты по порядку.

Ничто не мешает использовать простые коллекции с дополнительными механизмами синхронизации для обеспечения потокобезопасности. Одним из недостатков такого подходя является наличие дополнительного кода, который необходимо распространять на все операции с коллекцией. Например:
List<Object> list = new LinkedList<Object>();
//потокобезопасное добавление элемента  
synchronized (list) {
 list.add(object);
}  

//потокобезопасное удаление элемента
synchronized (list) {
 list.remove(object);
}

//потокобезопасное итерирование по элементам
synchronized (list) {
 for (Object o: list) {
  /* 
   * Какие-либо действия 
   * c элементами списка
   */
 }
}
Частично побороть недостаток с дополнительным кодом синхронизации позволяет использование обёрток, которые реализуются в стандартной библиотеки с помощью статических методов класса Collections. Для каждого вида коллекции существует отдельный метод вида synchronizedНазваниеКоллекции. Такой метод принимает в качестве аргумента коллекцию соответствующего вида и возвращает объект обёртку на объект коллекции обеспечивающий синхронизацию.
List<Object> list = 
 Collections.synchronizedList(
   new LinkedList<Object>()
   );

//потокобезопасное добавление элемента
list.add(object);

/**
* Итерирование аналогично случаю 
* с самостоятельной синхронизацией.
* Это следствие контракта итерируемого объекта.
* Между вызовами hasNext и next на 
* коллекции должна удерживаться блокировка
* по этой причине не была создана 
* синхронизируемая обёртка для итератора.
*/

У этих подходов есть недостаток, который проявляет себя при высокой степени конкурентного доступа. А именно плохая производительность в виду глобальной блокировки на весь объект. Например, нам необходимо получить количество элементов в списке в момент итерирования по нему, но это невозможно сделать до окончания этого процесса. Или более печальный вариант - несколько потоков проходят по коллекции, в таком случае их выполнение будет сериализовано.

Начиная с версии 1.5 в Java появились новые реализации коллекций адаптированных для параллельного доступа, с каждой последующей версией их число увеличивается. Первыми появились ConcurrentHashMap, CopyOnWriteArrayList, CopyOnWriteArraySet и несколько классов очередей. Далее были добавлена поддержка деков, сортированных карт и множеств. Рассмотрим пару способов с помощью которых обеспечивается лучшая масштабируемость при параллельном доступе на примере двух классов ConcurrentHashMap и CopyOnWriteArrayList.

Класс ConcurrentHashMap достигает целей масштабирования в параллельной среде при помощи разбиения хэш-таблицы на несколько подтаблиц, каждая из которых защищается отдельной блокировкой (lock striping). Такие подтаблицы представлены внутренним классом Segment в котором реализована поддержка основных операций. Конечная структура ConcurrentHashMap имеет вид хэш-таблицы хэш-таблиц. Операции чтения из хэш-таблицы не используют блокировку благодаря свойству volatile. Операции записи производятся с блокировкой текущей подтаблицы. Говоря про чтение без блокировки я немного слукавил, на самом деле чтение из поля с блокировкой может произойти если в поле значения будет null в результате того, что компилятор переупорядочит инициализацию элементов HashEntry с его связыванием с таблицей. Подобное поведение допустимо согласно модель памяти Java, но неизвестно чтобы такое когда-либо происходило.
static final class Segment<K,V> 
    extends ReentrantLock implements Serializable {
//опущен код несущественный в контексте обсуждения
    V get(Object key, int hash) {
        if (count != 0) { // read-volatile
            HashEntry<K,V> e = getFirst(hash);
            while (e != null) {
                if (e.hash == hash && key.equals(e.key)) {
                    V v = e.value;
                    if (v != null)
                        return v;
                    return readValueUnderLock(e); // recheck
                }
                e = e.next;
            }
        }
        return null;
    }
//опущен код несущественный в контексте обсуждения
}
В отличие от HashMap ConcurrentHashMap имеет ряд специфических особенностей. Не поддерживается null ни для ключа, ни для значения (один из примеров почему смотри код выше). Конструктор может принимать дополнительный целочисленный параметр concurrencyLevel, который учитывается при формировании внутренних структур хэш-таблицы, тем самым повышая масштабируемость при параллелизме за счёт дополнительной памяти и процессорного времени. Рекомендуется чтобы этот параметр был приблизительно равен количеству одновременно пишущих в хэш таблицу потоков. Некоторые операции могут давать неожиданный результат когда несколько потоков одновременно работают с хэш-таблицей. Например операции size может вернуть значения, которое на момент получения уже будет неверно. Аналогично может произойти и с операцией clear. Такое поведение неожиданно только для тех, кто привык использовать HashMap в одном потоке.

Класс CopyOnWriteArrayList плохо масштабируется при наличие несколько писателей. Фактически в этом он аналогичен synchronized списку. Зато отлично масштабируется при большом количестве читателей. Реализация этого списка использует volatile поле в котором сохраняется массив. Произвольное действие по изменению списка захватывает общую блокировку и создаёт новую копию массива.
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, 
    Cloneable, java.io.Serializable {
//опущен код несущественный в контексте обсуждения
    /** The lock protecting all mutators */
    transient final ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private volatile transient Object[] array;
    
    public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object oldValue = elements[index];
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays
                    .copyOf(elements, len - 1));
            else {
                Object[] newElements = 
                        new Object[len - 1];
                System.arraycopy(elements, 0, 
                    newElements, 0, index);
                System.arraycopy(elements, index + 1, 
                    newElements, index, numMoved);
                setArray(newElements);
            }
            return (E)oldValue;
        } finally {
            lock.unlock();
        }
    }
//опущен код несущественный в контексте обсуждения
}
Такое поведение неэффективно при больших количествах действий по изменению данных, потому что любое действия по изменению списка требует O(n) процессорного времени и столько же дополнительной памяти. Наиболее эффективный способ использования данного класса - это минимальное количество записи (по возможности массовые операции addAll, clear) в список и большое количество чтения из него.

2.6.2 Механизмы синхронизации

Механизмы синхронизации - это средства предназначенные для согласования параллельных потоков выполнения относительно друг друга. В секции 2.4 мы рассмотрели различные виды блокировок, которые являются механизмами синхронизации. Но основное предназначение блокировок - обеспечение атомарности составных действий вкупе с гарантией видимости, которые достигаются за счёт синхронизации выполнения действий. Если два составных действия согласовываются как взаимоисключающиеся в каждый конкретный момент времени (выполняется либо одно, либо другое, но не оба сразу), то они будут является атомарными относительно друг друга. То есть последующее действие всегда будет видеть целостный результат предыдущего и никогда частичный. Реализация блокировок согласно модели памяти Java обеспечивает гарантии видимости, а невозможность частичного перекрытия - гарантии атомарности составного действия. В этой секции мы рассмотрим такие механизмы синхронизации, чьё основное предназначение именно согласования параллельных потоков выполнения, а обеспечение атомарности и видимости может использоваться как побочный эффект. К таким механизмам относятся CountDownLatch, CyclicBarrier, Semaphore, BlockingQueue и другие.

Класс CountDownLatch реализует механизм синхронизации с помощью которого несколько потоков могут блокироваться в ожидании совершения некоторого множества событий. Множество событий отражается целым числом, а фиксация совершения события вызовом метода CountDownLatch.countDown. Ожидающие потоки блокируются одним из переопределенных методов await. Защёлку можно было использовать для синхронизации начала выполнения тестовых потоков из секции 2.4.3 Сравнение двух видов блокировок. В этом случае ожидалось бы наступление всего одного события - начало тестирования. После наступление события срабатывание защёлки она становится бесполезной, потому что нельзя установить новое состояние.

CyclicBarrier позволяет нескольким потоком ожидать друг друга в определенной точке выполнения. Этот класс часто используется в реализациях алгоритмов в которых присутствуют некие фиксированные повторяющиеся действия с необходимостью синхронизации в к какой-либо момент. Конструктор CyclicBarrier принимает в качестве аргумента целое число, означающее количество частей до выполнения которых вызов await будет блокироваться, и объект Runnable с действием которое будет вызываться после достижение барьера всеми потоками, но перед разблокировкой барьера.

Semaphore механизм синхронизации управляющий определенным количеством разрешений. При создании семафора в конструктор передаётся целое число характеризующее максимально количество доступных захватов семафора (методы acquire). По завершению действий ограниченных семафором его нужно освободить (методы release) подобно блокировкам. Семафоры часто используются для ограничения доступа к фиксированным ресурсам. Классический пример пул подключения к СУБД, перед получением объекта подключения семафор захватывается, когда объект подключения становится не нужен - семафор освобождается. При исчерпании количество подключений в пуле последующие попытки получения блокируются на этапе захвата семафора.

Интерфейс BlockingQueue расширяет интерфейс коллекции Queue блокирующимися методами которые возможно использовать в качестве механизмов синхронизации. BlockingQueue реализуют несколько классов среди которых ArrayBlockingQueue, ConcurrentLinkedQueue, LinkedBlockingQueue, PriorityBlockingQueue, различные классы деков (двусторонняя очередь). Каждая из реализаций имеет свои слабые и сильные стороны: вместимость очереди, расположение элементов при добавлении (приоритет), накладные расходы при параллельном доступе и прочее. Блокирующиеся очереди очень полезный инструмент для реализации паттерна производитель-потребитель (producer-consumer), потребитель блокируется на получение элемента из очереди пока производитель не добавит его туда, тем самым синхронизируя их выполнение.

Механизмы синхронизации Java не исчерпываются списком рассмотренных выше, поэтому обратитесь к документации по Java API и исходным текстам за подробным описанием.

2.6.3 Задачи с отложенным выполнением, исполнители

Достаточно часто требуется выполнять некоторые затратные действия. В большинстве приложений, особенно в интерактивных, не представляется возможным блокировать основной поток выполнения в ожидание завершения длительного действия. Поэтому затратные действия должны выполняться в отдельном потоке, что требует дополнительной синхронизации для взаимодействия с состоянием выполнения (отменять, проверять текущее состояние, получать результат выполнения). Интерфейс Future как раз описывает протокол подобного взаимодействия - методы cancel, get, isCancelled, isDone. С помощью метода get поток ожидающий результата выполнения блокируется (с таймаутом или без) до момента завершения выполнения действия представленного как Future.

Интерфейс Future в Java 6 имеет только одну универсальную реализацию в стандартной библиотеки - класс FutureTask. Вторая реализация - класс SwingWorker предназначена для выполнения длительных действий при использования GUI фреймворка Swing, чтобы не блокировать поток управления событиями. В Java 7 появились дополнительные реализации предназначенные для использования в Fork/Join об этом в следующей секции. FutureTask реализует поддержку асинхронного отменяемого выполнения, обеспечивает перехват исключений для соответствующего выполняемого метода объектов Runnable или Callable.
FutureTask<String> task = 
    new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        //эмуляция длительного действия
        Thread.sleep(3000);                
        return "Task've been completed";
    }
});

new Thread(task).start();

System.out.println(task.get());
Выше представлен пример использования FutureTask. Этот код выведет сообщение на экран по истечении трёх секунд. Основной поток заблокирован на время ожидания завершения метода call объекта Callable, который выполняется в потоке создаваемом в строке new Thread(task).start(). Если взглянуть в исходный код FutureTask то можно увидеть, что реализация базируется на знакомом по секции 2.4.3 Сравнение двух видов блокировок AbstractQueuedSynchronizer.

В любом месте, где встречается прикладной код вида new Thread() происходит жёсткое связывание выполняемой задачи с политикой выполнения. В примере использования FutureTask политика выполнения сводится к созданию нового потока каждый раз когда требуется выполнение задачи. Если этот код выполняется один раз, то особых проблем такое поведение не принесёт. Но представьте, что будет если создавать поток на выполнение каждой задачи, которых может быть несколько тысяч. Создание потока не самая дешёвая операция и имеет определённые накладные расходы как по потреблению процессора, так и по памяти. Даже если удастся создать несколько тысяч потоков (ограничения системных ресурсов, jvm и прочее), то само их выполнение погрузит систему в ступор. Вместо выполнение полезной работы, львиная часть ресурсов будет потребляться на переключение контекста (context switching) и на планирование (scheduling). Причина этому отсутствия чёткой политики выполнения, которую к тому же невозможно ни задавать ни изменять при беспорядочном создании потоков в разных местах кода. Для решения этих проблем и предназначены исполнители (executors).

Интерфейс Executor описывает протокол простейшего исполнителя с единственным методом execute, который принимает для выполнения Runnable объект. Интерфейс ExecutorService развивает протокол исполнителя описанием более дюжины методов, среди которых управляющие завершением всех задач исполнителя, возвращающие текущее состояние, отправляющие задание на выполнение с возвращением Future объектов и других. В качестве примера рассмотрим одну из реализаций последнего интерфейса ThreadPoolExecutor. Этот класс реализует пул потоков, который настраивается через параметры конструктора (размер пула, время жизни свободных потоков, объект очереди задач на выполнение). ThreadPoolExecutor редко используется напрямую. Вместо этого он применяется при создании пулов с различной конфигурацией в статических фабричных методах класса Executors.
/**
* Создание исполнителя
* с фиксированным числом потоков в пуле
* которое на единицу больше количества 
* доступных процессоров/ядер 
*/  
ExecutorService executor = 
 Executors.newFixedThreadPool(
  Runtime.getRuntime()
   .availableProcessors() + 1
   );
/**
* Какая либо задача для выполнения.
* Например, поиск по файловой системе.
*/
Runnable task = 

/**
* Добавить задачу в пул
*/
executor.execute(task);
Листинг представленный выше - простейший пример использование исполнителя. Сколько бы раз не вызывался метод execute с новыми задачами в пуле будет поддерживаться только заданное количество потоков. Любой поток из пула может завершиться либо при выбросе исключения из исполняемого метода задачи, либо при ручном завершении с помощью методов ExecutorService.shutdown, ExecutorService.shutdownNow или установлении признака прерывания и завершения методов Runnable.run или Callable.call.

Напоследок необходимо заметить, что как и любой фреймворк - исполнители являются не универсальными инструментами. Их основное предназначение выполнение относительно обособленных задачи. Например серверные задачи по обработке лёгких запросов клиентов хорошо подходят для использования исполнителей. Обратный пример задачи с зависимостями, когда отправленный на выполнение код будет заблокирован до момента выполнение кода который ещё находится в очереди ожидания. Конечно подобные ситуации можно улучшить, например, с помощью задач с приоритетами и поддерживающие их очереди ожидания. Тем не менее для решения подобных типов задач использование исполнителей не лучшее решение.

2.6.4 Fork/Join в Java 7

В Java 7 был добавлен Fork/Join фреймворк позволяющий использовать многопроцессорность подобно другим исполнителям ExecutorService. Но этот фреймворк отличен от рассмотренных выше исполнителей в механизме распределения работы. Fork/Join использует work-stealing алгоритм распределения. Когда у рабочего потока иссякают задания он может "украсть" работу у занятого потока. В большей степени это достигается за счёт лучшей грануляции реализаций алгоритмов. Грануляция осуществляется путём рекурсивного выполнения отдельных частей вычислений и последующего объединение результатов с помощью механизмов предоставленных классами ForkJoinTask, RecursiveAction и RecursiveTask. Наглядные примеры реализации подобных алгоритмов представлены в Java API документации.
class SortTask extends RecursiveAction {
   final long[] array; final int lo; final int hi;
   SortTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD)
       sequentiallySort(array, lo, hi);
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new SortTask(array, lo, mid),
                 new SortTask(array, mid, hi));
       merge(array, lo, hi);
     }
   }
 }

class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
        return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
}
Запуск вычисления задач в пуле осуществляется подобно тому, как это делается в любом другом исполнителе. Например таким образом:
ForkJoinPool pool = new ForkJoinPool(5);
ForkJoinTask<Integer> result = pool.submit(new Fibonacci(30));
System.out.println(result.get());
Нужно подчеркнуть, что текущий пример с вычислением чисел Фибоначчи предназначен только для иллюстрации работы с фреймворком и имеет неэффективную реализацию (смотри динамического программирования, мемоизация).

В Fork/Join пуле задача ForkJoinTask запускается внутри потока ForkJoinWorkerThread, который в свою очередь связан с самим пулом. Возможность реализации методов подобных ForkJoinTask.fork обязана подобной структуре. То есть каждая задача выполняющаяся в пуле тесно с ним взаимодействует через такие методы.

2.7 Проблемы при реализации параллельных программ

Следствием увеличенной сложности параллельных программ является большое количество возможных проблем. Относительно низкоуровневая их часть уже рассмотрена выше. В данной секции обратимся к более высокоуровневым вопросам.

Взаимная блокировка (deadlock) возникает когда несколько потоков при попытки синхронизации по нескольким точкам блокируются в ожидании другу друга. Например один поток владеет одной точкой синхронизации, второй другой и каждый из них ожидает захвата точки которая уже захвачена. Это происходит по причине неконтролируемого порядка захвата точек синхронизации, что не всегда возможно в большом приложении. Данная проблема имеет несколько способов решения: автоматическое детектирование взаимной блокировки и принятие мер по её устранению, захватывать точку синхронизации без постоянной блокировки механизма захвата, жёстко гарантировать порядок захвата. Нужно учитывать, что эта проблема характерна для любого механизма синхронизации который может блокировать текущий поток навсегда. Рассмотрим простейший пример взаимной блокировки.
public class DeadLock {

    public static void main(String[] args) {
        final ReentrantLock lock1 = new ReentrantLock();
        final ReentrantLock lock2 = new ReentrantLock();
        
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                lock1.lock();
                delay(); 
                lock2.lock();
                try {
                    //критическая секция
                }
                finally {
                    lock1.unlock();
                    lock2.unlock();
                }
            }
        }, "slave");
        
        t.start();

        lock2.lock();
        delay();
        lock1.lock();
        try {
            //критическая секция
        }
        finally {
            lock2.unlock();
            lock1.unlock();
        }
        System.out.println("finished");
    }
    
    /**
     * Задержка на 1 секунду
     */
    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Выполнение кода представленного выше никогда не приведёт к выводу на экран строки "finished". Потоки main и slave будут заблокированы навсегда после того как первый захватит lock1, а второй lock2. В таком маленьком фрагменте кода не вызовет проблем детектировать взаимную блокировку просто посмотрев листинг. В реальности не всегда получается это сделать на этапе разработки (хотя возможно использовать статический анализатор кода), поэтому требуются методы детектирования таких ситуаций во время выполнения. Наиболее удобным методом является изучения дампа потоков, который можно получить с помощью утилиты jstack, либо послав сигнал выполняемому процессу (для консольного windows приложения Ctrl+Break, в linux сигнал SIGQUIT). Для нашего примера часть дампа потоков будет выглядеть подобно представленному ниже.
... опущена часть вывода несущественная в контексте обсуждения
Found one Java-level deadlock:
=============================
"slave":
  waiting for ownable synchronizer 0x2299c870, 
  (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "main"
"main":
  waiting for ownable synchronizer 0x2299c848, 
  (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "slave"

Java stack information for the threads listed above:
===================================================
"slave":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x2299c870> 
        (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
 ... опущена часть вывода несущественная в контексте обсуждения
        at java.lang.Thread.run(Unknown Source)
"main":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x2299c848> 
        (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
... опущена часть вывода несущественная в контексте обсуждения
        at kz.pnhz.test.sandbox.DeadLock.main(DeadLock.java:31)

Found 1 deadlock.
... опущена часть вывода несущественная в контексте обсуждения
Таким образом возможно с легкостью распознать причину взаимной блокировки. Хотя deadlock часто выявляется во время выполнения по причине возникновения это ошибка проектирования или реализации приложения и должна исправляться на этапе разработки.

Голодание (starvation) - проблема полной недоступности ресурсов определённому потоку в связи с потреблением их другими потоками. Может проявляется при некорректном планирования выполнения (например различие в приоритетах на определённых системах), либо более частый случай - ошибки при проектировании и реализации. Если у одного потока будет выставлен очень высокий приоритет, а у другого низкий, то первый может потреблять существенную все вычислительные ресурсы, в то время как второй не получит их вовсе. К счастью, такое поведение практически невозможно на большинстве платформ поддерживаемых Java. В то время как ошибки при проектировании и реализации приводящие к проблеме голодания вполне реальны. Например ошибка некорректной обработки условия выхода из цикла выполняющегося при захваченной блокировке, которую ожидает другой поток.
public class Starvation {

    public static void main(String[] args) {
        final ReentrantLock lock1 = new ReentrantLock();
        
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                delay();//реализация аналогична DeadLock
                lock1.lock();
                try {
                    //критическая секция
                    System.out.println("completed");
                }
                finally {
                    lock1.unlock();
                }
            }
        }, "slave");
        
        t.start();

        
        lock1.lock();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                //условие выхода которое не наступает
            }
        }
        finally {
            lock1.unlock();
        }
        System.out.println("finished");
    }
}
Детектирование проблемы голодания не столь прямолинейно как в случае с взаимной блокировкой. Если ситуация подобна примеру, то можно легко вычислить не ожидающий поток с бесконечным циклом любой утилитой отображающей состояние процессов и потоков в операционной системе. Далее получить дамп потоков и определить место возникновения проблемы. Но если в такой ситуации внутри цикла поток будет блокироваться в ожидании события или просто отдавать рекомендацию планировщику (Thread.yield, Thread.sleep) уступить текущий квант, то определить её будет сложнее. При подобных условиях могут пригодиться как ручное наблюдение за стеками выполнения и состояниями потоков во времени, так и автоматическое профилирование приложения (может понадобиться длительное время сбора статистики).

Активная блокировка (livelock) - зацикливание алгоритма в результате взаимодействия состоянием, которое в свою очередь может независимо изменяться извне. Условие существование изменяемого извне состояния - параллелизм (необязательно многопоточность, например, взаимодействие процессов с файловой системой). При активной блокировки поток не блокируется, а продолжает попытки выполнить полезное действие, но в результате некорректной обработки ошибки при его выполнение повторяет его снова. Например завершение транзакции в СУБД, которая не может быть выполнена по причине нарушения в работе сервера, но некорректный код трактует ошибку как временную и пытается повторить транзакцию. Ещё один классический пример из области сетевых технологий. Если два или более передающих устройства одновременно пытаются передать данные, то обнаруживая коллизию и повторяя попытки передачи через равные интервалы времени ни одно устройство не сможет ничего передать, так как каждый раз разделяемая среда будет передавать сигналы нескольких источников. 

Помимо таких явных проблем блокирующих выполнение, как упомянутые выше в этой секции, существуют и менее критичные проблемы производительности и латентности (времени реакции). Наверное, наиболее распространённым их видом, связанным с производительностью, будет являться проблема со слабо гранулированными блокировками (coarse-grained lock) или их экстремальным вариантом - глобальными блокировками (global lock). Суть этой проблемы сводится к тому, что в результате больших размеров критической секции выполнение на этом участке фактически сериализуется (смотри 2.4 Блокировки). К проблемам латентности можно отнести ситуацию близкую к голоданию, когда какой-то поток практически не получает время центрального процессора или прочий ресурс, в результате чего растягивается время выполнения операции.

Вместо заключения

В статье описывается, хотя бы очень кратко, бОльшая часть основных вопросов связанных с поддержкой параллелизма В Java. Надеюсь, мне удалось передать наиболее важные аспекты этой поддержки и у читателя сложилась достаточно полная картина. С удовольствием приму обратную связь в виде критики или вопросов.

Дополнительные источники

[1] Java Language Specification Third Edition
[2] Java Virtual Machine Specification Java SE 7 Edition
[3] Java Concurrency in Practice Brian Goetz ISBN-13: 978-0321349606
[4] Intel® 64 and IA-32 Architectures Software Developer’s Manual Combined Volumes:1, 2A, 2B, 2C, 3A, 3B, and 3C
[5] http://www.joelonsoftware.com/articles/LeakyAbstractions.html
[6] http://en.wikipedia.org/wiki/Happened-before
[7] http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html
[8] https://wikis.oracle.com/display/HotSpotInternals/PrintAssembly
[9] http://stackoverflow.com/questions/9341083/how-to-use-xxunlockdiagnosticvmoptions-xxcompilecommand-print-option-with-j
[10] http://hg.openjdk.java.net/jdk7u/jdk7u/hotspot/src/share/tools/hsdis
[11] http://sourceware.org/binutils/docs-2.22/as/index.html
[12] Advanced UNIX Programming (2nd Edition) Marc J. Rochkind ISBN-13: 978-0131411548
[13] https://wikis.oracle.com/display/HotSpotInternals/Synchronization
[14] https://blogs.oracle.com/dave/entry/java_util_concurrent_reentrantlock_vs

7 комментариев:

  1. Отличный глубокий обзор.
    Очень рад появлению такого материала на русском языке!

    ОтветитьУдалить
  2. Спасибо! Очень полезная статья!

    ОтветитьУдалить
  3. Офигенная статья. То что доктор прописал! Так держать!

    ОтветитьУдалить
  4. Спасибо за статью! Очень доступно для понимания изложено!

    ОтветитьУдалить
  5. Спасибо за статью.

    ОтветитьУдалить
  6. Спасибо! Дам жене почитать!)

    ОтветитьУдалить