def frames2video(path):
"""
Merges images in path into a video
:param path: path with prediction images
:return: nothing
"""
fnames = os.listdir(path)
fnames.sort()
images = np.array([plt.imread(os.path.join(path, fname)) for fname in fnames])
# h, w, c = images[0].shape
videowriter = imageio.get_writer('prediction_video.mp4', fps=25)
for im in images:
videowriter.append_data(im)
videowriter.close()
python类get_writer()的实例源码
def frames2video(name, path):
"""
Merges images in path into a video
:param path: path with prediction images
:return:
"""
batch_size = 100
fnames = os.listdir(path)
fnames.sort()
#images = np.array([plt.imread(os.path.join(path, fname)) for fname in fnames])
# h, w, c = images[0].shape
videowriter = imageio.get_writer(name + '_video.mp4', fps=25)
for fname in tqdm.tqdm(fnames):
videowriter.append_data(plt.imread(os.path.join(path, fname)))
videowriter.close()
def __init__(self, d, save_dir='report'):
image_dir = os.path.join(save_dir, 'images')
if not os.path.exists(image_dir):
os.makedirs(image_dir)
self.d = d
self.save_dir = save_dir
self.steps = []
self.result = None
self.__gif_path = os.path.join(save_dir, 'output.gif')
self.__gif = imageio.get_writer(self.__gif_path, format='GIF', fps=2)
self.__uia_last_position = None
self.__last_screenshot = None
self.__closed = False
self.start_record()
def make_gif(self, frame_count_limit=IMAGE_LIMIT, gif_name="mygif.gif", frame_duration=0.4):
"""Make a GIF visualization of view graph."""
self.make_thumbnails(frame_count_limit=frame_count_limit)
file_names = sorted([file_name for file_name in os.listdir(self.thumbnail_path)
if file_name.endswith('thumbnail.png')])
images = []
for file_name in file_names:
images.append(Image.open(self.thumbnail_path + file_name))
destination_filename = self.graph_path + gif_name
iterator = 0
with io.get_writer(destination_filename, mode='I', duration=frame_duration) as writer:
for file_name in file_names:
image = io.imread(self.thumbnail_path + file_name)
writer.append_data(image)
iterator += 1
writer.close()
def main(input_path, output_path, size, input_type, output_type):
output_size = (size[0] * 3, size[1] * 2)
reader = imageio.get_reader(input_path)
metadata = reader.get_meta_data()
projector = get_projector(input_type, output_type)
with projector(output_size) as renderer:
writer_args = {}
frames = 1
if 'fps' in metadata:
# Handle videos
writer_args['fps'] = metadata['fps']
frames = metadata['nframes']
with imageio.get_writer(output_path, **writer_args) as writer:
if frames > 1:
render_many(renderer, reader, writer, frames)
else:
render_single(renderer, reader, writer)
def make_mp4(ims, name="", fps=20):
print("Making mp4...")
with imageio.get_writer("{}.mp4".format(name), mode='I', fps=fps) as writer:
for im in ims:
writer.append_data(bgr2rgb(im))
print("Done")
def make_mp4(ims, name="", fps=20):
print("Making mp4...")
with imageio.get_writer("{}.mp4".format(name), mode='I', fps=fps) as writer:
for im in ims:
writer.append_data(bgr2rgb(im))
print("Done")
def make_mp4(ims, name="", fps=20, scale=1):
print("Making mp4...")
with imageio.get_writer("{}.mp4".format(name), mode='I', fps=fps) as writer:
for im in ims:
if scale != 1:
new_shape = (int(im.shape[1] * scale), int(im.shape[0] * scale))
interpolation = cv2.INTER_CUBIC if scale > 1 else cv2.INTER_AREA
im = cv2.resize(im, new_shape, interpolation=interpolation)
writer.append_data(im[..., ::-1])
print("Done")
def make_gif(parent_folder,frame_duration=0.3):
items = os.listdir(parent_folder)
png_filenames = []
for elem in items:
if elem.find(".png")!=-1 and elem.find("heatmap")!=-1:
png_filenames.append(elem)
sorted_png = []
while True:
lowest = 10000000
lowest_idx = -1
for p in png_filenames:
old_save_format=False
if old_save_format:
iter_val = int(p.split("-")[2].split(":")[1])
epoch_val = int(p.split("-")[3].split(":")[1].split(".")[0])
val = float(iter_val)+0.1*epoch_val
else:
iter_val = int(p.split("-")[3].split(":")[1].split(".")[0])
epoch_val = int(p.split("-")[2].split(":")[1])
val = float(epoch_val)+0.1*iter_val
if lowest_idx==-1 or val<lowest:
lowest = val
lowest_idx = png_filenames.index(p)
sorted_png.append(png_filenames[lowest_idx])
del png_filenames[lowest_idx]
if len(png_filenames)==0: break
png_filenames = sorted_png
with imageio.get_writer(parent_folder+"/prediction-heatmap.gif", mode='I',duration=frame_duration) as writer:
for filename in png_filenames:
image = imageio.imread(parent_folder+"/"+filename)
writer.append_data(image)
def insert_in_frame(file_path, conf):
t = 1
imlist = create_video_pixdistrib_gif(file_path, conf, t, suffix='_t{}'.format(t), n_exp=6, suppress_number=True, makegif= False)
frame = Image.open(file_path + '/frame.png', mode='r')
writer = imageio.get_writer(file_path + '/genpix_withframe.mp4', fps=3)
pic_path = file_path + "/animated"
if not os.path.exists(pic_path):
os.mkdir(pic_path)
import copy
for i, img in enumerate(imlist):
origsize = img.shape
img = Image.fromarray(img)
img = img.resize((origsize[1]*2, origsize[0]*2), Image.ANTIALIAS)
img = np.asarray(img)
size_insert = img.shape
newimg = copy.deepcopy(np.asarray(frame)[:,:,:3])
if 'ndesig' in conf:
startr = 350
else:
startr = 380
startc = 295
newimg[startr :startr + size_insert[0],startc: startc + size_insert[1]] = img
# Image.fromarray(newimg).show()
writer.append_data(newimg)
Image.fromarray(newimg).save(pic_path + '/img{}.png'.format(i))
writer.close()
def put_genpix_in_frame():
file = '/home/guser/catkin_ws/src/lsdc/tensorflow_data/sawyer/dna_correct_nummask/vid_rndaction_var10_66002_diffmotions_b0_l30.gif'
frames_dna = getFrames(file)
file = '/home/guser/catkin_ws/src/lsdc/tensorflow_data/sawyer/1stimg_bckgd_cdna/vid_rndaction_var10_64002_diffmotions_b0_l30.gif'
frames_cdna = getFrames(file)
t = 1
dest_path = '/home/guser/frederik/doc_video'
frame = Image.open(dest_path + '/frame_comp_oadna.png', mode='r')
writer = imageio.get_writer(dest_path + '/genpix_withframe.mp4', fps=3)
pic_path = dest_path + "/animated"
if not os.path.exists(pic_path):
os.mkdir(pic_path)
for i, img_dna, img_cdna in zip(range(len(frames_dna)), frames_dna, frames_cdna):
newimg = copy.deepcopy(np.asarray(frame)[:, :, :3])
img_dna, size_insert = resize(img_dna)
# Image.fromarray(img_dna)
startr = 230
startc = 650
newimg[startr:startr + size_insert[0], startc: startc + size_insert[1]] = img_dna
img_cdna, size_insert = resize(img_cdna)
# Image.fromarray(img_cdna)
startr = 540
startc = 650
newimg[startr:startr + size_insert[0], startc: startc + size_insert[1]] = img_cdna
writer.append_data(newimg)
Image.fromarray(newimg).save(pic_path + '/img{}.png'.format(i))
writer.close()
def save_highres(self):
# clip = mpy.ImageSequenceClip(self.highres_imglist, fps=10)
# clip.write_gif(self.image_folder + '/highres_traj{}.mp4'.format(self.itr))
writer = imageio.get_writer(self.image_folder + '/highres_traj{}.mp4'.format(self.itr), fps=10)
print 'shape highes:', self.highres_imglist[0].shape
for im in self.highres_imglist:
writer.append_data(im)
writer.close()
def save_video(queue, filename, fps):
writer = imageio.get_writer(filename, fps=fps)
while True:
frame = queue.get()
if frame is None:
break
writer.append_data(frame)
writer.close()
def draw(img):
width = img.size[0]
height = img.size[1]
imgs = []
filename = "imagesTest/movie_"+str(randint(1000000,9999999))+".gif"
val = randint(150,192)
colour=(val,int(val*7/8),int(val*5/6),255)
pen = (0,0,0,255)
# with imageio.get_writer(filename, mode='I') as writer:
P = []
V = []
delta = 2
for i in xrange(0,150):
P.append((randint(0,width-1),randint(0,height-1),randint(32,192)))
V.append((randint(-1,1)*randint(delta>1,delta),(randint(-1,1)*randint(delta>1,delta)),(randint(-1,1)*randint(delta>1,delta))))
P.append(P[0])
P.append(P[1])
V.append(V[0])
V.append(V[1])
for i in xrange(1,1200):
imgNew = img.copy() #Image.new("RGBA",size=(img.size[0],img.size[1]),color=colour)
# print(P)
testLineAnimation.draw(imgNew,P,pen)
# writer.append_data(imgNew)
imgs.append(array(imgNew.getdata()).reshape(imgNew.size[0], imgNew.size[1], 4))
Q = []
for j in xrange(0,len(P)):
(x,y,z) = P[j]
(vx,vy,vz) = V[j]
nx = vx+x
ny = vy+y
nz = vz+z
Q.append((nx,ny,nz))
P = Q
imageio.mimsave(filename, imgs)
def OpenGif(self, filename):
try:
import imageio
except BaseException:
raise Exception('To use this feature, install imageio')
if filename[-3:] != 'gif':
raise Exception('Unsupported filetype')
self.mwriter = imageio.get_writer(filename, mode='I')
def execute(self, input_data, input_directory, output_directory):
if not input_data['isvideo']:
return {}
# Open output video stream if this is first frame.
if input_data['frame'] == 0:
# The output directory structure should match input directory structure.
relpath_of_input_file = os.path.relpath(input_data['file'], input_directory)
relparent_of_input_file = os.path.dirname(relpath_of_input_file)
inp_filename,inp_extension = os.path.splitext(os.path.basename(relpath_of_input_file))
output_filedir = os.path.join(output_directory, relparent_of_input_file)
if not os.path.exists(output_filedir):
os.makedirs(output_filedir)
self.output_filepath = os.path.join(output_filedir,
inp_filename + '-annotated.' + self.cfg['params']['format'])
self.output_video = imageio.get_writer(self.output_filepath, 'ffmpeg')
img = input_data['img'].copy()
for comp in self.cfg['inputs']:
comp_outputs = input_data.get(comp)
comp_reports = comp_outputs['reports']
if not comp_reports:
print("Warning: pipeline file specifies {} as input for {} but {} is not outputting any location reports".format(
comp, self.name, comp
))
continue
annotate(img, comp_reports)
final_img = cv2.resize(img, (self.cfg['params']['size']['width'], self.cfg['params']['size']['height']))
self.output_video.append_data(final_img)
return {'file': self.output_filepath}
def log2gif(log,filename,title):
writer = imageio.get_writer(filename, fps=30)
for x in log:
writer.append_data(draw_pendulum(x[0],title))
writer.close()
def play(self, mode="random"):
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
for i in range(1):
writer = imageio.get_writer('gif/demo.gif', mode='I')
game_state = game.GameState()
total_steps = 0
img_batch = []
action = np.zeros([2])
action[0] = 1
new_state, reward, done = game_state.frame_step(action)
temp_img = self.pre_process(new_state)
for j in range(4):
img_batch.insert(len(img_batch), temp_img)
for j in range(self.max_steps):
if(mode=="random"):
temp_action = random.randint(0,1)
else :
temp_weights = sess.run([self.main_net.q_values], feed_dict={self.main_net.input_state:np.reshape(np.stack(img_batch,axis=2),[-1, 80, 80, 4])})
temp_action = np.argmax(temp_weights)
print(temp_weights)
action = np.zeros([2])
action[temp_action] = 1
new_state, reward, done = game_state.frame_step(action)
temp_new_state = np.flip(np.rot90(new_state, k=1, axes=(1,0)), 1)
temp_img = self.pre_process(new_state)
img_batch.insert(0, temp_img)
img_batch.pop(len(img_batch)-1)
print(temp_action)
total_steps += 1
if done:
break
print("Total Steps ", str(total_steps))
sys.exit()