python类dump()的实例源码

dumpcache.py 文件源码 项目:crm 作者: Incubaid 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _update_json_file(data_dir, data, model_str):
    """
    Rewrite the JSON file of an existing record, renaming it when needed.

    The record lives at ``<data_dir>/<data['model']>/<id>_<model_str>.json``
    with ``model_str`` capped at 100 characters. When no file exists at that
    path, the model directory is searched for files whose name starts with
    ``<id>_`` (the record was stored under another name) and each of them is
    moved to the new path before the data is written.

    :param data_dir: root directory of the dumped data
    :param data: model dictionary; ``data['model']`` and ``data['id']`` are read
    :param model_str: string form of the record, used in the file name
    :return: the record path followed by every old path that was renamed
    :rtype: list
    """
    model_str = model_str[:100]

    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)

    record_path = os.path.abspath(os.path.join(
        model_dir, '%s_%s.json' % (data['id'], model_str)))

    updated_paths = [record_path]

    # Name changed
    if not os.path.exists(record_path):
        # Match on "<id>_" so that id 1 does not pick up "12_....json", and
        # so that non-string ids work (str.startswith rejects an int).
        id_prefix = '%s_' % data['id']
        for root, _dirs, files in os.walk(model_dir):
            for name in files:
                if name.startswith(id_prefix):
                    old = os.path.abspath(os.path.join(root, name))
                    os.rename(old, record_path)
                    updated_paths.append(old)

    with open(record_path, 'w') as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return updated_paths
safari_ios.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def wait_for_processing(self, task):
        """Wait for any background processing threads to finish.

        Waits for the video processing subprocess (removing
        ``task['video_file']`` afterwards unless the job asks to keep it),
        joins the optimization worker, feeds its result into
        ``process_optimization_results`` and, when ``path_base`` is set,
        writes ``wpt_result`` to ``<path_base>_devtools_requests.json.gz``.
        """
        if self.video_processing is not None:
            logging.debug('Waiting for video processing to finish')
            self.video_processing.communicate()
            self.video_processing = None
            if not self.job['keepvideo']:
                try:
                    os.remove(task['video_file'])
                except Exception:
                    # Best-effort cleanup: a missing file or key must not
                    # abort result processing.
                    pass
        opt = None
        if self.optimization is not None:
            opt = self.optimization.join()
        if self.wpt_result is not None:
            self.process_optimization_results(self.wpt_result['pageData'],
                                              self.wpt_result['requests'], opt)
            if self.path_base is not None:
                devtools_file = self.path_base + '_devtools_requests.json.gz'
                with gzip.open(devtools_file, 'wb', 7) as f_out:
                    # The gzip handle is binary; json.dump() would hand it
                    # str chunks, which fails on Python 3. Encoding the
                    # ASCII JSON text works on Python 2 and 3 alike.
                    f_out.write(json.dumps(self.wpt_result).encode('utf-8'))
ansify.py 文件源码 项目:ansify 作者: Kirkman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def convert_image(options):
    """Convert the image named by ``options['filename']`` and emit the result.

    Loads ``color_cache.json`` into ``options['cache']``, converts the image
    with ``convert_frame`` and appends ``ANSI_RESET`` plus two newlines. The
    result is saved (with a SAUCE record) unless ``options['output_file']``
    is ``sys.stdout``, in which case it is printed to the console.

    The single-argument ``print(...)`` form behaves the same on Python 2
    and Python 3.
    """
    print('loading cache')
    # Use a context manager so the cache file handle is closed promptly.
    with open('color_cache.json') as cache_file:
        options['cache'] = json.load(cache_file)
    print('beginning conversion')

    im = Image.open(options['filename'])
    o = convert_frame(im, options)
    o += ANSI_RESET + '\n\n'

    # Save to ANSI file and add SAUCE record
    if options['output_file'] is not sys.stdout:
        save_frame(o, options)
        add_sauce(o, options)

    # Output to console (unicode)
    else:
        print_frame(o, options)

#   json.dump(options['cache'], open('color_cache.json','w'))



# MAIN FUNCTION
wayslack.py 文件源码 项目:wayslack 作者: wolever 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def refresh(self):
        """Reload the saved status and archive every listed file's metadata.

        ``status.json`` under ``self.path`` is loaded into ``self.status``
        (an empty dict when the file does not exist). Every file object
        produced by ``iter_file_lists`` is queued on the archive downloader
        and written as ``<id>.json`` into a directory named after
        ``ts2ymd(file_obj["created"])``.
        """
        self.status_file = self.path / "status.json"
        if self.status_file.exists():
            # Close the handle explicitly instead of leaking it.
            with self.status_file.open() as status_fp:
                self.status = json.loads(status_fp.read())
        else:
            self.status = {}

        for files in self.iter_file_lists():
            for file_obj in files:
                self.archive.downloader.add_file(file_obj)
                output_dir = self.path / ts2ymd(file_obj["created"])
                if not output_dir.exists():
                    output_dir.mkdir()
                output_file = output_dir / (file_obj["id"] + ".json")
                with open_atomic(str(output_file)) as f:
                    json.dump(file_obj, f)
cacheObject.py 文件源码 项目:osm-python-tools 作者: mocnik-science 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def query(self, *args, onlyCached=False, **kwargs):
        """Return the result of a query, served from the disk cache if present.

        Positional and keyword arguments are forwarded to ``_queryString``,
        which yields the query string and the string that is hashed into the
        cache file name. A cache miss triggers a download whose raw data is
        stored in the cache directory.

        :param onlyCached: when true and nothing is cached, print a notice
            and return ``None`` instead of downloading
        :return: ``_rawToResult(data, queryString)``
        :raises Exception: when ``_isValid`` rejects the result
        """
        queryString, hashString = self._queryString(*args, **kwargs)
        filename = self.__cacheDir + '/' + self._prefix + '-' + self.__hash(hashString)
        # exist_ok avoids failing when another process creates the directory
        # between a separate existence check and the makedirs call.
        os.makedirs(self.__cacheDir, exist_ok=True)
        if os.path.exists(filename):
            with open(filename, 'r') as file:
                data = ujson.load(file)
        elif onlyCached:
            print('[' + self._prefix + '] data not cached: ' + queryString)
            return None
        else:
            print('[' + self._prefix + '] downloading data: ' + queryString)
            if self._waitForReady() is None:
                if self.__lastQuery and self.__waitBetweenQueries:
                    # Read the clock once: checking and sleeping on two
                    # separate time.time() calls could pass a negative value
                    # to time.sleep(), which raises ValueError.
                    remaining = self.__waitBetweenQueries - (time.time() - self.__lastQuery)
                    if remaining > 0:
                        time.sleep(remaining)
            self.__lastQuery = time.time()
            data = self.__query(queryString)
            with open(filename, 'w') as file:
                ujson.dump(data, file)
        result = self._rawToResult(data, queryString)
        if not self._isValid(result):
            raise Exception('[' + self._prefix + '] error in result (' + filename + '): ' + queryString)
        return result
devtools_parser.py 文件源码 项目:devtools-parser 作者: WPO-Foundation 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def write(self):
        """Write out the resulting json data.

        Nothing is written unless ``out_file`` is set and both
        ``result['pageData']`` and ``result['requests']`` are non-empty.
        A ``.gz`` extension selects gzip output. Any failure is logged at
        critical level instead of being raised.
        """
        if self.out_file is not None and len(self.result['pageData']) and \
            len(self.result['requests']):
            try:
                _, ext = os.path.splitext(self.out_file)
                if ext.lower() == '.gz':
                    with gzip.open(self.out_file, 'wb') as f_out:
                        # Binary handle: json.dump() would write str chunks
                        # and fail on Python 3, so encode the JSON text.
                        f_out.write(json.dumps(self.result).encode('utf-8'))
                else:
                    with open(self.out_file, 'w') as f_out:
                        json.dump(self.result, f_out)
            except Exception:
                logging.critical("Error writing to " + self.out_file)
snapshot.py 文件源码 项目:athena 作者: slint 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def save(self):
        """Write the dataset snapshot as JSON lines.

        Every datapoint yielded by the datapoints query becomes one line
        holding the JSON encoding of the pair ``(pid, json)``.
        """
        with storage.open(self.path, 'w', self._compression_level) as out:
            for datapoint in self._datapoints_query:
                record = (datapoint.pid, datapoint.json)
                json.dump(record, out)
                out.write('\n')
run.py 文件源码 项目:twitter_mongodb_helper 作者: IDEA-NTHU-Taiwan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def write_json_file(filename, path, result):
    """Writes the result to json with the given filename.

    Args:
        filename   (str): Filename to write to; ".json" is appended.
        path       (str): Directory path to use. It is concatenated directly
            with the filename, so it must already end with a separator.
        result          : Object serialized with ``ujson.dump``.
    """
    # The context manager closes the file; no explicit close() is needed.
    with open(path + filename + ".json", "w+") as json_file:
        ujson.dump(result, json_file)
cleanjson.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def write_json_rows(rows, docfilepath, encode_html=True):
        """Append every row to ``docfilepath`` as one JSON document per line.

        The file is opened in append mode, so existing content is kept.
        ``encode_html`` is passed to ujson as ``encode_html_chars``.
        """
        with open(docfilepath, 'a') as out:
                for record in rows:
                        ujson.dump(record, out, encode_html_chars=encode_html)
                        out.write('\n')
#-----------------------------------------------------------

#-----------------------------------------------------------
__init__.py 文件源码 项目:mixpanel_api 作者: mixpanel 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def export_data(data, output_file, format='json', compress=False):
        """Write Mixpanel data to disk as json or csv, optionally gzipped.

        :param data: A list of Mixpanel events or People profiles; with format='json' any json-serializable
            value can be exported
        :param output_file: Name of file to write to
        :param format: 'json' or 'csv' (Default value = 'json'); any other value logs a warning and falls
            back to json
        :param compress: When true, gzip the output and remove the uncompressed file (Default value = False)
        :type data: list
        :type output_file: str
        :type format: str
        :type compress: bool

        """
        with open(output_file, 'w+') as output:
            if format == 'csv':
                Mixpanel._write_items_to_csv(data, output_file)
            else:
                if format != 'json':
                    # Unknown format: warn, then fall through to the json dump.
                    msg = "Invalid format - must be 'json' or 'csv': format = " + str(format) + '\n' \
                          + "Dumping json to " + output_file
                    Mixpanel.LOGGER.warning(msg)
                json.dump(data, output)

        if not compress:
            return
        Mixpanel._gzip_file(output_file)
        os.remove(output_file)
__init__.py 文件源码 项目:mixpanel_api 作者: mixpanel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _prep_event_for_import(event, token, timezone_offset):
        """Build a copy of an event that is ready for the Mixpanel /import endpoint.

        Events lacking a 'time' or 'distinct_id' property are appended to invalid_events.txt and skipped.

        :param event: A Mixpanel event dict
        :param token: A Mixpanel project token
        :param timezone_offset: UTC offset (number of hours) of the timezone setting for the project that
            exported the data; used to shift the timestamp back to UTC.
        :type event: dict
        :type token: str
        :type timezone_offset: int | float
        :return: Copy of the event with the token added and the timestamp adjusted to UTC, or None when the
            event was invalid
        :rtype: dict

        """
        properties = event['properties']
        if not ('time' in properties and 'distinct_id' in properties):
            # Both properties are mandatory for /import; log the event and give up on it.
            Mixpanel.LOGGER.warning('Event missing time or distinct_id property, dumping to invalid_events.txt')
            with open('invalid_events.txt', 'a') as invalid:
                json.dump(event, invalid)
                invalid.write('\n')
            return

        prepared = deepcopy(event)
        # Shift the exported timestamp by the project's offset to get UTC seconds.
        utc_seconds = int(properties['time']) - (timezone_offset * 3600)
        prepared['properties']['time'] = int(utc_seconds)
        prepared['properties']['token'] = token
        return prepared
dumpcache.py 文件源码 项目:crm 作者: Incubaid 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _create_json_file(data_dir, data, model_str):
    """
    Write a new record as ``<data_dir>/<model>/<id>_<model_str>.json``.

    :param data_dir: root directory of the dumped data
    :param data: model dictionary; ``data['model']`` and ``data['id']`` are read
    :param model_str: string form of the record; only its first 100
        characters are used in the file name
    :return: single-element list holding the written file path
    :rtype: list
    """
    name_part = model_str[:100]

    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)

    file_name = '%s_%s.json' % (data['id'], name_part)
    record_path = os.path.abspath(os.path.join(model_dir, file_name))

    with open(record_path, 'w') as handle:
        json.dump(data, handle, indent=4, sort_keys=True)
    return [record_path]
dumpcache.py 文件源码 项目:crm 作者: Incubaid 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _update_fs(items, data_dir, create=False, update=False, delete=False):
    """
    Apply one file system operation (create, update or delete) to the JSON
    files of the given items.

    CREATE OPERATION HANDLING IS CONCERNED ONLY ABOUT ROOT models
    NON ROOT MODEL ITEMS ARE IGNORED

    We don't dump non-root models, since they belong to one root model
    anyway and their data will be saved as part of a root model.
    For example, if a user created a task (non root model) and assigned it
    to a contact, cached data will look like
    {'changes': {'updated': IdentitySet([<crm.contact.models.Contact object at 0x7f5748a1f5c0>]),
    'deleted': IdentitySet([]),
    'created': IdentitySet([<crm.task.models.Task object at 0x7f574929dba8>])}}
    so the Contact object has been updated anyway and handling that update
    adds the task to the contact data in the file system.

    :param items: list of dicts with a ``'data'`` model dictionary and an
        ``'obj_as_str'`` string
    :param data_dir: where to save files
    :param create: create the files (checked first)
    :param update: update the files (checked when ``create`` is false)
    :param delete: delete the files (checked last)
    :return: paths reported by the selected operation; an empty list when
        no operation flag is set
    :rtype: list
    """
    if not (create or update or delete):
        # Without a selected operation the loop below had no value for
        # `paths` and failed on the first root item; there is nothing to do.
        return []

    all_paths = []
    for item in items:
        data = item['data']
        if not _is_root_model(data['model']):
            continue
        if create:
            paths = _create_json_file(data_dir, data, item['obj_as_str'])
        elif update:
            paths = _update_json_file(data_dir, data, item['obj_as_str'])
        else:
            paths = _delete_json_file(data_dir, data, item['obj_as_str'])
        all_paths.extend(paths)
    return all_paths
dumpdata.py 文件源码 项目:crm 作者: Incubaid 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def dumpdata():
    """
    Dump data table models into filesystem.
    Only Root models are dumped
    'Company', 'Contact', 'Deal',
    'Sprint', 'Project', 'Organization','User'

    Each object is written to ``<DATA_DIR>/<ModelName>/<id>_<str(obj)>.json``
    where slashes in the string are replaced by underscores and the string
    is cut to 100 characters.
    """
    data_dir = app.config["DATA_DIR"]
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    for model in RootModel.__subclasses__():
        model_dir = os.path.abspath(os.path.join(data_dir, model.__name__))
        if not os.path.exists(model_dir):
            os.mkdir(model_dir)

        for obj in model.query.all():
            # Slashes would be read as path separators in the file name.
            obj_as_str = str(obj).replace('/', '_')[:100]
            file_name = '%s_%s.json' % (obj.id, obj_as_str)
            record_path = os.path.abspath(os.path.join(model_dir, file_name))

            data = obj.as_dict()
            with open(record_path, 'w') as handle:
                json.dump(data, handle, indent=4, sort_keys=True)
json_.py 文件源码 项目:just 作者: kootenpv 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def write(obj, fn):
    """Serialize ``obj`` as JSON with a 4-space indent into the file ``fn``."""
    with open(fn, "w") as handle:
        json.dump(obj, handle, indent=4)
firefox.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def process_requests(self, request_timings, task):
        """Convert all of the request and page events into the format needed for WPT.

        The merged requests and the page stats computed from them are
        written as gzipped JSON to
        ``<task['dir']>/<task['prefix']>_devtools_requests.json.gz``.
        """
        result = {}
        result['requests'] = self.merge_requests(request_timings)
        result['pageData'] = self.calculate_page_stats(result['requests'])
        devtools_file = os.path.join(task['dir'], task['prefix'] + '_devtools_requests.json.gz')
        with gzip.open(devtools_file, 'wb', 7) as f_out:
            # Binary handle: json.dump() would write str chunks and fail on
            # Python 3, so write the encoded JSON text instead.
            f_out.write(json.dumps(result).encode('utf-8'))
firefox_log_parser.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    """ Main entry-point when running on the command-line.

    Parses the command line, configures logging from the verbosity count,
    runs ``FirefoxLogParser.process_logs`` on the given log file and start
    time, and writes the requests as indented JSON when ``--out`` is given.
    """
    import argparse
    # NOTE(review): description/prog look copied from the trace parser -
    # confirm before changing the user-visible help text.
    parser = argparse.ArgumentParser(description='Chrome trace parser.',
                                     prog='trace-parser')
    # default=0 keeps the numeric comparisons below valid when -v is absent
    # (the count action otherwise leaves None, which cannot be compared with
    # an int on Python 3).
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help="Increase verbosity (specify multiple times for more)" \
                             ". -vvvv for full debug output.")
    parser.add_argument('-l', '--logfile', help="File name for the mozilla log.")
    parser.add_argument('-s', '--start',
                        help="Start Time in UTC with microseconds YYYY-MM-DD HH:MM:SS.xxxxxx.")
    parser.add_argument('-o', '--out', help="Output requests json file.")
    options, _ = parser.parse_known_args()

    # Set up logging
    log_level = logging.CRITICAL
    if options.verbose == 1:
        log_level = logging.ERROR
    elif options.verbose == 2:
        log_level = logging.WARNING
    elif options.verbose == 3:
        log_level = logging.INFO
    elif options.verbose >= 4:
        log_level = logging.DEBUG
    logging.basicConfig(
        level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")

    if not options.logfile or not options.start:
        # The required inputs are the log file and the start time.
        parser.error("Input log file or start time is not specified.")

    # Separate name: `parser` above is the argparse parser.
    log_parser = FirefoxLogParser()
    requests = log_parser.process_logs(options.logfile, options.start)
    if options.out:
        with open(options.out, 'w') as f_out:
            json.dump(requests, f_out, indent=4)
trace_parser.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_json(self, out_file, json_data):
        """Write out one of the internal structures as a json blob.

        A ``.gz`` extension on ``out_file`` selects gzip output. Failures
        are logged at critical level instead of being raised.
        """
        try:
            _, ext = os.path.splitext(out_file)
            if ext.lower() == '.gz':
                with gzip.open(out_file, 'wb') as f:
                    # Binary handle: json.dump() would write str chunks and
                    # fail on Python 3, so write the encoded JSON text.
                    f.write(json.dumps(json_data).encode('utf-8'))
            else:
                with open(out_file, 'w') as f:
                    json.dump(json_data, f)
        except Exception:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            logging.critical("Error writing to " + out_file)
devtools_parser.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def write(self):
        """Write out the resulting json data.

        Nothing is written unless ``out_file`` is set and both
        ``result['pageData']`` and ``result['requests']`` are non-empty.
        A ``.gz`` extension selects gzip output. Any failure is logged at
        critical level instead of being raised.
        """
        if self.out_file is not None and len(self.result['pageData']) and \
            len(self.result['requests']):
            try:
                _, ext = os.path.splitext(self.out_file)
                if ext.lower() == '.gz':
                    with gzip.open(self.out_file, 'wb') as f_out:
                        # Binary handle: json.dump() would write str chunks
                        # and fail on Python 3, so encode the JSON text.
                        f_out.write(json.dumps(self.result).encode('utf-8'))
                else:
                    with open(self.out_file, 'w') as f_out:
                        json.dump(self.result, f_out)
            except Exception:
                logging.critical("Error writing to " + self.out_file)
webpagetest.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_browser_viewport(self, task):
        """Update the browser border size based on the measured viewport.

        The margin is the requested ``task['width']``/``task['height']``
        minus the measured ``task['actual_viewport']`` size, never below
        zero. When it differs from the stored margin for the job's browser,
        the margins are updated and saved to ``margins.json`` in the
        persistent directory, which is created when missing.
        """
        if 'actual_viewport' in task and 'width' in task and 'height' in task and \
                self.job is not None and 'browser' in self.job:
            browser = self.job['browser']
            width = max(task['width'] - task['actual_viewport']['width'], 0)
            height = max(task['height'] - task['actual_viewport']['height'], 0)
            if browser not in self.margins or self.margins[browser]['width'] != width or \
                    self.margins[browser]['height'] != height:
                self.margins[browser] = {"width": width, "height": height}
                if not os.path.isdir(self.persistent_dir):
                    os.makedirs(self.persistent_dir)
                margins_file = os.path.join(self.persistent_dir, 'margins.json')
                # Text mode: json.dump() writes str, which a file opened
                # with 'wb' rejects on Python 3.
                with open(margins_file, 'w') as f_out:
                    json.dump(self.margins, f_out)
safari_ios.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def process_message(self, msg):
        """Process a message from the browser
        https://trac.webkit.org/browser/webkit/trunk/Source/JavaScriptCore/inspector/protocol

        While recording, messages with a ``method`` of the form
        ``Category.event`` are dispatched to the matching handler and
        appended to the timeline when one is open. Messages carrying an
        ``id`` are stored as the response to the pending command with that
        id.
        """
        try:
            if 'method' in msg and self.recording:
                parts = msg['method'].split('.')
                if len(parts) >= 2:
                    category = parts[0]
                    event = parts[1]
                    if category == 'Page':
                        self.process_page_event(event, msg)
                    elif category == 'Network':
                        self.process_network_event(event, msg)
                    elif category == 'Inspector':
                        self.process_inspector_event(event)
                    elif category == 'Timeline':
                        self.process_timeline_event(event, msg)
                    elif category == 'Console':
                        self.process_console_event(event, msg)
        except Exception:
            # Handlers stay best-effort, but leave a trace of the failure
            # instead of discarding it silently.
            logging.debug('Error processing browser message', exc_info=True)
        if self.timeline and 'method' in msg and self.recording:
            json.dump(msg, self.timeline)
            self.timeline.write(",\n")
        if 'id' in msg:
            # An id without any digits cannot match a pending command;
            # ignore it rather than failing on a missing regex match.
            match = re.search(r'\d+', str(msg['id']))
            if match is not None:
                response_id = int(match.group())
                if response_id in self.pending_commands:
                    self.pending_commands.remove(response_id)
                    self.command_responses[response_id] = msg
ansify.py 文件源码 项目:ansify 作者: Kirkman 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def cache_all_colors():
    """Precompute the closest shaded-block index for every RGB color.

    Loads ``color_cache.json``, adds one entry per color keyed by the
    zero-padded ``RRRGGGBBB`` string whose value is the index of the entry
    in ``ANSI_SHADED_BLOCKS_TO_RGB`` with the smallest ``color_distance``,
    and writes the cache back to the same file.
    """
    options = {}
    with open('color_cache.json') as cache_file:
        options['cache'] = json.load(cache_file)

    # Channel values run from 0 to 255 inclusive, hence range(256);
    # range(255) never cached any color with a channel at 255.
    for r in range(256):
        for g in range(256):
            for b in range(256):
                print('r: ' + str(r) + ' | g: ' + str(g) + ' | b: ' + str(b))
                desired_color = {
                    'r': r,
                    'g': g,
                    'b': b,
                }
                color_id = str(r).zfill(3) + str(g).zfill(3) + str(b).zfill(3)
                closest_dist = INFINITY
                closest_color_index = 0
                for i, block_char in enumerate(ANSI_SHADED_BLOCKS_TO_RGB):
                    block_char_color = {
                        'r': int(block_char['r']),
                        'g': int(block_char['g']),
                        'b': int(block_char['b']),
                    }
                    d = color_distance(block_char_color, desired_color)
                    if d < closest_dist:
                        closest_dist = d
                        closest_color_index = i
                # Add this index to our color cache so we don't have to look it up again
                options['cache'][color_id] = closest_color_index

    with open('color_cache.json', 'w') as cache_file:
        json.dump(options['cache'], cache_file)


#@timing
userdb.py 文件源码 项目:ricedb 作者: TheReverend403 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_user_value(self, username, key, value):
        """Store ``value`` under ``key`` for ``username`` and save the database.

        An existing user entry is updated in place, a missing one is
        created; afterwards the whole mapping is written as JSON to
        ``self.file``.
        """
        entry = {key: value}
        try:
            self[username].update(entry)
        except KeyError:
            # First value for this user.
            self[username] = entry
        with open(self.file, 'w') as handle:
            json.dump(self, handle)
13_analyse_results.py 文件源码 项目:webisalod 作者: sven-h 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def read_instance_types():
    """Collect instance types from the enabled dumps into instance_types.json.

    Redirects are loaded first, then each enabled ``.ttl`` file is read
    into a ``defaultdict(list)`` through ``read_types`` and the result is
    written with ujson.
    """
    redirects = read_redirects()
    subject_object = defaultdict(list)

    # Disabled inputs: instance_types_transitive_en.ttl,
    # instance_types_lhd_dbo_en.ttl, instance_types_dbtax_dbo_en.ttl
    enabled_files = (
        'instance_types_en.ttl',
        'instance_types_sdtyped_dbo_en.ttl',
    )
    for types_file in enabled_files:
        read_types(redirects, subject_object, types_file)

    print('write json')
    with open('instance_types.json', 'w') as outfile:
        ujson.dump(subject_object, outfile)
    print('finish')
wayslack.py 文件源码 项目:wayslack 作者: wolever 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _write_pending(self):
        """Wait for the pool, then persist whatever is still incomplete.

        Incomplete entries are written as JSON to the pending file; when
        there are none, the pending file is removed (a failure to remove it
        is ignored). This runs even when ``pool.join()`` raises, and the
        exception from ``join()`` is then re-raised.
        """
        try:
            self.pool.join()
        finally:
            # No `return` inside this block: returning from `finally`
            # discards an in-flight exception (e.g. KeyboardInterrupt
            # raised while joining).
            to_write = list(self.pool.iter_incomplete())
            if to_write:
                with open_atomic(str(self.pending_file)) as f:
                    json.dump(to_write, f)
            else:
                try:
                    self.pending_file.unlink()
                except OSError:
                    pass
wayslack.py 文件源码 项目:wayslack 作者: wolever 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _refresh_messages(self):
        """Fetch messages newer than the latest archived one and store them by day.

        The newest timestamp is taken from the last message of the latest
        archive (0 when there is none). Each response page is sorted by
        ``ts`` and grouped by day; every day's messages are appended to
        that day's ``<day>.json`` archive, and messages with a file or
        attachments are handed to the downloader. Paging stops when the
        response reports no more messages.
        """
        latest_archive = next(self.iter_archives(reverse=True), None)
        latest_ts = 0
        if latest_archive:
            msgs = self.load_messages(latest_archive)
            latest_ts = msgs[-1]["ts"] if msgs else 0

        slack = getattr(self.slack, self.attr)
        while True:
            resp = self._get_list(slack, latest_ts)
            assert_successful(resp)

            msgs = resp.body["messages"]
            # groupby() below needs its input ordered by the grouping key.
            msgs.sort(key=lambda m: m["ts"])

            if msgs and not self.path.exists():
                self.path.mkdir()

            for day, day_msgs in groupby(msgs, key=lambda m: ts2ymd(m["ts"])):
                day_msgs = list(day_msgs)
                day_archive = self.path / (day + ".json")
                cur = (
                    self.load_messages(day_archive)
                    if day_archive.exists() else []
                )
                cur.extend(day_msgs)
                # Single-argument print(...) works on Python 2 and 3; the
                # print statement is a syntax error on Python 3.
                print("%s: %s new messages in %s (saving to %s)" %(
                    self.pretty_name, len(day_msgs), self.pretty_name, day_archive,
                ))
                for msg in day_msgs:
                    if "file" in msg or "attachments" in msg:
                        self.downloader.add_message(msg)
                with open_atomic(str(day_archive)) as f:
                    json.dump(cur, f)
                if float(day_msgs[-1]["ts"]) > float(latest_ts):
                    latest_ts = day_msgs[-1]["ts"]
            if not resp.body["has_more"]:
                break
wayslack.py 文件源码 项目:wayslack 作者: wolever 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_status(self, x):
        """Merge ``x`` into the status dict and write it to the status file."""
        self.status.update(x)
        target = str(self.status_file)
        with open_atomic(target) as out:
            json.dump(self.status, out)
ameliaprocessor.py 文件源码 项目:gulper 作者: QuantifiedSelfless 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save_name(self, user_data):
        """Write the user's Facebook id and show info to a per-user JSON file.

        The file is ``./data/<self.name>/<userid>.json``; the directory is
        created when it does not exist yet.
        """
        profile = user_data.data['fbprofile']
        meta = user_data.meta
        data = {
            "fbid": profile['id'],
            "showid": meta['showid'],
            "showdate": meta['showdate'],
        }
        directory = "./data/" + self.name
        os.makedirs(directory, exist_ok=True)
        filename = "./data/{}/{}.json".format(self.name, user_data.userid)
        with open(filename, "w+") as handle:
            json.dump(data, handle)
JSONDataSerializer.py 文件源码 项目:jsondata 作者: ArnoCan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def json_export(self, sourcenode, fname, **kargs):
        """ Writes a snapshot of the current data to a file for later import.

        Args:
            sourcenode: Base of the sub-tree to export. A false value
                (e.g. None) selects the complete JSON document.

            fname: File name for the exported data.

            **kargs:
                Not read by this method.

        Returns:
            'True' when the data has been written.

        Raises:
            JSONDataTargetFile: When opening the file or dumping the data
                fails.
        """
        node = sourcenode if sourcenode else self.data
        try:
            with open(fname, 'w') as out:
                myjson.dump(node, out)
        except Exception as err:
            raise JSONDataTargetFile("open-"+str(err),"data.dump",str(fname))
        return True
JSONData.py 文件源码 项目:jsondata 作者: ArnoCan 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __repr__(self):
        """Return the ``repr()`` of the wrapped data."""
        data = self.data
        return repr(data)


问题


面经


文章

微信
公众号

扫码关注公众号