def handle(self, *args, **kwargs):
    """
    Generate a config file for the view-server, and send the view-server command to stdout.

    The reason for sending the command to stdout instead of just running
    it is so that supervisord can directly manage the resulting
    process (otherwise we would have to handle passing signals through).

    The positional and keyword arguments are accepted but not used.
    """
    from chroma_core.lib.util import site_dir

    site_root = site_dir()
    view_server_dir = os.path.join(site_root, 'ui-modules', 'node_modules', '@iml', 'view-server', 'dist')
    conf_path = os.path.join(view_server_dir, "conf.json")

    # Snapshot of the settings that are written to conf.json.
    conf = {
        "ALLOW_ANONYMOUS_READ": settings.ALLOW_ANONYMOUS_READ,
        "BUILD": settings.BUILD,
        "IS_RELEASE": settings.IS_RELEASE,
        "LOG_PATH": settings.LOG_PATH,
        "SERVER_HTTP_URL": settings.SERVER_HTTP_URL,
        "SITE_ROOT": settings.SITE_ROOT,
        "VIEW_SERVER_PORT": settings.VIEW_SERVER_PORT,
        "VERSION": settings.VERSION,
    }

    # Close the file explicitly so the config is fully flushed to disk before
    # the command line is emitted, rather than relying on garbage collection
    # of an anonymous file object.
    with open(conf_path, 'w') as conf_file:
        json.dump(conf, conf_file, indent=2)

    cmdline = ["node", os.path.join(view_server_dir, 'bundle.js')]
    # Single-argument call form prints identically on Python 2 and Python 3.
    print(" ".join(cmdline))
# Python example source code using VERSION
def cmdparse():
    """
    Parse the spider's command-line options.

    Returns the argparse namespace with the attributes ``url``, ``file``,
    ``cookie_file``, ``tld``, ``keepon``, ``consumer``, ``producer``,
    ``mongo_db`` and ``redis_db``.

    If none of ``--url``, ``--file`` or ``--continue`` is supplied, the help
    text is written to stderr and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(
        usage='\n%(prog)s [options] [-u url|-f file.txt]'
              '\n%(prog)s [options] --continue',
        description='Yet Another Web Spider')
    # The ``version=`` constructor keyword is deprecated in Python 2.7 and
    # rejected by Python 3; an explicit version action provides the same
    # -v/--version option on both.
    parser.add_argument('-v', '--version', action='version', version=VERSION,
                        help="show program's version number and exit")

    parser.add_argument('-u', '--url', dest='url',
                        help='Target url, if no tld, only urls in this subdomain')
    # type=open makes argparse open the file; it is not closed in this function.
    parser.add_argument('-f', '--file', dest='file', type=open,
                        help='Load target from file')
    parser.add_argument('--cookie-file', dest='cookie_file', metavar='FILE',
                        help='Cookie file from chrome export by EditThisCookie')
    parser.add_argument('--tld', action='store_true', dest='tld',
                        help='Crawl all subdomains')
    parser.add_argument('--continue', dest='keepon', action='store_true',
                        help='Continue last task, no init target [-u|-f] need')

    worker = parser.add_argument_group(title='Worker', description='[optional] options for worker')
    worker.add_argument('-c', '--consumer', metavar='N', type=int, default=5, dest='consumer',
                        help='Max number of consumer processes to run, default 5')
    worker.add_argument('-p', '--producer', metavar='N', type=int, default=1, dest='producer',
                        help='Max number of producer processes to run, default 1')

    db = parser.add_argument_group(title='Database', description='[optional] options for redis and mongodb')
    db.add_argument('--mongo-db', metavar='STRING', dest='mongo_db', default=MongoConf.db,
                    help='Mongodb database name, default "tspider"')
    db.add_argument('--redis-db', metavar='NUMBER', dest='redis_db', type=int, default=RedisConf.db,
                    help='Redis db index, default 0')

    args = parser.parse_args()
    if not any((args.url, args.file, args.keepon)):
        # ArgumentParser.exit() takes the exit status first; pass the help
        # text as the message so it is not mistaken for the status.
        parser.exit(status=1, message=parser.format_help())
    return args
def __init__(self, app, hub, debug):
    """
    Initialise the web UI and its starting state.

    Calls BaseWebUI.__init__ with an 800x600 QSize, builds and logs the
    agent string from USER_AGENT and VERSION, connects the application's
    aboutToQuit signal to self._handleAboutToQuit, and loads the
    application settings to read the stored blockchain height.
    """
    initial_size = qt_core.QSize(800, 600)
    BaseWebUI.__init__(self, index.html, app, hub, initial_size, debug)

    version_text = '.'.join(str(part) for part in VERSION)
    self.agent = '%s v.%s' % (USER_AGENT, version_text)
    log("Starting [%s]..." % self.agent, LEVEL_INFO)

    self.app = app
    self.debug = debug
    self.hub = hub
    self.app.aboutToQuit.connect(self._handleAboutToQuit)

    # Managers and the new-wallet UI start out unset.
    self.sumokoind_daemon_manager = None
    self.wallet_cli_manager = None
    self.wallet_rpc_manager = None
    self.new_wallet_ui = None

    self.wallet_info = WalletInfo(app)

    # Load app settings
    self.app_settings = AppSettings()
    self.app_settings.load()

    ## Blockchain height
    blockchain_settings = self.app_settings.settings['blockchain']
    self.target_height = blockchain_settings['height']
    self.current_height = 0
def test_version(self):
    """
    Check agent registration against the manager version, then re-registration.

    Registers 'mynewhost' with agent version '1.0' while settings.VERSION is
    '2.0' (expects HTTP 400), and again with settings.VERSION set to '1.1'
    (expects HTTP 201).  Afterwards it re-registers the host: an empty client
    certificate serial must yield 403, while the serial of the issued
    certificate must yield 200 and update the host's fqdn.
    """
    def post_json(url, payload, **extra):
        # Every request in this test is a JSON POST from a fresh test client.
        return Client().post(url, data=json.dumps(payload), content_type="application/json", **extra)

    host_info = self.mock_servers['mynewhost']

    with timed('csr', 10):
        registration = {
            'fqdn': host_info['fqdn'],
            'nodename': host_info['nodename'],
            'version': '1.0',
            'capabilities': ['manage_targets'],
            'address': 'mynewhost',
            'csr': generate_csr(host_info['fqdn']),
        }

    # NOTE(review): the extent of this patch block is reconstructed; it is
    # assumed to cover both registration attempts so that the direct
    # settings.VERSION assignment below is undone on exit -- confirm.
    with patch(settings, VERSION='2.0'):
        # Try with a mis-matched version
        token = RegistrationToken.objects.create(profile=ServerProfile.objects.get())
        with timed('register fail', 10):
            rejected = post_json("/agent/register/%s/" % token.secret, registration)
        self.assertEqual(rejected.status_code, 400)

        # Try with a matching version
        token = RegistrationToken.objects.create(profile=ServerProfile.objects.get())
        settings.VERSION = '1.1'
        with timed('register pass', 10):
            accepted = post_json("/agent/register/%s/" % token.secret, registration)
        self.assertEqual(accepted.status_code, 201)
        content = json.loads(accepted.content)

    # reregistration should fail with unknown serial
    renamed = {'address': 'mynewhost', 'fqdn': 'mynewhost.newcompany.com'}
    headers = {'HTTP_X_SSL_CLIENT_NAME': host_info['fqdn'], 'HTTP_X_SSL_CLIENT_SERIAL': ''}
    forbidden = post_json('/agent/reregister/', renamed, **headers)
    self.assertEqual(forbidden.status_code, 403)

    # reregistration should update host's domain name
    headers['HTTP_X_SSL_CLIENT_SERIAL'] = Crypto().get_serial(content['certificate'])
    updated = post_json('/agent/reregister/', renamed, **headers)
    self.assertEqual(updated.status_code, 200)

    host = ManagedHost.objects.get(id=content['host_id'])
    self.assertEqual(host.fqdn, renamed['fqdn'])
# TODO: reinstate selinux check, probably within the agent itself (it should fail
# its own registration step without even talking to the manager)
# def test_selinux_detection(self):
# """Test that a host with SELinux enabled fails setup."""
# MockAgentRpc.selinux_enabled = True
# try:
# import time
# host = self._create_host('myaddress')
# self.assertTrue(Command.objects.all().order_by("-id")[0].errored)
# self.assertState(host, 'unconfigured')
# finally:
# MockAgentRpc.selinux_enabled = False