Python 类 CSafeLoader() 的实例源码

pyyaml.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def Deserializer(stream_or_string, **options):
    """
    Yield deserialized objects from YAML supplied as bytes, text or a stream.

    Bytes are decoded as UTF-8 and text is wrapped in a ``StringIO``; any
    other object is handed to the YAML loader unchanged. Extra keyword
    ``options`` are forwarded to ``PythonDeserializer``. Any ``Exception``
    raised while loading or deserializing is re-raised as a
    ``DeserializationError`` carrying the original traceback.
    """
    source = stream_or_string
    if isinstance(source, bytes):
        source = source.decode('utf-8')
    stream = StringIO(source) if isinstance(source, six.string_types) else source
    try:
        loaded = yaml.load(stream, Loader=SafeLoader)
        for obj in PythonDeserializer(loaded, **options):
            yield obj
    except GeneratorExit:
        # The consumer closed the generator early; that is not a data error.
        raise
    except Exception as exc:
        # Report every other failure as a DeserializationError, keeping the
        # traceback of the original exception.
        six.reraise(DeserializationError, DeserializationError(exc), sys.exc_info()[2])
composition.py 文件源码 项目:docker-etude 作者: globality-corp 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def from_yaml(cls, data):
        """Parse ``data`` as YAML with ``SafeLoader`` and pass the result to ``cls.from_dict``."""
        parsed = load(data, Loader=SafeLoader)
        return cls.from_dict(parsed)
icons.py 文件源码 项目:yamlloader 作者: fuzzysteve 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def importyaml(connection,metadata,sourcePath):
    """Load ``<sourcePath>/fsd/iconIDs.yaml`` into the ``eveIcons`` table.

    One row is inserted per top-level key of the YAML mapping; a missing
    ``iconFile`` or ``description`` is stored as an empty string. The
    transaction is opened once the file is open and committed after it has
    been closed.
    """
    eveIcons = Table('eveIcons',metadata)
    # Single-argument print(...) emits the same text under the Python 2
    # print statement as the bare form does.
    print("Importing Icons")
    print("Opening Yaml")
    yaml_path = os.path.join(sourcePath,'fsd','iconIDs.yaml')
    with open(yaml_path,'r') as yamlstream:
        trans = connection.begin()
        icons = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for icon_id in icons:
            entry = icons[icon_id]
            connection.execute(
                eveIcons.insert(),
                iconID=icon_id,
                iconFile=entry.get('iconFile',''),
                description=entry.get('description','')
            )
    trans.commit()
categories.py 文件源码 项目:yamlloader 作者: fuzzysteve 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _as_text(value):
    """Return ``value`` as text, decoding UTF-8 only when it is a byte string."""
    # Under Python 2 PyYAML hands back ``unicode`` for non-ASCII scalars;
    # calling ``.decode`` on those forces an implicit ASCII encode that fails,
    # so values that are already text are passed through untouched.
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def importyaml(connection,metadata,sourcePath):
    """Load ``<sourcePath>/fsd/categoryIDs.yaml`` into the database.

    For every top-level key of the YAML mapping one row is inserted into
    ``invCategories`` (the English name defaults to ``''`` and ``published``
    to ``0``). For every language present under the entry's ``name`` mapping
    one row is inserted into ``trnTranslations`` with ``tcID`` fixed to 6.
    A translation that fails to insert is reported on stdout and skipped.
    Everything runs inside a single transaction that is committed at the end.
    """
    print("Importing Categories")
    invCategories = Table('invCategories',metadata)
    trnTranslations = Table('trnTranslations',metadata)

    print("opening Yaml")

    trans = connection.begin()
    with open(os.path.join(sourcePath,'fsd','categoryIDs.yaml'),'r') as yamlstream:
        print("importing")
        categoryids = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for categoryid in categoryids:
            category = categoryids[categoryid]
            connection.execute(invCategories.insert(),
                            categoryID=categoryid,
                            categoryName=_as_text(category.get('name',{}).get('en','')),
                            iconID=category.get('iconID'),
                            published=category.get('published',0))

            # ``in`` replaces dict.has_key(), which no longer exists in Python 3.
            if 'name' in category:
                names = category['name']
                for lang in names:
                    try:
                        connection.execute(trnTranslations.insert(),
                                        tcID=6,
                                        keyID=categoryid,
                                        languageID=lang,
                                        text=_as_text(names[lang]))
                    except Exception:
                        # Best effort: report the bad translation and carry on.
                        # ``Exception`` (not a bare except) lets Ctrl-C and
                        # SystemExit still stop the import.
                        print('{} {} has a category problem'.format(categoryid,lang))
    trans.commit()
certificates.py 文件源码 项目:yamlloader 作者: fuzzysteve 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def importyaml(connection,metadata,sourcePath):
    """Load ``<sourcePath>/fsd/certificates.yaml`` into the database.

    One ``certCerts`` row is inserted per top-level key of the YAML mapping
    (``name`` and ``description`` default to ``''``). For each skill under
    the entry's ``skillTypes`` mapping, one ``certSkills`` row is inserted
    per level name, with the level name translated to an integer through
    ``skillmap``. The transaction is committed after the file is closed.
    """
    certCerts = Table('certCerts',metadata)
    certSkills = Table('certSkills',metadata)
    skillmap = dict(basic=0, standard=1, improved=2, advanced=3, elite=4)

    print("Importing Certificates")
    print("opening Yaml")
    yaml_path = os.path.join(sourcePath,'fsd','certificates.yaml')
    with open(yaml_path,'r') as yamlstream:
        trans = connection.begin()
        certificates = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for cert_id in certificates:
            cert = certificates[cert_id]
            connection.execute(
                certCerts.insert(),
                certID=cert_id,
                name=cert.get('name',''),
                description=cert.get('description',''),
                groupID=cert.get('groupID')
            )
            skill_types = cert['skillTypes']
            for skill_id in skill_types:
                levels = skill_types[skill_id]
                for level_name in levels:
                    connection.execute(
                        certSkills.insert(),
                        certID=cert_id,
                        skillID=skill_id,
                        certLevelInt=skillmap[level_name],
                        certLevelText=level_name,
                        skillLevel=levels[level_name]
                    )
    trans.commit()
config.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def yaml_load(filename):
    """Parse the YAML file behind ``filename`` with ``YAMLLoader``.

    ``filename`` must expose an ``open`` method; it is opened in text mode
    and closed again before the parsed result is returned.
    """
    with filename.open("rt") as handle:
        parsed = yaml.load(handle, Loader=YAMLLoader)
    return parsed
meta.py 文件源码 项目:BlogInPy 作者: the-new-sky 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self, lines):
        """Strip leading meta-data from ``lines`` and store it in ``Markdown.Meta``.

        Lines are consumed from the front of ``lines`` until a blank line, the
        end of a YAML header, or the first line that is not meta-data. With a
        YAML header, the ``yaml`` option enabled and PyYAML importable, the
        header is parsed with ``SafeLoader``; otherwise ``key: value`` lines
        are collected into a dict mapping lower-cased keys to lists of values.
        The remaining lines are returned.
        """
        meta = {}
        key = None
        yaml_block = []
        have_yaml = bool(lines and YAML_BEGIN_RE.match(lines[0]))
        if have_yaml:
            lines.pop(0)
            if self.config['yaml'] and not yaml:  # pragma: no cover
                log.warning('Document with YAML header, but PyYAML unavailable')
        while lines:
            current = lines.pop(0)
            header = META_RE.match(current)
            if not current.strip() or (have_yaml and YAML_END_RE.match(current)):
                break  # blank line or end of YAML header - done
            if have_yaml and self.config['yaml'] and yaml:
                # Inside a YAML header: keep the raw line for PyYAML.
                yaml_block.append(current)
                continue
            if header:
                key = header.group('key').lower().strip()
                value = header.group('value').strip()
                meta.setdefault(key, []).append(value)
                continue
            continuation = META_MORE_RE.match(current)
            if not (continuation and key):
                # Not meta-data: hand the line back to the document and stop.
                lines.insert(0, current)
                break
            # Continuation line: extend the most recently seen key.
            meta[key].append(continuation.group('value').strip())
        if yaml_block:
            meta = yaml.load('\n'.join(yaml_block), SafeLoader)
        self.markdown.Meta = meta
        return lines
util.py 文件源码 项目:awesome-ancient-chinese-books 作者: bhuztez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def load(stream):
    """Load YAML from ``stream``, building every mapping as an ``OrderedDict``.

    The mapping constructor is registered on a throwaway subclass of
    ``yaml.CSafeLoader`` created on each call, rather than on
    ``yaml.CSafeLoader`` directly.
    """
    def build_ordered_mapping(loader, node):
        # construct_pairs yields (key, value) tuples in document order.
        pairs = loader.construct_pairs(node)
        return OrderedDict(pairs)

    class OrderedSafeLoader(yaml.CSafeLoader):
        pass

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedSafeLoader.add_constructor(mapping_tag, build_ordered_mapping)

    return yaml.load(stream, Loader=OrderedSafeLoader)
skins.py 文件源码 项目:yamlloader 作者: fuzzysteve 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def importyaml(connection,metadata,sourcePath):
    """Load the three skin YAML files under ``<sourcePath>/fsd`` into the database.

    * ``skins.yaml``: one ``skins`` row per top-level key, plus one
      ``skinShip`` row for every entry in that skin's ``types``.
    * ``skinLicenses.yaml``: one ``skinLicense`` row per top-level key.
    * ``skinMaterials.yaml``: one ``skinMaterials`` row per top-level key.

    All inserts share one transaction, committed after the last file.
    """
    skinLicense = Table('skinLicense',metadata)
    skinMaterials = Table('skinMaterials',metadata)
    skins_table = Table('skins',metadata)
    skinShip = Table('skinShip',metadata)

    trans = connection.begin()
    print("Importing Skins")

    print("opening Yaml1")
    skins_path = os.path.join(sourcePath,'fsd','skins.yaml')
    with open(skins_path,'r') as yamlstream:
        skins = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for skin_id in skins:
            skin = skins[skin_id]
            connection.execute(
                skins_table.insert(),
                skinID=skin_id,
                internalName=skin.get('internalName',''),
                skinMaterialID=skin.get('skinMaterialID','')
            )
            for ship_type in skin['types']:
                connection.execute(skinShip.insert(), skinID=skin_id, typeID=ship_type)

    print("opening Yaml2")
    licenses_path = os.path.join(sourcePath,'fsd','skinLicenses.yaml')
    with open(licenses_path,'r') as yamlstream:
        skinlicenses = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for license_id in skinlicenses:
            license_entry = skinlicenses[license_id]
            connection.execute(
                skinLicense.insert(),
                licenseTypeID=license_id,
                duration=license_entry['duration'],
                skinID=license_entry['skinID']
            )

    print("opening Yaml3")
    materials_path = os.path.join(sourcePath,'fsd','skinMaterials.yaml')
    with open(materials_path,'r') as yamlstream:
        skinmaterials = load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for material_id in skinmaterials:
            material = skinmaterials[material_id]
            connection.execute(
                skinMaterials.insert(),
                skinMaterialID=material_id,
                displayNameID=material['displayNameID'],
                materialSetID=material['materialSetID']
            )

    trans.commit()
blueprints.py 文件源码 项目:yamlloader 作者: fuzzysteve 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def importyaml(connection,metadata,sourcePath):
    """Load ``<sourcePath>/fsd/blueprints.yaml`` into the industry tables.

    For every blueprint (top-level key of the YAML mapping):

    * one ``industryBlueprints`` row with its ``maxProductionLimit``;
    * one ``industryActivity`` row per entry in ``activities``, the activity
      name being translated to a numeric id through ``activityIDs``;
    * one ``industryActivityMaterials`` row per item in the activity's
      optional ``materials`` list;
    * one ``industryActivityProducts`` row per item in the optional
      ``products`` list, plus an ``industryActivityProbabilities`` row when
      the product carries a ``probability``;
    * one ``industryActivitySkills`` row per item in the optional ``skills``
      list; a failure while inserting skills is reported on stdout and the
      import continues.

    All inserts share one transaction, committed at the end.
    """
    activityIDs={"copying":5,"manufacturing":1,"research_material":4,"research_time":3,"invention":8,"reaction":11}

    industryBlueprints=Table('industryBlueprints',metadata)
    industryActivity = Table('industryActivity',metadata)
    industryActivityMaterials = Table('industryActivityMaterials',metadata)
    industryActivityProducts = Table('industryActivityProducts',metadata)
    industryActivitySkills = Table('industryActivitySkills',metadata)
    industryActivityProbabilities = Table('industryActivityProbabilities',metadata)

    print("importing Blueprints")
    print("opening Yaml")
    trans = connection.begin()
    with open(os.path.join(sourcePath,'fsd','blueprints.yaml'),'r') as yamlstream:
        blueprints=load(yamlstream,Loader=SafeLoader)
        print("Yaml Processed into memory")
        for blueprint in blueprints:
            entry = blueprints[blueprint]
            connection.execute(industryBlueprints.insert(),
                                typeID=blueprint,
                                maxProductionLimit=entry["maxProductionLimit"])
            activities = entry['activities']
            for activity in activities:
                details = activities[activity]
                activity_id = activityIDs[activity]
                connection.execute(industryActivity.insert(),
                                    typeID=blueprint,
                                    activityID=activity_id,
                                    time=details['time'])
                # ``in`` replaces dict.has_key(), which no longer exists in Python 3.
                if 'materials' in details:
                    for material in details['materials']:
                        connection.execute(industryActivityMaterials.insert(),
                                            typeID=blueprint,
                                            activityID=activity_id,
                                            materialTypeID=material['typeID'],
                                            quantity=material['quantity'])
                if 'products' in details:
                    for product in details['products']:
                        connection.execute(industryActivityProducts.insert(),
                                            typeID=blueprint,
                                            activityID=activity_id,
                                            productTypeID=product['typeID'],
                                            quantity=product['quantity'])
                        if 'probability' in product:
                            connection.execute(industryActivityProbabilities.insert(),
                                                typeID=blueprint,
                                                activityID=activity_id,
                                                productTypeID=product['typeID'],
                                                probability=product['probability'])
                try:
                    if 'skills' in details:
                        for skill in details['skills']:
                            connection.execute(industryActivitySkills.insert(),
                                                typeID=blueprint,
                                                activityID=activity_id,
                                                skillID=skill['typeID'],
                                                level=skill['level'])
                except Exception:
                    # Best effort: report the blueprint and carry on.
                    # ``Exception`` (not a bare except) lets Ctrl-C and
                    # SystemExit still stop the import.
                    print('{} has a bad skill'.format(blueprint))
    trans.commit()


问题


面经


文章

微信
公众号

扫码关注公众号