def banner():
print(Style.DIM)
print(' ___________________________')
print(' / /\\')
print(' / sadboyzvone\'s _/ /\\')
print(' / Intel 8080 / \/')
print(' / Assembler /\\')
print('/___________________________/ /')
print('\___________________________\/')
print(' \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\'
+ Style.RESET_ALL + Style.BRIGHT)
print(Fore.WHITE + '\nPowered by ' + Fore.BLUE + 'Pyt'
+ Fore.YELLOW + 'hon' + Fore.WHITE
+ '\nCopyright (C) 2017, Zvonimir Rudinski')
# Print usage information
python类DIM的实例源码
def test_tag_chars():
with raises(ValueError):
am = AnsiMarkup(tag_sep='{')
with raises(ValueError):
am = AnsiMarkup(tag_sep='(-)')
with raises(ValueError):
am = AnsiMarkup(tag_sep='qq')
am = AnsiMarkup(tag_sep='{}')
r1 = p('0<b>1<d>2</d>3</b>4')
r2 = am.parse('0{b}1{d}2{/d}3{/b}4')
assert r1 == r2 == '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.BRIGHT + '3' + S.RESET_ALL + '4'
assert s('<b>1</b>') == am.strip('{b}1{/b}') == '1'
def local_exec(cmd, env=None, shell=False, check_retcode=True):
LOG.debug("{}Executing locally: '{}'{}".format(
Style.DIM, cmd if isinstance(cmd, basestring) else " ".join(cmd),
Style.RESET_ALL))
if env is not None:
# Sanitize env because it doees not digest non string values
env = dict((key, str(value)) for key, value in env.items())
# Unite the system and custom environments
env = dict(os.environ, **env)
proc = subprocess.Popen(cmd, env=env, stderr=subprocess.PIPE,
stdout=subprocess.PIPE, shell=shell)
out, err = proc.communicate()
ret_code = proc.returncode
_proceed_exec_result(
out.decode('ascii', 'ignore'), err.decode('ascii', 'ignore'),
ret_code, check_retcode)
return ret_code, out, err
def ssh_exec_live(ssh, cmd, timeout=None, check_retcode=True):
LOG.debug(u"{}Calling SSH live: '{}'{}".format(Style.DIM, cmd,
Style.RESET_ALL))
try:
interact = SSHClientInteraction(ssh, timeout=timeout, display=True,
logger=logging.getLogger())
interact.expect('.*')
interact.send(cmd + "; exit $?") # needed to not depend on prompt type
interact.tail()
ret_code = interact.channel.recv_exit_status()
except Exception:
LOG.debug("Something went wrong in 'ssh_exec_live':\n{}".format(
format_exception(sys.exc_info())
))
raise
_proceed_exec_result("", "", ret_code, check_retcode)
return ret_code, "", ""
def get_scanning_result(self, iface):
indent = ' ' * 1
flagExec = self.exec_iwlist(iface)
if flagExec:
header = indent + 'Scanning WiFi networks using interface \'' + iface + '\'\n'
network_table = [['SSID', 'AP Address', 'Channel', 'Encryption', 'Quality']]
for dict_network in self.parsed_cells:
network_table.append([
dict_network['Name'],
dict_network['Address'],
dict_network['Channel'],
dict_network['Encryption'],
dict_network['Quality']
])
table = AsciiTable(network_table)
print (Fore.YELLOW + Style.DIM + header + table.table)
return True
else:
return False
def print_ifaces_wireless_table(self):
indent = ' ' * 1
header = indent + 'Wireless interfaces information:\n'
self.wireless_table = [['Interface', 'Status', 'IP Address',
'Mask', 'Mode', 'SSID', 'AP Address', 'Wireless Type']]
for data in self.ifaces_wireless:
self.wireless_table.append([
data,
self.dict_status[data],
self.dict_ipaddr[data],
self.dict_mask[data],
self.dict_mode[data],
self.dict_ssid[data],
self.dict_apaddr[data],
self.dict_wname[data]
])
table = AsciiTable(self.wireless_table)
print (Fore.YELLOW + Style.DIM + header + table.table)
def app_header(self):
header = '\n'
header += ' ??????????? ???? ?????? ???? ??? ?????? ??????? ??????????????? \n'
header += ' ????????????? ?????????????????? ??????????????????? ????????????????\n'
header += ' ??? ????????????????????????? ?????????????? ?????????? ????????\n'
header += ' ??? ???????????????????????????????????????? ????????? ????????\n'
header += ' ??????????? ??? ?????? ?????? ????????? ??????????????????????? ???\n'
header += ' ?????????? ?????? ?????? ???????? ??? ??????? ??????????? ???\n'
header2 = ' Connection Manager\n'
header3 = ' {}\n'.format('-' * 70)
header3 += ' Version : {}\n'.format(__version__)
header3 += ' Release date : {}\n'.format(__release_date__)
header3 += ' Github : https://github.com/fachrioktavian/CManager\n'
header3 += ' Dev by : {0} - ({1})\n'.format(__author__, __author_email__)
header3 += ' Official : https://dracos-linux.org\n'
header3 += ' {}\n'.format('-' * 70)
print (Fore.RED + Style.DIM + header + header2)
print (Fore.CYAN + Style.DIM + header3)
def menu():
# Using colour from colorama: https://pypi.python.org/pypi/colorama
# Formatting e.g.: Fore.COLOUR, Back.COLOUR, Style.DIM with e.g. DIM, RED, CYAN, etc.:
print(Fore.BLACK + Back.WHITE + "10cbazbt3 menu:" + Style.RESET_ALL)
print(Fore.YELLOW + Style.DIM + "Main:")
print(" b = Blurb m = Mentions")
print(" r = Reply t = Timeline")
print(" blog = BLOG o = Own blurbs")
print(" pins = PINS")
print("Admin:")
print(" Login = Login menu = show Menu")
print(" Logout = Logout. sites = my Sites")
print(" exit = Exit")
print(Style.RESET_ALL)
# DEFINE 10C POST INTERACTIONS:
# LOTS OF DUPLICATION HERE!
# Define the 'blurb' (social post) subroutine:
def blurb():
# Input some text:
posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL)
# Saves the input text to 'posttext.txt':
file = open("posttext.txt", "w")
file.write(posttext)
file.close()
# Uses the global header & creates the data to be passed to the url:
url = 'https://api.10centuries.org/content'
data = {'content': posttext}
response = requests.post(url, headers=headers, data=data)
# Displays the server's response:
responsestatus = response.status_code
showapiresponse(responsestatus)
# Define the 'post' (blog post) subroutine:
def post():
# Input blog post data:
posttitle = input(Fore.YELLOW + Style.DIM + "Write a blog post title: " + Style.RESET_ALL)
print(Fore.YELLOW + Style.DIM + "Write a blog post:")
print("(Press [ctrl-d] to 'save' when you finish writing.)" + Style.RESET_ALL)
posttext = sys.stdin.read()
# Adds a post date & time, currently set as 'now':
postdatetime = strftime("%Y-%m-%d %H:%M:%S")
# Uses the global header & creates the data to be passed to the url:
url = 'https://api.10centuries.org/content'
# IMPORTANT: @bazbt3's channel_id = 6. SUBSTITUTE WITH YOUR CHANNEL_ID in global definitions!
data = {'title': posttitle, 'content': posttext, 'channel_id': channelid, 'send_blurb': 'Y', 'pubdts': postdatetime}
response = requests.post(url, headers=headers, data=data)
# Displays the server's response:
responsestatus = response.status_code
showapiresponse(responsestatus)
# Define the 'reply' subroutine:
def reply():
# INEFFICIENT: NEED TO MERGE MOST OF THE CODE FROM THIS AND THE REPLYINLINE SUBROUTINE:
# Input a reply-to post number:
replytoid = input(Fore.YELLOW + Style.DIM + "Post number to reply to: " + Style.RESET_ALL)
# Input some text:
posttext = input(Fore.YELLOW + Style.DIM + "Write some text (add usernames!): " + Style.RESET_ALL)
# Saves the input text to 'posttext.txt':
file = open("posttext.txt", "w")
file.write(posttext)
file.close()
# Uses the global header & creates the data to be passed to the url:
url = 'https://api.10centuries.org/content'
data = {'reply_to': replytoid, 'content': posttext}
response = requests.post(url, headers=headers, data=data)
# Displays the server's response:
responsestatus = response.status_code
showapiresponse(responsestatus)
# Define the 'replyinline' subroutine:
# INEFFICIENT: SEE THE REPLY SUBROUTINE:
def replyinline(postidreply, poster):
# Use the to-be-replied-to post id:
replytoid = postidreply
replytoposter = poster
# Input some text:
posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL)
# Add '@', username to posttext:
posttext = ("@" + replytoposter + " " + posttext)
# Saves the input text to 'posttext.txt':
file = open("posttext.txt", "w")
file.write(posttext)
file.close()
# Uses the global header & creates the data to be passed to the url:
url = 'https://api.10centuries.org/content'
data = {'reply_to': replytoid, 'content': posttext}
response = requests.post(url, headers=headers, data=data)
# Displays the server's response:
responsestatus = response.status_code
showapiresponse(responsestatus)
# Define the 'repostinline' subroutine:
def test_flat():
assert p('<b>1</b>') == p('<bold>1</bold>') == S.BRIGHT + '1' + S.RESET_ALL
assert p('<d>1</d>') == p('<dim>1</dim>') == S.DIM + '1' + S.RESET_ALL
assert p('<b>1</b><d>2</d>') == S.BRIGHT + '1' + S.RESET_ALL + S.DIM + '2' + S.RESET_ALL
assert p('<b>1</b>2<d>3</d>') == S.BRIGHT + '1' + S.RESET_ALL + '2' + S.DIM + '3' + S.RESET_ALL
def test_nested():
assert p('0<b>1<d>2</d>3</b>4') == '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.BRIGHT + '3' + S.RESET_ALL + '4'
assert p('<d>0<b>1<d>2</d>3</b>4</d>') == S.DIM + '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.DIM + S.BRIGHT + '3' + S.RESET_ALL + S.DIM +'4' + S.RESET_ALL
def m_info(self, m):
m = '[*] ' + m
if COLORAMA:
print Style.DIM + m
else:
print m
def m_info(self, m):
m = '[*] ' + m
if COLORAMA:
print Style.DIM + m
else:
print m
def m_info(self, m):
m = '[*] ' + m
if COLORAMA:
print Style.DIM + m
else:
print m
def banner():
banner = Style.DIM + """
__ ___ __ ___ | |_ __ _ __ ___
\ \/ / '_ ` _ \| | '__| '_ \ / __|____
> <| | | | | | | | | |_) | (_|_____|
/_/\_\_| |_| |_|_|_| | .__/ \___|
|_|
_ _ __
| |__ _ __ _ _| |_ ___ / _| ___ _ __ ___ ___ _ __
| '_ \| '__| | | | __/ _ \ |_ / _ \| '__/ __/ _ \ '__|
| |_) | | | |_| | || __/ _| (_) | | | (_| __/ |
|_.__/|_| \__,_|\__\___|_| \___/|_| \___\___|_| """ + Style.RESET_ALL
print("{}".format(banner))
def ssh_exec(ssh, cmd, timeout=None, check_retcode=True, get_pty=False):
LOG.debug(u"{}Calling SSH: '{}'{}".format(Style.DIM, cmd, Style.RESET_ALL))
try:
_, out, err = ssh.exec_command(cmd, timeout=timeout, get_pty=get_pty)
ret_code = out.channel.recv_exit_status()
out, err = out.read().strip(), err.read().strip()
except Exception:
LOG.debug("Something went wrong in 'ssh_exec':\n{}".format(
format_exception(sys.exc_info())
))
raise
_proceed_exec_result(out, err, ret_code, check_retcode)
return ret_code, out, err
def _run_ssh_pexpect(cmd, password, using_bashc=False):
"""
Run a given command using pexpect.
"""
logger.debug(u'{}SSH Command: {}{}'.format(Style.DIM, cmd,
Style.RESET_ALL))
if using_bashc:
ssh_cli = pexpect.spawn('/bin/bash', ['-c', cmd])
else:
ssh_cli = pexpect.spawn(cmd)
i = ssh_cli.expect(['[Pp]assword: ', '\(yes/no\)\? '])
if i == 1:
ssh_cli.sendline('yes')
ssh_cli.expect('[Pp]assword: ')
ssh_cli.sendline(password)
time.sleep(1)
ssh_cli.expect(['Connection to [0-9\.a-z]+ is closed.', pexpect.EOF,
pexpect.TIMEOUT], timeout=5)
# Expected behavior is to get pexpect.EOF or closed connection, but due to
# a bug in docker we have to send an additional new line or Ctrl^C
out = str(ssh_cli.before) + str(ssh_cli.after)
logger.debug(u'Output:\n {}{}{}'.format(Fore.YELLOW, out, Style.RESET_ALL))
if ssh_cli.isalive():
ssh_cli.close()
return out
def _run_scp_command(cmd, user, host, password):
"""
Emulate user command line interation using SCP protocol
:param cmd: command to be executed
:param user: remote host user
:param host: remote host IP/hostname
:param password: passwrod for remote user on host
:returns None:
"""
logger.debug(u'{}Running SCP: {}{}'.format(
Style.DIM, cmd, Style.RESET_ALL))
scp = pexpect.spawn(cmd)
i = scp.expect(['\(yes/no\)\? ', '[Pp]assword: '])
if i == 0:
scp.sendline('yes')
scp.expect('[Pp]assword: ')
scp.sendline(password)
time.sleep(1)
try:
while True:
i = scp.expect([pexpect.EOF, '[0-9][0-9]:[0-9][0-9] '],
timeout=5)
if i == 0:
logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before,
Style.RESET_ALL))
break
logger.debug(u'{}{}{}{}'.format(Fore.YELLOW, scp.before,
scp.after, Style.RESET_ALL))
time.sleep(.1)
except pexpect.TIMEOUT:
# A docker bug expecting an extra new line in the end. Ideally we
# will exit the loop getting pexpect.EOF, i.e. i==0
logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before,
Style.RESET_ALL))
finally:
if scp.isalive():
scp.close()
def _compare_files(local_path, remote_path, ssh):
"""
Compare hashes of two files, one on local and another one on remote server
:param local_path: path to file on the local server
:param remote_path: path to file on the remote server
:param ssh: SSHClient instance to communicate with remote server
:returns: True/False on success/fail
:rtype: bool
"""
logger.debug(u'{}Comparing files. host: {} and container: {}{}'.format(
Style.DIM, local_path, remote_path, Style.RESET_ALL))
# Sometimes ssh_exec exits with a 0 code, but no stdout can be read,
# so we moved the file hash call inside a function.
def _remote_md5sum(ssh, path):
remote_cmd = 'md5sum {}'.format(path)
ret_code, out, err = ssh_exec(ssh=ssh, cmd=remote_cmd, get_pty=True)
_remote_file_hash = out.strip().split()[0].strip()
return _remote_file_hash
# Get hash of the remote file.
remote_file_hash = retry(_remote_md5sum, ssh=ssh, path=remote_path,
tries=3, interval=1)
# Get hash of the local file
try:
with open(local_path) as f:
local_file_hash = hashlib.md5(f.read()).hexdigest()
except (OSError, IOError) as e:
raise FileTransferValidationFailed(str(e))
# Compare hashes
if local_file_hash != remote_file_hash:
message = 'Hashes not equal. Host: {} != container: {}'.format(
local_file_hash, remote_file_hash)
logger.debug(u'{}host: {} container: {}{}'.format(
Fore.RED, local_path, remote_path, Style.RESET_ALL))
raise FileTransferValidationFailed(message)
logger.debug(u'{}Validation: OK{}'.format(Fore.GREEN, Style.RESET_ALL))
return True
def test_SNAT_rules(cluster):
container_ids, container_ips, pods, specs = setup_pods(cluster)
# --------- Test that SNAT rules are applied correctly --------
jenkins_ip = _get_jenkins_ip(cluster)
LOG.debug('{}Test that SNAT rules work properly{}'.format(
Fore.CYAN, Style.RESET_ALL))
LOG_MSG = "Check SNAT rules for pod '{}' public IP: '{}' host node: '{}'"
BIND_IP = '0.0.0.0'
POD_TCP_CMD = 'nc -z -v {} {}'.format(jenkins_ip, JENKINS_TCP_SERVER_PORT)
POD_UDP_CMD = 'nc -u -z -v {} {}'.format(jenkins_ip,
JENKINS_UDP_SERVER_PORT)
for name, pod in pods.items():
msg = LOG_MSG.format(name, pod.public_ip, specs[name]['host'])
# Check if pod can ping jenkins
ping(pod, container_ids[name], jenkins_ip)
LOG.debug('{}TCP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL))
# Check if SNAT rules work properly for TCP connections
with jenkins_accept_connections(
SocketServer.TCPServer, MyRequestHandler, BIND_IP,
JENKINS_TCP_SERVER_PORT) as connection_list:
pod.docker_exec(container_ids[name], POD_TCP_CMD)
_check_visible_ip(pod, specs, connection_list)
LOG.debug('{}UDP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL))
# Check if SNAT rules work properly for UDP connections
with jenkins_accept_connections(
SocketServer.UDPServer, MyRequestHandler, BIND_IP,
JENKINS_UDP_SERVER_PORT) as connection_list:
pod.docker_exec(container_ids[name], POD_UDP_CMD)
_check_visible_ip(pod, specs, connection_list)
def say(message):
print(prefix + Style.DIM + message + Style.RESET_ALL)
def log_debug(message):
print(Style.DIM + Fore.MAGENTA + message + Fore.RESET + Style.RESET_ALL)
def log_debug(message):
print(Style.DIM + Fore.MAGENTA + message + Fore.RESET + Style.RESET_ALL)
def winrm(self, script, winrm_kwargs=dict(), quiet=False, **kwargs):
"""
Executes a remote windows powershell script
:param script: A string with the powershell script
:param winrm_kwargs: The pywinrm Protocol class kwargs
:param quiet: Whether to hide the stdout/stderr output or not
:return: A tuple with the status code, the stdout and the stderr
:raise: WinRmError: If the command fails
"""
if self._vm_object:
self._wait_for_winrm_service(
kwargs['vcdriver_vm_winrm_username'],
kwargs['vcdriver_vm_winrm_password'],
**winrm_kwargs
)
winrm_session = self._open_winrm_session(
kwargs['vcdriver_vm_winrm_username'],
kwargs['vcdriver_vm_winrm_password'],
winrm_kwargs
)
if not quiet:
print('Executing remotely on {} ...'.format(self.ip()))
styled_print(Style.DIM)(script)
status, stdout, stderr = self._run_winrm_ps(winrm_session, script)
if not quiet:
styled_print(Style.BRIGHT)('CODE: {}'.format(status))
styled_print(Fore.GREEN)(stdout)
if status != 0:
if not quiet:
styled_print(Fore.RED)(stderr)
raise WinRmError(script, status, stdout, stderr)
else:
return status, stdout, stderr
def TOR_CHECK_PRNT(data):
if (data["IsTor"]==True):
print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " TorStat's network traffic was routed through tor, onto the next stage...\n" + Style.RESET_ALL)
return True
elif (data["IsTor"]==False):
print ("[" + Fore.YELLOW + Style.BRIGHT + "!" + Style.RESET_ALL + "]" + Fore.YELLOW + Style.BRIGHT +" TorStat cannot perform any more recon on the tor servers because TorStat's network traffic was not routed through tor.\n\t " + Style.RESET_ALL + Fore.WHITE + Style.DIM + "=> try : proxychains python TorStat.py\n" + Style.RESET_ALL)
return False
def parseLine(line):
severity = getSeverity(line)
# Add color based on severity
if 'severity' not in locals():
severity = 3
if severity == 0:
color = Style.DIM + Fore.WHITE
elif severity == 1:
color = Style.NORMAL + Fore.BLUE
elif severity == 2:
color = Style.NORMAL + Fore.CYAN
elif severity == 3:
color = Style.NORMAL + Fore.WHITE
elif severity == 4:
color = Style.NORMAL + Fore.RED
elif severity == 5:
color = Style.NORMAL + Fore.BLACK + Back.RED
else:
color = Style.NORMAL + Fore.BLACK + Back.YELLOW
# Replace html tab entity with actual tabs
line = clearTags(line)
line = line.replace('	', "\t")
return color + line + Style.RESET_ALL
def chitchat(self, msg, eol=None):
if msg: print(Style.DIM + msg + Style.RESET_ALL, end=eol)
sys.stdout.flush()
# show warning message