def __get_video_properties(self):
    self.frame_dims = (int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                       int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
    self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
    self.fps = self.cap.get(cv2.CAP_PROP_FPS)
    if self.cap.get(cv2.CAP_PROP_MONOCHROME) == 0.0:
        self.n_channels = 3
    else:
        self.n_channels = 1
    self.frame = np.zeros((self.frame_dims[0], self.frame_dims[1], self.n_channels), np.uint8)
    self.previous_frame = np.zeros((self.frame_dims[0], self.frame_dims[1], self.n_channels), np.uint8)
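# A minimal standalone sketch of the same property queries, for reference;
# "clip.mp4" is a placeholder path, and cv2 is the only dependency here.
import cv2

cap = cv2.VideoCapture("clip.mp4")
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
print("%dx%d, %d frames at %.2f fps" % (width, height, frame_count, fps))
cap.release()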
def skip_from_launch(cap, time):
    """
    Move the capture to T+time (time can be negative) and return the frame index.
    :param cap: OpenCV capture
    :param time: delta time from launch to skip to
    :return: index of the requested frame
    """
    number_of_frames = int(cap.get(cv2.CAP_PROP_FPS) * time) + skip_to_launch(cap)
    number_of_frames = max(number_of_frames, 0)
    number_of_frames = min(number_of_frames, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
    cap.set(cv2.CAP_PROP_POS_FRAMES, number_of_frames)
    return number_of_frames
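# Hedged usage sketch: jump a launch video to one minute after liftoff.
# Assumes "launch.mp4" exists and that the project's helpers (skip_to_launch,
# defined further down in this collection, plus its template data) are loaded.
import cv2

cap = cv2.VideoCapture("launch.mp4")
frame_idx = skip_from_launch(cap, 60)  # seek to T+60 seconds
ret, frame = cap.read()
print("now at frame", frame_idx)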
def showVideoInfo(video_path):
    try:
        vhandle = cv2.VideoCapture(video_path)  # also handles videos with Chinese filenames
        fps = vhandle.get(cv2.CAP_PROP_FPS)
        count = vhandle.get(cv2.CAP_PROP_FRAME_COUNT)
        size = (int(vhandle.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(vhandle.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        ret, firstframe = vhandle.read()
        if ret:
            print("FPS: %.2f" % fps)
            print("COUNT: %.2f" % count)
            print("WIDTH: %d" % size[0])
            print("HEIGHT: %d" % size[1])
            return vhandle, fps, size, firstframe
        else:
            print("Video could not be read!")
    except Exception:
        print("Error in showVideoInfo")
def split_video(video):
    vidcap = cv2.VideoCapture(video)
    total_frame = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(vidcap.get(cv2.CAP_PROP_FPS))
    for index in range(0, TOTAL_IMAGE):
        if index == 0:
            frame_no = fps * 2 - 1  # the last frame of the 2nd second
        else:
            frame_no = (total_frame // TOTAL_IMAGE) * index - 1  # integer frame index
        vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
        success, image = vidcap.read()
        if success:
            cv2.imwrite("frame%d.jpg" % index, image)
def video3d(self, filename, color=False, skip=True):
    cap = cv2.VideoCapture(filename)
    nframe = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    if skip:
        frames = [int(x * nframe / self.depth) for x in range(self.depth)]
    else:
        frames = [x for x in range(self.depth)]
    framearray = []
    for i in range(self.depth):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frames[i])
        ret, frame = cap.read()
        frame = cv2.resize(frame, (self.width, self.height))  # cv2.resize expects dsize as (width, height)
        if color:
            framearray.append(frame)
        else:
            framearray.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    cap.release()
    return np.array(framearray)
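# Hedged usage sketch: video3d is a method in the original source, so a
# minimal hypothetical host class is attached here just to exercise it.
class VideoLoader(object):
    def __init__(self, width, height, depth):
        self.width = width
        self.height = height
        self.depth = depth

VideoLoader.video3d = video3d  # attach the snippet's method for the demo
clip = VideoLoader(64, 64, 16).video3d("clip.mp4")
print(clip.shape)  # (16, 64, 64) for grayscale output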
def createTrainingData(filename, time_start, time_stop):
    vidcap = cv2.VideoCapture(filename)
    try:
        os.makedirs("trainingdata_" + filename)
    except OSError:
        pass
    os.chdir("trainingdata_" + filename)
    length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(vidcap.get(cv2.CAP_PROP_FPS))
    for time in range(time_start, time_stop):
        vidcap.set(cv2.CAP_PROP_POS_MSEC, time * 1000)
        success, image = vidcap.read()
        if not success:
            break
        image = cv2.medianBlur(image, 7)
        resized = imutils.resize(image, width=800)
        p1 = resized[370:430, 220:300]
        p2 = resized[370:430, 520:600]
        p1 = cv2.Canny(p1, 400, 100)  # Canny takes two thresholds; the stray third argument was dropped
        p2 = cv2.Canny(p2, 400, 100)
        cv2.imwrite('p1_' + str(time) + ".png", p1)
        cv2.imwrite('p2_' + str(time) + ".png", p2)
    os.chdir("..")
def __init__(self, videoPath, ratio, reprojThresh):
    self.videoPath = videoPath
    self.vidcap = cv2.VideoCapture(videoPath)
    initialFrame = int(self.vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    self.videoSize = (int(self.vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                      int(self.vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    self.vidcap.set(cv2.CAP_PROP_POS_FRAMES, initialFrame // 6)  # seek one sixth of the way in
    self.ratio = ratio
    self.reprojThresh = reprojThresh
    self.isv3 = imutils.is_cv3()
def _get_rois_opencv(self, file, mode='gray'):
    cap = cv2.VideoCapture(file)
    vidframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    rois = self._read_roi_file(file)
    totalframes = rois.shape[0]
    if totalframes != vidframes:
        print('Roi Frames: %d\n' % totalframes)
        print('Vid Frames: %d\n' % vidframes)
        raise Exception('Mismatch between the actual number of video frames and the provided ROI _labels')
    if mode == 'gray':
        roi_seq = np.zeros((totalframes, self._yres, self._xres), dtype=np.float32)
    elif mode == 'rgb':
        roi_seq = np.zeros((totalframes, self._yres, self._xres, 3), dtype=np.float32)
    else:
        raise Exception('gray or rgb')
    this_frame = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if mode == 'gray':
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # cap.read() returns BGR frames
            gray_roi = _crop_roi(gray, rois[this_frame, :])
            resized = self._resize_frame(gray_roi)
        elif mode == 'rgb':
            rgb_roi = _crop_roi(frame, rois[this_frame, :])
            resized = self._resize_frame(rgb_roi)
        else:
            raise Exception('gray or rgb')
        roi_seq[this_frame, :, :] = resized
        this_frame += 1
    return roi_seq
def getNextStatusFrame(self):
    totalFrame = self.cap.get(cv2.CAP_PROP_FRAME_COUNT)
    if self.frame == totalFrame:
        self.stop()
        self.deleteLater()
        self.statusNextFrame = False
    elif self.statusNextFrame is True:
        self.nextFrame()
def frames(self):
    if self.frames_cache:
        return self.frames_cache
    self.frames_cache = int(self.vid.get(cv2.CAP_PROP_FRAME_COUNT))
    return self.frames_cache
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0):
    cap = cv2.VideoCapture(fn)
    n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    outputs = []
    if n_frames < frames * 2:
        return outputs

    def resize(im):
        if scale_factor != 1.0:
            new_size = (int(im.shape[1] * scale_factor), int(im.shape[0] * scale_factor))
            return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR)
        else:
            return im

    for t in times:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(min(t * n_frames, n_frames - 1 - frames)))
        ret, frame0 = cap.read()
        im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY))
        mags = []
        middle_frame = frame0
        for f in range(frames - 1):
            ret, frame1 = cap.read()
            if f == frames // 2:
                middle_frame = frame1
            im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY))
            flow = cv2.calcOpticalFlowFarneback(im0, im1,
                                                None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            mags.append(mag)
            im0 = im1
        mag = np.sum(mags, 0)
        mag = mag.clip(min=0)
        norm_mag = (mag - mag.min()) / (mag.max() - mag.min() + 1e-5)
        x = middle_frame[..., ::-1].astype(np.float32) / 255
        outputs.append((x, norm_mag))
    return outputs
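# Hedged usage sketch: sample motion-magnitude maps at three relative
# positions in a clip; times are fractions of the total frame count, and
# "clip.mp4" is a placeholder path.
pairs = extract_optical_flow("clip.mp4", times=[0.25, 0.5, 0.75])
for rgb, mag in pairs:
    print(rgb.shape, mag.shape, mag.max())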
def number_of_frames(self):
    """Total number of frames in video source."""
    return self.source.get(cv2.CAP_PROP_FRAME_COUNT)
def __init__(self, stream):
    """
    Default Initializer
    :param stream: the video stream from OpenCV
    """
    self.stream = stream
    self.len = stream.get(cv2.CAP_PROP_FRAME_COUNT)
    self.fps = stream.get(cv2.CAP_PROP_FPS)
def extract_frames(path, stride=1):
    print(path)
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        print("Error: Failed to open %s" % path)
        sys.exit(-1)
    try:
        FRAME_COUNT = cv2.CAP_PROP_FRAME_COUNT
        FRAME_HEIGHT = cv2.CAP_PROP_FRAME_HEIGHT
        FRAME_WIDTH = cv2.CAP_PROP_FRAME_WIDTH
    except AttributeError:  # fall back to the OpenCV 2.x constant names
        FRAME_COUNT = cv2.cv.CV_CAP_PROP_FRAME_COUNT
        FRAME_HEIGHT = cv2.cv.CV_CAP_PROP_FRAME_HEIGHT
        FRAME_WIDTH = cv2.cv.CV_CAP_PROP_FRAME_WIDTH
    number_of_frames = int(cap.get(FRAME_COUNT))
    length2 = number_of_frames // stride
    height = int(cap.get(FRAME_HEIGHT))
    width = int(cap.get(FRAME_WIDTH))
    frames = np.zeros((length2, height, width, 3), dtype=np.uint8)
    for frame_i in range(length2):  # range, not xrange, for Python 3
        _, image = cap.read()
        frames[frame_i] = image[:, :, :]
        for i in range(1, stride):  # skip stride-1 frames between samples
            _, image = cap.read()
    print(len(frames))
    return frames
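# Hedged usage sketch: grab every 5th frame of a clip into one uint8 array.
# Note the whole result is held in memory, so long videos need a larger
# stride or chunked processing; "clip.mp4" is a placeholder path.
import sys
import cv2
import numpy as np

arr = extract_frames("clip.mp4", stride=5)
print(arr.shape)  # (n_frames // 5, height, width, 3)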
def get_last_frame(self):
    if self.cap is None:
        return -1
    else:
        return int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) - self.global_video_offset) - 1
def init_camera(self):
    self.video_path = os.path.join(self.recording_path, 'camera_front.mp4')
    self.cap = cv2.VideoCapture(self.video_path)
    self.n_frames = min(len(self.timestamps), int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)))
    self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
    self.frame_id = -1
    self.read()
def is_live(cap):
    """
    Returns True if the capture is live and False otherwise
    :param cap: An OpenCV capture
    :return: True if the capture is live and False otherwise
    """
    return int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) < 0
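# Hedged usage sketch: device captures (e.g. a webcam) report no meaningful
# frame count on many backends, so is_live can distinguish them from
# file-backed captures.
import cv2

cam = cv2.VideoCapture(0)            # live device
vid = cv2.VideoCapture("clip.mp4")   # file
print(is_live(cam), is_live(vid))    # typically: True False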
def skip_to_launch(cap):
    """
    Move cap to the frame before the launch
    :param cap: An OpenCV capture of the launch.
    :return: the index of the first frame at T+00:00:00
    """
    initialize(1080)
    left = 0
    right = cap.get(cv2.CAP_PROP_FRAME_COUNT) - 1
    cap.set(cv2.CAP_PROP_POS_FRAMES, int((right + left) / 2))
    # Binary search: the sign template is visible at `left` and absent at `right`.
    while right > left + 1:
        _, frame = cap.read()
        image = crop(frame, rects['sign'])
        if exists(image, sign_template, thresh_dict[frame.shape[0]][1]):
            left = int((right + left) / 2)
        else:
            right = int((right + left) / 2)
        cap.set(cv2.CAP_PROP_POS_FRAMES, int((right + left) / 2))
    cap.set(cv2.CAP_PROP_POS_FRAMES, left)
    return left
def _compute_3d_dct_opencv(self, file):
    r"""
    Runs much faster than the menpo-based implementation, but OpenCV is
    usually distributed without video support (a compile-time flag) and is
    harder to set up. Works fine with the opencv package from the Arch Linux
    repos, in which case the system Python has to be used.
    :param file:
    :return:
    """
    cap = cv2.VideoCapture(file)
    vidframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    rois = self._read_roi_file(file)
    totalframes = rois.shape[0]
    if totalframes != vidframes:
        print('Roi Frames: %d\n' % totalframes)
        print('Vid Frames: %d\n' % vidframes)
        raise Exception('Mismatch between the actual number of video frames and the provided ROI _labels')
    dct_seq = np.zeros((totalframes, self._yres, self._xres),
                       dtype=np.float32)  # _yres goes first since numpy indexing is rows-first
    this_frame = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # cap.read() returns BGR frames
        gray_roi = _crop_roi(gray, rois[this_frame, :])
        resized = self._resize_frame(gray_roi)
        dctmat = np.zeros(np.shape(resized))
        cv2.dct(resized, dctmat)
        dct_seq[this_frame, :, :] = dctmat
        this_frame += 1
    return dct_seq
def vidocr_to_csv(video, vcoords, tbcoords, f1app=True):
    # inputs:
    # video = video file as a string
    # vcoords = pixel coordinates [top left x, top left y, bottom right x, bottom right y] for velocity
    # tbcoords = pixel coordinates [top left x, top left y, bottom right x, bottom right y] for throttle/brake
    # f1app = boolean, default True; use True for video from the F1 app, False for onboard video.
    # outputs a .csv file with one row per extracted parameter.
    # capture video via opencv
    vid = cv2.VideoCapture(video)
    s, frm = vid.read()
    v_all = []
    t_all = []
    thr_all = []
    brk_all = []
    step = 1
    total_frames = vid.get(cv2.CAP_PROP_FRAME_COUNT)
    print(total_frames)
    i = 0
    vid.set(cv2.CAP_PROP_POS_MSEC, i)  # property id 0 is CAP_PROP_POS_MSEC; rewind to the start
    # go through each frame and extract data
    while s:
        if i >= int(total_frames):
            break
        s, frm = vid.read()
        if i % step == 0 or i == total_frames - 1:
            v_temp = velocity_ocr(frm, vcoords, f1app)
            t_temp = vid.get(cv2.CAP_PROP_POS_MSEC) / 1e3
            v_all += [v_temp]
            # thr_temp = throttle_ocr(frm, tbcoords)
            # brk_temp = brake_ocr(frm, tbcoords)
            # thr_all += [thr_temp]
            # brk_all += [brk_temp]
            if i % 200 == 0:
                print(v_temp, t_temp, i)
        i += 1
    t_all = get_timestamps(video)
    # save data to .csv with the same filename as the video file
    with open(video[0:-4] + '.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(v_all)
        writer.writerow(t_all)
        # writer.writerow(thr_all)
        # writer.writerow(brk_all)
        writer.writerow([])
        writer.writerow([])
    print(video, "completed.")
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0):
    cap = cv2.VideoCapture(fn)
    if not cap.isOpened():
        return []
    n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    outputs = []
    if n_frames < frames * 2:
        return outputs

    def resize(im):
        if scale_factor != 1.0:
            new_size = (int(im.shape[1] * scale_factor), int(im.shape[0] * scale_factor))
            return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR)
        else:
            return im

    for t in times:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(min(t * n_frames, n_frames - 1 - frames)))
        ret, frame0 = cap.read()
        im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY))
        middle_frame = frame0
        flows = []
        for f in range(frames - 1):
            ret, frame1 = cap.read()
            if f == frames // 2:
                middle_frame = frame1
            im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY))
            flow = cv2.calcOpticalFlowFarneback(im0, im1,
                                                None,
                                                0.5,                     # pyr_scale
                                                8,                       # levels
                                                int(40 * scale_factor),  # winsize
                                                10,                      # iterations
                                                5,                       # poly_n
                                                1.1,                     # poly_sigma
                                                cv2.OPTFLOW_FARNEBACK_GAUSSIAN)
            # mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            # mags.append(mag)
            flows.append(flow)
            im0 = im1
        flow = (np.mean(flows, 0) / 100).clip(-1, 1)
        # flow = np.mean(flows, 0)
        # flow /= (flow.mean() * 5 + 1e-5)
        # flow = flow.clip(-1, 1)
        # flows = flows / (np.mean(flows, 0, keepdims=True) + 1e-5)
        x = middle_frame[..., ::-1].astype(np.float32) / 255
        outputs.append((x, flow))
    return outputs
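# Hedged usage sketch for this second variant, which returns a clipped
# 2-channel flow field rather than a normalized magnitude map.
for rgb, flow in extract_optical_flow("clip.mp4", times=[0.5], scale_factor=0.5):
    print(rgb.shape, flow.shape)  # flow has shape (H, W, 2), values in [-1, 1]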
def get_video_stats(self):
    assert(self.sourcetype == 'file')
    framecount = self._video.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = self._video.get(cv2.CAP_PROP_FPS)
    return framecount, fps
def main():
    args = parser.parse_args()
    mask = cv2.imread(args.mask_file, cv2.IMREAD_COLOR)
    cap = cv2.VideoCapture(args.in_video)
    last_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - 1  # int, so it can fill the {:d} fields below
    if args.end_with == -1:
        args.end_with = last_frame
    else:
        if args.end_with > last_frame:
            print(
                "Warning: specified end frame ({:d}) is beyond the last video frame ({:d}). Stopping after last frame.".format(
                    args.end_with, last_frame))
            args.end_with = last_frame
    if args.out_video == "":
        args.out_video = args.in_video[:-4] + "_masked.mp4"
    writer = cv2.VideoWriter(args.out_video, cv2.VideoWriter_fourcc('X', '2', '6', '4'),
                             cap.get(cv2.CAP_PROP_FPS),
                             (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))), True)
    writer.set(cv2.VIDEOWRITER_PROP_NSTRIPES, cpu_count())
    if args.start_from > 0:
        cap.set(cv2.CAP_PROP_POS_FRAMES, args.start_from)
    total_frame_span = args.end_with - args.start_from
    frame_counter = 0
    if args.frame_count == -1:
        cur_frame_number = args.start_from
        while cur_frame_number < args.end_with:
            process_frame(cap, writer, mask)
            frame_counter += 1
            amount_done = frame_counter / total_frame_span
            update_progress(amount_done)
            cur_frame_number += 1
    else:
        frame_interval = total_frame_span // args.frame_count
        for i_frame in range(args.start_from, args.end_with, frame_interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i_frame)
            process_frame(cap, writer, mask)
            frame_counter += 1
            amount_done = frame_counter / args.frame_count
            update_progress(amount_done)
    cap.release()
    writer.release()
    return 0
def __init__(self, args, out_postfix="_out", with_video_output=True):
self.global_video_offset = 0
self.flip_video = False
self.datapath = "./"
self.__dict__.update(vars(args))
self.writer = None
if os.path.exists("settings.yaml"):
stream = open("settings.yaml", mode='r')
self.settings = load(stream, Loader=Loader)
stream.close()
self.datapath = self.settings['datapath'].replace("<current_user>", getuser())
print("Processing path: ", self.datapath)
if 'raw_options' in self.settings:
raw_options = self.settings['raw_options']
if self.in_video in raw_options:
self.global_video_offset = raw_options[args.in_video]['global_offset']
self.flip_video = raw_options[args.in_video]['flip']
self.cap = None
self.reload_video()
print("Processing video file {:s}.".format(self.in_video))
last_frame = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) - 1)
if self.end_with == -1:
self.end_with = last_frame
else:
if self.end_with > last_frame:
print(("Warning: specified end frame ({:d}) is beyond the last video frame" +
" ({:d}). Stopping after last frame.")
.format(self.end_with, last_frame))
self.end_with = last_frame
print("Frame range: {:d}--{:d}".format(self.start_from, self.end_with))
if with_video_output:
if self.out_video == "":
self.out_video = args.in_video[:-4] + "_" + out_postfix + ".mp4"
self.writer = cv2.VideoWriter(os.path.join(self.datapath, self.out_video),
cv2.VideoWriter_fourcc('X', '2', '6', '4'),
self.cap.get(cv2.CAP_PROP_FPS),
(int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))),
True)
self.writer.set(cv2.VIDEOWRITER_PROP_NSTRIPES, cpu_count())
else:
self.writer = None
self.frame = None
self.cur_frame_number = None
def _load_input_videos(self):
    """ Opens and checks that all input video files are valid, can
    be processed, and have the same resolution and framerate. """
    self.video_resolution = None
    self.video_fps = None
    self.frames_total = 0
    if not self.video_paths:
        return False
    for video_path in self.video_paths:
        cap = cv2.VideoCapture()
        cap.open(video_path)
        video_name = os.path.basename(video_path)
        if not cap.isOpened():
            if not self.suppress_output:
                print("[DVR-Scan] Error: Couldn't load video %s." % video_name)
                print("[DVR-Scan] Check that the given file is a valid video"
                      " clip, and ensure all required software dependencies"
                      " are installed and configured properly.")
            cap.release()
            return False
        curr_resolution = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                           int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        curr_framerate = cap.get(cv2.CAP_PROP_FPS)
        self.frames_total += cap.get(cv2.CAP_PROP_FRAME_COUNT)
        cap.release()
        if self.video_resolution is None and self.video_fps is None:
            self.video_resolution = curr_resolution
            self.video_fps = curr_framerate
            if not self.suppress_output:
                print("[DVR-Scan] Opened video %s (%d x %d at %2.3f FPS)." % (
                    video_name, self.video_resolution[0],
                    self.video_resolution[1], self.video_fps))
        # Check that all other videos specified have the same resolution
        # (we'll assume the framerate is the same if the resolution matches,
        # since the VideoCapture FPS information is not always accurate).
        elif curr_resolution != self.video_resolution:
            if not self.suppress_output:
                print("[DVR-Scan] Error: Can't append clip %s, video resolution"
                      " does not match the first input file." % video_name)
            return False
        else:
            if not self.suppress_output:
                print("[DVR-Scan] Appended video %s." % video_name)
    # If we get to this point, all videos have the same parameters.
    return True
def videoSlice(video_path, save_path, progressbarsetter=None, save_type="png", img_comp=0, start_idx=1):
"""
:param video_path:
:param save_path:
:param save_type:
:param img_comp: default0:
None Higher number increase compressive level
png[0-9], jpg[0-100]
:return:
"""
# For read Chinease-name video
vid_handle = cv2.VideoCapture(video_path)
# vid_handle = cv2.VideoCapture(video_path.encode('utf-8'))
fps = vid_handle.get(cv2.CAP_PROP_FPS)
count = vid_handle.get(cv2.CAP_PROP_FRAME_COUNT)
size = (int(vid_handle.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(vid_handle.get(cv2.CAP_PROP_FRAME_HEIGHT)))
prefix = os.path.basename(save_path)
idx = start_idx # start from 000001.xxx
cnt_idx = 1
params = None
suffix = None
if save_type.upper() == "JPEG" or save_type.upper() == "JPG":
img_type = int(cv2.IMWRITE_JPEG_OPTIMIZE)
suffix = ".jpg"
params = [img_type, img_comp]
elif save_type.upper() == "PNG":
img_type = int(cv2.IMWRITE_PNG_COMPRESSION)
suffix = ".png"
params = [img_type, img_comp]
else:
print("Do not support %s format!" % save_type)
while True:
ret, frame = vid_handle.read()
if ret:
cur_progress = cnt_idx/(count/100.0)
if progressbarsetter is not None:
progressbarsetter(cur_progress)
print("Progress %.2f%%" % cur_progress)
img_name = save_path + "/" + ("%06d" % idx) + suffix
# print img_name
print params
cv2.imwrite(img_name, frame, params)
idx += 1
cnt_idx += 1
else:
break
print("Slicing Done!")
return count
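# Hedged usage sketch: slice a clip into PNGs under ./frames; os.makedirs
# ensures the output directory exists, since videoSlice does not create it.
import os
import cv2

os.makedirs("frames", exist_ok=True)
n = videoSlice("clip.mp4", "frames", save_type="png", img_comp=3)
print("wrote %d frames" % n)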