def auth():
# Basic Authentication
requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass'))
requests.get('https://api.github.com/user', auth=('user', 'pass'))
# Digest Authentication
url = 'http://httpbin.org/digest-auth/auth/user/pass'
requests.get(url, auth=HTTPDigestAuth('user', 'pass'))
# OAuth2 Authentication????requests-oauthlib
url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
auth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
requests.get(url, auth=auth)
pass
python类HTTPDigestAuth()的实例源码
def _auth(http_auth):
if not http_auth:
return None
if http_auth['auth_type'] == 'basic':
return HTTPBasicAuth(
http_auth['username'],
http_auth['password']
)
if http_auth['auth_type'] == 'digest':
return HTTPDigestAuth(
http_auth['username'],
http_auth['password']
)
raise Exception('Authorization information is not valid.')
def update_membership_status():
r = requests.get(
'https://register.utn.se/api.php',
auth=HTTPDigestAuth(settings.MEMBERSHIP_API_USER,
settings.MEMBERSHIP_API_PASSWORD),
params={
'action': 'list',
},
)
try:
data = r.json()
except ValueError:
return
for member in Member.objects.all():
if member.person_number().replace('-', '') in data:
member.update_status(data='member')
else:
member.update_status(data='nonmember')
Member.objects.filter(pk=member.pk).update(
status=member.status, status_changed=member.status_changed
)
def __init__(self):
CradlepointClient.__init__(self)
# save the last 'request' and 'response' - init in the base class
# self.last_url = None, self.last_reply = None
# this is the base URL used by request calls,
# and will be like "http://192.168.1.1/api"
# self.router_ip = None
self.base_url = None
# self.digest will hold the HTTPDigestAuth(),
# ONCE the user/password are properly set
self.digest = None
return
def set_user_password(self, user_name, password):
"""
Create the DIGEST authentication handler, which handles the
Cradlepoint's challenge/response to Router API (CURL-like) calls.
:param str user_name:
:param str password:
:return:
"""
from requests.auth import HTTPDigestAuth
assert user_name is not None and isinstance(user_name, str)
assert password is not None and isinstance(password, str)
self.digest = HTTPDigestAuth(user_name, password)
# if self._logger is not None:
# self._logger.debug("CSClient() set user={}, pass=*****".format(
# user_name))
return
def get_devices(self):
'''
Gets all devices and configs from Indigo.
Returns:
devices (dict): A dict of all devices from Indigo
'''
devices = {}
url = self.url + '/devices.json'
device_list = requests.get(url,auth=HTTPDigestAuth(self.user, self.password), verify=False).json()
for d in device_list:
url = self.url + d.get('restURL')
ff_name = 'indigo-%s' % d.get('name')
devices[ff_name] = requests.get(url,auth=HTTPDigestAuth(self.user, self.password), verify=False).json()
devices[ff_name]['restURL'] = d.get('restURL')
return devices
def __init__(self, hass, device_info):
"""Initialize a generic camera."""
super().__init__()
self.hass = hass
self._authentication = device_info.get(CONF_AUTHENTICATION)
self._name = device_info.get(CONF_NAME)
self._still_image_url = device_info[CONF_STILL_IMAGE_URL]
self._still_image_url.hass = hass
self._limit_refetch = device_info[CONF_LIMIT_REFETCH_TO_URL_CHANGE]
username = device_info.get(CONF_USERNAME)
password = device_info.get(CONF_PASSWORD)
if username and password:
if self._authentication == HTTP_DIGEST_AUTHENTICATION:
self._auth = HTTPDigestAuth(username, password)
else:
self._auth = aiohttp.BasicAuth(username, password=password)
else:
self._auth = None
self._last_url = None
self._last_image = None
def target_function(self, url, user, password):
name = threading.current_thread().name
user = user.encode('utf-8').strip()
password = password.encode('utf-8').strip()
response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))
if response is not None and response.status_code != 401:
print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
self.credentials.append((self.target, self.port, user, password))
if self.stop_on_success:
raise StopThreadPoolExecutor
else:
print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
def target_function(self, url, creds):
name = threading.current_thread().name
user, password = creds
user = user.encode('utf-8').strip()
password = password.encode('utf-8').strip()
response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))
if response is not None and response.status_code != 401:
print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
self.credentials.append((self.target, self.port, user, password))
if self.stop_on_success:
raise StopThreadPoolExecutor
else:
print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
def target_function(self, url, user, password):
name = threading.current_thread().name
user = user.encode('utf-8').strip()
password = password.encode('utf-8').strip()
response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))
if response is not None and response.status_code != 401:
print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
self.credentials.append((self.target, self.port, user, password))
if self.stop_on_success:
raise StopThreadPoolExecutor
else:
print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
def target_function(self, url, creds):
name = threading.current_thread().name
user, password = creds
user = user.encode('utf-8').strip()
password = password.encode('utf-8').strip()
response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))
if response is not None and response.status_code != 401:
print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
self.credentials.append((self.target, self.port, user, password))
if self.stop_on_success:
raise StopThreadPoolExecutor
else:
print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, name, user, password), verbose=self.verbosity)
def http_get(self,ip,port,user,password,path,payload,auth_mode=0):
url = "http://{}:{}/{}".format(ip,port,path)
self.logger.debug("http_get: Sending: %s %s auth_mode=%d" % (url, payload, auth_mode) )
if auth_mode == 0:
auth = HTTPBasicAuth(user,password)
elif auth_mode == 1:
auth = HTTPDigestAuth(user,password)
else:
self.send_error("Unknown auth_mode '%s' for request '%s'. Must be 0 for 'digest' or 1 for 'basic'." % (auth_mode, url) )
return False
try:
response = requests.get(
url,
auth=auth,
params=payload,
timeout=5
)
# This is supposed to catch all request excpetions.
except requests.exceptions.RequestException as e:
self.send_error("Connection error for %s: %s" % (url, e))
return False
self.logger.debug("http_get: Got: code=%s", response.status_code)
if response.status_code == 200:
#self.logger.debug("http_get: Got: text=%s", response.text)
return response.text
elif response.status_code == 400:
self.send_error("Bad request: %s" % (url) )
elif response.status_code == 404:
self.send_error("Not Found: %s" % (url) )
elif response.status_code == 401:
# Authentication error
self.send_error(
"Failed to authenticate, please check your username and password")
else:
self.send_error("Unknown response %s: %s" % (response.status_code, url) )
return False
def get_books(self, url, username, password, vipGroups, picture_path,
conn_auth="basic", conn_verify=True, debug=False):
if debug: print("get_books(%s)" % url)
# url base
url_split = urllib.parse.urlparse(url)
url_base = url_split.scheme + '://' + url_split.netloc
# authentification
settings = {"verify": conn_verify}
if conn_auth == "basic":
settings["auth"] = (username, password)
elif conn_auth == "digest":
from requests.auth import HTTPDigestAuth
settings["auth"] = HTTPDigestAuth(user, passwd)
session = requests.session()
xml = self._get_xml(session, url, settings, debug)
hrefs = self._process_xml(xml, debug)
# convert into vcard objects
cards = []
for href in hrefs.keys():
cards.append(self._get_vcard(session, url_base + href, settings, debug))
vcf = fritzbox.VCF.Import()
books = vcf.get_books_by_cards(cards, vipGroups, picture_path, debug=debug)
return books
def bruteForce(password, username):
#print str(username) + str(password)
tryComb = requests.post("http://" + target +":9990/management", auth=HTTPDigestAuth(str(username), str(password)))
statusCode = tryComb.status_code
if statusCode == 415:
print "\033[92m\033[1m[+]\033[0m Credential Found => " + username + ":" + password
def _http_digest_auth(self, auth_id, auth_pw):
# We got everything as we expected, create the HTTPDigestAuth object.
self._auth = HTTPDigestAuth(auth_id, auth_pw)
def _get_query():
registry = args.registry
if not registry:
parser.error("No registry provided, neither as a command argument nor through the environment variable "
"REGISTRY.")
if registry.startswith('http'):
base_url = registry
else:
base_url = 'https://{0}'.format(registry)
kwargs = {}
if args.user:
if args.digest_auth:
kwargs['auth'] = HTTPDigestAuth(args.user, args.password)
else:
kwargs['auth'] = HTTPBasicAuth(args.user, args.password)
elif os.path.isfile(DOCKER_CONFIG_FILE):
config_auth = _get_auth_config(registry)
if config_auth:
kwargs['auth'] = HTTPBase64Auth(config_auth)
if args.verify is not None:
kwargs['verify'] = value_or_false(args.verify)
if args.client_cert:
if args.client_key:
kwargs['cert'] = (args.client_cert, args.client_key)
else:
kwargs['cert'] = args.client_cert
if args.use_get_manifest:
kwargs['use_get_manifest'] = True
client = DockerRegistryClient(base_url, **kwargs)
query = DockerRegistryQuery(client)
cache_fn = _get_cache_name(registry)
if cache_fn and os.path.isfile(cache_fn) and not args.refresh:
with open(cache_fn) as f:
query.load(f)
return query
def execute(self, **kwargs):
"""
Calls the FritzBox action and returns a dictionary with the arguments.
"""
headers = self.header.copy()
headers['soapaction'] = '%s#%s' % (self.service_type, self.name)
data = self.envelope.strip() % self._body_builder(kwargs)
url = 'http://%s:%s%s' % (self.address, self.port, self.control_url)
auth = None
if self.password:
auth=HTTPDigestAuth(self.user, self.password)
response = requests.post(url, data=data, headers=headers, auth=auth)
# lxml needs bytes, therefore response.content (not response.text)
result = self.parse_response(response.content)
return result
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
assert r.status_code == 200
r = requests.get(url)
assert r.status_code == 401
s = requests.session()
s.auth = HTTPDigestAuth('user', 'pass')
r = s.get(url)
assert r.status_code == 200
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
auth = HTTPDigestAuth('user', 'pass')
r = requests.get(url)
assert r.cookies['fake'] == 'fake_value'
r = requests.get(url, auth=auth)
assert r.status_code == 200
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
auth = HTTPDigestAuth('user', 'pass')
s = requests.Session()
s.get(url, auth=auth)
assert s.cookies['fake'] == 'fake_value'
def test_DIGEST_STREAM(self, httpbin):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth, stream=True)
assert r.raw.read() != b''
r = requests.get(url, auth=auth, stream=False)
assert r.raw.read() == b''
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
auth = HTTPDigestAuth('user', 'wrongpass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
assert r.status_code == 401
r = requests.get(url)
assert r.status_code == 401
s = requests.session()
s.auth = auth
r = s.get(url)
assert r.status_code == 401
def __init__(self):
username = config.CLOUDAPP_USERNAME
password = config.CLOUDAPP_PASSWORD
if not username or not password:
raise StorageError("Set 'cloudapp_username' and 'cloudapp_password' configs.")
self._cloudapp_auth = HTTPDigestAuth(username, password)
self._session = requests.Session()
self._session.headers = {
'Accept': 'application/json',
}
def __init__(self, url, username, password, user):
self.url = url
self.user = user
self.auth = HTTPDigestAuth(username, password)
self.form_list_url = urljoin(self.url, 'formList')
self.submission_list_url = urljoin(self.url, 'view/submissionList')
self.download_submission_url = urljoin(self.url,
'view/downloadSubmission')
self.forms_path = os.path.join(
self.user.username, 'briefcase', 'forms')
self.resumption_cursor = 0
self.logger = logging.getLogger('console_logger')
def get_auth():
"""Get authorization token for https
"""
import getpass
from requests.auth import HTTPDigestAuth
#This binds raw_input to input for Python 2
try:
input = raw_input
except NameError:
pass
uname = input("MODSCAG Username:")
pw = getpass.getpass("MODSCAG Password:")
auth = HTTPDigestAuth(uname, pw)
#wget -A'h8v4*snow_fraction.tif' --user=uname --password=pw
return auth
def auth(self):
""" Return credentials for current user. """
return HTTPDigestAuth(self.username, self.password)
def run(self, data):
port = data["port"]
users = data["users"]
passwords = data["passwords"]
url = "http://{0}:{1}".format(self.ip, port)
response = requests.get(url)
message = ""
if response is not None:
if response.status_code != 401:
message = "{} is not protected by authentication system".format(self.ip)
return message
else:
if 'Basic' in response.headers['WWW-Authenticate']:
auth = "basic"
elif 'Digest' in response.headers['WWW-Authenticate']:
auth = "digest"
else:
auth = None
for user in users:
for password in passwords:
try:
if auth == "basic":
response = requests.get(url, auth=HTTPBasicAuth(user, password.strip()))
elif auth == "digest":
response = requests.get(url, auth=HTTPDigestAuth(user, password.strip()))
else:
return None
if response.status_code == 200:
message += "Success!\nCredentials found\nUsername: {0} Password: {1}\n".format(user, password)
except requests.ConnectionError:
return None
if message == "":
message = "Error. Credentials not found"
return message
def setup_cluster(self):
if self.couple is not None and self.couple_pass is None:
self.couple_pass = self.adminpass
self.couple_user = self.adminuser
if self.boothost is None:
self.boothost = self.host[0]
self.host.remove(self.boothost)
if self.ml_init(self.boothost):
self.ml_security(self.boothost)
conn = Connection(self.boothost, HTTPDigestAuth(self.adminuser, self.adminpass))
self.marklogic = MarkLogic(conn)
for hostname in self.host:
self.ml_init(hostname)
self.ml_join(self.boothost, hostname)
if self.name is not None:
print("{0}: rename cluster...".format(self.boothost))
cluster = self.marklogic.cluster()
cluster.set_cluster_name(self.name)
cluster.update()
if self.couple is not None:
for couple in self.couple:
print("{0}: couple with {1}...".format(self.boothost, couple))
altconn = Connection(couple,
HTTPDigestAuth(self.couple_user,
self.couple_pass))
altml = MarkLogic(altconn)
altcluster = altml.cluster()
cluster = self.marklogic.cluster()
cluster.couple(altcluster)
print("Finished")
def _get_http_auth(username: str, password: str, auth_type: str) -> AuthBase:
if auth_type == 'basic':
return HTTPBasicAuth(username, password)
if auth_type == 'digest':
return HTTPDigestAuth(username, password)
raise RetsClientError('unknown auth type %s' % auth_type)
def update_status(self, data=None):
if data is None:
if self.person_number() == '':
return
try:
r = requests.get(
'https://register.utn.se/api.php',
auth=HTTPDigestAuth(settings.MEMBERSHIP_API_USER,
settings.MEMBERSHIP_API_PASSWORD),
params={
'action': 'check',
'person_number': self.person_number().replace('-', '')
},
)
data = r.json().get('status')
except requests.exceptions.ConnectionError:
data = 'unknown'
except ValueError:
return
if data == 'member':
self.status = 'member'
elif data == 'nonmember':
if self.status in ['unknown', 'nonmember']:
self.status = 'nonmember'
else:
self.status = 'alumnus'
self.status_changed = timezone.now()