def make_vcr():
    """Build the VCR used by the tests.

    Cassettes are stored in ``fixtures/cassettes`` next to this module.
    The API key and OAuth query parameters listed below are passed as
    ``filter_query_parameters``, and the recorder runs in
    ``new_episodes`` mode.
    """
    module_dir = os.path.dirname(__file__)
    library_dir = os.path.join(module_dir, 'fixtures', 'cassettes')
    # Query parameters handed to VCR for filtering out of recordings.
    filtered_query_params = [
        'key',
        'oauth_consumer_key',
        'oauth_nonce',
        'oauth_signature_method',
        'oauth_timestamp',
        'oauth_token',
        'oauth_version',
        'oauth_signature',
    ]
    return VCR(
        cassette_library_dir=library_dir,
        filter_query_parameters=filtered_query_params,
        record_mode='new_episodes',
    )
# Example source code for the Python class VCR
def vcr(self):
    """
    Returns a new vcrpy instance.

    Cassettes are kept in a ``cassettes`` directory beside the file that
    defines the test class. The record mode is taken from
    ``self.vcrpy_record_mode`` when that attribute exists and defaults to
    ``'once'``. Entries in ``self.vcr_options`` override the defaults
    assembled here.
    """
    class_file = getfile(self.__class__)
    options = dict(
        record_mode=getattr(self, 'vcrpy_record_mode', 'once'),
        cassette_library_dir=join(dirname(class_file), 'cassettes'),
        match_on=['method', 'scheme', 'host', 'port', 'path', 'query'],
        filter_query_parameters=FILTER_QUERY_PARAMS,
        filter_post_data_parameters=FILTER_QUERY_PARAMS,
        before_record_response=IGittTestCase.remove_link_headers,
        filter_headers=['Link'],
    )
    # Per-test overrides win over the defaults above.
    options.update(self.vcr_options)
    return VCR(**options)
def _post_recording_scrub(self):
    """ Perform post-recording cleanup on the YAML file that can't be accomplished with the
    VCR recording hooks.

    The cassette at ``self.cassette_path`` is rewritten in place:

    * occurrences of ``self.resource_group`` are replaced with
      ``self.resource_group_original`` when both are set and differ;
    * every line containing ``authorization:`` (case-insensitive) is dropped.
    """
    src_path = self.cassette_path
    rg_name = getattr(self, 'resource_group', None)
    rg_original = getattr(self, 'resource_group_original', None)
    # str.replace() raises TypeError on None, and an empty search string would insert the
    # replacement between every character, so both names must be non-empty.
    scrub_rg = bool(rg_name and rg_original and rg_name != rg_original)
    # The context manager closes the temporary buffer even if reading or rewriting
    # the cassette fails.
    with tempfile.NamedTemporaryFile('r+') as t:
        with open(src_path, 'r') as f:
            for line in f:
                # scrub resource group names
                if scrub_rg:
                    line = line.replace(rg_name, rg_original)
                # omit bearer tokens
                if 'authorization:' not in line.lower():
                    t.write(line)
        t.seek(0)
        with open(src_path, 'w') as f:
            for line in t:
                f.write(line)
# COMMAND METHODS
def _post_recording_scrub(self):
    """ Perform post-recording cleanup on the YAML file that can't be accomplished with the
    VCR recording hooks.

    The cassette at ``self.cassette_path`` is rewritten in place:

    * occurrences of ``self.resource_group`` are replaced with
      ``self.resource_group_original`` when both are set and differ;
    * every line containing ``authorization:`` (case-insensitive) is dropped.
    """
    src_path = self.cassette_path
    rg_name = getattr(self, 'resource_group', None)
    rg_original = getattr(self, 'resource_group_original', None)
    # Either attribute may be missing (None); str.replace() raises TypeError on None, and
    # an empty search string would insert the replacement between every character.
    scrub_rg = bool(rg_name and rg_original and rg_name != rg_original)
    # The context manager closes the temporary buffer even if reading or rewriting
    # the cassette fails.
    with tempfile.NamedTemporaryFile('r+') as t:
        with open(src_path, 'r') as f:
            for line in f:
                # scrub resource group names
                if scrub_rg:
                    line = line.replace(rg_name, rg_original)
                # omit bearer tokens
                if 'authorization:' not in line.lower():
                    t.write(line)
        t.seek(0)
        with open(src_path, 'w') as f:
            for line in t:
                f.write(line)
# COMMAND METHODS
def __init__(self,  # pylint: disable=too-many-arguments
             method_name, config_file=None, recording_dir=None, recording_name=None, recording_processors=None,
             replay_processors=None, recording_patches=None, replay_patches=None):
    """Prepare the recorder and recording bookkeeping for one test method.

    :param method_name: name of the test method; also the default recording file stem.
    :param config_file: forwarded to ``TestConfig``.
    :param recording_dir: directory holding recordings; defaults to a ``recordings``
        directory beside the file that defines the test class.
    :param recording_name: recording file stem; falls back to ``method_name``.
    :param recording_processors: stored on the instance; a falsy value becomes ``[]``.
    :param replay_processors: stored on the instance; a falsy value becomes ``[]``.
    :param recording_patches: stored on the instance; a falsy value becomes ``[]``.
    :param replay_patches: stored on the instance; a falsy value becomes ``[]``.
    """
    super(ReplayableTest, self).__init__(method_name)

    self.recording_processors = recording_processors or []
    self.replay_processors = replay_processors or []
    self.recording_patches = recording_patches or []
    self.replay_patches = replay_patches or []

    self.config = TestConfig(config_file=config_file)
    self.disable_recording = False

    class_file = inspect.getfile(self.__class__)
    if not recording_dir:
        recording_dir = os.path.join(os.path.dirname(class_file), 'recordings')

    self.is_live = self.config.record_mode
    self.vcr = vcr.VCR(
        cassette_library_dir=recording_dir,
        before_record_request=self._process_request_recording,
        before_record_response=self._process_response_recording,
        decode_compressed_response=True,
        # Live runs re-record everything; otherwise record only when no cassette exists.
        record_mode='all' if self.is_live else 'once',
        filter_headers=self.FILTER_HEADERS,
    )
    self.vcr.register_matcher('query', self._custom_request_query_matcher)

    recording_basename = '{}.yaml'.format(recording_name or method_name)
    self.recording_file = os.path.join(recording_dir, recording_basename)
    # A live run starts from a clean slate: drop any previous recording.
    if self.is_live:
        if os.path.exists(self.recording_file):
            os.remove(self.recording_file)

    self.in_recording = self.is_live or not os.path.exists(self.recording_file)
    self.test_resources_count = 0
    self.original_env = os.environ.copy()
def pytest_configure(config):
    """Register the ``online`` marker by adding a line to the ``markers`` ini value."""
    marker_line = 'online: mark a test that goes online. VCR will automatically be used.'
    config.addinivalue_line('markers', marker_line)
def _get_vcr(self):
    """Return a VCR configured for this test class.

    Cassettes are kept in a ``cassettes`` directory beside the file that
    defines the test class, and the record mode comes from
    ``self.vcrpy_record_mode``. Requests are matched on method, scheme,
    host, port and path; the query string is not part of the match.
    """
    class_file = inspect.getfile(self.__class__)
    library_dir = os.path.join(os.path.dirname(class_file), 'cassettes')
    options = {
        'record_mode': self.vcrpy_record_mode,
        'cassette_library_dir': library_dir,
        'match_on': ['method', 'scheme', 'host', 'port', 'path'],
        'filter_query_parameters': FILTER_QUERY_PARAMS,
        'filter_post_data_parameters': FILTER_QUERY_PARAMS,
        'before_record_response': GitmateTestCase.remove_link_headers,
    }
    return vcr.VCR(**options)
def get_vcr(filepath):
    """Create a VCR whose cassette directory is derived from *filepath*.

    The cassette library directory is *filepath* with its extension
    stripped. Cassette names are passed through
    ``vcr.VCR.ensure_suffix('.yaml')``, and requests are matched on
    method, scheme, host, port, path, query and body.
    """
    library_dir = os.path.splitext(filepath)[0]
    match_fields = ("method", "scheme", "host", "port", "path", "query", "body")
    return vcr.VCR(
        path_transformer=vcr.VCR.ensure_suffix('.yaml'),
        match_on=match_fields,
        cassette_library_dir=library_dir,
    )
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False,
             skip_setup=False, skip_teardown=False):
    """Initialise the test case and its VCR recorder.

    :param test_file: path used to locate the recording directory via ``find_recording_dir``.
    :param test_name: test method name; the cassette is ``<test_name>.yaml``.
    :param run_live: requested live mode; forced to ``True`` when the ``ENV_LIVE_TEST``
        environment variable equals ``'True'``.
    :param debug: stored as ``self._debug``.
    :param debug_vcr: when truthy, configure logging and set the ``vcr`` logger to INFO.
    :param skip_setup: stored as ``self.skip_setup``.
    :param skip_teardown: stored as ``self.skip_teardown``.
    """
    super(VCRTestBase, self).__init__(test_name)
    self.test_name = test_name

    self.recording_dir = find_recording_dir(test_file)
    cassette_name = '{}.yaml'.format(test_name)
    self.cassette_path = os.path.join(self.recording_dir, cassette_name)
    # Playback is possible only when a cassette already exists on disk.
    self.playback = os.path.isfile(self.cassette_path)

    env_forces_live = os.environ.get(ENV_LIVE_TEST, None) == 'True'
    self.run_live = True if env_forces_live else run_live

    self.skip_setup = skip_setup
    self.skip_teardown = skip_teardown
    self.success = False
    self.exception = None
    self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None)
    self._debug = debug

    # NOTE(review): this checks the run_live argument, not self.run_live, so the
    # environment override above is not taken into account here - confirm intended.
    no_recording_available = not self.playback and not run_live
    if no_recording_available and '--buffer' in sys.argv:
        self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name))

    if debug_vcr:
        logging.basicConfig()
        logging.getLogger('vcr').setLevel(logging.INFO)

    self.my_vcr = vcr.VCR(
        cassette_library_dir=self.recording_dir,
        before_record_request=self._before_record_request,
        before_record_response=self._before_record_response,
        decode_compressed_response=True,
    )
    # Requests are matched exclusively through the custom matcher.
    self.my_vcr.register_matcher('custom', _custom_request_matcher)
    self.my_vcr.match_on = ['custom']
def __init__(self, cli, method_name, filter_headers=None):
    """Set up processors, the VCR recorder and recording bookkeeping.

    :param cli: forwarded to the parent initialiser.
    :param method_name: test method name; the recording is ``<method_name>.yaml``.
    :param filter_headers: passed to VCR as ``filter_headers``; a falsy value becomes ``[]``.
    """
    super(ScenarioTest, self).__init__(cli, method_name)

    self.name_replacer = GeneralNameReplacer()
    self.recording_processors = [
        LargeRequestBodyProcessor(),
        LargeResponseBodyProcessor(),
        self.name_replacer,
    ]
    self.replay_processors = [LargeResponseBodyReplacer()]
    self.filter_headers = filter_headers or []

    class_file = inspect.getfile(self.__class__)
    recordings_dir = find_recording_dir(class_file)
    live_test = os.environ.get(ENV_LIVE_TEST, None) == 'True'

    self.vcr = vcr.VCR(
        cassette_library_dir=recordings_dir,
        before_record_request=self._process_request_recording,
        before_record_response=self._process_response_recording,
        decode_compressed_response=True,
        # Live runs re-record everything; otherwise record only when no cassette exists.
        record_mode='all' if live_test else 'once',
        filter_headers=self.filter_headers,
    )
    self.vcr.register_matcher('query', self._custom_request_query_matcher)

    recording_basename = '{}.yaml'.format(method_name)
    self.recording_file = os.path.join(recordings_dir, recording_basename)
    # A live run starts from a clean slate: drop any previous recording.
    if live_test:
        if os.path.exists(self.recording_file):
            os.remove(self.recording_file)

    self.in_recording = live_test or not os.path.exists(self.recording_file)
    self.test_resources_count = 0
    self.original_env = os.environ.copy()
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False,
             skip_setup=False, skip_teardown=False):
    """Initialise the test case and its VCR recorder.

    :param test_file: path of the test file; recordings live in a ``recordings``
        directory beside it.
    :param test_name: test method name; the cassette is ``<test_name>.yaml``.
    :param run_live: requested live mode; forced to ``True`` when the
        ``LIVE_TEST_CONTROL_ENV`` environment variable equals ``'True'``.
    :param debug: stored as ``self._debug``.
    :param debug_vcr: when truthy, configure logging and set the ``vcr`` logger to INFO.
    :param skip_setup: stored as ``self.skip_setup``.
    :param skip_teardown: stored as ``self.skip_teardown``.
    """
    super(VCRTestBase, self).__init__(test_name)
    self.test_name = test_name

    test_dir = os.path.dirname(test_file)
    self.recording_dir = os.path.join(test_dir, 'recordings')
    cassette_name = '{}.yaml'.format(test_name)
    self.cassette_path = os.path.join(self.recording_dir, cassette_name)
    # Playback is possible only when a cassette already exists on disk.
    self.playback = os.path.isfile(self.cassette_path)

    env_forces_live = os.environ.get(LIVE_TEST_CONTROL_ENV, None) == 'True'
    self.run_live = True if env_forces_live else run_live

    self.skip_setup = skip_setup
    self.skip_teardown = skip_teardown
    self.success = False
    self.exception = None
    self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None)
    self._debug = debug

    # NOTE(review): this checks the run_live argument, not self.run_live, so the
    # environment override above is not taken into account here - confirm intended.
    no_recording_available = not self.playback and not run_live
    if no_recording_available and '--buffer' in sys.argv:
        self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name))

    if debug_vcr:
        import logging
        logging.basicConfig()
        logging.getLogger('vcr').setLevel(logging.INFO)

    self.my_vcr = vcr.VCR(
        cassette_library_dir=self.recording_dir,
        before_record_request=self._before_record_request,
        before_record_response=self._before_record_response,
        decode_compressed_response=True,
    )
    # Requests are matched exclusively through the custom matcher.
    self.my_vcr.register_matcher('custom', _custom_request_matcher)
    self.my_vcr.match_on = ['custom']
def record_requests(request):
    """vcr interceptor that mirrors each interaction into a YAML fixture file.

    A dict argument is treated as a response and attached to the most recent
    entry in ``REQUESTS``; anything else is treated as a request, which bumps
    ``REQUESTID`` and appends a new entry. After either kind of call the
    latest entry is dumped to ``/tmp/fixtures/<REQUESTID, zero-padded to 4>.yml``,
    so the file written for a request is rewritten once its response arrives.

    :param request: the request object or response dict being intercepted.
    :return: the argument, unchanged.
    """
    global REQUESTS
    global REQUESTID

    if isinstance(request, dict):
        # Response: complete the entry created for the preceding request.
        # The copy is shallow; only the copy's 'body' key is replaced, so the
        # caller's dict is not modified.
        entry = REQUESTS[-1]
        entry['response'] = request.copy()
        entry['response']['body'] = literal(pretty_xml(request['body']['string']))
    else:
        REQUESTID += 1
        if request.headers:
            headers = dict(request.headers.items())
        else:
            # keep the empty/None value exactly as the request carried it
            headers = request.headers
        entry = {
            'request': {
                'method': request.method,
                'uri': request.uri,
                'headers': headers,
                'body': literal(pretty_xml(request.body)),
            }
        }
        REQUESTS.append(entry)

    fid = str(REQUESTID).rjust(4, '0')
    fname = os.path.join('/tmp', 'fixtures', '%s.yml' % (fid))
    # Text mode: without an explicit encoding yaml.dump emits str on Python 3,
    # which a file opened in binary mode rejects.
    with open(fname, 'w') as f:
        yaml.dump(entry, f, width=1000, indent=2)
    return request
# Make a custom VCR instance so we can inject our spyware into it
def _run(self, args, working_dir, output_filename, request_json_builder=None, live_request_json_builder=None):
    '''
    Runs the test given the command line arguments, and the output filename.
    :type args: list(str) representing the components of the command line argument.
    :type working_dir: str used as the working directory of the launched command.
    :type output_filename: str represents the filename of the file to write the command line output to.
    :param request_json_builder: passed through to the test service.
    :param live_request_json_builder: when given, it is extended with the body of every request
        stored in the cassette after the command has run.
    '''
    self.logger.info("%s" % (' '.join(args)))

    self._reset_output_file(output_filename)

    def match_um_requests_on(r1, r2):
        '''
        Custom uri matcher for use with vcrpy. Basically it provides custom matching for user and action UM
        requests, which ignores the org ID portion of the request. Otherwise, the request uri must match exactly.
        :param r1: first request to compare.
        :param r2: second request to compare.
        :return: True when both uris match the same configured pattern, or are identical.
        '''
        for pattern_key in ('users_request_matcher', 'actions_request_matcher'):
            pattern = self.test_suite_config[pattern_key]
            # Both requests must match the same pattern.
            if re.match(pattern, r1.uri) and re.match(pattern, r2.uri):
                return True
        return r1.uri == r2.uri

    live_mode = self.server_config['live_mode']
    record_mode = 'all' if live_mode else 'none'
    mock_spec = 'proxy' if live_mode else 'playback'
    recorder = vcr.VCR(
        record_mode=record_mode,
        match_on=['um_request'],
        decode_compressed_response=True,
        filter_headers=['authorization']
    )
    recorder.register_matcher('um_request', match_um_requests_on)

    with recorder.use_cassette(self.server_config['cassette_filename']) as cassette:
        service = server.TestService(self.server_config, cassette, request_json_builder)
        service.run()
        try:
            with open(output_filename, 'w') as output_file:
                # stdout and stderr both go to the output file
                subprocess.call(args,
                                cwd=working_dir,
                                env=dict(os.environ, UMAPI_MOCK=mock_spec),
                                shell=IS_NT_PLATFORM,
                                stdin=None,
                                stdout=output_file,
                                stderr=output_file)
        finally:
            # Stop the service even if launching the command fails.
            service.stop()

        if live_request_json_builder:
            for stored_request, stored_response in cassette.data:
                if stored_request.body:
                    live_request_json_builder.extend_with_json_string(stored_request.body)
def use_vcr(request, monkeypatch):
    """
    This fixture is applied automatically to any test using the `online` mark. It will record and playback network
    sessions using VCR.
    The record mode of VCR can be set using the VCR_RECORD_MODE environment variable when running tests.

    Yields ``None`` when recording is switched off, otherwise the active cassette. The cassette path is built
    from VCR_CASSETTE_DIR, the module path after ``tests.``, the class name (if any) and the function name.
    """
    if VCR_RECORD_MODE == 'off':
        yield None
        return

    path_segments = [VCR_CASSETTE_DIR]
    if request.module is not None:
        module_tail = request.module.__name__.split('tests.')[-1]
        path_segments.extend(module_tail.split('.'))
    if request.cls is not None:
        path_segments.append(request.cls.__name__)

    # Take into account the parametrization set by pytest
    # to create many tests from a single function with many parameters.
    request_keywords = request.keywords.keys()
    param_name = ''
    if 'parametrize' in request_keywords:
        param_name = next((kw for kw in request_keywords if kw.startswith('_with_')), '')

    path_segments.append(request.function.__name__ + param_name + '.yaml')
    cassette_path = os.path.join(*path_segments)

    filter_query = [('applicationId', 'XXXXXX')]
    filter_headers = [('Authorization', 'ESA XXXXXX')]
    filter_post = []

    # Work out whether this run will touch the network.
    if vcr.record_mode == 'none':
        online = False
    elif vcr.record_mode == 'once':
        online = not os.path.exists(cassette_path)
    else:
        online = True

    # If we are going online, check the credentials
    if online and os.environ.get("RAKUTEN_APP_ID", None) is None:
        pytest.skip('need credentials to run this test')

    with vcr.use_cassette(path=cassette_path,
                          filter_query_parameters=filter_query,
                          filter_headers=filter_headers,
                          filter_post_data_parameters=filter_post,
                          decode_compressed_response=True) as cassette:
        yield cassette