Home > Other > Основи роботи з потоками у мові Python

Основи роботи з потоками у мові Python

Деталі:
Стаття писалася в той час коли останніми версіями CPython були:
2.6.2 для другої гілки і 3.1.1 для третьої гілки.У статті використовуються нововведені в CPython 2.6 with statements, тому при бажанні використання цього коду у більш ранніх версіях доведеться переписувати код на розсуд. У процессі написання данної статті використовувалися лише стандартні модулі, доступні “з коробки”. Також, виходячи з того, що я є не професійним програмістом, а самоучкою, то прошу вибачення у поважної аудиторії за возморжливі неточності відносно трактування тих чи інших понять. Тому, запрошую вас задавати питання, на які я буду по можливості давати відповіді.

Вступ:
Цю статтю я задумав написати після все частіших і частіших питань як на форумі , так і питань в icq на тему багатопоточності в CPython. Проблема людей, які їх задають виходить, в основному, з незнання чи нерозуміння основних принципів роботи багатопоточних додатків. По меншій мірі, це відноситься до використовуваної мною моделі багатопотоковості, котра носить назву Thread Pool (Пул потоків). Частою проблемою є і інше : люди не мають елементарних навичок роботи із стандартними модулями CPython. В статті я буду намагатися навести приклади такого незнання, не зупиняючись на особистостях, так як це ,на мою скромну думку, неважливо. Виходячи з умов, в яких пишется ця стаття, то ми трохи торкнемся й работи через proxy сервер(не плутати з SOCKS).

Отже, приступимо, фактично, то що я зібрався описувати вперше було порекомендовано шанованим lorien з python.su (правда в його прикладі Queue взагалі в окремому потоці оброблялося :) ),не упевнений що він автор продемонстрованого ним концепту, але вперше я побачив це опублікованим саме від нього, і являє собою навіть швидше не Thread Pool, а Task Pool (хоча можливо я і не правий у трактуванії цього терміну).
Що є багатопоточний додаток? Це додаток, в якому певна кількість потоків виконуэ деякі завдання. Проблема багатьох в тому, що вони не до кінця уловлюють те, що потоки діють окремо один від одного до тих пір, доки активний головний потік. Особисто я прагну писати так, щоб це їм не заважало, але про це пізніше. Також їх проблемою є так званий “індуський” код, який просто і бездумно звідкись копіюється, а програма доводиться до рівня “аби працювало”. Панове, засвойте раз і назавжди: якщо ви не розумієте, як працює та або інша ділянка вашої програми, то перепишіть її так, щоб це було зрозуміло ВАМ, якщо в майбутньому ви доростете до розуміння тих речей, які ви передбачали бездумно скопіювати, то вам без проблем можна буде використовувати цей код. Головним є саме ВАШЕ розуміння того, як працює ваше творіння.
Торкнемося проблеми окремої роботи потоків. Панове, взаємодію потоків варто продумувати до того як ви починаєте писати додаток, а не коли ви його вже написали. В принципі, якщо дотримуватися деяких правил роботи з вихідним кодом застосування, то перероблення програми з однопоточної в багатопоточну відбувається легко, безболісно, і швидко
Відносно активності головного потоку. Коли, як вам здається, ви запускаєте ОДИН потік, фактично працює вже ДВА потоки. Потрібно розуміти, що кількість потоків, активних в даний момент дорівнює кількості потоків, запущених в даний момент вами +1 потік, в якому працює основне тіло додатку. Особисто я прагну писати так, щоб чітко відділяти основний потік від запущених мною. Якщо цього не робити, то можливе передчасне (як вам здається) завершення роботи додатку, хоча насправді додаток відпрацює саме так, як ви його написали.
Начебто на словах зрозуміло, тепер приступаємо до практики. На практиці в CPython є таке понятніє як GIL (Global Interpreter Lock). Під цим мається на увазі глобальне блокування інтерпрітатора в той момент коли потоки вашого додатку звертаються до процесора. Фактично, в кожен окремо взятий момент з процесором працює лише один потік. У зв’язку з цим максимальна кількість потоків, яку взагалі можна запустити в стандартному CPython вагається в районі 350 штук.
Як приклад буде зроблена спроба реалізувати многопоточний парсер www.google.com. Як я вже написав вище, для роботи будуть використані виключно стандартні модулі, для виконання завдання знадобляться модулі urllib2, urllib, queue, threading, re.
По порядку:

#!usr/bin/env python
#-*-encoding:UTF-8-*-
 
#==================<Імпортування необхідних модулів>====================
import urllib2
#Модуль для роботи з протоколом HTTP, високорівневий
import urllib
#Модуль для роботи з протоколом HTTP, більш низькорівневий ніж urllib2,
#фактично з нього необхідна одна функція - urllib.urlquote
import Queue
#Модуль, який представляє собою "Pool", фактично це список, в якому на 
#потрібних місцях вставлені замки таким чином, щоб до нього одночасно 
#міг звертатися лише один потік
import threading
#Модуль для работи з потоками, з нього знадобляться лише 
#threading.active_count, threading.Thread, threading.Thread.start, 
#threading.Rlock
import re
#Модуль для роботи з регулярними виразами, його використання виходить
#за межі статті
import time
#Модуль для роботи з часом, з нього потрібна лише функція sleep
queue = Queue.Queue()
#Обов`язкове присваювання, потрібно робити саме так (тобто імпортувати 
#класс Queue з модуля Queue й ініціювати його)
#==================</Імпортування необхідних модулів>===================
 
#==============================<Налаштування>===========================
PROXY = "10.10.31.103:3128"
#Під час написання статті сиджу за проксі-сервером тому в статті торкаюся
#і цього питання, цим рядком обьявляєтся  глобальна змінна PROXY, у якій
#знаходиться адреса проксі-сервера. Для роботи безпосередньо необхідно 
#вказати значення None
HEADERS = {"User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
           "Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
           "Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
           "Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1",
           "Accept-Encoding" : "identity, *;q=0",
           "Connection" : "Keep-Alive"}
#Для того, щоб отримати сторінку www.google.com НЕОБХІДНО використовувати
#заголовки браузеру, вони представлені вище в асоціативному масиві HEADERS, 
#відповідають реальним заголовкам браузеру Opera з маленькою модифікацією, ці 
#заголовки означають що клієнт не може приймати zlib compressed data, тобто 
#стислі дані - не хотів я морочитися ще і з разархивіроанієм сторінок, тим 
#більш що не всі сайти їх стискують...
THREADS_COUNT = 10
#В принципі це все налаштування додатка, це-кількість потоків
DEEP = 30
#Це - значення, яке відповідає за глибину сторінок пошуку, які потрібно
#переглядати, фактично ж визначає собою кількість посилань, які будуть
#зібрані зпарсером.
ENCODING = "UTF-8"
#Кодування ваших файлів (необхідно для переводу даних з вашого текстового
#документу з запросами до гуглу у юнікод)
#==============================</Налаштування>==========================
 
LOCK = threading.RLock() #Ось тут те вперше і торкаємося модуля threading
#створюється обьект LOCK, який є классом threading.RLock з
#модулю threading, це -простий замок, який забороняє виконання 
#декількома потоками ділянки коди який йде після виклику його методу 
#acquire() Основною відмінністю threading.RLock від threading.Lock 
#(теж класс з модуля threading) є те, що кожен потік може звертатися до
#об`єкту threading.RLock необмежену кількість разів, обьект 
#threading.Lock може викликатися кожним потоком лише один раз.

Основним принципом для безпроблемної реалізації багатопоточності я рахую модульність коду. Не обов’язково виносити використовувані Вами функції в окремі файли або класи, досить аби хоч би це були окремі функції (Прошу пробачити за каламбур).

Зараз я виділю роботу потоку в окрему функцію, нехай вона буде деяким абстрактним поняттям роботи. На даному етапі поки що деякі моменти будуть непонятни, але пізніше все це вистроїться в зрозумілий алгоритм.

def worker():
#Оголошення функції worker, вхідних аргументів немає
    global queue
    #Тут і надалі я буду обьявляти змінні з глобального простору 
    #імен в локальному для кращої читабельності коди, хоча в написанні 
    #софта таке робити дуже не рекомендую (!)
    while True:
    #Запуск нескінченного циклу, в якому відбуватиметься робота
        try:
        #Обробка помилок, блок try/except, коли обробиться помилка 
        #Queue.Empty це означає, що список завдань порожній, і потік повинен 
        #завершити свою роботу
        	target_link =  queue.get_nowait()
            #Ця строчка втілює собою здобуття завдання потоком з
            #списку завдань queue
        except Queue.Empty, error:
        #Сам перехват помилки
            return
            #Завершення роботи функції
        parsed_data = get_and_parse_page(target_link)
        #Пізніше буде реалізована функція, яка отримуватиме сторінку
        #і діставати з неї необхідні значення
        if parsed_data != "ERROR":
        #Перевірка на те, чи була отримана сторінка
            write_to_file(parsed_data)
            #Також буде реалізована функція для запису зібраних даних у файл
        else:
            queue.put(target_link)
            #Якщо сторінка не була отримана, то додаємо її назад в queue

Головне, що потрібно чітко засвоїти – це алгоритм роботи самого потоку, і що саме потоки повинні обробляти незалежно один від одного. Разом, завдання потоку дуже прості – отримати посилання на сторінку пошуку, передати її у функцию-обработчик, з якої повернуться посилання на знайдені сайти а також title цих сайтів, після записати заслання і title у файл (все це знаходитиметься в parsed_data).

У міру роботи реалізовуються завдання від простої до складної, тому на цих етапах практично непорушені питання багатопоточності. Далі настала черга реалізації функції, яка відповідатиме за запис у файл.

def write_to_file(parsed_data):
#Оголошення функції write_to_file, аргумент – масив даних для запису
    global LOCK
    global ENCODING
    LOCK.acquire()
    #"Накидання замку", наступна далі ділянка коду може виконуватися
    #лише одним потоком в один і той же момент часу
    with open("parsed_data.txt", "a") as out:
    #Використовується with statement, відкривається файл parsed_data.txt з 
    #правами "a", що означає дозапис в кінець файлу, і присвоюється
    #хендлеру на файл ім'я out (я так звик)
        for site in parsed_data:
        #Прохід циклом по всіх елементах parsed data, ім'я активного в 
        #даний момент елементу буде site
            link, title = site[0], site[1]
            #Привласнення змінним link і title значень з кортежу site
            title = title.replace(u"<em>", "").replace(u"</em>", "").replace(u"<b>", "").replace(u"</b>", "")
            #.replace -це заміна HTML-тегів, які проскакують в title
            #і зовсім не потрібні
            out.write(u"{link}|{title}\n".format(link=link, title=title).encode(ENCODING))
            #Проходить сам запис у файл, використовується оператор 
            #форматування рядків .format, на відміну від % він підтримує
            #іменовані аргументи, чим я і не забув скористатися
            #таким чином у файл пишеться рядок вигляду: посилання на сайт | 
            #title сторінки\n -символ перенесення рядка(все це переводиться
            #з юникода у cp1251)
    LOCK.release()
    # "Відмикання"  замку, інакше жоден з наступних 
    #потоків не зможе працювати з цією ділянкою коду. По-хорошому, тут теж потрібно 
    #зробити обробку помилок, але це учбовий приклад, та і помилка там може 
    #виникнути (після добавки замку в цю ділянку коди) лише якщо в час
    #роботи додатка виставити атрибут “лише читання” для даного користувача
    #відносно файлу parsed_data.txt

Далі йде реалізація функції get_and_parse_page :

def get_and_parse_page(target_link):
#Визначення функції, аргумент – посилання на сторінку
    global PROXY
    #Вказує на те, що в даній функції використовується змінна PROXY
    #з глобального простору імен
    global HEADERS
    #Те ж саме і для змінної Headers
    if PROXY is not None:
    #Якщо значення PROXY не дорівнює None
        proxy_handler = urllib2.ProxyHandler( { "http": "http://"+PROXY+"/" } )
        #Створюється Проксі-хендлер з вказаним проксі
        opener = urllib2.build_opener(proxy_handler)
        #Далі створюється opener c створеним раніше Прокси-хендлером
        urllib2.install_opener(opener)
        #І нарешті він встановлюється, тепер немає необхідності в 
        #шаманствах, всі запити в яких використовуватиметься urllib2 
        #(в межах цієї функції прямуватимуть через вказаний раніше 
        #PROXY)
    page_request = urllib2.Request(url=target_link, headers=HEADERS)
    #Створюється обьект Request, який втілює собою Request 
    #instance, фактично це GET запит до сервера з вказаними 
    #параметрами, мені ж необхідно використовувати заголовки...
    try:
    #Обробка всіх можливих помилок, що виникають під час здобуття
    #сторінки, це недобре, але краще ніж повна відсутність обробки
        page = urllib2.urlopen(url=page_request).read().decode("UTF-8", "replace")
        #Змінній page привласнюємо прочитане значення сторінки запиту 
        #переведене в unicode з кодування UTF-8 (кодування 
        #використовувана на www.google.com) (у Python 2.6 unicode -це 
        #окремий тип даних(!))
    except Exception ,error:
    #Саме перехоплення помилки і збереження її значення в змінну error
        print str(error)
        #Виведення помилки в консоль, перед тим перевівши її в строку
        #(просто на всякий випадок)
        return "ERROR"
        #Повернення з функції в тому випадку, якщо під час роботи виникла помилка
    harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
    #Збір із сторінки пошуку посислань і title знайдених сторінок
    #Очищення даних від результатів пошуку по картинкам, блогам і т.д. від гугла
    for data in harvested_data:
    #Для кожного елементу массиву harvested_data присвоюємо ім`я data
        if data[0].startswith("/"):
        #якщо нульовий елемент масиву data (посилання) починається з символу /
            harvested_data.remove(data)
            #Видаляємо його з массиву harvested_data
        if ".google.com" in data[0]:
        #якщо нульовий елемент масиву data (посилання) має в собі .google.com
            harvested_data.remove(data)
            #Також видаляємо його
    return harvested_data
    #Повертаємо зібрані значення з функції

Нарешті дійшла черга до реалізації основного тіла додатка :

def main():
#Оголошення функції, вхідних аргментов немає
    print "STARTED"
    #Вивід в консоль про початок процесу
    global THREADS_COUNT
    global DEEP
    global ENCODING
    #Оголошення про те що ці змінні використовуватимуться
    #з глобального пространства імен
    with open("requests.txt") as requests:
    #Відкриваємо файл requests у якому знаходяться запити до пошукової системи
         for request in requests:
         #На даному файлхендлері доступний ітератор, тому можна 
         #пройтися по файлу циклом, без завантаження файл в оперативку, але це
         #теж не важливо, я все одно його туди завантажу:)
                request = request.translate(None, "\r\n").decode(ENCODING, "replace")
                #Очищення запиту від символів кінця рядка а також переклад 
                #у юникод (з заміною конфліктних символів)
                empty_link = "http://www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
                #Це порожня адреса сторінки пошуку, відформатована
                for i in xrange(0, DEEP, 10):
                #Прохід ітератором по діапазону чисел від 0 до DEEP, 
                #який є максимальною глибиною пошуку з
                #кроком в 10, тобто отримуємо з цього діапазону лише 
                #числа десятків, тобто 10, 20, 30 (як йде пошук в гугла)
                     queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
                     #Додавання в чергу кожного посилання, що згенеровано
                     #і переведення його у кодування utf-8(для гуглу)
    for _ in xrange(THREADS_COUNT):
    #Прохід циклом по діапазону чисел кількості потоків
        thread_ = threading.Thread(target=worker)
        #Створюється потік target-ім'я функції, яка являє собою 
        #ділянку коду, виконувану багатопоточно
        thread_.start()
        #Викликається метод start(), таким чином потік запускається
    while threading.active_count() >1:
    #До тих пір, поки кількість активних потоків більша 1 (значить 
    #запущені потоки продовжують роботу)
        time.sleep(1)
        #Основний потік засинає на 1 секунду
    print "FINISHED"
    #Вивід в консоль про завершення роботи додатка

У результаті отримуємо нормально працюючий багатопоточний парсер. Природно,з багатьма мінусами, але гарно написане я задовбусь коментувати.

P.S. Та я знаю, що комусь цей приклад здасться нерациінальним використанням Queue (привіт cr0w). Але обробку помилок найпростіше робити саме використовуючи його.

P.P.S. Матеріал не претендує на непогрішимість. Природно, тут 100% бидлокод, жодного розуміння мною того що я описую, непонятки з термінами, я-бидлокодер і так далі і тому подібне АЛЕ тут є те, чого вам не пересрать – воно ПРАЦЮЄ, причому працює саме так, як від нього очікується, код зрозумілий і відкоментований так, що буде зрозуміле навіть немовляті.Сподіваюся, що воно хоч комусь допоможе…

Ця стаття+сорци: sendspace.com/file/mw0pac
Код з російськими коментарями: dumpz.org/15202/
Код з українськими коментарями: dumpz.org/15201/

Categories: Other Tags:
  1. December 14th, 2009 at 20:45 | #1
  2. December 15th, 2009 at 01:04 | #2

    Статья полезная, мне помогла.

  3. December 15th, 2009 at 16:56 | #3

    Класна стаття. Хоч з пітоном вже давно і з багатопоточністю добре знайомий, але всеодно деякі моменти стали більш зрозумілішими. Дякую!

  4. ulidtko
    December 16th, 2009 at 06:51 | #4

    Здавалося б, непогана тема, розуміння та термінологія є, коментований код є… Але читати неприємно :( До чого ці всі особисті ремарки в адресу знайомих читачів чи кого там ще? Вони зайві і заважають звичайному читачеві. Хочу вам порадити писати в більш відстороненому стилі, вкладаючи в статтю тільки актуально цікаві речі.

  5. login999
    December 18th, 2009 at 11:17 | #5

    @ulidtko
    Дякую, якщо колись щось буду писати, то на це обов’язково зверну увагу

  6. December 22nd, 2009 at 16:31 | #6

    Частое использование global палит автора как бывшего РНР-ника В)

  7. login999
    December 23rd, 2009 at 10:03 | #7

    Враки ^_^.
    global использовал для того, чтобы было понятно, откуда взялась та или другая переменная в локальном пространстве имен.
    P.S. Для пхпшников )

  8. December 23rd, 2009 at 11:22 | #8

    :) знач. я был прав в своих догадках.

  1. No trackbacks yet.