def on_query_context(view_id, key, operator, operand, match_all):
    """Ask the registered listeners whether a key-binding context matches.

    Callbacks stored under ``all_callbacks['on_query_context']`` are queried
    first, then the listeners returned by ``event_listeners_for_view``.
    The first truthy answer wins.

    Args:
        view_id: identifier used to build the ``sublime.View``.
        key: context key being queried; forwarded unchanged.
        operator: comparison operator; forwarded unchanged.
        operand: comparison operand; forwarded unchanged.
        match_all: forwarded unchanged to every listener.

    Returns:
        bool: True as soon as one listener returns a truthy value,
        False when none does.
    """
    v = sublime.View(view_id)

    for callback in all_callbacks['on_query_context']:
        try:
            if callback.on_query_context(v, key, operator, operand, match_all):
                return True
        except Exception:
            # A failing listener must not prevent the others from being
            # asked. ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt and SystemExit propagate.
            traceback.print_exc()

    for vel in event_listeners_for_view(v):
        # Only listeners whose own class defines the hook are queried.
        if 'on_query_context' not in vel.__class__.__dict__:
            continue
        try:
            if vel.on_query_context(key, operator, operand, match_all):
                return True
        except Exception:
            traceback.print_exc()

    return False
# Example source code for the Python class View()
def on_query_completions(view_id, prefix, locations):
    """Gather completions from every registered completion listener.

    A listener may return either a list of completions or a
    ``(completions, flags)`` tuple; any other return value is ignored.

    Args:
        view_id: identifier used to build the ``sublime.View``.
        prefix: text typed so far; forwarded unchanged.
        locations: forwarded unchanged to every listener.

    Returns:
        tuple: ``(completions, flags)`` where ``completions`` holds every
        item passed through ``normalise_completion`` and ``flags`` is the
        bitwise OR of the flags returned by the listeners.
    """
    v = sublime.View(view_id)
    completions = []
    flags = 0

    for callback in all_callbacks['on_query_completions']:
        try:
            res = callback.on_query_completions(v, prefix, locations)
            if isinstance(res, tuple):
                completions += [normalise_completion(c) for c in res[0]]
                flags |= res[1]
            elif isinstance(res, list):
                completions += [normalise_completion(c) for c in res]
        except Exception:
            # Report the failure and keep asking the remaining listeners.
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt and SystemExit propagate.
            traceback.print_exc()

    return (completions, flags)
def queue_did_change(view: sublime.View):
    """Record a pending change for the view's buffer and schedule its purge.

    The first pending change of a buffer is stored with version 1; every
    further call while an entry is pending increments the stored version.
    ``purge_did_change`` is scheduled 500 ms later with the version
    computed by this call.

    Args:
        view (sublime.View): view whose buffer was changed
    """
    buffer_id = view.buffer_id()

    if buffer_id not in pending_buffer_changes:
        buffer_version = 1
        pending_buffer_changes[buffer_id] = {
            "view": view,
            "version": buffer_version
        }
    else:
        entry = pending_buffer_changes[buffer_id]
        buffer_version = entry["version"] + 1
        entry["version"] = buffer_version

    def purge_this_version():
        purge_did_change(buffer_id, buffer_version)

    sublime.set_timeout_async(purge_this_version, 500)
def notify_did_change(view: sublime.View):
    """Send the full buffer text to the language client as a didChange.

    The pending-change entry of an Apex view is dropped first. When the
    module-level ``client`` is set, the document version is incremented and
    the whole buffer is sent.

    NOTE(review): the indentation of this snippet was lost; the ``client``
    check is assumed to sit at function level (not nested under the
    pending-change check) - confirm against the upstream file.

    Args:
        view (sublime.View): view whose buffer changed
    """
    is_pending = (util.is_apex_file(view)
                  and view.buffer_id() in pending_buffer_changes)
    if is_pending:
        del pending_buffer_changes[view.buffer_id()]

    if not client:
        return

    document_state = get_document_state(view.file_name())
    uri = util.filename_to_uri(view.file_name())
    text_document = {
        "uri": uri,
        # "languageId": config.languageId, clangd does not like this field, but no server uses it?
        "version": document_state.inc_version(),
    }
    content_changes = [{
        "text": view.substr(sublime.Region(0, view.size()))
    }]
    params = {
        "textDocument": text_document,
        "contentChanges": content_changes
    }
    client.send_notification(Notification.didChange(params))
def has_valid_syntax(view):
    """Tell whether the syntax of the view is supported by this plugin.

    The outcome is also written to the debug log.

    Args:
        view (sublime.View): current view

    Returns:
        bool: True if the syntax is listed in ``Tools.valid_syntax``,
        False otherwise
    """
    syntax = Tools.get_view_syntax(view)
    is_valid = syntax in Tools.valid_syntax
    if is_valid:
        template = "%s file has valid syntax: %s"
    else:
        template = "%s file has unsupported syntax: %s"
    logging.debug(template % (view.file_name(), syntax))
    return is_valid
def on_activated_async(view):
    """Initialise a completer for a freshly activated view if it has none.

    Nothing happens for views rejected by ``Tools.is_valid_view``, when no
    ``completer`` is available, or when the completer already knows the
    buffer of this view.

    Args:
        view (sublime.View): current view
    """
    logging.debug(" on_activated_async view id %s", view.buffer_id())

    if not Tools.is_valid_view(view):
        return
    if not completer:
        return

    if completer.exist_for_view(view.buffer_id()):
        logging.debug(" view %s, already has a completer",
                      view.buffer_id())
        return

    logging.info(" init completer for view id %s", view.buffer_id())
    completer.init(view)
    logging.info(" init completer for view id %s done", view.buffer_id())
def add_error(self, view, error_dict):
    """Put new compile error in the dictionary of errors.

    The error is stored only when the base name of its file equals the base
    name of the file shown in the view. On success a ``'region'`` entry
    (the word at the error position) is added to ``error_dict`` and the
    dict is appended to the per-row list of errors of the view's buffer.

    Args:
        view (sublime.View): current view
        error_dict (dict): current error dict {row, col, file, region}
    """
    logging.debug(" adding error %s", error_dict)
    error_source_file = path.basename(error_dict['file'])

    view_file = view.file_name()
    if view_file is None:
        # An unsaved view has no file name, so no error can belong to it;
        # path.basename(None) would raise a TypeError.
        return
    if error_source_file != path.basename(view_file):
        return

    row = int(error_dict['row'])
    col = int(error_dict['col'])
    # Rows and columns of the error are 1-based, text points are 0-based.
    point = view.text_point(row - 1, col - 1)
    error_dict['region'] = view.word(point)

    errors_by_row = self.err_regions[view.buffer_id()]
    errors_by_row.setdefault(row, []).append(error_dict)
def show_errors(self, view):
    """Mark the stored error regions of the view's buffer in the view.

    Does nothing when no errors are stored for the buffer.

    Args:
        view (sublime.View): Current view
    """
    buffer_id = view.buffer_id()
    if buffer_id in self.err_regions:
        regions = PopupErrorVis._as_region_list(self.err_regions[buffer_id])
        log.debug("showing error regions: %s", regions)
        view.add_regions(PopupErrorVis._TAG,
                         regions,
                         PopupErrorVis._ERROR_SCOPE,
                         self.gutter_mark,
                         PopupErrorVis._ERROR_FLAGS)
def show_popup_if_needed(self, view, row):
    """Show a popup with the errors stored for the given row, if any.

    Args:
        view (sublime.View): current view
        row (int): number of row
    """
    buffer_id = view.buffer_id()
    if buffer_id not in self.err_regions:
        return

    errors_by_row = self.err_regions[buffer_id]
    if row not in errors_by_row:
        log.debug("no error regions for row: %s", row)
        return

    errors_html = PopupErrorVis._as_html(errors_by_row[row])
    view.show_popup(errors_html, max_width=self._MAX_POPUP_WIDTH)
def show_phantoms(self, view):
    """Show phantoms for compilation errors.

    Existing phantoms with the plugin tag are erased, the phantom set of
    the buffer is created on first use, and one phantom per stored error
    row is placed below that row.

    Args:
        view (sublime.View): current view
    """
    view.erase_phantoms(PopupErrorVis._TAG)

    buffer_id = view.buffer_id()
    if buffer_id in self.phantom_sets:
        phantom_set = self.phantom_sets[buffer_id]
    else:
        phantom_set = sublime.PhantomSet(view, PopupErrorVis._TAG)
        self.phantom_sets[buffer_id] = phantom_set

    phantoms = []
    for row, row_errors in self.err_regions[buffer_id].items():
        errors_html = PhantomErrorVis._as_html(row_errors)
        # Stored rows are 1-based; the phantom spans from column 1 to the
        # end of that line.
        pt = view.text_point(row - 1, 1)
        line_end = view.line(pt).b
        phantoms.append(sublime.Phantom(
            sublime.Region(pt, line_end),
            errors_html,
            sublime.LAYOUT_BELOW,
            on_navigate=self._on_phantom_navigate))

    phantom_set.update(phantoms)
def cursor_pos(view, pos=None):
    """Get current cursor position.

    Args:
        view (sublime.View): current view
        pos (int, optional): given position. When omitted (``None``), the
            start of the first selection is used. A position of ``0`` is a
            valid offset and is honoured as given.

    Returns:
        (row, col): 1-based tuple of row and col for the position, or
        ``None`` when no position is given and the view has no selection
    """
    if pos is None:
        selection = view.sel()
        if len(selection) < 1:
            # something is wrong
            return None
        # we care about the first position
        pos = selection[0].a
    row, col = view.rowcol(pos)
    # rowcol() is 0-based, callers expect 1-based values.
    return (row + 1, col + 1)
def get_view_lang(view):
    """Map the syntax of the view to a language name.

    Args:
        view (sublime.View): Current view

    Returns:
        str: language, "C", "C++", "Objective-C", or "Objective-C++";
        None when the syntax belongs to none of these groups
    """
    syntax = Tools.get_view_syntax(view)
    if syntax in Tools.C_SYNTAX:
        lang = "C"
    elif syntax in Tools.CPP_SYNTAX:
        lang = "C++"
    elif syntax in Tools.OBJECTIVE_C_SYNTAX:
        lang = "Objective-C"
    elif syntax in Tools.OBJECTIVE_CPP_SYNTAX:
        lang = "Objective-C++"
    else:
        lang = None
    return lang
def get_view_syntax(view):
    """Extract the syntax name from the 'syntax' setting of the view.

    Args:
        view (sublime.View): Current view

    Returns:
        str: syntax, e.g. "C", "C++"; None when the regex finds nothing or
        the setting cannot be read
    """
    try:
        syntax_setting = view.settings().get('syntax')
        matches = re.findall(Tools.syntax_regex, syntax_setting)
        if matches:
            return matches[0]
    except TypeError as e:
        # if the view is killed while this is being run, an exception is
        # thrown. Let's deal with it gracefully.
        log.error("error while getting current language: '%s'", e)
    return None
def settings_for_view(self, view):
    """Return the settings stored for the buffer of a view.

    Default settings are initialised on first use, and settings for an
    unknown view are (re)initialised before the lookup.

    Args:
        view (sublime.View): current view

    Returns:
        settings.SettingsStorage: settings for view, or None when they are
        still missing after the initialisation attempt
    """
    if not self.__default_settings:
        self.__init_default_settings()

    view_id = view.buffer_id()
    if view_id not in self.__settings_dict:
        log.debug("no settings for view %s. Reinitializing.", view_id)
        self.__init_for_view(view)

    # when the view is closed quickly the entry can still be missing here
    if view_id not in self.__settings_dict:
        return None
    return self.__settings_dict[view_id]
def update(self, view, settings):
    """Rebuild the current view with clang and refresh the shown errors.

    Args:
        view (sublime.View): this view
        settings: settings object; only its ``errors_style`` is read here.
            With ``SettingsStorage.NONE_STYLE`` this is a dummy function,
            as nothing is gained from rebuilding when no errors are shown.

    Returns:
        False when the rebuild is skipped; otherwise nothing is returned
        explicitly (None).
    """
    if settings.errors_style == SettingsStorage.NONE_STYLE:
        # in this class there is no need to rebuild the file. It brings no
        # benefits. We only want to do it if we need to show errors.
        return False

    started_at = time.time()
    output_text = self.run_clang_command(view, "update")
    finished_at = time.time()
    log.debug("rebuilding done in %s seconds", finished_at - started_at)

    self.save_errors(output_text)
    self.show_errors(view)
def get_declaration_location(self, view, row, col):
    """Find where the symbol at the given position is declared.

    For a cursor that is itself a declaration, the definition of the
    referenced cursor is preferred when one is found.

    Args:
        view (sublime.View): current view
        row (int): cursor row
        col (int): cursor col

    Returns:
        Location: location of declaration, or None when there is no
        translation unit or the cursor references nothing
    """
    with Completer.rlock:
        if not self.tu:
            return None

        cursor = self.tu.cursor.from_location(
            self.tu, self.tu.get_location(view.file_name(), (row, col)))
        if not (cursor and cursor.referenced):
            return None

        target = cursor.referenced
        if cursor.kind.is_declaration():
            # fall back to the referenced cursor when no definition exists
            target = target.get_definition() or target
        return target.location
def on_selection_modified_async(self, view):
    """Show the error popup for the row under the cursor, when needed.

    Nothing is shown when errors are displayed as phantoms, when the view
    is not valid for the plugin, or when no cached view config with a
    completer exists for the view.

    Args:
        view (sublime.View): current view
    """
    settings = EasyClangComplete.settings_manager.settings_for_view(view)
    if settings is None:
        # settings_for_view() returns None when the view was closed before
        # its settings could be initialised.
        return
    if settings.errors_style == SettingsStorage.PHANTOMS_STYLE:
        return
    if not Tools.is_valid_view(view):
        return

    position = SublBridge.cursor_pos(view)
    if position is None:
        # cursor_pos() returns None for a view without any selection;
        # unpacking it would raise a TypeError.
        return
    row, _ = position

    view_config = EasyClangComplete.view_config_manager.get_from_cache(
        view)
    if not view_config:
        return
    if not view_config.completer:
        return
    view_config.completer.error_vis.show_popup_if_needed(view, row)
def config_for_scope(view: sublime.View) -> 'Optional[ClientConfig]':
    """Return the client config that applies to the view.

    A config already registered for the window is preferred. Otherwise the
    global config is specialised with the window settings, registered for
    the window and returned. The result may be falsy when no config is
    found at all.
    """
    # check window_client_config first
    window_client_config = get_window_client_config(view)
    if window_client_config:
        return window_client_config

    global_client_config = get_global_client_config(view)
    if not global_client_config:
        return window_client_config

    window = view.window()
    if not window:
        # always return a client config even if the view has no window anymore
        return global_client_config

    window_client_config = apply_window_settings(global_client_config, view)
    add_window_client_config(window, window_client_config)
    return window_client_config
def client_for_view(view: sublime.View) -> 'Optional[Client]':
    """Look up the running client that serves the view.

    Returns None (after a debug message) when the view has no window, no
    config applies to it, or the window has no client under the config's
    name.
    """
    window = view.window()
    if not window:
        debug("no window for view", view.file_name())
        return None

    config = config_for_scope(view)
    if not config:
        debug("config not available for view", view.file_name())
        return None

    clients = window_clients(window)
    if config.name in clients:
        return clients[config.name]

    debug(config.name, "not available for view",
          view.file_name(), "in window", window.id())
    return None
def start_active_views():
    """Initialise the supported views visible in the active window.

    The active view of every group is collected, with the view of the
    active group placed first. The first collected view is passed to
    ``initialize_on_open``; the remaining ones are appended to
    ``didopen_after_initialize``.
    """
    window = sublime.active_window()
    if not window:
        return

    views = list()  # type: List[sublime.View]
    for group in range(0, window.num_groups()):
        candidate = window.active_view_in_group(group)
        if not is_supported_view(candidate):
            continue
        if window.active_group() == group:
            views.insert(0, candidate)
        else:
            views.append(candidate)

    if not views:
        return

    first_view = views.pop(0)
    debug('starting active=', first_view.file_name(), 'other=', len(views))
    initialize_on_open(first_view)
    for remaining_view in views:
        didopen_after_initialize.append(remaining_view)
def initialize_on_open(view: sublime.View):
    """Start the client configured for the view unless it already runs.

    Old clients of the window are unloaded first when the window has any.
    When the config is enabled and the window has no client under its
    name, the view is queued in ``didopen_after_initialize`` and the
    client is started.
    """
    global didopen_after_initialize

    window = view.window()
    if not window:
        return

    if window_clients(window):
        unload_old_clients(window)

    config = config_for_scope(view)
    if not config:
        return
    if not config.enabled:
        debug(config.name, 'is not enabled')
        return

    if config.name not in window_clients(window):
        didopen_after_initialize.append(view)
        start_window_client(view, window, config)
def queue_did_change(view: sublime.View):
    """Record a pending change for the view's buffer and schedule its purge.

    The first pending change of a buffer is stored with version 1; every
    further call while an entry is pending increments the stored version.
    ``purge_did_change`` is scheduled 500 ms later with the version
    computed by this call.

    Args:
        view (sublime.View): view whose buffer was changed
    """
    buffer_id = view.buffer_id()

    if buffer_id not in pending_buffer_changes:
        buffer_version = 1
        pending_buffer_changes[buffer_id] = {
            "view": view,
            "version": buffer_version
        }
    else:
        entry = pending_buffer_changes[buffer_id]
        buffer_version = entry["version"] + 1
        entry["version"] = buffer_version

    def purge_this_version():
        purge_did_change(buffer_id, buffer_version)

    sublime.set_timeout_async(purge_this_version, 500)
def notify_did_open(view: sublime.View):
    """Announce a newly opened document to the client serving the view.

    The notification is sent only when a client and config exist, the view
    has both a window and a file name, and no document state exists yet
    for that file. The "show_definitions" view setting is switched off as
    soon as a client and config are found.
    """
    config = config_for_scope(view)
    client = client_for_view(view)
    if not (client and config):
        return

    view.settings().set("show_definitions", False)
    window = view.window()
    view_file = view.file_name()
    if not (window and view_file):
        return
    if has_document_state(window, view_file):
        return

    ds = get_document_state(window, view_file)
    if settings.show_view_status:
        view.set_status("lsp_clients", config.name)

    text_document = {
        "uri": filename_to_uri(view_file),
        "languageId": config.languageId,
        "text": view.substr(sublime.Region(0, view.size())),
        "version": ds.version
    }
    client.send_notification(
        Notification.didOpen({"textDocument": text_document}))
def notify_did_change(view: sublime.View):
    """Send the full buffer text of the view to its client as a didChange.

    Requires the view to have a window and a file name. The pending-change
    entry of the buffer is dropped, the document version is incremented
    and the whole buffer is sent.

    NOTE(review): the indentation of this snippet was lost; the client
    lookup is assumed not to be nested under the pending-change check -
    confirm against the upstream file.
    """
    file_name = view.file_name()
    window = view.window()
    if not (window and file_name):
        return

    if view.buffer_id() in pending_buffer_changes:
        del pending_buffer_changes[view.buffer_id()]

    # config = config_for_scope(view)
    client = client_for_view(view)
    if not client:
        return

    document_state = get_document_state(window, file_name)
    uri = filename_to_uri(file_name)
    text_document = {
        "uri": uri,
        # "languageId": config.languageId, clangd does not like this field, but no server uses it?
        "version": document_state.inc_version(),
    }
    content_changes = [{
        "text": view.substr(sublime.Region(0, view.size()))
    }]
    params = {
        "textDocument": text_document,
        "contentChanges": content_changes
    }
    client.send_notification(Notification.didChange(params))
def update_diagnostics_phantoms(view: sublime.View, diagnostics: 'List[Diagnostic]'):
    """Refresh the diagnostics phantoms shown for the buffer of the view.

    Phantoms are built only when the ``show_diagnostics_phantoms`` setting
    is on and the view is not dirty. Without phantoms to show, the phantom
    set stored for the buffer is dropped; otherwise the set is created on
    first use and updated.
    """
    global phantom_sets_by_buffer

    buffer_id = view.buffer_id()
    phantoms = None
    if settings.show_diagnostics_phantoms and not view.is_dirty():
        phantoms = [create_phantom(view, diagnostic)
                    for diagnostic in diagnostics]

    if not phantoms:
        phantom_sets_by_buffer.pop(buffer_id, None)
        return

    phantom_set = phantom_sets_by_buffer.get(buffer_id)
    if not phantom_set:
        phantom_set = sublime.PhantomSet(view, "lsp_diagnostics")
        phantom_sets_by_buffer[buffer_id] = phantom_set
    phantom_set.update(phantoms)
def show_tooltip(self, view: sublime.View, tooltip: str, content: Dict[str, str], fallback: Callable) -> None:  # noqa
    """Display a generated tooltip popup, or defer to ``fallback``.

    ``fallback`` is called (and its result returned) on Sublime Text builds
    older than 3070 and when no tooltip text can be generated. The popup
    width is the 'font_size' setting times 75, capped at 900.
    """
    st_ver = int(sublime.version())
    if st_ver < 3070:
        return fallback()

    width = get_settings(view, 'font_size', 8) * 75
    popup_options = {
        'location': -1,
        'max_width': 900 if width >= 900 else width,
    }
    if st_ver >= 3071:
        popup_options['flags'] = sublime.COOPERATE_WITH_AUTO_COMPLETE

    text = self._generate(tooltip, content)
    if text is None:
        return fallback()
    return view.show_popup(text, **popup_options)
def on_modified(self, view: sublime.View) -> None:
    """React to a modification of the view by refreshing lint state.

    When linting applies to the view, old marks are erased unless the
    'anaconda_linter_persistent' setting is on, and for the 'always' lint
    behaviour the lint pulse is reset and a lint is run if auto lint is
    enabled. When linting does not apply, marks are erased through
    ``_erase_marks_if_no_linting``.
    """
    constraints = ONLY_CODE | NOT_SCRATCH | LINTING_ENABLED
    if not check_linting(view, constraints, code=self.lang.lower()):
        self._erase_marks_if_no_linting(view)
        return

    # remove previous linting marks if configured to do so
    persistent = get_settings(view, 'anaconda_linter_persistent', False)
    if not persistent:
        erase_lint_marks(view)

    # check lint behavior and lint if always and auto lint is set
    if not check_linting_behaviour(view, ['always']):
        return

    # update the last selected line number
    self.last_selected_line = -1
    ANACONDA['LAST_PULSE'] = time.time()
    ANACONDA['ALREADY_LINTED'] = False
    if self.check_auto_lint:
        self.lint()
def on_modified(self, view: sublime.View) -> None:
    """Trigger argument or import completion after a modification.

    With the caret between '(' and ')', the function-arguments command is
    run if a completion was just inserted, and the JUST_COMPLETED flag is
    cleared. Otherwise, when the text ending at the caret is 'import ',
    auto completion is started.
    """
    global JUST_COMPLETED

    if not is_python(view, autocomplete_ignore_repl=True):
        return

    first_sel = view.sel()[0]
    caret = first_sel.begin()
    between_parens = (view.substr(caret - 1) == '('
                      and view.substr(caret) == ')')
    if between_parens:
        if JUST_COMPLETED:
            view.run_command('anaconda_complete_funcargs')
        JUST_COMPLETED = False
        return

    # 7 == len('import ')
    typed = view.substr(sublime.Region(caret - 7, first_sel.end()))
    if typed == 'import ':
        self._run_auto_complete()
def prepare_data_status(
        self, view: sublime.View, data: Dict[str, Any]) -> Any:
    """Prepare the returned data for status

    The docstring in ``data['doc']`` is stored as the signature and then
    reduced to its third line, which is shown through ``_show_status``.

    Returns:
        The result of ``self._show_status(view)``, or None when the request
        failed, there is no usable docstring, the signature is excluded, or
        the docstring has fewer than three lines.
    """
    if not (data['success'] and 'No docstring' not in data['doc']
            and data['doc'] != 'list\n'):
        return None

    self.signature = data['doc']
    if self._signature_excluded(self.signature):
        return None

    try:
        self.signature = self.signature.splitlines()[2]
    except IndexError:
        # Indexing a list raises IndexError (never KeyError) when the
        # docstring has fewer than three lines.
        return None

    return self._show_status(view)
def on_query_completions(view_id, prefix, locations):
    """Gather completions from every registered completion listener.

    A listener may return either a list of completions or a
    ``(completions, flags)`` tuple; any other return value is ignored.

    Args:
        view_id: identifier used to build the ``sublime.View``.
        prefix: text typed so far; forwarded unchanged.
        locations: forwarded unchanged to every listener.

    Returns:
        tuple: ``(completions, flags)`` where ``completions`` holds every
        item passed through ``normalise_completion`` and ``flags`` is the
        bitwise OR of the flags returned by the listeners.
    """
    v = sublime.View(view_id)
    completions = []
    flags = 0

    for callback in all_callbacks['on_query_completions']:
        try:
            res = callback.on_query_completions(v, prefix, locations)
            if isinstance(res, tuple):
                completions += [normalise_completion(c) for c in res[0]]
                flags |= res[1]
            elif isinstance(res, list):
                completions += [normalise_completion(c) for c in res]
        except Exception:
            # Report the failure and keep asking the remaining listeners.
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt and SystemExit propagate.
            traceback.print_exc()

    return (completions, flags)