def _adjust_kwargs(cls, **kwargs):
    """
    Factory Boy special method used to alter custom settings dictionaries.

    Replaces ``kwargs['data']`` with a shallow copy of itself whose
    ``transport`` entry points at the base server transport. The mapping
    originally passed in as ``data`` is left unmodified.

    :param kwargs: keyword arguments; must contain a ``data`` entry.
    :return: the same ``kwargs`` dict, with ``data`` replaced by the copy.
    """
    base_transport = {
        'path': 'pysoa.common.transport.base:ServerTransport',
    }
    # Copy first so the override never leaks into the caller's settings.
    settings = dict(kwargs['data'])
    settings['transport'] = base_transport
    kwargs['data'] = settings
    return kwargs
# python类Factory()的实例源码 (example source code using the Python class Factory())
def get_data_archive_servers(self):
    # type: () -> osisoftpy.structures.TypedList[DataArchive]
    """
    Fetch the PI Data Archive servers listed at ``self.url + '/dataservers'``.

    Issues a GET through ``self.session``. When the response status is OK
    and its JSON ``Items`` list is non-empty, every item is turned into a
    ``DataArchive`` through ``Factory`` and collected in a ``TypedList``.
    An item whose instantiation raises ``OSIsoftPyException`` is logged and
    skipped.

    :return: the populated ``TypedList``; ``None`` when the status is not
        OK (and ``raise_for_status`` does not raise) or ``Items`` is empty.
    """
    log.debug('Retrieving all PI Data Archive servers from %s', self.url)
    response = self.session.get(self.url + '/dataservers')

    # The body is only parsed for an OK response; any other status is
    # treated like an empty result and handed to raise_for_status().
    status_ok = response.status_code == requests.codes.ok
    entries = response.json()['Items'] if status_ok else []
    if len(entries) == 0:
        response.raise_for_status()
        return None

    log.debug('HTTP %s - Instantiating OSIsoftPy.DataArchives()',
              response.status_code)
    factory = Factory(DataArchive)
    servers = TypedList(validtypes=DataArchive)
    log.debug('Staging %s PI server(s) for instantiation...',
              get_count(entries))

    for entry in entries:
        try:
            log.debug('Instantiating "%s" as OSIsoftPy.DataArchive...',
                      entry['Name'])
            servers.append(factory.create(
                name=entry['Name'],
                serverversion=entry['ServerVersion'],
                webid=entry['WebId'],
                isconnected=entry['IsConnected'],
                id=entry['Id']))
        except OSIsoftPyException:
            # Best effort: report the failing server and keep going.
            log.error('Unable to retrieve server info for "%s"',
                      entry['Name'], exc_info=True)

    log.debug('PI Data Archive server retrieval success! %s PI server(s) '
              'were found and instantiated.', get_count(servers))
    return servers
def get_points(self, query, count=10, scope='*'):
    # type: (str, int, str) -> osisoftpy.structures.TypedList[Point]
    """
    Run an indexed search and return the matching PI points.

    Sends a GET to ``self.url + '/search/query'`` with ``q``, ``count`` and
    ``scope`` as query parameters. Each entry of the response's ``Items``
    is converted with ``create(factory, item)`` and appended to a
    ``TypedList`` of ``Point``. Entries of the response's ``Errors`` are
    logged as warnings; they do not abort the call.

    :param query: search expression, sent as the ``q`` parameter.
    :param count: value sent as the ``count`` parameter.
    :param scope: value sent as the ``scope`` parameter.
    :return: the ``TypedList`` of points (possibly empty); ``None`` when
        the status is not OK and ``raise_for_status`` does not raise.
    """
    payload = {'q': query, 'count': count, 'scope': scope}
    log.debug(
        'Executing Query against PI Web WebAPI Indexed Search with '
        'the following parameters: Query: "%s", Count: "%s". Payload: %s',
        query, count, payload)
    r = self.session.get(self.url + '/search/query', params=payload)
    if r.status_code != requests.codes.ok:
        r.raise_for_status()
        # Non-OK statuses that are not HTTP errors fall through here.
        return None

    data = r.json()
    # A missing or null 'Items'/'Errors' entry is treated as empty instead
    # of failing with TypeError/KeyError further down.
    items = data.get('Items') or []
    errors = data.get('Errors') or []

    log.debug('HTTP %s - Instantiating %s PI points', r.status_code,
              get_count(items))
    factory = Factory(Point)
    points = TypedList(Point)
    for item in items:
        points.append(create(factory, item))
    log.debug('PI Point retrieval success! %s PI '
              'point(s) were '
              'found and instantiated.', get_count(points))

    for error in errors:
        try:
            log.warning('The PI Web WebAPI returned the '
                        'following error while instantiating '
                        'PI points. '
                        'ErrorCode: %s, Source: %s, '
                        'Message %s',
                        error['ErrorCode'], error['Source'],
                        error['Message'])
        except Exception:
            # Best effort: a malformed error entry must not hide the
            # points that were retrieved successfully.
            log.error('Exception encounted while '
                      'instantiating '
                      'PI points!', exc_info=True)
    return points