# 2010 april 28 awinter
# libfcgi-test.py
#
# Manual smoke / throughput test for the libfcgi Python binding.
# Starts NTHREADS worker threads that each accept FastCGI requests and
# answer with a fixed page, plus one thread that periodically prints the
# request rate.  The main thread supervises the workers until ctrl+c.

import sys
import time
import threading

# The binding is loaded from the local 'release' directory, so the path
# must be extended before the import.
sys.path.append('release')
import libfcgi

# Requests served since the last stats report.  Written by every worker
# thread and reset by the stats thread, hence the lock.
nreqs = 0
nreqs_lock = threading.Lock()

# Set by the main thread on ctrl+c; polled by the stats thread.
shutting_down = False


class stats(threading.Thread):
    """Background thread that prints the request rate roughly every 4s."""

    def run(self):
        global nreqs
        nsleep = 0.0
        while 1:
            time.sleep(0.1)
            nsleep += 0.1
            # Report only once the window exceeds 4s AND something was
            # served; an idle window keeps growing until traffic arrives.
            if nsleep > 4 and nreqs > 0:
                with nreqs_lock:
                    served, nreqs = nreqs, 0
                # Divide by the real window length: it is always a bit
                # over 4s and can be much longer after an idle period.
                print("%.2f rps (%.1fs)" % (served / nsleep, nsleep))
                nsleep = 0.0
            if shutting_down:
                break


class req(threading.Thread):
    """Worker thread: accept requests in a loop and send a fixed reply."""

    def run(self):
        global nreqs
        request = libfcgi.fcgi_request()
        while 1:
            ret = request.accept()
            # A negative return ends this worker (presumably an error or
            # a pending shutdown -- confirm against the libfcgi binding).
            if ret < 0:
                break
            # debug: print("ok", request.env['REQUEST_URI'], self.ident)
            # '+=' on a shared global is not atomic across threads.
            with nreqs_lock:
                nreqs += 1
            outstr = "Content-type: text/html\r\n\r\nfcgi test suitepython libfcgi is active\r\n\r\n"
            request.write(outstr)


def test_memstatus():
    """Placeholder; not implemented."""
    pass


portno = 9000
# NOTE(review): the second argument is presumably the listen backlog --
# confirm against the libfcgi binding.
libfcgi.init_socket(portno, 10)

NTHREADS = 5
threads = [req() for i in range(NTHREADS)]
for t in threads:
    t.start()
print("PYTHON FCGI: %i THREADS ON PORT %i" % (NTHREADS, portno))

STAT = stats()
STAT.start()

while 1:
    try:
        time.sleep(0.1)
        # Check every worker directly.  threading.active_count() also
        # counts the main thread and the stats thread, so comparing it
        # with NTHREADS would miss the first couple of dead workers.
        for i, worker in enumerate(threads):
            if not worker.is_alive():
                threads[i] = req()
                threads[i].start()
                print("respawned a downed thread")
    except KeyboardInterrupt:
        print("ctrl+c, shutting down")
        libfcgi.shutdown_pending()
        shutting_down = True
        break