1 from cherrypy.test import test
2 test.prefer_parent_path()
3
4 import os
5 localDir = os.path.dirname(__file__)
6 import sys
7 import threading
8 import time
9
10 import cherrypy
11 from cherrypy.lib import sessions
12
13
class Root:
    """Handler tree used to exercise the CherryPy sessions tool.

    Each exposed handler reads or mutates ``cherrypy.session`` (or the
    session configuration) so that successive requests can observe how
    session state behaves.
    """

    # Sessions are enabled for the whole tree.  Storage starts out as 'ram'
    # and can be changed at runtime through ``setsessiontype``.
    # NOTE(review): timeout / clean_freq are presumably given in minutes
    # (0.017 min is roughly one second) -- confirm against the sessions tool.
    _cp_config = {
        'tools.sessions.on': True,
        'tools.sessions.storage_type': 'ram',
        'tools.sessions.storage_path': localDir,
        'tools.sessions.timeout': 0.017,
        'tools.sessions.clean_freq': 0.017,
    }

    def testGen(self):
        """Increment the session counter and yield it as a string."""
        session = cherrypy.session
        hits = session.get('counter', 0) + 1
        session['counter'] = hits
        yield str(hits)
    testGen.exposed = True

    def testStr(self):
        """Increment the session counter and return it as a string."""
        session = cherrypy.session
        hits = session.get('counter', 0) + 1
        session['counter'] = hits
        return str(hits)
    testStr.exposed = True

    def setsessiontype(self, newtype):
        """Set the class-wide session storage type to ``newtype``."""
        # Written on the class (not the instance) so the shared
        # ``_cp_config`` dict is the one that changes.
        shared_config = self.__class__._cp_config
        shared_config['tools.sessions.storage_type'] = newtype
    setsessiontype.exposed = True

    def index(self):
        """Increment the session counter, pausing between read and write."""
        session = cherrypy.session
        hits = session.get('counter', 0) + 1
        # The pause sits between reading and storing the counter.
        # NOTE(review): presumably there to widen the window in which
        # concurrent requests could interleave -- confirm intent.
        time.sleep(0.01)
        session['counter'] = hits
        return str(hits)
    index.exposed = True

    def keyin(self, key):
        """Return ``'True'`` or ``'False'``: is ``key`` in the session?"""
        present = key in cherrypy.session
        return str(present)
    keyin.exposed = True

    def delete(self):
        """Delete the current session, expire it, and return ``"done"``."""
        cherrypy.session.delete()
        sessions.expire()
        return "done"
    delete.exposed = True

    def delkey(self, key):
        """Remove ``key`` from the session and return ``"OK"``."""
        del cherrypy.session[key]
        return "OK"
    delkey.exposed = True

    def blah(self):
        """Return the currently configured session storage type."""
        storage_type = self._cp_config['tools.sessions.storage_type']
        return storage_type
    blah.exposed = True

    def iredir(self):
        """Raise an internal redirect to ``/blah``."""
        raise cherrypy.InternalRedirect('/blah')
    iredir.exposed = True
70
# Mount a Root instance (no explicit mount point is given) and select the
# 'test_suite' environment in the global CherryPy configuration.
cherrypy.tree.mount(Root())
cherrypy.config.update(dict(environment='test_suite'))
73
74
75 from cherrypy.test import helper
76
78
# NOTE(review): the ``class``/``def`` header that owns these statements is
# not visible in this listing.  They use ``self``, so presumably they are
# the body of a test method -- confirm and re-indent under that header.

# First pass: the counter grows across requests that share the cookie.
self.getPage('/testStr')
self.assertBody('1')
self.getPage('/testGen', self.cookies)
self.assertBody('2')
self.getPage('/testStr', self.cookies)
self.assertBody('3')
self.getPage('/delkey?key=counter', self.cookies)
self.assertStatus(200)

# Second pass: switch the storage type to 'file' and repeat the sequence.
self.getPage('/setsessiontype/file')
self.getPage('/testStr')
self.assertBody('1')
self.getPage('/testGen', self.cookies)
self.assertBody('2')
self.getPage('/testStr', self.cookies)
self.assertBody('3')
self.getPage('/delkey?key=counter', self.cookies)
self.assertStatus(200)

# Pause, then request '/' without sending a cookie; the counter is 1.
# NOTE(review): the pause is presumably meant to outlast the configured
# session timeout -- confirm.
time.sleep(1.25)
self.getPage('/')
self.assertBody('1')

# The session from the previous request holds the 'counter' key.
self.getPage('/keyin?key=counter', self.cookies)
self.assertBody("True")


def session_files():
    """Return the entries of ``localDir`` whose names start with 'session-'."""
    return [name for name in os.listdir(localDir)
            if name.startswith('session-')]


# Deleting the session must leave no session file behind.
self.getPage('/delete', self.cookies)
self.assertBody("done")
self.assertEqual(session_files(), [])

# A fresh request creates a session file again; after a further pause it
# must be gone.
# NOTE(review): removal is presumably done by the periodic session cleanup
# (clean_freq) -- confirm.
self.getPage('/')
self.assertNotEqual(session_files(), [])
time.sleep(2)
self.assertEqual(session_files(), [])
120
124
128
# NOTE(review): the ``def`` header that owns these statements is not visible
# in this listing.  They use ``self``, so presumably they are the body of a
# test (or test helper) method -- confirm and re-indent under that header.
client_thread_count = 5
request_count = 30

# Create one session up front and keep its cookie so that every worker
# thread sends requests against that same session.
self.getPage("/")
self.assertBody("1")
cookies = self.cookies

# Maps worker index -> last counter value that worker recorded.
data_dict = {}


def request(index):
    """Request '/' ``request_count`` times, then record the last body."""
    for _ in range(request_count):
        self.getPage("/", cookies)
    # NOTE(review): indentation was lost in this listing.  Recording once
    # after the loop leaves the same final entry in ``data_dict`` as
    # recording on every iteration would.
    data_dict[index] = int(self.body)


# Start all workers; each entry is pre-seeded with 0 before its thread runs.
ts = []
for c in range(client_thread_count):
    data_dict[c] = 0
    t = threading.Thread(target=request, args=(c,))
    ts.append(t)
    t.start()

for t in ts:
    t.join()

# The highest recorded counter must equal the initial request plus every
# request made by every worker.
hitcount = max(data_dict.values())
expected = 1 + (client_thread_count * request_count)
self.assertEqual(hitcount, expected)
162
168
170
# NOTE(review): the ``def`` header that owns these statements is not visible
# in this listing.  They use ``self``, so presumably they are the body of a
# test method -- confirm and re-indent under that header.

# Make a request so that ``self.cookies`` holds a session cookie.
self.getPage('/testStr')

# Take the first cookie header's value, keep the text before the first ';',
# and use what follows the first '=' as the session file suffix.
session_id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
session_path = os.path.join(localDir, "session-" + session_id)

# Remove the session file directly, then request again with the same
# cookie.  No assertion is made on the response here.
os.unlink(session_path)
self.getPage('/testStr', self.cookies)
177
178
179
if __name__ == "__main__":
    # Script entry point: prepare the server, then run the test main loop.
    # NOTE(review): no definition of ``setup_server`` is visible in this
    # listing -- confirm it exists in the real file.
    setup_server()
    helper.testmain()
183