□ Асинхронная функция не может иметь параметры out и ref.
□ Оператор await не может использоваться в блоке catch, finally или unsafe.
□ Не допускается установление блокировки, поддерживающей владение потоком или рекурсию, до операции await, и ее снятие после оператора await. Это ограничение объясняется тем, что один поток может выполнить код до await, а другой поток может выполнить код после await. При использовании await с командой C# lock компилятор выдает сообщение об ошибке. Если вместо этого явно вызвать методы Enter и Exit класса Monitor, то код откомпилиру- ется, но Monitor. Exit выдаст исключение SynchronizationLockException во время выполнения.
□ В выражениях запросов оператор await может использоваться только в первом выражении коллекции условия from или в выражении коллекции условия join.
Все эти ограничения не столь существенны. При их нарушении компилятор выдаст соответствующее сообщение, а проблемы обычно удается решить при помощи незначительной модификации кода.
Преобразование асинхронной функции в конечный автомат
Работа с асинхронными функциями станет более эффективной, если вы будете понимать суть преобразований кода, которые компилятор выполняет за вас. На мой взгляд, разобраться в происходящем проще всего на конкретном примере, поэтому мы начнем с определения нескольких простых типов и методов.
internal sealed class Typel { } internal sealed class Type2 { } private static async Task<Typel> MethodlAsync() {
/* Асинхронная операция, возвращающая объект Typel */
}
private static async Task<Type2> Method2Async() {
/* Асинхронная операция, возвращающая объект Туре2 */
}
Теперь я приведу асинхронную функцию, которая использует эти простые типы и методы.
private static async Task<String> MyMethodAsync(Int32 argument) {
Int32 local = argument; try {
Typel resultl = await MethodlAsyncQ; for (Int32 x = 0; x < 3; x++) {
Type2 result2 = await Method2Async();
}
}
catch (Exception) {
Console.WriteLine("Catch");
}
finally {
Console.WriteLine("Finally");
>
return "Done";
Код MyMethodAsync выглядит довольно запутанно, но он демонстрирует несколько ключевых моментов. Во-первых, сама асинхронная функция возвращает Task<String>, нов теле кода возвращается String. Во-вторых, в ней вызываются другие функции, которые выполняют асинхронные операции — одна автономно, другая в цикле for. Наконец, также присутствует код обработки исключений. При компиляции MyMethodAsync компилятор преобразует код метода в структуру конечного автомата с возможностью приостановки и продолжения выполнения.
Я взял приведенный код, откомпилировал его, а затем преобразовал IL-код обратно в исходный код С#. Далее я слегка упростил код и добавил подробные комментарии, чтобы вы поняли, что делает компилятор для работы асинхронных функций. Ниже приведен основной код, созданный в результате преобразования. Я показываю как преобразованный метод MyMethodAsync, так и структуру конечного автомата, от которой он зависит.
// Атрибут AsyncStateMachine обозначает асинхронный метод // (полезно для инструментов; использующих отражение);
// тип указывает; какая структура реализует конечный автомат.
[DebuggerStepThrough, AsyncStateMachine(typeof(StateMachine))] private static Task<String> MyMethodAsync(Int32 argument) {
// Создание экземпляра конечного автомата и его инициализация StateMachine StateMachine = new StateMachineQ {
// Создание построителя; возвращающего Task<String>.
// Конечный автомат обращается к построителю для назначения // завершения задания или выдачи исключения, mbuilder = AsyncTaskMethodBuilder<String>.Create(),
mstate =1, // инициализация местонахождения
m_argument = argument // Копирование аргументов в поля конечного }; // автомата
// Начало выполнения конечного автомата.
StateMachine.m_builder.Start(ref StateMachine);
return StateMachine.m_builder.Task; // Возвращение задания конечного } // автомата
// Структура конечного автомата
[CompilerGenerated; StructLayout(LayoutKind.Auto)] private struct StateMachine : IAsyncStateMachine {
// Поля для построителя конечного автомата (Task) и его местонахождения public AsyncTaskMethodBuilder<String> mbuilder; public Int32 m_state;
// Аргумент и локальные переменные становятся полями: public Int32 m_argument; mlocal, m_x; public Typel m_resultTypel; public Type2 m_resultType2;
// Одно поле на каждый тип Awaiter.
//В любой момент времени важно только одно из этих полей. В нем // хранится ссылка на последний выполненный экземпляр await,
// который завершается асинхронно:
private TaskAwaiter<Typel> m_awaiterTypel; private TaskAwaiter<Type2> m_awaiterType2;
11 Сам конечный автомат
void IAsyncStateMachine.MoveNextQ {
String result = null; // Результат Task
// Вставленный компилятором блок try гарантирует // завершение задания конечного автомата try {
Boolean executeFinally = true; // Логический выход из блока 'try' if (m_state == 1) { // Если метод конечного автомата
// выполняется впервые
m_local = m_argument; // Выполнить начало исходного метода
}
// Блок try из исходного кода try {
TaskAwaiter<Typel> awaiterTypel;
TaskAwaiter<Type2> awaiterType2;
switch (m_state) {
case 1: // Начало исполнения кода в 'try'
// вызвать MethodlAsync и получить его объект ожидания awaiterTypel = MethodlAsyncQ .GetAwaiterQ; if (!awaiterTypel.IsCompleted) {
mstate =0; // 'MethodlAsync'
// завершается асинхронно
m_awaiterTypel = awaiterTypel; // Сохранить объект
// ожидания до возвращения // Приказать объекту ожидания вызвать MoveNext // после завершения операции
m_builder.Awaitl)nsafeOnCompleted(ref awaiterTypelj ref this); // Предыдущая строка вызывает метод OnCompleted // объекта awaiterTypelj что приводит к вызову // ContinueWith(t => MoveNextQ) для Task.
// При завершении Task ContinueWith вызывает MoveNext
executeFinally = false; // Без логического выхода
// из блока 'try'
return; // Поток возвращает
} // управление вызывающей стороне
// 'MethodlAsync' завершается синхронно, break;
case 0: // 'MethodlAsync' завершается асинхронно
awaiterTypel = m_awaiterTypel; // Восстановление последнего
break; // объекта ожидания
case 1: // 'Method2Async' завершается асинхронно
awaiterType2 = m_awaiterType2; // Восстановление последнего
goto ForLoopEpilog; // объекта ожидания
}
// После первого await сохраняем результат и запускаем цикл 'for'
продолжение
m_resultTypel = awaiterTypel.GetResultQ; // Получение результата ForLoopPrologue:
m_x = 0; // Инициализация цикла 'for'
goto ForLoopBody; // Переход к телу цикла 'for'
ForLoopEpilog:
m_resultType2 = awaiterType2 .GetResultQ;
m_x++; // Увеличение x после каждой итерации
// Переход к телу цикла 'for'
ForLoopBody:
if (m_x < 3) { // Условие цикла 'for'
// Вызов Method2Async и получение объекта ожидания awaiterType2 = Method2Async(). GetAwaiterQ; if (!awaiterType2.IsCompleted) {
m_state = 1; II 'Method2Async' завершается
// асинхронно
m_awaiterType2 = awaiterType2; // Сохранение объекта
// ожидания до возвращения
// Приказываем вызвать MoveNext при завершении операции m_builder.AwaitUnsafeOnCompleted(ref awaiterType2J ref this) executeFinally = false; // Без логического выхода
// из блока 'try'
return; // Поток возвращает управление
} // вызывающей стороне
// 'Method2Async' завершается синхронно
goto ForLoopEpilog; // Синхронное завершение, возврат
>
>
catch (Exception) {
Console.WriteLine("Catch");
>
finally {
// Каждый раз, когда блок физически выходит из 'try',
// выполняется 'finally'.
// Этот код должен выполняться только при логическом // выходе из 'try', if (executeFinally) {
Console.WriteLine("Finally");
>
>
result = "Done"; // То, что в конечном итоге должна вернуть } // асинхронная функция,
catch (Exception exception) {
// Необработанное исключение: задание конечного автомата // завершается с исключением, mbuilder.SetException(exception); return;
}
// Исключения нет: задание конечного автомата завершается с результатом m_builder.SetResult(result);
>
}
Если вы не пожалеете времени на просмотр кода и чтение комментариев, думаю, вы сможете понять, что компилятор делает за вас. Пожалуй, стоит особо упомянуть об одном важном моменте. Каждый раз, когда в вашем коде используется оператор await, компилятор берет указанный операнд и пытается вызвать для него метод GetAwaiter. Этот метод может быть как экземплярным методом, так и методом расширения. Объект, возвращаемый при вызове GetAwaiter, называется объектом ожидания (awaiter).
После того как конечный автомат получит объект ожидания, он запрашивает его свойство IsCompleted. Если операция завершается синхронно, возвращается значение t rue, и в порядке оптимизации конечный автомат просто продолжает выполнение. Он вызывает метод GetResult объекта ожидания, который либо выдает исключение в случае неудачного выполнения операции, либо возвращает результат, если операция прошла успешно. Конечный автомат продолжает выполнение для обработки результата.
Если операция завершается асинхронно, IsCompleted возвращает false. В этом случае конечный автомат вызывает метод OnCompleted объекта ожидания, передавая ему делегата метода MoveNext конечного автомата. И теперь конечный автомат позволяет своему потоку вернуть управление в исходную точку, чтобы тот мог продолжить выполнение другого кода. В будущем объект ожидания, инкапсулирующий Task, узнает о своем заверении и вызывает делегата, что приводит к выполнению MoveNext. По полям конечного автомата определяется способ перехода к правильной точке кода, что создает иллюзию продолжения выполнения метода с того места, с которого он был прерван. На этой стадии код вызывает метод GetResult объекта ожидания и продолжает выполнение для обработки результата.
Так работает модель асинхронных функций, единственная цель которой — упрощение работы программиста по написанию кода без блокировки исполнения.
Расширяемость асинхронных функций
Что касается расширяемости, если в объект Task можно упаковать операцию, которая завершится в будущем, то вы сможете использовать оператор await для ожидания завершения этой операции. Представление всех разновидностей асинхронных операций одним типом (Task) чрезвычайно полезно, потому что оно позволяет реализовать комбинаторы (методы WhenAll и WhenAny класса Task) и другие полезные операции. Позднее в этой главе данная возможность будет продемонстрирована на примере упаковки CancellationToken с Task, позволяющем использовать await для асинхронной операции с поддержкой тайм-аута и отмены.
А сейчас я представлю еще один пример. Ниже приведен мой класс TaskLogger, который может использоваться для вывода информации о незавершенных асинхронных операциях. Такая информация чрезвычайно полезна в ходе отладки, особенно если ваше приложение «виснет» из-за некорректного запроса или отсутствия реакции сервера.
public static class TaskLogger {
public enum TaskLogLevel { None, Pending } public static TaskLogLevel LogLevel { get; set; }
public sealed class TaskLogEntny {
public Task Task { get; internal set; } public String Tag { get; internal set; } public DateTime LogTime { get; internal set; } public String CallerMemberName { get; internal set; } public String CallerFilePath { get; internal set; } public Int32 CallerLineNumber { get; internal set; } public override string ToStringQ {
return String.Format("LogTime={0}, Tag={l}, Member={2}, File={3}({4})“, LogTime, Tag ?? "(none)", CallerMemberName, CallerFilePath, CallerLineNumber);
}
}
private static readonly ConcurrentDictionarycTask, TaskLogEntry> slog = new ConcurrentDictionarycTask, TaskLogEntry>(); public static IEnumerable<TaskLogEntry> GetLogEntriesQ { return slog.Values; }
public static Task<TResult> Log<TResult>(this Task<TResult> task,
String tag = null,
[CallerMemberName] String CallerMemberName = null,
[CallerFilePath] String CallerFilePath = null,
[CallerLineNumber] Int32 CallerLineNumber = 1) { return (Task<TResult>)
Log((Task)task, tag, CallerMemberName, CallerFilePath, CallerLineNumber);
public static Task Log(this Task task, String tag = null, [CallerMemberName] String CallerMemberName = null, [CallerFilePath] String CallerFilePath = null, [CallerLineNumber] Int32 CallerLineNumber = 1) { if (LogLevel == TaskLogLevel.None) return task; var logEntry = new TaskLogEntry {
Task = task,
LogTime = DateTime.Now,
Tag = tag,
CallerMemberName = CallerMemberName,
CallerFilePath = CallerFilePath,
CallerLineNumber = CallerLineNumber
};
s_l°g[task] = logEntry;
task.ContinueWith(t => { TaskLogEntry entry; slog.TryRemove(t, out entry); },
TaskContinuationOptions.ExecuteSynchronously); return task;
}
}
Следующий фрагмент кода демонстрирует использование класса:
public static async Task Go() {
#if DEBUG
// Использование TaskLogger приводит к лишним затратам памяти // и снижению производительности; включить для отладочной версии TaskLogger.LogLevel = TaskLogger.TaskLogLevel.Pending;
#endif
// Запускаем 3 задачи; для тестирования TaskLogger их продолжительность
// задается явно.
var tasks = new List<Task> {
Task.Delay(2000).Log("2s op"),
Task.Delay(5000).Log("5s op"),
Task.Delay(6000).Log("6s op")
H
try {
// Ожидание всех задач с отменой через 3 секунды; только одна задача // должна завершиться в указанное время.
// Примечание: WithCancellation - мой метод расширения,
// описанный позднее в этой главе, await Task.WhenAll(tasks).
WithCancellation(new CancellationTokenSource(3000).Token);
}
catch (OperationCanceledException) { }
// Запрос информации о незавершенных задачах и их сортировка // по убыванию продолжительности ожидания
foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tie.LogTime)) Console.WriteLine(op);
>
Построив и запустив эту программу, я получаю следующий результат:
LogTime=7/16/2012 6:44:31 AM, Tag=6s op, Member=Go,
File=C:\CLR via C#\Code\Ch281I00ps.cs(332)
LogTime=7/16/2012 6:44:31 AM, Tag=5s op, Member=Go,
File=C:\CLR via C#\Code\Ch281I00ps.cs(331)
Наряду с гибкостью, обусловленной использованием Task, асинхронные функции предоставляют еще одну точку расширения: компилятор вызывает GetAwaiter для операнда, использовавшегося с await. Таким образом, операнд вообще не обязан быть объектом Task; он может относиться к любому типу, содержащему метод GetAwaiter. Пример моего собственного объекта ожидания, связывающего конечный автомат async-метода с инициируемым событием.
public sealed class EventAwaiter<TEventArgs> : INotifyCompletion {
private ConcurrentQueue<TEventArgs> mevents = new ConcurrentQueue<TEventArgs>(); private Action mcontinuation;
#region Члены, вызываемые конечным автоматом
// Конечный автомат сначала вызывает этот метод для получения
// объекта ожидания; возвращаем текущий объект
public EventAwaiter<TEventArgs> GetAwaiter() { return this; }
продолжение P>
// Сообщает конечному автомату; произошли ли какие-либо события public Boolean IsCompleted { get { return m_events.Count > 0; } }
// Конечный автомат сообщает; какой метод должен вызываться позднее;
// сохраняем полученную информацию
public void OnCompleted(Action continuation) {
Volatile.Write(ref m_continuation, continuation);
}
// Конечный автомат запрашивает результат, которым является // результат оператора await public TEventArgs GetResultQ {
TEventArgs e;
m_events.TryDequeue(out e); return e;
}
#endregion
// Теоретически может вызываться несколькими потоками одновременно,
// когда каждый поток инициирует событие
public void EventRaised(Ob]ect sender, TEventArgs eventArgs) { m_events.Enqueue(eventArgs); // Сохранение EventArgs
// для возвращения из GetResult/await // Если имеется незавершенное продолжение, поток забирает его Action continuation = Interlocked.Exchange(ref mcontinuation, null); if (continuation != null) continuation(); // Продолжение выполнения } // конечного автомата
}
Следующий метод использует мой класс EventAwaiter для возвращения из оператора await при инициировании события. В данном случае конечный автомат продолжает выполнение при выдаче исключения любым потоком в домене.
private static async void ShowExceptionsQ {
var eventAwaiter = new EventAwaiter<FirstChanceExceptionEventArgs>(); AppDomain.CurrentDomain.FirstChanceException += eventAwaiter.EventRaised;
while (true) {
Console.WriteLine("AppDomain exception: {0}",
(await eventAwaiter).Exception.GetType());
}
}
И наконец, пример кода, который показывает, как работает эта система:
public static void Go() {
ShowExceptionsQ;
for (Int32 x = 0; x < 3; x++) { try {
switch (x) {
case 0: throw new InvalidOperationExceptionQ;
case 1: throw new ObjectDisposedException("“);
case 2: throw new ArgumentOutOfRangeException();
}
>
catch { }
>
}
Асинхронные функции и обработчики событий
Асинхронные функции обычно используют тип возвращаемого значения Task или Task< Result», представляющий завершение конечного автомата функции. Однако также возможно определение асинхронной функции с возвращаемым типом void. Это особый случай, который поддерживается компилятором C# для упрощения очень распространенной ситуации: реализации асинхронного обработчика события.
Почти все методы обработчиков событий имеют сигнатуру следующего вида:
void EventHandlerCallback(Object sender, EventArgs e);
На практике в обработчиках событий довольно часто выполняются операции ввода-вывода — например, когда пользователь щелкает на элементе пользовательского интерфейса, чтобы открыть файл и прочитать из него данные. Чтобы пользовательский интерфейс реагировал на действия пользователя, ввод-вывод должен выполняться асинхронно. Для использования такого кода в методе обработчика события с типом возвращаемого значения void компилятор C# должен разрешить асинхронным функциям иметь возвращаемый тип void, чтобы оператор await мог использоваться для выполнения неблокирующих операций ввода-вывода. Когда асинхронная функция имеет возвращаемый тип void, компилятор генерирует код создания конечного автомата, но не создает объект Task, потому что он все равно не будет использоваться. По этой причине невозможно узнать, что конечный автомат асинхронной функции, возвращающей void, отработал до завершения[53].
Асинхронные функции в FCL
Лично мне модель асинхронных функций очень симпатична, потому что ее довольно легко освоить, она проста в использовании и поддерживается многими типами библиотеки FCL. Асинхронные функции сразу видны в коде, потому что по действующим соглашениям имя метода снабжается суффиксом Async. В FCL многие типы, предоставляющие операции ввода-вывода, также предоставляют методы XxxAsync. Несколько примеров[54]:
□ Все производные от System .10. St ream классы предоставляют методы ReadAsync, WriteAsync, FlushAsync и CopyToAsync.
□ Все производные от System. 10.textReader классы предоставляют методы ReadAsync, ReadLineAsync, ReadToEndAsync и ReadBlockAsync. Классы, производные от System. 10.TextWriter, предоставляют методы WriteAsync, Write- LineAsync и FlushAsync.
□ Класс System.Net.Http.HttpClient предоставляет методы GetAsync, Get- StreamAsync, GetByteArrayAsync, PostAsync, PutAsync, DeleteAsync и др.
□ Все производные от System.Net .WebRequest классы (включая FileWebRequest, FtpWebRequest и HttpWebRequest) предоставляют методы GetRequestStreamAsync и GetResponseAsync.
□ Класс System.Data.SqlClient.SqlCommand предоставляет методы Exe- cuteDbDataReaderAsync,ExecuteNonQueryAsync, ExecuteReaderAsync, ExecuteScalarAsync и ExecuteXmlReaderAsync.
□ Инструменты (такие, как Svclltil.exe), создающие типы представителей для вебслужб, также генерируют методы XxxAsync.
Программистам с опытом использования предыдущих версий .NET Framework могут быть известны другие модели асинхронного программирования — как, например, модель, использовавшая методы BeginXxx и EndXxx в сочетании с интерфейсом IAsyncResult. Также имеется событийная модель, использовавшая методы XxxAsync (не возвращающие объекты Task) с вызовами методов обработчиков событий при завершении асинхронных операций. Эти две модели асинхронного программирования теперь считаются устаревшими, и вместо них рекомендуется использовать новую модель с объектами Task.
Просматривая описания классов FCL, можно заметить, что у некоторых классов нет методов XxxAsync, а вместо них предоставляются методы BeginXxx и EndXxx. В основном это объясняется тем, что у компании Microsoft не было времени для
обновления этих классов новыми методами. В будущем эти классы будут доработаны, и в них появится полноценная поддержка новой модели. А до того времени можно воспользоваться вспомогательным методом, адаптирующим старую модель BeginXxx/EndXxx для новой модели на базе Task.
Ранее я приводил код клиентского приложения, которое передает запрос по именованному каналу. Пора привести серверную сторону этого кода.
private static async void StartServerQ { while (true) {
var pipe = new NamedPipeServerStream(c_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous | PipeOptions.WriteThrough);
11 Асинхронный прием клиентского подключения.
// ПРИМЕЧАНИЕ: NamedPipeServerStream использует старую модель // асинхронного программирования.
// Я преобразую ее к новой модели Task при помощи метода // FromAsync класса TaskFactory.
await Task.Factory.FromAsync(pipe.Beginl/daitForConnection, pipe.Endl/daitForConnection, null);
// Начало обслуживания клиента; управление возвращается немедленно,
// потому что операция выполняется асинхронно.
SeгviceClientRequestAsync(pipe);
}
}
В классе NamedPipeServerStream определены методы BeginWaitForConnection и EndWaitForConnection, но еще не определен метод WaitForConnectionAsync.
Ожидается, что этот метод будет добавлен в будущей версии FCL. Впрочем, как видно из предыдущего кода, я вызываю метод FromAsync класса TaskScheduler, передаю ему имена методов BeginXxx и EndXxx, а метод FromAsync создает объект Task, который является «оберткой» для этих методов. Теперь объект Task можно использовать с оператором await[55].
Для старой событийной модели программирования в FCL нет вспомогательных методов, адаптирующих эту модель к новой модели на базе Task, поэтому вам придется программировать их вручную. Следующий код показывает, как упаковать объект WebClient (использующий событийную модель программирования) с объектом TaskCompletionSource, чтобы для него можно было вызывать await в асинхронной функции.
private static async Task<String> AwaitWebClient(Uri uri) {
// Класс System.Net.WebClient поддерживает событийную модель
продолжение &
// асинхронного программирования var wc = new System.Net.WebClientQ;
// Создание объекта TaskCompletionSource и его внутреннего объекта Task var tcs = new TaskCompletionSource<String>();
// При завершении загрузки строки объект WebClient инициирует // событие DownloadStringCompleted, завершающее TaskCompletionSource wc.DownloadStringCompleted += (s, е) => { if (е.Cancelled) tcs.SetCanceled(); else if (e.Error != null) tcs.SetException(e.Error); else tcs.SetResult(e.Result);
};
// Начало асинхронной операции wc.DownloadStringAsync(uri);
// Теперь мы можем взять объект Task из TaskCompletionSource //и обработать результат обычным способом.
String result = await tcs.Task;
// Обработка строки результата (если нужно)...
return result;
}
Асинхронные функции и исключения
Если при обработке драйвером устройства асинхронного запроса что-то пойдет не так, Windows нужно проинформировать об этом приложение. К примеру, представим, что при передаче байтов по сети произошел тайм-аут. Если данные не пришли вовремя, драйвер устройства сообщает вам, что асинхронная операция завершилась с ошибкой. Для этого он отправляет готовый IRP-пакет в CLR-пул потоков, а поток пула завершает объект Task с исключением. При возобновлении выполнения конечного автомата оператор await видит, что попытка выполнения операции была неудачной, и выдает исключение.
В главе 27 я говорил о том, что объекты Task обычно инициируют исключение AggnegateException, а для получения информации о реальных исключениях следует обратиться к свойству InnerExceptions этого исключения. Однако при использовании await с Task вместо AggregateException выдается первое внутреннее исключение1. Это было сделано для того, чтобы поведение кода соответствовало ожиданиям разработчика. Кроме того, без этого вам пришлось бы перехватывать AggregateException в вашем коде, проверять внутреннее исключение и либо перехватывать его, либо выдавать заново. От этого код становится слишком громоздким.
Если ваш метод конечного автомата сталкивается с необработанным исключением, то объект Task, представляющий асинхронную функцию, завершается из-за необработанного исключения. Любой код, ожидающий завершения этого объекта Task, «увидит» это исключение. Однако асинхронная функция также может иметь возвращаемый тип void; в этом случае вызывающая сторона не может обнаружить необработанное исключение. Таким образом, при выдаче необработанного исключения асинхронной функцией, возвращающей void, сгенерированный компилятором код перехватит его и выдаст заново с использованием контекста синхронизации стороны вызова (см. далее). Если сторона вызова исполняется в потоке графического интерфейса, то исключение будет перезапущено потоком графического интерфейса. Обычно повторное инициирование исключений приводит к завершению всего процесса.
Другие возможности асинхронных функций
В этом разделе речь пойдет о некоторых дополнительных возможностях асинхронных функций. Поддержка отладки асинхронных функций в Microsoft Visual Studio реализована просто замечательно. Когда отладчик остановлен на операторе await, пошаговое выполнение (F10) осуществляет переход в отладчик при достижении следующей команды после завершения операции. При этом код может выполняться даже в другом потоке, а не в том, который инициировал операцию! Такое поведение очень удобно и существенно упрощает отладку.
Кроме того, при пошаговом выполнении с заходом (F11) в асинхронную функцию можно выйти из функции, вернувшись на сторону вызова (Shift+F11); впрочем, это должно быть сделано на открывающей фигурной скобке асинхронной функции. После ее прохождения комбинация Shift+F11 не будет работать, пока асинхронная функция не отработает до завершения. Если вам потребуется отладить вызывающий метод до того, как конечный автомат отработает до завершения, установите точку прерывания в вызывающем методе и включите код на выполнение (F5).
Некоторые асинхронные операции выполняются очень быстро, а, следовательно, завершаются почти мгновенно. В таких ситуациях неэффективно приостанавливать конечный автомат, чтобы другой поток немедленно возобновил его выполнение; гораздо эффективнее просто разрешить конечному автомату продолжить выполнение. К счастью, код, сгенерированный компилятором для оператора await, проверяет подобные ситуации. Если асинхронная операция завершается непосредственно перед возвратом управления из потока, поток не возвращает управление, а просто выполняет следующую строку кода.
Все это, конечно, хорошо, но время от времени попадаются асинхронные функции, которые выполняют значительные вычисления перед запуском асинхронной операции. Если вызвать такую функцию из потока графического интерфейса вашего приложения, интерфейс перестанет реагировать на действия пользователя. А если асинхронная операция завершается синхронно, то пользовательский интерфейс будет недоступен в течение еще большего времени. Таким образом, если вы хотите инициировать асинхронную функцию из другого потока, используйте статический метод Run класса Task:
// Task.Run вызывается в потоке графического интерфейса Task.Run(async () => {
// Этот код выполняется в потоке из пула // T0D0: Подготовительные вычисления...
await XxxAsyncQ; // Инициирование асинхронной операции // Продолжение обработки...
});
В этом коде продемонстрирована еще одна полезная возможность С#: асинхронные лямбда-выражения. Дело в том, что оператор awat нельзя просто поместить в тело обычного лямбда-выражения, потому что компилятор не сможет преобразовать метод в конечный автомат. Размещение async перед лямбда-выражением заставит компилятор преобразовать лямбда-выражение в метод конечного автомата, возвращающий значение Task или Task<TResult>, которое может быть присвоено любой делегатной переменной Func с типом возвращаемого значения Task или Task<TResult>.
При написании кода очень легко вызвать функцию async, забыв об использовании оператора await:
static async Task OutenAsyncFunction() {
InnerAsyncFunction(); // В этой строке пропущен оператор await!
// Код продолжает выполняться, как и InnerAsyncFunction...
}
static async Task InnerAsyncFunction() { /* ... */ }
К счастью, в таких ситуациях компилятор C# выдает предупреждение и предлагает применить оператор await к результату вызова. Это хорошо, но в отдельных случаях вас действительно не интересует, когда завершится InnerAsyncFunction, и вы бы предпочли использовать этот код без предупреждений компилятора. Чтобы избавиться от предупреждения, просто присвойте переменной объект Task, возвращенный InnerAsyncFunction. В дальнейшем переменную можно игнорировать[56].
static async Task OuterAsyncFunction() {
var noWarning = InnerAsyncFunctionQ; // Строка без await
// Этот код продолжает выполняться, как и код InnerAsyncFunction...
}
А я предпочитаю определять метод расширения, который выглядит так:
[MethodImpl(MethodImplOptions.Aggressivelnlining)] // Заставляет компилятор
// убрать вызов при оптимизации
public static void NoWarning(this Task task) { /* He содержит кода */ }
Далее метод используется следующим образом:
static async Task OuterAsyncFunction() {
InnerAsyncFunction().NoWarning(); // Строка без await
// Код продолжает выполняться, как и InnerAsyncFunction...
}
У асинхронных операций ввода-вывода есть одна действительно замечательная особенность: вы можете инициировать сразу несколько операций, чтобы они выполнялись параллельно. Это может обеспечить феноменальный прирост производительности приложения. Я еще нигде не приводил код, который запускает мой сервер именованного канала, а затем обращается к нему с клиентскими запросами. Вот как он выглядит: public static async Task Go() {
// Запуск сервера немедленно возвращает управление, потому что // сервер ожидает клиентские запросы в асинхронном режиме StartServerQ; // Возвращает void, компилятор выдает предупреждение
// Создание набора асинхронных клиентских запросов;
// сохраняем Task<String> каждого клиента.
List<Task<String>> requests = new List<Task<String>>(10000); for (Int32 n = 0; n < requests.Capacity; n++)
requests.Add(IssueClientRequestAsync("localhost", "Request #" + n));
// Асинхронное ожидание завершения всех клиентских запросов // ВНИМАНИЕ: если 1+ заданий выдадут исключение,
// WhenAll заново инициирует последнее исключение String[] responses = await Task.WhenAll(requests);
// Обработка всех запросов
for (Int32 n = 0; n < responses.Length; n++)
Console.Write Line(responses[n]);
}
Код запускает сервер именованного канала, чтобы он начинал прослушивание клиентских запросов, а затем в цикле for инициирует 10 000 запросов с максимально возможной скоростью. При каждом вызове IssueClientRequestAsync возвращается объект Task<String>, который добавляется в коллекцию, Теперь сервер именованного канала с максимально возможной скоростью обрабатывает эти запросы, используя потоки из пула, которые пытаются обеспечить максимальную загрузку всех процессоров на машине[57]. По мере того, как сервер обрабатывает каждый запрос, объект Task<String> каждого запроса завершается со строковым ответом, возвращенным сервером.
В предыдущем коде я хотел дождаться получения ответов на все клиентские запросы, прежде чем переходить к обработке результатов. Для этого я вызывал статический метод WhenAll класса Task. Во внутренней реализации представлений этот метод создает объект Task<String[ ] >, который завершается после завершения всех объектов Task из коллекции List. Затем оператор await вызывается для объекта Task<String[ ] >, чтобы конечный автомат продолжил работу после выполнения всех задач. Я последовательно перебираю ответы и обрабатываю их (вызывая Console.WniteLine).
Возможно, вы захотите обработать каждый запрос по мере поступления — вместо того, чтобы дожидаться их завершения. Эта задача решается почти так же просто при помощи статического метода WhenAny класса Task. Обновленная версия кода выглядит так:
public static async Task Go() {
// Запуск сервера немедленно возвращает управление, потому что // сервер ожидает клиентские запросы в асинхронном режиме StartServerQ;
// Создание набора асинхронных клиентских запросов;
// сохраняем Task<String> каждого клиента.
List<Task<String>> requests = new List<Task<String>>(10000); for (Int32 n = 0; n < requests.Capacity; n++)
requests.Add(IssueCllentRequestAsync("localhost", "Request #" + n));
// Продолжение с завершением КАИДОЙ задачи while (requests.Count > 0) {
// Последовательная обработка каадого завершенного ответа
Task<String> response = await Task.WhenAny(requests);
requests.Remove(response); // Удаление завершенной задачи из коллекции
// Обработка одного ответа Console.WriteLine(response.Result);
}
}
Здесь я создаю цикл while, перебирающий клиентские запросы. В цикле оператор await вызывается для метода WhenAny класса Task, который возвращает один объект Task<String> для клиентского запроса, обработанного сервером. После получения объекта Тask<String> я исключаю его из коллекции, а затем запрашиваю результат для обработки (передачи Console.WriteLine).
Потоковые модели приложений
В .NET Framework поддерживаются разнообразные модели приложений, каждая из которых может предложить собственную потоковую модель. Консольные приложения и Windows-службы (которые фактически тоже являются консольными
приложениями, просто вы не видите консоль) не навязывают никакой потоковой модели; то есть поток может делать все, что он хочет и когда хочет.
Однако приложения с графическим пользовательским интерфейсом (GUI), в том числе приложения Windows Forms, Windows Presentation Foundation (WPF), Silverlight и Windows Store, предлагают такую модель, в которой обновлять окно можно только создавшему его потоку. GUI-потоки обычно порождают асинхронные операции, чтобы предотвратить блокировку и не допустить отсутствия реакции интерфейса на средства пользовательского ввода — мышь, клавиатуру, перо, сенсорный экран. Однако при завершении асинхронной операции поток пула завершает объект Task, возобновляя работу конечного автомата.
Для некоторых приложений такое поведение нормально и даже желательно, потому что оно эффективно. Но для других моделей приложений (например, приложений с графическим интерфейсом) оно создает проблемы, потому что код выдает исключение при попытке обновления элементов пользовательского интерфейса через поток из пула. Иногда последний должен каким-то образом заставить графический поток обновлять элементы пользовательского интерфейса.
Приложения ASP.NET позволяют любому потоку делать все, что угодно. Начав обрабатывать клиентский запрос, поток пула может выбрать пользовательские региональные стандарты (System.Globalization .Culturelnfo), позволив серверу осуществить принятые в рассматриваемом регионе форматы чисел, дат и времени[58]. Также веб-сервер может определить идентификационные данные клиента (System. Security. Principal. IPrincipal), предоставив ему доступ только к тем ресурсам, на которые у него есть права. Порожденная одним потоком пула асинхронная операция заканчивается другим потоком, который обрабатывает ее результат. Хотя эта работа и выполняется по поручению клиентского запроса, региональные стандарты и идентификационные данные клиента должны «переходить» к новому потоку пула, чтобы вся работа, выполняемая по поручению клиента, использовала региональные стандарты и идентификационную информацию клиента.
К счастью, в FCL определен базовый класс System. Threading. Synchronization- Context, позволяющий решить все описанные проблемы. Объект, производный от этого класса, связывает прикладную модель с потоковой. В FCL имеется группа классов, производных от класса SynchronizationContext, но обычно напрямую они не используются; более того, многие из них даже не документированы.
В основном разработчикам приложений не нужно ничего знать о классе SynchronizationContext. При вызове await для Task используется объект SynchronizationContext вызывающего потока. Когда пул потока завершает объект Task, используется объект SynchronizationContext, обеспечивающий соответствие потоковой и прикладной модели. Таким образом, когда GUI-поток выполняет await для Task, код, следующий за оператором await, заведомо будет исполнен в GUI-потоке, что позволяет этому коду обновить элементы пользовательского интерфейса. Для приложений ASP.NET код, следующий за оператором await, заведомо выполняется в пуле потока, с которым связаны региональные стандарты и идентификационные данные клиента.
В большинстве случаев возобновление работы конечного автомата с использованием потоковой модели приложения чрезвычайно полезно и удобно. Впрочем, в отдельных случаях оно способно создать проблемы. Следующий пример вызывает взаимную блокировку приложения WPF:
private sealed class MyWpfWindow : Window {
public MyWpfWindow() { Title = "WPF Window"; }
protected override void OnActivated(EventArgs e) {
// Запрос свойства Result не позволяет GUI-потоку вернуть управление;
// поток блокируется в ожидании результата
String http = GetHttp().Result; // Синхронное получение строки base.OnActivated(e);
}
private async Task<String> GetHttpQ {
// Выдача запроса HTTP и возврат из GetHttp HttpResponseMessage msg = await new
HttpClientQ .Get Async ("http://Wintellect. com/");
// В эту точку мы никогда не попадем: GUI-поток ожидает завершения // этого метода, а метод не может завершиться, потому что GUI-поток // ожидает его завершения > ВЗАИМНАЯ БЛОКИРОВКА!
return await msg.Content. ReadAsStringAsyncQ;
}
}
Разработчики, создающие библиотеки классов, определенно должны знать о классе SynchronizationContext. Это позволит им создавать высокопроизводительный код, работающий со всеми моделями приложений. Так как большая часть библиотечного кода не зависит от модели приложения, нам хотелось бы избежать дополнительных затрат, связанных с использованием объекта SynchronizationContext. Кроме того, разработчики библиотек классов должны сделать все возможное, чтобы помочь разработчикам приложений избежать ситуаций взаимной блокировки. Для решения обеих проблем классы Task и Task<TResult> предоставляют метод ConfigureAwait с сигнатурой следующего вида:
// Task определяет метод: public ConfiguredTaskAwaitable
ConfigureAwait(Boolean continueOnCapturedContext);
// Task<TResult> определяет метод: public ConfiguredTaskAwaitable<TResult>
ConfigureAwait(Boolean continueOnCapturedContext);
При передаче true метод ведет себя так, как если бы он вообще не вызвался. Но если передать значение false, то оператор await не запрашивает объект
SynchnonizationContext вызывающего потока, а когда поток пула завершает задание Task, то происходит простое завершение с выполнением кода после оператора await через поток пула.
Хотя мой метод GetHttp не входит в библиотеку классов, проблема взаимной блокировки исчезает при добавлении вызовов Conf iguneAwait. Измененная версия метода GetHttp выглядит так:
private async Task<String> GetHttpQ {
// Выдача запроса HTTP и возврат из GetHttp HttpResponseMessage msg = await new
HttpClientQ .Get Async ("http ://Wintellect. com/")
.ConfigureAwait(false);
II На этот раз управление попадет в эту точку, потому что поток пула // может выполнить этот код (в отличие от выполнения через GUI-поток).
return await msg.Content.ReadAsStringAsync().ConfigureAwait(false);
}
Как показывает предыдущий код, вызов ConfiguneAwait(false) должен быть применен к каждому объекту Task, используемому с await. Это связано с тем, что асинхронные операции могут завершаться синхронно, и когда это происходит, вызывающий поток просто продолжает выполняться без возвращения управления стороне вызова; вы никогда не знаете, какой операции потребуется игнорировать объект SynchnonizationContext, поэтому необходимо приказать всем операциям игнорировать его. Это также означает, что код библиотеки классов должен быть независимым от модели приложения.
Также можно переписать метод GetHttp так, как показано ниже, чтобы все выполнение происходило через поток пула:
private Task<String> GetHttpQ {
return Task.Run(async () => {
// Выполнение в потоке пула, с которым не связан // объект SynchronizationContext HttpResponseMessage msg = await new
HttpClientQ .Get Async ("http ://Wintellect. com/"); return await msg.Content. ReadAsStringAsyncQj
});
}
Обратите внимание: в этой версии кода метод GetHttp не является асинхронной функцией; я удалил ключевое слово async из сигнатуры метода, потому что метод более не содержит оператор await. С другой стороны, лямбда-выражение, передаваемое Task. Run, является асинхронной функцией.
Асинхронная реализация сервера
Многолетнее общение с множеством разработчиков показало, что лишь очень немногие из них знают о встроенных средствах .NET Framework, позволяющих строить асинхронные серверы с хорошей масштабируемостью. В этой книге я не смогу объяснить, как эта задача решается для каждого вида серверов, но могу хотя бы указать, что следует искать в документации MSDN.
□ Построение асинхронных приложений Web Forms ASP.NET: добавьте в файле .aspx строку «Async=true» в директиве page и ознакомьтесь с описанием метода RegistenAsyncTask классаSystem.Web.UI.Page.
□ Построение асинхронных MVC-контроллеров ASP.NET: объявите класс контроллера производным от System.Web.Mvc .AsyncController и верните Task<ActionResult> из метода действия.
□ Построение асинхронного обработчика ASP.NET: объявите класс производным от System.Web.HttpTaskAsyncHandlenи переопределите его абстрактный метод PnocessRequestAsync.
□ Построение асинхронной службы WCF: реализуйте службу как асинхронную функцию, возвращающую Task или Task<TResult>.
Отмена операций ввода-вывода
В общем случае Windows не предоставляет возможности отмены затянувшейся операции ввода-вывода. Многие разработчики хотели бы видеть такую возможность, но реализовать ее достаточно сложно. Ведь если вы обращаетесь с запросом к серверу, а потом решаете, что ответ вам больше не нужен, просто приказать серверу проигнорировать исходный запрос уже не удастся; нужно принять байты на клиентской машине и отбросить их. Кроме того, возникает ситуация гонки — запрос на отмену может поступить в то время, когда сервер передает ответ. И как должно поступить ваше приложение? Вам придется обработать эту потенциальную ситуацию в своем коде и решить, то ли проигнорировать данные, то ли обработать их.
Для упрощения этой задачи я рекомендую реализовать метод расширения WithCancellation, который расширяет Task<TResult> (вам также понадобится аналогичная перегрузка, расширяющая Task) следующим образом:
private struct Void { } // Из-за отсутствия необобщенного класса // TaskCompletionSource.
private static async Task<TResult>
WithCancellation<TResult>(this Task<TResult> originalTask,
CancellationToken ct) {
// Создание объекта Task, завершаемого при отмене CancellationToken var cancelTask = new TaskCompletionSource<Void>();
// При отмене CancellationToken завершить Task using (ct.Register(
t => ((TaskCompletionSource<Void>)t).TrySetResult(new VoidQ), cancelTask)) {
// Создание объекта Task, завершаемого при отмене исходного
// объекта Task или объекта Task от CancellationToken
Task any = await Task.WhenAny(originalTask, cancelTask.Task);
11 Если какой-либо объект Task завершается из-за CancellationToken,
// инициировать OperationCanceledException
if (any == cancelTask.Task) ct.ThrowIfCancellationRequestedQ;
}
11 Выполнить await для исходного задания (синхронно); awaiting it 11 если произойдет ошибка, выдать первое внутреннее исключение // вместо AggregateException return await originalTask;
}
Теперь этот метод расширения вызывается следующим образом:
public static async Task Go() {
// Создание объекта CancellationTokenSource, отменяющего себя
// через заданный промежуток времени в миллисекундах
var cts = new CancellationTokenSource(5000); // Чтобы отменить ранее,
var ct = cts.Token; // вызовите cts.Cancel()
try {
// Я использую Task.Delay для тестирования; замените другим методом,
// возвращающим Task
await Task.Delay(10000).WithCancellation(ct);
Console.WriteLine("Task completed");
}
catch (OperationCanceledException) {
Console.WriteLine("Task cancelled");
}
}
Некоторые операции ввода-вывода должны выполняться синхронно
В Win32 API существует множество функций, выполняющих операции ввода-вывода. К сожалению, не все их них допускают асинхронное выполнение. К примеру, Win32-MeTOfl CreateFile (вызываемый конструктором FileStream) всегда выполняется синхронно. При попытке создать или открыть файл на сервере в сети до возвращения управления методом CreateFile может пройти несколько секунд — в это время вызывающий поток ничего не делает. В идеале приложения, разработанные с прицелом на оптимальные производительность и масштабируемость, должны использовать Win32-функцию, которая создает или открывает файл в асинхронном режиме, чтобы поток не ждал ответа с сервера. К сожалению, в Win32 нет функции, подобной CreateFile, а, значит, FCL не предлагает эффективного средства асинхронного открытия файлов. Windows также не предоставляет функций для асинхронного обращения к реестру, обращения к журналу событий, получения списка файлов/подкаталогов, изменения атрибутов файла/каталога и т. д.
Рассмотрим ситуацию, когда такое поведение становится серьезной проблемой. Представьте, что вам нужно написать простой элемент интерфейса для ввода пути к файлу и поддержкой автоматического завершения (примерно как в часто используемом диалоговом окне открытия файла). Этот элемент управления должен задействовать отдельные потоки для перебора папок, в которых осуществляется поиск файлов, так как в Windows не существует функции асинхронного перебора файлов. По мере того как пользователь продолжает вводить путь к файлу, вам придется подключать дополнительные потоки, игнорируя результаты ранее порожденных потоков. В Windows Vista появилась новая Win32^yHKnHH CancelSynchronousIO. Она позволяет одному потоку отменять синхронную операцию ввода-вывода, проводимую другим. Эта функция не отражена в FCL, но если вы решите воспользоваться ею из управляемого кода, вызовите ее при помощи механизма P/Invoke. Сигнатура P/Invoke для данного случая рассмотрена в следующем разделе.
Многие полагают, что с синхронным прикладным программным интерфейсом работать проще, и во многих случаях это действительно так. Но иногда именно синхронные интерфейсы сильно осложняют жизнь.
Из-за проблем, возникающих при синхронном выполнении операций ввода-вывода, при проектировании Windows Runtime группа Windows решила предоставить все методы ввода/вывода в асинхронной форме. Таким образом, сейчас существует функция Windows Runtime API для асинхронного открытия файлов (см. описание метода OpenAsync класса Windows.Storage.StorageFile). Windows Runtime не представляет функций API для синхронного выполнения операций ввода/вывода. К счастью, при вызове этих функций можно воспользоваться асинхронными функциями C# для упрощения кода.
Проблемы FileStream
При создании объекта FileStream флаг FileOptions. Asynchronous позволяет указать, какие операции — синхронные или асинхронные — будут использоваться для взаимодействия (что эквивалентно вызову Win32^yHKnHH CreateFile и передаче ей флага FILE_FLAG_OVERLAPPED). При отсутствии этого флага Windows выполняет все операции с файлом в синхронном режиме. Разумеется, ничто не мешает вызвать метод ReadAsync объекта FileStream. С точки зрения приложения это выглядит как асинхронное выполнение операций, но на самом деле класс FileStream эмулирует асинхронное поведение при помощи дополнительного потока, который впустую тратит ресурсы и снижает производительность.
В то же время можно создать объект FileStream, указав флаг FileOptions. Asynchronous. После этого вы можете вызвать метод Read объекта FileStream для выполнения синхронной операции. Класс FileStream эмулирует такое поведение, запуская асинхронную операцию и немедленно переводя вызывающий поток в спящий режим до завершения операции. Это тоже не самый эффективный способ, но он лучше, чем вызов метода BeginRead с применением объекта FileStream, созданного без флага FileOptions.Asynchronous.
Подведем итоги. При работе с объектом FileStream следует заранее выбрать, синхронным или асинхронным будет ввод-вывод файлов, и установить флаг FileOptions.Asynchronous (или не делать этого). Если флаг установлен, всегда вызывайте метод ReadAsync, а если нет — метод Read. Это обеспечит наилучшую производительность. Если вы собираетесь выполнить синхронную или асинхронную операцию с объектом FileStream, эффективней всего конструировать данный объект с флагом FileOptions .Asynchronous. В качестве альтернативы можно создать два объекта FileStream для одного файла. Один объект FileStream будет открыт для асинхронного ввода-вывода, второй — для синхронного. Учтите, что класс System. 10. File содержит вспомогательные методы (Create, Open и OpenWrite), которые создают и возвращают объекты FileStream. Во внутренней реализации ни один из этих методов не использует флаг FileOptions.Asynchronous, поэтому для построения масштабируемых приложений с хорошим временем отклика от этих методов лучик; держаться подальше.
Следует также помнить, что драйвер устройства файловой системы NTFS выполняет некоторые операции в синхронном режиме вне зависимости от способа открытия файла. Дополнительную информацию по этой теме вы найдете по адресу http://support.microsoft.com/default.aspx?scid=kb%3Ben-us%3B 156932.
Приоритеты запросов ввода-вывода
В главе 26 было показано, каким образом приоритет потока влияет на способ его исполнения. Однако сами потоки также выполняют запросы ввода-вывода к различным аппаратным устройствам для чтения и записи данных. Если время процессора окажется выделено под запросы с низким приоритетом, в очереди очень быстро окажутся сотни и даже тысячи запросов. Так как для их обработки требуется время, скорее всего, поток с низким приоритетом повлияет на быстродействие системы, приостановив более приоритетные потоки. Именно поэтому можно наблюдать снижение быстродействия компьютера при выполнении длительных низкоприоритетных заданий, таких как дефрагментация диска, сканирование на вирусы, индексирование содержимого и т. п.[59]
Windows позволяет указать приоритет потока при выполнении запросов ввода- вывода. За дополнительной информацией обращайтесь к статье по адресу http://www. microsoft.com/whdc/driver/priorityio.mspx. К сожалению, данная функциональность еще не включена в FCL; надеюсь, она появится в следующей версии. Однако преимуществом данной функции уже можно воспользоваться при помощи механизма P/Invoking. Вот как выглядит такой код:
internal static class ThreadIO {
public static BackgroundProcessingDisposer BeginBackgroundProcessing( Boolean process = false) {
ChangeВасkgroundProcessing(process, true); return new BackgroundProcessingDisposer(process);
}
public static void EndBackgroundProcessing(Boolean process = false) { ChangeBackgroundProcessing(process, false);
}
private static void ChangeBackgroundProcessing(
Boolean process, Boolean start) {
Boolean ok =
process ? SetPriorityClass(GetCurrentWin32ProcessHandle(),
start ? ProcessBackgroundMode.Start : ProcessBackgroundMode.End)
: SetThreadPriority(GetCurrentWin32ThreadHandle(),
start ? ThreadBackgroundgMode.Start : ThreadBackgroundgMode.End); if (!ok) throw new Win32Exception();
}
// Эта структура позволяет инструкции using выйти // из режима фоновой обработки
public struct BackgroundProcessingDisposer : IDisposable { private readonly Boolean m_process; public BackgroundProcessingDisposer(
Boolean process) { mprocess = process; } public void DisposeQ { EndBackgroundProcessing(m_process); }
}
// См. И1п32-функции THREADMODEBACKGROUNDBEGIN // и THREADMODEBACKGROUNDEND
private enum ThreadBackgroundgMode { Start = 0x10000, End = 0x20000 }
// См. 1л11п32-функции PROCESSMODEBACKGROUNDBEGIN // и PROCESSMODEBACKGROUNDEND
private enum ProcessBackgroundMode { Start = 0x100000, End = 0x200000 }
[DllImport("Kernel32", EntryPoint = "GetCurrentProcess",
ExactSpelling = true)]
private static extern SafeWaitHandle GetCurrentWin32ProcessHandle();
[DllImport("Kernel32", ExactSpelling = true, SetLastError = true)] [return: MarshalAs(UnmanagedType.Bool)] private static extern Boolean SetPriorityClass(
SafeWaitHandle hprocess, ProcessBackgroundMode mode);
[DllImport(
"Kernel32", EntryPoint = "GetCurrentThread", ExactSpelling = true)] private static extern SafeWaitHandle GetCurrentWin32ThreadHandle();
[DllImport("Kernel32", ExactSpelling = true, SetLastError = true)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean SetThreadPriority(
Safel/daitHandle hthread, ThreadBackgroundgMode mode);
// http://msdn.microsoft.com/en-us/library/aa480216.aspx [DllImport(
"Kernel32", SetLastError = true, EntryPoint = "CancelSynchronousIo")]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean CancelSynchronousIO(Safel/\laitHandle hThread);
}
Следующий код показывает, как все это использовать:
public static void Main () {
using (ThreadlO.BeginBackgroundProcessingQ) {
// Здесь располагается низкоприоритетный запрос ввода-вывода // (например, вызов BeginRead/BeginWrite)
}
}
Вы объясняете Windows, что поток должен выполнять низкоприоритетные запросы ввода-вывода при помощи метода BeginBackgroundProcessing класса ThreadlO. Обратите внимание, что при этом также снижается приоритет выполнения потока процессором. Вернуть поток к выполнению запросов ввода-вывода обычной важности (и к обычному приоритету выполнения потока процессором) можно методом EndBackgroundProcessing или же вызвав метод Dispose для значения, возвращенного методом BeginBackgroundProcessing (при помощи инструкции using языка С#, как показано в примере). Поток может влиять только на собственный режим фоновой обработки; Windows не позволяет одному потоку менять режим фоновой обработки другого потока.
Чтобы заставить все потоки в процессе обрабатывать низкоприоритетные запросы ввода-вывода и снизить приоритет выполнения потоков процессором, можно вызвать метод BeginBackgroundProcessing, передав ему значение true для параметра process. Процесс может воздействовать только на собственный фоновый режим обработки; Windows не позволяет потоку менять фоновый режим обработки другого процесса.
ВНИМАНИЕ
В сферу ответственности разработчика входит применение новых фоновых приоритетов, обеспечивающих более высокое быстродействие активных приложений, и принятие мер для устранения инверсии приоритетов. При наличии активных операций ввода-вывода с обычным приоритетом поток, работающий в фоновом режиме, может получать результаты запросов ввода-вывода с задержкой в несколько секунд. Если поток с низким приоритетом в рамках синхронизации потоков получит право на блокировку, ожидаемое потоком с обычным приоритетом, последний может оказаться в ожидании окончания обработки низкоприоритетных запросов ввода- вывода. Для возникновения проблемы потоку с фоновым приоритетом не придется даже отправлять запросы на ввод-вывод. Поэтому следует свести к минимуму совместное использование объектов синхронизации потоками с обычным и фоновым приоритетами (а лучше вообще этого избегать). Это избавит вас от инверсии приоритетов, при которой потоки с обычным приоритетом перестают выполняться из-за блокировок, установленных потоками с фоновым приоритетом.
Глава 29. Примитивные конструкции синхронизации потоков
Когда поток из пула блокируется, пул порождает дополнительные потоки исполнения; при этом приходится тратить соответствующие ресурсы (времени и памяти) на создание, планирование и удаление потоков. Многие разработчики, обнаружив в программе простаивающие потоки, считают, что дополнительные потоки уж точно будут делать что-нибудь полезное. Однако при разработке масштабируемого и быстродействующего приложения нужно стараться избегать блокировки потоков, только в этом случае их можно будет снова и снова использовать для решения других задач. В главе 27 мы говорили о том, как потоки выполняют вычислительные операции, в то время как глава 28 была посвящена выполнению потоками операций ввода-вывода.
Теперь пришло время обсудить вопросы синхронизации потоков. Синхронизация позволяет предотвратить повреждение общих данных при одновременном доступе к этим данным разных потоков. Слово «одновременно» выделено не зря, ведь синхронизация потоков целиком и полностью базируется на контроле времени. Если доступ к неким данным со стороны двух потоков осуществляется таким образом, что потоки никак не могут помешать друг другу, синхронизации не требуется. В главе 28 было показано, как разные секции асинхронной функции могут выполняться разными потоками. Теоретически возможно, что два потока будут работать с одними и теми же данными. Однако асинхронные функции реализованы таким образом, что два потока не будут одновременно работать с одними данными, поэтому при обращении кода к данным, содержащимся в асинхронной функции, синхронизация потоков не нужна.
Этот случай можно считать идеальным, так как синхронизация потоков влечет за собой много проблем. Во-первых, программировать код синхронизации крайне утомительно и при этом легко допустить ошибку. В коде следует выделить все данные, которые потенциально могут обрабатываться различными потоками в одно и то же время. Затем все эти данные заключаются в другой код, обеспечивающий их блокировку и разблокирование. Блокирование гарантирует, что доступ к ресурсу в каждый момент времени сможет получить только один поток. Однако достаточно при программировании забыть заблокировать хотя бы один фрагмент кода, и ваши данные будут повреждены. К тому же нет способа проверить, правильно ли работает блокирующий код. Остается только запустить приложение, провести многочисленные нагрузочные испытания и надеяться, что все пройдет благополучно. При этом тестирование желательно осуществлять на машине с максимально возможным количеством процессоров, так как это повышает шансы выявить ситуацию, когда два и более потока попытаются получить одновременный доступ к ресурсу — а значит, повысит шансы на выявление проблемы.
Второй проблемой блокирования является снижение производительности. Установление и снятие блокировки требуют времени, так как для этого вызываются дополнительные методы, причем процессоры должны координировать совместную работу, определяя, который из потоков нужно блокировать первым. Подобное взаимодействие процессоров не может не сказываться на производительности. К примеру, рассмотрим код, добавляющий узел в начало связанного списка:
// Этот класс используется классом LinkedList public class Node { internal Node m_next;
// Остальные члены не показаны
}
public sealed class LinkedList { private Node m_head;
public void Add(Node newNode) {
// Эти две строки реализуют быстрое присваивание ссылок newNode.m_next = mhead; m_head = newNode;
}
}
Метод Add просто очень быстро присваивает ссылки. И если мы хотим сделать вызов этого метода безопасным, дав возможность разным потокам одновременно вызывать его без риска повредить связанный список, следует добавить к методу Add код установления и снятия блокировки:
public sealed class LinkedList {
private SomeKindOfLock mlock = new SomeKindOfLockQ; private Node mhead;
public void Add(Node newNode) { mlock.Acquire();
// Эти две строки выполняют быстрое присваивание ссылок newNode.mnext = m_head; mhead = newNode; mlock.Release();
}
}
Теперь метод Add стал безопасным в отношении потоков, но скорость его выполнения серьезно упала. Снижение скорости работы зависит от вида выбранного механизма блокирования; сравнение производительности различных вариантов блокирования делается как в этой, так и в следующей главах. Но даже самое быстрое блокирование заставляет метод Add работать в несколько раз медленнее по сравнению с его версией без блокирования. И разумеется, вызов метода Add в цикле для вставки в связанный список дополнительных узлов также значительно снижает производительность.
Третья проблема состоит в том, что при блокировании в каждый момент времени допускается доступ к ресурсам только одного потока. Собственно, для этого и было придумано блокирование, но, к сожалению, подобное поведение приводит к созданию дополнительных потоков. То есть если поток пула пытается получить доступ к запертому ресурсу и не получает его, скорее всего, пул создаст еще один поток для сохранения загрузки процессора. Как обсуждалось в главе 26, создание потока обходится крайне дорого в смысле затрат памяти и снижения производительности. Но хуже всего то, что после разблокирования старый поток появляется в пуле вместе с новым; то есть операционной системе приходится планировать выполнение потоков, количество которых превышает количество процессоров, а значит, увеличивается частота переключений контекста, что, опять же, отрицательно сказывается на производительности.
Словом, синхронизация потоков имеет столько нежелательных последствий, что приложения следует проектировать так, чтобы она применялась как можно реже. Избегайте общих данных — например, статических полей. Когда поток конструирует новый объект оператором new, оператор возвращает ссылку на этот объект. Причем в этот момент ссылка имеется только у создающего объект потока, для других потоков он недоступен. Если не передавать эту ссылку другому потоку, который может использовать объект одновременно с потоком, создавшим объект, необходимость в синхронизации отпадает.
Старайтесь по возможности работать со значимыми типами, потому что они всегда копируются, и каждый поток в итоге работает с собственной копией. Ну и, наконец, нет ничего страшного в одновременном доступе разных потоков к общим данным, если эти данные предназначены только для чтения. К примеру, многие приложения в процессе инициализации создают структуры данных. После инициализации приложение может создать столько потоков, сколько считает необходимым; и если все эти потоки решат получить доступ к этим данным, они смогут сделать это одновременно, не прибегая ни к блокированию, ни к разблокированию. Примером такого поведения служит тип String. Созданные строки неизменны, поэтому одновременный доступ к ним можно предоставить произвольному количеству потоков, не опасаясь повреждения данных.
Библиотеки классов и безопасность потоков
А сейчас хотелось бы сказать несколько слов о библиотеках классов и синхронизации потоков. Библиотека FCL разработки Microsoft гарантирует безопасность в отношении потоков всех статических методов. Это означает, что одновременный вызов статического метода двумя потоками не приводит к повреждению данных. Механизм защиты реализован внутри FCL, поскольку нет способа обеспечить блокирование сборок различных производителей, спорящих за доступ к ресурсу.
Класс Console содержит статическое поле, по которому многие из его методов устанавливают и снимают блокировку, гарантируя, что в каждый момент времени доступ к консоли будет только у одного потока.
Кстати, создание метода, безопасного в отношении потоков, не означает, что внутренне он реализует блокирование в рамках синхронизации потоков. Просто этот метод предотвращает повреждение данных при попытке одновременного доступа к ним со стороны нескольких потоков. Класс System.Math обладает статическим методом Мах, который реализован следующим образом:
public static Int32 Max(Int32 vail, Int32 val2) { return (vail < val2) ? val2 : vail;
}
Этот метод безопасен в отношении потоков, хотя в нем нет никакого кода блокирования. Так как тип Int32 относится к значимым, два значения этого типа при передаче в переменную Мах копируются, а значит, разные потоки могут одновременно обращаться к данной переменной. При этом каждый поток будет работать с собственными данными, изолированными от всех прочих потоков.
В то же время FCL не гарантирует безопасности в отношении потоков экзем - плярным методам, так как введение в них блокирующего кода слишком сильно сказывается на производительности. Более того, если каждый экземплярный метод начнет выполнять блокирование и разблокирование, все закончится тем, что в приложении в каждый момент времени будет исполняться только один поток, что еще больше снизит производительность. Как уже упоминалось, поток, конструирующий объект, является единственным, кто имеет к нему доступ. Другим потокам данный объект недоступен, а значит, при вызове экземплярных методов синхронизация не требуется. Однако если потом поток предоставит ссылку на объект (поместив ее в статическое поле, передав ее в качестве аргумента состояния методу ThreadPool. QueueUserWorkltem или объекту Task и т. п.), то тут синхронизация уже понадобится, если разные потоки попытаются одновременно получить доступ к данным не только для чтения.
Собственные библиотеки классов рекомендуется строить по этому паттерну — то есть все статические методы следует сделать безопасными в отношении потоков, а экземплярные методы — нет. Впрочем, следует оговорить, что если целью экземплярного метода является координирование потоков, его тоже следует сделать безопасным в отношении потоков. К примеру, один поток может отменять операцию, вызывая метод Cancel класса CancellationTokenSource, а другой поток, делая запрос к соответствующему свойству IsCancellationRequested объекта CancellationToken, может обнаружить, что отмена на самом деле не нужна. Внутри этих экземплярных методов содержится специальный код синхронизации потоков, гарантирующий их скоординированную работу[60].
Примитивные конструкции пользовательского режима и режима ядра
В этой главе рассматриваются примитивные конструкции для синхронизации потоков. Под «примитивными» я подразумеваю простейшие конструкции, которые доступны в коде. Они бывают двух видов: пользовательского режима и режима ядра. По возможности нужно задействовать первые, так как они значительно быстрее вторых и используют для координации потоков специальные директивы процессора. То есть координация имеет место уже на аппаратном уровне (и именно это обеспечивает быстродействие). Однако одновременно это означает, что блокировка потоков на уровне примитивной конструкции пользовательского режима операционной системой Windows просто не распознается. А так как заблокированным таким способом поток пула не считается таковым, пул не создает дополнительных потоков для восполнения загрузки процессора. Кроме того, блокировка происходит на очень короткое время.
Звучит заманчиво, не правда ли? Более того, все действительно так, именно поэтому я рекомендую использовать эти конструкции как можно чаще. Впрочем, они не идеальны. Только ядро операционной системы Windows может остановить выполнение потока, чтобы он перестал впустую расходовать ресурсы процессора. Запущенный в пользовательском режиме поток может быть прерван операционной системой, но довольно быстро снова будет готов к работе. В итоге поток, который пытается, но не может получить некоторый ресурс, начинает циклически существовать в пользовательском режиме. Потенциально это является пустым расходованием времени процессора, которое лучше было бы потратить с пользой — или просто разрешить процессору простаивать для экономии энергии.
Это заставляет нас перейти к примитивным конструкциям режима ядра. Они предоставляются самой операционной системой Windows и требуют от потоков приложения вызова функций, реализованных в ядре. Переход потока между пользовательским режимом и режимом ядра требует значительных затрат ресурсов, поэтому конструкций режима ядра крайне желательно избегать[61]. Однако и у них есть свои достоинства. Если один поток использует конструкцию режима ядра для получения доступа к ресурсу, с которым уже работает другой поток, Windows блокирует его, чтобы не тратить понапрасну время процессора. А затем, когда ресурс становится доступным, блокировка снимается, и поток получает доступ к ресурсу.
Если поток, использующий в данный момент конструкцию, не освободит ее, ожидающий конструкции поток может оказаться заблокированным навсегда. В этом случае в пользовательском режиме поток бесконечно исполняется процессором; этот вариант блокировки называется активной (живой) блокировкой (livelock), или зависанием. В режиме ядра поток блокируется навсегда, этот тип блокировки называется взаимной (мертвой) блокировкой (deadlock). Обе ситуации по-своему плохи, но если выбирать из двух зол, второй вариант видится более предпочтительным, потому что в первом случае; впустую расходуются как время процессора, так и память (стек потока и т. и.), а во втором случае; расходуется только память1.
В идеальном мире у нас были бы конструкции, сочетающие лучшие особенности обоих типов: быстро работающие и не блокирующиеся (как конструкции пользовательского режима) в условиях отсутствия конкуренции. А если конструкции начинали бы соперничать друг другом, их блокировало бы ядро операционной системы. Описанные конструкции даже существуют в природе, я называю их гибридными (hybrid constructs), и мы рассмотрим их в следующей главе. Именно гибридные конструкции обычно используются в приложениях, так как в большинстве приложений несколько потоков крайне редко пытаются одновременно получить доступ к одним и тем же данным. Гибридные конструкции основное время поддерживают быструю работу приложения и периодически замедляются, блокируя поток. Однако это замедление в тот момент не имеет особого значения, потому что поток все равно будет заблокирован.
Многие из конструкций синхронизации потоков в CLR являются всего лишь объектно-ориентированными оболочками классов, построенных на базе конструкций синхронизации потоков Win32. В конце концов, CLR-потоки являются потоками операционной системы Windows, которая планирует и контролирует их синхронизацию. Конструкции синхронизации существуют с 1992 года и о них написано множество книг[62] [63], поэтому в этой главе я ограничусь лишь кратким обзором.
Конструкции пользовательского режима
CLR гарантирует атомарность чтения и записи следующих типов данных: Boolean, Char, (S)Byte, (U)Intl6, (U)Int32, (U)IntPtr, Single и ссылочных типов. Это означает, что все байты переменной читаются или записываются одновременно. Предположим, имеется следующий класс:
internal static class SomeType { public static Int32 x = 0;
}
Если какой-то поток выполняет следующую строку кода, то переменная х сразу (атомарно) изменяется с 0x00000000 до 0x01234567:
SomeType.х = 0x01234567;
При этом посторонние потоки не увидят переменную в промежуточном состоянии. К примеру, другой поток не может запросить свойство SomeType.x и получить значение 0x01230000. Предположим, что поле х класса SomeType относится к типу Int64. Если поток выполнит следующую строку кода, то другой поток сможет сделать запрос переменной х и получить значение 0x0123456700000000 или 0x0000000089abcdef, так как операции чтения и записи не являются атомарными:
SomeType.x = 0x0123456789abcdef;
Это пример так называемого прерванного чтения (torn read).
Хотя атомарный доступ к переменной гарантирует, что чтение и запись осуществляются одновременно, из-за работы компилятора и оптимизации процессора вы не можете знать, когда именно произойдет чтение или запись. Примитивные конструкции пользовательского режима, рассмотренные в этом разделе, управляют временем выполнения этих атомарных операций. Кроме того, они обеспечивают атомарность и управление временем выполнения для переменных типов данных U(Int64)и Double.
Примитивные конструкций синхронизации потоков пользовательского режима делятся на два типа:
□ Volatile-конструкции выполняют для переменной, содержащей данные простого типа, атомарную операцию чтения или записи.
□ Interlocked-конструкции выполняют для переменной, содержащей данные простого типа, атомарную операцию чтения и записи.
Конструкции обоих типов требуют передачи ссылки (адреса в памяти) на переменную, принадлежащую к простому типу данных.
Volatile-конструкции
Когда компьютеры только появились, программное обеспечение писалось на ассемблере. Это было крайне утомительное занятие, так как программист должен был формулировать все в явном виде: использовать определенный регистр процессора, передать управление, осуществить косвенный вызов и т. п. Для упрощения задачи были придуманы языки высокого уровня. Именно в них впервые были реализованы привычные конструкции if/else, switch/case, циклы, локальные переменные, аргументы, вызовы виртуальных методов, перегрузка операторов и многое другое. В конечном итоге компилятор преобразует конструкции высокого уровня в низкоуровневые, позволяющие компьютеру понять, что именно ему следует делать.
Другими словами, компилятор C# преобразует конструкции языка C# в команды промежуточного языка (Intermediate Language, IL), которые, в свою очередь, JIT-компилятор превращает в машинные директивы, обрабатываемые уже непосредственно процессором. При этом компилятор С#, JIT-компилятор и даже сам процессор могут оптимизировать ваш код. К примеру, после компиляции следующий нелепый метод в конечном итоге превратится в ничто:
private static void OptimizedAway() {
// Вычисленная во время компиляции константа равна нулю Int32 value = (1 * 100) - (50 * 2);
// Если значение равно 0, цикл не выполняется for (Int32 х = 0; х < value; х++) {
// Код цикла не нужно компилировать, так как он никогда не выполняется Console.WriteLine("3eff");
}
}
В этом коде компилятор выясняет, что значение всегда равно 0, а значит, цикл никогда не будет выполнен и соответственно нет никакой нужды компилировать код внутри него. От метода в итоге может ничего не остаться. Далее, при компиляции метода, вызывающего метод OptimizedAway, JIT-компилятор попытается встроить туда код метода OptimizedAway, но так как этот код отсутствует, компилятор даже исключит сам код вызова метода. Разработчики очень любят это свойство компиляторов. Обычно код пытаются писать максимально осмысленно; он должен быть простым для чтения, записи и редактирования. А затем компилятор переводит его на язык, понятный компьютеру. И мы хотим, чтобы компиляторы делали это максимально хорошо.
В процессе оптимизации кода компилятором С#, JIT-компилятором и процессором гарантируется сохранение его назначения. То есть с точки зрения одного потока метод делает то, зачем мы его написали, хотя способ реализации может отличаться от описанного в исходном коде. Однако при переходе к многопоточной конфигурации ситуация может измениться. Вот пример, в котором в результате оптимизации программа стала работать не так, как ожидалось:
internal static class StrangeBehavior {
// Далее вы увидите, что проблема решается объявлением этого поля volatile
private static Boolean sstopWorker = false;
public static void Main() {
Console.WriteLine("Main: letting worker run for 5 seconds");
Thread t = new Thread(Worker);
t.Start();
Thread.Sleep(5000);
sstopWorker = true;
Console.WriteLine("Main: waiting for worker to stop");
t. 3oin();
}
private static void Worker(Object o) {
Int32 x = 0;
while (!s_stopWorker) x++;
Console.WriteLine("Worker: stopped when x={0}“, x);
}
Метод Main в этом фрагменте кода создает новый поток, исполняющий метод Worker, который считает по возрастающей, пока не получит команду остановиться. Метод Main позволяет потоку метода Worker работать 5 секунд, а затем останавливает его, присваивая статическому полю Boolean значение true. В этот момент поток метода Worker должен вывести результат счета, после чего он завершится. Метод Main ждет завершения метода Worker, вызывает метод loin, после чего поток метода Main возвращает управление, заставляя весь процесс прекратить работу.
Выглядит просто, не так ли? Но программа скрывает потенциальные проблемы из-за возможных оптимизаций. При компиляции метода Worker компилятор обнаруживает, что переменная s_stopWorker может принимать значение true или false, но внутри метода это значение никогда не меняется. Поэтому компилятор может создать код, заранее проверяющий состояние переменной s_stopWorker. Если она имеет значение true, выводится результат "Worker: stopped when х=0". В противном случае компилятор создает код, входящий в бесконечный цикл и бесконечно увеличивающий значение переменной х. При этом оптимизация заставляет цикл работать крайне быстро, так как проверка переменной s_stopWorker осуществляется перед циклом, а проверки переменной на каждой итерации цикла не происходит.
Если вы хотите посмотреть, как это работает, поместите код в файл с расширением .cs и скомпилируйте его с ключами platform :х8б и /optimize+ компилятора С#. Запустите полученный исполняемый файл и вы убедитесь, что программа работает бесконечно. Обратите внимание, что вам нужен JIT-компилятор для платформы х86, который совершеннее компиляторов х64 и IA64, а значит, обеспечивает более полную оптимизацию. Остальные JIT-компиляторы не выполняют оптимизацию столь тщательно, поэтому после их работы программа успешно завершится. Это подчеркивает еще один интересный аспект: итоговое поведение вашей программы зависит от множества факторов, в частности от выбранной версии компилятора и используемых ключей, от выбранного JIT-компилятора, от процессора, который будет выполнять код. Кроме того, программа не станет работать бесконечно, если запустить ее в отладчике, так как отладчик заставляет JIT-компилятор ограничиться неопти- мизированным кодом, который проще поддается выполнению в пошаговом режиме.
Рассмотрим другой пример, в котором пара потоков осуществляет доступ к двум полям:
internal sealed class ThreadsSharingData { private Int32 mflag = 0; private Int32 m_value = 0;
// Этот метод исполняется одним потоком public void ThreadlQ {
// ПРИМЕЧАНИЕ. Они могут выполняться в обратном порядке m_value = 5; m_flag = 1;
} // Этот метод исполняется другим потоком public void Thread2() {
// ПРИМЕЧАНИЕ. Поле mvalue может быть прочитано раньше, чем mflag
if (m_flag == 1)
Console.WriteLine(mvalue);
>
>
В данном случае проблема в том, что компиляторы и процессор могут оттранслировать код таким образом, что две строки в методе Threadl поменяются местами. Разумеется, это не изменит предназначения метода. Метод должен получить значение 5 в переменной m_value и значение 1 в переменной m_f lag. С точки зрения однопоточного приложения порядок выполнения строк кода не имеет значения. Если же поменять указанные строки местами, другой поток, выполняющий метод Thread2, может обнаружить, что переменная m_flag имеет значение 1, и выведет значение 0.
Рассмотрим этот код с другой точки зрения. Предположим, что код метода Threadl выполняется так, как предусмотрено программой (то есть так, как он написан). Обрабатывая код метода Thread2, компилятор должен сгенерировать код, читающий значения переменных m_flag и m_value из оперативной памяти в регистры процессора. И возможно, что память первой выдаст значение переменной m_value, равное 0. Затем может выполниться метод Threadl, меняющий значение переменной m_value на 5, а переменной m_flag — на 1. Но регистр процессора метода Thread2 не видит, что значение переменной m_value было изменено другим потоком на 5. После этого из оперативной памяти в регистре процессора может быть считано значение переменной m_flag, ставшее равным 1. В результате метод Thread2 снова выведет значение 0.
Все эти крайне неприятные события, скорее всего, приведут к проблемам в окончательной, а не в отладочной версии программы. В результате задача выявления проблемы и исправления кода становится нетривиальной. Поэтому сейчас давайте поговорим о том, как исправить код.
Класс System .Threading. Volatile содержит два статических метода, которые выглядят следующим образом[64]:
public sealed class Volatile {
public static void Write(ref Int32 location, Int32 value); public static Int32 Read(ref Int32 location);
>
Это специальные методы, отключающие оптимизации, обычно выполняемые компилятором С#, JIT-компилятором и собственно процессором. Вот как они работают:
□ Метод Volatile .Write заставляет записать значение в параметр location непосредственно в момент обращения. Более ранние загрузки и сохранения программы должны происходить до вызова этого метода.
□ Метод Volatile. Read заставляет считать значение параметра address непосредственно в момент обращения. Более поздние загрузки и сохранения программы должны происходить после вызова этого метода.
ВНИМАНИЕ
Чтобы не запутаться, приведу простое правило: при взаимодействии потоков друг с другом через общую память записывайте последнее значение методом Volatile. Write, а первое значение читайте методом Volatile.Read.
Теперь при помощи указанных методов можно исправить класс ThreadsSharing- Data:
internal sealed class ThreadsSharingData { private Int32 mflag = 0; private Int32 m_value = 0;
11 Этот метод выполняется одним потоком public void ThreadlQ {
// ПРИМЕЧАНИЕ. 5 нужно записать в mvalue до записи 1 в m_flag m_value = 5;
Volatile.Write(ref m_flag, 1);
}
// Этот метод выполняется вторым потоком public void Thread2() {
// ПРИМЕЧАНИЕ. Поле m_value должно быть прочитано после mflag if (Volatile.Read(ref m_flag) == 1)
Console.Write Line(m_value);
}
}
Обращаю ваше внимание на четкое соблюдение правил. Метод Threadl записывает два значения в поля, к которым имеют доступ несколько потоков. Последнее значение, которое мы хотим записать (присвоение переменной m_flag значения 1), записывается методом Volatile. Write. Метод Thread2 читает оба значения из полей общего доступа, причем чтение первого из них (m_flag) выполняется методом Volatile.Read.
Но что здесь происходит на самом деле? Для метода Threadl вызов метода Volatile. Write гарантирует, что все записи в переменные будут завершены до записи значения 1 в переменную m_flag. Так как операция m_value = 5 расположена до вызова метода Volatile.Write, она сначала должна завершиться. Более того, значения скольких бы переменных ни редактировались до вызова метода Volatile. Write, все эти операции следует завершить до записи значения 1 в переменную
m_flag. При этом все эти операции можно оптимизировать, выполняя их в любом порядке; главное, чтобы все они закончились до вызова метода Volatile .Write.
Вызов метода Volatile. Read для метода Thread2 гарантирует, что значения всех переменных будут прочитаны после значения переменной m_flag. Так как чтение переменной m_value происходит после вызова метода Volatile. Read, оно должно осуществляться только после чтения значения переменной ш_ .flag. То же самое касается чтения всех остальных переменных, расположенных после вызова метода Volatile. Read. При этом операции чтения после метода Volatile. Read можно оптимизировать, выполняя их в любом порядке; просто чтение станет невозможным до вызова метода Volatile. Read.
Поддержка полей Volatile в C#
Как гарантировать, что программисты будут корректно вызывать методы Volatile. Read и Volatile.Write? Сложно продумать все, в частности представить, что именно могут делать с общими данными другие потоки в фоновом режиме. Для упрощения ситуации в C# было введено ключевое слово volatile, применяемое к статическим или экземплярным полям типов Boolean, (S)Byte, (U)Intl6, (U)Int32, (U)IntPtr, Single и Char. Также оно применяется к ссылочным типам и любым перечислимым полям, если в основе последних лежит тип (S)Byte, (U)Intl6 или (U)Int32. JIT-компилятор гарантирует, что доступ к полям, помеченным данным ключевым словом, будет происходить в режиме волатильного чтения или записи, поэтому в явном виде вызывать статические методы Read и Write класса Volatile больше не требуется. Более того, ключевое слово volatile запрещает компилятору C# и JIT-компилятору кэшировать содержимое поля в регистры процессора. Это гарантирует, что при всех операциях чтения и записи манипуляции будут производиться непосредственно с памятью.
Ключевое слово volatile позволяет переписать класс ThreadsSharingData следующим образом:
internal sealed class ThreadsSharingData { private volatile Int32 m_flag = в; private Int32 m_value = в;
II Этот метод исполняется одним потоком public void ThreadlQ {
// ПРИМЕЧАНИЕ. Значение 5 должно быть записано в mvalue // перед записью 1 в m_flag m_value = 5; m_flag = 1;
}
// Этот метод исполняется другим потоком public void Thread2() {
// ПРИМЕЧАНИЕ. Поле m_value должно быть прочитано после m_flag if (mflag == 1)
Console.Write Line(m_value);
}
Некоторые разработчики (в том числе я) не любят ключевое слово volatile и считают, что не стоило вводить его в С#[65]. Мы считаем, что большинству алгоритмов не нужен одновременный доступ на чтение и запись поля со стороны нескольких потоков. А в большинстве оставшихся алгоритмов можно ограничиться обычным доступом к полю, повышающему производительность. А доступ к полю, помеченному как volatile, требуется крайне редко. К примеру, сложно объяснить, как применить операцию волатильного чтения к такому вот алгоритму:
mamount = m_amount + mamount; // Предполагается, что поле mamount
// определено как volatile
Обычно целое число может быть удвоено простым сдвигом всех битов на единицу влево, и многие компиляторы могут обработать такой код и выполнить указанную оптимизацию. Но если пометить поле m_amount ключевым словом volatile, оптимизация станет невозможной. Компилятору придется создать код, читающий переменную m_amount из регистра, затем читающий ее еще раз из другого регистра, складывающий два значения и записывающий результат обратно в поле m_amount. Неоптимизированный код определенно занимает больше места и медленней работает; вряд ли было бы уместно помещать такой кода внутрь цикла.
К тому же C# не поддерживает передачу волатильного поля по ссылке в метод. К примеру, если принадлежащее типу Int32 волатильное поле m_amount попытается вызвать метод Int32.TryParse, компилятор сгенерирует предупреждение:
Boolean success = Int32.TryParse(n123", out m_amount);
// Эта строка приводит к сообщению от компилятора:
// CS0420: ссылка на волатильное поле не будет трактоваться как волатильная
Наконец, поля volatile несовместимы со спецификацией CLS, потому что они
не поддерживаются многими языками (включая Visual Basic).
Interlocked-конструкции
Как мы выяснили, метод Read класса Volatile выполняет атомарную операцию чтения, а метод Write этого же класса осуществляет атомарную операцию записи. То есть каждый метод выполняет либо атомарное чтение, либо атомарную запись. В этом разделе мы поговорим о статических методах класса System. Threading. Interlocked. Каждый из этих методов выполняет как атомарное чтение, так и атомарную запись. Кроме того, все методы класса Interlocked ставят барьер в памяти, то есть любая запись переменной перед вызовом метода класса Interlocked выполняется до этого метода, а все чтения переменных после вызова метода выполняются после него.
Статические методы, работающие с переменными типа Int32, безоговорочно относятся к наиболее часто используемым. Продемонстрируем их:
public static class Interlocked {
// Возвращает (++location)
public static Int32 Increment(ref Int32 location);
// Возвращает (--location)
public static Int32 Decrement(ref Int32 location);
// Возвращает (location += value)
// ПРИМЕЧАНИЕ. Значение может быть отрицательным,
// что позволяет выполнить вычитание
public static Int32 Add(ref Int32 location, Int32 value);
// Int32 old = location; location = value; возвращает old; public static Int32 Exchange(ref Int32 location, Int32 value);
// Int32 old = locationl;
// если (locationl == comparand) location = value;
// возвращает old;
public static Int32 CompareExchange(ref Int32 location,
Int32 value, Int32 comparand);
}
Существуют и перегруженные версии этих методов, работающие со значениями типа Int64. Кроме того, в классе Interlocked существуют методы Exchange и CompareExchange, принимающие параметры Object, IntPtr, Single и Double. Есть и обобщенная версия, в которой обобщенный тип ограничен типом class
(любой ссылочный тип).
Лично мне очень нравятся Interlocked-методы, потому что они работают относительно быстро и позволяют многого добиться. Давайте рассмотрим код, использующий данные методы для асинхронного запроса данных с различных веб-серверов. Это короткий код, не блокирующий никаких потоков, автоматически масштабируемый с использованием потоков пула и использующий все доступные процессоры, если их загрузка может пойти ему на пользу. Кроме того, количество серверов, на которые код в исходном виде поддерживает доступ, достигает 2 147 483 647 (Int32.MaxValue). Другими словами, это превосходная основа для написания собственных сценариев.
internal sealed class MultiWebRequests {
// Этот класс Helper координирует все асинхронные операции private AsyncCoordinator m ac = new AsyncCoordinatorQ;
// Набор веб-серверов, к которым будут посылаться запросы // Хотя к этому словарю возможны одновременные обращения,
// в синхронизации доступа нет необходимости, потому что // ключи после создания доступны только для чтения
private DictionarycString, Object» mservers = new DictionarycString, Object» {
{ "http://Wintellect.com/", null },
{ "http://Microsoft.com/", null },
{ "http://l.1.1.1/", null }
};
public MultiWebRequests(Int32 timeout = Timeout.Infinite) {
продолжение &
// Асинхронное инициирование всех запросов var httpClient = new HttpClient(); foreach (var server in m_servers.Keys) { mac.AboutToBegin(l); httpClient.GetByteArrayAsync(server)
•ContinueWith(task => ComputeResult(server, task));
}
// Сообщаем AsyncCoordinator, что все операции были инициированы // и что он должен вызвать AllDone после завершения всех операций,
// вызова Cancel или тайм-аута m_ac.AllBegun(AllDone, timeout);
}
private void ComputeResult(String server, Task<Byte[]> task) {
Object result;
if (task.Exception != null) {
result = task.Exception.InnerException;
} else {
// Обработка завершения ввода-вывода - здесь или в потоке(-ах) пула // Разместите свой вычислительный алгоритм... result = task.Result.Length; // В данном примере } // просто возвращается длина
// Сохранение результата (исключение/сумма)
// и обозначение одной завершенной операции m_servers[server] = result; mac.lustEnded();
}
// При вызове этого метода результаты игнорируются public void Cancel() { m_ac.Cancel(); }
// Этот метод вызывается после получения ответа от всех веб-серверов,
// вызова Cancel или тайм-аута
private void AllDone(CoordinationStatus status) { switch (status) {
case CoordinationStatus.Cancel:
Console.WriteLine("Operation canceled."); break;
case CoordinationStatus.Timeout:
Console.WriteLine("Operation timedout."); break;
case CoordinationStatus.AllDone:
Console.WriteLine("Operation completed; results below:"); foreach (var server in m_servers) {
Console.Write("{0} ", server.Key);
Object result = server.Value; if (result is Exception) {
Console.WriteLine("failed due to {0}.", result.GetType().Name) } else {
Console. WriteLine("returned {0:N0} bytes. ", result);
}
}
break;
}
}
}
Этот код непосредственно не задействует Interlocked-методы, так как весь координирующий код инкапсулирован в класс AsyncCoordinator. Я подробнее опишу его ниже, а пока расскажу, что этот класс делает. В процессе конструирования класс MultiWebRequest инициализирует класс AsyncCoordinator и словарь с набором URI серверов (и их будущих результатов). Затем он асинхронно выполняет все веб-запросы. Сначала вызывается метод AboutToBegin класса AsyncCoordinator, которому передается количество запланированных запросов[66]. Затем происходит инициирование запроса вызовом метода GetByteArrayAsync класса HttpClient. Метод возвращает объект Task, для которого я вызываю ContinueWith, чтобы при получении ответа сервера полученные байты параллельно обрабатывались методом ComputeResult во многих потоках пула. После завершения всех запросов к вебсерверам вызывается метод AllBegun класса AsyncCoordinator, которому передается имя метода, который следует запустить после выполнения всех операций (AllDone), а во-вторых, продолжительность тайм-аута. После ответа каждого сервера различные потоки пула будут вызывать метод ComputeResult класса MultiWebRequests. Этот метод обрабатывает байты, возвращенные сервером (или любые ошибки), и сохраняет результат в словаре. После сохранения каждого результата вызывается метод lustEnded класса AsyncCoordinator, позволяющий объекту AsyncCoordinator узнать о завершении операции.
После завершения всех операций объект AsyncCoordinator вызывает метод AllDone для обработки результатов, полученных со всех веб-серверов. Этот метод будет выполняться потоком пула, последним получившим ответ с сервера. В случае завершения времени ожидания или отмены операции метод AllDone будет вызван либо потоком пула, уведомляющим объект AsyncCoordinator о том, что время закончилось, либо потоком, вызвавшим метод Cancel. Существует также вероятность, что поток, выполняющий запрос к серверу, сам вызовет метод AllDone, если последний запрос завершится до вызова метода AllBegin.
Имейте в виду, что в данном случае; имеет место ситуация гонки, так как возможно одновременное завершение всех запросов к серверам, вызов метода AllBegun, завершение времени ожидания и вызов метода Cancel. Если такое произойдет, объект AsyncCoordinator выберет победителя, гарантируя, что метод AllDone будет вызван не более одного раза. Победитель указывается передачей в метод AllDone аргумента состояния, роль которого может играть одно из символических имен, определенных в типе CoordinationStatus:
internal enum CoordinationStatus { AllDone, Timeout, Cancel };
Теперь, когда вы получили представление о том, что происходит, посмотрим, как это работает. Класс AsyncCoordinator содержит всю логику координации потоков. Во всех случаях он использует методы Interlocked, гарантируя быстрое выполнение кода и отсутствие блокировки потоков. Вот код этого класса:
internal sealed class AsyncCoordinator {
private Int32 mopCount = 1; // Уменьшается на 1 методом AllBegun
private Int32 mstatusReported = 0; // 0=false, l=true private Action<CoordinationStatus> mcallback; private Timer mtimer;
// Этот метод ДОЛЖЕН быть вызван ДО инициирования операции public void AboutToBegin(Int32 opsToAdd = 1) {
Interlocked.Add(ref m_opCount, opsToAdd);
>
// Этот метод ДОЛЖЕН быть вызван ПОСЛЕ обработки результата public void 3ustEnded() {
if (Interlocked.Decrement(ref m_opCount) == 0)
ReportStatus(CoordinationStatus.AllDone);
>
// Этот метод ДОЛЖЕН быть вызван ПОСЛЕ инициирования ВСЕХ операций public void AllBegun(Action<CoordinationStatus> callback,
Int32 timeout = Timeout.Infinite) {
mcallback = callback; if (timeout != Timeout.Infinite)
mtimer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
3ustEnded();
}
private void TimeExpired(Object o) {
ReportStatus(CoordinationStatus.Timeout);
}
public void CancelQ { ReportStatus(CoordinationStatus.Cancel); }
private void ReportStatus(CoordinationStatus status) {
// Если состояние ни разу не передавалось, передать его;
// в противном случае оно игнорируется
if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
m_callback(status);
}
Самым важным в этом классе является поле m_opCount. В нем отслеживается количество асинхронных операций, ожидающих выполнения. Перед началом каждой такой операции вызывается метод AboutToBegin. Он вызывает метод Interlocked. Add, чтобы атомарно добавить к полю m_opCount переданное в него число. Операция суммирования должна осуществляться атомарно, так как веб-серверы могут отвечать потокам пула в процессе начала дополнительных операций. При каждом ответе
сервера вызывается метод lustEnded. Он вызывает метод Interlocked .Decrement
и атомарно вычитает из переменной m_opCount единицу. Поток, присвоивший переменной m_opCount значение 0, вызывает метод ReportStatus.
ПРИМЕЧАНИЕ
Полю m_opCount присваивается начальное значение 1 (не 0); это крайне важно, так как гарантирует, что метод AIIDone не будет вызван во время запроса к серверу потоком, исполняющим метод конструктора. До вызова конструктором методаAIIBegun переменная m_opCount не может получить значение 0. Вызванный же конструктором методАНВедип, в свою очередь, вызывает метод JustEnded, который последовательно уменьшает значение переменной m_opCount на 1 и фактически отменяет эффект присвоения ей начального значения 1. В результате переменная m_opCount может достичь значения 0, но только после того как мы получим информацию об отправке всех запросов к веб-серверам.
Метод ReportStatus выполняет функцию арбитра в гонке, которая может возникнуть между завершающимися операциями, истечением времени ожидания и вызовом метода Cancel. Он должен убедиться, что только одно из условий рассматривается в качестве победителя, и метод m_callback будет вызван всего один раз. Арбитраж осуществляется передачей методу Interlocked. Exchange ссылки на поле m_statusReported. Это поле рассматривается как переменная типа Boolean; но на самом деле подобное невозможно, так как методы класса Interlocked не принимают переменных типа Boolean. Поэтому мы используем переменную типа Int32, значение 0 которой является эквивалентом false, а значение 1 — эквивалентом true.
Внутри метода ReportStatus вызов метода Interlocked. Exchange меняет значение переменной m_statusReported на 1. Но только первый проделавший это поток увидит, как метод Interlocked. Exchange возвращает значение 0, и только он активизирует метод обратного вызова. Все остальные потоки, вызвавшие метод Interlocked. Exchange, получат значение 1, по сути, уведомляющее их, что метод обратного вызова уже активизирован и больше этого делать не нужно.
Реализация простой циклической блокировки
Interlocked-методы прекрасно работают, но в основном со значениями типа Int32. А что делать, если возникла необходимость атомарного манипулирования набором полей объекта? Нам потребуется предотвратить проникновение всех потоков кроме одного в область кода, управляющую полями. Interlocked-методы позволяют выполнить блокирование в рамках синхронизации потоков:
internal struct SimpleSpinLock {
private Int32 m Resourcelnllsej // 0=false (по умолчанию), l=true
public void EnterQ { while (true) {
// Всегда указывать, что ресурс используется.
// Если поток переводит его из свободного состояния;
// вернуть управление
if (Interlocked.Exchange(ref m_ResourceInUse, 1) == 0) return; // Здесь что-то происходит...
}
}
public void LeaveQ {
// Помечаем ресурс, как свободный Volatile.Write(ref m_ResourceInUse, в);
}
А вот класс, демонстрирующий использование метода SimpleSpinLock:
public sealed class SomeResource {
private SimpleSpinLock m_sl = new SimpleSpinLockQ;
public void AccessResourceQ {
m_sl. EnterQ;
// Доступ к ресурсу в каждый момент времени имеет только один поток...
m_sl. LeaveQ;
}
}
Реализация метода SimpleSpinLock очень проста. Если два потока одновременно вызывают метод Enter, метод Interlocked. Exchange гарантирует, что один поток изменит значение переменной m_resourceInUse с 0 на 1. Когда он видит, что переменная m_resourceInUse равна 0, он заставляет метод Enter возвратить управление, чтобы продолжить выполнение кода метода AccessResource. Второй поток тоже попытается заменить значение переменной m_resourceInUse на 1. Но этот поток увидит, что переменная уже не равна 0, и зациклится, начав непрерывно вызывать метод Exchange до тех пор, пока первый поток не вызовет метод Leave.
После того как первый поток завершит манипуляции полями объекта SomeResource, он вызовет метод Leave, который, в свою очередь, вызовет метод Volatile.Write и вернет переменной m_resourceInUse значение 0. Это заставит зациклившийся поток поменять значение переменной m_resourceInUse с 0 на 1 и, наконец, получить управление от метода Enter, предоставляя последнему доступ к полям объекта SomeResource.
Вот и все. Это очень простая реализация блокирования в рамках синхронизации потоков. Правда, ее серьезный потенциальный недостаток состоит в том, что при наличии конкуренции за право на блокирование потоки вынуждены ожидать блокирования в цикле, и это зацикливание приводит к пустому расходованию бесценного процессорного времени. Соответственно, блокирование с зацикливанием имеет смысл использовать только для защиты очень быстро выполняемых областей кода.
Блокирование с зацикливанием обычно не применяется на машинах с одним процессором, так как зацикливание другого потока-претендента помешает быстрому снятию блокировки. Ситуация осложняется, если поток, удерживающий блокировку, имеет более низкий приоритет, чем поток, претендующий на ее получение. Низкоприоритетный поток может вообще не получить шансов на выполнение, то есть просто зависнуть. Windows иногда на короткое время динамически повышает приоритет потоков. Для потоков, использующих блокирование с зацикливанием, данный режим следует отключить. Это делается при помощи свойств PriorityBoostEnabled классов System.Diagnostics.Process и System. Diagnostics. ProcessThread. Блокирование с зацикливанием на гиперпотоковых машинах также связано с проблемами. Для их решения код блокирования с зацикливанием часто наделяется дополнительной логикой. Однако я не хотел бы вдаваться в детали, так как эта логика довольно быстро меняется. Могу сказать только, что FCL поставляется вместе со структурой System.Threading.SpinWait, которая заключает в себя всю необходимую логику.
Задержка в обработке потока
Хитрость состоит в том, чтобы иметь поток, умеющий заставить ресурс на время приостановить исполнение этого потока, чтобы другой поток, обладающий в данный момент ресурсом, завершился и освободил место. Для этого структура SpinWait вызывает методы Sleep, Yield и SpinWait класса Thread. Коротко опишем данные методы.
Поток может сообщить системе, что в течение некоторого времени его не нужно планировать на исполнение. Эта задача решается статическим методом Sleep:
public static void Sleep(Int32 millisecondsTimeout); public static void Sleep(TimeSpan timeout);
Поток заставляет метод приостановить работу на указанное время. Вызов метода Sleep позволяет потоку добровольно убрать напоминание о времени своего исполнения. Система забирает поток у планировщика примерно на указанное время. То есть если вы говорите системе, что метод хочет приостановить работу на 100 мс, он будет приостановлен примерно на это время, но возможно пробудет в состоянии покоя на несколько секунд меньше или больше. Не забывайте, что Windows не является операционной системой реального времени. Поток, скорее всего, пробудится в указанное время, но по большому счету время его пробуждения зависит от остальных происходящих в системе процессов.
Можно передать параметру millisecondsTimeout метода Sleep значение System. Threading. Тimeout. Infinite (определенное как -1). В результате поток окажется заблокированным на неограниченное время. При этом он будет существовать, и вы в любой момент сможете восстановить его стек и ядро. Передача в метод Sleep значения 0 сообщит системе, что вызывающий поток освобождает ее от его исполнения и заставляет запланировать другой поток. Впрочем, система при отсутствии доступных для планирования потоков такого же или более высокого приоритета может снова запланировать исполнение потока, только что вызвавшего метод Sleep.
Поток может также попросить Windows запланировать для текущего процессора другой поток, вызвав метод Yield класса Thread:
public static Boolean YieldQ;
При наличии другого потока, готового работать на данном процессоре, метод возвращает значение true, время жизни вызвавшего его потока считается завершенным, а в течение одного такта используется другой выбранный поток. После этого вызвавший метод Yield поток снова попадает в расписание и начинает работать в течение следующего такта. При отсутствии потоков, на которые можно переключиться, метод Yield возвращает значение false, и вызвавший его поток продолжает исполняться.
Метод Yield дает шанс исполнить ожидающие своего процессорного времени потоки равного или более низкого приоритета. Поток вызывает данный метод, если ему требуется ресурс, которым в настоящее время владеет другой поток. Он рассчитывает на то, что Windows поставит обладающий ресурсом в данный момент поток в очередь планировщика, освободив тем самым доступ. В результате, когда вызвавший метод Yield поток снова начнет исполняться, доступ к ресурсу может получить уже он.
Существует выбор между вызовом методов Thread. Sleep (0) и Th read. Sleep (1). В первом случае потокам с низким приоритетом не дают исполняться, в то время как метод Thread.Sleep(l) включает принудительное переключение контекста, и Windows погружает поток в спящее состояние более чем на 1 мс, что обусловлено разрешением внутреннего системного таймера.
На гиперпотоковых процессорах в каждый момент времени может выполняться толкьо один поток. И когда на таких процессорах поток входит в состояние зацикливания, нужно принудительно остановить текущий поток, позволив исполняться другому. Поток может остановиться сам, дав гиперпотоковому процессору возможность переключиться на другие потоки, вызывая метод SpinWait класса Thread:
public static void SpinWait(Int32 iterations);
Вызывая этот метод, вы фактически выполняете специальную инструкцию процессора. Она не заставляет Windows делать какую-либо работу (операционная система уверена, что она уже запланировала для процессора два потока). В случае если гиперпотоковый процессор не используется, эта специальная инструкция просто игнорируется.
ПРИМЕЧАНИЕ
Чтобы лучше познакомиться с данными методами, почитайте про их Win32- эквиваленты: Sleep, SwitchToThread и YieldProcessor. Дополнительные сведения о настройке разрешения системного таймера вы получите при знакомстве с Win32- функциями timeBeginPeriod и timeEndPeriod.
В FCL существует также структура System.Threading.SpinLock, сходная с показанным ранее классом SimpleSpinLock. Она отличается использованием структуры SpinWait с целью повышения производительности. Структура SpinLock поддерживает время ожидания. Интересно отметить, что обе структуры — моя SimpleSpinLock и SpinLock в FCL — относятся к значимым типам. То есть они являются облегченными объектами, требующими минимальных затрат памяти. Перечислением SpinLock имеет смысл пользоваться, если вам нужно, к примеру, связать блокировку с каждым элементом коллекции. Но при этом нужно следить за тем, чтобы экземпляры SpinLock никуда не передавались, потому что они при этом копируются, из-за чего вся синхронизация теряется. И хотя вы можете определять экземплярные поля SpinLock, не помечайте их как предназначенные только для чтения (readonly), поскольку при манипуляциях с блокировкой их внутреннее значение должно меняться.
Универсальный Interlocked-паттерн
Многие пользователи, познакомившись с Interlocked-методами, удивляются, почему специалисты Microsoft не разработали дополнительных методов подобного рода, подходящих для большего количества ситуаций. К примеру, в классе Interlocked были бы полезны методы Multiple, Divide, Minimum, Maximum, And, Or, Xor и многие другие. Однако вместо этих методов можно использовать хорошо известный шаблон, позволяющий методом Interlocked. CompareExchange атомарно выполнять любые операции со значениями типа Int32. А так как существуют перегруженные версии этого метода для типов Int64, Single, Double, Object, а также для обобщенного ссылочного типа, шаблон может работать и со всеми этими типами.
Вот пример создания на основе шаблона атомарного метода Maximum:
public static Int32 Maximum(ref Int32 target, Int32 value) {
Int32 currentVal = target, startVal, desiredVal;
// Параметр target может использоваться другим потоком,
// его трогать не стоит do {
// Запись начального значения этой итерации startVal = currentVal;
// Вычисление желаемого значения в контексте startVal и value desiredVal = Math.Max(startVal, value);
// ПРИМЕЧАНИЕ. Здесь поток может быть прерван!
// if (target == startVal) target = desiredVal
// Возвращение значения, предшествующего потенциальным изменениям currentVal = Interlocked.CompareExchange( ref target, desiredVal, startVal);
![]() |
// Если начальное значение на этой итерации изменилось, повторить } while (startVal != currentVal);
// Возвращаем максимальное значение, когда поток пытается его присвоить return desiredVal;
}
Давайте посмотрим, что здесь происходит. В момент, когда метод начинает выполняться, переменная currentVal инициализируется значением параметра target. Затем внутри цикла то же самое начальное значение получает переменная startVal. При помощи этой последней переменной вы можете выполнять любые нужные вам операции. Они могут быть крайне сложными и состоять из тысяч строк кода. Но в итоге должен быть сформирован результат, который помещается в переменную desiredVal. В моем примере просто сравниваются переменные startValи value.
Пока операция выполняется, значение target может быть изменено другим потоком. Это маловероятно, но теоретически не исключено. Если это произойдет, значение переменной derivedVal окажется основанным на старом значении переменной startVal, а не на текущем значении параметра target, а следовательно, мы не должны менять этот параметр. Гарантировать, что значение параметра target поменяется на значение переменной desiredVal при условии, что никакой другой поток не поменяет его за спиной нашего потока, можно с помощью метода Interlocked.CompareExchange. Он проверяет, совпадает ли значение параметра target со значением переменной startVal (а именно его мы предполагаем у параметра target перед началом выполнения операции). Если значение параметра target не поменялось, метод CompareExchange заменяет его новым значением переменной desiredVal. Если же изменения произошли, метод CompareExchange не трогает параметр target.
Метод CompareExchange возвращает значение параметра target на момент своего вызова, которое мы помещаем в переменную currentVal. Затем переменная startVal сравнивается с новым значением переменной currentVal. В случае совпадения поток не меняет параметр target за нашей спиной, этот параметр содержит значение переменной desiredVal, цикл while прекращает свою работу, и метод возвращает управление. Если же значения не совпадают, значит, другой поток поменял значение параметра target, поэтому параметру не было присвоено значение переменной desiredVal, цикл переходит к следующей итерации и пробует снова выполнить операцию на этот раз с новым значением переменной currentVal, отражающей изменения, внесенные посторонним потоком.
Лично я использовал данный шаблон очень часто и даже создал инкапсулировавший его обобщенный метод Morph[67]:
delegate Int32 Morpher<TResult, TArgument>(
Int32 startValue, TArgument argument, out TResult morphResult);
static TResult Morph<TResult, TAngument>( ref Int32 target, TArgument argument,
Morpher<TResult, TArgument> morpher) {
TResult morphResult;
Int32 currentVal = target, startVal, desiredVal; do {
startVal = currentVal;
desiredVal = morpher(startVal, argument, out morphResult); currentVal = Interlocked.CompareExchange( ref target, desiredVal, startVal);
} while (startVal != currentVal); return morphResult;
Конструкции режима ядра
Для синхронизации потоков в Windows существует несколько конструкций режима ядра. Они работают намного медленнее конструкций пользовательского режима, так как требуют координации со стороны операционной системы. Кроме того, каждый вызов метода для объекта ядра заставляет вызывающий поток перейти из управляемого в машинный код, затем в код режима ядра, после чего возвращается назад. Такие переходы требуют много процессорного времени и их частое выполнение значительно снижает производительность приложения.
Впрочем, у конструкций режима ядра есть и свои преимущества перед конструкциями пользовательского режима:
□ Если конструкция режима ядра выявляет конкуренцию за ресурс, Windows блокирует проигравший поток, останавливая зацикливание, которое ведет к напрасному расходованию ресурсов процессора.
□ Конструкции режима ядра могут осуществлять взаимную синхронизацию неуправляемых и управляемых потоков.
□ Конструкции режима ядра умеют синхронизировать потоки различных процессов, запущенных на одной машине.
□ Конструкции режима ядра можно наделить атрибутами безопасности, ограничивая несанкционированный доступ к ним.
□ Поток можно заблокировать, пока не станут доступны все конструкции режима ядра или пока не станет доступна хотя бы одна такая конструкция.
□ Поток можно заблокировать конструкцией режима ядра, указав время ожидания; если за указанное время поток не получит доступа к нужному ему ресурсу, он будет разблокирован и сможет выполнять другие задания.
К примитивным конструкциям синхронизации потоков в режиме ядра относятся события (events) и семафоры (semaphores). На их основе строятся
более сложные конструкции аналогичного назначения, например мьютексы (mutex). Более полную информацию о них вы найдете в моей книге «Windows via C/C++» (Microsoft Press, 2007).
В пространстве имен System. Threading существует абстрактный базовый класс WaitHandle. Он играет роль оболочки для дескриптора ядра Windows. В FCL имеется несколько производных от него классов. Все они определены в пространстве имен System.Threading и реализуются библиотекой MSCorLib.dll. Исключением является класс Semaphore, реализованный в библиотеке System.dll. Вот как выглядит иерархия этих классов:
WaitHandle
EventWaitHandle AutoResetEvent ManualResetEvent Semaphore Mutex
В базовом классе WaitHandle имеется поле SafeWaitHandle, содержащее дескриптор ядра Win32. Это поле инициализируется в момент создания класса, производного от WaitHandle. Кроме того, класс WaitHandle предоставляет открытые методы, которые наследуются всеми производными классами. Каждый из вызываемых конструкциями режима ядра методов обеспечивает полную защиту памяти. Вот наиболее интересные открытые методы класса WaitHandle (перегруженные версии некоторых методов не показаны):
public abstract class WaitHandle : MarshalByRefObject, IDisposable {
// Реализация WaitOne вызывает функцию Win32 WaitForSingleObjectEx. public virtual Boolean WaitOneQ;
public virtual Boolean Wait0ne(Int32 millisecondsTimeout); public virtual Boolean WaitOne(TimeSpan timeout);
// Реализация WaitAll вызывает функцию Win32 WaitForMultipleObjectsEx public static Boolean WaitAll(WaitHandle[] waitHandles); public static Boolean WaitAll(WaitHandle[] waitHandles,
Int32 millisecondsTimeout);
public static Boolean WaitAll(WaitHandle[] waitHandles, TimeSpan timeout);
// Реализация WaitAny вызывает функцию Win32 WaitForMultipleObjectsEx public static Int32 WaitAny(WaitHandle[] waitHandles); public static Int32 WaitAny(WaitHandle[] waitHandles,
Int32 millisecondsTimeout);
public static Int32 WaitAny(WaitHandle[] waitHandles, TimeSpan timeout); public const Int32 WaitTimeout = 258; // Возвращается WaitAny
// в случае тайм-аута
// Реализация Dispose вызывает функцию Win32 // CloseHandle - НЕ ВЫЗЫВАЙТЕ ЕЕ! public void DisposeQ;
Здесь следует сделать несколько замечаний:
□ Метод WaitOne класса WaitHandle блокирует текущий поток до активизации объектом ядра. Он вызывает Win32^yinapno WaitForSingleObjectEx. Значение true возвращается, если объект был активизирован. Если же время ожидания истекло, возвращается значение false.
□ Статический метод WaitAll класса WaitHandle заставляет вызывающий поток ждать активизации всех объектов ядра, указанных в массиве WaitHandle[], Если все объекты были активизированы, возвращается значение true, в случае же истечения времени ожидания возвращается значение false. Данный метод вызывает \Ут32-функцию WaitForMultipleObjectsEx, передавая параметру bWaitAllзначение TRUE.
□ Статический метод WaitAny класса WaitHandle заставляет вызывающий поток ждать активизации любого из объектов ядра, указанных в массиве WaitHandle[ ]. Возвращенное значение типа Int32 является индексом активизированного элемента массива. Если в процессе ожидания сигнала не поступило, возвращается значение WaitHandle.WaitTimeout. Данный метод вызывает \¥т32-функцию WaitForMultipleObjectsEx, передавая параметру bWaitAll значение FALSE.
□ Метод Dispose закрывает дескриптор объекта ядра. Во внутренней реализации эти методы вызывают функцию Win32 CloseHandle. Вызывать Dispose в коде можно только в том случае, если вы абсолютно уверены, что объект ядра не используется другими потоками. Это обстоятельство сильно затрудняет написание и тестирование кода, поэтому я настоятельно не рекомендую вызывать Dispose; просто позвольте уборщику мусора выполнить свою работу. Он сможет определить, когда объект не используется, и уничтожит его.
ПРИМЕЧАНИЕ
В некоторых случаях при блокировке потока из однопоточного отделения (apartment) возможно пробуждение потока для обработки сообщений. Например, заблокированный поток может проснуться для обработки Windows-сообщения, отправленного другим потоком. Это делается для совместимости с моделью СОМ. Для большинства приложений это не проблема. Но если ваш код в процессе обработки сообщения запрет другой поток, может случиться взаимная блокировка. Как вы увидите в главе 29, все гибридные блокировки тоже могут вызывать данные методы, так что вышесказанное верно и для них.
В прототипе версий WaitOne, WaitAll и SignalAndWait, не принимающих параметр timeout, должно быть указано возвращаемое значение void, а не Boolean. В противном случае методы бы всегда возвращали значение true из-за предполагаемого бесконечного времени ожидания (System. Threading .Timeout. Infinite).
Так что при вызове любого из этих методов нет нужды проверять возвращаемое им значение.
Как уже упоминалось, классы AutoResetEvent, ManualResetEvent, Semaphore и Mutex являются производными от класса WaitHandle, то есть наследуют методы этого класса и их поведение. Впрочем, эти классы обладают и собственными методами, о которых мы сейчас и поговорим.
Во-первых, конструкторы всех этих классов вызывают \Ут32-функцию CreateEvent (передавая в параметре bManualReset значение FALSE), CreateEvent (передавая в параметре bManualReset значение TRUE), CreateSemaphore или CreateMutex. Значение дескриптора, возвращаемого при таких вызовах, сохраняется в закрытом поле SafeWaitHandle, определенном в базовом классе WaitHandle.
Во-вторых, классы EventWaitHandle, Semaphore и Mutex предлагают статические методы OpenExisting, вызывающие Win32-(|)yHKnHio OpenEvent, OpenSemaphore или OpenMutex, передавая ей аргумент типа String с именем существующего ядра. Значение дескриптора, возвращаемого при таких вызовах, сохраняется во вновь созданном объекте, возвращаемым методом OpenExisting. При отсутствии ядра с указанным именем генерируется исключение WaitHandleCannotBeOpenedExcep- tion.
Конструкции режима ядра часто используются для создания приложений, которые в любой момент времени могут существовать только в одном экземпляре. Примерами таких приложений являются Microsoft Office Outlook, Windows Live Messenger, Windows Media Player Windows Media Center. Вот как реализовать такое
приложение:
using System;
using System.Threading;
public static class Program { public static void Main() {
Boolean createdNew;
// Пытаемся создать объект ядра с указанным именем using (new Semaphored, 1, "SomeUniqueStringldentifyingMyApp", out createdNew)) { if (createdNew) {
// Этот поток создает ядро, так что другие копии приложения // не могут запускаться. Выполняем остальную часть приложения...
} else {
// Этот поток открывает существующее ядро с тем же именем;
// должна запуститься другая копия приложения.
// Ничего не делаем, ждем возвращения управления от метода Main,
// чтобы завершить вторую копию приложения
}
}
}
В этом фрагменте кода используется класс Semaphore, но с таким же успехом можно было воспользоваться классом EventWaitHandle или Mutex, так как предлагаемое объектом поведение не требует синхронизации потока. Однако я использую преимущество такого поведения при создании объектов ядра. Давайте посмотрим, как работает показанный код. Представим, что две копии процесса запустились одновременно. Каждому процессу соответствует его собственный поток, и оба потока попытаются создать объект Semaphore с одним и тем же именем (в моем примере SomeUniqueStringldentifyingMyApp). Ядро Windows гарантирует создание объекта ядра с указанным именем только одним потоком; переменной createdNew этого потока будет присвоено значение true.
В случае со вторым потоком Windows обнаруживает, что объект ядра с указанным именем уже существует; соответственно, потоку не позволяется создать еще один объект. Впрочем, продолжив работу, этот поток может получить доступ к тому же объекту ядра, что и поток первого процесса. Таким способом потоки из различных процессов взаимодействуют друг с другом через единое ядро. Но в данном случае поток второго процесса видит, что его переменной createdNew присвоено значение false. Таким образом он узнает о том, что первая копия процесса запущена, поэтому вторая копия немедленно завершает свою работу.
События
![]() |
События (events) представляют собой переменные типа Boolean, находящиеся под управлением ядра. Ожидающий события поток блокируется, если оно имеет значение false, и освобождается в случае значения true. Существует два вида событий. Когда событие с автосбросом имеет значение true, оно освобождает всего один заблокированный поток, так как после освобождения первого потока ядро автоматически возвращает событию значение false. Если же значение true имеет событие с ручным сбросом, оно освобождает все ожидающие этого потоки, так как в данном случае ядро не присваивает ему значение false автоматически, в коде это должно быть сделано в явном виде. Вот как выглядят классы, связанные с событиями:
public sealed class AutoResetEvent : EventWaitHandle { public AutoResetEvent(Boolean initialState);
}
public sealed class ManualResetEvent : EventWaitHandle { public ManualResetEvent(Boolean initialState);
С помощью события с автосбросом можно легко реализовать блокировку в рамках синхронизации потоков, поведение которого сходно с поведением ранее показанного класса SimpleSpinLock:
internal sealed class SimpleWaitLock : IDisposable { private readonly AutoResetEvent m_available;
public SimpleWaitLock() {
m_available = new AutoResetEvent(true); // Изначально свободен
}
public void EnterQ {
// Блокирование на уровне ядра до освобождения ресурса mavailable.WaitOne();
}
public void LeaveQ {
// Позволяем другому потоку обратиться к ресурсу mavailable.Set();
}
public void DisposeQ { m_available.Dispose(); }
}
Класс SimpleWaitLock применяется так же, как мы использовали бы класс SimpleSpinLock. Более того, внешне он ведет себя совершенно так же; а вот производительность двух вариантов блокировки отличается кардинальным образом. При отсутствии конкуренции за блокировку класс SimpleWaitLock работает намного медленнее класса SimpleSpinLock, поскольку каждый вызов его методов Enter и Leave заставляет поток совершить переход из управляемого кода в ядро и обратно. Тем не менее при наличии конкуренции проигравший поток блокируется ядром и не зацикливается, не давая впустую тратить ресурсы процессора. Имейте в виду, что переходами из управляемого кода в ядро и обратно сопровождается также создание объекта AutoResetEvent и вызов для него метода Dispose, что отрицательно сказывается на производительности. Впрочем, эти вызовы совершаются редко, так что не стоит слишком сильно беспокоиться по этому поводу.
Чтобы продемонстрировать разницу в производительности, я написал следующий код:
public static void Main() {
Int32 x = 0;
const Int32 iterations = 10000000; // 10 миллионов
// Сколько времени займет инкремент х 10 миллионов раз?
Stopwatch sw = Stopwatch.StartNewQ; for (Int32 i = 0; i < iterations; i++) { x++;
>
Console.WriteLine("Incrementing x: {0:N0}“, sw.ElapsedMilliseconds);
// Сколько времени займет инкремент х 10 миллионов раз, если
// добавить вызов ничего не делающего метода? sw.Restart();
for (Int32 i = 0; i < iterations; i++) {
M(); x++; M();
}
Console.WriteLine("Incrementing x in M: {0:N0}“, sw.ElapsedMilliseconds);
// Сколько времени займет инкремент х 10 миллионов раз, если // добавить вызов неконкурирующего объекта SimpleSpinLock?
SpinLock si = new SpinLock(false); sw.Restart();
for (Int32 i = 0; i < iterations; i++) {
Boolean taken = false; sl.Enter(ref taken); x++; sl.Exit();
}
Console.WriteLine("Incrementing x in SpinLock: {0:N0}“, sw.ElapsedMilliseconds);
// Сколько времени займет инкремент х 10 миллионов раз, если // добавить вызов неконкурирующего объекта SimpleWaitLock? using (SimpleWaitLock swl = new SimpleWaitLock()) { sw. RestartQ;
for (Int32 i = 0; i < iterations; i++) { swl.Enter(); x++; swl.Leave();
}
Console.WriteLine( "Incrementing x in SimpleWaitLock: {0:N0}“, sw.ElapsedMilliseconds);
}
1 [Methodlmpl(MethodlmplOptions.Noinlining)]
private static void M() { /* Этот метод только возвращает управление */ }
Запустив этот код, я получил следующий результат:
Incrementing х: 8 Incrementing х in М: 69 Incrementing х in SpinLock: 164 Incrementing х in SimpleWaitLock: 8,854
Как легко заметить, простой инкремент х занимает всего 8 мс. Простой вызов метода до и после инкремента увеличивает время выполнения в 9 раз! Выполнение кода в методе, который использует конструкции пользовательского режима, заставило код работать в 21 (164/8) раз медленней. А теперь обратите внимание, на сколько замедлилась программа при вставке в нее конструкций режима ядра. Результат достигается в 1107 (8854/8) раз медленней! Поэтому если можете избежать синхронизации потоков, избегайте ее. Если без нее не обойтись, задействуйте конструкции пользовательского режима. Конструкции режима ядра следует использовать лишь в самом крайнем случае.
Семафоры
Семафоры (semaphores) также представляют собой обычные переменные типа Int32, управляемые ядром. Ожидающий семафора поток блокируется при значении О и освобождается при значениях больше 0. При снятии блокировки с ожидающего семафора потока ядро автоматически вычитает единицу из счетчика. С семафорами связано максимальное значение типа Int32, которое ни при каких обстоятельствах не могут превысить текущие показания счетчика. Вот как выглядит класс Semaphore:
public sealed class Semaphore : WaitHandle {
public Semaphore(Int32 initialCount, Int32 maximumCount); public Int32 ReleaseQ; // Вызывает Release(l);
// возвращает предыдущее значение счетчика public Int32 Release(Int32 releaseCount); // Возвращает предыдущее
// значение счетчика
}
Подытожим, каким образом ведут себя эти три примитива режима ядра:
□ При наличии нескольких потоков в режиме ожидания событие с автосбросом освобождает только один из них.