Python: example source code for load()
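
Note: most of the snippets below call yaml.load(stream) with no explicit Loader. On PyYAML 5.1+ that form emits a deprecation warning, and on untrusted input it can construct arbitrary Python objects. A minimal sketch of the safer equivalents (PyYAML assumed; config.yml is a hypothetical file):

import yaml

with open('config.yml') as f:
    # safe_load only builds plain YAML types (dict, list, str, int, ...)
    config = yaml.safe_load(f)

# equivalent explicit form:
# config = yaml.load(f, Loader=yaml.SafeLoader)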

mainplugin.py (project: qgis_wp, author: Zverik)
def openGeoPackage(self, filename=None):
        if not filename:
            filename = QFileDialog.getOpenFileName(
                parent=None,
                caption=self.tr(u'Select GeoPackage file'),
                filter=self.tr(u'GeoPackage File') + u' (*.gpkg *.geopackage)')
        if not filename or not os.path.isfile(filename):
            return
        filename = os.path.abspath(filename)

        styleFile = os.path.join(self.path, 'res', 'wp_style.yaml')
        with open(styleFile, 'r') as f:
            style = yaml.load(f)
        applyStyle(filename, style)
        for layer in self.iface.legendInterface().layers():
            self.iface.legendInterface().refreshLayerSymbology(layer)
        self.createPie()
ymlr.py (project: botterlord, author: Marchearth)
def retrieve(cont, filename):
    with open(filename, 'r') as stream:  # file() is a Python 2 builtin; open() works in both 2 and 3
        data = yaml.load(stream)
    #return yaml.dump(data, encoding=('utf-8'), default_flow_style=False, allow_unicode=True)
    return data[cont].encode('utf-8')
ymlr.py (project: botterlord, author: Marchearth)
def internal_data(filename, io, entry, cont, cont_in=None, cont_in2=None):  # supports up to 3 nested containers
    """filename: path to the YAML file; io: 'out' to read or 'in' to write;
    entry: value to write; cont, cont_in, cont_in2: nested keys, outermost first."""
    stream = open(filename, 'r')
    prof = yaml.load(stream)
    if io == 'out':
        if cont_in == None:
            val = prof[cont]
        else:
            if cont_in2 == None:
                val = prof[cont][cont_in]
            else:
                val = prof[cont][cont_in][cont_in2]
        return val

    if io == 'in':
        if cont_in == None:
            prof[cont] = entry
        else:
            if cont_in2 == None:
                prof[cont][cont_in] = entry
            else:
                prof[cont][cont_in][cont_in2] = entry
        with open(filename, 'w') as yaml_file:
            yaml_file.write(yaml.dump(prof, default_flow_style = False))
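
A hypothetical usage sketch for the helper above (file name and keys are invented), assuming profile.yaml contains users: {alice: {score: 10}}:

# read: io='out' walks prof['users']['alice']['score'] and returns 10
score = internal_data('profile.yaml', 'out', None, 'users', 'alice', 'score')

# write: io='in' sets prof['users']['alice']['score'] = 42 and rewrites the file
internal_data('profile.yaml', 'in', 42, 'users', 'alice', 'score')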
settings.py (project: fuel-nailgun-extension-iac, author: openstack)
def __init__(self):
        settings_files = []
        project_path = os.path.dirname(__file__)
        project_settings_file = os.path.join(project_path, 'settings.yaml')
        settings_files.append(project_settings_file)
        settings_files.append('/etc/git-exension-settings.yaml')
        settings_files.append('/etc/nailgun/git-exension-settings.yaml')
        self.config = {}
        for sf in settings_files:
            try:
                logger.debug("Trying to read config file %s" % sf)
                with open(sf) as custom_config:
                    self.config.update(yaml.load(custom_config.read()))
            except Exception as e:
                logger.error("Error while reading config file %s: %s" %
                             (sf, str(e)))
consul_check_postgres.py (project: consul-pg, author: adamcstephens)
def configure(self):
        # load config values
        try:
            with open(self.configfile) as configfile_contents:
                self.config = json.load(configfile_contents)
        except:
            self.config = {}

        try:
            self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
        except:
            print_exc()
            exit(135)
        self.managed_service = self.agent_services[self.service]

        if self.managed_service['Tags'] == None:
            self.managed_service['Tags'] = []

        if self.role_source == "facter":
            self.get_facter_state(self.DEFAULT_FACTERFILE)
        else:
            print("!! unsupported PG role source !!")
            exit(140)
usermanager.py (project: ownbot, author: michaelimfeld)
def __load_config(self):
        """Loads the configuration file.

            Loads all usergroups and users as a dict from
            the configuration file into the config attribute.
        """
        if not os.path.exists(self.USERS_CONF_PATH):
            self.__config = {}
            return

        with open(self.USERS_CONF_PATH, "r") as config_file:
            config = yaml.load(config_file)
            if not config:
                self.__config = {}
                return

            self.__config = config
yaml.py (project: PyPlanet, author: PyPlanet)
def load(self):
        # Prepare + load directory.
        super().load()

        # Load the files and parse Yaml.
        parsed_settings = dict()

        try:
            for file_name in self.files:
                file_path = os.path.join(self.directory, file_name)
                with open(file_path, 'r') as file_handle:
                    parsed_settings.update(yaml.load(file_handle))
        except (yaml.YAMLError, yaml.MarkedYAMLError) as e:
            raise ImproperlyConfigured(
                'Your settings file(s) contain invalid YAML syntax! Please fix and restart!, {}'.format(str(e))
            )

        # Loop and set in local settings (+ uppercase keys).
        for key, value in parsed_settings.items():
            self.settings[key.upper()] = value
input_pipeline_test.py (project: seq2seq, author: google)
def test_without_extra_args(self):
    pipeline_def = yaml.load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        pipeline_def, tf.contrib.learn.ModeKeys.TRAIN)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 1)
    self.assertEqual(pipeline.params["shuffle"], True)
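
For reference, the inline YAML literal in the test above parses to a nested dict; a sketch of the equivalent Python value (not part of the original test):

pipeline_def = {
    'class': 'ParallelTextInputPipeline',
    'params': {
        'source_files': ['file1'],
        'target_files': ['file2'],
        'num_epochs': 1,
        'shuffle': True,
    },
}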
input_pipeline_test.py (project: seq2seq, author: google)
def test_with_extra_args(self):
    pipeline_def = yaml.load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        def_dict=pipeline_def,
        mode=tf.contrib.learn.ModeKeys.TRAIN,
        num_epochs=5,
        shuffle=False)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 5)
    self.assertEqual(pipeline.params["shuffle"], False)
configuration.py (project: yt-browser, author: juanfgs)
def __init__(self):
        self.config_dir = os.path.expanduser("~/.config/ytbrowser/")
        self.defaults['format'] = "mkv"
        self.defaults['quality'] = "bestvideo"
        self.defaults['preferredcodec'] = "mp3"
        self.defaults['preferredquality'] = 192
        self.defaults['developerKey'] = "AIzaSyDFuK00HWV0fd1VMb17R8GghRVf_iQx9uk"
        self.defaults['apiServiceName'] = "youtube"
        self.defaults['apiVersion'] = "v3"

        if not os.path.exists(self.config_dir):
            os.makedirs(self.config_dir)

        if not os.path.exists(self.config_dir + "config.yml"):
            open(self.config_dir + "config.yml", "a").close()

        with open(self.config_dir + "config.yml", 'r') as ymlfile:
            self.user_settings = yaml.load(ymlfile)

        if self.user_settings is None:
            self.user_settings = {}
datalayer.py (project: tripletloss, author: luhaofang)
def setup(self, bottom, top):
        """Setup the RoIDataLayer."""
        # parse the layer parameter string, which must be valid YAML
        layer_params = yaml.load(self.param_str_)    
        self._batch_size = config.BATCH_SIZE
        self._triplet = self._batch_size/3
        assert self._batch_size % 3 == 0
        self._name_to_top_map = {
            'data': 0,
            'labels': 1}

        self.data_container =  sampledata() 
        self._index = 0

        # data blob: holds a batch of N images, each with 3 channels
        # The height and width (224 x 224) are dummy values
        top[0].reshape(self._batch_size, 3, 224, 224)

        top[1].reshape(self._batch_size)
models.py (project: cbapi-python, author: carbonblack)
def __getattr__(self, item):
        try:
            val = super(NewBaseModel, self).__getattribute__(item)
        except AttributeError:
            pass         # fall through to the rest of the logic...

        # try looking up via self._info, if we already have it.
        if item in self._info:
            return self._info[item]

        # if we're still here, let's load the object if we haven't done so already.
        if not self._full_init:
            self._refresh()

        # try one more time.
        if item in self._info:
            return self._info[item]
        else:
            raise AttributeError("'{0}' object has no attribute '{1}'".format(self.__class__.__name__,
                                                                              item))
models.py (project: cbapi-python, author: carbonblack)
def __str__(self):
        lines = []
        lines.append("{0:s} object, bound to {1:s}.".format(self.__class__.__name__, self._cb.session.server))
        if self._last_refresh_time:
            lines.append(" Last refreshed at {0:s}".format(time.ctime(self._last_refresh_time)))
        if not self._full_init:
            lines.append(" Partially initialized. Use .refresh() to load all attributes")
        lines.append("-"*79)
        lines.append("")

        for attr in sorted(self._info):
            status = "   "
            if attr in self._dirty_attributes:
                if self._dirty_attributes[attr] is None:
                    status = "(+)"
                else:
                    status = "(*)"
            val = str(self._info[attr])
            if len(val) > 50:
                val = val[:47] + u"..."
            lines.append(u"{0:s} {1:>20s}: {2:s}".format(status, attr, val))

        return "\n".join(lines)
cluster_auto_start_daemon.py (project: sm-engine-ansible, author: METASPACE2020)
def __init__(self, ansible_config_path, aws_key_name=None, interval=60,
                 qname='sm_annotate', debug=False):

        with open(ansible_config_path) as fp:
            self.ansible_config = yaml.load(fp)

        self.interval = min(interval, 1200)
        self.aws_key_name = aws_key_name or self.ansible_config['aws_key_name']
        self.master_hostgroup = self.ansible_config['cluster_configuration']['instances']['master']['hostgroup']
        self.slave_hostgroup = self.ansible_config['cluster_configuration']['instances']['slave']['hostgroup']
        self.stage = self.ansible_config['stage']
        self.qname = qname
        self.debug = debug

        self._setup_logger()
        self.ec2 = boto3.resource('ec2', self.ansible_config['aws_region'])
config.py (project: boss, author: kabirbaidhya)
def resolve_dotenv_file(path, stage=None):
    '''
    Resolve dotenv file and load environment vars if it exists.
    If stage parameter is provided, then stage specific .env file is resolved,
    for instance .env.production if stage=production etc.
    If stage is None, just .env file is resolved.
    '''
    filename = '.env' + ('' if not stage else '.{}'.format(stage))
    dotenv_path = os.path.join(path, filename)
    fallback_path = os.path.join(path, '.env')

    if fs.exists(dotenv_path):
        info('Resolving env file: {}'.format(cyan(dotenv_path)))
        dotenv.load_dotenv(dotenv_path)

    elif fs.exists(fallback_path):
        info('Resolving env file: {}'.format(cyan(fallback_path)))
        dotenv.load_dotenv(fallback_path)
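
A hypothetical call of the function above (the path is invented): with stage='production' it looks for .env.production first and falls back to plain .env.

# prefers /srv/app/.env.production, falls back to /srv/app/.env
resolve_dotenv_file('/srv/app', stage='production')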
config.py (project: boss, author: kabirbaidhya)
def load(filename=DEFAULT_CONFIG_FILE, stage=None):
    ''' Load the configuration and return it. '''
    try:
        # pass
        file_contents = fs.read(filename)
        resolve_dotenv_file(os.path.dirname(filename), stage)

        # Expand the environment variables used in the yaml config.
        loaded_config = os.path.expandvars(file_contents)

        # Parse the yaml configuration.
        # And merge it with the defaults before it's used everywhere.
        loaded_config = yaml.load(loaded_config)
        merged_config = merge_config(loaded_config)

        _config.update(merged_config)

        return get()

    except KeyError:
        halt('Invalid configuration file "{}"'.format(filename))

    except IOError:
        halt('Error loading config file "%s"' % filename)
runner.py (project: PyWebRunner, author: IntuitiveWebSolutions)
def main():
    global ARGS

    parser = argparse.ArgumentParser(description='Run a PyWebRunner YAML/JSON script.')
    parser.add_argument('-b', '--browser', help='Which browser to load. Defaults to Chrome.')
    parser.add_argument('--base-url', help='Base URL to use with goto command.')
    parser.add_argument('-t', '--timeout', help='Global wait timeout (in seconds). Defaults to 30.')
    parser.add_argument('-p', '--processes', help='Number of processes (browsers) to use. Defaults to 1')
    parser.add_argument('-do', '--default-offset', help='New default offset for scroll_to_element. (Default is 0)')
    parser.add_argument('--errors', dest='errors', action='store_true', help='Show errors.')
    parser.add_argument('--focus', dest='focus', action='store_true', help='Focus the browser on launch.')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Verbose output of commands being executed.')
    parser.add_argument('files', nargs='*')
    ARGS = parser.parse_args()

    processes = ARGS.processes or 1
    pool = Pool(int(processes))

    pool.map(run_test, ARGS.files)

    pool.close()
    pool.join()
integration.py (project: Auto_Analysis, author: ztwo)
def get_device_info():
    """
    Combine the shared test-info settings with every configured device.
    :return: list of per-device config dicts
    """
    device_list = []
    ini = U.ConfigIni()
    test_info = ini.get_ini('test_info', 'info')
    test_device = ini.get_ini('test_device', 'device')
    with open(test_info) as f:
        test_dic = yaml.load(f)[0]

    with open(test_device) as f:
        for device in yaml.load(f):
            device_list.append(dict(test_dic.items() + device.items()))

    return device_list
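
The two YAML files read above are not shown; invented sample data with the shapes the snippet expects, plus the Python 3 equivalent of its Python 2 dict merge:

# yaml.load(test_info)[0] -> shared settings; yaml.load(test_device) -> one dict per device
test_dic = {'app': 'demo.apk', 'runs': 3}
devices = [{'name': 'Nexus5'}, {'name': 'Pixel'}]

# dict(a.items() + b.items()) only works on Python 2; in Python 3 merge like this:
device_list = [dict(test_dic, **device) for device in devices]
# -> [{'app': 'demo.apk', 'runs': 3, 'name': 'Nexus5'},
#     {'app': 'demo.apk', 'runs': 3, 'name': 'Pixel'}]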
db_utils.py (project: pybot, author: spillai)
def iter_keys_values(self, keys, inds=None, verbose=False): 
        for key in keys: 
            if key not in self.keys_: 
                raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

        idx, ii = 0, 0
        total_chunks = len(self.meta_file_.chunks)
        inds = np.sort(inds) if inds is not None else None

        for chunk_idx, chunk in enumerate(progressbar(self.meta_file_.chunks, size=total_chunks, verbose=verbose)): 
            data = AttrDict.load(self.get_chunk_filename(chunk_idx))

            # if inds is None: 
            items = (data[key] for key in keys)
            for item in izip(*items): 
                yield item
            # else:
            #     for i, item in enumerate(data[key]): 
            #         if inds[ii] == idx + i: 
            #             yield item
            #             ii += 1
            #             if ii >= len(inds): break
            #     idx += len(data[key])
db_utils.py (project: pybot, author: spillai)
def iterchunks(self, key, batch_size=10, verbose=False): 
        if key not in self.keys_: 
            raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

        idx, ii = 0, 0
        total_chunks = len(self.meta_file_.chunks)
        batch_chunks = grouper(range(len(self.meta_file_.chunks)), batch_size)

        for chunk_group in progressbar(batch_chunks, size=total_chunks / batch_size, verbose=verbose): 
            items = []
            # print key, chunk_group
            for chunk_idx in chunk_group: 
                # grouper will fill chunks with default none values
                if chunk_idx is None: continue
                # Load chunk
                data = AttrDict.load(self.get_chunk_filename(chunk_idx))
                for item in data[key]: 
                    items.append(item)
            yield items
find_entities_test.py (project: gransk, author: pcbje)
def test_config(self):
    with open('config.yml') as inp:
      config = yaml.load(inp.read())

    _find_entities = find_entities.Subscriber(test_helper.get_mock_pipeline([]))
    _find_entities.setup(config)

    doc = document.get_document('dummy')

    for entity_type, pattern_conf in config.get(helper.ENTITIES, {}).items():
      if not isinstance(pattern_conf['test'], list):
        pattern_conf['test'] = [pattern_conf['test']]

      for test in pattern_conf['test']:
        doc.text = 'dum dum {} dum'.format(test)
        _find_entities.consume(doc, None)
        entities = doc.entities.get_all()

        self.assertEqual(1, len(entities),
                         msg='regex for %s found nothing' % entity_type)
        self.assertEqual(entity_type, entities[0][1]['type'])
        self.assertEqual(test, entities[0][1]['value'])
aa_tasks.py (project: saapy, author: ashapochka)
def import_scitools_yaml_to_neo4j(ctx, yaml_path, neo4j_url='bolt://localhost',
                                  user='neo4j', labels=''):
    """

    :param labels:
    :param ctx:
    :param yaml_path:
    :param neo4j_url:
    :param user:
    """
    label_list = to_label_list(labels)
    with open(yaml_path, 'r') as input_stream:
        scitools_db = yaml.load(input_stream)
    neo4j_client = connect_neo4j(ctx, neo4j_url, user)
    ScitoolsETL.import_to_neo4j(scitools_db, neo4j_client, labels=label_list)


# noinspection PyUnusedLocal
connector.py (project: webtzite, author: materialsproject)
def split_config(config):
        """Split the 'config' object into a set of fields.

        :param config: Configuration
        :type param: dict or str
        :raises: yaml.error.YAMLError if config is a str that doesn't parse
        """
        if not isinstance(config, dict):
            config = yaml.load(config)
        db = config.get("db", None)
        host = config.get("host", "0.0.0.0")
        user_name = config.get("user_name", None)
        password = config.get("password", None)
        port = int(config.get("port", 27017))
        coll = config.get("collection", None)
        return db, host, user_name, password, port, coll
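
For illustration, split_config accepts either a dict or a YAML string; a hedged sketch of the string form (values invented, and the function is called as a plain function here although the project may expose it as a static method):

cfg = """
db: mydb
host: 127.0.0.1
port: 27017
collection: items
"""
db, host, user_name, password, port, coll = split_config(cfg)
# -> ('mydb', '127.0.0.1', None, None, 27017, 'items')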
messier.py (project: messier, author: conorsch)
def parse_messier_config(self, config_filepath=".messier"):
        """
        Read YAML config file for Messier. Defaults to .messier.
        Supported options include:

          `serverspec_commands`: list of shell commands to run for Serverspec
          `serverspec_base_directory`: directory to cd into prior to running Serverspec
        """
        try:
            config_file = open(config_filepath,'r')
        except IOError:
            config = {}
        else:
            config = yaml.load(config_file)
            if not config:
                config = {}
        return config


    # Elegant solution from https://gist.github.com/LeoHuckvale/8f50f8f2a6235512827b
    # Stuffing this method into class because it's harder to reference otherwise
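
A hypothetical .messier file using the two options listed in the docstring above; parsing it yields the dict the caller would receive (a sketch, not taken from the project):

example = yaml.safe_load("""
serverspec_base_directory: spec
serverspec_commands:
  - bundle exec rake spec
""")
# -> {'serverspec_base_directory': 'spec',
#     'serverspec_commands': ['bundle exec rake spec']}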
config.py (project: caduc, author: tjamet)
def __init__(self, options=[], config_path=None):
        if config_path is None:
            config_path = os.path.join(os.path.expanduser("~"), ".caduc", "config.yml")
            if os.path.exists(config_path):
                config = yaml.load(open(config_path, 'r'))
            else:
                config = {}
        else:
            config = yaml.load(open(config_path, 'r'))
        super(Config, self).__init__(**config)
        for opt in options:
            k, v = self.parse_kv(opt)
            node = {}
            child = node
            keys = self.parse_key(k)
            for key in keys[:-1]:
                child[key] = {}
                child = child[key]
            child[keys[-1]] = v
            self.update(node)
benchmark.py (project: DeepSea, author: SUSE)
def _get_job_parameters(self, job_spec, job_log_dir, client):
        with open('{}/{}'.format(self.bench_dir, job_spec), 'r') as yml:
            try:
                job = yaml.load(yml)
            except YAMLError as error:
                log.error('Error parsing job spec in file {}/fio/{}'.format(self.bench_dir, job_spec))
                log.error(error)
                raise error
        output_options = '''
        write_bw_log={logdir}/output
        write_lat_log={logdir}/output
        write_hist_log={logdir}/output
        write_iops_log={logdir}/output
        '''.format(logdir=job_log_dir)
        job.update({'dir': self.work_dir, 'output_options': output_options,
                    'client': client})
        return job
proposal.py (project: DeepSea, author: SUSE)
def _record_filter(args, base_dir):
    """
    Save the filter provided
    """
    filter_file = '{}/.filter'.format(base_dir)

    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    current_filter = {}
    with open(filter_file) as filehandle:
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        current_filter = {}

    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing
    rec_args = {k: v for k, v in args.items() if k != 'target' and not
                k.startswith('__')}
    current_filter[args['target']] = rec_args

    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
cli.py (project: manage, author: rochacbruno)
def load_manage_dict(filename=None):
    manage_filename = None
    if not MANAGE_DICT:
        if filename:
            manage_filename = filename
        elif os.path.exists(MANAGE_FILE):
            manage_filename = MANAGE_FILE
        elif os.path.exists(HIDDEN_MANAGE_FILE):
            manage_filename = HIDDEN_MANAGE_FILE
        else:
            MANAGE_DICT.update(copy.deepcopy(default_manage_dict))
            MANAGE_DICT['shell']['banner']['message'] = (
                "WARNING: This is not a managed project\n"
                "\tPlease `exit()` and \n"
                "\trun `$ manage init`\n"
                "\tand edit `manage.yml` file with desired options"
            )
            MANAGE_DICT['shell']['auto_import']['display'] = False
        if manage_filename:
            with open(manage_filename) as manage_file:
                MANAGE_DICT.update(yaml.load(manage_file))
    return MANAGE_DICT
template.py (project: clouds-aws, author: elias5000)
def load_parameters(stack):
    """Load parameters from the stack's YAML file and return them as a list of parameter dicts."""
    params = []
    param_path = path.join('stacks', stack, 'parameters.yaml')

    if not path.exists(param_path):
        return params

    with open(param_path, encoding='utf-8') as file:
        params_raw = yaml.load(file.read())

        # build parameter dict
        for param in params_raw.keys():
            params.append({
                'ParameterKey': param,
                'ParameterValue': params_raw[param]
            })
    return params
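
Invented sample data mirroring the loop above: parsing a small parameters.yaml and building the same list of parameter dicts (stack name, keys and values are hypothetical):

params_raw = yaml.safe_load("InstanceType: t3.micro\nKeyName: deploy\n")
params = [{'ParameterKey': key, 'ParameterValue': params_raw[key]} for key in params_raw.keys()]
# -> [{'ParameterKey': 'InstanceType', 'ParameterValue': 't3.micro'},
#     {'ParameterKey': 'KeyName', 'ParameterValue': 'deploy'}]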
tally_server.py (project: privcount, author: privcount)
def run(self):
        '''
        Called by twisted
        '''
        # load initial config
        self.refresh_config()
        if self.config is None:
            logging.critical("cannot start due to error in config file")
            return

        # refresh and check status every event_period seconds
        self.refresh_task = task.LoopingCall(self.refresh_loop)
        refresh_deferred = self.refresh_task.start(self.config['event_period'], now=False)
        refresh_deferred.addErrback(errorCallback)

        # setup server for receiving blinded counts from the DC nodes and key shares from the SK nodes
        listen_port = self.config['listen_port']
        key_path = self.config['key']
        cert_path = self.config['cert']
        ssl_context = ssl.DefaultOpenSSLContextFactory(key_path, cert_path)

        logging.info("Tally Server listening on port {}".format(listen_port))
        reactor.listenSSL(listen_port, self, ssl_context)
        reactor.run()

