# Copyright 2004-2007 Nanorex, Inc.  See LICENSE file for details. 

"""\
jobqueue.py

$Id$

Python code for parallelizing various kinds of jobs on a cluster of
Linux/Unix/Mac machines. Initially being used for raytracing jobs, but
should be useful for other things, perhaps parallel simulations some
day.
"""

import os, threading, time

worker_list = [ 'localhost' ]

DEBUG = 0
if "DEBUG" in os.environ:
    DEBUG = int(os.environ["DEBUG"])

# Set to True when any worker hits an error; the remaining workers then
# quit after finishing their current jobs, and JobQueue.wait() raises.
all_workers_stop = False

def do(cmd):
    # Echo a shell command, run it locally, and raise if it exits nonzero.
    print cmd
    if os.system(cmd) != 0:
        raise Exception(cmd)

class Worker(threading.Thread):
    # One thread per machine: pulls jobs from the shared JobQueue and
    # runs each one in this machine's workdir.

    def __init__(self, jobqueue, machine, workdir):
        threading.Thread.__init__(self)
        self.machine = machine
        self.jobqueue = jobqueue
        self.workdir = workdir
        # A local worker moves files around directly; a remote worker
        # goes through ssh, using tar+gzip pipelines for file transfer.
        if machine in ('localhost', '127.0.0.1'):
            self.do = do
            self.get = self._local_get
            self.put = self._local_put
        else:
            self.do = self._remote_do
            self.get = self._remote_get
            self.put = self._remote_put

    def _remote_do(self, cmd):
        # Run cmd on the worker machine. Note that cmd is passed to ssh
        # unquoted, so it must not contain characters that need
        # protection from the local shell.
        do('ssh ' + self.machine + ' ' + cmd)

    def _local_put(self, filelist, srcdir):
        # Transfer files from host to worker, NON DESTRUCTIVELY
        do('for x in %s; do cp %s/$x %s; done' %
           (" ".join(filelist), srcdir, self.workdir))

    def _local_get(self, filelist, dstdir):
        # Transfer files from worker back to host (mv is fine; the
        # workdir is wiped after each job anyway)
        do('for x in %s; do mv %s/$x %s; done' %
           (" ".join(filelist), self.workdir, dstdir))

    def _remote_put(self, filelist, srcdir):
        # Transfer files from host to worker
        do('(cd %s; tar cf - %s) | gzip | ssh %s "(cd %s; gunzip | tar xf -)"' %
           (srcdir, " ".join(filelist), self.machine, self.workdir))

    def _remote_get(self, filelist, dstdir):
        # Transfer files from worker back to host
        do('ssh %s "(cd %s; tar cf - %s)" | gzip | (cd %s; gunzip | tar xf -)' %
           (self.machine, self.workdir, " ".join(filelist), dstdir))
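
    # As an illustration (hostname and filenames are made up), a call
    # like _remote_put(['scene.pov'], '/tmp/src') on a worker for
    # machine 'bigbox' with workdir '/tmp/jobqueue' runs:
    #
    #   (cd /tmp/src; tar cf - scene.pov) | gzip | \
    #       ssh bigbox "(cd /tmp/jobqueue; gunzip | tar xf -)"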

    # Each worker grabs a new job as soon as it finishes the previous
    # one. This allows mixing slower and faster worker machines; each
    # works at capacity.
    def run(self):
        global all_workers_stop
        self.do('mkdir -p ' + self.workdir)
        while not all_workers_stop:
            job = self.jobqueue.get()
            if job is None:
                # The queue is drained; this worker is done.
                return
            try:
                job.go(self)
                # Wipe and recreate the workdir so successive jobs can't
                # see each other's leftover files.
                self.do('rm -rf ' + self.workdir)
                self.do('mkdir -p ' + self.workdir)
            except:
                # Any failure stops the whole pool; wait() will notice
                # and raise.
                all_workers_stop = True
                raise

_which_job = 0   # monotonically increasing index, used to name job scripts

class Job:
    # One unit of work: input files are shipped from srcdir to the
    # worker, the shellScript() commands run there, and output files
    # come back to dstdir.

    def __init__(self, srcdir, dstdir, inputfiles, outputfiles):
        global _which_job
        self.index = _which_job
        _which_job += 1
        self.srcdir = srcdir
        self.dstdir = dstdir
        self.inputfiles = inputfiles
        self.outputfiles = outputfiles

    def shellScript(self):
        # Subclasses must override this to return the shell commands
        # for one job; they run with the worker's workdir as cwd.
        raise Exception('override me')

    def preJob(self, worker):
        # Hook: override for per-job setup on the host.
        pass

    def postJob(self, worker):
        # Hook: override for per-job postprocessing on the host, after
        # the output files have been fetched.
        pass

    def go(self, worker):
        self.preJob(worker)
        scriptname = 'job_%08d.sh' % self.index
        longname = os.path.join(self.srcdir, scriptname)
        # Wrap the job's commands in a subshell that cd's into the
        # worker's workdir.
        script = ("(cd " + worker.workdir + "\n" +
                  self.shellScript() + ")\n")
        if DEBUG >= 1:
            print worker.machine + ' <<<\n' + script + '>>>'
        outf = open(longname, 'w')
        outf.write(script)
        outf.close()
        os.system('chmod +x ' + longname)
        # Ship the inputs and the script over, run the script on the
        # worker, then fetch the outputs.
        worker.put(self.inputfiles + [ scriptname ], self.srcdir)
        worker.do(os.path.join(worker.workdir, scriptname))
        worker.get(self.outputfiles, self.dstdir)
        self.postJob(worker)
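
# For the RenderJob sketched at the top of this file, Job.go writes a
# wrapper script along these lines (illustrative) into srcdir and runs
# it on the worker:
#
#   (cd /tmp/jobqueue
#   povray +Iscene.pov +Oscene.png
#   )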


class JobQueue:
    # A thread-safe FIFO of jobs, plus the pool of Worker threads that
    # consumes it.

    def __init__(self, _worker_list=None):
        if _worker_list is None:
            _worker_list = worker_list
        self.worker_pool = worker_pool = [ ]
        self.jobqueue = [ ]
        self._lock = threading.Lock()
        for macdir in _worker_list:
            # Each entry is either a (machine, workdir) pair, or a bare
            # machine name which gets the default workdir.
            if isinstance(macdir, str):
                machine, workdir = macdir, '/tmp/jobqueue'
            else:
                machine, workdir = macdir
            worker = Worker(self, machine, workdir)
            worker_pool.append(worker)

    def append(self, job):
        self._lock.acquire()   # thread safety
        self.jobqueue.append(job)
        self._lock.release()

    def get(self):
        self._lock.acquire()   # thread safety
        try:
            r = self.jobqueue.pop(0)
        except IndexError:
            r = None   # queue drained: tells the calling worker to exit
        self._lock.release()
        return r

    def start(self):
        for worker in self.worker_pool:
            worker.start()

    def wait(self):
        # Poll until every worker thread has finished.
        busy_workers = self.worker_pool[:]
        while True:
            busy_workers = [w for w in busy_workers if w.isAlive()]
            if not busy_workers:
                break
            if all_workers_stop:
                raise Exception('a worker failed')
            time.sleep(0.5)

def runjobs(mydir, infiles, outfiles, script):
    """Simple driver for the common case where each job takes one input
    file and produces one output file, and the same directory serves as
    both srcdir and dstdir. The script argument is a shell template
    with %(ifile)s and %(ofile)s placeholders.
    """
    q = JobQueue(worker_list)
    for ifile, ofile in zip(infiles, outfiles):
        # The default arguments bind each loop iteration's values into
        # that job's shellScript method.
        class MyJob(Job):
            def shellScript(self, script=script, ifile=ifile, ofile=ofile):
                return script % { 'ifile': ifile,
                                  'ofile': ofile }
        q.append(MyJob(mydir, mydir, [ifile], [ofile]))
    q.start()
    q.wait()
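
# For instance (illustrative filenames; the %-template keys are the
# ones runjobs substitutes):
#
#   runjobs('/tmp/frames',
#           ['f000.pov', 'f001.pov'],
#           ['f000.png', 'f001.png'],
#           'povray +I%(ifile)s +O%(ofile)s\n')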

# ============================================================

if __name__ == "__main__":

    worker_list = [ 'localhost' ] # , 'server', 'mac', 'laptop' ]
    mydir = '/tmp/tryit'
    os.system('rm -rf ' + mydir)
    os.system('mkdir -p ' + mydir)

    N = 20
    inputfiles = [ ]
    outputfiles = [ ]
    for i in range(N):
        ifile = 'input%03d' % i
        ofile = 'output%03d' % i
        inputfiles.append(ifile)
        outputfiles.append(ofile)
        outf = open(os.path.join(mydir, ifile), 'w')
        outf.write('hello\n')
        outf.close()

    runjobs(mydir, inputfiles, outputfiles,
            """sleep 20
            if [ -f %(ifile)s ]; then
                cp %(ifile)s %(ofile)s
            else
                echo BAD > %(ofile)s
            fi
            """)

    # verify correct outputs
    for ofile in outputfiles:
        assert open(os.path.join(mydir, ofile)).read() == 'hello\n'