def sparse_optical_flow(im1, im2, pts, fb_threshold=-1,
                        window_size=15, max_level=2,
                        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)):
    """Track `pts` from `im1` to `im2` with pyramidal Lucas-Kanade flow.

    When `fb_threshold` is positive, a forward-backward check is applied:
    the forward result is tracked back from `im2` to `im1`, and every point
    whose round trip ends `fb_threshold` or more away from its start in
    either coordinate (or whose backward track was lost) is set to NaN.

    Parameters
    ----------
    im1, im2 : images passed straight to cv2.calcOpticalFlowPyrLK
    pts : points to track; handled here as (N, 2) or (N, 1, 2)
        -- NOTE(review): OpenCV presumably needs float32, confirm at callers
    fb_threshold : round-trip distance limit; values <= 0 disable the check
    window_size : side length of the square LK search window
    max_level : value forwarded as `maxLevel`
    criteria : termination criteria tuple forwarded to OpenCV

    Returns
    -------
    (p1, st, err)
        p1  -- tracked points, NaN where the forward-backward check failed
        st  -- forward status, AND-ed with the backward status when the
               check is enabled
        err -- forward error array; when the check is enabled this is the
               error array of the *backward* pass (as in the original
               implementation) with rejected entries set to NaN
    """
    lk_kwargs = dict(winSize=(window_size, window_size),
                     maxLevel=max_level, criteria=criteria)

    # Forward flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None, **lk_kwargs)

    # Backward flow
    if fb_threshold > 0:
        p0r, st0, err = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None,
                                                 **lk_kwargs)

        # Work on flat (N, 2) views and a flat (N,) status so the masks
        # below line up with the first axis whatever layout `pts` uses.
        p0r = p0r.reshape(-1, 2)
        p0r[st0.ravel() == 0] = np.nan

        # Compare against the original input points (the previous code
        # referenced an undefined name `p0` here). NaN never compares
        # below the threshold, so lost points are rejected as well.
        start = np.asarray(pts).reshape(-1, 2)
        fb_good = (np.fabs(p0r - start) < fb_threshold).all(axis=1)

        p1[~fb_good] = np.nan
        st = np.bitwise_and(st, st0)
        err[~fb_good] = np.nan

    return p1, st, err
# Example source code using the Python constant TERM_CRITERIA_COUNT
def feature_tracking(image_ref, image_cur, px_ref):
    """Track reference pixels into the current image with Lucas-Kanade flow.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image
    px_ref :
        Reference pixels; indexed with a boolean mask below, so presumably
        a NumPy array -- TODO confirm against callers

    Returns
    -------
    (kp1, kp2)
        The entries of `px_ref` whose status flag is 1, and the matching
        tracked positions in `image_cur`
    """
    tracked, status, _ = cv2.calcOpticalFlowPyrLK(
        image_ref,
        image_cur,
        px_ref,
        None,
        winSize=(21, 21),
        maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
    )

    # Flatten the per-point status column and keep successful tracks only.
    found = status.reshape(status.shape[0]) == 1
    return px_ref[found], tracked[found]
def _get_alignment(im_ref, im_to_align, key):
    """Estimate the ECC translation aligning `im_to_align` to `im_ref`.

    When `key` is not None the result is cached on disk under
    ``align_cache/<key>.alignment`` and later calls with the same key
    return the cached value without recomputing.

    Parameters
    ----------
    im_ref, im_to_align : images passed to cv2.findTransformECC
    key : cache key, or None to disable caching

    Returns
    -------
    (cc, warp_matrix)
        The two values returned by cv2.findTransformECC, started from a
        2x3 float32 identity matrix with the MOTION_TRANSLATION model.
    """
    cached_path = None
    if key is not None:
        cached_path = Path('align_cache').joinpath('{}.alignment'.format(key))
        if cached_path.exists():
            # NOTE: pickle.load can execute arbitrary code from the file;
            # this is only acceptable while the cache directory is trusted.
            with cached_path.open('rb') as f:
                return pickle.load(f)

    logger.info('Getting alignment for {}'.format(key))
    warp_mode = cv2.MOTION_TRANSLATION
    warp_matrix = np.eye(2, 3, dtype=np.float32)
    criteria = (
        cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 5000, 1e-8)
    cc, warp_matrix = cv2.findTransformECC(
        im_ref, im_to_align, warp_matrix, warp_mode, criteria)

    if cached_path is not None:
        # Create the cache directory on first use; without it the write
        # below fails with FileNotFoundError on a fresh checkout.
        cached_path.parent.mkdir(parents=True, exist_ok=True)
        with cached_path.open('wb') as f:
            pickle.dump((cc, warp_matrix), f)

    logger.info('Got alignment for {} with cc {:.3f}: {}'
                .format(key, cc, str(warp_matrix).replace('\n', '')))
    return cc, warp_matrix
def deal(self, frame):
    """Track the stored window on `frame` and display the result.

    Back-projects `self.roi_hist` onto a copy of `frame`, then moves
    `self.track_window` using mean shift (``self.m == 'm'``) or CamShift
    (``self.m == 'c'``). The new window outline and a line from the old
    window centre to the new one are drawn, the image is shown in a window
    named 'img2', and the call waits in ``cv2.waitKey(0)`` before closing
    all windows.

    Parameters
    ----------
    frame : image to track on; it is copied, so the caller's array is not
        drawn on

    Returns
    -------
    The updated track window returned by meanShift / CamShift.

    Raises
    ------
    ValueError
        If `self.m` is neither 'm' nor 'c'.
    """
    frame = frame.copy()
    track_window = self.track_window
    term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 1)
    dst = cv2.calcBackProject([frame], [0], self.roi_hist, [0, 180], 1)

    if self.m == 'm':
        ret, track_window_r = cv2.meanShift(dst, track_window, term_crit)
        x, y, w, h = track_window_r
        img2 = cv2.rectangle(frame, (x, y), (x + w, y + h), 255, 2)
    elif self.m == 'c':
        ret, track_window_r = cv2.CamShift(dst, track_window, term_crit)
        # np.int0 was an alias of np.intp and no longer exists in NumPy 2.
        pts = cv2.boxPoints(ret).astype(np.intp)
        img2 = cv2.polylines(frame, [pts], True, 255, 2)
    else:
        # Previously this fell through to an UnboundLocalError on img2.
        raise ValueError(
            "unknown tracking mode {!r}; expected 'm' or 'c'".format(self.m))

    center1 = (track_window[0] + track_window[2] // 2,
               track_window[1] + track_window[3] // 2)
    center2 = (track_window_r[0] + track_window_r[2] // 2,
               track_window_r[1] + track_window_r[3] // 2)
    img2 = cv2.line(img2, center1, center2, color=0)

    cv2.imshow('img2', img2)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    return track_window_r
def affine(self):
    """Repeatedly estimate a homography between master and slave cameras.

    Runs forever. On each pass the image of camera 0 is converted to
    greyscale and, for every other camera whose `self.ret[i]` is not None,
    cv2.findTransformECC is run on the gradient images (via
    `self.get_gradient`) with the MOTION_HOMOGRAPHY model. The same
    `warp_matrix` is carried over as the initial guess for every camera
    and every pass; it is only printed, never stored on `self`.

    Errors inside a pass are reported or skipped so the loop keeps going,
    but KeyboardInterrupt / SystemExit are no longer swallowed.
    """
    warp_mode = cv2.MOTION_HOMOGRAPHY
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 5000, 1e-10)
    warp_matrix = np.eye(3, 3, dtype=np.float32)
    while True:
        try:
            if self.ret[0] is None or self.client[0].img is None:
                # Without a master frame there is nothing to align against;
                # previously the loop went on with an unbound or stale
                # master image.
                print("Image was none!")
                ti.sleep(5)
                continue
            master_cam_grey = cv2.cvtColor(self.client[0].img,
                                           cv2.COLOR_BGR2GRAY)

            for i in range(1, self.cams):
                if self.ret[i] is not None:
                    print("Trying to calibrate")
                    slave_cam = cv2.cvtColor(self.client[i].img,
                                             cv2.COLOR_BGR2GRAY)
                    try:
                        (cc, warp_matrix) = cv2.findTransformECC(
                            self.get_gradient(master_cam_grey),
                            self.get_gradient(slave_cam),
                            warp_matrix, warp_mode, criteria)
                    except Exception as e:
                        print(e)
                    print(warp_matrix)
                else:
                    print("Image was none")
            ti.sleep(5)
        except Exception:
            # Best-effort loop: back off briefly and retry. A bare
            # `except:` here made the loop impossible to stop with Ctrl-C.
            ti.sleep(1)
def subpixel_pts(self, im, pts):
    """Run cv2.cornerSubPix on `pts` for image `im`.

    Nothing is returned and the value returned by cv2.cornerSubPix is
    discarded, so callers only see the refinement if OpenCV updates `pts`
    in place -- NOTE(review): confirm `pts` is passed in a form OpenCV
    can write back to.
    """
    stop_criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.1)
    win_size = (10, 10)
    zero_zone = (-1, -1)
    cv2.cornerSubPix(im, pts, win_size, zero_zone, stop_criteria)
def __init__(self, videoSource, featurePtMask=None, verbosity=0):
    """Set tracking parameters and open the video source.

    Parameters
    ----------
    videoSource : forwarded to `self._initializeCamera`
    featurePtMask : not read in this method; `self.featurePtMask` is taken
        from the result of `_initializeCamera` instead
        -- NOTE(review): confirm that ignoring the argument is intended
    verbosity : stored as `self.verbosity`
    """
    # Upper bound on the length of an optical flow track.
    self.maxTrackLength = 10

    # Feature points are re-detected every this many frames, which adds
    # robustness when feature points disappear.
    self.detectionInterval = 5

    # Shi-Tomasi corner (feature point) detection parameters.
    self.featureParams = {
        "maxCorners": 500,
        "qualityLevel": 0.3,
        "minDistance": 7,
        "blockSize": 7,
    }

    # Lucas-Kanade optical flow parameters.
    self.lkParams = {
        "winSize": (15, 15),
        "maxLevel": 2,
        "criteria": (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                     10, 0.03),
    }

    # A FAST detector (cv2.FastFeatureDetector_create(500)) would be a
    # possible alternative to the corner detector configured above.

    self.verbosity = verbosity

    camera = self._initializeCamera(videoSource)
    self.videoStream, self.width, self.height, self.featurePtMask = camera
def test_features():
    """Manual check: preview the device screen at half size with a histogram.

    Loops until interrupted with Ctrl-C: the current `d._screen` image is
    resized to half its width and height, shown in the 'preview' window,
    and a 256-bin histogram of its first channel is plotted with
    matplotlib.
    """
    from atx.drivers.android_minicap import AndroidDeviceMinicap

    cv2.namedWindow("preview")
    d = AndroidDeviceMinicap()

    while True:
        try:
            # shape[:2] is (rows, cols); cv2.resize takes (width, height).
            rows, cols = d._screen.shape[:2]
            # Integer division: cv2.resize rejects the float sizes that
            # `/` produces on Python 3.
            img = cv2.resize(d._screen, (cols // 2, rows // 2))
            cv2.imshow('preview', img)

            hist = cv2.calcHist([img], [0], None, [256], [0, 256])
            # NOTE(review): plt.hist already draws; passing its return
            # value to plt.plot looks unintended -- confirm before changing.
            plt.plot(plt.hist(hist.ravel(), 256))
            plt.show()

            cv2.waitKey(1)
        except KeyboardInterrupt:
            break
    cv2.destroyWindow('preview')
def test_features():
    """Manual check: preview the device screen at half size with a histogram.

    Loops until interrupted with Ctrl-C: the current `d._screen` image is
    resized to half its width and height, shown in the 'preview' window,
    and a 256-bin histogram of its first channel is plotted with
    matplotlib.
    """
    from atx.drivers.android_minicap import AndroidDeviceMinicap

    cv2.namedWindow("preview")
    d = AndroidDeviceMinicap()

    while True:
        try:
            # shape[:2] is (rows, cols); cv2.resize takes (width, height).
            rows, cols = d._screen.shape[:2]
            # Integer division: cv2.resize rejects the float sizes that
            # `/` produces on Python 3.
            img = cv2.resize(d._screen, (cols // 2, rows // 2))
            cv2.imshow('preview', img)

            hist = cv2.calcHist([img], [0], None, [256], [0, 256])
            # NOTE(review): plt.hist already draws; passing its return
            # value to plt.plot looks unintended -- confirm before changing.
            plt.plot(plt.hist(hist.ravel(), 256))
            plt.show()

            cv2.waitKey(1)
        except KeyboardInterrupt:
            break
    cv2.destroyWindow('preview')
def update(self, frame, events):
    """Carry past gaze points forward onto the new frame via optical flow.

    When `frame` directly follows the previously seen frame, the stored
    gaze positions are tracked from the previous grey image to the current
    one and their 'norm_pos' is rewritten; points that fail to track are
    dropped. When `frame` is the same frame as before, the previous result
    is re-used. Otherwise (e.g. seeking) no past gaze is carried over.

    `events['gaze_positions']` is modified in place and the state needed
    for the next call is stored on `self`.
    """
    img = frame.img
    img_shape = img.shape[:-1][::-1]  # width, height
    frame_step = frame.index - self.prev_frame_idx
    succeeding_frame = frame_step == 1
    same_frame = frame.index == self.prev_frame_idx
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    updated_past_gaze = []

    # Optical flow effectively sticks the old gaze points onto the pixels
    # of the image so they move with the scene.
    if self.past_gaze_positions and succeeding_frame:
        past_screen_gaze = np.array(
            [denormalize(ng['norm_pos'], img_shape, flip_y=True)
             for ng in self.past_gaze_positions],
            dtype=np.float32)
        new_pts, status, err = cv2.calcOpticalFlowPyrLK(
            self.prev_gray, gray_img, past_screen_gaze, None,
            minEigThreshold=0.005,
            winSize=(90, 90),
            maxLevel=3,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                      20, 0.03))
        tracked = zip(self.past_gaze_positions, new_pts, status, err)
        for gaze, new_gaze_pt, found, _ in tracked:
            if not found:
                # Not appending is what drops this point:
                # self.past_gaze_positions is replaced further down.
                continue
            gaze['norm_pos'] = normalize(new_gaze_pt, img_shape, flip_y=True)
            updated_past_gaze.append(gaze)

    if not same_frame:
        # Discard gaze older than the configured time frame.
        if events['gaze_positions']:
            now = events['gaze_positions'][0]['timestamp']
            cutoff = now - self.timeframe
            updated_past_gaze = [g for g in updated_past_gaze
                                 if g['timestamp'] > cutoff]

        # Inject the scan path gaze points into the recent gaze positions.
        events['gaze_positions'][:] = updated_past_gaze + events['gaze_positions']
        events['gaze_positions'].sort(key=lambda x: x['timestamp'])
    else:
        # Paused: re-use the last result.
        events['gaze_positions'][:] = self.past_gaze_positions[:]

    # Remember what the next frame needs.
    self.prev_gray = gray_img
    self.prev_frame_idx = frame.index
    self.past_gaze_positions = events['gaze_positions']
def track_features(self, image_ref, image_cur):
    """Track the live feature tracks from `image_ref` into `image_cur`.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image

    Tracks whose Lucas-Kanade status is 1 are updated with the new
    keypoint and stay in `self.tracks_tracking`; all others are dropped
    from that list.
    """
    # Too few live tracks: reset them and detect fresh features first.
    if len(self.tracks_tracking) < self.min_nb_features:
        self.tracks_tracking = []
        self.detect(image_ref)

    lk_options = {
        "winSize": (21, 21),
        "maxLevel": 2,
        "criteria": (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                     30, 0.03),
    }

    # Reference keypoints as returned by last_keypoints().
    self.kp_ref = self.last_keypoints()

    self.kp_cur, statuses, err = cv2.calcOpticalFlowPyrLK(
        image_ref, image_cur, self.kp_ref, None, **lk_options)

    # Keep only the tracks whose point was found in the current image.
    flags = statuses.reshape(statuses.shape[0])
    survivors = []
    for idx, flag in enumerate(flags):
        if flag != 1:
            continue
        track_id = self.tracks_tracking[idx]
        survivors.append(track_id)
        self.tracks[track_id].update(self.frame_id,
                                     KeyPoint(self.kp_cur[idx], 0))
    self.tracks_tracking = survivors
def __init__(self, image_topic, feature_detector='FAST'):
    """Start the 'optical_flow_matcher' node and configure feature detection.

    Parameters
    ----------
    image_topic : topic subscribed to for `Image` messages; each message
        is handed to `self.new_image_callback`
    feature_detector : 'FAST' or 'GOOD', selecting which method is bound
        to `self.get_features`

    Raises
    ------
    Exception
        If `feature_detector` is neither 'FAST' nor 'GOOD'.
    """
    super(OpticalFlowMatcher, self).__init__()
    rospy.init_node('optical_flow_matcher')
    self.cv_bridge = CvBridge()

    # NOTE(review): the subscriber is created before the attributes below
    # are set -- confirm the callback cannot run before __init__ finishes.
    self.rectified_image_topic = rospy.Subscriber(
        image_topic, Image, self.new_image_callback)
    self.pub_keypoint_motion = rospy.Publisher(
        'keypoint_motion', KeypointMotion, queue_size=10)

    self.feature_params = None
    if feature_detector not in ('FAST', 'GOOD'):
        raise Exception(
            '{} feature detector not implemented'.format(feature_detector)
        )
    if feature_detector == 'FAST':
        self.get_features = self.get_features_fast
        # FAST detector created with defaults, then threshold set to 20.
        self.fast = cv2.FastFeatureDetector_create()
        self.fast.setThreshold(20)
    else:
        self.get_features = self.get_features_good
        # Shi-Tomasi 'GOOD' corner detection parameters.
        self.feature_params = {
            "maxCorners": 200,
            "qualityLevel": 0.3,
            "minDistance": 7,
            "blockSize": 7,
        }

    # Lucas-Kanade optical flow parameters.
    termination = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    self.lk_params = {
        "winSize": (15, 15),
        "maxLevel": 2,
        "criteria": termination,
    }

    self.last_frame_gray = None
    self.good_old = None
    self.good_new = None
def translationalThermalReg(im1,im2):
    """Register `im2` onto `im1` with an ECC-estimated affine warp.

    Despite the function name, the motion model passed to
    cv2.findTransformECC is cv2.MOTION_AFFINE (a 2x3 matrix).

    Parameters
    ----------
    im1 : reference image array
    im2 : image array to align; must have the same shape as `im1`

    Returns
    -------
    (aligned, warp_matrix)
        `im2` (as float32) warped into the frame of `im1`, and the 2x3
        warp matrix that was estimated.

    Raises
    ------
    TypeError
        If the two inputs have different shapes.
    """
    import cv2
    import numpy

    # Sanity check: both inputs must have identical dimensions.
    ref_shape = im1.shape
    mov_shape = im2.shape
    if ref_shape != mov_shape:
        raise TypeError('Array Inputs are of different sizes!')

    # Affine motion model, starting from a 2x3 identity warp.
    motion_model = cv2.MOTION_AFFINE
    warp_matrix = numpy.eye(2, 3, dtype=numpy.float32)

    # Stop after 10000 iterations or once the threshold 1e-9 is reached.
    max_iterations = 10000
    epsilon = 1e-9
    stop_criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                     max_iterations, epsilon)

    # Both images are converted to float32 before estimating the transform.
    reference = im1.astype(numpy.float32)
    moving = im2.astype(numpy.float32)

    (cc, warp_matrix) = cv2.findTransformECC(
        reference, moving, warp_matrix, motion_model, stop_criteria)

    # Warp the moving image; output size is (width, height) of the reference.
    out_size = (ref_shape[1], ref_shape[0])
    aligned = cv2.warpAffine(moving, warp_matrix, out_size,
                             flags=cv2.INTER_LINEAR | cv2.WARP_INVERSE_MAP)

    print('Calculated Affine Warp Matrix:')
    print(warp_matrix)
    return aligned, warp_matrix
#Test Harness for debugging and testing of functions