def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO.StringIO() if mode == 'ansi' else sys.stdout
row, col = self.s // self.ncol, self.s % self.ncol
desc = self.desc.tolist()
desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
outfile.write("\n".join("".join(row) for row in desc)+"\n")
if self.lastaction is not None:
outfile.write(" ({})\n".format(self.get_action_meanings()[self.lastaction]))
else:
outfile.write("\n")
return outfile
python类StringIO()的实例源码
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO.StringIO() if mode == 'ansi' else sys.stdout
row, col = self.s // self.ncol, self.s % self.ncol
desc = self.desc.tolist()
desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
outfile.write("\n".join("".join(row) for row in desc)+"\n")
if self.lastaction is not None:
outfile.write(" ({})\n".format(self.get_action_meanings()[self.lastaction]))
else:
outfile.write("\n")
return outfile
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO.StringIO()
db.export_to_csv_file(stream)
db(db.pet).delete()
db(db.person).delete()
stream = StringIO.StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person).count()==10
assert db(db.pet.name).count()==10
drop(db.pet)
drop(db.person)
db.commit()
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name'),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k),uuid=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO.StringIO()
db.export_to_csv_file(stream)
db(db.person).delete()
db(db.pet).delete()
stream = StringIO.StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person).count()==10
assert db(db.pet).count()==10
drop(db.pet)
drop(db.person)
db.commit()
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO.StringIO()
db.export_to_csv_file(stream)
db(db.pet).delete()
db(db.person).delete()
stream = StringIO.StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person.id==db.pet.friend)(db.person.name==db.pet.name).count()==10
db.pet.drop()
db.person.drop()
db.commit()
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name'),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k),uuid=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO.StringIO()
db.export_to_csv_file(stream)
stream = StringIO.StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person).count()==10
assert db(db.person.id==db.pet.friend)(db.person.name==db.pet.name).count()==20
db.pet.drop()
db.person.drop()
db.commit()
def __init__(self):
self.body = StringIO.StringIO()
def __init__(self):
self.body = StringIO.StringIO()
def retrieve(self, name, path=None, nameonly=False):
"""
if nameonly==True return (filename, fullfilename) instead of
(filename, stream)
"""
self_uploadfield = self.uploadfield
if self.custom_retrieve:
return self.custom_retrieve(name, path)
import http
if self.authorize or isinstance(self_uploadfield, str):
row = self.db(self == name).select().first()
if not row:
raise http.HTTP(404)
if self.authorize and not self.authorize(row):
raise http.HTTP(403)
m = REGEX_UPLOAD_PATTERN.match(name)
if not m or not self.isattachment:
raise TypeError('Can\'t retrieve %s' % name)
file_properties = self.retrieve_file_properties(name,path)
filename = file_properties['filename']
if isinstance(self_uploadfield, str): # ## if file is in DB
stream = StringIO.StringIO(row[self_uploadfield] or '')
elif isinstance(self_uploadfield,Field):
blob_uploadfield_name = self_uploadfield.uploadfield
query = self_uploadfield == name
data = self_uploadfield.table(query)[blob_uploadfield_name]
stream = StringIO.StringIO(data)
elif self.uploadfs:
# ## if file is on pyfilesystem
stream = self.uploadfs.open(name, 'rb')
else:
# ## if file is on regular filesystem
# this is intentially a sting with filename and not a stream
# this propagates and allows stream_file_or_304_or_206 to be called
fullname = pjoin(file_properties['path'],name)
if nameonly:
return (filename, fullname)
stream = open(fullname,'rb')
return (filename, stream)
def __str__(self):
"""
serializes the table into a csv file
"""
s = StringIO.StringIO()
self.export_to_csv_file(s)
return s.getvalue()
def __init__(self):
self.body = StringIO.StringIO()
def getText(self, program_name, interval):
"""
:type interval: Interval.Interval
:param program_name:
:param interval:
:return:
"""
rewrites = self.programs.get(program_name)
start = interval.start
stop = interval.stop
# ensure start/end are in range
if stop > len(self.tokens.tokens) - 1: stop = len(self.tokens.tokens) - 1
if start < 0: start = 0
# if no instructions to execute
if not rewrites: return self.tokens.getText(interval)
buf = StringIO()
indexToOp = self._reduceToSingleOperationPerIndex(rewrites)
i = start
while all((i <= stop, i < len(self.tokens.tokens))):
op = indexToOp.get(i)
token = self.tokens.get(i)
if op is None:
if token.type != Token.EOF: buf.write(token.text)
i += 1
else:
i = op.execute(buf)
if stop == len(self.tokens.tokens)-1:
for op in indexToOp.values():
if op.index >= len(self.tokens.tokens)-1: buf.write(op.text)
return buf.getvalue()
def execute(self, buf):
"""
:type buf: StringIO.StringIO
:param buf:
:return:
"""
return self.index
def __init__(self):
self.body = StringIO.StringIO()
def compress_results(res):
out = StringIO.StringIO()
file_content = json.dumps(res)
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write(file_content)
return out.getvalue()
def __init__(self):
self.body = StringIO.StringIO()
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
self.queue = StringIO.StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self):
self.body = StringIO.StringIO()
def retrieve(self, name, path=None, nameonly=False):
"""
If `nameonly==True` return (filename, fullfilename) instead of
(filename, stream)
"""
self_uploadfield = self.uploadfield
if self.custom_retrieve:
return self.custom_retrieve(name, path)
import http
if self.authorize or isinstance(self_uploadfield, str):
row = self.db(self == name).select().first()
if not row:
raise http.HTTP(404)
if self.authorize and not self.authorize(row):
raise http.HTTP(403)
file_properties = self.retrieve_file_properties(name, path)
filename = file_properties['filename']
if isinstance(self_uploadfield, str): # ## if file is in DB
stream = StringIO.StringIO(row[self_uploadfield] or '')
elif isinstance(self_uploadfield, Field):
blob_uploadfield_name = self_uploadfield.uploadfield
query = self_uploadfield == name
data = self_uploadfield.table(query)[blob_uploadfield_name]
stream = StringIO.StringIO(data)
elif self.uploadfs:
# ## if file is on pyfilesystem
stream = self.uploadfs.open(name, 'rb')
else:
# ## if file is on regular filesystem
# this is intentially a sting with filename and not a stream
# this propagates and allows stream_file_or_304_or_206 to be called
fullname = pjoin(file_properties['path'], name)
if nameonly:
return (filename, fullname)
stream = open(fullname, 'rb')
return (filename, stream)
def __str__(self):
"""
Serializes the table into a csv file
"""
s = StringIO.StringIO()
self.export_to_csv_file(s)
return s.getvalue()
def detect_face_task(img):
"""Detect faces from image
@input: image
@output:
- all faces information
"""
# paramter for detect
# image_size = 160
# margin = 44
minsize = 20 # minimum size of face
threshold = [0.6, 0.7, 0.7] # three steps's threshold
factor = 0.709 # scale factor
# caffe model
pnet = caffe_model.get_pnet()
rnet = caffe_model.get_rnet()
onet = caffe_model.get_onet()
bounding_boxes, _ = detect_face.detect_face(img, minsize, pnet, rnet, onet, threshold, factor)
print('detect bounding: ', bounding_boxes)
print('Find faces: ', bounding_boxes.shape[0])
# all_faces is faces information list, include face bytes, face position
all_faces = []
for face_position in bounding_boxes:
face_position = face_position.astype(int)
print('face position: ', face_position)
# each face information, include position, face image
head_rect = face_position[:4].tolist() # numpy array to python list
head_img = misc.toimage(img).crop(head_rect)
head_img_io = StringIO.StringIO()
head_img.save(head_img_io, format='JPEG')
head_img_b64 = base64.b64encode(head_img_io.getvalue())
# construct response
face_info = {}
face_info['rect'] = head_rect
face_info['image'] = head_img_b64
all_faces.append(face_info)
return all_faces
utils.py 文件源码
项目:Semantic-Segmentation-using-Adversarial-Networks
作者: oyam
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def draw_label(label, img, n_class, label_titles, bg_label=0):
"""Convert label to rgb with label titles.
@param label_title: label title for each labels.
@type label_title: dict
"""
from PIL import Image
from scipy.misc import fromimage
from skimage.color import label2rgb
from skimage.transform import resize
colors = labelcolormap(n_class)
label_viz = label2rgb(label, img, colors=colors[1:], bg_label=bg_label)
# label 0 color: (0, 0, 0, 0) -> (0, 0, 0, 255)
label_viz[label == 0] = 0
# plot label titles on image using matplotlib
plt.subplots_adjust(left=0, right=1, top=1, bottom=0,
wspace=0, hspace=0)
plt.margins(0, 0)
plt.gca().xaxis.set_major_locator(plt.NullLocator())
plt.gca().yaxis.set_major_locator(plt.NullLocator())
plt.axis('off')
# plot image
plt.imshow(label_viz)
# plot legend
plt_handlers = []
plt_titles = []
for label_value in np.unique(label):
if label_value not in label_titles:
continue
fc = colors[label_value]
p = plt.Rectangle((0, 0), 1, 1, fc=fc)
plt_handlers.append(p)
plt_titles.append(label_titles[label_value])
plt.legend(plt_handlers, plt_titles, loc='lower right', framealpha=0.5)
# convert plotted figure to np.ndarray
f = StringIO.StringIO()
plt.savefig(f, bbox_inches='tight', pad_inches=0)
result_img_pil = Image.open(f)
result_img = fromimage(result_img_pil, mode='RGB')
result_img = resize(result_img, img.shape, preserve_range=True)
result_img = result_img.astype(img.dtype)
return result_img