python类Observer()的实例源码

realtime.py 文件源码 项目:picopore 作者: scottgigante 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, runner):
        """Prepare a watchdog observer over the input directories of ``runner``.

        A pattern-matching handler limited to ``*.fast5`` files (directory
        events ignored) is wired to this object's ``on_created`` and
        ``on_moved`` callbacks, then scheduled recursively on every entry of
        ``runner.input`` that is a directory. The observer is created here
        but not started.
        """
        self.runner = runner

        self.event_handler = PatternMatchingEventHandler(
            patterns=["*.fast5"],
            ignore_patterns=[],
            ignore_directories=True,
        )
        # Route the handler's callbacks to this object's own methods.
        self.event_handler.on_created = self.on_created
        self.event_handler.on_moved = self.on_moved

        self.observer = Observer()

        # Inputs that are not directories are skipped.
        self.observedPaths = []
        for candidate in self.runner.input:
            if not os.path.isdir(candidate):
                continue
            self.observer.schedule(self.event_handler, candidate, recursive=True)
            self.observedPaths.append(candidate)

        watched = ", ".join(self.observedPaths)
        log("Monitoring {} in real time. Press Ctrl+C to exit.".format(watched))
container_notifier.py 文件源码 项目:docker-windows-volume-watcher 作者: merofeev 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, container, host_dir, container_dir):
        """
        Create a ContainerNotifier and start watching ``host_dir`` recursively.

        Args:
            container: Container
            host_dir (str): Host directory
            container_dir (str): Container directory
        """
        self.container = container
        self.host_dir = host_dir
        self.container_dir = container_dir

        # The same callback serves created, moved and modified events;
        # directory events are not ignored.
        on_change = self.__change_handler
        fs_handler = PatternMatchingEventHandler(ignore_directories=False)
        fs_handler.on_created = on_change
        fs_handler.on_moved = on_change
        fs_handler.on_modified = on_change

        watcher = Observer()
        self.observer = watcher
        watcher.schedule(fs_handler, host_dir, recursive=True)
        watcher.start()
directorymonitor.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def monitor_directory(directory):
    """Watch ``directory`` (recursively) from a background observer thread.

    File events are delivered to a `CompletedJobInfoEventHandler`. The
    observer is stopped and joined when any of three pubsub topics fires:
    the settings dialog closing, a new run being observed, or an explicit
    shutdown request.

    Arguments:
        directory: the directory to monitor.
    """
    observer = Observer()

    logging.info("Getting ready to monitor directory {}".format(directory))
    job_handler = CompletedJobInfoEventHandler()
    observer.schedule(job_handler, directory, recursive=True)

    def stop_monitoring(*args, **kwargs):
        """Stop and join the observer; message payload is ignored."""
        logging.info("Halting monitoring on directory because run was discovered.")
        observer.stop()
        observer.join()

    # Any one of these topics ends the monitoring.
    stop_topics = (
        SettingsDialog.settings_closed_topic,
        DirectoryMonitorTopics.new_run_observed,
        DirectoryMonitorTopics.shut_down_directory_monitor,
    )
    for topic in stop_topics:
        pub.subscribe(stop_monitoring, topic)

    observer.start()
watchmedo.py 文件源码 项目:jf 作者: rolycg 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def shell_command(args):
    """
    Subcommand to execute shell commands in response to file system events.

    Builds a ``ShellCommandTrick`` from the parsed options and hands it,
    together with a fresh observer, to ``observe_with``.

    :param args:
        Command line argument options.
    """
    from watchdog.observers import Observer
    from watchdog.tricks import ShellCommandTrick

    # Normalise an empty/falsy command to None.
    args.command = args.command or None

    include, exclude = parse_patterns(args.patterns, args.ignore_patterns)
    trick_options = dict(
        shell_command=args.command,
        patterns=include,
        ignore_patterns=exclude,
        ignore_directories=args.ignore_directories,
        wait_for_process=args.wait_for_process,
    )
    trick = ShellCommandTrick(**trick_options)
    watcher = Observer(timeout=args.timeout)
    observe_with(watcher, trick, args.directories, args.recursive)
watchmedo.py 文件源码 项目:hacker-scripts 作者: restran 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def shell_command(args):
    """
    Subcommand to execute shell commands in response to file system events.

    Same flow as the stock watchmedo command, with the additional
    ``drop_during_process`` option forwarded to the trick.

    :param args:
        Command line argument options.
    """
    from watchdog.observers import Observer
    from watchdog.tricks import ShellCommandTrick

    # Normalise an empty/falsy command to None.
    args.command = args.command or None

    include, exclude = parse_patterns(args.patterns, args.ignore_patterns)
    trick = ShellCommandTrick(
        shell_command=args.command,
        patterns=include,
        ignore_patterns=exclude,
        ignore_directories=args.ignore_directories,
        wait_for_process=args.wait_for_process,
        drop_during_process=args.drop_during_process,
    )
    watcher = Observer(timeout=args.timeout)
    observe_with(watcher, trick, args.directories, args.recursive)
commands.py 文件源码 项目:markdownreveal 作者: markdownreveal 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def show(markdown_file: Path, host: str='localhost', port: int=8123):
    """
    Visualize your presentation (default).

    Generates the presentation once, starts an observer on the current
    directory (recursively) with a ``Handler`` for the markdown file, and
    then serves the configured output path, watching its ``.reload`` file.
    """
    markdown_file = Path(markdown_file)

    observer = Observer()
    change_handler = Handler(markdown_file)
    # Initial generation
    generate(markdown_file)
    observer.schedule(change_handler, '.', recursive=True)
    observer.start()

    server = Server()
    config = load_config()
    reload_marker = str(config['output_path'] / '.reload')
    server.watch(reload_marker, delay=0)

    serve_options = dict(
        root=str(config['output_path']),
        restart_delay=0,
        debug=True,
        open_url=True,
        open_url_delay=0,
        host=host,
        port=port,
    )
    server.serve(**serve_options)
loader_watchdog.py 文件源码 项目:Informed-Finance-Canary 作者: Darthone 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
    """Watch the configured directories until interrupted with Ctrl+C.

    Loads the config file named on the command line, makes sure the
    ``done_prefix`` and ``error`` directories exist, runs
    ``load_prexisting`` on every watched path and then schedules an
    ``ArticleHandler`` on it.
    """
    args = parse_args()
    config = common.load_config_file(args.config)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

    article_handler = ArticleHandler(config['done_prefix'], config['error'])
    for directory_key in ("done_prefix", "error"):
        common.create_dir(config[directory_key])

    watcher = Observer()
    for watched in config['watch']:
        load_prexisting(watched, config['done_prefix'], config['error'])
        watcher.schedule(article_handler, watched)
    watcher.start()

    # Idle in the main thread; the observer works in the background.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
pymonitor.py 文件源码 项目:python1 作者: lilizhiwei 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start_watch(path, callback):
    """Watch ``path`` recursively and run the managed process until Ctrl+C.

    The event handler is constructed with ``restart_process``; the
    ``callback`` argument is accepted but not used by this function.
    """
    watcher = Observer()
    # The handler receives restart_process; recursive=True also covers
    # subdirectories of `path`.
    change_handler = MyFileSystemEventHander(restart_process)
    watcher.schedule(change_handler, path, recursive=True)
    watcher.start()
    log('Watching directory %s...' % path)
    # Launch the managed process once before idling.
    start_process()
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
watch.py 文件源码 项目:logscan 作者: magedu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, filename, counter):
        """Set up state for watching ``filename``.

        Creates the queue, the checker chain fed by it, and an (unstarted)
        observer. If the file already exists it is opened and the read
        offset is set to its current size; otherwise ``fd`` is ``None`` and
        the offset is 0.
        """
        self.filename = path.abspath(filename)
        self.queue = Queue()
        self.check_chain = CheckerChain(self.queue, counter)
        self.observer = Observer()
        self.fd, self.offset = None, 0
        if not path.isfile(self.filename):
            return
        # File is already present: open it and start from its current end.
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)
cli.py 文件源码 项目:psync 作者: lazywei 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def watch():
    """Watch the project root and sync whenever the handler marks it dirty.

    Outside a project, only a hint to run psync is printed. Otherwise the
    shared ``state`` dict is polled once per second until Ctrl+C.
    """
    is_proj, root = get_project_root()

    state = {"dirty": False}

    if not is_proj:
        click.echo("Run psync to generate .psync config file.")
        return

    click.echo("Start watching {} ...".format(root))
    any_event_handler = watcher.AnyEventHandler(state)
    fs_observer = Observer()
    fs_observer.schedule(any_event_handler, root, recursive=True)
    fs_observer.start()
    try:
        while True:
            if state["dirty"]:
                click.echo("Detect modification. Perform sync.")
                perform_sync()
                state["dirty"] = False
            time.sleep(1)
    except KeyboardInterrupt:
        fs_observer.stop()
    fs_observer.join()
pymonitor.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start_watch(path, callback):
    """Watch ``path`` recursively and run the managed process until Ctrl+C.

    The handler is built around ``restart_process``; ``callback`` is
    accepted but not used here.
    """
    fs_observer = Observer()
    restart_handler = MyFileSystemEventHander(restart_process)
    fs_observer.schedule(restart_handler, path, recursive=True)
    fs_observer.start()
    log('Watching directory %s...' % path)
    start_process()
    # Idle until interrupted; the observer runs in its own thread.
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        fs_observer.stop()
    fs_observer.join()
cli.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start(port, articles_path, debug, version):
    """Launch the wiki: one process for the article watcher, one for the
    web server.

    With ``version`` set, only the version string is printed and the
    program exits with status 0. Otherwise an observer is scheduled
    (recursively, for ``*.md`` files) on the wiki's articles path and passed
    to the ``article_watcher`` process.
    """

    if version:
        print('Bock v{}'.format(__version__))
        sys.exit(0)

    wiki = create_wiki(articles_path=articles_path, debug=debug)

    observer = Observer()
    repository_handler = BockRepositoryEventHandler(patterns=['*.md'], wiki=wiki)
    observer.schedule(
        repository_handler,
        wiki.config['articles_path'],
        recursive=True,
    )

    watcher_process = Process(target=article_watcher, args=(wiki, observer))
    watcher_process.start()

    server_process = Process(target=web_server, args=(wiki, port, debug))
    server_process.start()
reloader.py 文件源码 项目:uzdevsbot 作者: Uzbek-Developers 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def run_with_reloader(loop, coroutine, cleanup=None, *args, **kwargs):
    """Run ``coroutine`` next to a file watcher; reload on the first change.

    The current working directory is watched recursively. Execution waits
    until either ``coroutine`` finishes or ``handler.changed`` completes,
    then calls ``cleanup`` (if given), stops the watcher and, when a
    completed awaitable produced an ``Event``, triggers ``reload()``.

    This must be a coroutine function because it awaits ``asyncio.wait``;
    ``await`` inside a plain ``def`` is a SyntaxError.

    Args:
        loop: event loop handed to the change ``Handler``.
        coroutine: the awaitable to run.
        cleanup: optional zero-argument callable run before the watcher stops.
        *args, **kwargs: accepted for compatibility; not used here.
    """

    clear_screen()
    print("??  Running in debug mode with live reloading")
    print("    (don't forget to disable it for production)")

    # Create watcher
    handler = Handler(loop)
    watcher = Observer()

    # Setup
    path = realpath(os.getcwd())
    watcher.schedule(handler, path=path, recursive=True)
    watcher.start()

    print("    (watching {})".format(path))

    # Run watcher and coroutine together; whichever completes first wins.
    done, _pending = await asyncio.wait([coroutine, handler.changed],
                                        return_when=asyncio.FIRST_COMPLETED)

    # Cleanup
    if cleanup:
        cleanup()
    watcher.stop()

    for fut in done:
        # If change event, then reload
        if isinstance(fut.result(), Event):
            print("Reloading...")
            reload()
dactyl_build.py 文件源码 项目:dactyl 作者: ripple 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def watch(mode, target, only_page="", pdf_file=DEFAULT_PDF_FILE,
          es_upload=NO_ES_UP):
    """Rebuild whenever a watched template or content file changes.

    Both ``config["template_path"]`` and ``config["content_path"]`` are
    watched recursively. Runs until interrupted."""
    target = get_target(target)

    class UpdaterHandler(PatternMatchingEventHandler):
        """Re-render on any event for a pattern-matched file."""
        def on_any_event(self, event):
            logger.info("got event!")
            # bypass_errors=True because Watch shouldn't
            #  just die if a file is temporarily not found
            shared_options = dict(bypass_errors=True, only_page=only_page,
                                  es_upload=es_upload)
            if mode == "pdf":
                make_pdf(pdf_file, target=target, **shared_options)
            else:
                render_pages(target, mode=mode, **shared_options)
            logger.info("done rendering")

    watched_globs = [
        "*template-*.html",
        "*.md",
        "*code_samples/*",
    ]

    event_handler = UpdaterHandler(patterns=watched_globs)
    observer = Observer()
    for path_key in ("template_path", "content_path"):
        observer.schedule(event_handler, config[path_key], recursive=True)
    observer.start()
    # The observer works in its own thread, so the main thread only idles.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
watcher.py 文件源码 项目:chainerboard 作者: koreyou 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def watch_file(inputfile, timeline_handler):
    """Generator that watches the directory of ``inputfile`` while suspended.

    The file is loaded once up front, because the log handler does not load
    on detecting the initial file. An observer is then started on the
    file's parent directory and control is yielded to the caller. When the
    generator is resumed, closed, or has an exception thrown into it, the
    observer is stopped and joined.

    Args:
        inputfile: path of the log file to follow.
        timeline_handler: object whose ``load(inputfile)`` is called for the
            initial read; also passed to ``LogHandler``.
    """
    # log handler does not load on detecting the initial file
    timeline_handler.load(inputfile)
    event_handler = LogHandler(inputfile, timeline_handler)
    observer = Observer()
    observer.schedule(event_handler,
                      os.path.dirname(os.path.abspath(inputfile)))
    observer.start()
    try:
        yield
    finally:
        # Always release the observer thread, including when the consumer
        # raises or closes the generator while it is suspended.
        observer.stop()
        observer.join()
flowcoll.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init(dirpath):
    """Start watching ``dirpath`` (non-recursively), at most once.

    Uses the module globals ``observer``, ``started`` and ``start_time``:
    a call made after ``started`` is already truthy does nothing.
    """
    global observer, started, start_time

    if started:
        return

    handler = MyHandler()
    observer = Observer()
    observer.schedule(handler, path=dirpath, recursive=False)
    observer.start()

    # Record when watching began and mark initialisation as done.
    start_time = time.time()
    started = True
uploader_daemon.py 文件源码 项目:google-music-manager 作者: jaymoulin 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def upload(directory='.', oauth='~/oauth', remove=False,
           uploader_id=netifaces.ifaddresses('eth0')[netifaces.AF_LINK][0]['addr'].upper()):
    """Watch ``directory`` and hand events to a ``MusicToUpload`` handler.

    Exits with status 1 if the login fails. When ``remove`` is true, the
    files already present under ``directory`` are uploaded first and each
    one that was uploaded or matched is deleted. Then idles until Ctrl+C.

    Note: the ``uploader_id`` default is computed once, when the function
    is defined.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info("Init Daemon - Press Ctrl+C to quit")

    api = Musicmanager()
    event_handler = MusicToUpload()
    event_handler.api = api
    event_handler.path = directory
    event_handler.willDelete = remove
    event_handler.logger = logger

    if not api.login(oauth, uploader_id):
        print("Error with oauth credentials")
        sys.exit(1)

    if remove:
        # Upload what is already on disk before watching for new files.
        for file_path in glob.glob(directory + '/**/*', recursive=True):
            if not os.path.isfile(file_path):
                continue
            logger.info("Uploading : " + file_path)
            uploaded, matched, _not_uploaded = api.upload(file_path, True)
            if uploaded or matched:
                os.remove(file_path)

    watcher = Observer()
    watcher.schedule(event_handler, directory, recursive=True)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
pymonitor.py 文件源码 项目:awesome-python3-webapp 作者: syusonn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start_watch(path, callback):
    """Watch ``path`` recursively and run the managed process until Ctrl+C.

    The handler wraps ``restart_process``; ``callback`` is accepted but not
    used by this function.
    """
    dir_observer = Observer()
    restart_handler = MyFileSystemEventHandler(restart_process)
    dir_observer.schedule(restart_handler, path, recursive=True)
    dir_observer.start()
    log('Watching directory %s...' % path)
    start_process()
    # Idle in the main thread; stop the observer on Ctrl+C.
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        dir_observer.stop()
    dir_observer.join()
watch.py 文件源码 项目:harrier 作者: samuelcolvin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def watch(config: Config):
    """Build once, then serve and rebuild on changes under ``config.root``.

    Starts the dev server in a separate process and an observer on the
    project root. On exit (including Ctrl+C) the subprocess group and the
    observer are shut down; if the server process has already ended with a
    non-zero exit code a ``RuntimeError`` is raised, otherwise the server
    is terminated.
    """
    observer = Observer()
    build_handler = HarrierEventHandler(config)
    logger.info('Watch mode starting...')
    build_handler.build()
    build_handler.check_build()

    serve_args = (
        config.target_dir,
        config.uri_subdirectory,
        config.serve_port,
        config.asset_file,
    )
    server_process = Process(target=serve, args=serve_args)
    server_process.start()

    sp_ctrl = SubprocessGroupController(config.subprocesses)

    observer.schedule(build_handler, str(config.root), recursive=True)
    observer.start()
    try:
        build_handler.wait(sp_ctrl.check)
    except KeyboardInterrupt:
        pass
    finally:
        logger.warning('killing dev server')
        sp_ctrl.terminate()
        observer.stop()
        observer.join()
        # exitcode None means still running; 0 means a clean exit.
        if server_process.exitcode not in {None, 0}:
            raise RuntimeError('Server process already terminated with exit code {}'.format(server_process.exitcode))
        server_process.terminate()
        time.sleep(0.1)
serve.py 文件源码 项目:harrier 作者: samuelcolvin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serve(serve_root, subdirectory, port, asset_file=None):
    """Run the dev server app while watching ``serve_root`` for changes.

    The observer is always stopped and joined when the server returns or
    is interrupted.
    """
    app = create_app(serve_root, subdirectory=subdirectory, asset_file=asset_file)

    # TODO in theory file watching could be replaced by accessing tool_chain.source_map
    file_watcher = Observer()
    dev_handler = DevServerEventEventHandler(app, serve_root)
    file_watcher.schedule(dev_handler, str(serve_root), recursive=True)
    file_watcher.start()

    logger.info('Started dev server at http://localhost:%s, use Ctrl+C to quit', port)

    def _silent(msg):
        # Discard the startup banner passed to the `print` hook.
        return None

    try:
        web.run_app(app, port=port, print=_silent)
    except KeyboardInterrupt:
        pass
    finally:
        file_watcher.stop()
        file_watcher.join()
plugin_collection.py 文件源码 项目:globibot 作者: best-coloc-ever 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, bot, plugin_path):
        """Create one reloader per plugin descriptor and schedule a watch for it.

        The parent of the plugin directory is put at the front of
        ``sys.path``. Each plugin's own sub-directory is scheduled
        recursively on the path observer; nothing is marked as loaded yet.
        """
        plugin_abs_path = os.path.join(os.getcwd(), plugin_path)
        package_parent, plugin_dir = os.path.split(plugin_abs_path)
        sys.path.insert(0, package_parent)

        self.path_observer = PathObserver()
        self.plugin_reloaders = []

        for plugin_name, plugin_config in bot.plugin_descriptors:
            plugin_reloader = PluginReloader(plugin_dir, plugin_name, bot, plugin_config)
            self.plugin_reloaders.append(plugin_reloader)
            watched_dir = path_join(plugin_abs_path, plugin_name)
            self.path_observer.schedule(plugin_reloader, watched_dir, recursive=True)

        self.loaded = False
pickle_core.py 文件源码 项目:cachier 作者: shaypal5 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def wait_on_entry_calc(self, key):
        """Wait until the cache entry for ``key`` is no longer being calculated.

        The cache is reloaded under ``self.lock``; if the entry is already
        complete, its value is returned immediately. Otherwise an observer is
        started on the cache directory with a ``CacheChangeHandler`` and is
        given up to one second to finish (the handler has the observer
        injected, presumably so it can stop it once the value is available).
        If the observer is still running after that second, it is stopped
        and the whole check is repeated.

        Args:
            key: cache key to wait for.

        Returns:
            The entry's ``'value'`` if it was already complete, otherwise the
            value captured by the event handler.
        """
        # A loop rather than recursion: a long calculation would otherwise
        # add one stack frame per second of waiting.
        while True:
            with self.lock:
                self._reload_cache()
                entry = self._get_cache()[key]
                if not entry['being_calculated']:
                    return entry['value']
            event_handler = _PickleCore.CacheChangeHandler(
                filename=self._cache_fname(),
                core=self,
                key=key
            )
            observer = Observer()
            event_handler.inject_observer(observer)
            observer.schedule(
                event_handler,
                path=EXPANDED_CACHIER_DIR,
                recursive=True
            )
            observer.start()
            observer.join(timeout=1.0)
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.
            if not observer.is_alive():
                return event_handler.value
            # Timed out: stop this observer so it does not keep running in
            # the background while a fresh one is created on the next pass.
            observer.stop()
__fabfile.py 文件源码 项目:hugo_jupyter 作者: knowsuchagency 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def serve(hugo_args='', init_jupyter=True):
    """
    Watch for changes in jupyter notebooks and render them anew while hugo runs.

    Starts an observer on the ``notebooks`` directory, launches
    ``hugo serve`` (and optionally ``jupyter notebook``), opens the local
    site, then idles until Ctrl+C. On the way out the child processes and
    the observer are shut down and the program exits with status 0.

    Args:
        init_jupyter: initialize jupyter if set to True
        hugo_args: command-line arguments to be passed to `hugo server`
    """
    import time  # local import: only needed for the idle wait below

    observer = Observer()
    observer.schedule(NotebookHandler(), 'notebooks')
    observer.start()

    hugo_process = sp.Popen(('hugo', 'serve', *shlex.split(hugo_args)))

    if init_jupyter:
        jupyter_process = sp.Popen(('jupyter', 'notebook'), cwd='notebooks')

    local('open http://localhost:1313')

    try:
        print(crayons.green('Successfully initialized server(s)'),
              crayons.yellow('press ctrl+C at any time to quit'),
              )
        # Sleep instead of spinning: an empty `while True: pass` loop keeps
        # a CPU core fully busy for the lifetime of the server.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print(crayons.yellow('Terminating'))
    finally:
        if init_jupyter:
            print(crayons.yellow('shutting down jupyter'))
            jupyter_process.kill()

        print(crayons.yellow('shutting down watchdog'))
        observer.stop()
        observer.join()
        print(crayons.yellow('shutting down hugo'))
        hugo_process.kill()
        print(crayons.green('all processes shut down successfully'))
        sys.exit(0)
pymonitor.py 文件源码 项目:python-awesome-web 作者: tianzhenyun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start_watch(path, callback):
    """Watch ``path`` recursively and run the managed process until Ctrl+C.

    The handler is created with ``restart_process``; the ``callback``
    argument is accepted but not used here.
    """
    directory_watcher = Observer()
    on_change = MyFileSystemEventHandler(restart_process)
    directory_watcher.schedule(on_change, path, recursive=True)
    directory_watcher.start()

    log('Watching directory {}...'.format(path))
    start_process()
    # Idle in two-second steps until interrupted.
    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        directory_watcher.stop()
    directory_watcher.join()
watch.py 文件源码 项目:vyper 作者: admiralobvious 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def watch_path(self):
        """Watch ``self.directory`` and re-read the config when its file changes.

        Once per second ``self.event`` is inspected; when it refers to
        ``self.config_file`` the config is read in again and the optional
        ``_on_config_change`` hook is invoked. Runs until Ctrl+C.
        """
        watcher = Observer()
        watcher.schedule(self.handler, self.directory)
        watcher.start()
        try:
            while True:
                latest = self.event
                concerns_config = (
                    latest is not None and latest.src_path == self.config_file
                )
                if concerns_config:
                    self.v.read_in_config()
                    on_change = self.v._on_config_change
                    if on_change is not None:
                        on_change()
                time.sleep(1)
        except KeyboardInterrupt:
            watcher.stop()
        watcher.join()
spe2fitsGUI.py 文件源码 项目:spe2fits 作者: jerryjiahaha 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def listener(self):
        """Return the file observer, creating and starting it on first use.

        On the first call an observer is created, a ``customerDirHandler``
        bound to ``self.on_created`` is stored alongside it, and the observer
        is started. Later calls return the stored observer.
        """
        if hasattr(self, "_listener"):
            return self._listener

        # First access: build the observer and its handler, then start it.
        self._listener = Observer()
        self._listen_handler = customerDirHandler(self.on_created)
        self._listener.start()
        return self._listener
Client.py 文件源码 项目:behem0th 作者: robot0nfire 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, path='.', ignore_list=None, event_handler=None, no_log=True, verbose_log=False):
        """Set up client state and schedule a recursive watch on ``path``.

        Args:
            path: directory to synchronise; stored as an absolute path.
            ignore_list: optional extra entries appended to a copy of the
                module-level ``IGNORE_LIST``.
            event_handler: optional handler object; a default
                ``EventHandler()`` is created when omitted.
            no_log: disable logging unless ``verbose_log`` is set.
            verbose_log: enable verbose logging (overrides ``no_log``).

        The observer is scheduled here but not started.
        """
        log.NO_LOG = no_log and not verbose_log
        log.VERBOSE_LOG = not log.NO_LOG and verbose_log

        self._sock = None
        self._rlock = threading.RLock()
        self._peers = []

        self._sync_path = os.path.abspath(path)

        # Work on a copy: assigning IGNORE_LIST directly and then using `+=`
        # would extend the shared module-level list in place, leaking one
        # client's extra entries into every other client.
        self._ignore_list = list(IGNORE_LIST)
        if ignore_list:
            self._ignore_list.extend(ignore_list)

        log.info_v('Ignored files/directories: {0}', self._ignore_list)

        self._filelist = {}
        self._observer = Observer()
        # Rename the observer thread so it is recognisable by name.
        self._observer.name = self._observer.name.replace('Thread', 'fs-event-handler')
        self._observer.schedule(_FsEventHandler(self), self._sync_path, recursive=True)

        log.info_v("Started watching folder '{0}'", self._sync_path)

        self._fsevent_ignore_list = []

        self._event_handler = event_handler if event_handler else EventHandler()
ChannelLibraries.py 文件源码 项目:QGL 作者: BBN-Q 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, main_path, callback):
        """Watch the directory of ``main_path`` for changes to library files.

        The main file is loaded once to collect the names of all files the
        loader touched. Those names and ``callback`` are given to the event
        handler, the observer is started on the main file's directory, and
        ``self.resume()`` is called.
        """
        super(LibraryFileWatcher, self).__init__()

        self.main_path = os.path.normpath(main_path)
        self.callback = callback

        # Perform a preliminary loading to find all of the connected files...
        # TODO: modularity
        with open(os.path.abspath(self.main_path), 'r') as FID:
            loader = config.Loader(FID)
            try:
                # The parsed data itself is not needed, only the file list
                # gathered by the loader while parsing.
                loader.get_single_data()
                self.filenames = [os.path.normpath(name) for name in loader.filenames]
            finally:
                loader.dispose()

        self.eventHandler = MyEventHandler(self.filenames, self.callback)
        self.observer = Observer()
        watched_dir = os.path.dirname(os.path.abspath(main_path))
        self.observer.schedule(self.eventHandler, path=watched_dir)

        self.observer.start()
        self.resume()
watcher.py 文件源码 项目:Yugioh-bot 作者: will7200 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def start_observer(self):
        """Create and start an observer on the directory of the observed file.

        The watch is non-recursive and uses ``self.watcher`` as the handler.
        """
        self.observer = Observer()
        watched_dir = os.path.dirname(self.file_observing)
        self.observer.schedule(self.watcher, watched_dir, recursive=False)
        self.observer.start()
test_watchFile.py 文件源码 项目:Yugioh-bot 作者: will7200 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUp(self):
        """Start a watcher for the data file and load its current contents.

        The observer watches a hard-coded directory (non-recursively); the
        loaded data gets a ``'test'`` marker entry.
        """
        set_data_file(self.data_file)
        self.watcher = WatchFile(patterns=[self.data_file])
        self.observer = watch_thread = Observer()
        watch_thread.schedule(self.watcher, r"D:\Sync\OneDrive\Yu-gi-oh_bot", recursive=False)
        watch_thread.start()
        self.data = read_data_file()
        self.data['test'] = 'yes'


问题


面经


文章

微信
公众号

扫码关注公众号