1 """An implementation of the Web Site Process Bus.
2
3 This module is completely standalone, depending only on the stdlib.
4
5 Web Site Process Bus
6 --------------------
7
8 A Bus object is used to contain and manage site-wide behavior:
9 daemonization, HTTP server start/stop, process reload, signal handling,
10 drop privileges, PID file management, logging for all of these,
11 and many more.
12
13 In addition, a Bus object provides a place for each web framework
14 to register code that runs in response to site-wide events (like
15 process start and stop), or which controls or otherwise interacts with
16 the site-wide components mentioned above. For example, a framework which
17 uses file-based templates would add known template filenames to an
18 autoreload component.
19
20 Ideally, a Bus object will be flexible enough to be useful in a variety
21 of invocation scenarios:
22
23 1. The deployer starts a site from the command line via a
24 framework-neutral deployment script; applications from multiple frameworks
25 are mixed in a single site. Command-line arguments and configuration
26 files are used to define site-wide components such as the HTTP server,
27 WSGI component graph, autoreload behavior, signal handling, etc.
28 2. The deployer starts a site via some other process, such as Apache;
29 applications from multiple frameworks are mixed in a single site.
30 Autoreload and signal handling (from Python at least) are disabled.
31 3. The deployer starts a site via a framework-specific mechanism;
32 for example, when running tests, exploring tutorials, or deploying
33 single applications from a single framework. The framework controls
34 which site-wide components are enabled as it sees fit.
35
36 The Bus object in this package uses topic-based publish-subscribe
37 messaging to accomplish all this. A few topic channels are built in
38 ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
39 site containers are free to define their own. If a message is sent to a
40 channel that has not been defined or has no listeners, there is no effect.
41
42 In general, there should only ever be a single Bus object per process.
43 Frameworks and site containers share a single Bus object by publishing
44 messages and subscribing listeners.
45
46 The Bus object works as a finite state machine which models the current
47 state of the process. Bus methods move it from one state to another;
48 those methods then publish to subscribed listeners on the channel for
49 the new state.::
50
51 O
52 |
53 V
54 STOPPING --> STOPPED --> EXITING -> X
55 A A |
56 | \___ |
57 | \ |
58 | V V
59 STARTED <-- STARTING
60
61 """
62
63 import atexit
64 import os
65 import sys
66 import threading
67 import time
68 import traceback as _traceback
69 import warnings
70
71 from cherrypy._cpcompat import set
72
73
74
75
76
77
78
79 _startup_cwd = os.getcwd()
80
81
83
84 """Exception raised when errors occur in a listener during Bus.publish().
85 """
86 delimiter = '\n'
87
93
95 """Append the current exception to self."""
96 self._exceptions.append(sys.exc_info()[1])
97
99 """Return a list of seen exception instances."""
100 return self._exceptions[:]
101
105
106 __repr__ = __str__
107
109 return bool(self._exceptions)
110 __nonzero__ = __bool__
111
112
113
114
116
118 name = None
119
121 return "states.%s" % self.name
122
127 states = _StateEnum()
128 states.STOPPED = states.State()
129 states.STARTING = states.State()
130 states.STARTED = states.State()
131 states.STOPPING = states.State()
132 states.EXITING = states.State()
133
134
135 try:
136 import fcntl
137 except ImportError:
138 max_files = 0
139 else:
140 try:
141 max_files = os.sysconf('SC_OPEN_MAX')
142 except AttributeError:
143 max_files = 1024
144
145
147
148 """Process state-machine and messenger for HTTP site deployment.
149
150 All listeners for a given channel are guaranteed to be called even
151 if others at the same channel fail. Each failure is logged, but
152 execution proceeds on to the next listener. The only way to stop all
153 processing from inside a listener is to raise SystemExit and stop the
154 whole server.
155 """
156
157 states = states
158 state = states.STOPPED
159 execv = False
160 max_cloexec_files = max_files
161
163 self.execv = False
164 self.state = states.STOPPED
165 self.listeners = dict(
166 [(channel, set()) for channel
167 in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
168 self._priorities = {}
169
170 - def subscribe(self, channel, callback, priority=None):
171 """Add the given callback at the given channel (if not present)."""
172 if channel not in self.listeners:
173 self.listeners[channel] = set()
174 self.listeners[channel].add(callback)
175
176 if priority is None:
177 priority = getattr(callback, 'priority', 50)
178 self._priorities[(channel, callback)] = priority
179
181 """Discard the given callback (if present)."""
182 listeners = self.listeners.get(channel)
183 if listeners and callback in listeners:
184 listeners.discard(callback)
185 del self._priorities[(channel, callback)]
186
187 - def publish(self, channel, *args, **kwargs):
188 """Return output of all subscribers for the given channel."""
189 if channel not in self.listeners:
190 return []
191
192 exc = ChannelFailures()
193 output = []
194
195 items = [(self._priorities[(channel, listener)], listener)
196 for listener in self.listeners[channel]]
197 try:
198 items.sort(key=lambda item: item[0])
199 except TypeError:
200
201
202 items.sort()
203 for priority, listener in items:
204 try:
205 output.append(listener(*args, **kwargs))
206 except KeyboardInterrupt:
207 raise
208 except SystemExit:
209 e = sys.exc_info()[1]
210
211 if exc and e.code == 0:
212 e.code = 1
213 raise
214 except:
215 exc.handle_exception()
216 if channel == 'log':
217
218 pass
219 else:
220 self.log("Error in %r listener %r" % (channel, listener),
221 level=40, traceback=True)
222 if exc:
223 raise exc
224 return output
225
227 """An atexit handler which asserts the Bus is not running."""
228 if self.state != states.EXITING:
229 warnings.warn(
230 "The main thread is exiting, but the Bus is in the %r state; "
231 "shutting it down automatically now. You must either call "
232 "bus.block() after start(), or call bus.exit() before the "
233 "main thread exits." % self.state, RuntimeWarning)
234 self.exit()
235
237 """Start all services."""
238 atexit.register(self._clean_exit)
239
240 self.state = states.STARTING
241 self.log('Bus STARTING')
242 try:
243 self.publish('start')
244 self.state = states.STARTED
245 self.log('Bus STARTED')
246 except (KeyboardInterrupt, SystemExit):
247 raise
248 except:
249 self.log("Shutting down due to error in start listener:",
250 level=40, traceback=True)
251 e_info = sys.exc_info()[1]
252 try:
253 self.exit()
254 except:
255
256 pass
257
258 raise e_info
259
261 """Stop all services and prepare to exit the process."""
262 exitstate = self.state
263 try:
264 self.stop()
265
266 self.state = states.EXITING
267 self.log('Bus EXITING')
268 self.publish('exit')
269
270
271 self.log('Bus EXITED')
272 except:
273
274
275
276
277 os._exit(70)
278
279 if exitstate == states.STARTING:
280
281
282
283
284 os._exit(70)
285
287 """Restart the process (may close connections).
288
289 This method does not restart the process from the calling thread;
290 instead, it stops the bus and asks the main thread to call execv.
291 """
292 self.execv = True
293 self.exit()
294
296 """Advise all services to reload."""
297 self.log('Bus graceful')
298 self.publish('graceful')
299
300 - def block(self, interval=0.1):
301 """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
302
303 This function is intended to be called only by the main thread.
304 After waiting for the EXITING state, it also waits for all threads
305 to terminate, and then calls os.execv if self.execv is True. This
306 design allows another thread to call bus.restart, yet have the main
307 thread perform the actual execv call (required on some platforms).
308 """
309 try:
310 self.wait(states.EXITING, interval=interval, channel='main')
311 except (KeyboardInterrupt, IOError):
312
313
314 self.log('Keyboard Interrupt: shutting down bus')
315 self.exit()
316 except SystemExit:
317 self.log('SystemExit raised: shutting down bus')
318 self.exit()
319 raise
320
321
322
323
324
325
326 self.log("Waiting for child threads to terminate...")
327 for t in threading.enumerate():
328
329
330
331
332 if (
333 t != threading.currentThread() and
334 t.isAlive() and
335 not isinstance(t, threading._MainThread)
336 ):
337
338 if hasattr(threading.Thread, "daemon"):
339
340 d = t.daemon
341 else:
342 d = t.isDaemon()
343 if not d:
344 self.log("Waiting for thread %s." % t.getName())
345 t.join()
346
347 if self.execv:
348 self._do_execv()
349
350 - def wait(self, state, interval=0.1, channel=None):
351 """Poll for the given state(s) at intervals; publish to channel."""
352 if isinstance(state, (tuple, list)):
353 states = state
354 else:
355 states = [state]
356
357 def _wait():
358 while self.state not in states:
359 time.sleep(interval)
360 self.publish(channel)
361
362
363
364
365
366
367
368 try:
369 sys.modules['psyco'].cannotcompile(_wait)
370 except (KeyError, AttributeError):
371 pass
372
373 _wait()
374
376 """Re-execute the current process.
377
378 This must be called from the main thread, because certain platforms
379 (OS X) don't allow execv to be called in a child thread very well.
380 """
381 args = sys.argv[:]
382 self.log('Re-spawning %s' % ' '.join(args))
383
384 if sys.platform[:4] == 'java':
385 from _systemrestart import SystemRestart
386 raise SystemRestart
387 else:
388 args.insert(0, sys.executable)
389 if sys.platform == 'win32':
390 args = ['"%s"' % arg for arg in args]
391
392 os.chdir(_startup_cwd)
393 if self.max_cloexec_files:
394 self._set_cloexec()
395 os.execv(sys.executable, args)
396
398 """Set the CLOEXEC flag on all open files (except stdin/out/err).
399
400 If self.max_cloexec_files is an integer (the default), then on
401 platforms which support it, it represents the max open files setting
402 for the operating system. This function will be called just before
403 the process is restarted via os.execv() to prevent open files
404 from persisting into the new process.
405
406 Set self.max_cloexec_files to 0 to disable this behavior.
407 """
408 for fd in range(3, self.max_cloexec_files):
409 try:
410 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
411 except IOError:
412 continue
413 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
414
416 """Stop all services."""
417 self.state = states.STOPPING
418 self.log('Bus STOPPING')
419 self.publish('stop')
420 self.state = states.STOPPED
421 self.log('Bus STOPPED')
422
424 """Start 'func' in a new thread T, then start self (and return T)."""
425 if args is None:
426 args = ()
427 if kwargs is None:
428 kwargs = {}
429 args = (func,) + args
430
431 def _callback(func, *a, **kw):
432 self.wait(states.STARTED)
433 func(*a, **kw)
434 t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
435 t.setName('Bus Callback ' + t.getName())
436 t.start()
437
438 self.start()
439
440 return t
441
442 - def log(self, msg="", level=20, traceback=False):
443 """Log the given message. Append the last traceback if requested."""
444 if traceback:
445 msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info()))
446 self.publish('log', msg, level)
447
448 bus = Bus()
449