def sparse_optical_flow(im1, im2, pts, fb_threshold=-1,
                        window_size=15, max_level=2,
                        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)):
    """Track ``pts`` from ``im1`` to ``im2`` with pyramidal Lucas-Kanade flow.

    Parameters
    ----------
    im1, im2 : images handed to ``cv2.calcOpticalFlowPyrLK`` as the
        previous and next frame.
    pts : points to track (assumes a float32 array of 2-D points shaped
        ``(N, 2)`` or ``(N, 1, 2)`` -- TODO confirm against callers).
    fb_threshold : when positive, a forward-backward consistency check is
        run: the tracked points are flowed back from ``im2`` to ``im1`` and
        any point whose round trip ends ``fb_threshold`` or more away from
        its start (in either coordinate) is invalidated with NaN.
        Non-positive values (the default) disable the check.
    window_size : side length of the square LK search window.
    max_level : maximum pyramid level passed to OpenCV.
    criteria : termination criteria passed to OpenCV.

    Returns
    -------
    (p1, st, err) : forward-tracked points, status flags and forward
        tracking error. With the check enabled, ``st`` is AND-ed with the
        backward status and rejected entries of ``p1`` / ``err`` are NaN.
    """
    lk_params = dict(winSize=(window_size, window_size),
                     maxLevel=max_level,
                     criteria=criteria)

    # Forward flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None, **lk_params)

    # Backward flow
    if fb_threshold > 0:
        # The backward error is discarded so the forward `err` that the
        # caller gets back is not clobbered.
        p0r, st0, _ = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None, **lk_params)

        # Status comes back with one row per point; flatten it so the mask
        # selects whole points regardless of the point array layout.
        p0r[st0.reshape(-1) == 0] = np.nan

        # Compare the round-trip position with the ORIGINAL points (`pts`).
        # Comparisons with NaN are False, so lost points are rejected too.
        drift = np.fabs(p0r - pts).reshape(-1, 2)
        fb_good = (drift < fb_threshold).all(axis=1)

        p1[~fb_good] = np.nan
        st = np.bitwise_and(st, st0)
        err[~fb_good] = np.nan

    return p1, st, err
# Example source code for Python's calcOpticalFlowPyrLK()
def track(self, im0, im1, p0):
    """
    Main tracking method using sparse optical flow (LK)

    Tracks ``p0`` from ``im0`` to ``im1`` using the parameters stored in
    ``self.lk_params_``. Points whose status flag is 0 are set to NaN.
    When ``self.fb_check_`` is truthy the tracked points are additionally
    flowed back to ``im0`` and every point whose round trip ends 3 or more
    units away from its start (in either coordinate) is set to NaN as well.

    Returns an empty array when ``p0`` is None or empty, otherwise the
    tracked points (same layout as returned by OpenCV) with rejected
    entries replaced by NaN.
    """
    if p0 is None or not len(p0):
        return np.array([])

    # Forward flow
    p1, st1, _ = cv2.calcOpticalFlowPyrLK(im0, im1, p0, None, **self.lk_params_)

    # Status has one row per point; flatten it so the boolean mask selects
    # whole points for both (N, 2) and (N, 1, 2) point layouts.
    p1[st1.reshape(-1) == 0] = np.nan

    if self.fb_check_:
        # Backward flow
        p0r, st0, _ = cv2.calcOpticalFlowPyrLK(im1, im0, p1, None, **self.lk_params_)
        p0r[st0.reshape(-1) == 0] = np.nan

        # Set only good: one boolean per point. NaN comparisons are False,
        # so points lost in either direction are rejected here too.
        drift = np.fabs(p0r - p0).reshape(-1, 2)
        fb_good = (drift < 3).all(axis=1)
        p1[~fb_good] = np.nan

    return p1
def feature_tracking(image_ref, image_cur, px_ref):
    """Track reference pixels into the current image with LK optical flow.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image
    px_ref :
        Reference pixels; must support boolean-mask indexing

    Returns
    -------
    (kp1, kp2) : the entries of ``px_ref`` and of the tracked points,
        respectively, restricted to points whose LK status equals 1
    """
    # Run pyramidal Lucas-Kanade with a 21x21 window and 3 pyramid levels
    tracked, status, _ = cv2.calcOpticalFlowPyrLK(
        image_ref,
        image_cur,
        px_ref,
        None,
        winSize=(21, 21),
        maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
    )

    # Flatten the status column into a per-point mask and keep the hits
    found = status.reshape(status.shape[0]) == 1
    return px_ref[found], tracked[found]
def update(self, frame, events):
    """Sample one calibration reference per frame while ``self.count`` lasts.

    Does nothing unless ``self.active`` is truthy. Otherwise it tracks
    ``self.point`` from ``self.first_img`` into ``frame.gray``; on success
    it stores the new point, records a reference entry in ``self.ref_list``
    and decrements ``self.count``. Pupil positions from
    ``events['pupil_positions']`` above the confidence threshold are always
    collected, and the button status text reflects whether sampling is
    still in progress.
    """
    if not self.active:
        return

    pupil_positions = events['pupil_positions']

    if self.first_img is None:
        self.first_img = frame.gray.copy()

    self.detected = False

    if self.count:
        gray = frame.gray
        # in cv2.3 nextPts is falsely required as an argument, so a copy of
        # the current point is passed as a dummy.
        moved, status, _ = cv2.calcOpticalFlowPyrLK(
            self.first_img, gray, self.point, self.point.copy(),
            winSize=(100, 100))
        if status[0]:
            self.detected = True
            self.point = moved
            self.first_img = gray.copy()
            screen_pos = moved[0]
            frame_size = (gray.shape[1], gray.shape[0])
            self.pos = normalize(screen_pos, frame_size, flip_y=True)
            self.count -= 1
            self.ref_list.append({
                "screen_pos": screen_pos,
                "norm_pos": self.pos,
                "timestamp": frame.timestamp,
            })

    # always save pupil positions
    self.pupil_list.extend(
        p_pt for p_pt in pupil_positions
        if p_pt['confidence'] > self.pupil_confidence_threshold
    )

    self.button.status_text = (
        'Sampling Gaze Data' if self.count else 'Click to Sample at Location'
    )
def process_new_frame(self, new_frame):
    """Match the stored features into ``new_frame`` and publish the motion.

    The frame is converted from BGR to grayscale. If a previous grayscale
    frame exists, LK optical flow is computed from it for
    ``self.last_frame_features`` and the result is handed to
    ``self.publish_interframe_motion``. In every case the grayscale frame
    is then stored via ``self.store_as_last_frame``.
    """
    gray = cv2.cvtColor(new_frame, cv2.COLOR_BGR2GRAY)

    # Nothing to compare against on the very first frame.
    if self.last_frame_gray is not None:
        matched_features, status, error = cv2.calcOpticalFlowPyrLK(
            self.last_frame_gray,
            gray,
            self.last_frame_features,
            None,
            **self.lk_params
        )
        self.publish_interframe_motion(
            self.last_frame_features,
            matched_features,
            status,
            error
        )

    # Remember this frame for the next call.
    self.store_as_last_frame(gray)
def update(self, frame, events):
    """Carry past gaze points along with the image using optical flow.

    On a directly succeeding frame, the stored gaze positions are tracked
    from ``self.prev_gray`` into the current frame and their ``norm_pos``
    is updated; points that fail to track are dropped. On a repeated frame
    index the previous result is re-used. Otherwise surviving past gaze
    newer than ``self.timeframe`` is merged into
    ``events['gaze_positions']`` and sorted by timestamp. State for the
    next call is stored on ``self`` at the end.
    """
    img = frame.img
    img_shape = img.shape[:-1][::-1]  # width,height

    succeeding_frame = frame.index - self.prev_frame_idx == 1
    same_frame = frame.index == self.prev_frame_idx
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # parameters for calcOpticalFlowPyrLK
    lk_params = {
        'winSize': (90, 90),
        'maxLevel': 3,
        'criteria': (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.03),
    }

    updated_past_gaze = []

    # Update past gaze using optical flow: this is like sticking the gaze
    # points onto the pixels of the image. When the frame is not a direct
    # successor (seeking or pausing) no flow is attempted.
    if self.past_gaze_positions and succeeding_frame:
        past_screen_gaze = np.array(
            [denormalize(ng['norm_pos'], img_shape, flip_y=True)
             for ng in self.past_gaze_positions],
            dtype=np.float32)
        new_pts, status, err = cv2.calcOpticalFlowPyrLK(
            self.prev_gray, gray_img, past_screen_gaze, None,
            minEigThreshold=0.005, **lk_params)
        for gaze, new_gaze_pt, found, _ in zip(self.past_gaze_positions, new_pts, status, err):
            if not found:
                # Not appending to updated_past_gaze deletes this data
                # point, since self.past_gaze_positions is replaced later.
                continue
            gaze['norm_pos'] = normalize(new_gaze_pt, img_shape, flip_y=True)
            updated_past_gaze.append(gaze)

    if same_frame:
        # paused: re-use last result
        events['gaze_positions'][:] = self.past_gaze_positions[:]
    else:
        # trim gaze that is too old
        if events['gaze_positions']:
            newest = events['gaze_positions'][0]['timestamp']
            oldest_allowed = newest - self.timeframe
            updated_past_gaze = [g for g in updated_past_gaze if g['timestamp'] > oldest_allowed]

        # inject the scan path gaze points into recent_gaze_positions
        events['gaze_positions'][:] = updated_past_gaze + events['gaze_positions']
        events['gaze_positions'].sort(key=lambda x: x['timestamp'])  # this may be redundant...

    # update info for next frame.
    self.prev_gray = gray_img
    self.prev_frame_idx = frame.index
    self.past_gaze_positions = events['gaze_positions']
def track_features(self, image_ref, image_cur):
    """Advance the live feature tracks from one image to the next.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image

    Side effects: may reset ``self.tracks_tracking`` and call
    ``self.detect``; sets ``self.kp_ref`` and ``self.kp_cur``; updates each
    surviving track in ``self.tracks`` and replaces ``self.tracks_tracking``
    with the ids that were tracked successfully.
    """
    # Too few live tracks: drop them and detect fresh features
    if len(self.tracks_tracking) < self.min_nb_features:
        self.tracks_tracking = []
        self.detect(image_ref)

    # Reference keypoints for this step
    self.kp_ref = self.last_keypoints()

    # Pyramidal LK with a 21x21 window and 2 pyramid levels
    self.kp_cur, statuses, _ = cv2.calcOpticalFlowPyrLK(
        image_ref,
        image_cur,
        self.kp_ref,
        None,
        winSize=(21, 21),
        maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.03),
    )

    # Keep only the tracks whose point was found in the current image
    survivors = []
    for idx, flag in enumerate(statuses.reshape(statuses.shape[0])):
        if flag != 1:
            continue
        track_id = self.tracks_tracking[idx]
        survivors.append(track_id)
        kp = KeyPoint(self.kp_cur[idx], 0)
        self.tracks[track_id].update(self.frame_id, kp)

    self.tracks_tracking = survivors
def calculate_sift(self, last_frame, new_frame, last_kp=None):
"""
Match stored descriptors against scene descriptors with a ratio test,
then run LK optical flow on the stored last-frame features and publish
the result.

NOTE(review): the parameters `last_frame`, `new_frame` and `last_kp` are
never used in this body, and the names `k`, `scene_desc`, `scene_kps` and
`frame_gray` are read without being assigned here. Unless they exist as
globals this function raises NameError -- confirm the intended inputs.
"""
# find corresponding points in the input image and the template image
bf = cv2.BFMatcher()
# NOTE(review): `k` and `scene_desc` are not defined in this function.
matches = bf.knnMatch(self.descs[k], scene_desc, k=2)
# Apply Lowe's ratio test to the keypoints: keep a match only when its
# best distance is clearly smaller than the second-best distance.
# This should weed out unsure matches.
good_keypoints = []
for m, n in matches:
if m.distance < self.good_thresh * n.distance:
good_keypoints.append(m)
# put keypoints from template image in template_pts
# transform the keypoint data into arrays for homography check
# grab precomputed points
# NOTE(review): `template_pts` is built but not used later in this body.
template_pts = np.float32(
[self.kps[k][m.queryIdx].pt for m in good_keypoints]
).reshape(-1, 1, 2)
# put corresponding keypoints from input image in scene_img_pts
# NOTE(review): `scene_kps` is not defined in this function.
scene_img_pts = np.float32(
[scene_kps[m.trainIdx].pt for m in good_keypoints]
).reshape(-1, 1, 2)
# if we can't find any matching keypoints, bail
# (probably the scene image was nonexistent or of very poor quality)
if scene_img_pts.shape[0] == 0:
return None
# use OpenCV to calculate optical flow
# NOTE(review): `frame_gray` is not defined in this function.
new_frame_matched_features, status, error = cv2.calcOpticalFlowPyrLK(
self.last_frame_gray,
frame_gray,
self.last_frame_features,
None,
**self.lk_params
)
self.publish_interframe_motion(
self.last_frame_features,
new_frame_matched_features,
status,
error
)
# save data for next frame
self.store_as_last_frame(frame_gray)