/[ascend]/trunk/pygtk/selftest.py
ViewVC logotype

Contents of /trunk/pygtk/selftest.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 532 - (show annotations) (download) (as text)
Mon Apr 24 02:23:08 2006 UTC (14 years, 1 month ago) by johnpye
File MIME type: text/x-python
File size: 6635 byte(s)
Removed 'interface' directory in trunk/pygtk/interface (moved everything up a level)
Made corresponding changes to SCons* and spec files.
1 import os, os.path, mmap, re
2 import threading, heapq
3 import time, platform
4
import sys
import dl

# Extension modules must be dlopen()ed with RTLD_GLOBAL so that the symbols
# exported by the ASCEND library stay visible to any library that ASCEND
# itself dlopen()s later on. The flags are set before ascpy is imported.
sys.setdlopenflags(dl.RTLD_GLOBAL | dl.RTLD_NOW)
import ascpy
10
11 # Our input will be the ASCENDLIBRARY environment variable
12 # plus perhaps some commandline options relating to exclusions and inclusions.
13 # This program needs to search files in the given directory and hunt for
14 # A4C and A4L files that contain MODELs that contain 'METHOD self_test'
15 # methods. We then create instances of any such models and run these
16 # methods. The methods should include commands which we can trap via
17 # some kind of callback, reporting failure or success.
18
# Name of the ASCEND method whose presence marks a MODEL as self-testing.
TEST_METHOD_NAME = 'self_test'

# Pattern for a "METHOD self_test;" declaration; any amount of whitespace is
# accepted before METHOD, at least one whitespace character is required after
# it, and whitespace may precede the closing semicolon.
METHOD_RE = r'\s*METHOD\s+' + TEST_METHOD_NAME + r'\s*;'
METHOD_REGEXP = re.compile(METHOD_RE)
22
23 #---------------------------------------------------------------------
24 # TASK CLASSES
25
26 # These classes represent tasks that will be added to the queue. They
27 # just need to have __init__ and run methods, they will be queued and
28 # run by the thread manager.
29
30
# Scan a directory, store any a4c or a4l files in the queue,
# also store any subdirectories in the queue, they will be
# recursed into.
class AscendScanDirectory:
    """Queue task: scan one directory for ASCEND files and subdirectories.

    Every subdirectory found is pushed onto the shared job heap as a new
    AscendScanDirectory task (priority 30) and every '.a4c' / '.a4l' file as
    an AscendInspectFile task (priority 10). Entries named '.svn', 'CVS' or
    'westerberg' are ignored.

    The task relies on the module-level globals ``jobs`` (heap list) and
    ``jobslock`` (lock guarding it), which AscendModelLibraryTester creates.
    """

    # Directory entries that are never inspected or recursed into.
    SKIP_NAMES = ('.svn', 'CVS', 'westerberg')
    # File extensions identifying ASCEND files worth inspecting.
    ASCEND_EXTENSIONS = ('.a4c', '.a4l')

    def __init__(self, path):
        """Remember the directory *path* that run() will scan."""
        self.path = path

    def run(self):
        """List self.path and queue a follow-up task for each relevant entry.

        A directory that cannot be listed is reported and skipped instead of
        letting the OSError terminate the worker thread running this task.
        """
        print("Scanning %s ..." % (self.path,))
        try:
            entries = os.listdir(self.path)
        except OSError as e:
            print("Cannot scan %s: %s" % (self.path, e))
            return

        for name in entries:
            if name in self.SKIP_NAMES:
                continue
            full_path = os.path.join(self.path, name)
            if os.path.isdir(full_path):
                task = (30, AscendScanDirectory(full_path))
            elif os.path.splitext(name)[1] in self.ASCEND_EXTENSIONS:
                task = (10, AscendInspectFile(full_path))
            else:
                continue
            # 'with' releases the lock even if the push raises.
            with jobslock:
                heapq.heappush(jobs, task)
54
55 # Ascend file inspection task: load the file and check for 'METHOD self_test'
56 # somewhere in the file.
57
58
class AscendInspectFile:
    """Queue task: check whether a file contains 'METHOD self_test;'.

    When METHOD_REGEXP matches anywhere in the file, an AscendTestFile task
    for the same path is pushed onto the shared job heap with priority 5.

    The task relies on the module-level globals ``jobs``, ``jobslock`` and
    ``inspectlock``, which AscendModelLibraryTester creates.
    """

    def __init__(self, filepath):
        """Remember the *filepath* that run() will inspect."""
        self.filepath = filepath

    def run(self):
        """Search the file and queue an AscendTestFile task on a match.

        inspectlock is held for the whole inspection, so only one thread
        inspects a file at a time. It is taken with 'with' so that it is
        released on every exit path; previously any exception other than
        IOError left it locked and blocked all later inspections.
        """
        with inspectlock:
            try:
                found = self._contains_test_method()
            except EnvironmentError:
                # EnvironmentError covers the IOError from open() as well as
                # the OSError that os.path.getsize() raises.
                print("IOError")
                return

            if found:
                filename = os.path.basename(self.filepath)
                print("Found 'METHOD " + TEST_METHOD_NAME + "' in " + filename)
                with jobslock:
                    heapq.heappush(jobs, (5, AscendTestFile(self.filepath)))

    def _contains_test_method(self):
        """Return True if METHOD_REGEXP matches somewhere in the file."""
        size = os.path.getsize(self.filepath)
        if size <= 0:
            # Nothing to search (and nothing to map) in an empty file.
            return False

        f = open(self.filepath, 'r')
        try:
            # access=ACCESS_READ is the portable spelling of a read-only map;
            # MAP_SHARED / PROT_READ only exist on Unix.
            mapped = mmap.mmap(f.fileno(), size, access=mmap.ACCESS_READ)
            try:
                return METHOD_REGEXP.search(mapped) is not None
            finally:
                mapped.close()
        finally:
            f.close()
88
89
90 # This 'task class' will use ASCEND to load the specified model
91 # then some sub-task will read off the list of models and methods
92 # and for each model that includes self_test, will run that
93 # method and check the result.
class AscendTestFile:
    """Queue task: load a file with ASCEND and queue its self-testing models.

    The file is loaded into an ascpy.Library; for every type in every loaded
    module that has a method named TEST_METHOD_NAME, an AscendTestModel task
    is pushed onto the shared job heap with priority 0.

    The task relies on the module-level globals ``jobs``, ``jobslock`` and
    ``ascendlock``, which AscendModelLibraryTester creates.
    """

    def __init__(self, filepath):
        """Remember the *filepath* that run() will load."""
        self.filepath = filepath

    def run(self):
        """Load the file and queue one AscendTestModel per self-testing type.

        ascendlock is held while the ascpy library is in use. It is taken
        with 'with' so that a failing load cannot leave it locked, which
        would block every later ASCEND task.
        """
        with ascendlock:
            try:
                self._queue_self_testing_models()
            except RuntimeError as e:
                # AscendTestModel.run reports RuntimeError from ascpy and
                # carries on; do the same here so that one file that fails
                # to load does not take the worker thread down with it.
                print(e)

    def _queue_self_testing_models(self):
        """Walk modules -> types -> methods and queue each matching type."""
        L = ascpy.Library()
        L.clear()
        L.load(self.filepath)
        for M in L.getModules():
            print("Looking at module '" + M.getName() + "'")
            for t in L.getModuleTypes(M):
                for m in t.getMethods():
                    if m.getName() == TEST_METHOD_NAME:
                        task = AscendTestModel(
                            self.filepath, t.getName().toString())
                        with jobslock:
                            heapq.heappush(jobs, (0, task))
116
117 # Run self_test on a given model
class AscendTestModel:
    """Queue task: build, solve and self-test one model from an ASCEND file.

    The task relies on the module-level global ``ascendlock``, which
    AscendModelLibraryTester creates.
    """

    def __init__(self, filepath, modelname):
        """Remember the file to load and the name of the model type to test."""
        self.filepath = filepath
        self.modelname = modelname

    def run(self):
        """Load the file, solve the model with QRSlv and run its self-test.

        A RuntimeError (including the one raised here when the type has no
        method named TEST_METHOD_NAME) is printed rather than propagated.

        ascendlock is held for the whole run and is taken with 'with' so that
        it is released on every exit path; previously an exception other than
        RuntimeError left it locked and blocked all later ASCEND tasks.
        """
        with ascendlock:
            try:
                L = ascpy.Library()
                L.clear()
                L.load(self.filepath)
                t = L.findType(self.modelname)

                # If several methods match, the last one wins.
                testmethod = None
                for m in t.getMethods():
                    if m.getName() == TEST_METHOD_NAME:
                        testmethod = m
                if not testmethod:
                    raise RuntimeError(
                        "No method '" + TEST_METHOD_NAME + "' found")

                s = t.getSimulation('testsim')
                s.build()
                print("LAUNCHING SOLVER...\n\n")
                r = ascpy.SolverReporter()
                s.solve(ascpy.Solver('QRSlv'), r)
                s.run(testmethod)
            except RuntimeError as e:
                print(e)
150
151
152 #---------------------------------------------------------------
153 # Thread runner
154
class AscendModelLibraryTester:
    """Seed the shared job heap from ASCENDLIBRARY and run worker threads.

    Creating an instance (re)creates the module-level globals used by the
    task classes: ``jobs`` (heap of ``(priority, task)`` tuples) and the
    locks ``jobslock``, ``inspectlock`` and ``ascendlock``. One
    AscendScanDirectory task (priority 20) is queued per directory listed in
    the ASCENDLIBRARY environment variable.

    ``runlock`` is acquired in __init__ and released when run() finishes, so
    a caller can block on ``runlock.acquire()`` to wait for completion.
    """

    def __init__(self):
        """Create the shared state and queue the initial directory scans.

        Raises:
            KeyError: if the ASCENDLIBRARY environment variable is not set.
        """
        global jobs, jobslock, inspectlock, ascendlock

        jobs = []
        jobslock = threading.Lock()
        inspectlock = threading.Lock()
        ascendlock = threading.Lock()

        self.runlock = threading.Lock()
        self.runlock.acquire()
        self.path = os.environ['ASCENDLIBRARY']
        self.threads = []

        # os.pathsep is ';' on Windows and ':' elsewhere.
        self.separator = os.pathsep

        print('Search path: %s' % (self.path,))
        for p in self.path.split(self.separator):
            if not p:
                # An empty component (e.g. from a trailing separator) does
                # not name a directory; there is nothing to scan.
                continue
            with jobslock:
                heapq.heappush(jobs, (20, AscendScanDirectory(p)))

    def run(self, num_threads):
        """Start *num_threads* workers, wait for all of them, release runlock.

        runlock is released in a 'finally' clause so that a caller blocked on
        it is not left waiting forever if starting or joining a thread fails.
        """
        try:
            workers = [AscendModelLibraryTester.DoWork(i)
                       for i in range(num_threads)]
            self.threads.extend(workers)
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
        finally:
            self.runlock.release()

    class DoWork(threading.Thread):
        """Worker thread number ``tn``: pops and runs tasks from the heap."""

        # Not referenced anywhere in the visible source; kept so that any
        # external user of these class attributes keeps working.
        moreThreads = threading.Event()
        lock = threading.Lock()

        def __init__(self, tn):
            """Create the worker and record its thread number *tn*."""
            threading.Thread.__init__(self)
            self.threadnum = tn

        def run(self):
            """Run the lowest-priority-number task until the heap is empty.

            NOTE(review): a worker returns as soon as it finds the heap
            empty, even if another worker is still running a task that will
            push more work. All queued tasks still get run by the remaining
            worker(s), but with fewer threads than requested.
            """
            while True:
                with jobslock:
                    try:
                        index, job = heapq.heappop(jobs)
                    except IndexError:
                        # No jobs left for this thread.
                        return
                # Run the task outside the lock so it can push new jobs.
                job.run()
215
# The following little extra bit means that this file can automatically
# be used as part of a larger unittesting process. So if there are
# CUnit tests, and they can be executed somehow via Python, then we
# test everything in ASCEND at once.
220
221 import unittest
222
223 class TestAscendModelLibrary(unittest.TestCase):
224
225 def testAllModelsInPath(self):
226 t = AscendModelLibraryTester();
227 t.run(5); # five threads
228 print 'Waiting for ASCEND Model Library Tester to complete...'
229 t.runlock.acquire()
230
231 if __name__=='__main__':
232 unittest.main()

john.pye@anu.edu.au
ViewVC Help
Powered by ViewVC 1.1.22