Pythoni multithreading’i ohud¶
Lõimedevaheline koostöö teeb programmid küll kiiremaks, kuid etteennustamatu tööjaotuse järjekord ning samade ressurssidega kaasnevad ka ohud.
1. Race conditions¶
Lõimed muudavad samal ajal samu andmeid. Tulemuseks valed väärtused/ettearvamatud seisundid.
Oht
counter = 0
- def increment():
- for _ in range(100000):
counter += 1
threads = [threading.Thread(target=increment) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print("Race condition counter =", counter) # Mostly returns false sum
Vältimiseks: kasuta lukku
Nõuanne
counter = 0
lock = threading.Lock()
- def increment_safe():
- for _ in range(100000):
- with lock: # context manager block
counter += 1
threads = [threading.Thread(target=increment_safe) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print("Safe counter =", counter) # Correct result
2. Deadlocks (ummikud)¶
Lõimed jäävad vastastikku lukust vabastamist ootama ja programm peatub, sest kumbki ei vabasta ressursse.
Oht
lock1 = threading.Lock()
lock2 = threading.Lock()
- def task1():
- with lock1:
time.sleep(0.1)
- with lock2:
print("Task1 done")
- def task2():
- with lock2:
time.sleep(0.1)
- with lock1:
print("Task2 done")
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start();
t2.start()
t1.join();
t2.join() # dead end!
Vältimiseks: läbimõeldud järjekord lukkudel või timeout
Nõuanne
- def task1_fixed():
acquired1 = lock1.acquire(timeout=1)
acquired2 = lock2.acquire(timeout=1)
- if acquired1 and acquired2:
print("Task1 done safely")
lock2.release()
lock1.release()
- def task2_fixed():
acquired1 = lock1.acquire(timeout=1) # Same queue
acquired2 = lock2.acquire(timeout=1)
- if acquired1 and acquired2:
print("Task2 done safely")
lock2.release()
lock1.release()
t1 = threading.Thread(target=task1_fixed)
t2 = threading.Thread(target=task2_fixed)
t1.start();
t2.start()
t1.join();
t2.join()
3. Starvation (nälgimine)¶
Ahnitsevad lõimed ei jaga kunagi ressursse teistega.
Oht
lock = threading.Lock()
- def greedy_task():
- while True:
- with lock:
print("Greedy got lock")
time.sleep(0.1) # monopol
- def starving_task():
- while True:
got = lock.acquire(timeout=0.01)
- if got:
print("Starving finally got lock!")
lock.release()
break
threading.Thread(target=greedy_task, daemon=True).start()
threading.Thread(target=starving_task).start()
Vältimiseks: hoia ressurse Queues või anna ressursse kordamööda kasutades Semaphore'i
Nõuanne
resource = queue.Queue(maxsize=1)
resource.put("token") # just once
- def fair_task(name):
token = resource.get() # takes resource
print(f"{name} working")
time.sleep(0.1)
resource.put(token) # releases
- for i in range(5):
threading.Thread(target=fair_task, args=(f"Thread {i}",)).start()
4. Livelock¶
Lõim pole blokeeritud, aga ei tee tööd teise lõime pärast. Näiteks lõimed proovivad üksteisele vastata pidevalt.
Oht
lock = threading.Lock()
- def livelock_task(name):
- while True:
got = lock.acquire(blocking=False)
- if got:
print(f"{name} works")
lock.release()
break
- else:
print(f"{name} gives up, trys again.")
time.sleep(0.1) # both give up by turns
t1 = threading.Thread(target=livelock_task, args=("T1",))
t2 = threading.Thread(target=livelock_task, args=("T2",))
t1.start()
t2.start()
t1.join()
t2.join()
Vältimiseks: backoff strateegia
Nõuanne
- def livelock_fixed(name):
- while True:
got = lock.acquire(blocking=False)
- if got:
print(f"{name} works safe")
lock.release()
break
- else:
wait = random.uniform(0.05, 0.2) # random backoff
time.sleep(wait)
t1 = threading.Thread(target=livelock_fixed, args=("T1",))
t2 = threading.Thread(target=livelock_fixed, args=("T2",))
t1.start()
t2.start()
t1.join()
t2.join()
5. Resource leak¶
Ressurssi ei vabastata (nt unustati lock.release()
), või lõim lõpetab ootamatult.
Oht
lock = threading.Lock()
- def bad_task():
lock.acquire()
print("Locked resource and will never release that now.")
t = threading.Thread(target=bad_task)
t.start()
t.join()
print("Other threads forever waiting.")
Vältimiseks:with-võtmesõna ehk context manager
Nõuanne
- def good_task():
- with lock:
print("Locked resource and automatically will release it.")
t = threading.Thread(target=good_task)
t.start();
t.join()
6. Unbounded growth / memory leaks¶
Liiga palju lõimi või ootel objekte.
Oht
threads = []
- for i in range(5000):
t = threading.Thread(target=exercise_name)
threads.append(t)
t.start()
print("Newly born thread number:", len(threads))
Vältimiseks:Queue või ThreadPoolExecutor
Nõuanne
- def worker(i):
print("Work", i)
- with ThreadPoolExecutor(max_workers=10) as pool:
pool.map(worker, range(50)) # 10 threads by turns