python类dumps()的实例源码

utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def restartJobs(args, config):  # TODO: reimplement
        pipelineDbUtils = PipelineDbUtils(config)
        pipelineQueueUtils = PipelineQueueUtils('WAIT_Q')

        if args.jobId:
            request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
            msg = {
                "job_id": args.jobId,
                "request": request
            }
            pipelineQueueUtils.publish(json.dumps(msg))

        if args.preempted:
            preempted = pipelineDbUtils.getJobInfo(select=["job_id", "request"], where={"current_status": "PREEMPTED"})

            for p in preempted:
                msg = {
                    "job_id": p.job_id,
                    "request": json.loads(p.request)
                }
                pipelineQueueUtils.publish(json.dumps(msg))
STFDevicesControl.py 文件源码 项目:PhonePerformanceMeasure 作者: KyleCe 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_user_devices(self, serial):
        # (url, access_token, api_token) = self.get_api_conf()
        api_url = self.url + "/api/v1/user/devices"
        token = self.access_token + " " + self.api_token

        data = {'serial': serial}
        request = urllib2.Request(api_url, json.dumps(data))
        request.add_header('Authorization', token)
        request.add_header('Content-Type', 'application/json')
        try:
            urllib2.urlopen(request)
        except Exception, e:
            print e.code
            print e.read()

    # ?????????
signals.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def notify_listeners(content):
    """Send out to listening consumers."""
    data = json.dumps({"event": "new", "id": content.id})
    if content.content_type == ContentType.REPLY:
        # Content reply
        StreamConsumer.group_send("streams_content__%s" % content.parent.channel_group_name, data)
    elif content.content_type == ContentType.SHARE:
        # Share
        # TODO do we need to do much?
        pass
    else:
        # Public stream
        if content.visibility == Visibility.PUBLIC:
            StreamConsumer.group_send("streams_public", data)
        # Tag streams
        for tag in content.tags.all():
            StreamConsumer.group_send("streams_tag__%s" % tag.channel_group_name, data)
        # Profile streams
        StreamConsumer.group_send("streams_profile__%s" % content.author.id, data)
        StreamConsumer.group_send("streams_profile_all__%s" % content.author.id, data)
        # Followed stream
        followed_qs = Profile.objects.followers(content.author).filter(user__isnull=False)
        for username in followed_qs.values_list("user__username", flat=True):
            StreamConsumer.group_send("streams_followed__%s" % username, data)
post_api.py 文件源码 项目:uicourses_v2 作者: sumerinlan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def write_response(self, d):
        # send header
        self.send_response(200)
        self.send_header('Content-type', 'text/json; charset=utf-8')
        self.end_headers()
        log('I', 'conn', 'Header sent.')

        # send data
        log('I', 'conn', 'Sending data...')
        json_str = json.dumps(d)
        # print(json_str.encode('utf-8').decode('unicode-escape').replace('\n', '\\n'))
        self.wfile.write(json_str.replace('"', '\\"')
                         .decode('unicode-escape')
                         .encode('utf-8')
                         .replace('\n', '\\n'))
        log('C', 'conn', 'Data sent.')

        # clean up
        self.wfile.close()
        log('C', 'conn', 'Connection closed.')
test1.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
data.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def getFilesize(fileUuid, tokenFile=None, projectId=None):
        if tokenFile:
            filters = {
                "op": "=",
                "content": {
                    "field": "file_id",
                    "value": [fileUuid]
                }
            }

            params = {
                "filters": json.dumps(filters)
            }

            fileInfo = GDCDataUtils.query(tokenFile, "files", params=params)

            return int(fileInfo.json()["data"]["hits"][0]["file_size"])

        else:
            bq = GoogleApiService.create('bq', 'v2')
            body = {
                "query": "SELECT SUM(a_file_size) FROM GDC_metadata.GCS_join1 WHERE file_id = {fileUuid}".format(fileUuid=fileUuid)
            }
            results = bq.jobs().query(projectId=projectId, body=body).execute()
            return results["rows"][results["rows"].keys()[0]][0]
data.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def constructGCSFilePath(fileUuid, tokenFile):
        filters = {
            "op": "=",
            "content": {
                "field": "file_id",
                "value": [fileUuid]
            }
        }

        params = {
            "filters": json.dumps(filters)
        }

        query = "?expand=cases.project"

        fileInfo = GDCDataUtils.query(tokenFile, "files", query=query, params=params).json()
        pprint.pprint(fileInfo)

        return "{project}/{strategy}/{platform}/{uuid}/{filename}".format(
            project=fileInfo["data"]["hits"][0]["cases"][0]["project"]["project_id"],
            strategy=str(fileInfo["data"]["hits"][0]["experimental_strategy"]),
            platform=str(fileInfo["data"]["hits"][0]["platform"]),
            uuid=str(fileUuid),
            filename=str(fileInfo["data"]["hits"][0]["file_name"])
        )
unitdata.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def unsetrange(self, keys=None, prefix=""):
        """
        Remove a range of keys starting with a common prefix, from the database
        entirely.

        :param list keys: List of keys to remove.
        :param str prefix: Optional prefix to apply to all keys in ``keys``
            before removing.
        """
        if keys is not None:
            keys = ['%s%s' % (prefix, key) for key in keys]
            self.cursor.execute('delete from kv where key in (%s)' % ','.join(['?'] * len(keys)), keys)
            if self.revision and self.cursor.rowcount:
                self.cursor.execute(
                    'insert into kv_revisions values %s' % ','.join(['(?, ?, ?)'] * len(keys)),
                    list(itertools.chain.from_iterable((key, self.revision, json.dumps('DELETED')) for key in keys)))
        else:
            self.cursor.execute('delete from kv where key like ?',
                                ['%s%%' % prefix])
            if self.revision and self.cursor.rowcount:
                self.cursor.execute(
                    'insert into kv_revisions values (?, ?, ?)',
                    ['%s%%' % prefix, self.revision, json.dumps('DELETED')])
json_context.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def json_context(context, raise_error=False):
    """Generate a JS <script> tag from context

    Serializes ``context["json_context"]`` into JSON and generates a JS
    script to attach it to ``window.context``

    If ``raise_error`` is False, values in context that are not
    JSON serialisable will be converted to string. Otherwise, it
    raises TypeError

    :param context: Current view context
    :param raise_error: Control whether to raise an error on non-JSON serialisable values
    :return: ``<script>window.context = {<context["json_context"]>}</script>``
    """
    if not context.get("json_context"):
        return ""

    json_default = None if raise_error else lambda obj: str(obj)

    json_dump = json.dumps(context["json_context"], default=json_default)
    return mark_safe("<script>window.context = %s;</script>" % json_dump)
server.py 文件源码 项目:uicourses_v2 作者: sumerinlan 项目源码 文件源码 阅读 73 收藏 0 点赞 0 评论 0
def write_response(self, d):
        # send header
        self.send_response(200)
        self.send_header('Content-type', 'text/json; charset=utf-8')
        self.end_headers()
        log('I', 'conn', 'Header sent.')

        # send data
        log('I', 'conn', 'Sending data...')
        json_str = json.dumps(d)
        # print(json_str.encode('utf-8').decode('unicode-escape').replace('\n', '\\n'))
        self.wfile.write(json_str.replace('"', '\\"').decode('unicode-escape').encode('utf-8').replace('\n', '\\n'))
        log('C', 'conn', 'Data sent.')

        # clean up
        self.wfile.close()
        log('C', 'conn', 'Connection closed.')
Device.py 文件源码 项目:kiota 作者: Morteo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self, gateway):

    self.gateway = gateway

    if self.config['id'] is not None:
      self.topic =  gateway.topic + "/" + self.config['id']
    else:
      self.topic =  gateway.topic + "/" + self.__class__.__name__
      if self.config['gpio'] is not None:
        self.topic =  self.topic + "/" + str(self.config["gpio"])

    payload = { 'connected': self.topic }
    self.gateway.publish(gateway.topic, json.dumps(payload))

    self.gateway.subscribe(self.topic+"/configure")
    self.gateway.subscribe(self.topic+"/get")
    self.gateway.subscribe(self.topic+"/set")
generate_ddl.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def generate(self, template_path, source_json_path, output_path):
        print("Generating content at %s with template at %s, using key %s" % (
            output_path, template_path, self.key_name))
        data = []
        with open(source_json_path) as f:
            for line in f:
                json_line = json.loads(line)
                data_line = '(\'%s\',\n\'%s\')' % (
                    json_line[self.key_name], json.dumps(json_line))
                data.append(str(data_line))
        print(data)
        with open(template_path) as f:
            template = f.read()
        with open(output_path, 'w') as write_file:
            write_file.write(template)
            for record in data:
                write_file.write(record)
                write_file.write(',\n')
            write_file.seek(-2, 1)
            write_file.truncate()
            write_file.write(';')
hippo.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def run(self):
        data = self.getData()

        value = {
            data: {
                "type": self.data_type
            }
        }
        json_data = json.dumps(value)
        post_data = json_data.encode('utf-8')
        headers = {'Content-Type': 'application/json'}

        try:
            request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
            response = urllib2.urlopen(request)
            report = json.loads(response.read())

            self.report(report)
        except urllib2.HTTPError:
            self.error("Hippocampe: " + str(sys.exc_info()[1]))
        except urllib2.URLError:
            self.error("Hippocampe: service is not available")
        except Exception as e:
            self.unexpectedError(e)
test_run_no_updates_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
test_run_update_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
        self.clientConfig.APP_NAME = APP_NAME
safetypy.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def get_audit_actions(self, date_modified, offset=0, page_length=100):
        """
        Get all actions created after a specified date. If the number of actions found is more than 100, this function will
        page until it has collected all actions

        :param date_modified:   ISO formatted date/time string. Only actions created after this date are are returned.
        :param offset:          The index to start retrieving actions from
        :param page_length:     How many actions to fetch for each page of action results
        :return:                Array of action objects
        """
        logger = logging.getLogger('sp_logger')
        actions_url = self.api_url + 'actions/search'
        response = self.authenticated_request_post(
            actions_url,
            data=json.dumps({
                "modified_at": {"from": str(date_modified)},
                "offset": offset,
                "status": [0, 10, 50, 60]
            })
        )
        result = self.parse_json(response.content) if response.status_code == requests.codes.ok else None
        self.log_http_status(response.status_code, 'GET actions')
        if result is None or None in [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]:
            return None
        return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
tokenizer.py 文件源码 项目:almond-nnparser 作者: Stanford-Mobisocial-IoT-Lab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def tokenize(self, language_tag, query):
        id = self._next_id
        self._next_id += 1

        req = dict(req=id, utterance=query, languageTag=language_tag)
        outer = Future()
        self._requests[id] = outer

        def then(future):
            if future.exception():
                outer.set_exception(future.exception())
                del self._requests[id]

        future = self._socket.write(json.dumps(req).encode())
        future.add_done_callback(then)
        return outer
reformat.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process_reply(reply, nested=False):
    """
    Process a reply so it looks nice:

    - if it's from the prototype yang integration, ditch the 'data' root
    - convert from list to nested format if requested
    - convert quotes to avoid escaping
    """
    try:
        # @@@ strip 'data' from yang output
        reply['result'] = reply['result'][0]['data'] 
    except Exception:
        pass

    # If required, and query successful, convert the reply['result'] format.
    try:
        if nested:
            reply['result'] = reformat(reply['result'])
    except KeyError:
        # Fails silently if there is no 'reply['result']' in the reply['result'], this
        # means an error occurred.
        pass

    # @@@ cheesily try to avoid \" everywhere, at cost of valid json
    return re.sub(r'\\"', "'", json.dumps(reply))
render_explorer.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def render_POST(self, request):
        paths = request.args['paths[]']

        def got_data(responses):
            reply = {}
            for path, data in zip(paths, responses):
                try:
                    reply[path] = data['result']
                except KeyError:
                    reply[path] = data['error']
            request.sdata.add_to_push_queue('explorer', text=dumps(reply))
            request.sdata.log('got reply {}'.format(paths))

        reqs = map(request.sdata.api.get_schema, paths)
        d = defer.gatherResults(reqs)
        d.addCallback(got_data)

        request.setHeader('Content-Type', 'application/json')
        return '{}'
render_push_queue.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def render_POST(self, request):
        request.setHeader('Content-Type', 'application/json')
        pq = request.sdata.drain_push_queue()
        if len(pq) > 0:
            return json.dumps(pq)
        else:
            def finish_later(pq):
                try:
                    request.write(json.dumps(pq))
                    request.finish()
                except exceptions.RuntimeError as e:
                    print("### can't send push queue: ", e)
                    request.sdata.restore_push_queue(pq)
            request.sdata.add_pending_push_queue_dispatch(finish_later)
            request.notifyFinish().addErrback(lambda _: request.sdata.remove_pending_push_queue_dispatch())
            return server.NOT_DONE_YET
render_manage_intf.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_json(self, request):
        interface = request.args['interface'][0]
        base_path = "RootCfg.InterfaceConfiguration(" \
                    "['act', '{}'])".format(interface)

        cfg = OrderedDict()
        cfg[base_path + '.Description'] = request.args['description'][0]
        cfg[base_path + '.IPV4Network.Addresses.Primary'] = \
                OrderedDict((('Address', request.args['ipv4_addr'][0]),
                             ('Netmask', request.args['ipv4_mask'][0])))

            #[request.args['ipv4_addr'][0],
            # request.args['ipv4_mask'][0]]

        extra_cli = ["interface {} ".format(interface) + x
                     for x in request.args['extra_cli'][0].split('\n') if len(x) > 0]

        cfg_json = OrderedDict((('sets', cfg), ('cli_sets', extra_cli)))

        request.sdata.set_text('#manage_intf_json', json.dumps(cfg_json, indent=4))
        request.sdata.highlight('#manage_intf_json')
        return base_path, cfg, extra_cli
render_write_file.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def render_POST(self, request):
        file_path = request.args['file_path'][0]
        data = request.args['file_contents'][0]

        def got_reply(reply):
            request.sdata.add_to_push_queue('write_file',
                                            text=dumps(reply),
                                            filename=file_path)
            request.sdata.log('got reply id {}'.format(reply['id']))

        def got_error(error):
            error_code = error.getErrorMessage()
            traceback = error.getTraceback()
            request.sdata.add_to_push_queue('error',
                                            error=error_code,
                                            traceback=traceback,
                                            tab='write_file')

        d = request.sdata.api.write_file(file_path, data)
        d.addCallback(got_reply)
        d.addErrback(got_error)

        request.setHeader('Content-Type', 'application/json')
        return '{}'
bot.py 文件源码 项目:cerebros_bot 作者: jslim18 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
cli.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def dump_schedule(tasks):
    """Dump schedule content"""
    from .utils import load_manager
    manager = load_manager(tasks)

    count = 5000
    offset = 0
    while True:
        items = manager.queue.get_schedule(offset, count)
        if not items:
            break

        for ts, queue, item in items:
            print(datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                  queue,
                  json.dumps(item, ensure_ascii=False, sort_keys=True),
                  sep='\t')

        offset += count
distro.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def main():
    import argparse

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler(sys.stdout))

    parser = argparse.ArgumentParser(description="Linux distro info tool")
    parser.add_argument(
        '--json',
        '-j',
        help="Output in machine readable format",
        action="store_true")
    args = parser.parse_args()

    if args.json:
        logger.info(json.dumps(info(), indent=4, sort_keys=True))
    else:
        logger.info('Name: %s', name(pretty=True))
        distribution_version = version(pretty=True)
        if distribution_version:
            logger.info('Version: %s', distribution_version)
        distribution_codename = codename()
        if distribution_codename:
            logger.info('Codename: %s', distribution_codename)
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def sign(payload, keypair):
    """Return a JWS-JS format signature given a JSON-serializable payload and 
    an Ed25519 keypair."""
    get_ed25519ll()
    #
    header = {
                "alg": ALG,
                "jwk": {
                    "kty": ALG, # alg -> kty in jwk-08.
                    "vk": native(urlsafe_b64encode(keypair.vk))
                }
             }

    encoded_header = urlsafe_b64encode(binary(json.dumps(header, sort_keys=True)))
    encoded_payload = urlsafe_b64encode(binary(json.dumps(payload, sort_keys=True)))
    secured_input = b".".join((encoded_header, encoded_payload))
    sig_msg = ed25519ll.crypto_sign(secured_input, keypair.sk)
    signature = sig_msg[:ed25519ll.SIGNATUREBYTES]
    encoded_signature = urlsafe_b64encode(signature)

    return {"recipients": 
            [{"header":native(encoded_header),
             "signature":native(encoded_signature)}],
            "payload": native(encoded_payload)}
sensor_backlog_individual.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def query_forever(cb, interval, udp):

    while True:

        try:
            sensors = cb.sensors()
            for sensor in sensors:

                summary = {}
                summary['computer_name'] = sensor['computer_name'].strip()
                summary['id'] = sensor['id']
                summary['computer_sid'] = sensor['computer_sid'].strip()
                summary['num_storefiles_bytes'] = sensor['num_storefiles_bytes']
                summary['num_eventlog_bytes'] = sensor['num_eventlog_bytes']

                output(json.dumps(summary), udp)
        except Exception, e:
            print e
            pass 

        time.sleep(float(interval))

    return
grpc_example.py 文件源码 项目:ios-xr-grpc-python 作者: cisco-grpc-connection-libs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main():
    '''
    To not use tls we need to do 2 things.
    1. Comment the variables creds and options out
    2. Remove creds and options CiscoGRPCClient
    ex: client = CiscoGRPCClient('11.1.1.10', 57777, 10, 'vagrant', 'vagrant')
    '''
    creds = open('ems.pem').read()
    options = 'ems.cisco.com'
    client = CiscoGRPCClient('127.0.0.1', 57777, 10, 'vagrant', 'vagrant', creds, options)
    #Test 1: Test Get config json requests
    path = '{"Cisco-IOS-XR-ip-static-cfg:router-static": [null]}'
    try:
        err, result = client.getconfig(path)
        if err:
            print(err)
        print(json.dumps(json.loads(result)))
    except AbortionError:
        print(
            'Unable to connect to local box, check your gRPC destination.'
            )
nanobot.py 文件源码 项目:nanobot 作者: bgporter 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_success(self, data):
      ''' Called when we detect an event through the streaming API. 
         The base class version looks for quoted tweets and for each one it 
         finds, we write out a text file that contains the ID of the tweet 
         that mentions us.

         The other (cron-job) version of your bot will look for any files with the 
         correct extension (identified by `kStreamFileExtension`) in its 
         HandleQuotes() method and favorite^H^H^H^H like those tweets.

         See https://dev.twitter.com/streaming/userstreams
      '''
      # for now, all we're interested in handling are events. 
      if 'event' in data:
         # Dump the data into a JSON file for the other cron-process to 
         # handle the next time it wakes up.
         fileName = os.path.join(self.path, "{0}{1}".format(
            uuid4().hex, kStreamFileExtension))
         with open(fileName, "wt") as f:
            f.write(json.dumps(data).encode("utf-8"))
vclustermgr.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_port_mapping(self,username,clustername,node_name,node_ip,port,quota):
        port_mapping_count = self.count_port_mapping(username)

        if port_mapping_count >= int(quota['portmapping']):
            return [False, 'Port mapping quota exceed.']

        [status, clusterinfo] = self.get_clusterinfo(clustername, username)
        host_port = 0
        if self.distributedgw == 'True':
            worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip'])
            [success, host_port] = worker.acquire_port_mapping(node_name, node_ip, port)
        else:
            [success, host_port] = portcontrol.acquire_port_mapping(node_name, node_ip, port)
        if not success:
            return [False, host_port]
        if 'port_mapping' not in clusterinfo.keys():
            clusterinfo['port_mapping'] = []
        clusterinfo['port_mapping'].append({'node_name':node_name, 'node_ip':node_ip, 'node_port':port, 'host_port':host_port})
        clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
        clusterfile.write(json.dumps(clusterinfo))
        clusterfile.close()
        return [True, clusterinfo]


问题


面经


文章

微信
公众号

扫码关注公众号