Глава 9. Семафоры. Управление потоками данных. Взаимосвязь источник-приемник.

Содержание:

  • Семафоры.
  • Счетчик больше единицы? "Не вполне критические" секции.
  • Новое применение семафоров: управление потоками данных.
  • Ограниченный буфер.
  • Реализация ограниченного буфера в Delphi.
  • Создание: Корректная инициализация счетчиков семафоров.
  • Работа: правильные времена ожидания.
  • Разрушение: Очистка.
  • Разрушение: тонкости остаются.
  • Доступ к дескрипторам синхронизации должен быть синхронизован!
  • Управление дескрипторами в Win32.
  • Решение.
  • Использование ограниченного буфера: пример.
  • В завершение...

Семафоры.

Семафор представляет собой другой тип примитива синхронизации, с несколько более широкими возможностями по сравнению с мьютексом. В наиболее простых случаях его можно использовать точно так же, как и мьютекс. В общем же семафоры позволяют реализовать в программе более продвинутые механизмы синхронизации.
Сначала давайте вспомним, как работают мьютексы. Мьютекс может быть или занят или свободен (signalled). Если он свободен, то действие ожидания мьютекса не блокируется. Если он занят, операция ожидания этого мьютекса блокирована. Если мьютекс занят, то он принадлежит конкретному потоку, и, следовательно, только один поток может обладать мьютексом в каждый момент времени.
Семафоры можно заставить действовать точно так же. Вместо понятия владения, захвата мьютекса, у семафора имеется счетчик. Когда этот счетчик больше нуля, семафор свободен, и операции ожидания для него не блокируются. Когда счетчик равен 0, то семафор занят, и операции ожидания заблокированы. Мьютекс по существу является разновидностью семафора, счетчик которого может быть только 0 или 1. Аналогично, семафоры можно рассматривать как воображаемые мьютексы, которые могут одновременно иметь более одного владельца. Функции Win32 API, работающие с семафорами, очень похожи на функции для работы с мьютексами.

  • CreateSemaphore. Эта функция подобна CreateMutex. Вместо флага, указывающего, что поток, создающий мьютекс, сразу им будет владеть, эта функция принимает аргумент, задающий начальный счетчик. Создание занятого мьютекса подобно созданию семафора со счетчиком 0: в обоих случаях любой другой поток, ожидающий освобождения объекта, будет заблокирован. Аналогично, создание свободного мьютекса подобно созданию семафора со счетчиком 1: в обоих случаях единственный поток не будет заблокирован в ожидании объекта синхронизации.
  • Функции ожидания (Wait). Они для обоих случаев идентичны. Для мьютексов успешный результат ожидания приводит к тому, что поток захватывает мьютекс, а для семафоровуспешный результат ожидания уменьшает счетчик семафора, а если счеичик обнуляется, то вызывающий поток блокируется.
  • ReleaseSemaphore. Это подобно ReleaseMutex, но вместо освобождения мьютекса потоком, ReleaseSemaphore принимает дополнительный целочисленный аргумент, определяющий, на какую величину увеличится счетчик. ReleaseSemaphore либо увеличивает счетчик семафора, либо активирует соответствующее число потоков, которые были блокированы эти семафором.

Следующая таблица показывает, как превратить код, использующий мьютексы, в код с применением семафоров, и эквивалентные операции.

Выделить всёБез подсветки
1:
2:
3:
4:
5:
6:
7:
Мьютексы.                                  Семафоры.
 
MyMutex := CreateMutex(nil,FALSE,<name>);  MySemaphore := CreateSemaphore(nil,1,1,<name>); 
MyMutex := CreateMutex(nil,TRUE,<name>);   MySemaphore := CreateSemaphore(nil,0,1,<name>); 
WaitForSingleObject(MyMutex,INFINITE);     WaitForSingleObject(MySemaphore,INFINITE); 
ReleaseMutex(MyMutex);                     ReleaseSemaphore(MySemaphore,1); 
CloseHandle(MyMutex);                      CloseHandle(MySemaphore);



Вот простой пример: изменения, требующиеся для кода, представленного в 6 Главе, чтобы программа использовала семафоры вместо критических секций.

Выделить всёРазвернуть кодкод Pascal/Delphi
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
type
  TPrimeFrm = class(TForm)
    { No change here until public declarations }
  public
    { Public declarations }
    StringSemaphore: THandle; { Now a semaphore instead of a critical section }
    property StringBuf: TStringList read FStringBuf write FStringBuf;
  end;
 
procedure TPrimeFrm.StartBtnClick(Sender: TObject);
begin
  if not FStringSectInit then
  begin
    StringSemaphore := CreateSemaphore(nil11, SemName); { Now creating a semaphore instead of a critical section }
    FStringBuf := TStringList.Create;
    FStringSectInit := true;
    FPrimeThread := TPrimeThrd2.Create(true);
    SetThreadPriority(FPrimeThread.Handle, THREAD_PRIORITY_BELOW_NORMAL);
    try
      FPrimeThread.StartNum := StrToInt(StartNumEdit.Text);
    except
      on EConvertError do FPrimeThread.StartNum := 2;
    end;
    FPrimeThread.Resume;
  end;
  UpdateButtons;
end;
 
procedure TPrimeFrm.StopBtnClick(Sender: TObject);
begin
  if FStringSectInit then
  begin
    with FPrimeThread do
    begin
      Terminate;
      WaitFor;
      Free;
    end;
    FPrimeThread := nil;
    FStringBuf.Free;
    FStringBuf := nil;
    CloseHandle(StringSemaphore); { Deleting semaphore }
    FStringSectInit := false;
  end;
  UpdateButtons;
end;
 
procedure TPrimeFrm.HandleNewData(var Message: TMessage);
begin
  if FStringSectInit then {Not necessarily the case!}
  begin
    WaitForSingleObject(StringSemaphore, INFINITE); { New wait call }
    ResultMemo.Lines.Add(FStringBuf.Strings[0]);
    FStringBuf.Delete(0);
    ReleaseSemaphore(StringSemaphore, 1nil); { New release call }
    {Now trim the Result Memo.}
    if ResultMemo.Lines.Count > MaxMemoLines then
      ResultMemo.Lines.Delete(0);
  end;
end;
 
procedure TPrimeThrd2.Execute;
 
var
  CurrentNum: integer;
 
begin
  CurrentNum := FStartNum;
  while not Terminated do
  begin
    if IsPrime(CurrentNum) then
    begin
      WaitForSingleObject(PrimeFrm.StringSemaphore, INFINITE); { New wait call }
      PrimeFrm.StringBuf.Add(IntToStr(CurrentNum) + ' is prime.');
      ReleaseSemaphore(PrimeFrm.StringSemaphore, 1nil); { New release call }
      PostMessage(PrimeFrm.Handle, WM_DATA_IN_BUF, 00);
    end;
    Inc(CurrentNum);
  end;
end;



Счетчик больше единицы? "Не вполне критические" секции.

Возможность семафоров иметь счетчик более единицы отчасти аналогична разрешению мьютексу иметь более одного владельца. Таким образом семафоры позволяют создавать критические секции, который разрешают определенному количеству потоков иметь доступ к одному конкретному участку кода или к конкретному объекту. Это по большей части полезно в ситуациях, когда коллективный ресурс состоит из множества буферов или множества потоков, которые можно использовать и другими потоками системы. Давайте рассмотрим конкретный пример и предположим, что до трех потоков могут работать с определенным участком кода. Семафор создается с начальным и максимальным счетчиком 3, и предположим, что ни один поток не работает с критическим участком. Выполнение пяти потоков, пытающихся получить доступ к коллективному ресурсу, может выглядеть приблизительно так:

--Resize_Images_Alt_Text--


Это конкретное применение семафоров, вероятно, не особенно полезно для программистов на Delphi, главным образом потому, что есть несколько подобных статических структур для уровня приложения. Тем не менее, оно оказывается значительно более важным для ОС, где дескрипторы или ресурсы, такие как системные буферы, вероятно будут статически распределены во время загрузки.

Новое применение семафоров: управление потоками данных.

В Главе 6 было указано на потребность в управлении потоками данных при их прохождении между программными потоками . Кроме того, в Главе 8 эта тема была затронута при обсуждении мониторов. В данной главе рассматривается ситуация для примера, где часто требуется управление потоками данных: ограниченный буфер с единственным потоком-поставщиком данных, выводящий некоторые элементы в буфер, и единственный поток-потребитель, забирающий их оттуда.

Ограниченный буфер.

Ограниченный буфер представляет собой простую разделяемую структуру данных, которая обеспечивает и управление потоками данных, и общий доступ к данным. Буфер, рассмотренный здесь, будет простой очередью: первым вошел - первым вышел (FIFO). . Это будет реализовано в виде циклического буфера, то есть содержать фиксированное количество элементов и иметь два указателя "get" и "put", показывающие, в каком именно месте буфера данные будут вставлены и удалены. Обычно разрешается четыре операции с буфером:

  • Create Buffer. Создаются и инициализируются буфер и связанные с ним механизмы синхронизации.
  • Put Item. Попытка вставить элемент в буфер с учетом потокобезопасности. Если это невозможно из-за заполнения буфера, то поток, пытающийся вставить элемент, блокируется (приостанавливается), пока буфер не перейдет в состояние, в котором в него разрешено добавлять данные.
  • Get Item. Попытка забрать элемент из буфера с учетом потокобезопасности. Если это невозможно из-за того, что буфер пуст, то поток, пытающийся получить элемент, блокируется, пока буфер не перейдет в состояние, в котором из него разрешено удалять данные..
  • Destroy Buffer. Разблокирует все потоки, ожидающие буфера, разрушает буфер.

Очевидно, для обращения с коллективными данными потребуются мьютексы. Тем не менее, мы можем использовать семафоры для выполнения необходимых операции блокировки, когда буфер полон или пуст, устраняя потребность в контроле выхода за границы, и даже подсчете того, сколько элементов находится в буфере. Для того, чтобы это сделать, потребуется некоторое изменение концепций. Вместо ожидания семафора и затем освобождения его при выполнении операций, имеющих отношение к буферу, мы используем счетчик двух семафоров, чтобы следить за тем, сколько входов в буфере пусты или заполнены. Давайте назовем эти семафоры "EntriesFree" и "EntriesUsed".
Обычно с буфером взаимодействуют два потока. Поток-производитель (или писатель) пытается вставить данные в буфер, а поток-потребитель (читатель) пытается их извлечь, как показано на следующей диаграмме. Третий поток (возможно, поток VCL), может вмешиваться для того, чтобы создавать и уничтожать буфер.

--Resize_Images_Alt_Text--


Как можно видеть, потоки - читатель и писатель - выполняются в цикле. Поток-писатель создает элемент и пытается поместить его в буфер. Сначала поток ожидает семафора EntriesFree. Если счетчик EntriesFree нулевой, то поток будет заблокирован, так как буфер полный, и больше данных добавить нельзя. Как только это возможное ожидание закончится, поток добавляет элемент в буфер, а затем увеличивает счетчик EntriesUsed, и, если необходимо, активирует поток-потребитель. Соответственно поток-потребитель заблокируется, если счетчик EntriesUsed нулевой, а когда он удаляет элемент, то увеличивает счетчик EntriesFree, разрешая потоку-производителю добавлять новый элемент.
Блокировка нужного потока всякий раз, когда буфер становится полным или пустым, оставляет один или другой поток "вне игры". При данном размере буфера N, поток-производитель может только быть на N элементов впереди потока-потребителя до своей остановки, и аналогично, поток-потребитель не может отставать более, чем на N элементов. Это дает несколько преимуществ:

  • Один поток не может выдать лишние данные, таким образом мы избавляемся от проблем, указанных в Главе 6, где у нас один поток создавал очередь все увеличивающегося размера.
  • Буфер имеет конечный размер, в противоположность списку, рассмотренному ранее, так что мы можем предусмотреть наихудший вариант использования памяти.
  • Здесь не будет "ожидания при занятости". Когда потоку нечего делать, он приостанавливается. При этом не возникает ситуации, когда программист пишет маленькие циклы, которые ничего не делают, кроме ожидания новых данных при снятии блокировки. Этого следует избегать, так как впустую тратится процессорное время.

Для полной ясности я дам пример последовательности событий. У нас есть буфер, который имеет 4 возможных входа, и инициализируется он так, что все элементы свободны. Возможно много путей выполнения в зависимости от работы планировщика, но я проиллюстрирую тот, где каждый поток выполняется столько, сколько возможно, прежде чем он будет приостановлен.

Выделить всёБез подсветки
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
Действия потока-читателя              Действия потока-писателя            Число     Число
                                                                        свободных  занятых
                                                                        элементов  элементов
Thread starts                         Thread inactive (not scheduled)       4       0
Wait(EntriesUsed) blocks. Suspended.                                        4       0
                                      Wait(EntriesFree) flows through       3       0
                                      Item Added. Signal(EntriesUsed)       3       1
                                      Wait(EntriesFree) flows through       2       1
                                      Item Added. Signal(EntriesUsed)       2       2
                                      Wait(EntriesFree) flows through       1       2
                                      Item Added. Signal(EntriesUsed)       1       3
                                      Wait(EntriesFree) flows through       0       3
                                      Item Added. Signal(EntriesUsed)       0       4
                                      Wait(EntriesFree) blocks. Suspended   0       4
Wait(EntriesUsed) completes                                                 0       3
Item Removed. Signal(EntriesFree)                                           1       3
Wait(EntriesUsed) flows through                                             1       2
Item Removed. Signal(EntriesFree)                                           2       2
Wait(EntriesUsed) flows through                                             2       1
Item Removed. Signal(EntriesFree)                                           3       1
Wait(EntriesUsed) flows through                                             3       0
Item Removed. Signal(EntriesFree)                                           4       0
Wait(EntriesUsed) blocks. Suspended                                         4       0



Реализация ограниченного буфера в Delphi.

Вот первая реализация ограниченного буфера на Delphi.

Выделить всёРазвернуть кодкод Pascal/Delphi
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12… Продолжение »