Package cherrypy :: Package test :: Module webtest
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.webtest

  1  """Extensions to unittest for web frameworks. 
  2   
  3  Use the WebCase.getPage method to request a page from your HTTP server. 
  4   
  5  Framework Integration 
  6  ===================== 
  7   
  8  If you have control over your server process, you can handle errors 
  9  in the server-side of the HTTP conversation a bit better. You must run 
 10  both the client (your WebCase tests) and the server in the same process 
 11  (but in separate threads, obviously). 
 12   
 13  When an error occurs in the framework, call server_error. It will print 
 14  the traceback to stdout, and keep any assertions you have from running 
 15  (the assumption is that, if the server errors, the page output will not 
 16  be of further significance to your tests). 
 17  """ 
 18   
 19  import os, sys, time, re 
 20  import types 
 21  import pprint 
 22  import socket 
 23  import httplib 
 24  import traceback 
 25   
 26  from unittest import * 
 27  from unittest import _TextTestResult 
 28   
 29   
30 -class TerseTestResult(_TextTestResult):
31
32 - def printErrors(self):
33 # Overridden to avoid unnecessary empty line 34 if self.errors or self.failures: 35 if self.dots or self.showAll: 36 self.stream.writeln() 37 self.printErrorList('ERROR', self.errors) 38 self.printErrorList('FAIL', self.failures)
39 40
41 -class TerseTestRunner(TextTestRunner):
42 """A test runner class that displays results in textual form.""" 43
44 - def _makeResult(self):
45 return TerseTestResult(self.stream, self.descriptions, self.verbosity)
46
47 - def run(self, test):
48 "Run the given test case or test suite." 49 # Overridden to remove unnecessary empty lines and separators 50 result = self._makeResult() 51 startTime = time.time() 52 test(result) 53 timeTaken = float(time.time() - startTime) 54 result.printErrors() 55 if not result.wasSuccessful(): 56 self.stream.write("FAILED (") 57 failed, errored = map(len, (result.failures, result.errors)) 58 if failed: 59 self.stream.write("failures=%d" % failed) 60 if errored: 61 if failed: self.stream.write(", ") 62 self.stream.write("errors=%d" % errored) 63 self.stream.writeln(")") 64 return result
65 66
67 -class ReloadingTestLoader(TestLoader):
68
69 - def loadTestsFromName(self, name, module=None):
70 """Return a suite of all tests cases given a string specifier. 71 72 The name may resolve either to a module, a test case class, a 73 test method within a test case class, or a callable object which 74 returns a TestCase or TestSuite instance. 75 76 The method optionally resolves the names relative to a given module. 77 """ 78 parts = name.split('.') 79 if module is None: 80 if not parts: 81 raise ValueError("incomplete test name: %s" % name) 82 else: 83 parts_copy = parts[:] 84 while parts_copy: 85 target = ".".join(parts_copy) 86 if target in sys.modules: 87 module = reload(sys.modules[target]) 88 break 89 else: 90 try: 91 module = __import__(target) 92 break 93 except ImportError: 94 del parts_copy[-1] 95 if not parts_copy: 96 raise 97 parts = parts[1:] 98 obj = module 99 for part in parts: 100 obj = getattr(obj, part) 101 102 if type(obj) == types.ModuleType: 103 return self.loadTestsFromModule(obj) 104 elif (isinstance(obj, (type, types.ClassType)) and 105 issubclass(obj, TestCase)): 106 return self.loadTestsFromTestCase(obj) 107 elif type(obj) == types.UnboundMethodType: 108 return obj.im_class(obj.__name__) 109 elif callable(obj): 110 test = obj() 111 if not isinstance(test, TestCase) and \ 112 not isinstance(test, TestSuite): 113 raise ValueError("calling %s returned %s, " 114 "not a test" % (obj,test)) 115 return test 116 else: 117 raise ValueError("do not know how to make test from: %s" % obj)
118 119 120 try: 121 # On Windows, msvcrt.getch reads a single char without output. 122 import msvcrt
123 - def getchar():
124 return msvcrt.getch()
125 except ImportError: 126 # Unix getchr 127 import tty, termios
128 - def getchar():
129 fd = sys.stdin.fileno() 130 old_settings = termios.tcgetattr(fd) 131 try: 132 tty.setraw(sys.stdin.fileno()) 133 ch = sys.stdin.read(1) 134 finally: 135 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) 136 return ch
137 138
139 -class WebCase(TestCase):
140 HOST = "127.0.0.1" 141 PORT = 8000 142 HTTP_CONN = httplib.HTTPConnection 143 PROTOCOL = "HTTP/1.1" 144
145 - def set_persistent(self, on=True, auto_open=False):
146 """Make our HTTP_CONN persistent (or not). 147 148 If the 'on' argument is True (the default), then self.HTTP_CONN 149 will be set to an instance of httplib.HTTPConnection (or HTTPS 150 if self.scheme is "https"). This will then persist across requests. 151 152 We only allow for a single open connection, so if you call this 153 and we currently have an open connection, it will be closed. 154 """ 155 try: 156 self.HTTP_CONN.close() 157 except (TypeError, AttributeError): 158 pass 159 160 if self.scheme == "https": 161 cls = httplib.HTTPSConnection 162 else: 163 cls = httplib.HTTPConnection 164 165 if on: 166 host = self.HOST 167 if not host: 168 # The empty string signifies INADDR_ANY, 169 # which should respond on localhost. 170 host = "127.0.0.1" 171 self.HTTP_CONN = cls(host, self.PORT) 172 # Automatically re-connect? 173 self.HTTP_CONN.auto_open = auto_open 174 self.HTTP_CONN.connect() 175 else: 176 self.HTTP_CONN = cls
177
178 - def _get_persistent(self):
179 return hasattr(self.HTTP_CONN, "__class__")
180 - def _set_persistent(self, on=True):
181 self.set_persistent(on)
182 persistent = property(_get_persistent, _set_persistent) 183
184 - def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
185 """Open the url with debugging support. Return status, headers, body.""" 186 ServerError.on = False 187 188 self.url = url 189 host = self.HOST 190 if not host: 191 # The empty string signifies INADDR_ANY, 192 # which should respond on localhost. 193 host = "127.0.0.1" 194 result = openURL(url, headers, method, body, host, self.PORT, 195 self.HTTP_CONN, protocol or self.PROTOCOL) 196 self.status, self.headers, self.body = result 197 198 # Build a list of request cookies from the previous response cookies. 199 self.cookies = [('Cookie', v) for k, v in self.headers 200 if k.lower() == 'set-cookie'] 201 202 if ServerError.on: 203 raise ServerError() 204 return result
205 206 interactive = True 207 console_height = 30 208
209 - def _handlewebError(self, msg):
210 print 211 print " ERROR:", msg 212 213 if not self.interactive: 214 raise self.failureException(msg) 215 216 p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> " 217 print p, 218 while True: 219 i = getchar().upper() 220 if i not in "BHSUIRX": 221 continue 222 print i.upper() # Also prints new line 223 if i == "B": 224 for x, line in enumerate(self.body.splitlines()): 225 if (x + 1) % self.console_height == 0: 226 # The \r and comma should make the next line overwrite 227 print "<-- More -->\r", 228 m = getchar().lower() 229 # Erase our "More" prompt 230 print " \r", 231 if m == "q": 232 break 233 print line 234 elif i == "H": 235 pprint.pprint(self.headers) 236 elif i == "S": 237 print self.status 238 elif i == "U": 239 print self.url 240 elif i == "I": 241 # return without raising the normal exception 242 return 243 elif i == "R": 244 raise self.failureException(msg) 245 elif i == "X": 246 self.exit() 247 print p,
248
249 - def exit(self):
250 sys.exit()
251 252 if sys.version_info >= (2, 5):
253 - def __call__(self, result=None):
254 if result is None: 255 result = self.defaultTestResult() 256 result.startTest(self) 257 testMethod = getattr(self, self._testMethodName) 258 try: 259 try: 260 self.setUp() 261 except (KeyboardInterrupt, SystemExit): 262 raise 263 except: 264 result.addError(self, self._exc_info()) 265 return 266 267 ok = 0 268 try: 269 testMethod() 270 ok = 1 271 except self.failureException: 272 result.addFailure(self, self._exc_info()) 273 except (KeyboardInterrupt, SystemExit): 274 raise 275 except: 276 result.addError(self, self._exc_info()) 277 278 try: 279 self.tearDown() 280 except (KeyboardInterrupt, SystemExit): 281 raise 282 except: 283 result.addError(self, self._exc_info()) 284 ok = 0 285 if ok: 286 result.addSuccess(self) 287 finally: 288 result.stopTest(self)
289 else:
290 - def __call__(self, result=None):
291 if result is None: 292 result = self.defaultTestResult() 293 result.startTest(self) 294 testMethod = getattr(self, self._TestCase__testMethodName) 295 try: 296 try: 297 self.setUp() 298 except (KeyboardInterrupt, SystemExit): 299 raise 300 except: 301 result.addError(self, self._TestCase__exc_info()) 302 return 303 304 ok = 0 305 try: 306 testMethod() 307 ok = 1 308 except self.failureException: 309 result.addFailure(self, self._TestCase__exc_info()) 310 except (KeyboardInterrupt, SystemExit): 311 raise 312 except: 313 result.addError(self, self._TestCase__exc_info()) 314 315 try: 316 self.tearDown() 317 except (KeyboardInterrupt, SystemExit): 318 raise 319 except: 320 result.addError(self, self._TestCase__exc_info()) 321 ok = 0 322 if ok: 323 result.addSuccess(self) 324 finally: 325 result.stopTest(self)
326
327 - def assertStatus(self, status, msg=None):
328 """Fail if self.status != status.""" 329 if isinstance(status, basestring): 330 if not self.status == status: 331 if msg is None: 332 msg = 'Status (%s) != %s' % (`self.status`, `status`) 333 self._handlewebError(msg) 334 elif isinstance(status, int): 335 code = int(self.status[:3]) 336 if code != status: 337 if msg is None: 338 msg = 'Status (%s) != %s' % (`self.status`, `status`) 339 self._handlewebError(msg) 340 else: 341 # status is a tuple or list. 342 match = False 343 for s in status: 344 if isinstance(s, basestring): 345 if self.status == s: 346 match = True 347 break 348 elif int(self.status[:3]) == s: 349 match = True 350 break 351 if not match: 352 if msg is None: 353 msg = 'Status (%s) not in %s' % (`self.status`, `status`) 354 self._handlewebError(msg)
355
356 - def assertHeader(self, key, value=None, msg=None):
357 """Fail if (key, [value]) not in self.headers.""" 358 lowkey = key.lower() 359 for k, v in self.headers: 360 if k.lower() == lowkey: 361 if value is None or str(value) == v: 362 return v 363 364 if msg is None: 365 if value is None: 366 msg = '%s not in headers' % `key` 367 else: 368 msg = '%s:%s not in headers' % (`key`, `value`) 369 self._handlewebError(msg)
370
371 - def assertNoHeader(self, key, msg=None):
372 """Fail if key in self.headers.""" 373 lowkey = key.lower() 374 matches = [k for k, v in self.headers if k.lower() == lowkey] 375 if matches: 376 if msg is None: 377 msg = '%s in headers' % `key` 378 self._handlewebError(msg)
379
380 - def assertBody(self, value, msg=None):
381 """Fail if value != self.body.""" 382 if value != self.body: 383 if msg is None: 384 msg = 'expected body:\n%s\n\nactual body:\n%s' % (`value`, `self.body`) 385 self._handlewebError(msg)
386
387 - def assertInBody(self, value, msg=None):
388 """Fail if value not in self.body.""" 389 if value not in self.body: 390 if msg is None: 391 msg = '%s not in body' % `value` 392 self._handlewebError(msg)
393
394 - def assertNotInBody(self, value, msg=None):
395 """Fail if value in self.body.""" 396 if value in self.body: 397 if msg is None: 398 msg = '%s found in body' % `value` 399 self._handlewebError(msg)
400
401 - def assertMatchesBody(self, pattern, msg=None, flags=0):
402 """Fail if value (a regex pattern) is not in self.body.""" 403 if re.search(pattern, self.body, flags) is None: 404 if msg is None: 405 msg = 'No match for %s in body' % `pattern` 406 self._handlewebError(msg)
407 408 409 methods_with_bodies = ("POST", "PUT") 410
411 -def cleanHeaders(headers, method, body, host, port):
412 """Return request headers, with required headers added (if missing).""" 413 if headers is None: 414 headers = [] 415 416 # Add the required Host request header if not present. 417 # [This specifies the host:port of the server, not the client.] 418 found = False 419 for k, v in headers: 420 if k.lower() == 'host': 421 found = True 422 break 423 if not found: 424 if port == 80: 425 headers.append(("Host", host)) 426 else: 427 headers.append(("Host", "%s:%s" % (host, port))) 428 429 if method in methods_with_bodies: 430 # Stick in default type and length headers if not present 431 found = False 432 for k, v in headers: 433 if k.lower() == 'content-type': 434 found = True 435 break 436 if not found: 437 headers.append(("Content-Type", "application/x-www-form-urlencoded")) 438 headers.append(("Content-Length", str(len(body or "")))) 439 440 return headers
441 442
443 -def shb(response):
444 """Return status, headers, body the way we like from a response.""" 445 h = [] 446 key, value = None, None 447 for line in response.msg.headers: 448 if line: 449 if line[0] in " \t": 450 value += line.strip() 451 else: 452 if key and value: 453 h.append((key, value)) 454 key, value = line.split(":", 1) 455 key = key.strip() 456 value = value.strip() 457 if key and value: 458 h.append((key, value)) 459 460 return "%s %s" % (response.status, response.reason), h, response.read()
461 462
463 -def openURL(url, headers=None, method="GET", body=None, 464 host="127.0.0.1", port=8000, http_conn=httplib.HTTPConnection, 465 protocol="HTTP/1.1"):
466 """Open the given HTTP resource and return status, headers, and body.""" 467 468 headers = cleanHeaders(headers, method, body, host, port) 469 470 # Trying 10 times is simply in case of socket errors. 471 # Normal case--it should run once. 472 for trial in xrange(10): 473 try: 474 # Allow http_conn to be a class or an instance 475 if hasattr(http_conn, "host"): 476 conn = http_conn 477 else: 478 conn = http_conn(host, port) 479 480 conn._http_vsn_str = protocol 481 conn._http_vsn = int("".join([x for x in protocol if x.isdigit()])) 482 483 # skip_accept_encoding argument added in python version 2.4 484 if sys.version_info < (2, 4): 485 def putheader(self, header, value): 486 if header == 'Accept-Encoding' and value == 'identity': 487 return 488 self.__class__.putheader(self, header, value)
489 import new 490 conn.putheader = new.instancemethod(putheader, conn, conn.__class__) 491 conn.putrequest(method.upper(), url, skip_host=True) 492 else: 493 conn.putrequest(method.upper(), url, skip_host=True, 494 skip_accept_encoding=True) 495 496 for key, value in headers: 497 conn.putheader(key, value) 498 conn.endheaders() 499 500 if body is not None: 501 conn.send(body) 502 503 # Handle response 504 response = conn.getresponse() 505 506 s, h, b = shb(response) 507 508 if not hasattr(http_conn, "host"): 509 # We made our own conn instance. Close it. 510 conn.close() 511 512 return s, h, b 513 except socket.error: 514 time.sleep(0.5) 515 raise 516 517 518 # Add any exceptions which your web framework handles 519 # normally (that you don't want server_error to trap). 520 ignored_exceptions = [] 521 522 # You'll want set this to True when you can't guarantee 523 # that each response will immediately follow each request; 524 # for example, when handling requests via multiple threads. 525 ignore_all = False 526
527 -class ServerError(Exception):
528 on = False
529 530
531 -def server_error(exc=None):
532 """Server debug hook. Return True if exception handled, False if ignored. 533 534 You probably want to wrap this, so you can still handle an error using 535 your framework when it's ignored. 536 """ 537 if exc is None: 538 exc = sys.exc_info() 539 540 if ignore_all or exc[0] in ignored_exceptions: 541 return False 542 else: 543 ServerError.on = True 544 print 545 print "".join(traceback.format_exception(*exc)) 546 return True
547