Package cherrypy :: Package test :: Module webtest
[hide private]
[frames] | [no frames]

Source Code for Module cherrypy.test.webtest

  1  """Extensions to unittest for web frameworks. 
  2   
  3  Use the WebCase.getPage method to request a page from your HTTP server. 
  4   
  5  Framework Integration 
  6  ===================== 
  7   
  8  If you have control over your server process, you can handle errors 
  9  in the server-side of the HTTP conversation a bit better. You must run 
 10  both the client (your WebCase tests) and the server in the same process 
 11  (but in separate threads, obviously). 
 12   
 13  When an error occurs in the framework, call server_error. It will print 
 14  the traceback to stdout, and keep any assertions you have from running 
 15  (the assumption is that, if the server errors, the page output will not 
 16  be of further significance to your tests). 
 17  """ 
 18   
 19  import pprint 
 20  import re 
 21  import socket 
 22  import sys 
 23  import time 
 24  import traceback 
 25  import types 
 26   
 27  from unittest import * 
 28  from unittest import _TextTestResult 
 29   
 30  from cherrypy._cpcompat import basestring, ntob, py3k, HTTPConnection 
 31  from cherrypy._cpcompat import HTTPSConnection, unicodestr 
 32   
 33   
def interface(host):
    """Return the address a client should connect to for a server host.

    A server bound to the wildcard address '0.0.0.0' (INADDR_ANY) or
    '::' (IN6ADDR_ANY) is reachable on the matching loopback address, so
    those two values are translated. Any other host is returned as given.
    """
    # (wildcard bind address, loopback address that reaches it)
    wildcard_to_loopback = (
        ('0.0.0.0', "127.0.0.1"),
        ('::', "::1"),
    )
    for wildcard, loopback in wildcard_to_loopback:
        if host == wildcard:
            return loopback
    return host
46 47
48 -class TerseTestResult(_TextTestResult):
49
50 - def printErrors(self):
51 # Overridden to avoid unnecessary empty line 52 if self.errors or self.failures: 53 if self.dots or self.showAll: 54 self.stream.writeln() 55 self.printErrorList('ERROR', self.errors) 56 self.printErrorList('FAIL', self.failures)
57 58
59 -class TerseTestRunner(TextTestRunner):
60 61 """A test runner class that displays results in textual form.""" 62
63 - def _makeResult(self):
64 return TerseTestResult(self.stream, self.descriptions, self.verbosity)
65
66 - def run(self, test):
67 "Run the given test case or test suite." 68 # Overridden to remove unnecessary empty lines and separators 69 result = self._makeResult() 70 test(result) 71 result.printErrors() 72 if not result.wasSuccessful(): 73 self.stream.write("FAILED (") 74 failed, errored = list(map(len, (result.failures, result.errors))) 75 if failed: 76 self.stream.write("failures=%d" % failed) 77 if errored: 78 if failed: 79 self.stream.write(", ") 80 self.stream.write("errors=%d" % errored) 81 self.stream.writeln(")") 82 return result
83 84
85 -class ReloadingTestLoader(TestLoader):
86
87 - def loadTestsFromName(self, name, module=None):
88 """Return a suite of all tests cases given a string specifier. 89 90 The name may resolve either to a module, a test case class, a 91 test method within a test case class, or a callable object which 92 returns a TestCase or TestSuite instance. 93 94 The method optionally resolves the names relative to a given module. 95 """ 96 parts = name.split('.') 97 unused_parts = [] 98 if module is None: 99 if not parts: 100 raise ValueError("incomplete test name: %s" % name) 101 else: 102 parts_copy = parts[:] 103 while parts_copy: 104 target = ".".join(parts_copy) 105 if target in sys.modules: 106 module = reload(sys.modules[target]) 107 parts = unused_parts 108 break 109 else: 110 try: 111 module = __import__(target) 112 parts = unused_parts 113 break 114 except ImportError: 115 unused_parts.insert(0, parts_copy[-1]) 116 del parts_copy[-1] 117 if not parts_copy: 118 raise 119 parts = parts[1:] 120 obj = module 121 for part in parts: 122 obj = getattr(obj, part) 123 124 if isinstance(obj, types.ModuleType): 125 return self.loadTestsFromModule(obj) 126 elif (((py3k and isinstance(obj, type)) 127 or isinstance(obj, (type, types.ClassType))) 128 and issubclass(obj, TestCase)): 129 return self.loadTestsFromTestCase(obj) 130 elif isinstance(obj, types.UnboundMethodType): 131 if py3k: 132 return obj.__self__.__class__(obj.__name__) 133 else: 134 return obj.im_class(obj.__name__) 135 elif hasattr(obj, '__call__'): 136 test = obj() 137 if not isinstance(test, TestCase) and \ 138 not isinstance(test, TestSuite): 139 raise ValueError("calling %s returned %s, " 140 "not a test" % (obj, test)) 141 return test 142 else: 143 raise ValueError("do not know how to make test from: %s" % obj)
# Pick a platform-specific getchar() that reads one character from the
# console.
try:
    if sys.platform.startswith('java'):
        # Jython support
        def getchar():
            """Read one character from stdin (hopefully this is enough)."""
            return sys.stdin.read(1)
    else:
        # On Windows, msvcrt.getch reads a single char without output.
        # The import fails elsewhere, selecting the Unix version below.
        import msvcrt

        def getchar():
            """Read one keypress using msvcrt.getch."""
            return msvcrt.getch()
except ImportError:
    # Unix getchr
    import tty
    import termios

    def getchar():
        """Read one character from stdin with the terminal in raw mode."""
        stdin_fd = sys.stdin.fileno()
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            tty.setraw(sys.stdin.fileno())
            char = sys.stdin.read(1)
        finally:
            # Always restore the previous terminal settings.
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
        return char
172 173
174 -class WebCase(TestCase):
175 HOST = "127.0.0.1" 176 PORT = 8000 177 HTTP_CONN = HTTPConnection 178 PROTOCOL = "HTTP/1.1" 179 180 scheme = "http" 181 url = None 182 183 status = None 184 headers = None 185 body = None 186 187 encoding = 'utf-8' 188 189 time = None 190
191 - def get_conn(self, auto_open=False):
192 """Return a connection to our HTTP server.""" 193 if self.scheme == "https": 194 cls = HTTPSConnection 195 else: 196 cls = HTTPConnection 197 conn = cls(self.interface(), self.PORT) 198 # Automatically re-connect? 199 conn.auto_open = auto_open 200 conn.connect() 201 return conn
202
203 - def set_persistent(self, on=True, auto_open=False):
204 """Make our HTTP_CONN persistent (or not). 205 206 If the 'on' argument is True (the default), then self.HTTP_CONN 207 will be set to an instance of HTTPConnection (or HTTPS 208 if self.scheme is "https"). This will then persist across requests. 209 210 We only allow for a single open connection, so if you call this 211 and we currently have an open connection, it will be closed. 212 """ 213 try: 214 self.HTTP_CONN.close() 215 except (TypeError, AttributeError): 216 pass 217 218 if on: 219 self.HTTP_CONN = self.get_conn(auto_open=auto_open) 220 else: 221 if self.scheme == "https": 222 self.HTTP_CONN = HTTPSConnection 223 else: 224 self.HTTP_CONN = HTTPConnection
225
226 - def _get_persistent(self):
227 return hasattr(self.HTTP_CONN, "__class__")
228
229 - def _set_persistent(self, on):
230 self.set_persistent(on)
231 persistent = property(_get_persistent, _set_persistent) 232
233 - def interface(self):
234 """Return an IP address for a client connection. 235 236 If the server is listening on '0.0.0.0' (INADDR_ANY) 237 or '::' (IN6ADDR_ANY), this will return the proper localhost.""" 238 return interface(self.HOST)
239
240 - def getPage(self, url, headers=None, method="GET", body=None, 241 protocol=None):
242 """Open the url with debugging support. Return status, headers, body. 243 """ 244 ServerError.on = False 245 246 if isinstance(url, unicodestr): 247 url = url.encode('utf-8') 248 if isinstance(body, unicodestr): 249 body = body.encode('utf-8') 250 251 self.url = url 252 self.time = None 253 start = time.time() 254 result = openURL(url, headers, method, body, self.HOST, self.PORT, 255 self.HTTP_CONN, protocol or self.PROTOCOL) 256 self.time = time.time() - start 257 self.status, self.headers, self.body = result 258 259 # Build a list of request cookies from the previous response cookies. 260 self.cookies = [('Cookie', v) for k, v in self.headers 261 if k.lower() == 'set-cookie'] 262 263 if ServerError.on: 264 raise ServerError() 265 return result
266 267 interactive = True 268 console_height = 30 269
270 - def _handlewebError(self, msg):
271 print("") 272 print(" ERROR: %s" % msg) 273 274 if not self.interactive: 275 raise self.failureException(msg) 276 277 p = (" Show: " 278 "[B]ody [H]eaders [S]tatus [U]RL; " 279 "[I]gnore, [R]aise, or sys.e[X]it >> ") 280 sys.stdout.write(p) 281 sys.stdout.flush() 282 while True: 283 i = getchar().upper() 284 if not isinstance(i, type("")): 285 i = i.decode('ascii') 286 if i not in "BHSUIRX": 287 continue 288 print(i.upper()) # Also prints new line 289 if i == "B": 290 for x, line in enumerate(self.body.splitlines()): 291 if (x + 1) % self.console_height == 0: 292 # The \r and comma should make the next line overwrite 293 sys.stdout.write("<-- More -->\r") 294 m = getchar().lower() 295 # Erase our "More" prompt 296 sys.stdout.write(" \r") 297 if m == "q": 298 break 299 print(line) 300 elif i == "H": 301 pprint.pprint(self.headers) 302 elif i == "S": 303 print(self.status) 304 elif i == "U": 305 print(self.url) 306 elif i == "I": 307 # return without raising the normal exception 308 return 309 elif i == "R": 310 raise self.failureException(msg) 311 elif i == "X": 312 self.exit() 313 sys.stdout.write(p) 314 sys.stdout.flush()
315
316 - def exit(self):
317 sys.exit()
318
319 - def assertStatus(self, status, msg=None):
320 """Fail if self.status != status.""" 321 if isinstance(status, basestring): 322 if not self.status == status: 323 if msg is None: 324 msg = 'Status (%r) != %r' % (self.status, status) 325 self._handlewebError(msg) 326 elif isinstance(status, int): 327 code = int(self.status[:3]) 328 if code != status: 329 if msg is None: 330 msg = 'Status (%r) != %r' % (self.status, status) 331 self._handlewebError(msg) 332 else: 333 # status is a tuple or list. 334 match = False 335 for s in status: 336 if isinstance(s, basestring): 337 if self.status == s: 338 match = True 339 break 340 elif int(self.status[:3]) == s: 341 match = True 342 break 343 if not match: 344 if msg is None: 345 msg = 'Status (%r) not in %r' % (self.status, status) 346 self._handlewebError(msg)
347
348 - def assertHeader(self, key, value=None, msg=None):
349 """Fail if (key, [value]) not in self.headers.""" 350 lowkey = key.lower() 351 for k, v in self.headers: 352 if k.lower() == lowkey: 353 if value is None or str(value) == v: 354 return v 355 356 if msg is None: 357 if value is None: 358 msg = '%r not in headers' % key 359 else: 360 msg = '%r:%r not in headers' % (key, value) 361 self._handlewebError(msg)
362
363 - def assertHeaderIn(self, key, values, msg=None):
364 """Fail if header indicated by key doesn't have one of the values.""" 365 lowkey = key.lower() 366 for k, v in self.headers: 367 if k.lower() == lowkey: 368 matches = [value for value in values if str(value) == v] 369 if matches: 370 return matches 371 372 if msg is None: 373 msg = '%(key)r not in %(values)r' % vars() 374 self._handlewebError(msg)
375
376 - def assertHeaderItemValue(self, key, value, msg=None):
377 """Fail if the header does not contain the specified value""" 378 actual_value = self.assertHeader(key, msg=msg) 379 header_values = map(str.strip, actual_value.split(',')) 380 if value in header_values: 381 return value 382 383 if msg is None: 384 msg = "%r not in %r" % (value, header_values) 385 self._handlewebError(msg)
386
387 - def assertNoHeader(self, key, msg=None):
388 """Fail if key in self.headers.""" 389 lowkey = key.lower() 390 matches = [k for k, v in self.headers if k.lower() == lowkey] 391 if matches: 392 if msg is None: 393 msg = '%r in headers' % key 394 self._handlewebError(msg)
395
396 - def assertBody(self, value, msg=None):
397 """Fail if value != self.body.""" 398 if isinstance(value, unicodestr): 399 value = value.encode(self.encoding) 400 if value != self.body: 401 if msg is None: 402 msg = 'expected body:\n%r\n\nactual body:\n%r' % ( 403 value, self.body) 404 self._handlewebError(msg)
405
406 - def assertInBody(self, value, msg=None):
407 """Fail if value not in self.body.""" 408 if isinstance(value, unicodestr): 409 value = value.encode(self.encoding) 410 if value not in self.body: 411 if msg is None: 412 msg = '%r not in body: %s' % (value, self.body) 413 self._handlewebError(msg)
414
415 - def assertNotInBody(self, value, msg=None):
416 """Fail if value in self.body.""" 417 if isinstance(value, unicodestr): 418 value = value.encode(self.encoding) 419 if value in self.body: 420 if msg is None: 421 msg = '%r found in body' % value 422 self._handlewebError(msg)
423
424 - def assertMatchesBody(self, pattern, msg=None, flags=0):
425 """Fail if value (a regex pattern) is not in self.body.""" 426 if isinstance(pattern, unicodestr): 427 pattern = pattern.encode(self.encoding) 428 if re.search(pattern, self.body, flags) is None: 429 if msg is None: 430 msg = 'No match for %r in body' % pattern 431 self._handlewebError(msg)
# Request methods for which cleanHeaders supplies default entity headers.
methods_with_bodies = ("POST", "PUT")


def cleanHeaders(headers, method, body, host, port):
    """Return request headers, with required headers added (if missing).

    ``headers`` is a list of (name, value) pairs or None. A new list is
    returned; the caller's list is left untouched, so reusing one header
    list for several requests does not accumulate entries.

    Added when absent (names compared case-insensitively):

    * ``Host``: ``host`` alone for port 80, otherwise ``host:port``.
    * For methods in ``methods_with_bodies`` with no Content-Type: a
      form-urlencoded ``Content-Type``, plus a ``Content-Length`` for
      ``body`` unless the caller already supplied one.
    """
    # Copy so the appends below never leak into the caller's list.
    headers = [] if headers is None else list(headers)
    present = set(k.lower() for k, v in headers)

    # Add the required Host request header if not present.
    # [This specifies the host:port of the server, not the client.]
    if 'host' not in present:
        if port == 80:
            headers.append(("Host", host))
        else:
            headers.append(("Host", "%s:%s" % (host, port)))

    if method in methods_with_bodies and 'content-type' not in present:
        # Stick in default type and length headers if not present
        headers.append(
            ("Content-Type", "application/x-www-form-urlencoded"))
        if 'content-length' not in present:
            headers.append(("Content-Length", str(len(body or ""))))

    return headers
468 469
def shb(response):
    """Return status, headers, body the way we like from a response.

    The status is the string "<status> <reason>" and the headers are a
    list of (name, value) pairs. When ``py3k`` is false the pairs are
    rebuilt from the raw header lines: a line starting with a space or
    tab is appended to the previous value, and a header whose name or
    value is empty is dropped.
    """
    if py3k:
        header_pairs = response.getheaders()
    else:
        header_pairs = []
        name = text = None
        for raw in response.msg.headers:
            if not raw:
                continue
            if raw[0] in " \t":
                # Continuation line: extend the current header's value.
                text += raw.strip()
                continue
            # A new header starts; flush the one collected so far.
            if name and text:
                header_pairs.append((name, text))
            name, text = [piece.strip() for piece in raw.split(":", 1)]
        if name and text:
            header_pairs.append((name, text))

    status_line = "%s %s" % (response.status, response.reason)
    return status_line, header_pairs, response.read()
491 492
def openURL(url, headers=None, method="GET", body=None,
            host="127.0.0.1", port=8000, http_conn=HTTPConnection,
            protocol="HTTP/1.1"):
    """Open the given HTTP resource and return status, headers, and body.

    ``http_conn`` may be a connection class, in which case a connection
    is created for the request and closed afterwards, or an existing
    connection instance, which is reused and left open. The request is
    attempted up to 10 times when a socket.error occurs, pausing half a
    second between attempts; the last socket.error is re-raised.
    """

    headers = cleanHeaders(headers, method, body, host, port)

    # Allow http_conn to be a class or an instance
    reuse_conn = hasattr(http_conn, "host")

    # Trying 10 times is simply in case of socket errors.
    # Normal case--it should run once.
    for trial in range(10):
        conn = None
        try:
            if reuse_conn:
                conn = http_conn
            else:
                conn = http_conn(interface(host), port)

            conn._http_vsn_str = protocol
            conn._http_vsn = int("".join([x for x in protocol if x.isdigit()]))

            # skip_accept_encoding argument added in python version 2.4
            if sys.version_info < (2, 4):
                def putheader(self, header, value):
                    if header == 'Accept-Encoding' and value == 'identity':
                        return
                    self.__class__.putheader(self, header, value)
                import new
                conn.putheader = new.instancemethod(
                    putheader, conn, conn.__class__)
                conn.putrequest(method.upper(), url, skip_host=True)
            elif not py3k:
                conn.putrequest(method.upper(), url, skip_host=True,
                                skip_accept_encoding=True)
            else:
                import http.client
                # Replace the stdlib method, which only accepts ASCII url's

                def putrequest(self, method, url):
                    if (
                        self._HTTPConnection__response and
                        self._HTTPConnection__response.isclosed()
                    ):
                        self._HTTPConnection__response = None

                    if self._HTTPConnection__state == http.client._CS_IDLE:
                        self._HTTPConnection__state = (
                            http.client._CS_REQ_STARTED)
                    else:
                        raise http.client.CannotSendRequest()

                    self._method = method
                    if not url:
                        url = ntob('/')
                    request = ntob(' ').join(
                        (method.encode("ASCII"),
                         url,
                         self._http_vsn_str.encode("ASCII")))
                    self._output(request)
                import types
                conn.putrequest = types.MethodType(putrequest, conn)

                conn.putrequest(method.upper(), url)

            for key, value in headers:
                conn.putheader(key, value.encode("Latin-1"))
            conn.endheaders()

            if body is not None:
                conn.send(body)

            # Handle response
            response = conn.getresponse()

            s, h, b = shb(response)

            if not reuse_conn:
                # We made our own conn instance. Close it.
                conn.close()

            return s, h, b
        except socket.error:
            if conn is not None and not reuse_conn:
                # Don't leak the connection we created for this attempt;
                # the next attempt makes a fresh one.
                conn.close()
            if trial == 9:
                raise
            time.sleep(0.5)


# Add any exceptions which your web framework handles
# normally (that you don't want server_error to trap).
ignored_exceptions = []

# You'll want to set this to True when you can't guarantee
# that each response will immediately follow each request;
# for example, when handling requests via multiple threads.
ignore_all = False
class ServerError(Exception):
    """Signals that the server side reported an error during a request.

    server_error sets the class-level ``on`` flag; WebCase.getPage clears
    it before each request and raises ServerError afterwards if it was
    set.
    """

    # Class-wide flag shared by all requests, not per-instance state.
    on = False
591 592
def server_error(exc=None):
    """Server debug hook. Return True if exception handled, False if ignored.

    ``exc`` is a (type, value, traceback) triple and defaults to
    sys.exc_info(). A handled exception sets ServerError.on and has its
    traceback printed to stdout. It is ignored when ``ignore_all`` is
    true or its type is listed in ``ignored_exceptions``.

    You probably want to wrap this, so you can still handle an error using
    your framework when it's ignored.
    """
    exc_triple = sys.exc_info() if exc is None else exc

    should_ignore = ignore_all or exc_triple[0] in ignored_exceptions
    if should_ignore:
        return False

    ServerError.on = True
    report = "".join(traceback.format_exception(*exc_triple))
    print("")
    print(report)
    return True
609