
Python Threading / Concurrency Example

Threads are independent flows of execution that run concurrently with other threads inside the same process. In a utopian scenario, if you split a big task into 2 threads, these threads would run in parallel and the task would take half the time.

This is not true in most cases. In CPython, there is a mutex that prevents multiple native threads from executing Python bytecode at once. It’s called the GIL (Global Interpreter Lock). This lock is necessary mainly because CPython’s memory management is not thread-safe. Notice, though, that blocking I/O calls release the GIL, and so do some C extensions during long computations (image processing libraries, for example). The GIL therefore only becomes a bottleneck in programs that spend most of their time executing Python bytecode, that is, CPU-bound pure-Python code.

In most applications nowadays, concurrency is something we must all be able to handle, especially in web applications, where each request is often handled by its own thread. We need to keep concurrency and threading in mind so we can write our programs accordingly.

Threading can also be a good way to improve response times. Given a scenario in which we have to process 4 million objects, a good approach would be to divide them into 4 groups of one million objects and process each group in a separate thread, as long as that processing spends most of its time outside the GIL.
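The chunking idea can be sketched with the threading module (covered in section 2) as follows. This is a minimal illustration, assuming a hypothetical workload (summing squares) and a small list standing in for the 4 million objects. Because squaring numbers is pure-Python, CPU-bound work, under CPython's GIL this version will most likely not run faster than a plain loop; the pattern pays off when the per-chunk work releases the GIL.

```python
import threading


def process_chunk(chunk, results, index):
    # Hypothetical work: each thread writes only to its own slot,
    # so the threads never touch the same element and no lock is needed
    results[index] = sum(x * x for x in chunk)


def process_in_threads(items, n_threads=4):
    # Ceiling division so every item lands in some chunk; at least 1 per chunk
    chunk_size = max(1, (len(items) + n_threads - 1) // n_threads)
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    results = [0] * len(chunks)
    threads = [threading.Thread(target=process_chunk, args=(chunk, results, i))
               for i, chunk in enumerate(chunks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every chunk before combining the partial results
    return sum(results)


print(process_in_threads(list(range(1, 101))))  # prints 338350
```

Giving each thread its own result slot keeps the threads independent; the only synchronization needed is the final join().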
 

1. Python _thread module

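The _thread module provides low-level threading primitives; let’s see an example to understand the concept. Keep in mind, though, that it is rarely used directly: in Python 3 the old thread module was renamed to _thread, and the higher-level threading module, which is built on top of it, is the recommended interface.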

Spawning a thread is pretty simple. We just need to call the start_new_thread function, available in the _thread module, which receives a function and a tuple of arguments to pass to it. It returns immediately, and the function runs in a new thread in parallel with the main one. Let’s see:

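import _thread as thread
import time

executed_count = 0
count_lock = thread.allocate_lock()  # guards executed_count


# Define a function for the thread
def print_time(thread_name, delay):
    global executed_count
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print("%s: %s" % (thread_name, time.ctime(time.time())))
    # "+= 1" on a shared global is not atomic, so hold the lock while updating it
    with count_lock:
        executed_count += 1


# Create two threads as follows
try:
    thread.start_new_thread(print_time, ("Thread-1", 2))
    thread.start_new_thread(print_time, ("Thread-2", 4))
except RuntimeError:  # _thread.error is an alias of RuntimeError
    print("Error: unable to start thread")
    raise SystemExit(1)

# Keep the main thread alive until both threads are done; sleeping between
# checks avoids burning a CPU core in a tight busy-wait loop
while executed_count < 2:
    time.sleep(0.1)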

When we run this script we’ll see an output that looks like:

Thread-1: Mon Dec 21 12:55:23 2015
Thread-2: Mon Dec 21 12:55:25 2015
Thread-1: Mon Dec 21 12:55:25 2015
Thread-1: Mon Dec 21 12:55:27 2015
Thread-2: Mon Dec 21 12:55:29 2015
Thread-1: Mon Dec 21 12:55:29 2015
Thread-1: Mon Dec 21 12:55:31 2015
Thread-2: Mon Dec 21 12:55:33 2015
Thread-2: Mon Dec 21 12:55:37 2015
Thread-2: Mon Dec 21 12:55:41 2015

So, let’s see what is going on there.

We have a variable called executed_count initialized to zero. Then there is a function called print_time which receives the name of the thread and a delay, and every {delay} seconds prints the current date along with the thread name, five times. Finally, it adds one to the executed_count variable.

Then we create two threads, each running our print_time function with its own name and delay. Finally, a while loop makes sure the program won’t exit until executed_count reaches 2. This matters because, on most systems, any _thread threads still running are killed when the main thread exits.

2. Python threading module

The threading module, which is built on top of _thread, provides much more powerful, high-level support for threads.

2.1. Extending Thread

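One common way to spawn a thread with this module is to define a subclass of the Thread class and override its run method, plus its __init__ method if the thread needs extra state. If you override __init__, make sure to call the base class constructor before doing anything else.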

Once you’ve got your class, you just instantiate it and call its start method, which arranges for run to be invoked in a separate thread. Let’s see an example:

import threading

import time


class MyThread(threading.Thread):
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    def run(self):
        print("{} start".format(self.name))
        time.sleep(self.sleep_time)
        print("{} end".format(self.name))


threads = [MyThread("Thread-{}".format(i), i) for i in range(1, 4)]
for t in threads:
    t.start()

Then we run it and see something like:

Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end
Thread-3 end
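Subclassing is not the only option: the Thread constructor also accepts a callable through its target argument and the arguments for it through args. Below is a rough equivalent of the example above written that way; the worker function, the events list and the shortened sleep times (i * 0.1 seconds) are illustrative choices, not part of the original code.

```python
import threading
import time

events = []  # list.append is atomic in CPython, so the threads can share this list


def worker(name, sleep_time):
    events.append("{} start".format(name))
    time.sleep(sleep_time)
    events.append("{} end".format(name))


# Pass the callable and its arguments instead of subclassing Thread
threads = [threading.Thread(target=worker, args=("Worker-{}".format(i), i * 0.1))
           for i in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait until every worker has finished

print("\n".join(events))
```

Subclassing is handy when the thread carries state or behavior of its own; target is usually enough when you just need to run an existing function.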

Of course, we don’t need to name our threads like that. Each Thread instance has a name with a default value that can be changed as the thread is created. Naming threads is useful in server processes with multiple service threads handling different operations. So, let’s change our code to avoid reinventing the wheel. In our constructor:

def __init__(self, sleep_time):
    threading.Thread.__init__(self)  # assigns a default name such as "Thread-1"
    self.sleep_time = sleep_time

and we create the threads without passing a name:

threads = [MyThread(i) for i in range(1, 4)]

Then the output will look like:

Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end
Thread-3 end

In practice it usually isn’t so simple, because the logic rarely lives in the run method of our Thread class. Let’s make the example a little more realistic and do the printing and sleeping in a function outside MyThread.

2.2. Getting Current Thread Information

The problem is that the thread name isn’t available outside the Thread object, and we don’t want to pass it around in our function’s signature. Fortunately, the threading module has functions that give us access to the current thread’s information:

import threading
import time


def my_logic(sleep_time):
    thread_name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    print("{} start".format(thread_name))
    time.sleep(sleep_time)
    print("{} end".format(thread_name))


class MyThread(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time

    def run(self):
        my_logic(self.sleep_time)


threads = [MyThread(i) for i in range(1, 4)]
for t in threads:
    t.start()

By calling threading.current_thread(), we get the Thread object of the thread that is currently running. Through it we can read its name (the name attribute), check whether it is still running (is_alive()), read its daemon flag (the daemon attribute), and more.

2.3. Daemon Threads

Now, let’s talk about daemon threads. Until now, our programs waited for every thread to end before actually terminating, but sometimes we don’t want that behaviour. If we have a thread pushing status or metrics to a service, we usually don’t care whether it has finished when we shut down our program, and we may not want to terminate it explicitly before exiting.

Daemon threads do not prevent the program from exiting. They are useful for services where there may not be an easy way to interrupt the thread, or where letting the thread die in the middle of its work does not lose or corrupt data.

To spawn a daemon thread, create a normal thread and set its daemon attribute to True before calling start(); you can also pass daemon=True to the Thread constructor. (Older code calls setDaemon(True), which has been deprecated since Python 3.10.) A new thread inherits the daemon flag from the thread that creates it, so threads created by the main thread are not daemons by default. Let’s see how our program behaves when we make one of these threads a daemon:

threads = [MyThread(i) for i in range(1, 4)]
threads[2].daemon = True
for t in threads:
    t.start()

Here we take the last thread we created and make it a daemon. The output now looks like:

Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end

As you can see, the program does not wait for Thread-3 to finish before exiting: once the main thread and all other non-daemon threads have finished, the interpreter shuts down and any daemon threads still running are stopped abruptly.
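Here is a small sketch of the attributes discussed in this section. Each thread inspects itself through threading.current_thread() and records its daemon flag and liveness; the thread names "normal" and "background" are made up for the example.

```python
import threading

seen = {}


def report():
    # current_thread() returns the Thread object executing this code
    me = threading.current_thread()
    seen[me.name] = (me.daemon, me.is_alive())


normal = threading.Thread(target=report, name="normal")  # inherits daemon=False from the main thread
background = threading.Thread(target=report, name="background", daemon=True)
for t in (normal, background):
    t.start()
    t.join()

print(seen)  # {'normal': (False, True), 'background': (True, True)}
```

Both threads report is_alive() as True because they check it while still running; after join() returns, is_alive() is False.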

Let’s write something that resembles a real-life problem: a script that, given a list of hosts, crawls them and saves their HTML to files.

site-crawler.py

import http.client
import threading
import logging

logging.basicConfig(level=logging.INFO, format='(%(threadName)-10s) %(message)s', )


def save(html, file_absolute_path):
    logging.info("saving {} bytes to {}".format(len(html), file_absolute_path))
    with open(file_absolute_path, 'wb+') as file:
        file.write(html)
        file.flush()


def crawl(req):
    logging.info("executing get request for parameters: {}".format(str(req)))
    connection = http.client.HTTPConnection(req["host"], req["port"])
    try:
        connection.request("GET", req["path"])
        response = connection.getresponse()
        logging.info("got {} response http code".format(response.status))
        logging.debug("headers: {}".format(str(response.headers)))
        response_content = response.read()
        logging.debug("actual response: {}".format(response_content))
        return response_content
    finally:
        connection.close()  # release the socket even if the request fails


class MyCrawler(threading.Thread):
    def __init__(self, req, file_path):
        threading.Thread.__init__(self, name="Crawler-{}".format(req["host"]))
        self.req = req
        self.file_path = file_path

    def run(self):
        html = crawl(self.req)
        save(html, self.file_path)


def __main__():
    continue_input = True
    threads = []
    while continue_input:
        host = input("host: ")
        port = 80  # int(input("port: "))
        path = "/"  # input("path: ")
        file_path = input("output file absolute path: ")
        req = {"host": host, "port": port, "path": path}
        threads.append(MyCrawler(req, file_path))
        continue_input = input("add another? (y/N) ") == "y"

    for t in threads:
        t.start()
        # t.join()

__main__()


So, what have we got here? The script repeatedly asks the user for a host and the absolute path of the file where the site’s HTML should be written, then starts one crawler thread per host. Running the script as is produces output like this:

host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-br.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'path': '/', 'host': 'www.google.com.ar', 'port': 80}
(Crawler-www.google.com.br) executing get request for parameters: {'path': '/', 'host': 'www.google.com.br', 'port': 80}
(Crawler-www.google.com.co) executing get request for parameters: {'path': '/', 'host': 'www.google.com.co', 'port': 80}
(Crawler-www.google.cl) executing get request for parameters: {'path': '/', 'host': 'www.google.cl', 'port': 80}
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.co) saving 53181 bytes to /tmp/google-co.html
(Crawler-www.google.com.ar) saving 53008 bytes to /tmp/google-ar.html
(Crawler-www.google.com.br) saving 53605 bytes to /tmp/google-br.html
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.cl) saving 53069 bytes to /tmp/google-cl.html

As the log shows, the threads run concurrently: while one thread is blocked on I/O (writing to a file, or waiting for an HTTP response from a server), the others keep running. Now, notice the commented-out t.join() line in the __main__() function. The join method makes the calling thread wait until the joined thread has finished before continuing its execution. If we uncomment it, each thread is started and then waited for before the next one starts, and the log looks like:

host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.ar'}
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.com.ar) saving 52973 bytes to /tmp/google-ar.html
(Crawler-www.google.com.br) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.br'}
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.br) saving 54991 bytes to /tmp/google-ar.html
(Crawler-www.google.com.co) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.co'}
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.co) saving 53172 bytes to /tmp/google-co.html
(Crawler-www.google.cl) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.cl'}
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.cl) saving 53110 bytes to /tmp/google-cl.html

See? First it crawls Google Argentina, then Brazil, and so on: the crawls are now sequential. You are surely wondering why anyone would do this. Well, this is not the only use case of the join method. Imagine these threads were daemons and you had no control over that. You would need a variable holding the number of finished threads and wait for it to equal the number of threads that had to run before letting the main thread exit, just like in our _thread example. That’s not very elegant.

2.4. Joining Threads

There is another way. Let’s make these threads daemons, just to experiment a little, and still wait for all of them to finish before the main thread exits:

site-crawler.py

import http.client
import threading
import logging

logging.basicConfig(level=logging.INFO, format='(%(threadName)-10s) %(message)s', )


def save(html, file_absolute_path):
    logging.info("saving {} bytes to {}".format(len(html), file_absolute_path))
    with open(file_absolute_path, 'wb+') as file:
        file.write(html)
        file.flush()


def crawl(req):
    logging.info("executing get request for parameters: {}".format(str(req)))
    connection = http.client.HTTPConnection(req["host"], req["port"])
    try:
        connection.request("GET", req["path"])
        response = connection.getresponse()
        logging.info("got {} response http code".format(response.status))
        logging.debug("headers: {}".format(str(response.headers)))
        response_content = response.read()
        logging.debug("actual response: {}".format(response_content))
        return response_content
    finally:
        connection.close()  # release the socket even if the request fails


class MyCrawler(threading.Thread):
    def __init__(self, req, file_path):
        threading.Thread.__init__(self, name="Crawler-{}".format(req["host"]), daemon=True)
        self.req = req
        self.file_path = file_path

    def run(self):
        html = crawl(self.req)
        save(html, self.file_path)


def __main__():
    continue_input = True
    threads = []
    while continue_input:
        host = input("host: ")
        port = 80  # int(input("port: "))
        path = "/"  # input("path: ")
        file_path = input("output file absolute path: ")
        req = {"host": host, "port": port, "path": path}
        threads.append(MyCrawler(req, file_path))
        continue_input = input("add another? (y/N) ") == "y"

    for t in threads:
        t.start()

    current_thread = threading.current_thread()  # currentThread() is a deprecated alias
    for thread in threading.enumerate():
        if thread is not current_thread:
            thread.join()


__main__()

Here, we create every thread as a daemon, but we enumerate every active thread by calling threading.enumerate() and join every thread other than the current (main) one. The crawls run concurrently again, as in the first run, and the program exits only after all of them have finished:

host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-br.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'port': 80, 'host': 'www.google.com.ar', 'path': '/'}
(Crawler-www.google.com.br) executing get request for parameters: {'port': 80, 'host': 'www.google.com.br', 'path': '/'}
(Crawler-www.google.com.co) executing get request for parameters: {'port': 80, 'host': 'www.google.com.co', 'path': '/'}
(Crawler-www.google.cl) executing get request for parameters: {'port': 80, 'host': 'www.google.cl', 'path': '/'}
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.ar) saving 52980 bytes to /tmp/google-ar.html
(Crawler-www.google.cl) saving 53088 bytes to /tmp/google-cl.html
(Crawler-www.google.com.br) saving 53549 bytes to /tmp/google-br.html
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.co) saving 53117 bytes to /tmp/google-co.html
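A simpler variant of the same idea is to keep references to the threads you started and join exactly those, which avoids accidentally waiting on unrelated threads that threading.enumerate() might return. The sketch below replaces the real HTTP request with a hypothetical fake_crawl function that just sleeps, so it runs without network access; the example hosts are placeholders.

```python
import threading
import time


def fake_crawl(host, results):
    # Hypothetical stand-in for crawl(): sleeps instead of doing real HTTP I/O
    time.sleep(0.5)
    results[host] = "<html>{}</html>".format(host)  # each thread writes its own key


hosts = ["www.example.com", "www.example.org", "www.example.net"]
results = {}
threads = [threading.Thread(target=fake_crawl, args=(h, results), daemon=True)
           for h in hosts]

start = time.monotonic()
for t in threads:
    t.start()  # start every crawler first...
for t in threads:
    t.join()   # ...then wait for each of them
elapsed = time.monotonic() - start

print(sorted(results))
print("took about {:.1f}s".format(elapsed))  # roughly 0.5s rather than 1.5s, since the sleeps overlap
```

Starting all threads before joining any of them is what keeps the work concurrent; joining inside the start loop serializes it, as the commented-out t.join() showed.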

2.5. Timer Threads

Another thing worth pointing out is the threading.Timer class. It is a subclass of Thread that, given a delay in seconds and a function, executes the function once the delay has passed. It can also be cancelled with its cancel() method, as long as it is still waiting and the function has not started running.

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)


def delayed():
    logging.debug('worker running')


t1 = threading.Timer(3, delayed)
t1.name = 't1'  # the name attribute replaces the deprecated setName()
t2 = threading.Timer(3, delayed)
t2.name = 't2'

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.name)
time.sleep(2)
logging.debug('canceling %s', t2.name)
t2.cancel()  # t2 is still waiting, so its function never runs
logging.debug('done')

If we execute it:

(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running

Here we create two timers that both execute the same function after 3 seconds. Then we wait 2 seconds and cancel one of them. In the output we can see that only one of the timers executed the delayed function.

This is useful in scenarios where we need to run something if an event didn’t happen within a given interval (a timeout), or even for simple scheduling.
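The timeout scenario can be sketched as a watchdog: start a Timer before some work and cancel it when the work finishes. If the work takes too long, the timer fires first. The run_with_watchdog helper and the sleep standing in for real work are hypothetical.

```python
import threading
import time


def run_with_watchdog(work_seconds, timeout_seconds):
    fired = []
    # The timer appends True to `fired` only if it is not cancelled in time
    watchdog = threading.Timer(timeout_seconds, fired.append, args=(True,))
    watchdog.start()
    time.sleep(work_seconds)  # hypothetical work
    watchdog.cancel()         # has no effect if the timer already fired
    watchdog.join()           # make sure the timer thread has fully finished
    return bool(fired)


print(run_with_watchdog(0.1, 0.5))  # the work finishes first, so this prints False
print(run_with_watchdog(0.5, 0.1))  # the timer fires first, so this prints True
```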

2.6. Events: Communication Between Threads

Now, we all know that the idea of using threads is to make tasks independent from each other, but sometimes we need a thread to wait for an event caused by another. The threading.Event class provides a simple way of signaling between threads: one thread calls the event’s set() method, and every thread blocked in its wait() method is woken up. To experiment with this, we’ll make a race:

race.py

import threading


class Racer(threading.Thread):

    def __init__(self, name, start_signal):
        threading.Thread.__init__(self, name=name)
        self.start_signal = start_signal

    def run(self):
        self.start_signal.wait()
        print("I, {}, got to the goal!".format(self.name))


class Race:

    def __init__(self, racer_names):
        self.start_signal = threading.Event()
        self.racers = [Racer(name, self.start_signal) for name in racer_names]
        for racer in self.racers:
            racer.start()

    def start(self):
        self.start_signal.set()


def __main__():
    race = Race(["rabbit", "turtle", "cheetah", "monkey", "cow", "horse", "tiger", "lion"])
    race.start()


__main__()

Here, we created a subclass of Thread called Racer, whose run method waits for the event to be set and then prints “I, {name}, got to the goal!”.

We create several threads and then set the event, so they all try to start at the same time. The output is interesting:

first run output

I, rabbit, got to the goal!
I, lion, got to the goal!
I, turtle, got to the goal!
I, cheetah, got to the goal!
I, monkey, got to the goal!
I, cow, got to the goal!
I, horse, got to the goal!
I, tiger, got to the goal!

second run output

I, lion, got to the goal!
I, turtle, got to the goal!
I, monkey, got to the goal!
I, horse, got to the goal!
I, cow, got to the goal!
I, tiger, got to the goal!
I, cheetah, got to the goal!
I, rabbit, got to the goal!

Here we can see how the rabbit won the first race but finished last in the second. Either he got tired, or our event behaves just as we wanted it to: once the event is set, which racer runs first is up to the operating system’s scheduler.

If we didn’t use the event and simply started each thread in a loop, the first thread would have a head start of a few milliseconds over the last one, and in computer time every millisecond counts.
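Event.wait() also accepts an optional timeout in seconds and returns the event's flag, so a thread can stop waiting if the signal never arrives (for instance, a racer that gives up when the race never starts). A small sketch of that behavior, using a Timer from the previous section to set the event:

```python
import threading

start_signal = threading.Event()

# wait() blocks until the flag is set or the timeout expires and returns the
# flag's value, so False means we timed out
print(start_signal.wait(timeout=0.1))  # nobody set it: prints False after ~0.1s

# set() wakes every waiting thread; later wait() calls return immediately
threading.Timer(0.1, start_signal.set).start()
print(start_signal.wait(timeout=5))    # prints True once the timer sets the flag

start_signal.clear()                   # reset the flag so the event can be reused
print(start_signal.is_set())           # prints False
```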

2.7. Locking Resources

Sometimes several threads access the same resource, and if that resource is not thread-safe, we don’t want them to access it at the same time. One solution to this problem is locking.

A side note: in CPython, many single operations on built-in data structures (lists, dictionaries, etc.), such as list.append() or assigning a dictionary key, are effectively atomic, because the GIL is not released in the middle of them. This is an implementation detail rather than a language guarantee, and it does not cover compound operations such as counter += 1 on a shared integer or a check-then-set sequence, nor data structures implemented in pure Python.
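To illustrate the compound-operation case, here is a minimal sketch of a shared counter protected by a threading.Lock. The with statement acquires the lock on entry and releases it on exit; the thread count and iteration count are arbitrary.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def increment(times):
    global counter
    for _ in range(times):
        # counter += 1 is a read, an add and a write; another thread can run
        # between those steps, so the lock makes the whole update atomic
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000 with the lock; without it, updates may be lost
```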

Let’s imagine, just to make a fun example, that dictionaries in Python are not thread-safe. We’ll make an in-memory repository and have a couple of threads read and write data to it:

locking.py

import random
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='[%(levelname)s] (%(threadName)-s) (%(module)-s) (%(funcName)-s) %(message)s',
                    filename='/tmp/locking-py.log')


class Repository:
    def __init__(self):
        self.repo = {}
        self.lock = threading.Lock()

    def create(self, entry):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            new_id = len(self.repo.keys())
            entry["id"] = new_id
            self.repo[new_id] = entry
        finally:
            logging.info("releasing lock")
            self.lock.release()

    def find(self, entry_id):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            return self.repo[entry_id]
        except KeyError:
            return None
        finally:
            logging.info("releasing lock")
            self.lock.release()

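    def all(self):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            # Return a copy: handing out self.repo itself would let callers
            # iterate over the shared dictionary after the lock is released
            return dict(self.repo)
        finally:
            logging.info("releasing lock")
            self.lock.release()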


class ProductRepository(Repository):
    def __init__(self):
        Repository.__init__(self)

    def add_product(self, description, price):
        self.create({"description": description, "price": price})


class PurchaseRepository(Repository):
    def __init__(self, product_repository):
        Repository.__init__(self)
        self.product_repository = product_repository

    def add_purchase(self, product_id, qty):
        product = self.product_repository.find(product_id)
        if product is not None:
            total_amount = product["price"] * qty
            self.create({"product_id": product_id, "qty": qty, "total_amount": total_amount})

    def sales_by_product(self, product_id):
        sales = {"product_id": product_id, "qty": 0, "total_amount": 0}
        all_purchases = self.all()
        for k in all_purchases:
            purchase = all_purchases[k]
            if purchase["product_id"] == sales["product_id"]:
                sales["qty"] += purchase["qty"]
                sales["total_amount"] += purchase["total_amount"]
        return sales


class Buyer(threading.Thread):
    def __init__(self, name, product_repository, purchase_repository):
        threading.Thread.__init__(self, name="Buyer-" + name)
        self.product_repository = product_repository
        self.purchase_repository = purchase_repository

    def run(self):
        for i in range(0, 1000):
            max_product_id = len(self.product_repository.all().keys())
            product_id = random.randrange(0, max_product_id + 1, 1)
            qty = random.randrange(0, 100, 1)
            self.purchase_repository.add_purchase(product_id, qty)


class ProviderAuditor(threading.Thread):
    def __init__(self, product_id, purchase_repository):
        threading.Thread.__init__(self, name="Auditor-product_id=" + str(product_id))
        self.product_id = product_id
        self.purchase_repository = purchase_repository

    def run(self):
        logging.info(str(self.purchase_repository.sales_by_product(self.product_id)))


def __main__():
    product_repository = ProductRepository()
    purchase_repository = PurchaseRepository(product_repository)

    input_another_product = True
    while input_another_product:
        description = input("product description: ")
        price = float(input("product price: "))
        product_repository.add_product(description, price)
        input_another_product = input("continue (y/N): ") == "y"

    buyers = [Buyer("carlos", product_repository, purchase_repository),
              Buyer("juan", product_repository, purchase_repository),
              Buyer("mike", product_repository, purchase_repository),
              Buyer("sarah", product_repository, purchase_repository)]

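    # Start every buyer first so they actually compete for the locks,
    # then wait for all of them before auditing
    for b in buyers:
        b.start()
    for b in buyers:
        b.join()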

    for i in product_repository.all():
        ProviderAuditor(i, purchase_repository).start()


__main__()

As you can see, both repositories (purchases and products) extend a Repository class that uses its lock in every access method (let’s assume every developer knows not to access the repository’s dictionary directly).

This lock guarantees that only one thread at a time can access a given repository. Notice how the lock is released in a finally block; you should be very careful about that. If you don’t put the release in a finally block, then whenever an exception is raised and interrupts the function’s execution, the lock will never be released, and every other thread that tries to access that resource will block forever.
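The difference is easy to observe with lock.locked(). In this sketch, two hypothetical functions raise an exception while holding the lock; only the one using try/finally gives it back.

```python
import threading

lock = threading.Lock()


def unsafe_update():
    lock.acquire()
    raise ValueError("something went wrong")
    lock.release()  # never reached: the exception skips this line


def safe_update():
    lock.acquire()
    try:
        raise ValueError("something went wrong")
    finally:
        lock.release()  # runs even though an exception was raised


for update in (safe_update, unsafe_update):
    try:
        update()
    except ValueError:
        pass
    print(update.__name__, "left the lock held:", lock.locked())

# Any later caller would now block forever; with a timeout we can observe it
print(lock.acquire(timeout=0.1))  # prints False
```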

Now, let’s execute this code and input something like:

product description: a
product price: 1
continue (y/N): y
product description: b
product price: 2
continue (y/N): y
product description: c
product price: 3
continue (y/N): y
product description: d
product price: 4
continue (y/N): y
product description: e
product price: 5
continue (y/N): y
product description: f
product price: 6
continue (y/N): y
product description: g
product price: 7
continue (y/N): y
product description: h
product price: 8
continue (y/N): y
product description: i
product price: 9
continue (y/N):

Note that the logger was configured to write its output to a file. We don’t care about the Buyer threads’ log entries: each buyer performs a thousand purchases, so that part of the log is unreadable. The ProviderAuditor threads, however, log some very interesting information, so we run grep "Auditor" /tmp/locking-py.log and see:

[INFO] (Auditor-product_id=0) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (all) acquired lock
[INFO] (Auditor-product_id=0) (locking) (all) releasing lock
[INFO] (Auditor-product_id=1) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (run) {'total_amount': 19850.0, 'product_id': 0, 'qty': 19850}
[INFO] (Auditor-product_id=2) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=5) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) releasing lock
[INFO] (Auditor-product_id=6) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=7) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (run) {'total_amount': 41586.0, 'product_id': 1, 'qty': 20793}
[INFO] (Auditor-product_id=2) (locking) (all) acquired lock
[INFO] (Auditor-product_id=2) (locking) (all) releasing lock
[INFO] (Auditor-product_id=2) (locking) (run) {'total_amount': 60294.0, 'product_id': 2, 'qty': 20098}
[INFO] (Auditor-product_id=3) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) releasing lock
[INFO] (Auditor-product_id=3) (locking) (run) {'total_amount': 86752.0, 'product_id': 3, 'qty': 21688}
[INFO] (Auditor-product_id=4) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) releasing lock
[INFO] (Auditor-product_id=4) (locking) (run) {'total_amount': 93960.0, 'product_id': 4, 'qty': 18792}
[INFO] (Auditor-product_id=5) (locking) (all) acquired lock
[INFO] (Auditor-product_id=5) (locking) (all) releasing lock
[INFO] (Auditor-product_id=5) (locking) (run) {'total_amount': 109776.0, 'product_id': 5, 'qty': 18296}
[INFO] (Auditor-product_id=6) (locking) (all) acquired lock
[INFO] (Auditor-product_id=6) (locking) (all) releasing lock
[INFO] (Auditor-product_id=6) (locking) (run) {'total_amount': 140945.0, 'product_id': 6, 'qty': 20135}
[INFO] (Auditor-product_id=7) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (all) releasing lock
[INFO] (Auditor-product_id=7) (locking) (run) {'total_amount': 164152.0, 'product_id': 7, 'qty': 20519}
[INFO] (Auditor-product_id=8) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (all) releasing lock
[INFO] (Auditor-product_id=8) (locking) (run) {'total_amount': 182475.0, 'product_id': 8, 'qty': 20275}

There are our 9 ProviderAuditor threads, one per product. The first one to acquire the lock is Auditor-product_id=0, which then releases it and logs its sales. Then Auditor-product_id=1 acquires it while 2, 3, 4 and 5 are waiting, and so on. Now (again, imagining Python’s dictionaries are not thread-safe) our resources are thread-safe.

Another side note here: let’s imagine another scenario. We have a thread and a resource. The thread locks the resource to write some data, and in the middle of that it needs to lock it again to read some other data, without releasing the first lock. Well, we have a problem here: a plain Lock cannot be acquired again until it is released, even by the thread that already holds it, so the second acquire() deadlocks. The fix is easy: substitute Lock with RLock, a re-entrant lock that the thread holding it can acquire again (each acquire() must be matched by a release()), while other threads still have to wait until it is fully released.
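A minimal sketch of that scenario, using a hypothetical Counter class whose increment() calls read() while already holding the lock:

```python
import threading


class Counter:
    def __init__(self, lock):
        self.lock = lock
        self.value = 0

    def read(self):
        with self.lock:
            return self.value

    def increment(self):
        with self.lock:
            # read() acquires the same lock again from the same thread
            self.value = self.read() + 1


ok = Counter(threading.RLock())
ok.increment()
print(ok.value)  # prints 1: the owning thread may re-acquire an RLock

plain = threading.Lock()
plain.acquire()
# A second blocking acquire() would hang forever, even in the same thread;
# a non-blocking attempt just reports that the lock is unavailable
print(plain.acquire(blocking=False))  # prints False
plain.release()
```

With Counter(threading.Lock()) instead, ok.increment() would never return.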

Locks implement the context manager API and are compatible with the with statement. Using with removes the need to explicitly acquire and release the lock. Let’s modify our Repository class code to make it prettier:

locking.py

class Repository:
    def __init__(self):
        self.repo = {}
        self.lock = threading.Lock()

    def create(self, entry):
        logging.info("waiting for lock")
        with self.lock:
            logging.info("acquired lock")
            new_id = len(self.repo.keys())
            entry["id"] = new_id
            self.repo[new_id] = entry

    def find(self, entry_id):
        logging.info("waiting for lock")
        with self.lock:
            try:
                logging.info("acquired lock")
                return self.repo[entry_id]
            except KeyError:
                return None

    def all(self):
        logging.info("waiting for lock")
        with self.lock:
            logging.info("acquired lock")
            return dict(self.repo)  # a copy, so callers never iterate the shared dict

And the behavior remains the same:

[INFO] (Auditor-product_id=0) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (all) acquired lock
[INFO] (Auditor-product_id=1) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (run) {'product_id': 0, 'total_amount': 19098.0, 'qty': 19098}
[INFO] (Auditor-product_id=2) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=5) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (run) {'product_id': 1, 'total_amount': 36344.0, 'qty': 18172}
[INFO] (Auditor-product_id=6) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=2) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=8) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=9) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=2) (locking) (run) {'product_id': 2, 'total_amount': 57555.0, 'qty': 19185}
[INFO] (Auditor-product_id=3) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (run) {'product_id': 3, 'total_amount': 72292.0, 'qty': 18073}
[INFO] (Auditor-product_id=4) (locking) (all) acquired lock
[INFO] (Auditor-product_id=4) (locking) (run) {'product_id': 4, 'total_amount': 88835.0, 'qty': 17767}
[INFO] (Auditor-product_id=5) (locking) (all) acquired lock
[INFO] (Auditor-product_id=5) (locking) (run) {'product_id': 5, 'total_amount': 110754.0, 'qty': 18459}
[INFO] (Auditor-product_id=6) (locking) (all) acquired lock
[INFO] (Auditor-product_id=6) (locking) (run) {'product_id': 6, 'total_amount': 129766.0, 'qty': 18538}
[INFO] (Auditor-product_id=7) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (run) {'product_id': 7, 'total_amount': 152576.0, 'qty': 19072}
[INFO] (Auditor-product_id=8) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (run) {'product_id': 8, 'total_amount': 150210.0, 'qty': 16690}
[INFO] (Auditor-product_id=9) (locking) (all) acquired lock
[INFO] (Auditor-product_id=9) (locking) (run) {'product_id': 9, 'total_amount': 160150.0, 'qty': 16015}
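To make explicit what with does for us, here is a small self-contained sketch. The names lock, shared, append_with, append_explicit and failing_update are hypothetical. append_explicit() spells out roughly what the with statement does (acquire, then release in a finally clause), and failing_update() shows that the lock is released even when the body raises.

```python
import threading

lock = threading.Lock()
shared = []


def append_with(item):
    # Acquire on entry, release on exit, even if the body raises.
    with lock:
        shared.append(item)


def append_explicit(item):
    # Roughly what the with statement does for a Lock.
    lock.acquire()
    try:
        shared.append(item)
    finally:
        lock.release()


def failing_update():
    with lock:
        raise ValueError("boom")


try:
    failing_update()
except ValueError:
    pass

print(lock.locked())  # False: released despite the exception
append_with(1)
append_explicit(2)
print(shared)  # [1, 2]
```

If failing_update() had called acquire() without a finally clause, the exception would have left the lock held, and the next append would block forever.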

2.8. Limiting Concurrent Access to Resources

A lock like that ensures only one thread at a time can access a resource, but imagine database access. Maybe you have a connection pool of at most 100 connections. In this case you do want concurrent access, just not more than 100 threads using the resource at once. This is where Semaphore helps. It keeps an internal counter, initialized to the number of threads allowed in at once. Each acquire() decrements the counter, each release() increments it, and acquire() blocks while the counter is zero. Let’s modify our ProviderAuditor code so that at most 5 auditors can hold the semaphore at the same time:

locking.py

import logging
import threading


class ProviderAuditor(threading.Thread):
    # Shared by every auditor, so at most 5 of them run the block below at once.
    # A Semaphore created in __init__ would give each thread its own counter
    # and would limit nothing.
    semaphore = threading.Semaphore(5)

    def __init__(self, product_id, purchase_repository):
        super().__init__(name="Auditor-product_id=" + str(product_id))
        self.product_id = product_id
        self.purchase_repository = purchase_repository

    def run(self):
        with ProviderAuditor.semaphore:
            logging.info(str(self.purchase_repository.sales_by_product(self.product_id)))
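To see a Semaphore cap concurrency outside the auditor example, here is a minimal self-contained sketch. The worker function, the 0.05-second sleep that stands in for a database call, and the active/peak counters are hypothetical. A separate Lock protects the counters because several workers update them at the same time.

```python
import threading
import time

MAX_CONCURRENT = 2
semaphore = threading.Semaphore(MAX_CONCURRENT)  # one instance shared by all workers
state_lock = threading.Lock()                    # protects active and peak
active = 0
peak = 0


def worker():
    global active, peak
    with semaphore:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # pretend to use a pooled connection
        with state_lock:
            active -= 1


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)  # never exceeds MAX_CONCURRENT
```

All six workers share one Semaphore(2), so the recorded peak cannot go above 2. The run takes roughly three rounds of the sleep instead of one.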

2.9. Thread-Specific Data

We often need data that is only accessible from one thread (e.g., an identifier of the current process). Python provides threading.local for this. Attributes set on a local object are visible only to the thread that set them. Here’s an example:

process-identifier.py

import logging
import random

from threading import Thread, local

logging.basicConfig(level=logging.INFO,
                    format='[%(levelname)s] (%(threadName)-s) (%(module)-s) (%(funcName)-s) %(message)s',)

# One object shared by all threads; each thread only sees the attributes it sets
data = local()


def my_method(data):
    try:
        logging.info(str(data.value))
    except AttributeError:
        logging.info("data does not have a value yet")


class MyProcess(Thread):
    def run(self):
        my_method(data)  # no value yet for this thread
        data.value = {"process_id": random.randint(0, 1000)}
        my_method(data)  # this thread's own value


for i in range(4):
    MyProcess().start()

It’s easy to use and very useful. If you run it, you’ll see something like:

[INFO] (Thread-1) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-1) (process-identifier) (my_method) {'process_id': 567}
[INFO] (Thread-2) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-2) (process-identifier) (my_method) {'process_id': 477}
[INFO] (Thread-3) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-3) (process-identifier) (my_method) {'process_id': 812}
[INFO] (Thread-4) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-4) (process-identifier) (my_method) {'process_id': 981}

Every thread has to set its own value on the local object. Values set by other threads are never visible there.
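The same property can be checked programmatically. This minimal sketch assumes a hypothetical handle() function that stores a request id on one shared local object and records whether the attribute already existed before the thread set it.

```python
import threading

ctx = threading.local()  # one object shared by all threads
seen = {}
seen_lock = threading.Lock()


def handle(request_id):
    # Other threads may already have set ctx.request_id,
    # but this thread cannot see their values.
    had_value = hasattr(ctx, "request_id")
    ctx.request_id = request_id
    with seen_lock:
        seen[request_id] = (had_value, ctx.request_id)


threads = [threading.Thread(target=handle, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)
print(hasattr(ctx, "request_id"))  # False: the main thread never set it
```

Every entry in seen records False for had_value, and each thread reads back only its own id.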

3. Download the Code Project

This was an example of threading in Python.

You can download the full source code of this example here: python-threading

Sebastian Vinci

Sebastian is a full stack programmer with strong experience in Java and Scala enterprise web applications. He is currently studying Computer Science at UBA (University of Buenos Aires) and working full time at a .com company as a Semi-Senior developer, involved in architectural design, implementation and monitoring. He has also worked on automating processes (such as database backups, and building, deploying and monitoring applications).