В статье я опишу механизмы Python 3 с помощью которых выполняется синхронизация потоков, а именно классы Event, Condition, Barrier и Semaphore.

Введение

Синхронизация потоков Python 3 нужна в разных случаях, например:

  • Потоки должны ожидать какого-то события, и в случае его наступления выполнять работу дальше.
  • Один поток должен подождать разблокировки объекта, так как с ним уже работает другой поток. При этом разблокировка будет выполняться по како-му то событию.
  • Приложение должно дождаться работы нескольких потоков и только потом продолжить выполнение кода.
  • Может быть запущено большое количество потоков, но выполняться они должны параллельно по нескольку штук.

Синхронизация потоков в Python 3 обеспечивается благодаря классам модуля threading: Event, Condition, Barrier, Semaphore.

Класс Event

Здесь мы рассмотрим класс Event модуля threading. Этот класс содержит 3 метода: set — устанавливает флаг, clear — очищает флаг, wait — заставляет поток ожидать, пока флаг не будет установлен.

По умолчанию флаг снят, то есть строчка event.wait() заставит поток приостановиться. Дальше можно установить флаг с помощью event.set(), и тогда поток продолжит своё выполнение.

Более понятным языком это можно описать так, запускаем процессы, они что-то делают, затем приостанавливаются и ждут каково-то события. И как только это событие наступает, то продолжают свою работу.

Чтобы лучше с этим разобраться напишем демо-программу. Начнём с импорта нужных модулей: time и threading. Функция current_thread — определяет имя текущего потока, а функция active_count — сообщает текущее число потоков.

from time import sleep
from threading import Thread, Event, current_thread, active_count

Создадим объект класса Event. По умолчанию у созданного объекта флаг снят (то есть событие не наступило).

event = Event()

Укажем количество потоков которые будут запускаться:

max_t = 5

Напишем функцию, которую будем запускать в потоках. Она в переменную t_nane сохраняет имя текущего потока. Сообщает что запустился такой-то поток. А после ждёт пока наступит событие (event.wait()). И как только событие наступит, продолжает свою работу.

def f():
    thr_num = current_thread().name
    print(f"Поток {thr_num} запустился. Но ждёт остальных.")
    event.wait()
    print(f"Событие наступило! Поток {thr_num} продолжил свою работу")

Теперь в цикле запускаем потоки с задержкой 0.2 секунды:

for i in range(max_t):
    Thread(target=f).start()
    sleep(0.2)

И как только запустятся все потоки устанавливаем флаг объекту event (говорим что событие наступило):

if active_count() >= max_t:
    event.set()

Результат выполнения написанной выше программы:

Поток Thread-1 (f) запустился. Но ждёт остальных.
Поток Thread-2 (f) запустился. Но ждёт остальных.
Поток Thread-3 (f) запустился. Но ждёт остальных.
Поток Thread-4 (f) запустился. Но ждёт остальных.
Поток Thread-5 (f) запустился. Но ждёт остальных.
Событие наступило! Поток Thread-1 (f) продолжил свою работу
Событие наступило! Поток Thread-2 (f) продолжил свою работу
Событие наступило! Поток Thread-5 (f) продолжил свою работу
Событие наступило! Поток Thread-4 (f) продолжил свою работу
Событие наступило! Поток Thread-3 (f) продолжил свою работу

Дополнительная информация по классу Event().

Класс Condition

Класс Condition тоже используется для синхронизации потоков. Он является некоторой комбинацией классов Event и Lock, что позволяет потокам ожидать пока ресурс с которым происходит работа будет разблокирован. А разблокировать мы можем объект при каком-то условии.

Например первый поток перебирает число от 1 до 20, и если число делится на 5 без остатка, то разблокирует объект (устанавливает флаг). А второй поток выводит надпись «Событие наступило», когда дождётся установленного флага. Вот такую программу мы и реализуем.

Начнем писать код с импорта необходимых модулей и создание объекта класса Condition:

from time import sleep
from threading import Thread, Condition
cond = Condition()

Напишем первую функцию. Она будет в бесконечном цикле ставить блокировку — cond.wait(). И с Condition важно использовать менеджер контекста (with) чтобы он сам следил за блокировкой и разблокировкой.

def f1():
    while True:
        with cond:
            cond.wait()
            print("Получили событие!")

Затем напишем вторую функцию. В цикле пробежимся по цифрам от 0 до 20. И если число делится на 5 без остатка, то будем снимать блокировку (cond.notify()). Здесь также используем менеджер контекста (with). В противном случае (если число не делится на 2 без остатка) мы выводим это число на экран. И дополнительно установим небольшую задержку (sleep(0.2)).

def f2():
    for i in range(21):
        if i % 5 == 0:
            with cond:
                cond.notify()
        else:
            print(i)
        sleep(0.2)

И наконец запускаем оба потока:

Thread(target=f1, daemon=True).start()
Thread(target=f2).start()

Первый поток запускаю в режиме демона, так как там используется бесконечный цикл и нужно чтобы этот поток завершился при завершении основной программы.

Результат выполнения нашей программы:

Получили событие!
1
2
3
4
Получили событие!
6
7
8
9
Получили событие!
11
12
13
14
Получили событие!
16
17
18
19
Получили событие!

За надпись «Получили событие!» отвечает первая функция. Разблокировка происходит во второй функции, когда число делится на 5 без остатка.

Дополнительная информация по Condition().

Класс Barrier

Barrier позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.

Например у вас первый поток что-то высчитывает и второй поток что-то высчитывает. А третий поток (возможно основной) должен использовать результаты первого и второго потока для дальнейшего расчёта. Для этого основная программа должна дождаться выполнения обоих потоков и только затем продолжить работать. Вот и напишем подобную программу.

Начнем как обычно с импорта необходимых модулей:

from time import sleep
from threading import Thread, Barrier

Создадим барьер. При его создании необходимо указать сколько потоков будет работать с барьером. У нас будет работать 3 потока — 1 основной и 2 дополнительных.

br = Barrier(3)

Дальше сделаем две глобальные переменные, в которые будем помещать результаты работы каждой функции:

a = 0
b = 0

Напишем первую функцию (возведение числа в квадрат). Она будет ожидать 1 секунду. А дальше будет ждать заполнения барьера до 3 потоков (br.wait()).

def f1(x):
   print("Высчитываем 1 число")
   global a 
   a = x**2
   sleep(1)
   br.wait()

Напишем вторую функцию (умножение на 2). Она будет ожидать 2 секунды. И тоже ожидать заполнения барьера.

def f2(x):
   print("Высчитываем 2 число")
   global b 
   b = x*2
   sleep(2)
   br.wait()

Я использую разное ожидание (первый поток ждёт 1 секунду, второй — 2 секунды), чтобы продемонстрировать, что потоки могут работать разное время. Но в ожидание br.wait() должны попасть три потока, и только тогда потоки продолжат своё выполнение.

Запустим оба потока:

Thread(target=f1, args=(3,)).start()
Thread(target=f2, args=(7,)).start()

Укажем основному потоку программы тоже ждать заполнения барьера:

br.wait()

И вот тут у нас 3 потока попали в ожидание барьера, как только это случилось, программа продолжила своё выполнение. После чего на экран выводится сумма полученных ранее двух результатов:

print("Результат = ", a+b)

Результат выполнения этой программы:

Высчитываем 1 число
Высчитываем 2 число
Результат =  23

Класс Semaphore

Семафоры — это технология в основе которой лежит счетчик. Когда поток заходит в семафор, то его счётчик уменьшается на 1. И когда счетчик становится равным нулю, то новый поток уже не может попасть в семафор и ожидает освобождения семафора. Когда поток в семафоре завершается, то счетчик увеличивается на 1.

Например создаём семафор = 5. Но запускаем 10 потоков. В итоге только 5 потоков получат доступ к семафору. А остальным 5 потокам придётся ждать своей очереди.

Это удобно, чтобы ограничить число подключений к сети или одновременно авторизованных пользователей программы.

Напишем демо-программу. Начнем с импорта необходимых модулей и создания объекта семафора (укажем что семафор работает одновременно только с двумя потоками):

from time import sleep, time
from threading import Thread, Semaphore, current_thread
s = Semaphore(2)

Напишем функцию которую будем запускать в потоках. Она будет запоминать время запуска потока и время перед самым завершением. При этом, поток после запуска будет попадать в семафор, в котором помещаются только 2 потока.

def f():
   thname = current_thread().name
   start_tread = time()
   with s:
       sleep(1)
       print(f"Время выполнения потока {thname} = {time() - start_tread}")

То есть наша функция сохраняет имя потока и текущее время. Затем использует семафор. В семафоре ожидает 1 секунду, и выводит общее время работы потока.

И наконец запускаем 10 потоков в цикле:

for i in range(10):
   Thread(target=f).start()

Результат выполнения этой программы:

Время выполнения потока Thread-1 (f) = 1.0010035037994385
Время выполнения потока Thread-2 (f) = 1.0010035037994385
Время выполнения потока Thread-3 (f) = 2.001539468765259
Время выполнения потока Thread-4 (f) = 2.0020761489868164
Время выполнения потока Thread-5 (f) = 3.002070665359497
Время выполнения потока Thread-6 (f) = 3.003070592880249
Время выполнения потока Thread-7 (f) = 4.003071546554565
Время выполнения потока Thread-8 (f) = 4.003068447113037
Время выполнения потока Thread-9 (f) = 5.003071069717407
Время выполнения потока Thread-10 (f) = 5.0040764808654785

Как видим, потоки выполняются одновременно по 2.

Итог

Мы познакомились с классами Event, Condition, Barrier, Semaphore модуля threading. И научились синхронизировать работу нескольких потоков.

Остальные статье по Python 3 можете посмотреть здесь.

Сводка
Python 3 Синхронизация потоков
Имя статьи
Python 3 Синхронизация потоков
Описание
В статье я опишу механизмы Python 3 с помощью которых выполняется синхронизация потоков, а именно классы Event, Condition, Barrier и Semaphore

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *