def test_submit_media_exceptions(self, response):
    """
    Verify that a failing file submission raises ThreePlayMediaPerformTranscriptionError.
    """
    # Stub the S3 media HEAD check so URL validation succeeds.
    responses.add(
        responses.HEAD,
        u'https://s3.amazonaws.com/bkt/video.mp4',
        headers={'Content-Type': u'video/mp4'},
        status=200,
    )
    # Stub the 3PlayMedia languages endpoint with a single English entry.
    responses.add(
        responses.GET,
        u'https://api.3playmedia.com/caption_imports/available_languages',
        status=200,
        body=json.dumps([{
            "iso_639_1_code": "en",
            "language_id": 1,
        }]),
    )
    # The parametrized `response` drives the failing file-submission call.
    responses.add(responses.POST, u'https://api.3playmedia.com/files', **response)

    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    with self.assertRaises(ThreePlayMediaPerformTranscriptionError):
        client.submit_media()
# Example usages of responses.HEAD in tests (collected from several projects).
def test_generate_transcripts_exceptions(self, first_response, second_response, third_response, mock_log):
    """
    Verify that failures during transcript generation are logged as exceptions
    and leave no transcript-process metadata behind.
    """
    # Register the three mocked HTTP interactions the client performs, in order.
    mocked_endpoints = (
        (responses.HEAD, u'https://s3.amazonaws.com/bkt/video.mp4', first_response),
        (responses.GET, u'https://api.3playmedia.com/caption_imports/available_languages', second_response),
        (responses.POST, u'https://api.3playmedia.com/files', third_response),
    )
    for http_method, endpoint_url, response_kwargs in mocked_endpoints:
        responses.add(http_method, endpoint_url, **response_kwargs)

    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    client.generate_transcripts()

    # Failure path: nothing informational is logged, only the exception record.
    self.assertFalse(mock_log.info.called)
    mock_log.exception.assert_called_with(
        u'[3PlayMedia] Could not process transcripts for video=%s source_language=%s.',
        VIDEO_DATA['studio_id'],
        VIDEO_DATA['source_language'],
    )
    self.assertEqual(TranscriptProcessMetadata.objects.count(), 0)
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    """
    Register mock responses for a datastore URL that serves glove vectors.

    GET returns ``byt`` unchanged; HEAD returns an ETag header that is bumped
    once every ``change_etag_every`` requests, which lets tests simulate a new
    version of the file appearing upstream.
    """
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type='application/gzip',
        stream=True,
        headers={'Content-Length': str(len(byt))}
    )

    # Shared mutable state for the HEAD callback (dict instead of nonlocal).
    state = {"remaining": change_etag_every, "etag": 0}

    def head_callback(_):
        """
        Writing this as a callback allows different responses to different HEAD requests.
        We change the ETag header every `change_etag_every` requests, which
        simulates having a new version of the file.
        """
        headers = {"ETag": str(state["etag"])}
        # Count down; once exhausted, reset the counter and rotate the ETag.
        state["remaining"] -= 1
        if state["remaining"] <= 0:
            state["remaining"] = change_etag_every
            state["etag"] += 1
        return (200, headers, "")

    responses.add_callback(responses.HEAD, url, callback=head_callback)
def test_validate_media_url(self, response):
    """
    Verify that an invalid media URL raises ThreePlayMediaUrlError.
    """
    media_url = u'https://s3.amazonaws.com/bkt/video.mp4'
    # The parametrized `response` controls how the HEAD request behaves.
    responses.add(responses.HEAD, media_url, **response)
    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    with self.assertRaises(ThreePlayMediaUrlError):
        client.validate_media_url()
def test_remove_from_search_after_sync(self):
    """When an image is removed from the source, it should be removed from the search engine"""
    # Index the image and confirm it is findable before the sync.
    self._index_img(self.removed)
    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    # `assertEquals` is a deprecated alias (removed in Python 3.12); use assertEqual.
    self.assertEqual(1, r.hits.total)
    with responses.RequestsMock() as rsps:
        # The source now 404s for this image, so sync() should mark it removed.
        rsps.add(responses.HEAD, FOREIGN_URL + TEST_IMAGE_REMOVED, status=404)
        self.removed.sync()
    signals._update_search_index(self.removed)
    self.es.indices.refresh()
    # After re-indexing, the image must no longer appear in search results.
    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    self.assertEqual(0, r.hits.total)
def test_push(self):
    """Push a package twice; verify blobs are uploaded to S3 only the first time."""
    build_file = os.path.join(os.path.dirname(__file__), './build_simple.yml')
    command.build('foo/bar', build_file)

    pkg_obj = store.PackageStore.find_package('foo', 'bar')
    pkg_hash = pkg_obj.get_hash()
    assert pkg_hash
    contents = pkg_obj.get_contents()
    all_hashes = set(find_object_hashes(contents))

    upload_urls = {}
    for blob_hash in all_hashes:
        upload_urls[blob_hash] = dict(
            head="https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash),
            put="https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash)
        )

    # We will push the package twice, so we're mocking all responses twice.
    for blob_hash in all_hashes:
        urls = upload_urls[blob_hash]
        # First time the package is pushed, s3 HEAD 404s, and we get a PUT.
        self.requests_mock.add(responses.HEAD, urls['head'], status=404)
        self.requests_mock.add(responses.PUT, urls['put'])
        # Second time, s3 HEAD succeeds, and we're not expecting a PUT.
        self.requests_mock.add(responses.HEAD, urls['head'])

    # Mock the registry interactions once per push.
    for _ in range(2):
        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

    # Push a new package.
    command.push('foo/bar')
    # Push it again; this time, we're verifying that there are no s3 uploads.
    command.push('foo/bar')
def setup_recording(self, **kwargs):
    """Register a recording callback for every HTTP verb on any http(s) URL."""
    _logger.info("recording ...")
    self.responses.reset()
    # Match any URL starting with "http" so every request hits the recorder.
    any_url = re.compile("http.*")
    for verb in (responses.GET, responses.POST, responses.PUT,
                 responses.PATCH, responses.DELETE, responses.HEAD,
                 responses.OPTIONS):
        self.responses.add_callback(
            verb, any_url,
            match_querystring=False,
            callback=self.record())
def test_get_from_cache(self):
    """Exercise the HEAD/ETag-based caching behaviour of get_from_cache."""
    url = 'http://fake.datastore.com/glove.txt.gz'
    # The mocked server rotates its ETag every 2 HEAD requests.
    set_up_glove(url, self.glove_bytes, change_etag_every=2)

    def request_counts():
        # Tally HTTP methods seen so far by the mocked transport.
        counts = Counter(call.request.method for call in responses.calls)
        assert len(counts) == 2
        return counts['HEAD'], counts['GET']

    def assert_cached_contents(path):
        # The cached file should hold exactly the mocked payload.
        with open(path, 'rb') as cached_file:
            assert cached_file.read() == self.glove_bytes

    # First call: one HEAD (ETag check) plus one GET (actual download).
    filename = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert filename == os.path.join(self.TEST_DIR, url_to_filename(url, etag="0"))
    assert request_counts() == (1, 1)
    assert_cached_contents(filename)

    # Second call: ETag unchanged, so another HEAD but no new GET.
    filename2 = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert filename2 == filename
    assert request_counts() == (2, 1)
    assert_cached_contents(filename2)

    # Third call: the ETag has rotated, forcing a fresh download (HEAD + GET).
    filename3 = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert filename3 == os.path.join(self.TEST_DIR, url_to_filename(url, etag="1"))
    assert request_counts() == (3, 2)
    assert_cached_contents(filename3)