def __init__(self, config_file=None, default_profile='default'):
self.default_profile = default_profile
self.config = {
'default-profile': self.default_profile,
'requirements': None,
'profiles': {}}
if config_file:
self.config_file = config_file
else:
self.config_file = os.path.join(
click.get_app_dir('datafs'), 'config.yml')
# Create default directory if it does not exist
# If using a user-supplied file, they are responsible for creating
# the directory.
if not os.path.isdir(os.path.dirname(self.config_file)):
os.makedirs(os.path.dirname(self.config_file))
python类get_app_dir()的实例源码
def load(self):
config_paths = [
os.path.expanduser('~/.hipcat.ini'),
os.path.join(click.get_app_dir(APP_NAME), 'config.ini'),
os.path.join(
click.get_app_dir(APP_NAME, force_posix=True), 'config.ini'
),
]
config = configparser.ConfigParser()
configs_found = config.read(config_paths)
if not configs_found:
raise ConfigurationError("""
Missing configuration!
You must provide configuration in one of the following places:
""" + "\n".join(' * {}'.format(path) for path in config_paths) + """
"""
)
self.config = config
return self
def main():
global cfgfile
init_logging()
cfgfile = os.path.join(click.get_app_dir("ecxcli"), 'config.ini')
cfgdir = os.path.dirname(cfgfile)
if not os.path.exists(cfgdir):
os.makedirs(cfgdir)
try:
cli()
except Exception as e:
logging.error(traceback.format_exc())
exctype, value = sys.exc_info()[:2]
click.secho(traceback.format_exception_only(exctype, value)[0], fg='red')
process_http_error(e)
def setup_assistant():
# Load credentials.
try:
credentials = os.path.join(
click.get_app_dir(common_settings.ASSISTANT_APP_NAME),
common_settings.ASSISTANT_CREDENTIALS_FILENAME
)
global creds
creds = auth_helpers.load_credentials(credentials, scopes=[common_settings.ASSISTANT_OAUTH_SCOPE, common_settings.PUBSUB_OAUTH_SCOPE])
except Exception as e:
logging.error('Error loading credentials: %s', e)
logging.error('Run auth_helpers to initialize new OAuth2 credentials.')
return -1
# Create gRPC channel
grpc_channel = auth_helpers.create_grpc_channel(ASSISTANT_API_ENDPOINT, creds)
logging.info('Connecting to %s', ASSISTANT_API_ENDPOINT)
# Create Google Assistant API gRPC client.
global assistant
assistant = embedded_assistant_pb2.EmbeddedAssistantStub(grpc_channel)
return 0
def __init__(self, app_name, mode='w+', cfg_name=None, box=None,
keyring=True, service_name=None, **kwargs):
args = (kwargs, Config.funct_args, Config.bad_kwds, Config.safe_kwds)
self.kwargs = group_kwargs_by_funct(*args)
self.box = box
mode = mode or ''
frozen = kwargs.get('frozen_box')
self.readable = 'r' in mode or '+' in mode and not frozen
self.writeable = 'w' in mode or '+' in mode
if self.readable or self.writeable:
cfg_name = cfg_name or Config.cfg_name
app_dir_kwargs = self.kwargs['get_app_dir']
self.filename = get_filename(app_name, cfg_name, **app_dir_kwargs)
self.keyring = keyring
if keyring:
service = (service_name or app_name) + '_' + get_user()
KeyringAttrDict.service = service
KeyringAttrDict.keyring = KeyringAttrDict.keyring or keyring_module
if keyring and keyring is not True:
KeyringAttrDict.set_keyring(keyring)
def write_config(self, fp=None):
if fp is None:
fp = os.path.join(click.get_app_dir('datafs'), 'config.yml')
with open_filelike(fp, 'w+') as f:
f.write(yaml.dump(self.config))
def main_group(ctx, name, daemon, mode, master, slave, etcd_host, quiet, verbose, config):
"""This is the command line interface to ScarlettOS.
"""
# NOTE: ctx
# Most public functions are actually methods of a 'context' object which
# is passed as the first parameter (ctx). The context object stores the
# precision, cached data, and a few other things. It also defines
# conversions so that the same high-level code can be used for several
# different base types (mpf, mpfs in Sage, intervals, Python floats) by
# switching contexts.
#
# The default context is called 'mp'. You can call most functions as
# mpmath.mp.foo(). The top-level function mpmath.foo() is just an alias
# for this.
ctx.obj = {}
config = config or os.path.join(click.get_app_dir('scarlett_os'), 'scarlett_os.ini')
cfg = read_config(config)
if cfg:
ctx.obj['config_file'] = config
ctx.obj['cfg'] = cfg
ctx.default_map = cfg
verbosity = (os.environ.get('SCARLETTOS_VERBOSE') or
ctx.lookup_default('scarlett_os.verbosity') or 0)
if verbose or quiet:
verbosity = verbose - quiet
verbosity = int(verbosity)
configure_logging(verbosity)
ctx.obj['verbosity'] = verbosity
def read_config():
"""Reads configuration storj client configuration.
Mac OS X (POSIX):
~/.foo-bar
Unix (POSIX):
~/.foo-bar
Win XP (not roaming):
``C:\Documents and Settings\<user>\Application Data\storj``
Win 7 (not roaming):
``C:\\Users\<user>\AppData\Local\storj``
Returns:
(tuple[str, str]): storj account credentials (email, password).
"""
# OSX: /Users/<username>/.storj
cfg = os.path.join(
click.get_app_dir(
APP_NAME,
force_posix=True),
'storj.ini')
parser = RawConfigParser()
parser.read([cfg])
return parser.get(APP_NAME, CFG_EMAIL), parser.get(APP_NAME, CFG_PASSWORD)
def __init__(self, api_key=None, library_id=None, library_type='user',
autosync=False):
""" Service class for communicating with the Zotero API.
This is mainly a thin wrapper around :py:class:`pyzotero.zotero.Zotero`
that handles things like transparent HTML<->[edit-formt] conversion.
:param api_key: API key for the Zotero API, will be loaded from
the configuration if not specified
:param library_id: Zotero library ID the API key is valid for, will
be loaded from the configuration if not specified
:param library_type: Type of the library, can be 'user' or 'group'
"""
self._logger = logging.getLogger()
idx_path = os.path.join(click.get_app_dir(APP_NAME), 'index.sqlite')
self.config = load_config()
self.note_format = self.config['zotcli.note_format']
self.storage_dir = self.config.get('zotcli.storage_dir')
api_key = api_key or self.config.get('zotcli.api_key')
library_id = library_id or self.config.get('zotcli.library_id')
if not api_key or not library_id:
raise ValueError(
"Please set your API key and library ID by running "
"`zotcli configure` or pass them as command-line options.")
self._zot = Zotero(library_id=library_id, api_key=api_key,
library_type=library_type)
self._index = SearchIndex(idx_path)
sync_interval = self.config.get('zotcli.sync_interval', 300)
since_last_sync = int(time.time()) - self._index.last_modified
if autosync and since_last_sync >= int(sync_interval):
click.echo("{} seconds since last sync, synchronizing."
.format(since_last_sync))
num_updated = self.synchronize()
click.echo("Updated {} items".format(num_updated))
def _get_config_path():
return os.path.join(click.get_app_dir(APP_NAME), 'config.ini')
def get_app_dir():
import click
return click.get_app_dir('q2cli', roaming=False)
# NOTE: `get_cache_dir` and `get_completion_path` live here instead of
# `q2cli.cache` because `q2cli.cache` can be slow to import.
# `get_completion_path` (which relies on `get_cache_dir`) is imported and
# executed by the Bash completion function each time the user hits <tab>, so it
# must be quick to import.
def get_cache_dir():
import os.path
return os.path.join(get_app_dir(), 'cache')
def get_app_dir(*args):
return os.path.join(click.get_app_dir('cget'), *args)
def get_cache_path(*args):
return get_app_dir('cache', *args)
def read_config(config_path):
if not config_path:
config_path = click.get_app_dir(APP_NAME)
else:
config_path = os.path.abspath(config_path)
config_file = os.path.join(config_path, 'config.yml')
config_dict = copy.copy(DEFAULT_CONFIG)
if os.path.exists(config_file):
with io.open(config_file, encoding='utf-8') as file_handle:
yaml_dict = poyo.parse_string(file_handle.read())
update(config_dict, yaml_dict)
return config_dict
def get_template_path(template_name, config_path):
if not config_path:
config_path = click.get_app_dir(APP_NAME)
else:
config_path = os.path.abspath(config_path)
template_path = os.path.join(config_path, 'templates', template_name)
template_file = os.path.join(template_path, "Dockerfile")
if not os.path.isdir(template_path):
raise ValueError("{0} does not exist. Please create the template directory".format(template_path))
if not os.path.isfile(template_file):
raise ValueError("{0}: Dockerfile was not found in {1}.".format(template_name, template_path))
return template_path
def setup_logging():
"""Reads the Storj GUI logging configuration from logging.conf.
If the file does not exist it will load a default configuration.
Mac OS X (POSIX):
~/.storj-gui
Unix (POSIX):
~/.storj-gui
Win XP (not roaming):
``C:\Documents and Settings\<user>\Application Data\storj-gui``
Win 7 (not roaming):
``C:\\Users\<user>\AppData\Local\storj-gui``
"""
logging_conf = os.path.join(
click.get_app_dir(APP_NAME, force_posix=True),
'logging.conf')
if not os.path.exists(logging_conf) or not os.path.isfile(logging_conf):
load_default_logging()
logging.getLogger(__name__).warning('%s logging configuration file does not exist', logging_conf)
return
try:
config.fileConfig(logging_conf, disable_existing_loggers=False)
logging.getLogger(__name__).info('%s configuration file was loaded.', logging_conf)
except RuntimeError:
load_default_logging()
logging.getLogger(__name__).warning('failed to load configuration from %s', logging_conf)
return
logging.getLogger(__name__).info('using logging configuration from %s', logging_conf)
def completion_dir() -> str:
""" Get the name of the completion directory """
return click.get_app_dir("temci")
def load_from_config_dir(self):
"""
Load the config file from the application directory (e.g. in the users home folder) if it exists.
"""
conf = os.path.join(click.get_app_dir("temci"), "config.yaml")
if os.path.exists(conf) and os.path.isfile(conf):
self.load_file(conf)
def __init__(self, loop):
"""
:param asyncio.AbstractEventLoop | None loop:
"""
# Information about host and user.
self.host_name = os.uname()[1]
self.user_name = get_login_username()
self.user_uid = getpwnam(self.user_name).pw_uid
self.user_home = os.path.expanduser('~' + self.user_name)
self.config_dir = click.get_app_dir('onedrived')
self._create_config_dir_if_missing()
self.config = self.DEFAULT_CONFIG
self.loop = loop
self._watcher = None
def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
click.get_app_dir = lambda x: self.tempdir.name + '/' + x
od_main.context = od_main.load_context()
od_main.context._create_config_dir_if_missing()
def setUp(self):
self.tempdir = tempfile.TemporaryDirectory()
click.get_app_dir = lambda x: self.tempdir.name + '/' + x
od_pref.context = od_pref.load_context()
od_pref.context._create_config_dir_if_missing()
def initialize(self):
self.ASSISTANT_API_ENDPOINT = 'embeddedassistant.googleapis.com'
self.END_OF_UTTERANCE = embedded_assistant_pb2.ConverseResponse.END_OF_UTTERANCE
self.DIALOG_FOLLOW_ON = embedded_assistant_pb2.ConverseResult.DIALOG_FOLLOW_ON
self.CLOSE_MICROPHONE = embedded_assistant_pb2.ConverseResult.CLOSE_MICROPHONE
api_endpoint=self.ASSISTANT_API_ENDPOINT
credentials=os.path.join(click.get_app_dir(common_settings.ASSISTANT_APP_NAME), common_settings.ASSISTANT_CREDENTIALS_FILENAME)
verbose=False
self.audio_sample_rate=common_settings.DEFAULT_AUDIO_SAMPLE_RATE
self.audio_sample_width=common_settings.DEFAULT_AUDIO_SAMPLE_WIDTH
self.audio_iter_size=common_settings.DEFAULT_AUDIO_ITER_SIZE
self.audio_block_size=common_settings.DEFAULT_AUDIO_DEVICE_BLOCK_SIZE
self.audio_flush_size=common_settings.DEFAULT_AUDIO_DEVICE_FLUSH_SIZE
self.grpc_deadline=common_settings.DEFAULT_GRPC_DEADLINE
# Load credentials.
try:
creds = auth_helpers.load_credentials(credentials, scopes=[common_settings.ASSISTANT_OAUTH_SCOPE])
except Exception as e:
self.error('Error loading credentials: %s', e)
self.error('Run auth_helpers to initialize new OAuth2 credentials.')
return
# Create gRPC channel
grpc_channel = auth_helpers.create_grpc_channel(api_endpoint, creds, ssl_credentials_file="", grpc_channel_options="")
self.log('Connecting to google')
# Create Google Assistant API gRPC client.
self.assistant = embedded_assistant_pb2.EmbeddedAssistantStub(grpc_channel)
# Configure audio source and sink.
self.audio_device = None
self.audio_source = self.audio_device = (self.audio_device or audio_helpers.SoundDeviceStream(sample_rate=self.audio_sample_rate, sample_width=self.audio_sample_width, block_size=self.audio_block_size, flush_size=self.audio_flush_size))
self.audio_sink = self.audio_device = (self.audio_device or audio_helpers.SoundDeviceStream(sample_rate=self.audio_sample_rate, sample_width=self.audio_sample_width, block_size=self.audio_block_size, flush_size=self.audio_flush_size))
# Create conversation stream with the given audio source and sink.
self.conversation_stream = audio_helpers.ConversationStream(source=self.audio_source, sink=self.audio_sink, iter_size=self.audio_iter_size)
self.conversation_state_bytes = None
self.volume_percentage = 70
self.listen_state(self.startGH,self.args["activation_boolean"],new="on")
self.log("App started. now listening to Homeassistant input")
def cli(ctx, status_file):
"""
Runs the tutorial.
"""
ctx.obj = {'status_filename': os.path.join(
click.get_app_dir('Click Tutorial'), status_file)}
lessons = get_lessons(ctx.obj['status_filename'])
if lessons:
ctx.obj['lessons'] = lessons
else:
ctx.abort()
def get_dir_path(cls):
return click.get_app_dir(cls.APP_NAME, False, False)
def __init__(self, auction_contract_addr, chain,
state_file_path: str=click.get_app_dir('event_sampler')):
self.contract_addr = auction_contract_addr
self.chain = chain
Auction = self.chain.provider.get_contract_factory('DutchAuction')
self.auction_contract = Auction(address=auction_contract_addr)
self.auction_contract_addr = auction_contract_addr
self.state = EventSamplerState(state_file_path)
callbacks = {
'BidSubmission': self.on_bid_submission,
'AuctionEnded': self.on_auction_end,
'Deployed': self.on_deployed_event,
'AuctionStarted': self.on_auction_start,
'ClaimedTokens': self.on_claimed_tokens
}
self.events = defaultdict(list)
self.auction_start_block = None
self.total_claimed = 0
self.final_price = None
self.auction_start_time = None
self.auction_end_time = None
self.price_start = None
self.price_constant = None
self.price_exponent = None
for k, v in callbacks.items():
self.sync_events(k, v)
watch_logs(self.auction_contract, k, v)
# start state save event - after the events are synced
self.save_event = StateSave(self.state)
self.save_event.start()
def default_config_dir():
"""Return default config directory."""
return click.get_app_dir(APP_NAME)
def get_game(path=None):
global GAME
if GAME:
return GAME
path = path or os.path.join(click.get_app_dir(APP_NAME), 'config.json')
GAME = Game.load(path) if os.path.exists(path) else Game.create(path)
return GAME
def __write_settings_file(url, username, password, authMethod, json):
data = {}
data['url'] = url.replace("/" + url.split('/')[-1], "")
data['project'] = url.split('/')[-1]
data['authMethod'] = authMethod
data['username'] = username
data['password'] = password
data['repo_id'] = {}
for project in json['value']:
data['repo_id'][project['name'].lower()] = project['id']
if not os.path.exists(click.get_app_dir("Codereview")):
os.makedirs(click.get_app_dir("Codereview"))
stream = open(Configuration.file_path, 'w')
yaml.dump(data, stream)
def test_kwarg_splitter():
kwargs = {'frozen_box': True, 'indent': 2}
result = group_kwargs_by_funct(kwargs, FUNCTS)
expect = {'open': {}, 'box': {'frozen_box': True},
'get_app_dir': {}, 'load': {}, 'dump': {'indent': 2}}
assert result == expect