summaryrefslogtreecommitdiff
path: root/openpcrlib.py
blob: 5db20d457aa90f8d981f22114b7cbeedbc786115 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import time, sys, os.path, json
from subprocess import check_output

class OpenPCR:
    def __init__(self,devicepath=''):
        self.platform = sys.platform[:3]
        # Linux 	'lin'
        # Windows	'win'
        # Cygwin 	'cyg'
        # Mac OS X	'dar'
        # OS/2 (/EMX)	'os2'
        
        if not devicepath: #i.e. if none was passed at object initiation..
            self.devicepath = self.DefaultPlatformPath()
        else:
            self.devicepath = devicepath # Just assume user knows what s/he is doing..

        if not os.path.exists(self.devicepath):
            print("Directory specified (default /media/OPENPCR) does not exist. Is OpenPCR turned on?")
            self.devicepath = ''
        assert(self.devicepath)

    def DefaultPlatformPath(self):
        'Checks "usual" mount locations on supported platforms and returns directory if successful.'
        if self.platform == 'lin':
            return '/media/OPENPCR/' # Ubuntu mountpoint, at least.
            # TODO: Cascade through potential mountpoints on other linuxes, using os.path.exists()?
        elif self.platform == 'win':
            print("Windows not yet supported. You can try manually invoking the OpenPCR() object with a "+\
                  "'devicepath' argument consisting of the openpcr mountpoint, if you like; be sure to "+\
                  "share your results if it works!")
            return ''
        elif self.platform == 'dar':
            print("Mac/Darwin not yet supported. You can try manually invoking the OpenPCR() object with a "+\
                  "'devicepath' argument consisting of the openpcr mountpoint, if you like; be sure to "+\
                  "share your results if it works!")
            return ''
        elif self.platform == 'os2':
            print("OS/2 not yet supported. You can try manually invoking the OpenPCR() object with a "+\
                  "'devicepath' argument consisting of the openpcr mountpoint, if you like; be sure to "+\
                  "share your results if it works!\n"+\
                  "Please note, there is no 'ncc' binary for OS2, so reading OpenPCR status is not possible.")
            return ''
        elif self.platform == 'cyg':
            print("Cygwin not yet supported. You can try manually invoking the OpenPCR() object with a "+\
                  "'devicepath' argument consisting of the openpcr mountpoint, if you like; be sure to "+\
                  "share your results if it works!")
            return ''

    def sendprogram(self,program):
        'Sends a program to the OpenPCR and prints a verification if successful.'
        assert(self.devicepath)
        Status = self.readstatus()
        CurrentNonce = Status['nonce']
#        CurrentNonce = 65534 # For debugging purposes only, replace with the above..
        if CurrentNonce < 65535: # Max int size on arduino; wrap around to avoid bugs.
            NewNonce = CurrentNonce + 1
        else:
            NewNonce = 1
        NonceCMD = 'd=' + str(NewNonce)
        
        # Chop up program string, insert nonce, and reassemble for sending.
        dissectedprogram = program.split("&")
        for item in dissectedprogram:
            if item[0]=='d': # If there's already a nonce in this program for some reason..
                print("Found existing nonce value ('d=xxx') in program. Removing..")
                noncelistindex = dissectedprogram.index(item) # Find index to delete first..
                del(dissectedprogram[noncelistindex]) # Then kill it with fire.

        dissectedprogram.insert(1,NonceCMD) # Add new nonce to program.
        assembledprogram = "&".join(dissectedprogram) # Reassembles YAML-formatted program.
        self._sendprogram(assembledprogram)

        time.sleep(2) # If no waiting time is given, readstatus fails.
        recurse = 0
        def checkNonce():
            try:
                Status = self.readstatus()
                if Status['nonce'] == NewNonce:
                    print("Program sent successfully!")
                else:
                    print("Nonce values unchanged; program may not have sent correctly.")
            except ValueError:
                if recurse < 6: # Max 5 seconds wait after initial second.
                    print("Waiting for OpenPCR to respond..")
                    time.sleep(1)
                    recurse += 1
                    checkNonce() # Recursive function calls, yay! (Puppies may die.)
                else:
                    print("OpenPCR not responding to checkstatus calls. Giving up.")
        checkNonce() # Start checking.

    def _sendprogram(self,program):
        assert(self.devicepath)
        with open(self.devicepath+'CONTROL.TXT', encoding='utf-8', mode='w') as ProgramPipe:
            ProgramPipe.write(program)

    def test(self):
        assert(self.devicepath)
        self.sendprogram('s=ACGTC&c=start&n=OpenPyCR Test Program&p=(20[10|45|TestStep1][20|25|TestStep2])')

    def stop(self):
        assert(self.devicepath)
        self.sendprogram('s=ACGTC&c=stop')

    def ncc(self):
        'Low-level. Calls ncc binary for appropriate platform, returns raw output as string.'
        if (self.platform == 'lin') or (self.platform == 'dar'): # Assuming that mac uses same binary here..
            try:
                ByteStatus = check_output(["./ncc", self.devicepath+"STATUS.TXT"])
            except:
                print("'ncc' binary exited with an error. Platform may be incompatible.")
        elif (self.platform == 'win') or (self.platform == 'cyg'): # Assuming cygwin calls windows binary.
            try:
                ByteStatus = check_output(["./ncc.exe", self.devicepath+"STATUS.TXT"]) # Totally untested.
            except:
                print("'ncc.exe' binary exited with an error. Platform may be incompatible.")
        else:
            print("System not supported.")
            return None
        StrStatus = str(ByteStatus, encoding='utf-8')
        return StrStatus

    def readstatus(self):
        'Calls ncc and translates output into a dictionary of values.'
        assert(self.devicepath)
        statustxt = self.ncc()
        statusdict = {}
        for entry in statustxt.split("&"):
            key, val = entry.split("=")
            if key == 's': # State: running, idle, etc.
                statusdict['state'] = val
            elif key == 't': # Job: Heating/Cooling/Holding
                statusdict['job'] = val
            elif key == 'b': # Temperature of the block; Celsius.
                statusdict['blocktemp'] = float(val)
            elif key == 'l': # Temperature of the lid; Celsius.
                statusdict['lidtemp'] = float(val)
            elif key == 'e': # Elapsed seconds.
                statusdict['elapsedsecs'] = int(val)
            elif key == 'r': # Remaining seconds (accuracy?).
                secs = int(val) # Converts from string to int.
                mins = int(secs/60) # int() omits decimals.
                hours = int(mins/60)
                statusdict['secsleft'] = secs
                statusdict['minsleft'] = mins
                statusdict['hoursleft']= hours
                 # Remainders, for hours:mins:seconds
                extramins = mins - (hours*60)
                extrasecs = secs - (mins*60)
                formattedtime = '{0}:{1}:{2}'.format(hours,extramins,extrasecs)
                statusdict['timeleft'] = formattedtime
            elif key == 'p': # Name of current step, if given.
                statusdict['currentstep'] = val
            elif key == 'c': # Cycle number.
                statusdict['cycle'] = int(val)
            elif key == 'n': # Program name, if given.
                statusdict['program'] = val
            elif key == 'd':
                # The "number-used-once" that helps verify program sending.
                # No other function, generally safe to ignore except when
                # testing program delivery.
                statusdict['nonce'] = int(val)
        return statusdict

    def csvstatus(self,keyorder = ['elapsedsecs','cycle','blocktemp']):
        '''Calls readstatus and formats output for logging to csv.

        keyorder is a list of dictionary keys to use, in desired order, when
        formatting output. The default provides the elapsed time in seconds,
        current cycle number, and temperature of the block.'''
        assert(self.devicepath)
        S = self.readstatus()
        if (S['state'] == 'stopped') or (S['job'] == 'idle'):
            return 'Inactive' # OpenPCR not running, will probably raise keyerrors below if we proceed.
        if (S['state'] == 'complete'):
            return 'Complete'
        thisline = []
        try:
            for key in keyorder:
                thisline.append(str(S[key]))
            return ','.join(thisline)
        except KeyError:
            print("Provided keys invalid. Available keys are: state, job, blocktemp, lidtemp, elapsedsecs, secsleft, minsleft, hoursleft, timeleft, currentstep, cycle, program, nonce.")

    def printstatus(self):
        'Calls readstatus and prints useful information to stdout.'
        assert(self.devicepath)
        S = self.readstatus()
        if S['state'] == 'complete':
            print("Program complete.")
            return None
        if S['state'] == 'stopped':
            print("System idle. No program running.")
            return None
        PrettyStatus = '''Current Program: {0}
 Step '{1}' of cycle {2}
 Currently: {3}
 Block: {4}°C, Lid: {5}°C
 Remaining Time: {6}'''.format(S['program'],S['currentstep'],str(S['cycle']),S['job'],str(S['blocktemp']),str(S['lidtemp']),S['timeleft'])
        print(PrettyStatus)