def complete(self, text, state):
    """Generic readline completion entry point.

    Re-reads the whole readline buffer, splits it on whitespace and either
    lists the known commands or delegates to the ``complete_<cmd>`` method
    named after the first word.

    Args:
        text: Word being completed, as supplied by readline. Not used; the
            full line buffer is inspected instead.
        state: Zero-based index of the candidate readline is asking for.

    Returns:
        The ``state``-th candidate string, or ``None`` when there is no such
        candidate (readline stops asking on the first non-string value).
    """
    buffer = readline.get_line_buffer()
    line = buffer.split()
    if not line:
        # Nothing typed yet: offer every command.
        candidates = [c + ' ' for c in self.commands]
    else:
        # Account for the last argument ending in a space: the user has
        # started a new, still empty, argument.
        if RE_SPACE.match(buffer):
            line.append('')
        # Resolve the command to its implementation function.
        cmd = line[0].strip()
        args = line[1:]
        if cmd not in self.commands:
            candidates = [c + ' ' for c in self.commands if c.startswith(cmd)]
        elif args:
            # A command without a dedicated completer simply has no
            # argument candidates instead of raising AttributeError.
            impl = getattr(self, 'complete_%s' % cmd, None)
            candidates = list(impl(args)) if impl is not None else []
        else:
            candidates = [cmd + ' ']
    # readline keeps incrementing state until it gets a non-string back, so
    # running off the end must yield None rather than IndexError.
    try:
        return candidates[state]
    except IndexError:
        return None
python类get_line_buffer()的实例源码
def echo(text, prompt_prefix=None):
    """Print ``text`` while preserving a line the user may be editing.

    When the ``RL_STATE_DONE`` bit of ``rl_readline_state`` is clear, the
    current prompt, line buffer and cursor position are saved, the displayed
    line is blanked, ``text`` is printed, and the saved state is restored.
    Otherwise ``text`` is simply printed.

    Args:
        text: Object to print.
        prompt_prefix: Optional string written to stdout just before the
            prompt is restored.
    """
    if rl_readline_state.value & RL_STATE_DONE:
        # readline is not in the middle of reading a line: plain print.
        print(text)
        return

    # Stash the editing state, then clear what is currently displayed.
    cursor = rl_point.value
    pending = r.get_line_buffer()
    rl_save_prompt()
    rl_replace_line(c_char_p(b""), 0)
    rl_redisplay()

    print(text)

    if prompt_prefix is not None:
        sys.stdout.write(prompt_prefix)
        sys.stdout.flush()
    # Put the prompt, typed text and cursor back where they were.
    rl_restore_prompt()
    rl_replace_line(c_char_p(pending.encode()), 0)
    rl_point.value = cursor
    rl_redisplay()
def start(self):
    """Run ``self.cmdloop()`` until it returns, tolerating Ctrl-C mid-line.

    A ``KeyboardInterrupt`` raised while the readline buffer is empty is
    propagated. If text was being typed, a blank line is printed, the intro
    is suppressed and the loop is restarted. Once the loop has finished,
    ``self.terminal`` is killed if it is set.
    """
    finished = False
    while not finished:
        try:
            self.cmdloop()
        except KeyboardInterrupt:
            if not readline.get_line_buffer():
                raise KeyboardInterrupt
            # Ctrl-C only cancelled the current line: restart quietly.
            print("")
            self.intro = None
        else:
            finished = True
    if self.terminal:
        self.terminal.kill()
# aliases
def autocomplete(self, text, state):
    """Readline completer: complete a command name or delegate to a command.

    Args:
        text: Word being completed, as supplied by readline.
        state: Zero-based index of the candidate to return.

    Returns:
        The ``state``-th candidate, or ``None`` when there is none. If the
        buffer already contains a space, the result of the matching
        action's own ``autocomplete`` is returned (``None`` if the first
        word is not a known action).
    """
    import readline

    line = readline.get_line_buffer()
    words = line.split(" ")
    # If there is a space, the command word is complete: delegate to that
    # command's own autocompleter.
    if len(words) > 1:
        if words[0] not in self.actions:
            return None
        return self.actions[words[0]].autocomplete(self, line, text, state)
    # No space yet: complete against the command names.
    options = [name + " " for name in self.actions.keys() if name.startswith(text)]
    try:
        return options[state]
    except IndexError:
        # Only "ran out of candidates" is expected here; anything else is a
        # real error and should not be silently swallowed.
        return None
def completeOptions(self, text, state):
    """Readline completer over the names in ``self.options``.

    Args:
        text: Word being completed, as supplied by readline. Not used; the
            full line buffer is inspected instead.
        state: Zero-based index of the candidate to return.

    Returns:
        The ``state``-th candidate string, ``None`` when the candidates are
        exhausted, or ``False`` once a known option is followed by further
        arguments (no completion is offered for those).
    """
    buffer = readline.get_line_buffer()
    line = buffer.split()
    if not line:
        # Nothing typed yet: show all options.
        candidates = [c + ' ' for c in self.options]
    else:
        # Account for the last argument ending in a space.
        if self.re.match(buffer):
            line.append('')
        cmd = line[0].strip()
        if cmd not in self.options:
            candidates = [c + ' ' for c in self.options if c.startswith(cmd)]
        elif line[1:]:
            return False
        else:
            candidates = [cmd + ' ']
    # readline keeps incrementing state until it gets a non-string back, so
    # running off the end must yield None rather than IndexError.
    try:
        return candidates[state]
    except IndexError:
        return None
def help(self):
    """Show the definition line and docstring of the method being called.

    Does nothing unless the readline buffer (ignoring trailing whitespace)
    ends with ``(`` and ``self.METHOD_PATTERN`` finds an expression in it
    that evaluates, in ``self.namespace``, to a bound method whose source
    matches ``self.METHOD_DEF_PATTERN``. On success the matched definition
    and the docstring are printed and the prompt plus the current buffer
    are re-echoed so typing can continue.
    """
    line = readline.get_line_buffer().rstrip()
    if not line.endswith('('):
        return
    found = self.METHOD_PATTERN.search(line)
    if not found:
        return
    try:
        # NOTE(review): eval() executes whatever expression has been typed
        # at the prompt; acceptable only because the input is the user's own.
        thisobject = eval(found.group(1), self.namespace)
    except Exception:
        return
    if not inspect.ismethod(thisobject):
        return
    try:
        source = inspect.getsource(thisobject)
    except (OSError, TypeError):
        # Source is not always retrievable (e.g. code defined interactively).
        return
    definition = self.METHOD_DEF_PATTERN.match(source)
    if not definition:
        return
    print("")
    print(definition.group(1))
    doc = inspect.getdoc(thisobject)
    if doc:
        # getdoc() returns None for undocumented methods.
        print(doc.strip())
    # sys.ps1 only exists in interactive mode; fall back to the default prompt.
    prompt = getattr(sys, 'ps1', '>>> ')
    print(prompt + readline.get_line_buffer(), end='', flush=True)
def complete(self, text, index):
    """Readline completer for commands, symbols and file names.

    The candidate list is rebuilt whenever ``text`` differs from the text of
    the previous call or is empty, and cached on ``self.matches``:

    * at the start of the line, commands from ``self.cmdcomplete``;
    * after a known command other than ``run``, names from ``self.symbols``;
    * otherwise, file names globbed from the last space-separated word.

    Args:
        text: Word being completed, as supplied by readline.
        index: Zero-based index of the candidate to return.

    Returns:
        The ``index``-th candidate, or ``None`` when there is none.
    """
    if text != self.text or not text:
        self.text = text
        if not readline.get_begidx():
            self.matches = [w for w in self.cmdcomplete if w.startswith(text)]
        else:
            context = readline.get_line_buffer().split(" ")
            # Check the first word to see whether it is a known command.
            if context[0] in self.cmdcomplete and context[0] != "run":
                self.matches = [s for s in self.symbols if s.startswith(text)]
            else:
                self.matches = glob.glob(context[-1] + '*')
    try:
        return self.matches[index]
    except (AttributeError, IndexError):
        # Exhausted candidates, or no candidate list has been built yet.
        return None
def complete(text, state):
    """Readline completer backed by the ``tab_complete`` directory cache.

    The directory part of the last whitespace-separated token is resolved
    against ``current_path``; its cached entries (populated on demand via
    ``populateTabComplete``) that start with ``text`` are the candidates.

    Args:
        text: Word being completed, as supplied by readline.
        state: Zero-based index of the candidate to return.

    Returns:
        The ``state``-th candidate, with a trailing space appended unless it
        ends in ``/``, or ``False`` when there is no such candidate.
    """
    tokens = readline.get_line_buffer().split()
    # An empty or all-blank buffer has no last token; complete relative to
    # current_path itself instead of raising IndexError.
    thistoken = tokens[-1] if tokens else ''
    thisdir = os.path.dirname(thistoken)
    thispath = os.path.abspath(os.path.join(current_path, thisdir))
    if thispath != '/':
        thispath += '/'
    if thispath not in tab_complete:
        populateTabComplete(thispath)
    if thispath not in tab_complete:
        return False
    remaining = [x for x in tab_complete[thispath] if x.startswith(text)][state:]
    if not remaining:
        return False
    result = remaining[0]
    if not result.endswith('/'):
        result += ' '
    return result
def generate_parameters(self, text, state):
    """Readline completer for the parameters of the command being typed.

    On ``state == 0`` the buffer up to the completion end index is split on
    spaces, the command is looked up with ``self.manager.find`` and its
    ``parameters`` starting with ``text`` are collected (sorted) into
    ``self.available``. A single match is returned immediately, followed by
    the command's parameter separator. Everything else is delegated to
    ``self.get_completion``.

    Args:
        text: Word being completed; only the part after the last comma is
            matched.
        state: Zero-based index of the candidate to return.

    Returns:
        A completion string, whatever ``self.get_completion`` returns, or
        ``0`` when the command cannot be resolved.
    """
    if state == 0:
        self.available = []
        self.current = 0
        try:
            line = readline.get_line_buffer()[:readline.get_endidx()].split(' ')
            cmd = self.manager.find(line[0])
        except Exception:
            # Unknown command: no completion. KeyboardInterrupt/SystemExit
            # are deliberately not swallowed.
            return 0
        # Words strictly between the command and the word being completed.
        current_params = [word for word in line[1:-1] if word.strip()]
        if ',' in text:
            text = text.split(',')[-1]
        self.available.extend(
            name for name in cmd.parameters(self.mole, current_params)
            if name[:len(text)] == text
        )
        self.available.sort()
        if len(self.available) == 1:
            only = self.available[0]
            self.available = []
            self.current = 0
            return only + cmd.parameter_separator(current_params)
    return self.get_completion(text, state)
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    The candidate list is computed once, when readline asks for
    ``state == 0``, and stored on ``self.completion_matches``; later states
    only index into it.

    Returns:
        The ``state``-th candidate, or ``None`` once the list is exhausted.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # readline's indices refer to the raw buffer; shift them so they
        # index into the left-stripped line handed to the completer.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def _get_step_matches(self, text):
buffer = readline.get_line_buffer()
tail = buffer.rsplit(".", 1)[0]
matches = []
if tail:
obj_class = self.shell.run_command("{}.getClass()".format(tail))[0].split(" ")[-1]
obj_classname = obj_class.split(".")[-1]
if obj_classname == "DefaultGraphTraversal":
matches += self.shell.run_command("GraphTraversal.class.methods.name.unique()")
matches += self.shell.run_command("GraphTraversal.metaClass.methods.name.unique()")
matches += self.shell.run_command("getBinding().getVariable(\"sessionSteps\").keySet()")
else:
matches += self.shell.run_command("{}.getMethods().name.unique()".format(obj_class))
return [match for match in matches if match.startswith(text)]
def set_context(self):
    """Classify the readline buffer and store the result in ``self.context``.

    The buffer is scanned from its end; the first of these characters found
    decides the context: ``.`` gives "traversal", ``)`` or ``}`` gives
    "complete", ``(`` or ``{`` gives "groovy". With none present the context
    is "start".
    """
    context_by_char = {
        '.': "traversal",
        ')': "complete",
        '}': "complete",
        '(': "groovy",
        '{': "groovy",
    }
    line = readline.get_line_buffer()
    self.context = next(
        (context_by_char[ch] for ch in reversed(line) if ch in context_by_char),
        "start",
    )
def _get_step_matches(self, text):
buffer = readline.get_line_buffer()
tail = buffer.rsplit(".", 1)[0]
matches = []
if tail:
obj_class = self.shell.run_command("{}.getClass()".format(tail))[0].split(" ")[-1]
obj_classname = obj_class.split(".")[-1]
if obj_classname == "DefaultGraphTraversal":
matches += self.shell.run_command("GraphTraversal.class.methods.name.unique()")
matches += self.shell.run_command("GraphTraversal.metaClass.methods.name.unique()")
matches += self.shell.run_command("getBinding().getVariable(\"sessionSteps\").keySet()")
else:
matches += self.shell.run_command("{}.getMethods().name.unique()".format(obj_class))
return [match for match in matches if match.startswith(text)]
def set_context(self):
    """Set ``self.context`` from the last significant character typed.

    Walking the readline buffer backwards, the first ``.`` selects
    "traversal", the first closing ``)``/``}`` selects "complete" and the
    first opening ``(``/``{`` selects "groovy". If none of these occur the
    context stays "start".
    """
    self.context = "start"
    for ch in readline.get_line_buffer()[::-1]:
        if ch == '.':
            found = "traversal"
        elif ch in ')}':
            found = "complete"
        elif ch in '({':
            found = "groovy"
        else:
            continue
        self.context = found
        return
def complete(self, text, state):
    """Return the ``state``-th completion candidate for ``text``.

    While the first word is still being typed, candidates come from
    ``self.raw_command_completer``. Afterwards the line is parsed and the
    matching ``complete_<command>`` method is used, falling back to
    ``self.default_completer`` when the command is empty or has no such
    method. Candidates are computed on ``state == 0`` and cached on
    ``self.completion_matches``.
    """
    if state == 0:
        raw = readline.get_line_buffer()
        line = raw.lstrip()
        # Shift readline's indices by the amount of stripped whitespace.
        offset = len(raw) - len(line)
        start_index = readline.get_begidx() - offset
        end_index = readline.get_endidx() - offset
        if start_index <= 0:
            completer = self.raw_command_completer
        else:
            cmd, args = self.parse_line(line)
            if cmd == '':
                completer = self.default_completer
            else:
                try:
                    completer = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    completer = self.default_completer
        self.completion_matches = completer(text, line, start_index, end_index)
    try:
        candidate = self.completion_matches[state]
    except IndexError:
        candidate = None
    return candidate
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    The candidate list is computed once, when readline asks for
    ``state == 0``, and stored on ``self.completion_matches``; later states
    only index into it.

    Returns:
        The ``state``-th candidate, or ``None`` once the list is exhausted.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # readline's indices refer to the raw buffer; shift them so they
        # index into the left-stripped line handed to the completer.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def complete(self, text, state):
    """Return the next possible completion for ``text``.

    With no command entered yet, completion is done by
    ``self.raw_command_completer``. Otherwise ``complete_<command>`` is
    looked up on the instance, with ``self.default_completer`` as the
    fallback for an empty or unknown command. The candidate list is built on
    the first call (``state == 0``) and kept in ``self.completion_matches``.
    """
    def choose_completer(line, start_index):
        # First word still being typed: complete against raw commands.
        if not start_index > 0:
            return self.raw_command_completer
        cmd, args = self.parse_line(line)
        if cmd == '':
            return self.default_completer
        try:
            return getattr(self, 'complete_' + cmd)
        except AttributeError:
            return self.default_completer

    if state == 0:
        original_line = readline.get_line_buffer()
        line = original_line.lstrip()
        # Leading whitespace is dropped, so move the indices accordingly.
        removed = len(original_line) - len(line)
        start_index = readline.get_begidx() - removed
        end_index = readline.get_endidx() - removed
        complete_function = choose_completer(line, start_index)
        self.completion_matches = complete_function(text, line, start_index, end_index)

    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def pathCompleter(self, text, state):
    """Readline completer that offers file-system paths starting with ``text``.

    Args:
        text: Path prefix being completed.
        state: Zero-based index of the candidate to return.

    Returns:
        The ``state``-th path matching ``text + '*'``, or ``None`` when
        there is no such match.
    """
    matches = glob.glob(text + '*')
    # readline keeps incrementing state until it gets a non-string back, so
    # running off the end must yield None rather than IndexError.
    try:
        return matches[state]
    except IndexError:
        return None
def display(self, msg, modifier=None):
    """Write ``msg`` to ``self.stdout`` using the formatter for ``modifier``.

    Python 2 code: anything that is not exactly ``unicode`` is first passed
    through ``obj2utf8``. Empty messages are not written. ``modifier`` picks
    a ``PupyCmd.format_*`` function ("error", "success", "info", "srvinfo",
    "warning"); any other value uses ``PupyCmd.format_log``.

    For "srvinfo" the message is written at the start of the current line
    and the prompt plus the text the user was typing are redrawn below it.
    """
    if type(msg) is not unicode:
        # Force a unicode string for output; printing is left to Python.
        msg = obj2utf8(msg)
    if not msg:
        return

    if modifier == "srvinfo":
        typed = readline.get_line_buffer()
        # ESC[nG: move cursor to column n
        # ESC[nE: move cursor to the beginning of the line n lines down
        # ESC[2K: erase the entire line (cursor position unchanged)
        self.stdout.write("\x1b[0G" + PupyCmd.format_srvinfo(msg) + "\x1b[0E")
        self.stdout.write("\x1b[2K")
        self.stdout.write(self.raw_prompt + typed)
        try:
            readline.redisplay()
        except Exception:
            pass
        return

    formatter_names = {
        "error": "format_error",
        "success": "format_success",
        "info": "format_info",
        "warning": "format_warning",
    }
    formatter = getattr(PupyCmd, formatter_names.get(modifier, "format_log"))
    self.stdout.write(formatter(msg))
def complete(self, text, state):
    """Return the ``state``-th completion for ``text``, or ``None``.

    On ``state == 0`` the candidate list is rebuilt: command names via
    ``self.completenames`` while the first word is being typed, otherwise
    ``self.pupy_completer.complete`` (``self.completedefault`` when the
    parsed command is empty or the completer attribute is missing). The
    list is cached on ``self.completion_matches``; an empty list yields
    ``None``.
    """
    if state == 0:
        import readline

        raw = readline.get_line_buffer()
        line = raw.lstrip()
        # Indices are shifted by the amount of leading whitespace removed.
        shift = len(raw) - len(line)
        begidx = readline.get_begidx() - shift
        endidx = readline.get_endidx() - shift
        if begidx <= 0:
            compfunc = self.completenames
        else:
            cmd, args, foo = self.parseline(line)
            if cmd == '':
                compfunc = self.completedefault
            else:
                try:
                    compfunc = self.pupy_completer.complete
                except AttributeError:
                    compfunc = self.completedefault
        self.completion_matches = compfunc(text, line, begidx, endidx)

    matches = self.completion_matches
    if not matches:
        return None
    try:
        return matches[state]
    except IndexError:
        return None
def handle_return(x, y):
    """Finish the current readline input; always returns 0.

    When ``delete_input`` is false, readline is marked done and a newline is
    printed. Otherwise a non-empty line is added to history, the prompt and
    displayed line are blanked and redrawn, the typed text is put back into
    the buffer, and readline is marked done.

    Args:
        x: Unused.
        y: Unused.
    """
    if not delete_input:
        rl_done.value = 1
        print()
    else:
        line = r.get_line_buffer()
        if line:
            r.add_history(line)
        # Blank the prompt and the visible line before redrawing ...
        rl_set_prompt(c_char_p(b""))
        rl_replace_line(c_char_p(b""), 0)
        rl_redisplay()
        # ... then restore the typed text so it is what gets returned.
        rl_replace_line(c_char_p(line.encode()), 1)
        rl_done.value = 1
    return 0
def improved_rlcompleter(self):
    """Enhances the default rlcompleter

    The function enhances the default rlcompleter by also doing
    pathname completion and module name completion for import
    statements. Additionally, it inserts a tab instead of attempting
    completion if there is no preceding text.

    Side effect: ``/`` is removed from readline's completer delimiters so
    that a whole path reaches the completer as one word.

    Returns:
        A ``complete_wrapper(text, state)`` function suitable for
        ``readline.set_completer``.
    """
    completer = rlcompleter.Completer(namespace=self.locals)
    # Remove / from the delimiters to help identify possibility for path completion.
    readline.set_completer_delims(readline.get_completer_delims().replace('/', ''))
    modlist = frozenset(name for _, name, _ in pkgutil.iter_modules())

    def complete_wrapper(text, state):
        line = readline.get_line_buffer().strip()
        if line == '':
            return None if state > 0 else self.tab
        if state == 0:
            # Start from an empty list: rlcompleter leaves ``matches``
            # untouched when ``text`` is blank, which would otherwise expose
            # a missing attribute or the previous completion's candidates.
            completer.matches = []
            # Compare the whole first word so identifiers that merely start
            # with "import"/"from" (e.g. ``importlib``) complete normally.
            if line.split(None, 1)[0] in ('import', 'from'):
                completer.matches = [name for name in modlist if name.startswith(text)]
            else:
                match = completer.complete(text, state)
                if match is None and '/' in text:
                    completer.matches = glob.glob(text + '*')
        try:
            match = completer.matches[state]
        except IndexError:
            return None
        return '{}{}'.format(match, ' ' if keyword.iskeyword(match) else '')

    return complete_wrapper
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    The candidate list is computed once, when readline asks for
    ``state == 0``, and stored on ``self.completion_matches``; later states
    only index into it.

    Returns:
        The ``state``-th candidate, or ``None`` once the list is exhausted.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # readline's indices refer to the raw buffer; shift them so they
        # index into the left-stripped line handed to the completer.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    The candidate list is computed once, when readline asks for
    ``state == 0``, and stored on ``self.completion_matches``; later states
    only index into it.

    Returns:
        The ``state``-th candidate, or ``None`` once the list is exhausted.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # readline's indices refer to the raw buffer; shift them so they
        # index into the left-stripped line handed to the completer.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def display(self, msg, modifier=None):
    """Format ``msg`` according to ``modifier`` and write it to ``self.stdout``.

    Python 2 code: a ``msg`` whose type is not exactly ``unicode`` is
    converted with ``obj2utf8`` first, and nothing is written for an empty
    message. Recognised modifiers are "error", "success", "info", "srvinfo"
    and "warning"; every other value is formatted with
    ``PupyCmd.format_log``.

    "srvinfo" messages are drawn over the current input line, after which
    the prompt and the text being typed are written again.
    """
    if type(msg) is not unicode:
        # Force a unicode string for output; printing is left to Python.
        msg = obj2utf8(msg)
    if not msg:
        return

    if modifier == "srvinfo":
        saved_input = readline.get_line_buffer()
        # ESC[nG: move cursor to column n
        # ESC[nE: move cursor to the beginning of the line n lines down
        # ESC[2K: erase the entire line (cursor position unchanged)
        self.stdout.write("\x1b[0G" + PupyCmd.format_srvinfo(msg) + "\x1b[0E")
        self.stdout.write("\x1b[2K")
        self.stdout.write(self.raw_prompt + saved_input)
        try:
            readline.redisplay()
        except Exception:
            pass
        return

    if modifier == "error":
        formatter = PupyCmd.format_error
    elif modifier == "success":
        formatter = PupyCmd.format_success
    elif modifier == "info":
        formatter = PupyCmd.format_info
    elif modifier == "warning":
        formatter = PupyCmd.format_warning
    else:
        formatter = PupyCmd.format_log
    self.stdout.write(formatter(msg))
def complete(self, text, state):
    """Readline completion hook; returns candidate number ``state`` or ``None``.

    Candidates are recomputed when ``state == 0`` and stored in
    ``self.completion_matches``. The first word is completed with
    ``self.completenames``; later words with ``self.pupy_completer.complete``,
    or ``self.completedefault`` if the parsed command is empty or that
    completer is unavailable. No candidates means ``None``.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # Leading whitespace is removed, so the indices move left with it.
        leading = len(origline) - len(line)
        begidx = readline.get_begidx() - leading
        endidx = readline.get_endidx() - leading

        compfunc = None
        if not begidx > 0:
            compfunc = self.completenames
        else:
            cmd, args, foo = self.parseline(line)
            if cmd != '':
                try:
                    compfunc = self.pupy_completer.complete
                except AttributeError:
                    compfunc = self.completedefault
            else:
                compfunc = self.completedefault
        self.completion_matches = compfunc(text, line, begidx, endidx)

    try:
        if self.completion_matches:
            return self.completion_matches[state]
    except IndexError:
        pass
    return None
def complete(self, text, state):
    """Complete current command

    Command names from ``self.commands`` that start with ``text`` are
    offered (each followed by a space) as long as the buffer is blank or
    holds at most one word that is not yet followed by a space. In any
    other situation nothing is returned.
    """
    buffer = readline.get_line_buffer()
    on_first_word = not buffer.strip() or (
        len(buffer.split()) <= 1 and buffer[-1] != " "
    )
    if not on_first_word:
        return None
    results = [command + " " for command in self.commands if command.startswith(text)]
    # Trailing None tells readline there are no more candidates.
    results.append(None)
    return results[state]
def next(self):
    """Read one line from the prompt (Python 2 iterator ``next``).

    Returns:
        The line entered, or ``None`` when the line must be skipped: either
        Ctrl-C was pressed, or ``self.controlc`` was set by an earlier
        Ctrl-C (the flag is then cleared).

    Raises:
        StopIteration: on end of input (EOF), after printing a farewell.
    """
    try:
        entered = raw_input(self.prompt + "> ")
        if not self.controlc:
            return entered
        # An earlier Ctrl-C flagged this line to be dropped.
        self.controlc = None
        return None
    except EOFError:
        print("\nKTHXBYE!")
    except KeyboardInterrupt:
        # User pressed Ctrl-C whilst something was in the buffer: make the
        # caller skip the next line of input.
        if len(readline.get_line_buffer()) > 0:
            self.controlc = True
            print("\nYour next line of input might be ignored.\nI have not understood readline's ^C handling good enough to make it work.\nFor the moment just type <enter> and ignore the displayed text.")
        return None
    # Only the EOF handler falls through to here.
    raise StopIteration
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    Completion is only offered when the buffer is empty or starts with the
    ``:`` trigger; the trigger is removed before the line is parsed. The
    candidate list is computed on ``state == 0`` and stored on
    ``self.completion_matches``.

    Returns:
        The ``state``-th candidate, or ``None`` when there is none.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        # Offer completion just for commands that start with the trigger ':'.
        if origline and not origline.startswith(':'):
            return None
        line = origline.lstrip().lstrip(':')
        # Shift readline's indices by the number of characters removed.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None
def complete(self, text, state):
    """Return the next possible completion for 'text'.

    If a command has not been entered, then complete against command list.
    Otherwise try to call complete_<command> to get list of completions.

    The candidate list is computed once, when readline asks for
    ``state == 0``, and stored on ``self.completion_matches``; later states
    only index into it.

    Returns:
        The ``state``-th candidate, or ``None`` once the list is exhausted.
    """
    if state == 0:
        import readline

        origline = readline.get_line_buffer()
        line = origline.lstrip()
        # readline's indices refer to the raw buffer; shift them so they
        # index into the left-stripped line handed to the completer.
        stripped = len(origline) - len(line)
        begidx = readline.get_begidx() - stripped
        endidx = readline.get_endidx() - stripped
        if begidx > 0:
            cmd, args, foo = self.parseline(line)
            # parseline can report "no command" as None as well as '' (the
            # stdlib cmd.Cmd.parseline does so for a '!' line when there is
            # no do_shell); 'complete_' + None would raise TypeError.
            if not cmd:
                compfunc = self.completedefault
            else:
                try:
                    compfunc = getattr(self, 'complete_' + cmd)
                except AttributeError:
                    compfunc = self.completedefault
        else:
            compfunc = self.completenames
        self.completion_matches = compfunc(text, line, begidx, endidx)
    try:
        return self.completion_matches[state]
    except IndexError:
        return None