# Copyright 2004-2007 Nanorex, Inc.  See LICENSE file for details.

"""\
jobqueue.py

$Id$

Python code for parallelizing various kinds of jobs on a cluster of
Linux/Unix/Mac machines. Initially being used for raytracing jobs, but
should be useful for other things, perhaps parallel simulations some
day.
"""

import os, types, threading, time

worker_list = [ 'localhost' ]

# Set DEBUG=1 (or higher) in the environment for verbose script tracing.
DEBUG = int(os.environ.get("DEBUG", "0"))

all_workers_stop = False

def do(cmd):
    # Echo a shell command, run it, and raise if it exits nonzero.
    print cmd
    if os.system(cmd) != 0:
        raise Exception(cmd)

class Worker(threading.Thread):

    def __init__(self, jobqueue, machine, workdir):
        threading.Thread.__init__(self)
        self.machine = machine
        self.jobqueue = jobqueue
        self.workdir = workdir
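        # Pick local or remote transport implementations up front, so
        # the rest of the code can call self.do/self.get/self.put
        # without caring where this worker actually runs.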
        if machine in ('localhost', '127.0.0.1'):
            self.do = do
            self.get = self._local_get
            self.put = self._local_put
        else:
            self.do = self._remote_do
            self.get = self._remote_get
            self.put = self._remote_put

    def _remote_do(self, cmd):
        do('ssh ' + self.machine + ' ' + cmd)

    def _local_put(self, filelist, srcdir):
        # Transfer files from host to worker, NON DESTRUCTIVELY
        do('for x in %s; do cp %s/$x %s; done' %
           (" ".join(filelist), srcdir, self.workdir))

    def _local_get(self, filelist, dstdir):
        # Transfer files from worker to host (destructively: mv)
        do('for x in %s; do mv %s/$x %s; done' %
           (" ".join(filelist), self.workdir, dstdir))

    def _remote_put(self, filelist, srcdir):
        # Transfer files from host to worker
        do('(cd %s; tar cf - %s) | gzip | ssh %s "(cd %s; gunzip | tar xf -)"' %
           (srcdir, " ".join(filelist), self.machine, self.workdir))

    def _remote_get(self, filelist, dstdir):
        # Transfer files from worker back to host
        do('ssh %s "(cd %s; tar cf - %s)" | gzip | (cd %s; gunzip | tar xf -)' %
           (self.machine, self.workdir, " ".join(filelist), dstdir))
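
    # For example, with a hypothetical remote machine 'bigbox' and the
    # default workdir, put(['a.pov'], '/tmp/src') runs roughly:
    #   (cd /tmp/src; tar cf - a.pov) | gzip | \
    #       ssh bigbox "(cd /tmp/jobqueue; gunzip | tar xf -)"
    # Files travel as a compressed tar stream over a single ssh
    # connection, so only ssh, gzip, and tar are needed at each end.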

    # Each worker grabs a new job as soon as it finishes the previous
    # one. This allows mixing slower and faster worker machines; each
    # works at capacity.
    def run(self):
        global all_workers_stop
        self.do('mkdir -p ' + self.workdir)
        while not all_workers_stop:
            job = self.jobqueue.get()
            if job is None:
                return
            try:
                job.go(self)
                # reset the scratch directory between jobs
                self.do('rm -rf ' + self.workdir)
                self.do('mkdir -p ' + self.workdir)
            except:
                # any failure shuts down the whole queue; wait() will
                # notice all_workers_stop and raise
                all_workers_stop = True
                raise

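# Global job counter: gives every generated wrapper script (see
# Job.go() below) a unique job_NNNNNNNN.sh filename.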
_which_job = 0

class Job:

    def __init__(self, srcdir, dstdir, inputfiles, outputfiles):
        global _which_job
        self.index = _which_job
        _which_job += 1
        self.srcdir = srcdir
        self.dstdir = dstdir
        self.inputfiles = inputfiles
        self.outputfiles = outputfiles

    def shellScript(self):
        raise Exception, 'shellScript() must be overridden in a subclass'

    def preJob(self, worker):
        pass

    def postJob(self, worker):
        pass

    def go(self, worker):
        self.preJob(worker)
        scriptname = 'job_%08d.sh' % self.index
        longname = os.path.join(self.srcdir, scriptname)
        script = ("(cd " + worker.workdir + "\n" +
                  (self.shellScript()) + ")\n")
        if DEBUG >= 1:
            print worker.machine + ' <<<\n' + script + '>>>'
        outf = open(longname, 'w')
        outf.write(script)
        outf.close()
        os.system('chmod +x ' + longname)
        worker.put(self.inputfiles + [ scriptname ], self.srcdir)
        worker.do(os.path.join(worker.workdir, scriptname))
        worker.get(self.outputfiles, self.dstdir)
        self.postJob(worker)
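
    # For job index 3 and a hypothetical workdir /tmp/jobqueue, the
    # generated wrapper job_00000003.sh looks roughly like:
    #   (cd /tmp/jobqueue
    #   ...whatever shellScript() returned...)
    # It is written into srcdir, shipped to the worker along with the
    # input files, and executed in the worker's scratch directory.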


class JobQueue:

    def __init__(self, _worker_list=None):
        if _worker_list is None:
            _worker_list = worker_list
        self.worker_pool = worker_pool = [ ]
        self.jobqueue = [ ]
        self._lock = threading.Lock()
        for macdir in _worker_list:
            # Accept either a bare hostname or a (machine, workdir)
            # pair. Test the type explicitly: tuple-unpacking a
            # two-character hostname would silently split it in two.
            if type(macdir) is types.StringType:
                machine = macdir
                workdir = '/tmp/jobqueue'
            else:
                machine, workdir = macdir
            worker = Worker(self, machine, workdir)
            worker_pool.append(worker)

    def append(self, job):
        self._lock.acquire()   # thread safety
        self.jobqueue.append(job)
        self._lock.release()

    def get(self):
        # Pop the next job FIFO-style. Returning None tells the calling
        # worker thread that the queue is drained and it should exit.
        self._lock.acquire()   # thread safety
        try:
            r = self.jobqueue.pop(0)
        except IndexError:
            r = None
        self._lock.release()
        return r

    def start(self):
        for worker in self.worker_pool:
            worker.start()

    def wait(self):
        busy_workers = self.worker_pool[:]
        while True:
            busy_workers = [w for w in busy_workers if w.isAlive()]
            if len(busy_workers) == 0:
                break
            if all_workers_stop:
                raise Exception('somebody stopped')
            time.sleep(0.5)

def runjobs(mydir, infiles, outfiles, script):
    """Convenience wrapper for the simple case where each job takes one
    input file and produces one output file, and the same directory is
    used for both srcdir and dstdir. The script argument is a shell
    template with %(ifile)s and %(ofile)s placeholders.
    """
    q = JobQueue(worker_list)
    for ifile, ofile in zip(infiles, outfiles):
        class MyJob(Job):
            def shellScript(self, script=script, ifile=ifile, ofile=ofile):
                return script % { 'ifile': ifile,
                                  'ofile': ofile }
        q.append(MyJob(mydir, mydir, [ifile], [ofile]))
    q.start()
    q.wait()
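
# For instance, raytracing a directory of frames might look like this
# (hypothetical filenames and POV-Ray invocation; the self-test under
# __main__ below is a runnable variant of the same pattern):
#
#   n = 100
#   runjobs('/tmp/frames',
#           ['frame%03d.pov' % i for i in range(n)],
#           ['frame%03d.png' % i for i in range(n)],
#           'povray +I%(ifile)s +O%(ofile)s\n')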

# ============================================================

if __name__ == "__main__":

    worker_list = [ 'localhost' ] # , 'server', 'mac', 'laptop' ]
    mydir = '/tmp/tryit'
    os.system('rm -rf ' + mydir)
    os.system('mkdir -p ' + mydir)

    N = 20
    inputfiles = [ ]
    outputfiles = [ ]
    for i in range(N):
        ifile = 'input%03d' % i
        ofile = 'output%03d' % i
        inputfiles.append(ifile)
        outputfiles.append(ofile)
        outf = open(os.path.join(mydir, ifile), 'w')
        outf.write('hello\n')
        outf.close()

    runjobs(mydir, inputfiles, outputfiles,
            """sleep 20
            if [ -f %(ifile)s ]; then
                cp %(ifile)s %(ofile)s
            else
                echo BAD > %(ofile)s
            fi
            """)

    # verify correct outputs
    for ofile in outputfiles:
        assert open(os.path.join(mydir, ofile)).read() == 'hello\n'