Monthly Archives: July 2013

Nonblocking Algorithms for Multicore Processors

I came across this awesome article on ACM Queue: http://queue.acm.org/detail.cfm?id=2492433. It is a very nice exposition of the topic, with example code and a detailed walkthrough of the effects of concurrent access on shared state. The other interesting piece in the same context is, of course, transactional memory in Intel’s Haswell processor, described here: http://www.realworldtech.com/haswell-tm/

 

Parallel Directory Tree Compare in Python

A friend asked me whether there is a quick way to compare two directory trees using multiple threads. He was dealing with data migration scenarios where the directory trees can contain millions of files, and he wanted to compare the copy with the original. A normal recursive diff would take days.

After a few experiments with shell scripts and the like, it finally boiled down to Python and its great multiprocessing module to get the work done. The full code is pasted below and is mostly self-explanatory. On my dual-core hyperthreaded laptop it takes half the time of a recursive diff (with 26000 files). However, the job is primarily I/O bound, and the tiny laptop’s drive cannot keep up with the I/O demands of multiple processes. A good server should give a decent speedup over a normal diff.

import os, sys
import hashlib
import stat
import multiprocessing
from multiprocessing import Process, JoinableQueue

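# Start several workers per CPU: the job is mostly I/O bound, so workers
# spend much of their time waiting on the disk rather than computing.
num_procs = multiprocessing.cpu_count()
num_procs *= 4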

def get_hash(fh, sz):
        h = hashlib.sha256()
        while True:
                buf = fh.read(65536)
                if len(buf) == 0: break
                h.update(buf)
        return h.digest()

def chk(path, lf, dir):
        path1 = os.path.join(dir, path)
        st = None
        st1 = None
        try:
                st = os.lstat(path)
                st1 = os.lstat(path1)
        except OSError:
                lf.write("Missing: " + path1 + "\n")
                lf.flush()
                return

        if not stat.S_ISREG(st.st_mode):
                return
        if st.st_size != st1.st_size:
                lf.write("Size differ: " + path1 + "\n")
                lf.flush()
                return
        if st.st_size == 0: return

        hv = None
        hv1 = None
        try:
                # Open in binary mode so that hashlib receives bytes
                with open(path, "rb") as fh:
                        hv = get_hash(fh, st.st_size)
                with open(path1, "rb") as fh:
                        hv1 = get_hash(fh, st.st_size)
        except (IOError, OSError):
                lf.write("Open error: " + path1 + "\n")
                lf.flush()
                return

        if hv != hv1:
                lf.write("Digests differ: " + path1 + "\n")
                lf.flush()

def proc_chk(q, lf, dir):
        while True:
                path1 = q.get()
                if path1 == "done":
                        break
                chk(path1, lf, dir)
                q.task_done()
        q.task_done()

q = JoinableQueue()
lf = open("/var/tmp/differ_files", "w+")
dir1 = os.path.realpath(sys.argv[1])
dir2 = os.path.realpath(sys.argv[2])

o_cwd = os.getcwd()
os.chdir(dir1)
cwd = os.getcwd()

for i in range(0, num_procs):
        p = Process(target=proc_chk, args=(q, lf, dir2))
        p.start()

for dirpath, dirnames, filenames in os.walk(".", followlinks=False):
        for f in filenames:
                q.put(os.path.join(dirpath, f))

for i in range(0, num_procs):
        q.put("done")
q.join()
lf.close()
os.chdir(o_cwd)

It should be called like this: python pdiff.py <dir1> <dir2>. It writes a list of any differing files to /var/tmp/differ_files.
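For anyone on a recent Python 3, here is a rough sketch of the same idea built on concurrent.futures instead of a hand-rolled queue with "done" markers. It is not the script above, just one possible variation. The names file_digest, compare_file, relative_files, compare_trees and the executor_cls parameter are made up for this illustration. Each worker returns its result to the parent, so no log file handle has to be shared between processes.

```python
import hashlib
import os
import stat
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def file_digest(path, bufsize=65536):
    """Return the SHA-256 digest of a file, read in fixed-size chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as fh:  # binary mode: hashlib needs bytes
        for buf in iter(lambda: fh.read(bufsize), b""):
            h.update(buf)
    return h.digest()


def compare_file(rel, dir1, dir2):
    """Compare one relative path in both trees; return (rel, status)."""
    p1, p2 = os.path.join(dir1, rel), os.path.join(dir2, rel)
    try:
        st1, st2 = os.lstat(p1), os.lstat(p2)
    except OSError:
        return (rel, "missing")
    if not stat.S_ISREG(st1.st_mode):
        return (rel, "skipped")  # only regular files are compared
    if st1.st_size != st2.st_size:
        return (rel, "size differs")
    try:
        same = file_digest(p1) == file_digest(p2)
    except OSError:
        return (rel, "open error")
    return (rel, "same" if same else "digest differs")


def relative_files(root):
    """Yield every file under root as a path relative to root."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            yield os.path.relpath(os.path.join(dirpath, name), root)


def compare_trees(dir1, dir2, executor_cls=ProcessPoolExecutor, workers=None):
    """Return (path, status) for every file that is not identical."""
    worker = partial(compare_file, dir1=dir1, dir2=dir2)
    with executor_cls(max_workers=workers) as pool:
        # chunksize batches paths per task; thread pools ignore it
        results = pool.map(worker, relative_files(dir1), chunksize=64)
        return [(rel, status) for rel, status in results
                if status not in ("same", "skipped")]
```

When run as a script, call compare_trees(dir1, dir2) from under an if __name__ == "__main__": guard, since process pools on some platforms re-import the main module in each worker. Passing executor_cls=ThreadPoolExecutor switches to threads, which may be worth trying on an I/O-bound job like this one.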