□ Событие с ручным сбросом снимает блокировку со всех ожидающих его потоков.

□ При наличии нескольких потоков, ожидающих семафора, его появление снимает блокировку с потоков releaseCount (здесь releaseCount — это аргумент, переданный методу Release класса Semaphore).

То есть получается, что событие с автосбросом эквивалентно семафору, максимальное значение счетчика которого равно единице. Разница между ними состоит в том, что метод Set для события с автосбросом можно вызвать много раз, но каждый раз освобождаться будет всего один поток, в то время как многократный вызов метода Release каждый раз увеличивает на единицу внутренний счетчик семафора, давая возможность снять блокировку с большего количества потоков. Однако следует помнить, что при вызове метода Release для семафора, показание счетчика которого уже равно максимальному, генерируется исключение SemaphoreFullException.

При помощи семафоров можно повторно реализовать класс SimpleWaitLock таким образом, что нескольким потокам будет предоставляться одновременный доступ к ресурсу (что безопасно только при условии, что все потоки используют ресурс в режиме только для чтения):

public sealed class SimpleWaitLock : IDisposable { private Semaphore mAvailableResourcesj

public SimpleWaitLock(Int32 maximumConcurrentThreads) { m AvailableResources =

new Semaphore(maximumConcurrentThreads, maximumConcurrentThreads);

}

public void EnterQ {

// Ожидаем в ядре доступа к ресурсу и возвращаем управление m_AvailableResources.WaitOne();

>

public void Leave() {

// Этому потоку доступ больше не нужен; его может получить другой поток m_ AvailableResources.Release();

}

public void Dispose() { m_AvailableResources.Close(); }

}

Мьютексы

Мьютекс (mutex) предоставляет взаимно исключающую блокировку. Он функционирует аналогично объекту AutoResetEvent (или объекту Semaphore со значением счетчика 1), так как все три конструкции за один раз освобождают всего один ожидающий поток. Вот как выглядит класс Mutex:

public sealed class Mutex : WaitHandle { public MutexQ; public void ReleaseMutexQ;

}

Мьютексы снабжены дополнительной логикой, что делает их более сложными по сравнению с другими конструкциями. Во-первых, объекты Mutex сохраняют информацию о том, какие потоки ими владеют. Для этого они запрашивают идентификатор потока (Int32). Если поток вызывает метод ReleaseMutex, объект Mutex сначала убеждается, что это именно владеющий им поток. Если это не так, состояние объекта Mutex не меняется, а метод ReleaseMutex генерирует исключение System. ApplicationException. Если владеющий объектом Mutex поток по какой-то причине завершается, пробуждается другой поток, ожидающий мьютекса, и генерирует исключение System.Threading.AbandonedMutexException. Обычно это исключение остается необработанным, что приводит к завершению всего процесса. И это хорошо, ведь новый поток получает объект Mutex, старый владелец которого вполне мог быть финализирован перед завершением обновления защищаемых мьютексом данных. Если новый поток перехватит исключение AbandonedMutexException, он может попытаться получить доступ к поврежденным данным, что приведет к непредсказуемым результатам и проблемам безопасности.

Кроме того, объекты Mutex управляют рекурсивным счетчиком, указывающим, сколько раз поток-владелец уже владел объектом. Если поток владеет мьютексом в настоящий момент и ожидает его еще раз, рекурсивный счетчик увеличивается на единицу, и потоку разрешается продолжить выполнение. При вызове потоком метода

ReleaseMutex рекурсивный счетчик уменьшается на единицу. И только после того, как его значение достигнет 0, владельцем мьютекса может стать другой поток.

Большинство пользователей не в восторге от этой дополнительной логики. Проблема в том, что эти «возможности» имеют свою цену. Объекту Mutex требуется дополнительная память для хранения идентификатора потока и рекурсивного счетчика. И главное, код объекта Mutex должен управлять этой информацией, что тормозит блокировку. Если приложению понадобятся эти дополнительные возможности, его код сможет реализовать их самостоятельно, не встраивая в объект Mutex. Поэтому многие разработчики стараются обходиться без мьютексов.

Обычно рекурсивное блокирование имеет место, если запертый метод вызывает другой метод, также требующий блокирования. Это демонстрирует следующий код:

internal class SomeClass : IDisposable {

private readonly Mutex m_lock = new MutexQ;

public void MethodlQ { m_lock.WaitOne();

// Делаем что-то...

Method2(); // Метод Method2, рекурсивно получающий право на блокировку m_lock. ReleaseMutexQ;

}

public void Method2() { m_lock.WaitOne();

II Делаем что-то... m_lock. ReleaseMutexQ;

}

public void Dispose() { m_lock.Dispose(); }

}

В приведенном фрагменте код, использующий объект SomeClass, может вызвать метод Methodl, получающий объект Mutex. Этот код выполняет какую-то безопасную в отношении потоков операцию, а затем вызывает метод Method2, также выполняющий какую-то безопасную в отношении потоков операцию. Благодаря поддержке рекурсии объектом Mutex поток сначала дважды блокируется, а потом дважды разблокируется, и только после этого мьютекс может перейти к новому потоку. Если бы класс SomeClass использовал вместо мьютекса объект AutoReset Event, при вызове метода WaitOne поток был бы заблокирован.

Рекурсивное блокирование можно легко организовать при помощи объекта AutoResetEvent:

internal sealed class RecursiveAutoResetEvent : IDisposable { private AutoResetEvent mlock = new AutoResetEvent(true); private Int32 m_owningThreadId = в; private Int32 m_recursionCount = в;

public void Enter() {

// Получаем идентификатор вызывающего потока

Int32 currentThreadld = Thread.CurrentThread.ManagedThreadld;

// Если вызывающий поток блокируется,

// увеличиваем рекурсивный счетчик if (mowningThreadld == currentThreadld) { m_recursionCount++; return;

}

// Вызывающий поток не имеет блокировки, ожидаем m_lock.WaitOne();

// Теперь вызывающий поток блокируется, инициализируем // идентификатор этого потока и рекурсивный счетчик mowningThreadld = currentThreadld; mrecursionCount = 1;

}

public void Leave() {

// Если вызывающий поток не является владельцем блокировки,

// произошла ошибка

if (mowningThreadld != Thread.CurrentThread.ManagedThreadld) throw new InvalidOperationException();

// Вычитаем единицу из рекурсивного счетчика if (--m_recursionCount == 0) {

// Если рекурсивный счетчик равен 0,

// ни один поток не владеет блокировкой m_owningThreadId = 0;

m_lock.Set(); // Пробуждаем один ожидающий поток (если такие есть)

}

}

public void DisposeQ { m_lock.Dispose(); }

}

Хотя поведение класса RecursiveAutoResetEvent идентично классу Mutex, объект RecunsiveAutoResetEvent при попытке потока получить право на рекурсивное блокирование будет иметь значительно лучшую производительность, так как процедуры отслеживания потока-владельца и рекурсии теперь находятся в управляемом коде. Поток осуществляет переход в ядро Windows только при первом получении объекта AutoResetEvent или при окончательной передаче его другому потоку.

Глава 30. Гибридные конструкции синхронизации потоков

В главе 29 были рассмотрены простейшие конструкции синхронизации потоков пользовательского режима и режима ядра. На их основе можно строить более сложные конструкции синхронизации. Обычно конструкции пользовательского режима и режима ядра комбинируются, а то что при этом получается, я называю гибридными конструкциями синхронизации потоков (hybrid thread synchronization constructs). При отсутствии конкуренции потоков гибридные конструкции дают даже более высокую производительность, чем простейшие конструкции пользовательского режима. В них также применяются простейшие конструкции режима ядра, что позволяет избежать зацикливания (пустой траты процессорного времени) при попытке нескольких потоков одновременно получить доступ к процессору. Так как в большинстве приложений потоки редко конкурируют за доступ к конструкции, повышение производительности способствует ускорению работы приложения.

В этой главе рассматриваются вопросы создания гибридных конструкций на базе простейших конструкций. В частности, вы узнаете, какие именно гибридные конструкции поставляются вместе с FCL, познакомитесь с их поведением и получите представление о том, как правильно с ними работать. Упоминаются и созданные лично мною конструкции из библиотеки Wintellect Power Threading, которые доступны для загрузки (http://Wintellect.com/PowerThreading.aspx).

Ближе к концу главы я покажу, как минимизировать потребление ресурсов и повысить производительность с помощью безопасных в отношении потоков классов коллекций из FCL, являющихся альтернативой гибридным конструкциям. Ну и напоследок мы обсудим асинхронные конструкции синхронизации, позволяющие синхронизировать доступ к ресурсу без блокировки потоков — а следовательно, сокращающие потребление ресурсов при одновременном улучшении масштабируемости.

Простая гибридная блокировка

Начнем с демонстрации примера гибридной блокировки в рамках синхронизации потоков:

internal sealed class SimpleHybridLock : IDisposable {

// Int32 используется примитивными конструкциями // пользовательского режима (Interlocked-методы)

private Int32 mwaiters = 0;

// AutoResetEvent - примитивная конструкция режима ядра

private AutoResetEvent m_waiterLock = new AutoResetEvent(false);

public void EnterQ {

// Поток хочет получить блокировку if (Interlocked.Increment(ref m_waiters) == 1)

return; // Блокировка свободна, конкуренции нет, возвращаем управление

// Блокировка захвачена другим потоком (конкуренция),

// приходится ждать.

m_waiterLock.WaitOne(); // Значительное снижение производительности // Когда WaitOne возвращет управление, этот поток блокируется

>

public void Leave() {

// Этот поток освобождает блокировку if (Interlocked.Decrement(ref m_waiters) == 0)

return; // Другие потоки не заблокированы, возвращаем управление

// Другие потоки заблокированы, пробуждаем один из них m_waiterLock.Set(); // Значительное снижение производительности

}

public void Dispose() { m_waiterLock.Dispose(); }

}

Класс SimpleHybridLock содержит два поля: одно типа Int32, управляемое примитивными конструкциями пользовательского режима, и второе типа AutoResetEvent, являющееся примитивной конструкцией режима ядра. Чтобы

добиться более высокой производительности, при блокировании нужно пытаться использовать поле Int32 и по возможности не использовать поле AutoResetEvent. Поле AutoResetEvent создается при конструировании объекта SimpleHybridLock и является причиной значительного снижения производительности, особенно по сравнению с полем Int32. Далее в этой главе рассматривается еще одна гибридная конструкция (AutoResetEventSlim), которая не создает поля AutoResetEvent до возникновения конкуренции со стороны потоков, одновременно пытающихся добиться права на блокирование. Закрывающий поле AutoResetEvent метод Dispose также значительно снижает производительность.

Как ни заманчиво выглядит задача повышения производительности при создании и освобождении объекта SimpleHybridLock, лучше сосредоточиться на его методах Enter и Leave, вызываемых за время жизни объекта бессчетное количество раз. Давайте рассмотрим их подробно.

Первый вызвавший метод Enter поток заставляет метод Interlocked. Increment увеличить поле m_waiters на 1, сделав его значение равным единице. Поток обнаруживает, что прежде потоков, ожидающих права на данное блокирование, не было, поэтому после вызова метода Enter он возвращает управление. Здесь важно то, что поток очень быстро блокируется. Если теперь появится второй поток и вызовет метод Enter, он увеличит значение поля m_waiters уже до двух и обнаружит присутствие уже запертого потока, поэтому он блокируется, вызывая метод WaitOne, использующий поле AutoResetEvent. Вызов метода WaitOne заставит поток перейти в ядро Windows, и именно эта процедура приводит к значительному снижению производительности. Однако этот поток в любом случае должен прекратить свою работу, поэтому тот факт, что полная остановка требует лишних временных затрат, не является слишком критичным. В итоге поток блокируется и перестает впустую расходовать процессорное время из-за зацикливания. Именно для этого и нужен продемонстрированный еще в главе 29 метод Enter класса SimpleSpinLock.

Теперь перейдем к методу Leave. Его вызов потоком сопровождается вызовом метода Interlocked.Decrement, вычитающего из поля m_waiters единицу. Равенство этого поля нулю означает отсутствие заблокированных потоков внутри вызова метода Enter, поэтому поток, который вызвал метод Leave, может просто вернуть управление. И снова посмотрим, насколько быстро все это происходит. Освобождение блокировки означает, что поток вычитает единицу из поля Int32, выполняет быструю проверку условия и возвращает управление! В то же время, если вызывающий метод Leave поток обнаруживает отличное от единицы значение поля m_waiters, он узнает о наличии конкуренции и о том, что, по крайней мере, один заблокированный поток в ядре уже имеется. Поток, вызывающий метод Leave, должен разбудить один (и только один) из заблокированных потоков. Для этого он вызывает метод Set объекта AutoResetEvent. Данная операция ведет к снижению производительности, так как потоку приходится совершать переходы к ядру и обратно. К счастью, подобный переход осуществляется только при наличии конкуренции. Разумеется, объект AutoResetEvent гарантирует пробуждение только одного из заблокированных потоков; все прочие заблокированные объектом AutoResetEvent потоки останутся в таком состоянии, пока новый незаблокированный поток не вызовет метод Leave.

ПРИМЕЧАНИЕ

В действительности метод Leave может вызвать любой поток в любой момент времени, потому что метод Enter не сохраняет информацию о том, какому потоку удалось успешно запереться. Добавить для этого поле и управляющий код несложно, но это увеличивает объем памяти, необходимой для самого объекта блокирования, и снижает производительность выполнения методов Enter и Leave, ведь им в результате приходится работать с этим новым полем. Я предпочитаю иметь быстродействующее блокирование и корректно использующий его код. С информацией подобного рода не умеют работать ни события, ни семафоры; это могут делать только мьютексы.

Зацикливание, владение потоком и рекурсия

Так как переходы в ядро значительно снижают производительность, а потоки остаются запертыми очень короткое время, общую производительность приложения можно повысить, заставив поток перед переходом в режим ядра на некоторое время зациклиться в пользовательском режиме. Если в это время блокирование, которого ожидает поток, станет возможным, переход в режим ядра не понадобится.

Кроме того, некоторые варианты блокирования налагают ограничение, в соответствие с которым получить право на блокировку может только поток, снимающий блокировку. Другие варианты блокирования допускают рекурсивный захват ресурса потоком. Именно такое поведение демонстрирует объект Mutex[68]. С помощью нетривиальной логики можно реализовать гибридное блокирование, предполагающее одновременно зацикливание, владение потоком и рекурсию. Вот пример подобного кода:

internal sealed class AnotherHybridLock : IDisposable {

// Int32 используется примитивом в пользовательском режиме // (методы Interlocked)

private Int32 mwaiters = 0;

11 AutoResetEvent - примитивная конструкция режима ядра private AutoResetEvent m_waiterLock = new AutoResetEvent(false);

// Это поле контролирует зацикливание с целью поднять производительность private Int32 m_spincount = 4000; // Произвольно выбранное значение

// Эти поля указывают, какой поток и сколько раз блокируется private Int32 m_owningThreadId = 0, m_recursion = 0;

public void EnterQ {

// Если вызывающий поток уже захватил блокировку, увеличим рекурсивный

// счетчик на единицу и вернем управление

Int32 threadld = Thread.CurrentThread.ManagedThreadld;

If (threadld == m_owningThreadId) { m_recursion++; return; }

// Вызывающий поток не захватил блокировку, пытаемся получить ее SpinWait spinwait = new SpinWaitQ;

for (Int32 splnCount = 0; splnCount < m_spincount; spinCount++) {

// Если блокирование возможно, этот поток блокируется // Задаем некоторое состояние и возвращаем управление If (Interlocked.CompareExchange(

ref m_waiters, 1, 0) == 0) goto GotLock;

// Даем остальным потокам шанс выполниться // в надежде на снятие блокировки spinwait. SpinOnceQ;

}

// Зацикливание завершено, а блокировка не снята,

// пытаемся еще раз

if (Interlocked.Increment(ref m_waiters) > 1) {

// Остальные потоки заблокированы

// и этот также должен быть заблокирован

m_waiterLock.WaitOneQ; // Ожидаем возможности блокирования;

// производительность падает

// Проснувшись, этот поток получает право на блокирование // Задаем некоторое состояние и возвращаем управление

}

GotLock:

// Когда поток блокируется, записываем его идентификатор // и указываем, что он получил право на блокирование впервые m_owningThreadId = threadld; m_recursion = 1;

}

public void LeaveQ {

// Если вызывающий поток не заперт, ошибка Int32 threadld = Thread.CurrentThread.ManagedThreadld; if (threadld != mowningThreadld) throw new SynchronizationLockException(

"Lock not owned by calling thread");

// Уменьшаем на единицу рекурсивный счетчик. Если поток все еще // заперт, просто возвращаем управление if (--mrecursion > 0) return;

mowningThreadld = 0; // Запертых потоков больше нет

// Если нет других заблокированных потоков, возвращаем управление if (Interlocked.Decrement(ref mwaiters) == 0) return;

// Остальные потоки заблокированы, пробуждаем один из них m_waiterLock.Set(); // Значительное падение производительности

}

public void Dispose() { m_waiterLock.Dispose(); }

}

Как видите, оснащение кода блокирования дополнительной логикой увеличивает количество имеющихся полей, а значит, и потребление памяти. Код, который должен выполняться, становится сложнее, что также снижает производительность

         
 


блокирования. В главе 29 сравнивалась производительность конструкции, где увеличивалось на единицу значение типа Int32 без блокирования, а также примитивной конструкции пользовательского режима и конструкции режима ядра. Я воспроизведу здесь результаты теста, добавив к ним результаты использования классов SimpleHybridlock и AnotherHybridLock. Вот они от самого быстрого к самому медленному:

Стоит заметить, что блокировка AnotherHybridLock отнимает в два раза больше времени, чем SimpleHybridLock. Это обусловлено дополнительной логикой и проверкой ошибок, необходимой для управления владением потоком и рекурсией. Как видите, на производительности отрицательно сказывается любая логика, добавляемая в код блокирования.

Гибридные конструкции в FCL

В FCL существует множество гибридных конструкций, использующих изощренную логику, которая должна удержать потоки в пользовательском режиме, повышая производительность приложения. В некоторых гибридных конструкциях возникновения конкуренции между потоками обращения к конструкциям режима ядра также не происходит. В результате, если конкуренция так и не возникает, приложению не приходится сталкиваться с падением производительности и необходимостью выделять память для объекта. Некоторые конструкции поддерживают объект CancellationToken (он рассматривался в главе 27), а значит, поток получает возможность принудительно разблокировать другие потоки, которые могут находиться в режиме ожидания. В этом разделе мы рассмотрим различные типы гибридных конструкций.

Классы ManualResetEventSlim и SemaphoreSlim

Первые две гибридные конструкции — это классы System.Threading.ManualResetEventSlim и System. Threading. SemaphoreSlim[69]. Они функционируют точно также, как их аналоги режима ядра, отличаясь только зацикливанием в пользовательском
режиме, а также тем, что они не создают конструкций режима ядра до возникновения конкуренции. Их методы Wait позволяют передать информацию о времени ожидания и объект CancellationToken. Вот как выглядят данные классы (некоторые перегруженные версии методов не показаны):

public class ManualResetEventSlim : IDisposable {

public ManualResetEventSlim(Boolean initialState, Int32 spinCount);

public void DisposeQ;

public void ResetQ;

public void Set();

public Boolean Wait(

Int32 millisecondsTimeout, CancellationToken CancellationToken);

public Boolean IsSet { get; } public Int32 SplnCount { get; } public WaitHandle WaitHandle { get; }

}

public class SemaphoreSlim : IDisposable {

public SemaphoreSlim(Int32 initialCount, Int32 maxCount); public void DisposeQ;

public Int32 Release(Int32 neleaseCount); public Boolean Wait(

Int32 millisecondsTimeout, CancellationToken CancellationToken);

// Специальный метод для использования с async и await (см. главу 28) public Task<Boolean> WaitAsync(Int32 millisecondsTimeout,

CancellationToken CancellationToken);

public Int32 CurrentCount { get; }

public WaitHandle AvallableWaltHandle { get; }

>

Класс Monitor и блоки синхронизации

Вероятно, самой популярной из гибридных конструкций синхронизации потоков является класс Monitor, обеспечивающий взаимоисключающее блокирование с зацикливанием, владением потоком и рекурсией. Данная конструкция используется чаще других потому, что является одной из самых старых, для ее поддержки в C# существует встроенное ключевое слово, с ней по умолчанию умеет работать JIT- компилятор, a CLR пользуется ею от имени приложения. Однако, как вы скоро убедитесь, работать с ней непросто, а получить некорректный код очень легко. Сначала мы рассмотрим саму конструкцию, а потом отдельно остановимся на возможных проблемах и способах их обхода.

С каждым объектом в куче может быть связана структура данных, называемая блоком синхронизации (sync block). Этот блок содержит поля, похожие на поля ранее упоминавшегося в этой главе класса AnotherHybridLock. Точнее, есть поле для объекта ядра, идентификатора потока-владельца, счетчика рекурсии и счетчика ожидающих потоков. Класс Monitor является статическим, и его методы принимают ссылки на любой объект кучи. Управление полями эти методы осуществляют в блоке синхронизации заданного объекта. Вот как выглядят чаще всего используемые методы класса Monitor:

public static class Monitor {

public static void Enter(Object obj); public static void Exit(Object obj);

// Можно также указать время блокирования (требуется редко):

public static Boolean TryEnter(Object obj, Int32 millisecondsTimeout);

// Аргумент lockTaken будет рассмотрен позднее

public static void Enter(Object obj, ref Boolean lockTaken);

public static void TryEnter(

Object obj, Int32 millisecondsTimeout, ref Boolean lockTaken);

>

Очевидно, что привязка блока синхронизации к каждому объекту в куче является достаточно расточительной, особенно если учесть тот факт, что большинство объектов никогда не пользуются этим блоком. Чтобы снизить потребление памяти, разработчики CLR применили более эффективный вариант реализации описанной функциональности. Во время инициализации CLR выделяется массив блоков синхронизации. Как уже не раз упоминалось в этой книге, при создании объекта в куче с ним связываются два дополнительных служебных поля. Первое поле — указатель на объект-тип — содержит адрес этого объекта в памяти. Второе поле содержит индекс блока синхронизации (sync block index), то есть индекс в массиве таких блоков.

В момент конструирования объекта этому индексу присваивается значение — 1, что означает отсутствие ссылок на блок синхронизации. Затем при вызове метода Monitor. Enter CLR обнаруживает в массиве свободный блок синхронизации и присваивает ссылку на него объекту. То есть привязка объекта к блоку синхронизации происходит «налету». Метод Exit проверяет наличие потоков, ожидающих блока синхронизации. Если таких потоков не обнаруживается, метод возвращает индексу значение -1, означающее, что блоки синхронизации свободны и могут быть связаны с какими-нибудь другими объектами.

Рисунок 30.1 демонстрирует связь между объектами кучи, их индексами блоков синхронизации и элементами массива блоков синхронизации в CLR. Указатель на объект-тип объектов Д Б и С ссылается на тип Г. Это говорит о принадлежности всех трех объектов к одному и тому же типу. Как обсуждалось в главе 4, объект- тип также находится в куче и подобно всем остальным объектам обладает двумя служебными членами: индексом блока синхронизации и указателем на объект-тип. То есть блок синхронизации можно связать с объектом-типом, а ссылку на этот объект можно передать методам класса Monitor. Кстати, массив блоков синхронизации при необходимости может увеличить количество блоков, поэтому не стоит беспокоиться, что при одновременной синхронизации нескольких объектов блоков не хватит.

Рис. 30.1. Индекс блоков синхронизации объектов в куче (включая объекты-типы) может ссылаться на запись в массиве блоков синхронизации CLR


 

Следующий код демонстрирует предполагаемый исходный вариант использования класса Monitor:

internal sealed class Transaction { private DateTime m_time0fLastTrans;

public void PerformTransaction() {

Monitor.Enter(this);

// Этот код имеет эксклюзивный доступ к данным... m_timeOfLastTrans = DateTime.Now;

Monitor.Exit(this);

}

public DateTime LastTransaction { get {

Monitor.Enter(this);

// Этот код имеет совместный доступ к данным...

DateTime temp = mtimeOfLastTrans;

Monitor.Exit(this); return temp;

}

}

 

На первый взгляд все выглядит достаточно просто, но это не так. Проблема в том, что индекс блока синхронизации каждого объекта неявно находится в открытом доступе. И вот как это проявляется:

public static void SomeMethod() { var t = new TransactionQ;

Monitor.Enter(t); 11 Этот поток получает открытую блокировку объекта

// Заставляем поток пула вывести время LastTransaction // ПРИМЕЧАНИЕ. Поток пула заблокирован до вызова // методом SomeMethod метода Monitor.Exit!

ThreadPool.QueueUserWorkItem(o => Console.WriteLine(t.LastTransaction));

// Здесь выполняется какой-то код...

Monitor.Exit(t)j

}

В этом коде поток, выполняющий метод SomeMethod, вызывает метод Monitor. Enter, получая открытую блокировку объекта Transaction. Когда поток пула запрашивает свойство LastTransaction, это свойство также вызывает метод Monitor. Enter, чтобы получить право на то же самое блокирование. В результате поток пула оказывается заблокированным, пока поток, выполняющий метод SomeMethod, не вызовет метод Monitor. Exit. При помощи отладчика можно определить, что поток пула заблокирован внутри свойства LastTransaction, но узнать, какой еще поток заблокирован, очень сложно. Для этого нужно понять, какой именно код привел к получению блокировки. Но даже если вы это узнаете, вполне может оказаться, что этот код окажется недоступным для редактирования, а значит, вы не сможете устранить проблему. Именно поэтому я предлагаю пользоваться только закрытыми блокировками. Вот каким образом следует исправить класс Тransaction:

internal sealed class Transaction {

private readonly Object mlock = new Object(); // Теперь блокирование

// в рамках каждой транзакции ЗАКРЫТО

private DateTime mtimeOfLastTransj

public void PerformTransaction() {

Monitor.Enter(mlock); // Вход в закрытую блокировку // Этот код имеет эксклюзивный доступ к данным... mtimeOfLastTrans = DateTime.Now;

Monitor.Exit(mlock)j // Выход из закрытой блокировки

}

public DateTime LastTransaction { get {

Monitor.Enter(mlock)j // Вход в закрытую блокировку // Этот код имеет монопольный доступ к данным...

DateTime temp = mtimeOfLastTrans;

Monitor.Exit(mlock)j // Завершаем закрытое блокирование return temp;

}

}

Если бы члены класса Transaction были статическими, для их безопасности в отношении потоков достаточно было бы сделать статическим поле m_lock.

Однако я думаю, вы уже поняли из предшествующих обсуждений, что класс Monitor не должен быть реализован как статический; его следует реализовать, как и все прочие конструкции, в виде класса, допускающего создание экземпляров и вызов экземплярных методов. Также реализация класса Monitor как статического создаст ряд дополнительных проблем:

□ Если тип объекта-представителя является производным от System.Marshal- ByRef Object, на такой объект может ссылаться переменная (эта тема рассматривалась в главе 22). При передаче методам класса Monitor ссылки на такой представитель блокируется представитель, а не представляемый им объект.

□ Если поток вызывает метод Monitor. Enter и передает в него ссылку на объект- тип, загруженный нейтрально по отношению к домену (о том, как это сделать, мы говорили в главе 22), поток блокирует этот тип во всех доменах процесса. Это известная недоработка CLR, нарушающая декларируемую изолированность доменов. Исправить ее без потери производительности сложно, поэтому никто этим не занимается. Пользователям просто рекомендуют никогда не передавать ссылку на объект-тип в методы класса Monitor.

□ Так как строки допускают интернирование (это обсуждалось в главе 14), два разных фрагмента кода могут ошибочно сослаться на один и тот же объект String в памяти. При передаче ссылки на этот объект в методы типа Monitor выполнение этих двух фрагментов кода будет непреднамеренно синхронизироваться.

□ При передаче строки через границу домена CLR не создает ее копию; ссылка на строку просто передается в другой домен. Это повышает производительность, и в теории все должно быть в порядке, так как объекты типа String неизменны. Но с ними, как и с любыми другими объектами, связан индекс блока синхронизации, который может изменяться. Таким образом, потоки в различных доменах начинают синхронизироваться друг с другом. Это еще одна недоработка CLR, связанная с недостаточной изолированностью доменов. Поэтому пользователям рекомендуется никогда не передавать ссылок на объекты типа String методам класса Monitor.

□ Так как методы класса Monitor принимают параметры типа Object, передача им значимого типа приводит к его упаковке. В результате поток блокирует упакованный объект. При каждом вызове метода Monitor. Enter блокируется другой объект, и синхронизация потоков вообще отсутствует.

□ Применение к методу атрибута [MethodImpl(MethodImplOptions.Synchronized ) ] заставляет JIT-компилятор окружить машинный код метода вызовами Monitor. Enter и Monitor. Exit. Если метод является экземплярным, этим методам передается this, что приводит к установлению неявно открытой блокировки.

В случае статического метода этим двум методам передается ссылка на объект- тип, что приводит к потенциальной блокировке нейтрального по отношению к домену типа. Поэтому использовать данный атрибут не рекомендуется.

□ При вызове конструктора типов (он обсуждался в главе 8) CLR блокирует для типа объект-тип, гарантируя, что всего один поток примет участие в инициализации данного объекта и его статических полей. И снова загрузка этого типа нейтрально по отношению к домену создает проблемы. К примеру, если код конструктора типа войдет в бесконечный цикл, тип станет непригодным для использования всеми доменами в процессе. В данном случае рекомендуется по возможности избегать конструкторов типа или хотя бы делать их как можно более короткими и простыми.

К сожалению, дальше все становится только хуже. Так как разработчики привыкли в одном и том же методе устанавливать блокировку, что-то делать, а затем снимать блокировку, в C# появился упрощенный синтаксис в виде ключевого слова lock. Рассмотрим следующий метод:

private void SomeMethodQ { lock (this) {

// Этот код имеет эксклюзивный доступ к данным...

}

}

Приведенный фрагмент эквивалентен следующему:

private void SomeMethod() {

Boolean lockTaken = false; try {

//

Monitor.Enter(this, ref lockTaken);

// Этот код имеет монопольный доступ к данным...

}

finally {

if (lockTaken) Monitor.Exit(this);

}

}

Первая проблема в данном случае состоит в принятом разработчиками C# решении, что метод Monitor. Exit лучше вызывать в блоке finally. Они считали, что это гарантирует снятие блокировки вне зависимости от происходящего в блоке try. Однако ничего хорошего в этом нет. Если в блоке try в процессе изменения состояния возникнет исключение, состояние окажется поврежденным. И снятие блокировки в блоке finally приведет к тому, что с поврежденным состоянием начнет работать другой поток. Лучше позволить приложению зависнуть, чем оставить его работать с поврежденными данными и потенциальными брешами в защите. Кроме того, вход в блок try и выход из блока снижает производительность метода. Некоторые JIT-компиляторы не поддерживают подстановку для методов, в которых имеются блоки try, что еще больше снижает производительность. В итоге мы получаем более медленный код, к тому же допускающий доступ потоков к поврежденному состоянию[70]. Поэтому я крайне не рекомендую вам пользоваться инструкцией lock.

Теперь перейдем к переменной lockTaken типа Boolean и к проблеме, которую призвана решить эта переменная. Предположим, поток вошел в блок try и был прерван до вызова метода Monitor. Enter (прерывание потоков обсуждалось в главе 22). После этого вызывается блок finally, но его код не должен снимать блокировку В этом нам поможет переменная lockTaken. Ей присваивается начальное значение false, означающее, что блокировка еще не установлена. Если вызванный метод Monitor. Enter успешно получает блокировку, переменной lockTaken присваивается значение true. Блок finally по значению этой переменной определяет, нужно ли вызывать метод Monitor. Exit. Кстати, структура SpinLock также поддерживает паттерн с переменной lockTaken.

Класс ReaderWriterLockSlim

Часто потоки просто читают некие данные. Если такие данные защищены взаимоисключающей блокировкой (например, SimpleSpinLock, SimpleWaitLock, SimpleHybridLock, AnotherHybridLock, Mutex или Monitor), то при попытке одновременного доступа нескольких потоков работу продолжит только один из них, а остальные блокируются, что значительно ухудшает масштабируемость и снижает производительность вашего приложения. Впрочем, в случае доступа в режиме только для чтения необходимость в блокировке отпадает, и потоки получают одновременный доступ к данным. А вот потоку, который хочет внести в данные изменения, требуется монопольный доступ. Конструкция ReaderWriterLockSlim содержит логику, позволяющую решить данную проблему. Управление потоками осуществляется следующим образом:

□ Если один поток осуществляет запись данных, все остальные потоки, требующие доступа, блокируются.

□ Если один поток читает данные, все остальные потоки, требующие доступа, продолжают работу, блокируются только потоки, ожидающие доступа на запись.

□ После завершения работы потока, осуществлявшего запись данных, разблокируется либо один поток, ожидающий доступ на запись, либо все потоки, ожидающие доступ на чтение. При отсутствии заблокированных потоков блокировку получает следующий поток чтения или записи, которому это потребуется.

□ После завершения всех потоков, осуществлявших чтение данных, разблокируется поток, ожидающий разрешения на запись. При отсутствии заблокированных потоков блокировку получит следующий поток чтения или записи, которому это потребуется.

Вот как выглядит данный класс (некоторые перегруженные версии методов не показаны):

public class ReaderWriterLockSlim : IDisposable {

public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy); public void DisposeQ;

public void EnterReadLock();

public Boolean TryEnterReadl_ock(Int32 millisecondsTimeout); public void ExitReadLockQ;

public void EnterWriteLockQ;

public Boolean TryEnterWriteLock(Int32 millisecondsTimeout); public void ExitWriteLockQ;

// Большинство приложений никогда не обращаются к этим свойствам

public Boolean IsReadLockHeld { get; }

public Boolean IsWriteLockHeld { get; }

public Int32 CurrentReadCount { get; }

public Int32 RecunsiveReadCount { get; }

public Int32 RecursiveWriteCount { get; }

public Int32 WaitingReadCount { get; }

public Int32 WaitingWriteCount { get; }

public LockRecunsionPolicy RecursionPolicy { get; }

//He показаны члены, связанные с переходом от чтения к записи

}

Следующий код демонстрирует применение данной конструкции:

internal sealed class Transaction : IDisposable {
private readonly ReaderWriterLockSlim mlock =

new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); private DateTime m_timeOfLastTrans;

public void PerformTransaction() { m_lock. EnterWriteLockQ;

// Этот код имеет монопольный доступ к данным... m_timeOfLastTrans = DateTime.Now; m_lock.ExitWriteLock();

>

public DateTime LastTransaction { get {

m_lock. EnterReadLockQ;

// Этот код имеет совместный доступ к данным...

DateTime temp = m_timeOfLastTrans; m_lock. ExitReadLockQ; return temp;

>

>

public void DisposeQ { m lock.DisposeQ; }

>

С этой конструкцией связан ряд концепций, заслуживающих отдельного упоминания. Во-первых, конструктору ReadenWnitenLockSlim можно передать флаг LockRecunionsPolicy, определенный следующим образом:

public enum LockRecursionPolicy { NoRecursion, SupportsRecursion }

Флаг SupportsRecursion наделяет код блокирования механизмом владения потоком и рекурсивным поведением. Как уже упоминалось в этой главе, эти режимы негативно влияют на производительность блокирования, так что я рекомендую всегда передавать конструктору флаг LockRecursionPolicy .NoRecursion (как это сделано в моем примере). Поддержка режимов владения потоком и рекурсии для блокирования на чтение-запись является крайне дорогим удовольствием, ведь при блокировании нужно отслеживать все блокируемые потоки, занимающиеся чтением данных, и поддерживать для каждого из них отдельный счетчик рекурсии. На самом деле для управления всей этой информацией в безопасном в отношении потоков режиме конструкция ReaderWriterLockSlim внутренне использует взаимоисключающее блокирование с зацикливанием! И я не шучу.

Класс ReaderWriterLockSlim содержит дополнительные методы (ранее они не демонстрировались), позволяющие читающему потоку превратиться в поток записывающий. Затем возможен обратный переход. В основе такого подхода лежит идея, что в процессе чтения данных потоком может возникнуть необходимость их редактирования. Поддержка данного поведения снижает производительность блокирования. Ну а я так вообще считаю его бесполезным. Дело в том, что поток не может просто так превратиться из читающего в пишущий. Перед тем как он получит позволение на подобное преобразование, все остальные читающие потоки должны покинуть код блокирования. Это то же самое, что поток чтения, освобождающий блокировку и тут же получающий ее для записи.

ПРИМЕЧАНИЕ

В FCL также присутствует конструкция ReaderWriterLock, появившаяся еще в Microsoft .NET Framework 1.0. Она была настолько проблемной, что в версию 3.5 разработчики Microsoft ввели конструкцию ReaderWriterLockSlim. Менять конструкцию ReaderWriterLock они не стали, чтобы не потерять совместимости с использующими ее приложениями. Данная конструкция работает крайне медленно даже в отсутствии конкуренции потоков. Она не позволяет отказаться от владения потоком и рекурсивного поведения, еще сильнее замедляющих блокирование. Потоки чтения имеют в этой конструкции приоритет перед потоками записи, что может привести к проблемам типа «отказ в обслуживании».

Класс OneManyLock

Я создал собственную конструкцию чтения-записи, работающую быстрее, чем встроенный в FCL класс ReaderWriterLockSlim[71]. Эта конструкция называется

OneManyLock, так как она предоставляет доступ либо одному пишущему потоку, либо нескольким читающим. Данный класс выглядит примерно следующим образом:

public sealed class OneManyLock : IDisposable { public OneManyLockQ; public void Dispose();

public void Enter(Boolean exclusive); public void LeaveQ;

}

Теперь посмотрим, как это работает. Класс содержит поле типа Int32, предназначенное для хранения состояния блокирования, объект Semaphore, блокирующий читающие потоки, и объект AutoReset Event, блокирующий пишущие потоки. Поле записи состояния содержит в себе пять вложенных полей.

□ Четыре бита представляют состояние блокировки. Значение 0 означает F гее (доступно), 1 — OwnedByWriter (занято записывающим потоком), 2 — OwnedByReaders (занято читающими потоками), 3 — OwnedByReadersAndWriterPending (занято записывающим и читающими потоками) и 4 — ReservedForWriter (зарезервировано для записывающего потока). Другие значения не используются.

□ Двадцать битов (число от 0 до 1048575) представляют количество потоков чтения (RR), допустимых для блокировки.

□ Двадцать битов представляют количество потоков чтения (RW), ожидающих получения блокировки. Эти потоки удерживает объект AutoResetEvent.

□ Двадцать битов представляют количество потоков записи (WW), ожидающих получения блокировки. Эти потоки удерживает объект Semaphore.

Теперь, когда вся информация о блокировании сконцентрирована в одном поле типа Int64, я могу управлять этим полем при помощи методов класса Interlocked. В результате блокирование выполняется очень быстро и приводит к блокированию потока только при конкуренции потоков.

Вот что происходит при входе потока в код блокирования совместного доступа:

□ Если блокирование возможно, присваиваем состоянию значение OwnedByReaders, выполняем RR = 1, возвращаем управление.

□ Если состояние блокирования имеет значение OwnedByReaders (занято потоком чтения), выполняем RR++, возвращаем управление.

□ В противном случае выполняем RW++, блокируем поток чтения. Когда поток проснется, проходим цикл и делаем вторую попытку.

Вот что происходит при выходе потока из кода блокировки совместного доступа:

□ Выполняем RR--.