/[ascend]/trunk/pygtk/interface/selftest.py
ViewVC logotype

Contents of /trunk/pygtk/interface/selftest.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 151 - (show annotations) (download) (as text)
Thu Dec 22 10:58:33 2005 UTC (18 years, 6 months ago) by johnpye
File MIME type: text/x-python
File size: 6592 byte(s)
Working on a test case for bug #178
1 import os, os.path, mmap, re
2 import threading, heapq
3 import time, platform
4
5 import sys, dl
6 # This sets the flags for dlopen used by python so that the symbols in the
7 # ascend library are made available to libraries dlopened within ASCEND:
8 sys.setdlopenflags(dl.RTLD_GLOBAL|dl.RTLD_NOW)
9 import ascend
10
11 # Our input will be the ASCENDLIBRARY environment variable
12 # plus perhaps some commandline options relating to exclusions and inclusions.
13 # This program needs to search files in the given directory and hunt for
14 # A4C and A4L files that contain MODELs that contain 'METHOD self_test'
15 # methods. We then create instances of any such models and run these
16 # methods. The methods should include commands which we can trap via
17 # some kind of callback, reporting failure or success.
18
19 TEST_METHOD_NAME='self_test';
20 METHOD_RE = '\\s*METHOD\\s+'+TEST_METHOD_NAME+'\\s*;';
21 METHOD_REGEXP = re.compile(METHOD_RE);
22
23 #---------------------------------------------------------------------
24 # TASK CLASSES
25
26 # These classes represent tasks that will be added to the queue. They
27 # just need to have __init__ and run methods, they will be queued and
28 # run by the thread manager.
29
30
31 # Scan a directory, store any a4c or a4l files in the queue,
32 # also store and subdirectories in the queue, they will be
33 # recursed into.
34 class AscendScanDirectory:
35 global jobs, jobslock
36 def __init__(self,path):
37 self.path=path
38
39 def run(self):
40 print "Scanning",self.path,"..."
41 for f in os.listdir(self.path):
42 if f==".svn" or f=="CVS" or f=="westerberg":
43 continue
44 if os.path.isdir( os.path.join(self.path,f) ):
45 jobslock.acquire()
46 heapq.heappush( jobs,( 30, AscendScanDirectory(os.path.join(self.path,f)) ) )
47 jobslock.release()
48 else:
49 stem,ext = os.path.splitext(f)
50 if ext==".a4c" or ext==".a4l":
51 jobslock.acquire()
52 heapq.heappush( jobs,( 10, AscendInspectFile(os.path.join(self.path,f)) ) )
53 jobslock.release()
54
55 # Ascend file inspection task: load the file and check for 'METHOD self_test'
56 # somewhere in the file.
57
58
59 class AscendInspectFile:
60 global jobs,jobslock;
61 global inspectlock;
62
63 def __init__(self,filepath):
64 self.filepath = filepath;
65
66 def run(self):
67 inspectlock.acquire()
68 try:
69 (path,file) = os.path.split(self.filepath)
70 #print "Checking",self.filepath,"for '"+TEST_METHOD_NAME+"' method"
71 size = os.path.getsize(self.filepath)
72 #print "File size is",size
73 if size > 0:
74 f = open(self.filepath,'r')
75 s = mmap.mmap(f.fileno(),size,mmap.MAP_SHARED,mmap.PROT_READ)
76
77 if METHOD_REGEXP.search(s):
78 print "Found 'METHOD "+TEST_METHOD_NAME+"' in",file
79 jobslock.acquire()
80 heapq.heappush(jobs, (5, AscendTestFile(self.filepath)) )
81 jobslock.release()
82 else:
83 pass #print "File",file," is not self-testing"
84 f.close()
85 except IOError:
86 print "IOError"
87 inspectlock.release()
88
89
90 # This 'task class' will use ASCEND to load the specified model
91 # then some sub-task will read off the list of models and methods
92 # and for each model that includes self_test, will run that
93 # method and check the result.
94 class AscendTestFile:
95 global jobs, jobslock, ascendlock, library;
96
97 def __init__(self,filepath):
98 self.filepath = filepath
99
100 def run(self):
101 ascendlock.acquire()
102 L = ascend.Library()
103 L.clear()
104 L.load(self.filepath)
105 for M in L.getModules():
106 print "Looking at module '"+M.getName()+"'"
107 for t in L.getModuleTypes(M):
108 #print "Looking at type '"+str(t.getName())+"'"
109 for m in t.getMethods():
110 #print "Looking at method '"+str(m.getName())+"'"
111 if m.getName()==TEST_METHOD_NAME:
112 jobslock.acquire()
113 heapq.heappush(jobs, (0, AscendTestModel( self.filepath,t.getName().toString())) )
114 jobslock.release()
115 ascendlock.release()
116
117 # Run self_test on a given model
118 class AscendTestModel:
119 global jobs, jobslock;
120
121 def __init__(self,filepath,modelname):
122 self.filepath=filepath
123 self.modelname=modelname
124
125 def run(self):
126 ascendlock.acquire()
127 try:
128 L = ascend.Library()
129 L.clear()
130 L.load(self.filepath)
131 t = L.findType(self.modelname)
132 testmethod = None
133 for m in t.getMethods():
134 if m.getName()==TEST_METHOD_NAME:
135 testmethod = m;
136 if not testmethod:
137 raise RuntimeError("No method '"+TEST_METHOD_NAME+"' found")
138
139 s = t.getSimulation('testsim');
140 #s.check()
141 print "LAUNCHING SOLVER...\n\n"
142 s.solve(ascend.Solver('CMSlv'))
143 s.run(testmethod)
144 except RuntimeError, e:
145 print e
146
147 ascendlock.release()
148
149
150 #---------------------------------------------------------------
151 # Thread runner
152
153 class AscendModelLibraryTester:
154 def __init__(self):
155 global jobs, jobslock, inspectlock, ascendlock
156
157 jobs = []
158
159 self.runlock = threading.Lock()
160 jobslock = threading.Lock()
161 inspectlock = threading.Lock()
162 ascendlock = threading.Lock()
163
164 self.runlock.acquire()
165 self.path = os.environ['ASCENDLIBRARY']
166 self.threads = []
167
168 if platform.system()=='Windows':
169 self.separator=";"
170 else:
171 self.separator=":"
172
173 print 'Search path:',self.path
174 for p in self.path.split(self.separator):
175 j = AscendScanDirectory(p)
176 jobslock.acquire()
177 heapq.heappush(jobs, (20,j) )
178 jobslock.release()
179
180 def run(self,num_threads):
181 for i in range(num_threads):
182 t = AscendModelLibraryTester.DoWork(i) # new thread
183 self.threads.append(t)
184 t.start()
185
186 for i in range(num_threads):
187 self.threads[i].join()
188
189 self.runlock.release()
190
191 class DoWork(threading.Thread): # thread number tn
192 global jobs, jobslock, max_threads
193 moreThreads = threading.Event()
194 lock = threading.Lock()
195
196 def __init__(self,tn):
197 threading.Thread.__init__(self)
198 self.threadnum = tn
199
200 def run(self):
201 while 1:
202
203 jobslock.acquire()
204 try:
205 index, job = heapq.heappop(jobs)
206 except IndexError:
207 jobslock.release()
208 print "No jobs left for thread",self.threadnum
209 return
210 jobslock.release()
211
212 job.run()
213
214 # The following little extra bit means that this file can be automatically
215 # be used as part of a larger unittesting process. So if there are
216 # CUnit tests, and they can be executed somehow via Python, then we
217 # test everything in ASCEND at once.
218
219 import unittest
220
221 class TestAscendModelLibrary(unittest.TestCase):
222
223 def testAllModelsInPath(self):
224 t = AscendModelLibraryTester();
225 t.run(5); # five threads
226 print 'Waiting for ASCEND Model Library Tester to complete...'
227 t.runlock.acquire()
228
229 if __name__=='__main__':
230 unittest.main()

john.pye@anu.edu.au
ViewVC Help
Powered by ViewVC 1.1.22