Pythoni multithreading’i ohud

Lõimedevaheline koostöö teeb programmid küll kiiremaks, kuid etteennustamatu tööjaotuse järjekord ning samade ressurssidega kaasnevad ka ohud.

1. Race conditions

Lõimed muudavad samal ajal samu andmeid. Tulemuseks valed väärtused/ettearvamatud seisundid.

Oht

counter = 0

def increment():
for _ in range(100000):

counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]

[t.start() for t in threads]

[t.join() for t in threads]

print("Race condition counter =", counter) # Mostly returns false sum

Vältimiseks: kasuta lukku

Nõuanne

counter = 0

lock = threading.Lock()

def increment_safe():
for _ in range(100000):
with lock: # context manager block

counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(4)]

[t.start() for t in threads]

[t.join() for t in threads]

print("Safe counter =", counter) # Correct result

2. Deadlocks (ummikud)

Lõimed jäävad vastastikku lukust vabastamist ootama ja programm peatub, sest kumbki ei vabasta ressursse.

Oht

lock1 = threading.Lock()

lock2 = threading.Lock()

def task1():
with lock1:

time.sleep(0.1)

with lock2:

print("Task1 done")

def task2():
with lock2:

time.sleep(0.1)

with lock1:

print("Task2 done")

t1 = threading.Thread(target=task1)

t2 = threading.Thread(target=task2)

t1.start();

t2.start()

t1.join();

t2.join() # dead end!

Vältimiseks: läbimõeldud järjekord lukkudel või timeout

Nõuanne

def task1_fixed():

acquired1 = lock1.acquire(timeout=1)

acquired2 = lock2.acquire(timeout=1)

if acquired1 and acquired2:

print("Task1 done safely")

lock2.release()

lock1.release()

def task2_fixed():

acquired1 = lock1.acquire(timeout=1) # Same queue

acquired2 = lock2.acquire(timeout=1)

if acquired1 and acquired2:

print("Task2 done safely")

lock2.release()

lock1.release()

t1 = threading.Thread(target=task1_fixed)

t2 = threading.Thread(target=task2_fixed)

t1.start();

t2.start()

t1.join();

t2.join()

3. Starvation (nälgimine)

Ahnitsevad lõimed ei jaga kunagi ressursse teistega.

Oht

lock = threading.Lock()

def greedy_task():
while True:
with lock:

print("Greedy got lock")

time.sleep(0.1) # monopol

def starving_task():
while True:

got = lock.acquire(timeout=0.01)

if got:

print("Starving finally got lock!")

lock.release()

break

threading.Thread(target=greedy_task, daemon=True).start()

threading.Thread(target=starving_task).start()

Vältimiseks: hoia ressurse Queues või anna ressursse kordamööda kasutades Semaphore'i

Nõuanne

resource = queue.Queue(maxsize=1)

resource.put("token") # just once

def fair_task(name):

token = resource.get() # takes resource

print(f"{name} working")

time.sleep(0.1)

resource.put(token) # releases

for i in range(5):

threading.Thread(target=fair_task, args=(f"Thread {i}",)).start()

4. Livelock

Lõim pole blokeeritud, aga ei tee tööd teise lõime pärast. Näiteks lõimed proovivad üksteisele vastata pidevalt.

Oht

lock = threading.Lock()

def livelock_task(name):
while True:

got = lock.acquire(blocking=False)

if got:

print(f"{name} works")

lock.release()

break

else:

print(f"{name} gives up, trys again.")

time.sleep(0.1) # both give up by turns

t1 = threading.Thread(target=livelock_task, args=("T1",))

t2 = threading.Thread(target=livelock_task, args=("T2",))

t1.start()

t2.start()

t1.join()

t2.join()

Vältimiseks: backoff strateegia

Nõuanne

def livelock_fixed(name):
while True:

got = lock.acquire(blocking=False)

if got:

print(f"{name} works safe")

lock.release()

break

else:

wait = random.uniform(0.05, 0.2) # random backoff

time.sleep(wait)

t1 = threading.Thread(target=livelock_fixed, args=("T1",))

t2 = threading.Thread(target=livelock_fixed, args=("T2",))

t1.start()

t2.start()

t1.join()

t2.join()

5. Resource leak

Ressurssi ei vabastata (nt unustati lock.release()), või lõim lõpetab ootamatult.

Oht

lock = threading.Lock()

def bad_task():

lock.acquire()

print("Locked resource and will never release that now.")

t = threading.Thread(target=bad_task)

t.start()

t.join()

print("Other threads forever waiting.")

Vältimiseks:with-võtmesõna ehk context manager

Nõuanne

def good_task():
with lock:

print("Locked resource and automatically will release it.")

t = threading.Thread(target=good_task)

t.start();

t.join()

6. Unbounded growth / memory leaks

Liiga palju lõimi või ootel objekte.

Oht

threads = []

for i in range(5000):

t = threading.Thread(target=exercise_name)

threads.append(t)

t.start()

print("Newly born thread number:", len(threads))

Vältimiseks:Queue või ThreadPoolExecutor

Nõuanne

def worker(i):

print("Work", i)

with ThreadPoolExecutor(max_workers=10) as pool:

pool.map(worker, range(50)) # 10 threads by turns