def main(argv):
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.cache_name:
        parser.print_help()
        sys.exit(-1)

    global cache_file_name
    cache_file_name = opts.cache_name
    requests_cache.install_cache(cache_file_name, allowable_methods=('GET', 'POST'))

    global cb
    cb = CbEnterpriseResponseAPI()

    large_process_search()
    large_binary_search()
    sensor_search()
    watchlist_search()
    feed_search()
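Since parse_args() returns an (opts, args) pair, build_cli_parser() is presumably optparse-based. A minimal hypothetical sketch (not the original project's parser), showing only the cache option that main() relies on:

import optparse

def build_cli_parser():
    # Hypothetical parser; only the cache_name option is implied by main() above.
    parser = optparse.OptionParser(usage="%prog [options]")
    parser.add_option('-c', '--cache', action='store', dest='cache_name',
                      help='Name of the requests_cache sqlite file to read and write')
    return parser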
def init_requests_cache(refresh_cache=False):
    """
    Initializes a cache which the ``requests`` library will consult for
    responses, before making network requests.

    :param refresh_cache: Whether the cache should be cleared out
    """
    # Cache data from external sources; used in some checks
    dirs = AppDirs("stix2-validator", "OASIS")
    # Create cache dir if it doesn't exist
    try:
        os.makedirs(dirs.user_cache_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    requests_cache.install_cache(
        cache_name=os.path.join(dirs.user_cache_dir, 'py{}cache'.format(
            sys.version_info[0])),
        expire_after=datetime.timedelta(weeks=1))

    if refresh_cache:
        clear_requests_cache()
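A minimal usage sketch of the helper above; the URL is illustrative only:

import requests

init_requests_cache()                                    # install the week-long on-disk cache
first = requests.get('https://example.com/data.json')    # network hit, stored in the cache
second = requests.get('https://example.com/data.json')   # served from the sqlite cache
assert getattr(second, 'from_cache', False)              # requests_cache flags cached responses

init_requests_cache(refresh_cache=True)                  # clear the cache when stale data is suspected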
def enable_cache(expire_after=691200):
    if 'requests_cache' not in modules:
        return False
    requests_cache.install_cache('loggingnight_cache', backend='sqlite', expire_after=expire_after)
    return True
def generate_csl_items(args, citation_df):
    """
    Generate CSL (citeproc) items for standard_citations in citation_df.
    Writes references.json to disk and logs warnings for potential problems.
    """
    # Read manual references (overrides) in JSON CSL
    manual_refs = read_manual_references(args.manual_references_path)

    requests_cache.install_cache(args.requests_cache_path, include_get_headers=True)
    cache = requests_cache.get_cache()
    if args.clear_requests_cache:
        logging.info('Clearing requests-cache')
        requests_cache.clear()
    logging.info(f'requests-cache starting with {len(cache.responses)} cached responses')

    csl_items = list()
    failures = list()
    for citation in citation_df.standard_citation.unique():
        if citation in manual_refs:
            csl_items.append(manual_refs[citation])
            continue
        try:
            citeproc = citation_to_citeproc(citation)
            csl_items.append(citeproc)
        except Exception as error:
            logging.exception(f'Citeproc retrieval failure for {citation}')
            failures.append(citation)

    logging.info(f'requests-cache finished with {len(cache.responses)} cached responses')
    requests_cache.uninstall_cache()

    if failures:
        message = 'Citeproc retrieval failed for:\n{}'.format(
            '\n'.join(failures))
        logging.error(message)

    # Write JSON CSL bibliography for Pandoc.
    with args.references_path.open('w') as write_file:
        json.dump(csl_items, write_file, indent=2, ensure_ascii=False)
        write_file.write('\n')
    return csl_items
def main():
    if os.path.isfile(BASEDIR):
        sys.exit('Please remove your old configuration file at {}'.format(BASEDIR))
    os.makedirs(BASEDIR, exist_ok=True)

    global CONFIG
    CONFIG = read_configuration(CONFFILE)
    locale.setlocale(locale.LC_MONETARY, CONFIG['locale'].get('monetary', ''))

    requests_cache.install_cache(cache_name='api_cache', backend='memory',
                                 expire_after=int(CONFIG['api'].get('cache', 10)))

    curses.wrapper(mainc)
def _enable_cache():
    if not os.path.exists(CACHE_DIR):
        os.makedirs(CACHE_DIR)
    requests_cache.install_cache(CACHE_FILE)
def __init__(self, cache=False,
             cache_filename="requests.cache"):
    self._cache = cache
    if cache:
        requests_cache.install_cache(cache_filename)
    self._transform_json = True
def use_requests_cache():
    import requests_cache
    requests_cache.install_cache('test_cache')
def pytest_runtest_setup(item):
    # called for running each test in 'a' directory
    import requests_cache
    requests_cache.install_cache('test_cache')
def pytest_configure(config):
    if config.getoption('--use-cache'):
        import requests_cache
        requests_cache.install_cache('test_cache')
    api = Api()
    pytest.game_ids = api.GetSeasonGameIDs('2009-10', 'Regular Season')[:2]  # Hack to carry the gameids to tests
    pytest.game_ids = ['0020900292']
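For config.getoption('--use-cache') above to work, the flag must also be registered in conftest.py; a minimal sketch, assuming a simple boolean switch:

# conftest.py -- hypothetical companion hook registering the flag read by pytest_configure()
def pytest_addoption(parser):
    parser.addoption('--use-cache', action='store_true', default=False,
                     help='Cache HTTP responses with requests_cache during the test run')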
def fetch(outfile):
    """The main function for downloading all scripts from github."""
    if not os.path.exists(REQUESTS_CACHE):
        os.makedirs(REQUESTS_CACHE)
    requests_cache.install_cache(REQUESTS_CACHE)

    result = []
    label_counts = defaultdict(int)
    print('Fetching scripts')
    for label, url in DATA_URLS.items():
        print(url)
        scripts = fetch_scripts(url)
        for script in scripts:
            try:
                result.append({
                    'tree': build_tree(script), 'metadata': {'label': label}
                })
                label_counts[label] += 1
            except Exception as err:
                print(err)

    print('Label counts: ', label_counts)
    print('Dumping scripts')
    with open(outfile, 'wb') as file_handler:
        pickle.dump(result, file_handler)
def setup_cache():
    global SESSION
    if get_setting_value('USE_CACHE'):
        import requests_cache
        requests_cache.install_cache(cache_name='gdc_cache', backend='sqlite', expire_after=18000)
        # import cachecontrol
        # from cachecontrol.caches import FileCache
        # SESSION = cachecontrol.CacheControl(requests.Session(), cache=FileCache('.web_cache', forever=True))
    # else:
    #     SESSION = requests.Session()
def set_caching(self, cache_db='mygene_cache', verbose=True, **kwargs):
    ''' Installs a local cache for all requests.
        **cache_db** is the path to the local sqlite cache database.'''
    if caching_avail:
        requests_cache.install_cache(cache_name=cache_db, allowable_methods=('GET', 'POST'), **kwargs)
        self._cached = True
        if verbose:
            print('[ Future queries will be cached in "{0}" ]'.format(os.path.abspath(cache_db + '.sqlite')))
    else:
        print("Error: The requests_cache python module is required to use request caching.")
        print("See - https://requests-cache.readthedocs.io/en/latest/user_guide.html#installation")
    return
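A short usage sketch for this method; it appears to belong to a mygene-style client, but treat the client and query calls as assumptions:

# Hypothetical usage, assuming the method lives on mygene's MyGeneInfo client.
import mygene

mg = mygene.MyGeneInfo()
mg.set_caching('mygene_cache', expire_after=3600)  # extra kwargs are passed straight to install_cache()
first = mg.getgene('1017')    # network request, written to mygene_cache.sqlite
second = mg.getgene('1017')   # served from the local sqlite cache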
def __init__(self, key, version, cache_name=None, backend=None, **backend_options):
    self.__key = key
    self.__version = version
    self.__endpoint = 'https://opendata.resas-portal.go.jp'
    if cache_name is not None:
        requests_cache.install_cache(cache_name,
                                     backend,
                                     **backend_options)
def __init__(self):
    with open(os.path.join(os.path.dirname(__file__), 'cafeteria.json')) as f:
        self.cafeterias = json.load(f)
    self.url = 'https://sio.no/mat-og-drikke/_window/mat+og+drikke+-+dagens+middag?s={}'
    requests_cache.install_cache('sio', expire_after=360)
def get_from_api(url, params=None, encoding=None, cache=False, cachename='dafault',
                 cache_experation=60):
    """
    Common method to get information from a REST API that doesn't use authentication.

    :param url: URL for the API
    :param params: the parameters for the request
    :param encoding: overrides the detected encoding
    :param cache: whether to use a cache (default: False)
    :param cachename: name of the cache
    :param cache_experation: cache expiration time in seconds (default: 60)
    :return: the decoded JSON response
    """
    # Install the cache before issuing the request so the response can actually be served from it.
    if cache:
        requests_cache.install_cache(cachename, expire_after=cache_experation)
    response = requests.get(url, params=params)
    if response.encoding is None:
        if encoding is None:
            response.encoding = chardet.detect(response.raw.data)['encoding']
        else:
            response.encoding = encoding
    if response.status_code != 200:
        raise Exception('%s:%s' % (response.status_code, response.text))
    try:
        return json.loads(response.text)
    except Exception as e:
        raise Exception('Can\'t parse the json string\n %s' % url)
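A brief usage sketch; the endpoint below is a placeholder, not one used by the original project:

# Hypothetical call against a stand-in endpoint.
data = get_from_api('https://httpbin.org/json', cache=True,
                    cachename='api_cache', cache_experation=300)
print(data)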
def run(self, cache=True):
    """Run application."""
    self._query()

    # configure `requests` cache
    if cache:
        cache_dir = appdirs.user_cache_dir('craigslist')
        os.makedirs(cache_dir, exist_ok=True)
        requests_cache.install_cache(
            cache_name=os.path.join(cache_dir, 'craigslist'),
            expire_after=timedelta(hours=0.5))

    print('Running query...\n')

    # record the start time
    start = time.time()
    self.prices = self._getprices()
    # determine elapsed time of queries
    self.duration = time.time() - start

    # remove expired cache entries
    if cache:
        requests_cache.core.remove_expired_responses()

    # print statistics (if any price data exists)
    if self.prices:
        self._print()
    else:
        print('Nothing found for that search.')
def __init__(self):
    # Configure logging
    logging.getLogger("requests").setLevel(logging.WARNING)
    self.logger = logging.getLogger('tenma')

    # Setup requests caching
    requests_cache.install_cache('./media/CACHE/comicvine-cache', expire_after=1800)
    requests_cache.core.remove_expired_responses()

    # Set basic reusable strings
    self.api_key = Settings.get_solo().api_key
    self.directory_path = 'files'

    # API Strings
    self.baseurl = 'https://comicvine.gamespot.com/api/'
    self.imageurl = 'https://comicvine.gamespot.com/api/image/'
    self.base_params = { 'format': 'json', 'api_key': self.api_key }
    self.headers = { 'user-agent': 'tenma' }

    # API field strings
    self.arc_fields = 'deck,description,id,image,name,site_detail_url'
    self.character_fields = 'deck,description,id,image,name,site_detail_url'
    self.creator_fields = 'deck,description,id,image,name,site_detail_url'
    self.issue_fields = 'api_detail_url,character_credits,cover_date,deck,description,id,image,issue_number,name,person_credits,site_detail_url,story_arc_credits,team_credits,volume'
    self.publisher_fields = 'deck,description,id,image,name,site_detail_url'
    self.query_issue_fields = 'cover_date,id,issue_number,name,volume'
    self.query_issue_limit = '100'
    self.series_fields = 'api_detail_url,deck,description,id,name,publisher,site_detail_url,start_year'
    self.team_fields = 'characters,deck,description,id,image,name,site_detail_url'

    # International reprint publishers
    # Ordered by # of issues (est.) for quick matching.
    self.int_pubs = [
        2350,  # Panini (21.5k)
        2812,  # Marvel UK (4.2k)
        2094,  # Abril (2.1k)
        2319,  # Planeta DeAgostini (2.1k)
        2903,  # Ediciones Zinco (0.7k)
        1133,  # Semic As (0.3k)
        2961,  # Marvel Italia (0.04k)
    ]
#==================================================================================================
def main():
    parser = argparse.ArgumentParser(description='Generates CWL files from the GATK documentation')
    parser.add_argument("--version", "-v", dest='gatkversion', default="3.5",
                        help="Sets the version of GATK to parse documentation for. Default is 3.5")
    parser.add_argument('--out', "-o", dest='outputdir',
                        help="Sets the output directory for generated files. Default is ./gatk_cmdline_tools/<VERSION>/")
    parser.add_argument('--include', dest='include_file',
                        help="Only generate this file (note, CommandLinkGATK has to be generated for v3.x)")
    parser.add_argument("--dev", dest="dev", action="store_true",
                        help="Enable network caching and overwriting of the generated files (for development purposes). " +
                             "Requires requests_cache to be installed")
    parser.add_argument("--docker_container_name", "-c", dest="docker_container_name",
                        help="Docker container name for generated cwl files. Default is 'broadinstitute/gatk3:<VERSION>' " +
                             "for version 3.x and 'broadinstitute/gatk:<VERSION>' for 4.x")
    parser.add_argument("--gatk_location", "-l", dest="gatk_location",
                        help="Location of the gatk jar file. Default is '/usr/GenomeAnalysisTK.jar' for gatk 3.x and '/gatk/gatk.jar' for gatk 4.x")
    cmd_line_options = parser.parse_args()

    if cmd_line_options.dev:
        import requests_cache
        requests_cache.install_cache()  # Decreases the time to run dramatically

    if not cmd_line_options.outputdir:
        cmd_line_options.outputdir = os.getcwd() + '/gatk_cmdline_tools/' + cmd_line_options.gatkversion

    if not cmd_line_options.docker_container_name:
        if is_version_3(cmd_line_options.gatkversion):
            cmd_line_options.docker_container_name = "broadinstitute/gatk3:" + cmd_line_options.gatkversion
        else:
            cmd_line_options.docker_container_name = "broadinstitute/gatk:" + cmd_line_options.gatkversion

    if not cmd_line_options.gatk_location:
        if is_version_3(cmd_line_options.gatkversion):
            cmd_line_options.gatk_location = "/usr/GenomeAnalysisTK.jar"
        else:
            cmd_line_options.gatk_location = "/gatk/gatk.jar"

    print("Your chosen directory is: %s" % cmd_line_options.outputdir)
    grouped_urls = get_json_links(cmd_line_options.gatkversion)
    generate_cwl_and_json_files(cmd_line_options.outputdir, grouped_urls, cmd_line_options)
def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp',
                 proxies=None, delay=3, max_depth=4, num_retries=2, expires=timedelta(days=30)):
    """ Crawl from the given start URL following links matched by link_regex. In the current
        implementation, we do not actually scrape any information.

        args:
            start_url (str): web site to start crawl
            link_regex (str): regex to match for links
        kwargs:
            robots_url (str): url of the site's robots.txt (default: start_url + /robots.txt)
            user_agent (str): user agent (default: wswp)
            proxies (list of dicts): a list of possible dicts for http / https proxies
                For formatting, see the requests library
            delay (int): seconds to throttle between requests to one domain (default: 3)
            max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
            num_retries (int): # of retries when 5xx error (default: 2)
            expires (timedelta): timedelta for cache expirations (default: 30 days)
    """
    crawl_queue = [start_url]
    # keep track of which URLs have been seen before
    seen = {}
    requests_cache.install_cache(backend='redis', expire_after=expires)
    if not robots_url:
        robots_url = '{}/robots.txt'.format(start_url)
    rp = get_robots_parser(robots_url)
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies)
    while crawl_queue:
        url = crawl_queue.pop()
        # check url passes robots.txt restrictions
        if rp.can_fetch(user_agent, url):
            depth = seen.get(url, 0)
            if depth == max_depth:
                print('Skipping %s due to depth' % url)
                continue
            html = D(url, num_retries=num_retries)
            if not html:
                continue
            # TODO: add actual data scraping here
            # filter for links matching our regular expression
            for link in get_links(html):
                if re.match(link_regex, link):
                    abs_link = urljoin(start_url, link)
                    if abs_link not in seen:
                        seen[abs_link] = depth + 1
                        crawl_queue.append(abs_link)
        else:
            print('Blocked by robots.txt:', url)
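A brief usage sketch; the start URL and link pattern below are placeholders, and the redis backend chosen above requires a reachable redis server:

# Hypothetical invocation of the crawler above.
link_crawler('http://example.webscraping.com', r'/(index|view)/',
             user_agent='wswp', max_depth=2, expires=timedelta(days=1))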
def scrape(folder=None):
    """
    Returns data in the format:
    {
      node_id: {
        channels: [channel_name, ...],
        version: string,
        registries: {
          histogram: [path, ...]
          event: [path, ...]
          scalar: [path, ...]
        }
      },
      ...
    }
    """
    if folder is None:
        folder = tempfile.mkdtemp()
    error_cache = load_error_cache(folder)
    requests_cache.install_cache(os.path.join(folder, 'probe_scraper_cache'))
    results = defaultdict(dict)

    for channel in CHANNELS.iterkeys():
        tags = load_tags(channel)
        versions = extract_tag_data(tags, channel)
        save_error_cache(folder, error_cache)

        print "\n" + channel + " - extracted version data:"
        for v in versions:
            print "  " + str(v)

        print "\n" + channel + " - loading files:"
        for v in versions:
            print "  from: " + str(v)
            files = download_files(channel, v['node'], folder, error_cache)
            results[channel][v['node']] = {
                'channel': channel,
                'version': v['version'],
                'registries': files,
            }
            save_error_cache(folder, error_cache)

    return results