Многопоточность в Python. Разбираемся подробней.

Возможно вы все пользовались потоками в Питоне, типа как я. Запустили там, пару тысяч потоков и что там вроде делается , висит , тупит . Но я решил разобраться с этой темой поподробней.

Что такое Thread (Поток)?

Thread – это отдельный поток выполнения. Это означает, что в вашей программе могут работать две и более подпрограммы одновременно. Ваши программы работает только на одном процессоре. Различные задачи, внутри потоков выполняются на одном ядре, а операционная система управляет, когда ваша программа работает с каким потоком. Как то туманно , да ? Мне очень понравилась аналогия которую прочитал в статье , я ее процитирую :

Лучшая аналогия, которую я читал «Введение Async IO в Python: полное прохождение» : которое сравнивает этот процесс с шахматистом-гроссмейстером, соревнующимся одновременно со многими противниками. Это всего лишь один человек, ему нужно переключаться между задачами (играми) и помнить состояние (обычно это называется state) для каждой игры.

webdevblog.ru

В Python есть стандартная библиотека для работы с потоками threading и класс Thread. Давайте что нибудь простое изобразим :

import threading
import time

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')
print ('Создаем поток')
x = threading.Thread(target=potoc, args=(1,)) # Создание потока
print('Запускаем поток.')
x.start() # Запуск потока
print('Ждем когда завершиться поток.')
print('Конец программы.')
  • target=potoc — Передаем имя функции которая будет выполняться(подпрограмма).
  • args=(1,) — Передаем список аргументов функции.

Если вы заметили программа продолжила выполняться дальше. Это как раз и есть наглядная демонстрация что такое поток. Функция выполняется не зависимо от самой программы(в своем потоке) или на оборот. Но на самом деле программа не завершилась после выполнения print('Конец программы.'). Она ожидает выхода, потому что сам поток находится в спящем режиме. Как только он завершит работу и напечатает сообщение, вызовется метод .join() и программа сможет выйти. Тоже самое мы может сделать и сами, вызвать метод .join() для ожидания завершения запущенных потоков в любом месте программы.

print('Ждем когда завершиться поток.')
x.join()
print('Конец программы.')

Но есть и другой путь. Запустить демонический поток , который будет остановлен сразу после завершения программы. Еще проще, запустили, забыли и бросили на произвол судьбы. Что в нем там происходит , завершился он и т.д

демон (deamon)  — это компьютерная программа , которая работает в фоновом режиме, а не находится под непосредственным управлением интерактивного пользователя.

x = threading.Thread(target=potoc, args=(1,), daemon = True) # Создание демонического потока
Не забудьте закомментировать x.join() 

Давайте теперь запустим побольше потоков, штучек пять:

import threading
import time

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')
for i in range(5):
    print ('Создаем поток '+str(i))
    x = threading.Thread(target=potoc, args=(i,))
    print('Запускаем поток '+str(i))
    x.start() # Запуск потока
print('Ждем когда завершиться поток.')
#x.join()
print('Конец программы.')

Запустив несколько раз код вы поймете, что потоки не смотря на то что создаются и запускаются друг за другом, завершаются всегда по разному. Дело в том что порядок выполнения потоков определяется операционной системой и его может быть довольно сложно предсказать или не возможно.

Для эксперимента можете попробовать запустить скажем миллион потоков ))) В этом тоже есть опасность, система имеет конечные ресурсы. Конечно же в Питоне есть методы для управления и контроля потоков.

Использование ThreadPoolExecutor.

Из названия думаю все понятно или не очень ))) Экзекутор Пула Потоков, Ахха-ха-ха-ха ))) Ну а если серьезно, этот метод нам поможет контролировать пул потоков. Давайте сразу пример и там разберемся.

import time
import concurrent.futures

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(potoc, range(1,6))
#print('Ждем когда завершиться поток.')
#x.join()
print('Конец программы.')

ThreadPoolExecutor входит в стандартную библиотеку Питона: concurrent.futures — Запуск параллельных задач

  • concurrent.futures.ThreadPoolExecutor(max_workers=3) — Запускаем группу(пул) потолок, max_workers=3 — максимальное число работающих одновременно.
  • executor.map(potoc, range(1,6)) — Методом .map мы создаем поток и добавляем его в пул. Potoc — функция , и передаем переменную в диапазоне от 1 до 5. То есть, будет выполнено пять раз.

Собственно все это мы обернули в менеджер контекст with…as. Что из этого вышло : запустилось три потока, и как только какой то поток завершился, наш экзекутор запускает следующий из пула. Идея всего этого типа такая.

Примечание. Использование ThreadPoolExecutor может привести к ошибкам. Например, если вы вызываете функцию, которая не принимает параметров, но передаете ей параметры в .map(), в этом случае поток выдаст исключение. К сожалению, ThreadPoolExecutor скрывает это исключение, и (в случае выше) программа завершается без вывода. Это может стать довольно запутанным при отладке.

Вы библиотеке также есть класс ProcessPoolExecutor для процессов.

with concurrent.futures.ProcessPoolExecutor() as executor:
result = executor.map(function, iterable)

Условия гонки (Race Conditions).

Условия гонки могут возникать, когда два или более потока обращаются к общему фрагменту данных или ресурсу. Например выполняя простую операцию с общими данными в потоке х = х + 1 , операционная система передаст управление другому потоку сразу после прочтения значения х . Вторая часть : +1 и запись нового значения в переменную х будет исполнено позже. Думаю понятно что это может принести очень много проблем при отладке программы. Есть конечно методы борьбы с этим, например threading.Lock() (блокировка потока).

Блокировку может использовать только один поток одновременно. Если другой поток захочет вызвать блокировку в это же время, ему придётся ждать пока не разблокируется другой поток. Будьте внимательны , если вы не освободите поток, ваша программа просто зависнет ! Для блокировки потока используется метод .acquire() и освобождение потока .release(). Давайте попробуем что нибудь симулировать.

import threading
import time
import concurrent.futures

def potoc (name):
    #lock.acquire()
    global x
    local = x * 2
    x = local
    #lock.release()
    time.sleep(5)
for z in range(10):
    x = 1
    for i in range(1,11):
        l = threading.Thread(target=potoc, args=(i,))
        l.start()
    l.join()
    print('Величина переменно', x)

Теперь добавим блокировку потока во время обращения к переменной.

import threading
import time
import concurrent.futures

def potoc (name):
    global x
    print('Поток '+str(name) +' стартовал.')
    lock.acquire()
    local = x
    print (str(local)+'\n')
    x = local * 2
    lock.release()
    time.sleep(5) # Спим

for z in range(10):
    print ('Новый цикл')
    x = 1
    lock = threading.Lock()
    for i in range(1,11):
        l = threading.Thread(target=potoc, args=(i,))
        l.start()
    l.join()
    print('Величина переменно', x)

Сколько потоком запускать ?

Это наверное первый вопрос который возник у вас в голове. Я не дам однозначного ответа, но есть такое мнение : сколько ядер\виртуальных потоков у процессора, столько потоков и запускать одновременно. Все зависит от компьютера и задач которые вы пытаетесь выполнить.