ThreadPoolExecutor

ThreadPoolExecutor'i on nagu reserv paljude lõimedega. Töö tekkides korjatakse lõimi reservist välja ja pannakse tagasi reservi, kui töö tehtud.

../_images/threadpool.png

Meetodid

  • ThreadPoolExecutor(max_workers=None, thread_name_prefix="", initializer=None, initargs=()) - uue loomine.
    1. max_workers - maksimaalne töötavate lõimede arv.

    2. thread_name_prefix - võimaldab lõimedele määrata nime-prefiksi. Kasulik mitme ThreadPoolExecutor korral.

    3. initializer - funktsioon, mida kutsutakse lõimes töötamist alustades.

    4. initargs - argumendid initializer'ile (näiteks andmebaasi ühenduse loomisel: host, kasutajanimi, parool; või logimise tase, API võti jne).

  • shutdown(wait=True, cancel_futures=False) - lõpetab uute tööde lisamise.
    1. wait=True - ootab kõikide tööde lõpuni.

    2. cancel_futures=True - tühistab alustamata tööd. Kasuta with võtmesõnaga ehk context manageriga.

  • submit(fn, args, kwargs): lisab töö järjekorda ja tagastab Future objekti.
    1. fn võib olla enda loodud funktsioon või sisseehitatud funktsioon (näiteks pow) või lambda funktsiooni.

Enda loodud funktsiooniga submit meetod:

def multiply(n):
    return n * n

with ThreadPoolExecutor() as executor:
    future = executor.submit(multiply, 5)
    print(future.result())   # 25
  • map(fn, *iterables, timeout=None) - teeb kasutaja poolt määratud tehet igale elemendile paralleelselt.
    1. fn - samad tingimused, mis submit meetodis.

    2. iterables - 1 või rohkem konteinertüüpi. map lõpetatakse, kui esimene konteinertüübi elementide lõppedes.

Kolme järjendi vastava elemendi liitmine:

def sum(a, b, c):
    return a + b + c

a = [1, 2, 3]
b = [10, 20, 30]
c = [100, 200, 300]

with ThreadPoolExecutor(max_workers=3) as executor:
    result = executor.map(sum, a, b, c)
    print(list(result))

Võrdne:

sum(1, 10, 100) → 111
sum(2, 20, 200) → 222
sum(3, 30, 300) → 333

Väljund: [111, 222, 333]

Future

ThreadPoolExecutor tagastab Future objekti, mille eelis lõime ees on sisseehitatud funktsioonid ja automaatne tööjaotus.

../_images/thread_vs_pool.png

Meetodid

  • cancel() – katkestab töö.

  • cancelled() – kas töö katkestati.

  • running() – kas tööl.

  • done() – kas töö lõpetati (edukalt või veaga).

  • result(timeout=None) – tagastab töö tulemuse (või erindi töö katkedes või aja lõppedes).

  • exception(timeout=None) – tagastab erindi, kui töö ebaõnnestus.

  • add_done_callback(fn) – lisab callbacki, mis käivitatakse pärast lõpetamist. Ehk funktsiooni tulemus antakse argumendina uuele funktsioonile.

Future'i lihtsus:

def division(x):
    return x / 2

with ThreadPoolExecutor() as executor:
    future = executor.submit(division, 10)
    if not future.cancelled():
        print(future.result())  # 5.0

Meetodid korraga mitme töö korraga haldamiseks:

  • as_completed(fs, timeout=None): tagastab tulemused nende valmimise järjekorras (mitte sisendite järjekorras).

  • wait(fs, timeout=None, return_when=...): tagastab kaks hulka: valmis ja pooleli.
    1. return_when konstanti väärtused võivad olla:
      1. FIRST_EXCEPTION – lõpetab, kui esimene töö lõppeb erindiga.

      2. ALL_COMPLETED – ootab kõigi tööde valmimist.

      3. FIRST_COMPLETED – lõpetab esimese töö valmimisel

Esimise töö valmides töö lõpetatakse:

def job(n):
    time.sleep(n)
    return f"job {n} done"

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, i) for i in [1, 3, 5]]

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print("First jobs done:")
    for f in done:
        print(f.result())
    print("Still not done jobs:", len(not_done))