def grab(cam, queue, width, height, fps):
    global running
    capture = cv2.VideoCapture(cam)
    capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    capture.set(cv2.CAP_PROP_FPS, fps)
    while running:
        frame = {}
        capture.grab()
        retval, img = capture.retrieve(0)
        frame["img"] = img
        frame["1"] = config["1"]
        frame["2"] = config["2"]
        blur = get_blur(img, 0.05)
        frame["blur"] = blur
        if queue.qsize() < 10:
            queue.put(frame)
        else:
            print(queue.qsize())
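# A minimal, hedged sketch of how a grabber like the one above is typically driven:
# run it on a background thread with a shared queue and a module-level `running`
# flag. The `config` dict and `get_blur` helper are not shown in the snippet, so the
# stand-ins below are assumptions for illustration only.
import queue
import threading

import cv2

running = True
config = {"1": None, "2": None}        # hypothetical stand-in for the real config

def get_blur(img, scale):
    # Hypothetical sharpness metric (variance of the Laplacian); not the original helper.
    return cv2.Laplacian(img, cv2.CV_64F).var()

frames = queue.Queue()
worker = threading.Thread(target=grab, args=(0, frames, 640, 480, 30), daemon=True)
worker.start()
first = frames.get(timeout=5)          # blocks until the grabber produces a frame
print(first["blur"])
running = False                        # lets the while-loop in grab() exit
worker.join(timeout=2)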
def __get_video_properties(self):
    self.frame_dims = (int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                       int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
    self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
    self.fps = self.cap.get(cv2.CAP_PROP_FPS)
    if self.cap.get(cv2.CAP_PROP_MONOCHROME) == 0.0:
        self.n_channels = 3
    else:
        self.n_channels = 1
    self.frame = np.zeros((self.frame_dims[0], self.frame_dims[1], self.n_channels), np.uint8)
    self.previous_frame = np.zeros((self.frame_dims[0], self.frame_dims[1], self.n_channels), np.uint8)
def showVideoInfo(video_path):
    try:
        vhandle = cv2.VideoCapture(video_path)  # also handles videos with non-ASCII (e.g. Chinese) filenames
        fps = vhandle.get(cv2.CAP_PROP_FPS)
        count = vhandle.get(cv2.CAP_PROP_FRAME_COUNT)
        size = (int(vhandle.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(vhandle.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        ret, firstframe = vhandle.read()
        if ret:
            print("FPS: %.2f" % fps)
            print("COUNT: %.2f" % count)
            print("WIDTH: %d" % size[0])
            print("HEIGHT: %d" % size[1])
            return vhandle, fps, size, firstframe
        else:
            print("Video could not be read!")
    except Exception:
        print("Error in showVideoInfo")
def initialize(self):
    # Initialize video capture
    self.cap = cv2.VideoCapture(self.ID)
    frameRate = 20.0
    frameWidth = 640
    frameHeight = 480
    if cv2.__version__[0] == "2":
        # OpenCV 2.x uses the cv2.cv constant names
        self.cap.set(cv2.cv.CV_CAP_PROP_FPS, frameRate)
        self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_WIDTH, frameWidth)
        self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT, frameHeight)
    else:
        # OpenCV 3.x and later expose the constants directly on cv2
        self.cap.set(cv2.CAP_PROP_FPS, frameRate)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, frameWidth)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frameHeight)
    self.thresh = 0.4
    self.thresh_img = np.zeros((frameHeight, frameWidth, 3), dtype=np.uint8)
def capture(self):
    capture = cv2.VideoCapture(self.device)
    capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
    capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
    if not capture.isOpened():
        raise Exception('Failed to open camera capture.')
    for _ in range(0, 10):
        ret, img = capture.read()
        if not ret or self._blur_index(img) < self.blur_thres:
            time.sleep(0.5)
            continue
        capture.release()
        return img
    capture.release()
    raise Exception('Failed to capture image.')
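# The _blur_index helper used above is not part of this snippet. A common choice for
# such a sharpness score is the variance of the Laplacian (higher means sharper);
# this is only an assumed sketch, not the project's actual implementation.
def _blur_index_sketch(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.Laplacian(gray, cv2.CV_64F).var()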
def pullData(self):
    try:
        if self.pth:
            capture = cv2.VideoCapture(1)
            capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.device['baudrate'][1])
            capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.device['baudrate'][0])
            while True:
                if self.endtr:
                    capture.release()
                    return
                _, frame = capture.read()
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                self.response.assignStatus(RESPONSE_STATUS['OK'])
                self.response.assignData(frame)
                yield self.response
    except Exception:
        traceback.print_exc(file=sys.stdout)
        self.endCommunication()
        print('Video ended or interrupted, dropped buffer')
def __init__(self, args, main_out_vid_name="foreground"):
    self.mask_writer = None
    super().__init__(args, main_out_vid_name)
    if args.mask_output_video == "":
        args.mask_output_video = args.in_video[:-4] + "_bs_mask.mp4"
    self.mask_writer = cv2.VideoWriter(os.path.join(self.datapath, args.mask_output_video),
                                       cv2.VideoWriter_fourcc('X', '2', '6', '4'),
                                       self.cap.get(cv2.CAP_PROP_FPS),
                                       (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                                        int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))),
                                       False)
    self.mask_writer.set(cv2.VIDEOWRITER_PROP_NSTRIPES, cpu_count())
    self.foreground_writer = self.writer
    self.foreground = None
    self.mask = None
def webcam(camera_id=0):
    # TODO update to support Python's 'with' construct
    camera = cv2.VideoCapture()
    camera.open(camera_id)
    # Use a smaller image size for faster processing
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 320.0)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 240.0)
    if not camera.isOpened():
        raise CameraInitializationError('Camera #{0} failed to open.'.format(camera_id))
    while camera.isOpened():
        success, frame = camera.read()
        if success:
            yield frame
    camera.release()
def create_capture(source = 0, fallback = presets['chess']):
    '''source: <int> or '<int>|<filename>|synth [:<param_name>=<value> [:...]]'
    '''
    source = str(source).strip()
    chunks = source.split(':')
    # handle drive letter ('c:', ...)
    if len(chunks) > 1 and len(chunks[0]) == 1 and chunks[0].isalpha():
        chunks[1] = chunks[0] + ':' + chunks[1]
        del chunks[0]
    source = chunks[0]
    try: source = int(source)
    except ValueError: pass
    params = dict( s.split('=') for s in chunks[1:] )
    cap = None
    if source == 'synth':
        Class = classes.get(params.get('class', None), VideoSynthBase)
        try: cap = Class(**params)
        except: pass
    else:
        cap = cv2.VideoCapture(source)
        if 'size' in params:
            w, h = map(int, params['size'].split('x'))
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, w)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h)
    if cap is None or not cap.isOpened():
        print('Warning: unable to open video source: ', source)
        if fallback is not None:
            return create_capture(fallback, None)
    return cap
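# Usage sketch for create_capture, following the source-string format described in
# its docstring; this assumes the surrounding OpenCV sample module (presets,
# classes, VideoSynthBase) has been imported. The file name is hypothetical.
if __name__ == '__main__':
    cap = create_capture(0)                    # default webcam
    # cap = create_capture('0:size=640x480')   # webcam with a requested capture size
    # cap = create_capture('my_clip.mp4')      # plain video file
    if cap is not None and cap.isOpened():
        ok, frame = cap.read()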
def __init__(self, videoPath, ratio, reprojThresh):
    self.videoPath = videoPath
    self.vidcap = cv2.VideoCapture(videoPath)
    initialFrame = int(self.vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    self.videoSize = (int(self.vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                      int(self.vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    # Seek one sixth of the way into the video before reading.
    self.vidcap.set(cv2.CAP_PROP_POS_FRAMES, initialFrame // 6)
    self.ratio = ratio
    self.reprojThresh = reprojThresh
    self.isv3 = imutils.is_cv3()
def __init__(self, center=int(cvsettings.CAMERA_WIDTH / 2), debug=False, is_usb_webcam=True, period_s=0.025):
    # Our video stream
    # If it's not a USB webcam, use the Pi camera
    if not is_usb_webcam:
        self.vs = PiVideoStream(resolution=(cvsettings.CAMERA_WIDTH, cvsettings.CAMERA_HEIGHT))
        # Camera cvsettings
        self.vs.camera.shutter_speed = cvsettings.SHUTTER
        self.vs.camera.exposure_mode = cvsettings.EXPOSURE_MODE
        self.vs.camera.exposure_compensation = cvsettings.EXPOSURE_COMPENSATION
        self.vs.camera.awb_gains = cvsettings.AWB_GAINS
        self.vs.camera.awb_mode = cvsettings.AWB_MODE
        self.vs.camera.saturation = cvsettings.SATURATION
        self.vs.camera.rotation = cvsettings.ROTATION
        self.vs.camera.video_stabilization = cvsettings.VIDEO_STABALIZATION
        self.vs.camera.iso = cvsettings.ISO
        self.vs.camera.brightness = cvsettings.BRIGHTNESS
        self.vs.camera.contrast = cvsettings.CONTRAST
    # Otherwise use the USB camera
    else:
        self.vs = WebcamVideoStream(src=0)
        self.vs.stream.set(cv2.CAP_PROP_FRAME_WIDTH, cvsettings.CAMERA_WIDTH)
        self.vs.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, cvsettings.CAMERA_HEIGHT)
    # Has the camera started?
    self.camera_started = False
    self.start_camera()  # Starts our camera
    # To calculate our error in positioning
    self.center = center
    # To determine whether we actually detected a lane or not
    self.detected_lane = False
    # Debug mode on? (displays processed images)
    self.debug = debug
    # Time interval between updates (in seconds); FPS = 1 / period_s
    self.period_s = period_s
    # Starting time
    self.start_time = time.time()
# Mouse event handler for get_hsv
def determine_size(self, capture):
    """Determines the height and width of the image source.

    If no dimensions are available, this method defaults to a resolution of
    640x480, thus returns (480, 640).
    If capture has a get method it is assumed to understand
    `cv2.CAP_PROP_FRAME_WIDTH` and `cv2.CAP_PROP_FRAME_HEIGHT` to get the
    information. Otherwise it reads one frame from the source to determine
    image dimensions.

    Args:
        capture: the source to read from.
    Returns:
        A tuple containing integers of height and width (simple casts).
    """
    width = 640
    height = 480
    if capture and hasattr(capture, 'get'):
        width = capture.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = capture.get(cv2.CAP_PROP_FRAME_HEIGHT)
    elif capture:
        # No get method: read a single frame to determine the dimensions.
        self.frame_offset += 1
        ret, frame = capture.read()
        if ret:
            width = frame.shape[1]
            height = frame.shape[0]
    return (int(height), int(width))
def __init__(self, index=0):
    self.frame = None
    self.capture = cv2.VideoCapture(index)  # use the requested camera index rather than a hard-coded 0
    self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    # self.capture.set(cv2.CAP_PROP_EXPOSURE, 0)
    # self.capture.set(cv2.CAP_PROP_GAIN, 0)
    # Define codec and create VideoWriter object
    # fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
    # self.out = cv2.VideoWriter('output.avi', fourcc, 120.0, (640, 480))
def height(self):
    return int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT))

def shape(self):
    w = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    return (h, w)
def Camera():
    if len(sys.argv) > 1:
        CamNum = sys.argv[1]
    else:
        print("Using the default camera")
        CamNum = 0
    # Resize capture
    # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 300)
    # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 300)
    return cv2.VideoCapture(int(CamNum))
def height(self):
    return int(self._video.get(cv2.CAP_PROP_FRAME_HEIGHT))
def initCap():
    cap = cv2.VideoCapture(1)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, common_config.CAP_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, common_config.CAP_HEIGHT)
    return cap
def init(host='lenovo-pc', port=1234, capL_id=2, capR_id=1):
    global capL, capR, ltr, rtr, sock
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    capL = cv2.VideoCapture(capL_id)
    capL.set(cv2.CAP_PROP_FRAME_WIDTH, BINCAP_W)
    capL.set(cv2.CAP_PROP_FRAME_HEIGHT, BINCAP_H)
    capR = cv2.VideoCapture(capR_id)
    capR.set(cv2.CAP_PROP_FRAME_WIDTH, BINCAP_W)
    capR.set(cv2.CAP_PROP_FRAME_HEIGHT, BINCAP_H)
    ltr = GetFrameLThread()
    rtr = GetFrameRThread()
    ltr.start()
    rtr.start()
def extract_frames(path, stride=1):
    print(path)
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        print("Error: Failed to open %s" % path)
        sys.exit(-1)
    try:
        FRAME_COUNT = cv2.CAP_PROP_FRAME_COUNT
        FRAME_HEIGHT = cv2.CAP_PROP_FRAME_HEIGHT
        FRAME_WIDTH = cv2.CAP_PROP_FRAME_WIDTH
    except AttributeError:
        # Fall back to the OpenCV 2.x constant names
        FRAME_COUNT = cv2.cv.CV_CAP_PROP_FRAME_COUNT
        FRAME_HEIGHT = cv2.cv.CV_CAP_PROP_FRAME_HEIGHT
        FRAME_WIDTH = cv2.cv.CV_CAP_PROP_FRAME_WIDTH
    number_of_frames = int(cap.get(FRAME_COUNT))
    length2 = number_of_frames // stride
    height = int(cap.get(FRAME_HEIGHT))
    width = int(cap.get(FRAME_WIDTH))
    frames = np.zeros((length2, height, width, 3), dtype=np.uint8)
    for frame_i in range(length2):
        _, image = cap.read()
        frames[frame_i] = image[:, :, :]
        # Skip the in-between frames when stride > 1
        for i in range(1, stride):
            _, image = cap.read()
    print(len(frames))
    return frames
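# Usage sketch for extract_frames: grab every 5th frame of a hypothetical clip.
# Note that the result is an in-memory (N, H, W, 3) uint8 array, so long videos
# can consume a lot of RAM.
clip_frames = extract_frames("clip.mp4", stride=5)   # "clip.mp4" is a placeholder path
print(clip_frames.shape)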
def __init__(self):
    self.cam = cv2.VideoCapture(0)
    self.w = 800
    self.h = 600
    self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, self.h)
    self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, self.w)
def _writevideoframe(self):
    if not self.is_writingvideo:
        return
    if self._videoWriter is None:
        fps = self._capture.get(cv2.CAP_PROP_FPS)
        if fps == 0.0:
            # The capture did not report an FPS value, so fall back to our own estimate.
            if self._frameElapsed < 20:
                # Wait until more frames have elapsed so that the estimate is more stable.
                return
            else:
                fps = self._fpsEstimate
        size = (int(self._capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(self._capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        self._videoWriter = cv2.VideoWriter(self._videoFilename, self._videoEncoding, fps, size)
    self._videoWriter.write(self._frame)
def __init__(self, source, config):
    camera = config.camera()
    self.__source = source
    self.__camera = cv2.VideoCapture(source)
    self.__width = int(camera['width'])
    self.__height = int(camera['height'])
    self.__camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.__width)
    self.__camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.__height)
def ip_camera(url):
    warnings.warn('Untested.', RuntimeWarning)
    # Alternate method, should work better than the stream decoding below if it works
    camera = cv2.VideoCapture(url)
    # Unnecessary if resizing through firmware
    # camera.set(cv2.CAP_PROP_FRAME_WIDTH, 320.0)
    # camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 240.0)
    if not camera.isOpened():
        raise CameraInitializationError('Failed to open camera at {}'.format(url))
    while camera.isOpened():
        success, frame = camera.read()
        if success:
            yield frame
    camera.release()
    # MJPEG stream decoding: http://stackoverflow.com/a/21844162/5623874
    # stream = request.urlopen(url)
    # bytes = b''
    # while True:
    #     bytes += stream.read(1024)
    #     a = bytes.find(b'\xff\xd8')
    #     b = bytes.find(b'\xff\xd9')
    #     if a != -1 and b != -1:
    #         jpg = bytes[a:b + 2]
    #         bytes = bytes[b + 2:]
    #         image = cv2.imdecode(np.fromstring(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
    #         yield image
# TODO documentation
def main(parser):
    capture = cv2.VideoCapture(parser.source)
    src_width, src_height = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)), int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if parser.record:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(parser.output_path, fourcc, 20.0, (src_width, src_height))
    cascPath = "./haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    while True:
        ret, frame = capture.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )
        pred_features = detect_features(gray, faces, src_width, src_height, parser.width, parser.height)
        result_img = draw_features_point_on_image(frame, pred_features, src_width, src_height)
        for (x, y, w, h) in faces:
            cv2.rectangle(result_img, (x, y), (x + w, y + h), (0, 255, 0), 1)
        if parser.record:
            out.write(result_img)
        cv2.imshow('Video', result_img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    capture.release()
    if parser.record:
        out.release()
    cv2.destroyAllWindows()