Wednesday, May 24, 2017

Python Multiprocessing behaves differently on Windows and Linux



Python is a language that embodies the philosophy “if it walks like a duck and talks like a duck, it's a duck”. I know this refers to Python’s type system, but Python is so intuitive that whenever something walks and talks like a duck, I just don’t think twice and assume it will behave like a duck. That is why I was surprised when a script that ran well on Windows failed mysteriously on Linux.

This script started out single-threaded. I then changed it to use multiple threads via concurrent.futures.ThreadPoolExecutor, and, wait for it, the multithreaded performance was a disaster. The task my script performs is very CPU-intensive, and for CPU-bound work Python multithreading is actually slower than single-threading, because the GIL lets only one thread execute Python bytecode at a time. So I switched to multiprocessing.Pool, and it worked like a charm on Windows: performance improved greatly. The surprise came when I deployed the script to Linux: all sorts of weird errors were thrown by SQLAlchemy (my script uses SQLAlchemy to query and save data in MySQL).
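To make the CPU-bound point concrete, here is a toy comparison (not my original script): the same function run under a thread pool and a process pool. Because of the GIL, the thread pool cannot use more than one core for this kind of work, while the process pool can.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # pure-Python arithmetic, so it holds the GIL the whole time
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    jobs = [2000000] * 8
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with executor_cls(max_workers=4) as ex:
            list(ex.map(cpu_bound, jobs))
        print(executor_cls.__name__ + " took " + str(round(time.time() - start, 2)) + " seconds")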

I dug into this and found that multiprocessing behaves quite differently on Windows and Linux, which is what I am going to share in this blog post.

To simplify the research, I used two simple scripts:

  •   mppool_0.py
  •   mppool_1.py

mppool_1.py is very simple and is imported by mppool_0.py; we just want to find out when and how this module gets imported by the main script.


import os

print("imported by process:"+str(os.getpid()))

gv1=25


mppool_0.py is the main script:


import mppool_1
import os
from random import randint

gv = None

def longRunningFunc(n):
    import time
    time.sleep(randint(1, 10))
    print("in process:" + str(os.getpid()) + " global variable is:" + str(gv))

    mppool_1.gv1 += mppool_1.gv1
    print("in process:" + str(os.getpid()) + " global variable in imported is:" + str(mppool_1.gv1))
    return n

def _workInitialize():
    print("initialize:" + str(os.getpid()))

def work():
    from multiprocessing import Pool
    pool = Pool(processes=2, initializer=_workInitialize)
    ret = pool.map(longRunningFunc, range(4))
    print("pool is finished")
    print(ret)

gv = 5
print("invoked before __main__block by process:" + str(os.getpid()) + " global variable is:" + str(gv))

if __name__ == '__main__':
    gv = 6
    print("invoked in __main__ block by process:" + str(os.getpid()) + " global variable is:" + str(gv))
    work()


Some basic knowledge: 

  • pool = Pool(processes=2, initializer=_workInitialize) starts two worker processes, each of which runs the _workInitialize function once when it starts. If the processes parameter is not given, Pool starts as many processes as there are CPUs.

  • pool.map(longRunningFunc, range(4)): longRunningFunc is the task that will be run by the child processes; the second parameter is an iterable whose length is the number of tasks, and each element is passed to longRunningFunc as its argument. In this example, 4 tasks are run by 2 processes.

  • pool.map() is synchronous: it blocks until all tasks have finished, and its return value is the list of results from all the tasks.

  • __name__ is a special variable. If the Python interpreter is running the module as the main program, it sets __name__ to "__main__". If the file is being imported from another module, __name__ is set to the module's name.


Let us compare the outputs from running the script on Windows and on Linux (left is Windows, right is Linux):

Let us first check out things that are the same:

  • Both start two child processes.

  • Tasks are assigned to these 2 child processes. Child processes behave like threads in a thread pool: after they finish one task, they pick up another. On the left, each child process executes two tasks; on the right, one child process runs faster and gets to run 3 tasks.

  • Unlike multithreading, multiprocessing does not have data-contention issues, since each process works on its own copy of the data. But because child processes are reused, be careful how you modify module-level state: the changes are carried over to the next task run by the same child process. In this example, you can see the variable mppool_1.gv1 being updated across subsequent runs (see the sketch after this list for a way to limit this).
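
If that carry-over is undesirable, Pool also accepts a maxtasksperchild parameter (not used in my scripts) that recycles a worker after a fixed number of tasks. A minimal sketch:

from multiprocessing import Pool

def task(n):
    return n * n

if __name__ == '__main__':
    # each worker handles exactly one task and is then replaced by a fresh
    # process, so module-level changes never leak into the next task
    pool = Pool(processes=2, maxtasksperchild=1)
    print(pool.map(task, range(4)))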

Now let us check the differences:

  • From the output, we can see that on Windows a child process runs the same program as the main process: it also imports mppool_1.py and sets the global variable gv to 5. The if __name__ == '__main__' guard prevents child processes from entering the block, because for child processes __name__ is set to “__mp_main__”, so the code inside the if __name__ == '__main__' block is only run by the parent process.
On Windows, the Pool setup has to be placed inside the if __name__ == '__main__' block; otherwise the script simply runs into a cycle of re-imports, with the following error thrown:
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

  • On Linux, on the other hand, a child process starts its life at the point where the parent process forks it, and inherits everything from the parent process. In this example, a child process doesn’t import mppool_1.py, but it can still access the module because it inherits it from the parent process; it doesn’t set the variable gv to 5, and when it runs it sees gv as 6, not 5, because the parent process changed gv to 6 before starting the child processes.
This means that on Linux you do not need to place the Pool setup inside if __name__ == '__main__'. But you should still do it for cross-platform compatibility.

This difference exists because on Linux multiprocessing uses fork() to start a child process, which inherits the memory of the parent process. On Windows there is no fork(), so a child process starts from scratch by importing the main module in a fresh interpreter.
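
As an aside, since Python 3.4 you can choose the start method explicitly, so you can reproduce the Windows-style behavior on Linux; this is a handy way to test cross-platform code. A minimal sketch (not part of the scripts above):

from multiprocessing import get_context

def square(n):
    return n * n

if __name__ == '__main__':
    # "spawn" starts children from scratch, like Windows; "fork" is the Linux default
    ctx = get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))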

The differences in this example are easy to understand, but back to the script that prompted this research in the first place: it uses a SQLAlchemy engine to query/insert into MySQL, and on Windows it works fine. The pseudocode looks like this:

def work():
    pool = Pool()
    pool.map(_longRunningFunc, ...)

def _longRunningFunc(n):  # this is the task run by each child process
    DBUtil.engine.execute(...)  # use the engine to do some MySQL work

DBUtil.init()  # initializes the SQLAlchemy engine
if __name__ == '__main__':
    work()

On Windows, every child process gets to run DBUtil.init() and initializes its own SQLAlchemy engine, so everything works out fine. On Linux, the SQLAlchemy engine is actually shared by all the processes. Behind the scenes, the engine maintains a pool of connections, and a connection is a physical TCP/IP connection to the MySQL database; when those connections are shared across processes, all sorts of weird things happen.
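A sketch of how to avoid sharing the engine (the names and the connection URL here are made up, not my actual code): create the engine inside the Pool initializer, so each child process gets its own connection pool. Calling engine.dispose() in the child on an engine inherited from the parent is the alternative the SQLAlchemy documentation suggests for the fork case.

from multiprocessing import Pool
from sqlalchemy import create_engine

engine = None

def _init_worker():
    global engine
    # created after the fork, so the TCP connections are private to this child
    engine = create_engine("mysql+pymysql://user:password@localhost/stock")

def _long_running_func(n):
    # stand-in for the real queries
    return engine.execute("SELECT %d" % n).scalar()

if __name__ == '__main__':
    pool = Pool(processes=2, initializer=_init_worker)
    print(pool.map(_long_running_func, range(4)))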

Initializing a process is easy, since Pool() has an initializer parameter; what is tricky is that Pool() doesn't provide a finalizer. And this turns out to be another big difference between Windows and Linux.

Let us continue to use the two simple scripts as an example. 

On Windows, cleaning up a process is easy: just add the following at the end of mppool_0.py, and you will see all 3 processes (the 2 child processes and the parent process) run the exit_handler() function when they exit.

def exit_handler():
    print("finished process:" + str(os.getpid()))

import atexit
atexit.register(exit_handler)


This, however, doesn't work on Linux. After digging through the multiprocessing code, I found the reason in multiprocessing/popen_fork.py:

def _launch(self, process_obj):
    code = 1
    parent_r, child_w = os.pipe()
    self.pid = os.fork()
    if self.pid == 0:
        try:
            os.close(parent_r)
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
        finally:
            os._exit(code)
    else:
        os.close(child_w)
        util.Finalize(self, os.close, (parent_r,))
        self.sentinel = parent_r


The child process exits via os._exit(), and the atexit documentation (https://docs.python.org/2/library/atexit.html) clearly states: "The functions registered via this module are not called when the program is killed by a signal not handled by Python, when a Python fatal internal error is detected, or when os._exit() is called."
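A tiny Linux-only illustration of that statement (separate from the scripts above):

import atexit
import os

atexit.register(lambda: print("atexit ran in process:" + str(os.getpid())))

pid = os.fork()
if pid == 0:
    # child: exits through os._exit(), just like multiprocessing's forked
    # workers, so the atexit handler above never runs here
    os._exit(0)
else:
    os.waitpid(pid, 0)
    # parent: exits normally, so its atexit handler does run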

So how can we clean up a child process? This actually took me quite some time digging through the multiprocessing code, and the clue is in multiprocessing.util.Finalize:

def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
    assert exitpriority is None or type(exitpriority) is int

    if obj is not None:
        self._weakref = weakref.ref(obj, self)
    else:
        assert exitpriority is not None

    self._callback = callback
    self._args = args
    self._kwargs = kwargs or {}
    self._key = (exitpriority, next(_finalizer_counter))
    self._pid = os.getpid()

    _finalizer_registry[self._key] = self

 
A Finalize object registers itself into _finalizer_registry, and upon exiting, all registered finalizers are executed. Finalize also holds a weakref: the callback method is called when the weakly-referenced object is about to be garbage collected.

The complete solution is as follows:

import mppool_1
import os
from random import randint

gv = None

class Foo:
    pass

def longRunningFunc(n):
    import time
    time.sleep(randint(1, 10))
    print("in process:" + str(os.getpid()) + " global variable is:" + str(gv))

    mppool_1.gv1 += mppool_1.gv1
    print("in process:" + str(os.getpid()) + " global variable in imported is:" + str(mppool_1.gv1))
    return n

foo = Foo()

def exit_handler():
    print("finished process:" + str(os.getpid()))

def _workInitialize():
    print("initialize:" + str(os.getpid()))

    from multiprocessing.util import Finalize
    # create a Finalize object; the first parameter is an object referenced
    # by weakref, it can be anything, just make sure the object stays alive
    # for as long as the process is alive
    Finalize(foo, exit_handler, exitpriority=0)

def work():
    from multiprocessing import Pool
    pool = Pool(processes=2, initializer=_workInitialize)
    ret = pool.map(longRunningFunc, range(4))
    print("pool is finished")
    print(ret)

gv = 5
print("invoked before __main__block by process:" + str(os.getpid()) + " global variable is:" + str(gv))

# a tip for debugging multiprocessing: the following makes the package log details to stderr
from multiprocessing import util
util.log_to_stderr(util.SUBDEBUG)

if __name__ == '__main__':
    gv = 6
    print("invoked in __main__ block by process:" + str(os.getpid()) + " global variable is:" + str(gv))
    work()

# the parent process will still get to call atexit handlers
import atexit
atexit.register(exit_handler)


PS: my Python version is:
vagrant@stock:/stock$ python --version
Python 3.5.2 :: Anaconda 4.2.0 (64-bit)









