def one_request(port):
"""
Listen for one http request on port, then close and return request query
args:
port (int): the port to listen for the request
returns:
str: the request
"""
logger.info("listening for a request on port {}...".format(port))
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(landing_page.encode('utf-8'))
self.server.path = self.path
httpd = HTTPServer(('', port), RequestHandler)
httpd.handle_request()
httpd.server_close()
parsed = urlparse(httpd.path)
logger.info("received a request {}".format(httpd.path))
return parse_qs(parsed.query)
python类HTTPServer()的实例源码
def main(args):
global HTTPD, CREDENTIALS
if args:
load_settings(args[0])
print("Starting server")
server_address = (LISTENIP, LISTENPORT)
if CREDENTIALS:
CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
Handler = AuthHandler
else:
Handler = RequestHandler
if not SSL_CERTIFICATE:
HTTPD = HTTPServer(server_address, Handler)
else:
HTTPD = socketserver.TCPServer(server_address, Handler)
HTTPD.socket = ssl.wrap_socket(HTTPD.socket,
certfile=SSL_CERTIFICATE,
keyfile=SSL_KEY,
server_side=True)
print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
LISTENIP,
LISTENPORT))
if BASEPATH:
os.chdir(BASEPATH)
HTTPD.serve_forever()
def main(port=8000):
thisdir = os.path.dirname(os.path.realpath(__file__))
rootdir = os.path.realpath(os.path.join(thisdir, os.pardir, os.pardir))
subdir = SubRepo.get_path()
ctf = os.path.basename(rootdir)
submissions = os.path.basename(Settings.submissions_project)
routes = [
('/%s' % ctf, rootdir),
('/%s' % submissions, subdir),
]
forbidden = {
LocalSettings.path(),
TeamSecrets.path(),
}
HandlerClass = handler(routes, '/%s' % ctf, forbidden)
server_address = ('localhost', port)
httpd = HTTPServer(server_address, HandlerClass)
sa = httpd.socket.getsockname()
print("Serving HTTP on", sa[0], "port", sa[1], "...")
httpd.serve_forever()
def _create_healthcheck(self, port):
class HealthcheckHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
health = {'status': 'ok'}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(health), 'UTF-8'))
def log_message(self, format, *args):
logger.debug("Healthcheck from %s %s" % (self.address_string(), format % args))
self.healthcheck_http_server = HTTPServer(('', port), HealthcheckHTTPRequestHandler)
healthcheck_thread = Thread(target=self._run_healthcheck_thread,
name='healthcheck', args=(), daemon=True)
healthcheck_thread.start()
def main():
global REDFISH_MOCKUP_FILES
args = parse_args()
if not os.path.exists(args.mockup_files):
print('Mockup files %s not found' % args.mockup_files)
sys.exit(1)
REDFISH_MOCKUP_FILES = os.path.realpath(args.mockup_files)
httpd = http_server.HTTPServer(('', args.port), RequestHandler)
if args.ssl_certificate and args.ssl_key:
httpd.socket = ssl.wrap_socket(
httpd.socket, keyfile=args.ssl_key,
certfile=args.ssl_certificate, server_side=True)
httpd.serve_forever()
def start():
global localHTTP, zeroconf, info, httpthread
ip = get_local_address()
logging.info("Local IP is " + ip)
desc = {'version': '0.1'}
info = ServiceInfo("_http._tcp.local.",
"Alexa Device._http._tcp.local.",
socket.inet_aton(ip), alexa_params.LOCAL_PORT, 0, 0,
desc, alexa_params.LOCAL_HOST + ".")
zeroconf = Zeroconf()
zeroconf.registerService(info)
logging.info("Local mDNS is started, domain is " + alexa_params.LOCAL_HOST)
localHTTP = HTTPServer(("", alexa_params.LOCAL_PORT), alexa_http_config.AlexaConfig)
httpthread = threading.Thread(target=localHTTP.serve_forever)
httpthread.start()
logging.info("Local HTTP is " + alexa_params.BASE_URL)
alexa_control.start()
def __init__(self, grenouille_bot):
"""Create a HTTP server to manage commands
Args:
grenouille_bot: master class of the module.
"""
Thread.__init__(self)
config = grenouille_bot.config['DEFAULT']
self.port = int(config['webserverport'])
self.server_address = ('127.0.0.1', self.port)
self.httpd = HTTPServer(self.server_address, HTTPServer_RequestHandler)
config = grenouille_bot.config['DEFAULT']
self.httpd.grenouille_bot = grenouille_bot
self.httpd.secret = config['grenouille_api_key']
def main(args):
global HTTPD, CREDENTIALS
if args:
load_settings(args[0])
LOG.info("Starting server")
server_address = (LISTENIP, LISTENPORT)
if CREDENTIALS:
CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
Handler = AuthHandler
else:
Handler = RequestHandler
if not SSL_CERTIFICATE:
HTTPD = HTTPServer(server_address, Handler)
else:
HTTPD = socketserver.TCPServer(server_address, Handler)
HTTPD.socket = ssl.wrap_socket(HTTPD.socket,
certfile=SSL_CERTIFICATE,
keyfile=SSL_KEY,
server_side=True)
LOG.info('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
LISTENIP,
LISTENPORT))
if BASEPATH:
os.chdir(BASEPATH)
HTTPD.serve_forever()
def run(self):
def on_ready_callback(parts):
logger.debug(
'On ready called with parts %s, calling my callback', parts
)
self.ready_callback(parts)
logger.debug('Request finished')
logger.debug(
'Starting http server on %s:%s' % (HTTP_ADDRESS, HTTP_PORT)
)
try:
self.server = HTTPServer(
(
HTTP_ADDRESS,
HTTP_PORT
),
get_handler(on_ready_callback))
except OSError:
logger.exception('Could not bind to address')
self.started_event.set()
else:
self.started_event.set()
logger.debug('Serving forever...')
self.server.serve_forever()
logger.debug('Server has shut down')
def do_POST(self): # noqa: ignore=N802
"""Mini service router handling POST requests"""
if self.path == '/your_ip':
try:
self._handle_path_your_ip()
except RequestProcessingException as e:
logging.error("Request processing exception occured: "
"code: {}, reason: '{}', explanation: '{}'".format(
e.code, e.reason, e.explanation))
self.send_error(e.code, e.reason, e.explanation)
elif self.path == '/signal_test_cache':
self._handle_path_signal_test_cache(True)
elif self.path == '/run_cmd':
self._handle_path_run_cmd()
else:
self.send_error(404, 'Not found', 'Endpoint is not supported')
# We use a HTTPServer with the ThreadingMixIn in order to handle stalled HTTP requess. Without this, the behaviour
# is to handle requests serially. If a connection is severed, this can result in the server hanging for the TCP
# timeout before answering another requests...leading to failures.
def main(args):
global HTTPD, CREDENTIALS
if args:
load_settings(args[0])
LOG.info("Starting server")
server_address = (LISTENIP, LISTENPORT)
if CREDENTIALS:
CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
Handler = AuthHandler
else:
Handler = RequestHandler
if not SSL_CERTIFICATE:
HTTPD = HTTPServer(server_address, Handler)
else:
HTTPD = socketserver.TCPServer(server_address, Handler)
HTTPD.socket = ssl.wrap_socket(HTTPD.socket,
certfile=SSL_CERTIFICATE,
keyfile=SSL_KEY,
server_side=True)
LOG.info('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
LISTENIP,
LISTENPORT))
if BASEPATH:
os.chdir(BASEPATH)
HTTPD.serve_forever()
def receive_code(self, port, final_redirect=None):
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
try:
query = urlparse(self.path).query
query = parse_qs(query)
self.code = query['code'][0]
except Exception as e:
self.send_response(500)
self.code = None
else:
if final_redirect:
self.send_response(302)
self.send_header("Location", final_redirect)
else:
self.send_response(200)
finally:
self.end_headers()
address = ('localhost', port)
server = HTTPServer(address, RequestHandler)
request, client_address = server.get_request()
code = RequestHandler(request, client_address, server).code
return code
def start_server():
# avoid 8080, as the router may have service on it.
# Firewall rules will need to be changed in the router
# to allow access on this port.
server_address = ('', 9001)
cs.CSClient().log(APP_NAME, "Starting Server: {}".format(server_address))
cs.CSClient().log(APP_NAME, "Web Message is: {}".format(WEB_MESSAGE))
httpd = HTTPServer(server_address, WebServerRequestHandler)
# Use the line below to serve the index.html page that is in the
# app directory.
# httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)
try:
httpd.serve_forever()
except KeyboardInterrupt:
cs.CSClient().log(APP_NAME, "Stopping Server, Key Board interrupt")
return 0
def get_authorization_code(self, url):
if input(i18n_use_browser).lower().startswith("y"):
httpServer = HTTPServer((HOST, PORT),
lambda request, address, server: HTTPServerHandler(request, address, server))
log.debug(i18n_oauth_server, HOST, PORT)
open_new(url)
log.debug(i18n_oauth_server_url, url)
httpServer.handle_request()
log.debug(i18n_oauth_server_cback, httpServer.callback)
return httpServer.callback
else:
print(i18n_open_url + url)
return input(i18n_insert_url)
def run(server_class=HTTPServer, handler_class=BaseHTTPRequestHandler):
server_address = ('', 8000)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()
# RELAY.on() activate relay
# Import JSON config file as a dictionary for future reference by ID and IP.
# Thread: Listen for signal from thermostat. When received, use the function
# below and pass it the ID and IP.
# Function: send ID, IP, and delay to the server.
# Thread: Listen for activate command from server. When received, open relay.
def test_pipeline():
'''Tests a few pipelines.'''
global progresses
server = HTTPServer(('', 9000), SaveHooks)
thread = threading.Thread(target = server.serve_forever, daemon=True)
thread.start()
for spec in pipelines():
if spec.pipeline_id.startswith('./tests/env/dummy/pipeline-test'):
eid = gen_execution_id()
status.get(spec.pipeline_id).queue_execution(eid, 'manual')
success, _, _ = execute_pipeline(spec, eid, use_cache=False)
assert success
assert len(called_hooks) == 3
assert called_hooks == [
{"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "queue"},
{"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "start"},
{"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "finish", "success": True,
'stats': {'.dpp': {'out-datapackage-url': 'hooks-outputs/datapackage.json'},
'bytes': 258, 'count_of_rows': None,
'dataset_name': 'hook-tests', 'hash': 'f3f25f5ecd8e7e2c35d83139178072b8'}}
]
assert progresses >= 1
def __init__(self, computation, host='', port=8181, poll_sec=10, DocumentRoot=None,
keyfile=None, certfile=None, show_task_args=True):
self._lock = threading.Lock()
if not DocumentRoot:
DocumentRoot = os.path.join(os.path.dirname(__file__), 'data')
self._nodes = {}
self._updates = {}
if poll_sec < 1:
pycos.logger.warning('invalid poll_sec value %s; it must be at least 1', poll_sec)
poll_sec = 1
self._poll_sec = poll_sec
self._show_args = bool(show_task_args)
self._server = BaseHTTPServer.HTTPServer((host, port), lambda *args:
HTTPServer._HTTPRequestHandler(self, DocumentRoot, *args))
if certfile:
self._server.socket = ssl.wrap_socket(self._server.socket, keyfile=keyfile,
certfile=certfile, server_side=True)
self._httpd_thread = threading.Thread(target=self._server.serve_forever)
self._httpd_thread.daemon = True
self._httpd_thread.start()
self.computation = computation
self.status_task = pycos.Task(self.status_proc)
if computation.status_task:
client_task = computation.status_task
def chain_msgs(task=None):
task.set_daemon()
while 1:
msg = yield task.receive()
self.status_task.send(msg)
client_task.send(msg)
computation.status_task = pycos.Task(chain_msgs)
else:
computation.status_task = self.status_task
pycos.logger.info('Started HTTP%s server at %s',
's' if certfile else '', str(self._server.socket.getsockname()))
def __init__(self, computation, host='', port=8181, poll_sec=10, DocumentRoot=None,
keyfile=None, certfile=None, show_task_args=True):
self._lock = threading.Lock()
if not DocumentRoot:
DocumentRoot = os.path.join(os.path.dirname(__file__), 'data')
self._nodes = {}
self._updates = {}
if poll_sec < 1:
pycos.logger.warning('invalid poll_sec value %s; it must be at least 1', poll_sec)
poll_sec = 1
self._poll_sec = poll_sec
self._show_args = bool(show_task_args)
self._server = BaseHTTPServer.HTTPServer((host, port), lambda *args:
HTTPServer._HTTPRequestHandler(self, DocumentRoot, *args))
if certfile:
self._server.socket = ssl.wrap_socket(self._server.socket, keyfile=keyfile,
certfile=certfile, server_side=True)
self._httpd_thread = threading.Thread(target=self._server.serve_forever)
self._httpd_thread.daemon = True
self._httpd_thread.start()
self.computation = computation
self.status_task = pycos.Task(self.status_proc)
if computation.status_task:
client_task = computation.status_task
def chain_msgs(task=None):
task.set_daemon()
while 1:
msg = yield task.receive()
self.status_task.send(msg)
client_task.send(msg)
computation.status_task = pycos.Task(chain_msgs)
else:
computation.status_task = self.status_task
pycos.logger.info('Started HTTP%s server at %s',
's' if certfile else '', str(self._server.socket.getsockname()))
def listen(self):
server = HTTPServer(('', self.handler_class.http_port), self.handler_class)
server.serve_forever()
def setUpClass(cls):
"""Create and serve a fake HTTP server."""
cls.httpd = HTTPServer(cls.server_address, cls.handler)
cls.running_thread = threading.Thread(target=cls.httpd.serve_forever)
cls.running_thread.start()
def __init__(self, port, delay = None, handler_func = None):
self.port = port
RequestHandler.delay = delay
RequestHandler.handler_func = [handler_func]
RequestHandler.response_data = '{}'
RequestHandler.response_code = 200
threading.Thread.__init__(self)
self.server = HTTPServer(('localhost', self.port), RequestHandler)
def config_sample():
return '''
# HTTP Server that exposes sauna status
# as a REST API or a web dashboard
- type: HTTPServer
port: 8080
data_type: json # Can be json or html
'''
def simple_http_server(handler_class, server_class=HTTPServer, app=None, bind=('', 0)):
server = server_class(bind, handler_class)
if app:
assert issubclass(server_class, WSGIServer)
server.set_app(app)
thread = threading.Thread(target=server.serve_forever, name='sa-media test server.', daemon=True)
thread.start()
yield server
server.shutdown()
thread.join()
def _run(self):
def cb(s):
msg = self._input_mapper(s)
self._manager.add_msg(msg)
def handle(*args):
WebhookListener(cb, *args)
self._server = HTTPServer(('', self._input_port), handle)
self._server.serve_forever()
def cleanup(self):
logger.debug("Cleaning up Magic Mirror platform")
self.httpd.shutdown()
self.shutdown = True
# Subclass HTTPServer with additional callback
def start_mock_server(port):
mock_server = HTTPServer(('127.0.0.1', port), MockServerRequestHandler)
mock_server_thread = Thread(target=mock_server.serve_forever)
mock_server_thread.setDaemon(True)
mock_server_thread.start()
def start(self):
'''
Starts a server on the port provided in the :class:`Server` constructor
in a separate thread
:rtype: Server
:returns: server instance for chaining
'''
self._handler = _create_handler_class(self._rules, self._always_rules)
self._server = HTTPServer(('', self._port), self._handler)
self._thread = Thread(target=self._server.serve_forever, daemon=True)
self._thread.start()
self.running = True
return self
def setUpClass(cls):
# Configure mock server.
cls.mock_server_port = get_free_port()
cls.mock_server = HTTPServer(('localhost', cls.mock_server_port), MockServerRequestHandler)
# Start running mock server in a separate thread.
# Daemon threads automatically shut down when the main process exits.
cls.mock_server_thread = Thread(target=cls.mock_server.serve_forever)
cls.mock_server_thread.setDaemon(True)
cls.mock_server_thread.start()
cls.json_url = 'http://localhost:{port}/'.format(port=cls.mock_server_port)
def setUpClass(cls):
# Configure mock server.
cls.mock_server_port = REDIRECT_URI_PORT
cls.mock_server = HTTPServer(('localhost', cls.mock_server_port), MockServerRequestHandler)
# Start running mock server in a separate thread.
# Daemon threads automatically shut down when the main process exits.
cls.mock_server_thread = Thread(target=cls.mock_server.serve_forever)
cls.mock_server_thread.setDaemon(True)
cls.mock_server_thread.start()
cls.json_url = 'http://localhost:{port}/'.format(port=cls.mock_server_port)
def run_server(host, port):
threading.Thread(target=scan_raumfeld).start()
threading.Thread(target=timed_action).start()
try:
server = HTTPServer((host, int(port)), RequestHandler)
server.serve_forever()
except Exception as e:
print("run_Server error:"+str(e))