def resolve_env_to_prefix(name_or_prefix):
"""Convert an env name or path into a canonical prefix path.
Returns:
Absolute path of prefix or None if it isn't found.
"""
if os.path.isabs(name_or_prefix):
return name_or_prefix
json = info()
root_prefix = json.get('root_prefix', None)
if name_or_prefix == 'root':
return root_prefix
envs = json.get('envs', [])
for prefix in envs:
if os.path.basename(prefix) == name_or_prefix:
return prefix
return None
python类get()的实例源码
def parse_spec(spec):
"""Parse a package name and version spec as conda would.
Returns:
``ParsedSpec`` or None on failure
"""
m = _spec_pat.match(spec)
if m is None:
return None
pip_constraint = m.group('pc')
if pip_constraint is not None:
pip_constraint = pip_constraint.replace(' ', '')
return ParsedSpec(name=m.group('name').lower(), conda_constraint=m.group('cc'), pip_constraint=pip_constraint)
# these are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we get to
# CONDA_DEFAULT_ENV.
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
prefix = os.path.normpath(prefix)
environ[varname] = prefix
if varname != 'CONDA_DEFAULT_ENV':
# This case matters on both Unix and Windows
# with conda >= 4.1.4 since requirement.env_var
# is CONDA_PREFIX, and matters on Unix only pre-4.1.4
# when requirement.env_var is CONDA_ENV_PATH.
global _envs_dirs
global _root_dir
if _envs_dirs is None:
i = info()
_envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
_root_dir = os.path.normpath(i.get('root_prefix'))
if prefix == _root_dir:
name = 'root'
else:
for d in _envs_dirs:
name = subdirectory_relative_to_directory(prefix, d)
if name != prefix:
break
environ['CONDA_DEFAULT_ENV'] = name
def node_byo():
token = ""
try:
if dockercloud.namespace:
json = dockercloud.api.http.send_request("POST", "api/infra/%s/%s/token" %
(API_VERSION, dockercloud.namespace))
else:
json = dockercloud.api.http.send_request("POST", "api/infra/%s/token" % API_VERSION)
if json:
token = json.get("token", "")
except Exception as e:
print(e, file=sys.stderr)
sys.exit(EXCEPTION_EXIT_CODE)
print("Docker Cloud lets you use your own servers as nodes to run containers. "
"For this you have to install our agent.")
print("Run the following command on your server:")
print()
print("\tcurl -Ls https://get.cloud.docker.com/ | sudo -H sh -s", token)
print()
def record_sync_status(total, status, message, data, sync_type):
delete_count = 'null'
worn_clues_list = []
update_count = 'null'
save_count = 'null'
if data:
delete_count = data.get("deleteNum")
worn_clues_list = data.get("wrongIdList")
update_count = data.get("updateNum")
save_count = data.get("saveNum")
sql = "insert into TAB_IOPM_CLUES_SYNC_INFO (id, total, status, message, delete_count, update_count, save_count, worn_clues_list, sync_type) values (%s, %s, %s, '%s', %s, %s, %s, '%s', %s) "%('seq_iopm_sync.nextval', total, status, message, delete_count, update_count, save_count, ','.join(worn_clues_list), sync_type)
print(sql)
return db.add(sql)
CoordinateTransferService.py 文件源码
项目:ugc.aggregator
作者: Dreamcatcher-GIS
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def geocode(address):
"""
????geocoding????????????
:param address:???????
:return:
"""
geocoding = {'s': 'rsv3',
'key': key,
'city': '??',
'address': address}
res = requests.get(
"http://restapi.amap.com/v3/geocode/geo", params=geocoding)
if res.status_code == 200:
json = res.json()
status = json.get('status')
count = json.get('count')
if status == '1' and int(count) >= 1:
geocodes = json.get('geocodes')[0]
lng = float(geocodes.get('location').split(',')[0])
lat = float(geocodes.get('location').split(',')[1])
return [lng, lat]
else:
return None
else:
return None
def refresh_client(self, from_dt=None, to_dt=None):
"""
Refreshes the CalendarService endpoint, ensuring that the
event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
have been given, the range becomes this month.
"""
today = datetime.today()
first_day, last_day = monthrange(today.year, today.month)
if not from_dt:
from_dt = datetime(today.year, today.month, first_day)
if not to_dt:
to_dt = datetime(today.year, today.month, last_day)
params = dict(self.params)
params.update({
'lang': 'en-us',
'usertz': get_localzone().zone,
'startDate': from_dt.strftime('%Y-%m-%d'),
'endDate': to_dt.strftime('%Y-%m-%d')
})
req = self.session.get(self._calendar_refresh_url, params=params)
self.response = req.json()
def lights():
"""
example_lights_json = {
'rooms': [
{'name': 'Living Room', 'on': True},
]
}
"""
if is_local_request(flask.request):
json = flask.request.get_json()
rooms = json.get('rooms', [])
logger.info('Switching rooms %s', rooms)
for json_room in rooms:
room_name = json_room['name']
on_state = json_room['on']
for room in ROOMS:
# Strip whitespace
if room.name == room_name:
logger.info('Switching room %s', room.name)
room.switch(on_state)
return "Light commands sent."
else:
logger.info('Lights accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
flask.abort(404)
coordTransform_utils.py 文件源码
项目:geojson-python-utils
作者: brandonxiang
项目源码
文件源码
阅读 42
收藏 0
点赞 0
评论 0
def geocode(address):
"""
????geocoding????????????
:param address:???????
:return:
"""
geocoding = {'s': 'rsv3',
'key': key,
'city': '??',
'address': address}
res = requests.get(
"http://restapi.amap.com/v3/geocode/geo", params=geocoding)
if res.status_code == 200:
json = res.json()
status = json.get('status')
count = json.get('count')
if status == '1' and int(count) >= 1:
geocodes = json.get('geocodes')[0]
lng = float(geocodes.get('location').split(',')[0])
lat = float(geocodes.get('location').split(',')[1])
return [lng, lat]
else:
return None
else:
return None
def check_authorization(self, access_token):
"""Check an authorization created by a registered application.
OAuth applications can use this method to check token validity
without hitting normal rate limits because of failed login attempts.
If the token is valid, it will return True, otherwise it will return
False.
:returns: bool
"""
p = self.session.params
auth = (p.get('client_id'), p.get('client_secret'))
if access_token and auth:
url = self._build_url('applications', str(auth[0]), 'tokens',
str(access_token))
resp = self._get(url, auth=auth, params={
'client_id': None, 'client_secret': None
})
return self._boolean(resp, 200, 404)
return False
def resolve_env_to_prefix(name_or_prefix):
"""Convert an env name or path into a canonical prefix path.
Returns:
Absolute path of prefix or None if it isn't found.
"""
if os.path.isabs(name_or_prefix):
return name_or_prefix
json = info()
root_prefix = json.get('root_prefix', None)
if name_or_prefix == 'root':
return root_prefix
envs = json.get('envs', [])
for prefix in envs:
if os.path.basename(prefix) == name_or_prefix:
return prefix
return None
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
prefix = os.path.normpath(prefix)
environ[varname] = prefix
if varname != 'CONDA_DEFAULT_ENV':
# This case matters on both Unix and Windows
# with conda >= 4.1.4 since requirement.env_var
# is CONDA_PREFIX, and matters on Unix only pre-4.1.4
# when requirement.env_var is CONDA_ENV_PATH.
global _envs_dirs
global _root_dir
if _envs_dirs is None:
i = info()
_envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
_root_dir = os.path.normpath(i.get('root_prefix'))
if prefix == _root_dir:
name = 'root'
else:
for d in _envs_dirs:
name = subdirectory_relative_to_directory(prefix, d)
if name != prefix:
break
environ['CONDA_DEFAULT_ENV'] = name
# This isn't all (e.g. leaves out arm, power). it's sort of "all
# that people typically publish for"
def get(self, object_name, path=None, params=None, retries=0):
"""
Makes a GET request to the API. Checks for invalid requests that raise PardotAPIErrors. If the API key is
invalid, one re-authentication request is made, in case the key has simply expired. If no errors are raised,
returns either the JSON response, or if no JSON was returned, returns the HTTP response status code.
"""
if params is None:
params = {}
params.update({'user_key': self.user_key, 'api_key': self.api_key, 'format': 'json'})
try:
self._check_auth(object_name=object_name)
request = requests.get(self._full_path(object_name, path), params=params)
response = self._check_response(request)
return response
except PardotAPIError as err:
if err.message == 'Invalid API key or user key':
response = self._handle_expired_api_key(err, retries, 'get', object_name, path, params)
return response
else:
raise err
def environ_get_prefix(environ):
for name in _all_prefix_variables:
if name in environ:
return environ.get(name)
return None
def request(self, *args, **kwargs):
response = super(Session, self).request(*args, **kwargs)
content_type = response.headers.get('Content-Type', '').split(';')[0]
json_mimetypes = ['application/json', 'text/json']
if content_type not in json_mimetypes:
return response
try:
json = response.json()
except:
return response
reason = json.get('errorMessage') or json.get('reason')
if not reason and isinstance(json.get('error'), six.string_types):
reason = json.get('error')
if not reason and not response.ok:
reason = response.reason
if not reason and json.get('error'):
reason = "Unknown reason"
code = json.get('errorCode')
if reason:
raise Exception
return response
def _get_cookiejar_path(self):
# Get path for cookiejar file
return os.path.join(
self._cookie_directory,
''.join([c for c in self.user.get('apple_id') if match(r'\w', c)])
)
def __unicode__(self):
return 'iCloud API: %s' % self.user.get('apple_id')
def get_event_detail(self, pguid, guid):
"""
Fetches a single event's details by specifying a pguid
(a calendar) and a guid (an event's ID).
"""
params = dict(self.params)
params.update({'lang': 'en-us', 'usertz': get_localzone().zone})
url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
req = self.session.get(url, params=params)
self.response = req.json()
return self.response['Event'][0]
def party():
if not is_local_request(flask.request):
flask.abort(404)
global party_process
json = flask.request.get_json()
enabled = json.get('enabled', False)
logger.info('Got party state %r' % enabled)
if enabled and not env.is_party_mode():
# Start party XD
party_process = Popen(["python3", "./animate_web.py", "run"]) # async
env.set_party_mode(True)
env.set_motion_enabled(False)
elif not enabled and env.is_party_mode():
# Stop party :(
env.set_party_mode(False)
env.set_motion_enabled(True)
if party_process is not None:
party_process.kill()
party_process = None
# Return lights to circadian color
command = copy.deepcopy(COMMAND_FULL_ON)
circadian_color = get_current_circadian_color(date=get_local_time())
command_all_lights(circadian_color.apply_to_command(command))
return "Party mode is now %r" % enabled
def guest_mode():
if is_local_request(flask.request):
json = flask.request.get_json()
enabled = json.get('enabled', False)
env.set_guest_mode(enabled)
return "Guest mode is now %r" % enabled
else:
logger.info('Guest Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
flask.abort(404)
def motion_mode():
if is_local_request(flask.request):
json = flask.request.get_json()
enabled = json.get('enabled', False)
env.set_motion_enabled(enabled)
return "Motion mode is now %r" % enabled
else:
logger.info('Motion Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
flask.abort(404)
def vacation_mode():
if is_local_request(flask.request):
json = flask.request.get_json()
enabled = json.get('enabled', False)
env.set_vacation_mode(enabled)
return "Vacation mode is now %r" % enabled
else:
logger.info('Vacation Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
flask.abort(404)
def get_eset_sig_from_scan(scans):
if scans is None:
return None
for scan in scans:
if scan.get('name') in ['ESET-NOD32', 'NOD32', 'NOD32v2']:
return scan.get('result')
return ''
def key_dict_clean(json):
if json is None:
return None
array = []
for key in json.keys():
tmp_dict = json.get(key)
tmp_dict["name"] = key
array.append(tmp_dict)
return array
# Replace dot with _
# in dictionaries keys
# in order to save them in mongo
def key_list_clean(json):
if json is None:
return None
array = []
for key in json.keys():
tmp_dict = {}
tmp_dict["name"] = key
tmp_dict["values"] = json.get(key)
array.append(tmp_dict)
return array
def jsonize(data):
return json.dumps(data, sort_keys=False, indent=4)
# Checks if the meta has a date. If it doesn't
# it updates it. If a date is found, the oldest
# date will get saved.
def change_date_to_str(res):
if res is None:
return None
for date_key in ["date", "upload_date", "date_start", "date_end", "date_enqueued"]:
if res.get(date_key) is None:
pass
else:
res[date_key] = str(res.get(date_key))
return res
def add_error(resp_dict, error_code, error_message):
if type(resp_dict) != dict:
return resp_dict
if resp_dict.get('errors') is None:
resp_dict["errors"] = []
resp_dict["errors"].append({"code": error_code, "message": error_message})
return resp_dict
def cursor_to_dict(f1, retrieve):
results = []
for f in f1:
results.append(f)
ret = []
for a in results:
dic = {}
for key in retrieve.keys():
steps = key.split('.')
partial_res = a
for step in steps:
partial_res = partial_res.get(step)
if partial_res is None:
break
if isinstance(partial_res, list):
partial_res = None
break
legend_to_show = key.split('.')[-1]
if (legend_to_show == "file_id"):
legend_to_show = "sha1"
if (legend_to_show == "TimeDateStamp" and partial_res is not None):
partial_res = time.strftime(
"%Y-%m-%d %H:%M:%S", time.gmtime(int(eval(partial_res), 16)))
if (legend_to_show == "timeDateStamp" and partial_res is not None):
partial_res = time.strftime(
"%Y-%m-%d %H:%M:%S", time.gmtime(partial_res))
dic[legend_to_show] = partial_res
ret.append(dic)
return ret
# ****************TEST_CODE******************
def get_required_param(json, param):
if json is None:
logger.info("Request is not a valid json")
raise InvalidUsage("Request is not a valid json")
value = json.get(param, None)
if (value is None) or (value=='') or (value==[]):
logger.info("A required request parameter '{}' had value {}".format(param, value))
raise InvalidUsage("A required request parameter '{}' was not provided".format(param))
return value