from ruamel.yaml import YAML

def load_bgp_database(filename):
    """Load the BGP database.

    We load our existing BGP information from the YAML file generated by
    our first script in order to graph that information.
    """
    # initialize our bgp db
    bgp_db = {}
    # load our existing bgp database
    with open(filename, 'r') as fn:
        yaml = YAML()
        bgp_db = yaml.load(fn)
    return bgp_db
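# A minimal usage sketch for load_bgp_database. 'fullbgpredundancy.yml' is
# the file written by bgp_parse_logic further below; printing each route's
# redundancy count is only an illustration:
if __name__ == '__main__':
    bgp_db = load_bgp_database('fullbgpredundancy.yml')
    for route, redundancy in (bgp_db or {}).items():
        print(route, redundancy)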
def load_spec_dict(spec_path):
    with open(spec_path, 'r') as spec_file:
        return YAML().load(spec_file.read())
def _get_lang_data(self, lang):
    with open(f'./resources/lang/{lang}.yml') as f:
        return YAML(typ='safe').load(f)
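# Note that YAML(typ='safe') differs from the round-trip default used in the
# other snippets: the safe loader yields plain dicts and lists, while the
# round-trip loader keeps comments and key order. A small sketch of the
# difference, with a made-up sample document:
from ruamel.yaml import YAML
import io

doc = "greeting: hello  # inline comment\n"
print(YAML(typ='safe').load(doc))   # plain dict: {'greeting': 'hello'}
rt = YAML()                         # round-trip loader (the default)
buf = io.StringIO()
rt.dump(rt.load(doc), buf)          # the inline comment survives the dump
print(buf.getvalue())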
def _setUp(self, bf, yml):
    self.bf = bf
    self.yml = yml
    self.name = self.__class__.__name__[7:].lower()
    YAML = yaml.YAML()  # 'yaml' is presumably ruamel imported as a module alias
    self.cm = YAML.load(self.yml)
    if self.cm is None:
        self.cm = CommentedMap()
    self.non_empty = []
    for a in BuildFlags.attrs:
        if getattr(bf, a):
            self.non_empty.append(a)
def load_settings():
    conandir = os.path.expanduser("~/.conan/")
    if not os.path.exists(conandir):
        return
    settings_file = os.path.join(conandir, 'settings.yml')
    with open(settings_file) as f:
        txt = f.read()
    YAML = yaml.YAML()
    data = YAML.load(txt)
    settings = odict(data)
    return settings
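# Usage sketch for load_settings: it reads Conan's settings.yml when a
# ~/.conan directory exists and returns None otherwise. The printed keys are
# only what a stock Conan settings.yml typically contains:
settings = load_settings()
if settings is not None:
    print(list(settings.keys()))  # e.g. ['os', 'arch', 'compiler', ...]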
def load_txt(yml_txt):
    """Load YAML text into a (compilers, flags) pair."""
    YAML = yaml.YAML()
    dump = YAML.load(yml_txt)
    fa = dump.get('flag_aliases', dump)
    return load_yml(fa)
def _load_yml(self, yml):
    YAML = yaml.YAML()
    dump = YAML.load(yml)
    dump = odict(dump)
    # make sure the top-level sections exist, even if empty
    for i in ('project', 'config', 'flag_aliases'):
        if dump.get(i) is None:
            dump[i] = CommentedMap()
        setattr(self, i, dump[i])
    self._dump = dump
    from . import flags as c4flags
    self.flag_aliases = c4flags.FlagAliases(yml=dump.get('flag_aliases', CommentedMap()))
def setup_cache(self):
    if not self.cache_path or not os.path.isfile(self.cache_path):
        cache = {}
    else:
        yaml = YAML()
        with open(self.cache_path, 'r') as f:
            # an empty cache file loads as None; fall back to a fresh dict
            cache = yaml.load(f) or {}
    if 'ho_cookies' not in cache:
        cache['ho_cookies'] = {}
    if 'admin_channels' not in cache:
        cache['admin_channels'] = {}
    if 'joined_conversations' not in cache:
        cache['joined_conversations'] = {}
    return cache
def save_cache(self):
    if self.cache_path:
        self.cache['joined_conversations'] = dict(self.joined_conversations)
        yaml = YAML()
        with open(self.cache_path, 'w') as f:
            yaml.dump(self.cache, f)
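# The host class for setup_cache/save_cache is not shown in this excerpt; the
# stand-in below, with only the attributes the two methods touch, is an
# assumption made purely to sketch the intended round trip (it also assumes
# os and ruamel.yaml's YAML are imported as in the snippets above):
class _CacheDemo:
    setup_cache = setup_cache
    save_cache = save_cache

    def __init__(self, cache_path):
        self.cache_path = cache_path
        self.joined_conversations = {}
        self.cache = self.setup_cache()

demo = _CacheDemo('demo_cache.yml')          # hypothetical path
demo.cache['ho_cookies']['example'] = 'abc'  # mutate the loaded cache
demo.save_cache()                            # persists via YAML.dump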
def bgp_parse_logic(filename):
    """BGP parse logic.

    Parse function to run in this example exercise.
    """
    # initialize final reliability datastructure
    bgp_db = {}
    # initialize basic variables
    route_address = ''
    next_hop_address = ''
    with open(filename, 'r') as fn:
        # iterate through the file
        for line in fn:
            # store valid addresses
            valid_addresses = flexible_parse(line)
            print(valid_addresses)  # debug output
            #valid_addresses = strict_parse(line)
            # check whether the number of valid addresses is 2
            if len(valid_addresses) == 2:
                # initialize the valid addresses
                route_address = valid_addresses[0]
                next_hop_address = valid_addresses[1]
                # initialize route address in dictionary if not already done
                if route_address not in bgp_db:
                    bgp_db[route_address] = 0
                # increment redundancy count
                bgp_db[route_address] += 1
            # check whether the number of valid addresses is 1
            # this should only happen with a different format of
            # 'show ip bgp' output, i.e. Cisco devices
            # note: this format will also break strict parsing
            elif len(valid_addresses) == 1:
                # initialize valid addresses
                next_hop_address = valid_addresses[0]
                # initialize route address in dictionary if not already done
                if route_address not in bgp_db:
                    bgp_db[route_address] = 0
                # increment redundancy count
                bgp_db[route_address] += 1
    # save our data as a YAML file to be reused for graphing purposes!
    with open('fullbgpredundancy.yml', 'w') as fn:
        yaml = YAML()
        yaml.default_flow_style = False
        yaml.dump(bgp_db, fn)
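# flexible_parse (and the commented-out strict_parse) are referenced above
# but not included in this excerpt. A hedged sketch of what flexible_parse
# plausibly does -- pull the IPv4 addresses out of a 'show ip bgp' line with
# a permissive regex -- follows; the real implementation may validate octet
# ranges or handle IPv6 as well:
import re

_IPV4 = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

def flexible_parse(line):
    """Return all IPv4-looking tokens found in a line of 'show ip bgp'."""
    return _IPV4.findall(line)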
def result(self, url, txt1, txt2):
    with open(txt1, "r") as f:
        catalog = f.read()
    with open(txt2, "r") as f:
        extract = f.read()
    yaml = YAML()
    import sys
    catalog = yaml.load(catalog)
    yaml.dump(catalog, sys.stdout)
    # the catalog file is expected to look roughly like:
    # catalog = {
    #     "default": {
    #         "index": "index|node|/[a-z0-9]+$",
    #         "content": "content|[/_]20[01][0-9][/_-]?\d\d[/_-]?\d\d",
    #     },
    #     "auto.china.com.cn": {
    #         "content": "/20[01][0-9]\d{4}/\d+\.shtml",
    #     },
    # }
    fetcher = Fetcher()
    extract_rule = yaml.load(extract)
    print(">>>>>", type(extract_rule))
    links = fetcher.build(url, '.china.com.cn', catalog, iframe_a=iframe_as.link)
    lll = sorted(links.items(), key=lambda t: t[0] + t[1].catalog)
    result_list = []
    content_list = []
    url_num = 1
    content_num = 1
    for k, v in lll:
        result_list.append((url_num, v.catalog, v.netloc, v.title, k))
        url_num += 1
        if v.netloc in catalog['accepted']:
            res = fetcher.detail_page(k, extract_rule)
            if len(res) == 7:
                content_list.append((content_num, res["author"], res["editor"],
                                     res["title"], k, res["crumbs"], res["date"],
                                     res['source'], res['content']))
                content_num += 1
            else:
                print(res)
    self.list_result = result_list
    self.content_result = content_list
    return result_list, content_list