def __init__(self, uri, auth, secure=True, verbose=False):
    """Open a driver for `uri` and initialise the console's state.

    :param uri: address handed to ``GraphDatabase.driver``; also stored
        as ``self.uri``
    :param auth: forwarded as the driver's ``auth`` argument
    :param secure: forwarded as the driver's ``encrypted`` argument
    :param verbose: when true, ``watch("neo4j.bolt")`` is called and its
        result is kept as ``self.watcher``
    :raise ConsoleError: if creating the driver raises
        ``ServiceUnavailable``
    """
    try:
        driver = GraphDatabase.driver(uri, auth=auth, encrypted=secure)
    except ServiceUnavailable as error:
        message = "Could not connect to {} ({})".format(uri, error)
        raise ConsoleError(message)
    self.driver = driver
    self.uri = uri

    self.history = FileHistory(HISTORY_FILE)

    # The "#ansi..." style strings are built from the configured colours,
    # with "cyan" rewritten to "teal".
    prompt_style = "#ansi{}".format(self.prompt_colour.replace("cyan", "teal"))
    counter_style = "#ansi{} bold".format(self.tx_colour.replace("cyan", "teal"))
    token_styles = {
        Token.Prompt: prompt_style,
        Token.TxCounter: counter_style,
    }
    self.prompt_args = {
        "history": self.history,
        "lexer": PygmentsLexer(CypherLexer),
        "style": style_from_pygments(VimStyle, token_styles),
    }

    self.lexer = CypherLexer()
    self.result_writer = TabularResultWriter()

    if verbose:
        # Imported lazily so the watcher module is only loaded on request.
        from .watcher import watch
        self.watcher = watch("neo4j.bolt")

    # Slash-command dispatch table: command text -> bound handler.
    self.commands = {
        # input mode and editing
        "//": self.set_multi_line,
        "/e": self.edit,
        # help
        "/?": self.help,
        "/h": self.help,
        "/help": self.help,
        # exit
        "/x": self.exit,
        "/exit": self.exit,
        # explicit read / write transactions
        "/r": self.run_read_tx,
        "/read": self.run_read_tx,
        "/w": self.run_write_tx,
        "/write": self.run_write_tx,
        # result output format
        "/csv": self.set_csv_result_writer,
        "/table": self.set_tabular_result_writer,
        "/tsv": self.set_tsv_result_writer,
        # server information
        "/config": self.config,
        "/kernel": self.kernel,
    }

    # No session or transaction is open yet.
    self.session = self.tx = None
    self.tx_counter = 0
评论列表
文章目录