def optical_flow(one, two):
    """Render the dense optical flow between two RGB frames as an RGB image.

    Method taken from
    https://chatbotslife.com/autonomous-vehicle-speed-estimation-from-dashboard-cam-ca96c24120e4

    The Farneback flow from ``one`` to ``two`` is converted to polar form and
    packed into an HSV buffer: hue carries the flow direction, saturation is
    copied from ``two`` and value is the min-max normalised flow magnitude.
    The buffer is then converted to RGB.

    Args:
        one: Current frame (RGB image).
        two: Next frame (RGB image, same size as ``one``).

    Returns:
        float32 array with the same height/width as the inputs and 3
        channels, as produced by ``cv2.COLOR_HSV2RGB``.
    """
    one_g = cv2.cvtColor(one, cv2.COLOR_RGB2GRAY)
    two_g = cv2.cvtColor(two, cv2.COLOR_RGB2GRAY)

    # Size the buffer from the input instead of assuming 120x320 frames.
    # float32 is what the final cvtColor call is fed with.
    hsv = np.zeros(two.shape[:2] + (3,), dtype=np.float32)

    # saturation is taken from the second frame
    hsv[:, :, 1] = cv2.cvtColor(two, cv2.COLOR_RGB2HSV)[:, :, 1]

    # dense optical flow parameters
    flow = cv2.calcOpticalFlowFarneback(one_g, two_g, flow=None,
                                        pyr_scale=0.5, levels=1, winsize=15,
                                        iterations=2,
                                        poly_n=5, poly_sigma=1.1, flags=0)

    # cartesian (dx, dy) -> polar (magnitude, angle in radians)
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # hue corresponds to direction (radians -> degrees / 2)
    hsv[:, :, 0] = ang * (180 / np.pi / 2)
    # value corresponds to magnitude, stretched to 0..255
    hsv[:, :, 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
python类calcOpticalFlowFarneback()的实例源码
def readImg(FileList, data_shape):
    """Compute Farneback flow for consecutive image pairs listed in FileList.

    The list is consumed in groups of ``NUM_SAMPLES + 1`` files. Inside each
    group the flow is computed for the adjacent pairs ``(j, j + 1)`` with
    ``j`` in ``range(NUM_SAMPLES)``. Trailing files that do not fill a whole
    group are ignored.

    Args:
        FileList: Sequence of image paths.
        data_shape: Indexable whose items ``[2]`` and ``[1]`` are used as the
            target width and height passed to ``cv2.resize``.

    Returns:
        A list with one entry per pair: the flow array transposed with
        ``(2, 1, 0)`` and converted to nested Python lists.
    """
    target_size = (data_shape[2], data_shape[1])

    def load_scaled(path):
        # Read as grayscale (flag 0), resize, then scale by 1/255.
        gray = cv2.imread(path, 0)
        return np.multiply(cv2.resize(gray, target_size), 1 / 255.0)

    mat = []
    # Floor division: plain "/" yields a float on Python 3 and breaks range().
    group_count = len(FileList) // (NUM_SAMPLES + 1)
    for i in range(group_count):
        for j in range(NUM_SAMPLES):
            index = i * (NUM_SAMPLES + 1) + j
            prev_img = load_scaled(FileList[index])
            next_img = load_scaled(FileList[index + 1])
            # Keyword arguments keep the call valid for both the OpenCV 2.4
            # (flow last) and OpenCV 3+ (flow third) positional orders.
            flow = cv2.calcOpticalFlowFarneback(
                prev_img, next_img, flow=None,
                pyr_scale=0.5, levels=3, winsize=15,
                iterations=3, poly_n=5, poly_sigma=1.2, flags=0)
            mat.append(np.array(flow).transpose((2, 1, 0)).tolist())
    return mat
def dense_optical_flow(im1, im2, pyr_scale=0.5, levels=3, winsize=5,
                       iterations=3, poly_n=5, poly_sigma=1.2, fb_threshold=-1,
                       mask1=None, mask2=None,
                       flow1=None, flow2=None):
    """Compute dense Farneback flow from im1 to im2 with optional filtering.

    Args:
        im1, im2: Input images; both are passed through ``to_gray`` first.
        pyr_scale, levels, winsize, iterations, poly_n, poly_sigma:
            Forwarded unchanged to ``cv2.calcOpticalFlowFarneback``.
        fb_threshold: When > 0 a forward-backward check is run: the reverse
            flow (im2 -> im1) is computed and flow vectors whose round trip
            deviates by more than this value in both x and y are set to NaN.
        mask1: Optional mask over im1; forward flow is set to NaN wherever
            the mask is falsy.
        mask2: Optional mask applied the same way to the reverse flow during
            the forward-backward check.
        flow1: Optional array; a copy of it is handed to OpenCV as the
            ``flow`` argument.
        flow2: Accepted but not used by this function.

    Returns:
        The forward flow array, with NaN at masked-out or rejected pixels.
    """
    # Keyword arguments keep the call valid for both the OpenCV 2.4
    # (flow last) and OpenCV 3+ (flow third) positional orders.
    fflow = cv2.calcOpticalFlowFarneback(
        to_gray(im1), to_gray(im2),
        flow=None if flow1 is None else flow1.copy(),
        pyr_scale=pyr_scale, levels=levels, winsize=winsize,
        iterations=iterations, poly_n=poly_n, poly_sigma=poly_sigma,
        flags=0)

    if mask1 is not None:
        # builtin bool: the np.bool alias was removed in NumPy 1.24
        fflow[~mask1.astype(bool)] = np.nan

    if fb_threshold > 0:
        H, W = im1.shape[:2]
        xs, ys = np.meshgrid(np.arange(W), np.arange(H))
        xys1 = np.dstack([xs, ys])
        xys2 = xys1 + fflow

        # Reverse flow is computed without its own check (fb_threshold=-1)
        rflow = dense_optical_flow(im2, im1, pyr_scale=pyr_scale, levels=levels,
                                   winsize=winsize, iterations=iterations,
                                   poly_n=poly_n, poly_sigma=poly_sigma,
                                   fb_threshold=-1)
        if mask2 is not None:
            rflow[~mask2.astype(bool)] = np.nan

        xys1r = xys2 + rflow
        # NOTE(review): a pixel is rejected only when BOTH components exceed
        # the threshold (.all) -- confirm this is intended rather than .any.
        fb_bad = (np.fabs(xys1r - xys1) > fb_threshold).all(axis=2)
        fflow[fb_bad] = np.nan

    return fflow
def optical_flow(one, two):
    """Render the dense optical flow between two RGB frames as an RGB image.

    Method taken from
    https://chatbotslife.com/autonomous-vehicle-speed-estimation-from-dashboard-cam-ca96c24120e4

    Calculates the optical flow magnitude and angle and places them into an
    HSV image: hue carries the flow direction, saturation is copied from
    ``two`` and value is the min-max normalised flow magnitude. The HSV
    image is then converted to RGB.

    Args:
        one: Current frame (RGB image).
        two: Next frame (RGB image, same size as ``one``).

    Returns:
        float32 array with the same height/width as the inputs and 3
        channels, as produced by ``cv2.COLOR_HSV2RGB``.
    """
    one_g = cv2.cvtColor(one, cv2.COLOR_RGB2GRAY)
    two_g = cv2.cvtColor(two, cv2.COLOR_RGB2GRAY)

    # Size the buffer from the input instead of assuming 120x320 frames.
    # float32 is what the final cvtColor call is fed with.
    hsv = np.zeros(two.shape[:2] + (3,), dtype=np.float32)

    # saturation is taken from the second frame
    hsv[:, :, 1] = cv2.cvtColor(two, cv2.COLOR_RGB2HSV)[:, :, 1]

    # dense optical flow parameters
    flow = cv2.calcOpticalFlowFarneback(one_g, two_g, flow=None,
                                        pyr_scale=0.5,
                                        levels=1,
                                        winsize=10,
                                        iterations=2,
                                        poly_n=5,
                                        poly_sigma=1.1,
                                        flags=0)

    # cartesian (dx, dy) -> polar (magnitude, angle in radians)
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # hue corresponds to direction (radians -> degrees / 2)
    hsv[:, :, 0] = ang * (180 / np.pi / 2)
    # value corresponds to magnitude, stretched to 0..255
    hsv[:, :, 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
def calculate_flow(self, frame_a, frame_b):
    """Display the horizontal and vertical Farneback flow between two frames.

    Both BGR frames are converted to grayscale, the dense flow is computed,
    and each flow component is min-max normalised to 0..255 and shown in its
    own window. The call blocks until a key is pressed; pressing ``s`` also
    writes both components to ``.pgm`` files. All windows are closed before
    returning. Nothing is returned.
    """
    gray_prev = cv2.cvtColor(frame_a, cv2.COLOR_BGR2GRAY)
    gray_next = cv2.cvtColor(frame_b, cv2.COLOR_BGR2GRAY)

    farneback_args = (None, 0.5, 3, 15, 3, 5, 1.2, 0)
    flow = cv2.calcOpticalFlowFarneback(gray_prev, gray_next, *farneback_args)

    def as_uint8(component):
        # Stretch one flow component to the displayable 0..255 range.
        stretched = cv2.normalize(component, None, 0, 255, cv2.NORM_MINMAX)
        return stretched.astype('uint8')

    horz = as_uint8(flow[..., 0])
    vert = as_uint8(flow[..., 1])

    cv2.imshow('Horizontal Component', horz)
    cv2.imshow('Vertical Component', vert)

    pressed = cv2.waitKey(0) & 0xff
    if pressed == ord('s'):
        cv2.imwrite('opticalflow_horz.pgm', horz)
        cv2.imwrite('opticalflow_vert.pgm', vert)

    cv2.destroyAllWindows()
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0):
    """Sample windows of a video and return normalised flow-magnitude maps.

    For every entry ``t`` in ``times`` the capture is positioned at frame
    ``min(t * n_frames, n_frames - 1 - frames)`` and ``frames`` consecutive
    frames are read. Farneback flow is computed between each adjacent pair
    of (optionally resized) grayscale frames, the magnitudes are summed and
    the sum is min-max normalised.

    Args:
        fn: Video source handed to ``cv2.VideoCapture``.
        times: Iterable of positions; each is multiplied by the frame count.
        frames: Number of frames read per window.
        scale_factor: Resize factor applied to the grayscale frames before
            the flow is computed (1.0 disables resizing).

    Returns:
        A list with one ``(x, norm_mag)`` tuple per entry in ``times``. ``x``
        is a frame of the window with its last axis reversed, as float32
        divided by 255 (it is not resized); ``norm_mag`` is the normalised
        magnitude map. The list is empty when the video cannot be opened or
        reports fewer than ``2 * frames`` frames.
    """
    def resize(im):
        if scale_factor != 1.0:
            new_size = (int(im.shape[1] * scale_factor),
                        int(im.shape[0] * scale_factor))
            return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR)
        return im

    cap = cv2.VideoCapture(fn)
    try:
        if not cap.isOpened():
            return []

        n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        outputs = []
        if n_frames < frames * 2:
            return outputs

        for t in times:
            cap.set(cv2.CAP_PROP_POS_FRAMES,
                    min(t * n_frames, n_frames - 1 - frames))
            # NOTE(review): a failed read leaves the frame as None and the
            # cvtColor call below raises; reads are not checked here.
            _, frame0 = cap.read()
            im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY))

            mags = []
            middle_frame = frame0
            for f in range(frames - 1):
                _, frame1 = cap.read()
                if f == frames // 2:
                    middle_frame = frame1
                im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY))
                flow = cv2.calcOpticalFlowFarneback(im0, im1,
                                                    None, 0.5, 3, 15, 3, 5, 1.2, 0)
                mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                mags.append(mag)
                im0 = im1

            mag = np.sum(mags, 0)
            mag = mag.clip(min=0)
            # the small epsilon avoids division by zero on a constant map
            norm_mag = (mag - mag.min()) / (mag.max() - mag.min() + 1e-5)

            x = middle_frame[..., ::-1].astype(np.float32) / 255
            outputs.append((x, norm_mag))
        return outputs
    finally:
        # Always give the capture handle back, including on early returns
        # and when OpenCV raises part-way through.
        cap.release()
video_jpeg_rolls_flow_saliency.py 文件源码
项目:self-supervision
作者: gustavla
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def extract_optical_flow(fn, n_frames=34):
    """Build a normalised flow-magnitude map from a vertical roll of frames.

    The image loaded from ``fn`` must have shape ``(128 * 34, 128, 3)``; it
    is split into 34 frames along axis 0. A random window of ``n_frames``
    consecutive frames is chosen, Farneback flow (with a random window size
    drawn from ``[3, 20)`` for every pair) is computed between adjacent
    grayscale frames, and the summed magnitude is min-max normalised.

    Returns:
        ``[(frame, norm_mag)]`` where ``frame`` is a randomly picked frame of
        the window, or ``[]`` when the image has an unexpected shape.
    """
    roll = dd.image.load(fn)
    if roll.shape != (128*34, 128, 3):
        return []

    frames = np.array_split(roll, 34, axis=0)
    # channel mean serves as the grayscale version of each frame
    gray = [frame.mean(-1) for frame in frames]

    start = np.random.randint(34 - n_frames + 1)
    picked = frames[np.random.randint(start, start + n_frames)]

    window = gray[start:start + n_frames]
    magnitudes = []
    for earlier, later in zip(window, window[1:]):
        flow = cv2.calcOpticalFlowFarneback(
            earlier, later,
            None,                       # flow
            0.5,                        # pyr_scale
            3,                          # levels
            np.random.randint(3, 20),   # winsize
            3,                          # iterations
            5,                          # poly_n
            1.2,                        # poly_sigma
            0,                          # flags
        )
        magnitudes.append(cv2.cartToPolar(flow[..., 0], flow[..., 1])[0])

    total = np.sum(magnitudes, 0).clip(min=0)
    low, high = total.min(), total.max()
    norm_mag = (total - low) / (high - low + 1e-5)

    return [(picked, norm_mag)]
def _calc_optical_flow(prev, next_):
    """Return the dense Farneback flow from ``prev`` to ``next_``.

    Thin wrapper around ``cv2.calcOpticalFlowFarneback`` with fixed
    parameters (3 pyramid levels at scale 0.5, window size 15, 3 iterations).
    """
    return cv2.calcOpticalFlowFarneback(
        prev,
        next_,
        flow=None,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )
def test_flow(img1, img2):
    """Show Farneback flow visualisations for a pair of BGR images.

    Opens three windows: the flow drawn over the first grayscale image, an
    HSV rendering of the flow, and a warped image. ``draw_flow``,
    ``draw_hsv``, ``warp_flow`` and ``cur_glitch`` are not defined in this
    function and must be available in the enclosing module.
    """
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Keyword arguments keep the call valid for both the OpenCV 2.4
    # (flow last) and OpenCV 3+ (flow third) positional orders.
    flow = cv2.calcOpticalFlowFarneback(
        gray1, gray2, flow=None,
        pyr_scale=0.5, levels=3, winsize=15,
        iterations=3, poly_n=5, poly_sigma=1.2, flags=0)

    cv2.imshow('flow', draw_flow(gray1, flow))
    cv2.imshow('flow HSV', draw_hsv(flow))
    cv2.imshow('warp', warp_flow(cur_glitch, flow))
def track(self, im0, im1, p0):
    """Propagate points from im0 to im1 using dense Farneback flow.

    Args:
        im0, im1: Images passed to ``cv2.calcOpticalFlowFarneback`` together
            with ``self.farneback_params_``.
        p0: Array of points; column 0 is used as the flow column index and
            column 1 as the flow row index.

    Returns:
        ``p0`` displaced by the (median-blurred) flow sampled at each point.
        Points rejected by ``finite_and_within_bounds`` come back as NaN, as
        do points that fail the forward-backward check (round-trip error of
        3 or more in either coordinate) when ``self.fb_check_`` is set.
        An empty array is returned when ``p0`` is None or empty.
    """
    if p0 is None or not len(p0):
        return np.array([])

    fflow = cv2.calcOpticalFlowFarneback(im0, im1, **self.farneback_params_)
    fflow = cv2.medianBlur(fflow, 5)

    # Flow sampled at each point; stays NaN for points that are not valid
    flow_p0 = np.full(p0.shape, np.nan)

    # Check finite value for pts, and within image bounds
    valid0 = finite_and_within_bounds(p0, im0.shape)

    # Sample the forward flow at the integer pixel of every valid point
    xys0 = p0[valid0].astype(int)
    flow_p0[valid0] = fflow[xys0[:, 1], xys0[:, 0]]

    # Propagate
    p1 = p0 + flow_p0

    # FWD-BWD check
    if self.fb_check_:
        flow_p1 = np.full(p0.shape, np.nan)

        rflow = cv2.calcOpticalFlowFarneback(im1, im0, **self.farneback_params_)
        rflow = cv2.medianBlur(rflow, 5)

        # Check finite value for pts, and within image bounds
        valid1 = finite_and_within_bounds(p1, im0.shape)

        # Sample the reverse flow at the propagated points
        xys1 = p1[valid1].astype(int)
        flow_p1[valid1] = rflow[xys1[:, 1], xys1[:, 0]]

        # Round trip p0 -> p1 -> p0r; NaN entries compare False and are
        # therefore treated as bad.
        p0r = p1 + flow_p1
        fb_good = (np.fabs(p0r - p0) < 3).all(axis=1)

        # Keep only the flow that survived the check
        flow_p0[~fb_good] = np.nan
        p1 = p0 + flow_p0

    return p1
def calculateOpticalFlowsForDataset(self, image_reference, dataset):
    """Compute the optical flow of every dataset image against a reference.

    The first entry of the result is an all-zero float32 flow standing in
    for the reference image. For each image from index 1 on, the HDU data is
    preprocessed, optionally warped with the stored transform matrix (when
    the ``align_images`` processing option equals the string "True"), and
    the Farneback flow from the reference to that image is computed. Each
    flow is also handed to ``self.writeOpticalFlowImage``.

    Returns:
        List of flow arrays, one per image in the dataset.
    """
    ref_shape = image_reference.shape

    # the reference image sits at position 0 and gets a zero flow
    flows = [np.zeros((ref_shape[0], ref_shape[1], 2), np.float32)]

    image_count = dataset.getImageCount()
    for index in range(1, image_count):
        print("calculating optical flow for image ", index)

        entry = dataset.getData(index)
        raw = entry["hdu_list"][self.extension].data
        image = CommonFunctions.preprocessHduImage(raw, self.scale_factor)

        # @todo: here, do not use config but simply check if matrix is None
        align = self.config["Processing_Options"]["align_images"] == "True"
        if align:
            rows, cols = image.shape[0], image.shape[1]
            image = cv2.warpAffine(
                image,
                entry["transform_matrix"],
                (cols, rows),
                flags=cv2.INTER_CUBIC + cv2.WARP_INVERSE_MAP)

        # calculate the optical flow (backwards for warping!)
        flow = cv2.calcOpticalFlowFarneback(
            image_reference, image, None,
            self.pyr_scale, self.levels, self.winsize,
            self.iterations, self.poly_n, self.poly_sigma,
            cv2.OPTFLOW_FARNEBACK_GAUSSIAN)

        # write out the flow image for user evaluation
        self.writeOpticalFlowImage(index, flow)
        flows.append(flow)

    return flows
## Average a list of optical flows
# @param list_optical_flows list object containing optical flows as numpy arrays
# @return averaged optical flow as numpy array
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0):
    """Sample windows of a video and return their mean optical flow.

    For every entry ``t`` in ``times`` the capture is positioned at frame
    ``min(t * n_frames, n_frames - 1 - frames)`` and ``frames`` consecutive
    frames are read. Farneback flow is computed between each adjacent pair
    of (optionally resized) grayscale frames; the flows are averaged,
    divided by 100 and clipped to ``[-1, 1]``.

    Args:
        fn: Video source handed to ``cv2.VideoCapture``.
        times: Iterable of positions; each is multiplied by the frame count.
        frames: Number of frames read per window.
        scale_factor: Resize factor applied to the grayscale frames before
            the flow is computed; it also scales the Farneback window size.

    Returns:
        A list with one ``(x, flow)`` tuple per entry in ``times``. ``x`` is
        a frame of the window with its last axis reversed, as float32
        divided by 255 (it is not resized). The list is empty when the video
        cannot be opened or reports fewer than ``2 * frames`` frames.
    """
    def resize(im):
        if scale_factor != 1.0:
            new_size = (int(im.shape[1] * scale_factor),
                        int(im.shape[0] * scale_factor))
            return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR)
        return im

    cap = cv2.VideoCapture(fn)
    try:
        if not cap.isOpened():
            return []

        n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        outputs = []
        if n_frames < frames * 2:
            return outputs

        for t in times:
            cap.set(cv2.CAP_PROP_POS_FRAMES,
                    min(t * n_frames, n_frames - 1 - frames))
            # NOTE(review): a failed read leaves the frame as None and the
            # cvtColor call below raises; reads are not checked here.
            _, frame0 = cap.read()
            im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY))

            middle_frame = frame0
            flows = []
            for f in range(frames - 1):
                _, frame1 = cap.read()
                if f == frames // 2:
                    middle_frame = frame1
                im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY))
                flow = cv2.calcOpticalFlowFarneback(
                    im0, im1,
                    None,
                    0.5,                        # pyr_scale
                    8,                          # levels
                    int(40 * scale_factor),     # winsize
                    10,                         # iterations
                    5,                          # poly_n
                    1.1,                        # poly_sigma
                    cv2.OPTFLOW_FARNEBACK_GAUSSIAN)
                flows.append(flow)
                im0 = im1

            flow = (np.mean(flows, 0) / 100).clip(-1, 1)

            x = middle_frame[..., ::-1].astype(np.float32) / 255
            outputs.append((x, flow))
        return outputs
    finally:
        # Always give the capture handle back, including on early returns
        # and when OpenCV raises part-way through.
        cap.release()
def writeOpticalFlow(path, filename, w, h, c):
    """Write Farneback optical-flow images for the frames of one video.

    The video ``path/filename`` is read frame by frame. Each frame is
    resized to ``(w, h)`` and converted to grayscale; the flow against the
    previous frame is split into horizontal and vertical components, each
    min-max normalised to 0..255 and saved as JPEG (quality 90) under
    ``../dataset/of_images/<filename>/``.

    Args:
        path: Directory containing the video.
        filename: Video file name; also used for the output folder and files.
        w, h: Target frame width and height.
        c: Value included in the progress message only.

    Returns:
        Number of frames read after the first one. 0 when the first frame
        cannot be read. Errors are printed and the count reached so far is
        returned (best effort, no exception is propagated).
    """
    count = 0
    cap = None
    try:
        cap = cv2.VideoCapture(path + '/' + filename)
        ret, first_frame = cap.read()
        # "is None": comparing an ndarray with == None is elementwise and
        # cannot be used as a condition.
        if first_frame is None:
            return count
        prvs = cv2.cvtColor(cv2.resize(first_frame, (w, h)), cv2.COLOR_BGR2GRAY)

        # Create the folder that is actually written to (including missing
        # parents) and tolerate it already existing.
        folder = '../dataset/of_images' + '/' + filename + '/'
        if not os.path.isdir(folder):
            os.makedirs(folder)

        while True:
            ret, frame = cap.read()
            if frame is None:
                break
            count += 1
            # NOTE(review): the original indentation is lost; only the
            # progress message is treated as gated by count % 5 -- confirm.
            if count % 5 == 0:
                print(filename + ':' + str(c) + '-' + str(count))

            nxt = cv2.cvtColor(cv2.resize(frame, (w, h)), cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prvs, nxt, None, 0.5, 3, 15, 3, 5, 1.2, 0)

            horz = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            vert = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)
            horz = horz.astype('uint8')
            vert = vert.astype('uint8')

            jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
            cv2.imwrite(folder + 'h' + str(count) + '_' + filename + '.jpg', horz, jpeg_params)
            cv2.imwrite(folder + 'v' + str(count) + '_' + filename + '.jpg', vert, jpeg_params)
            prvs = nxt

        cv2.destroyAllWindows()
    except Exception as e:
        # Best effort: report the failure instead of hiding it, then return
        # the number of frames handled so far.
        print(e)
    finally:
        if cap is not None:
            cap.release()
    return count
def writeOpticalFlow(path, filename, w, h, c):
    """Write Farneback optical-flow images for the frames of one video.

    The video ``path/filename`` is read frame by frame. Each frame is
    resized to ``(w, h)`` and converted to grayscale; the flow against the
    previous frame is split into horizontal and vertical components, each
    min-max normalised to 0..255 and saved as JPEG (quality 90) under
    ``../dataset/of_images/<filename>/``.

    Args:
        path: Directory containing the video.
        filename: Video file name; also used for the output folder and files.
        w, h: Target frame width and height.
        c: Value included in the progress message only.

    Returns:
        Number of frames read after the first one. 0 when the first frame
        cannot be read. Errors are printed and the count reached so far is
        returned (best effort, no exception is propagated).
    """
    count = 0
    cap = None
    try:
        cap = cv2.VideoCapture(path + '/' + filename)
        ret, first_frame = cap.read()
        # "is None": comparing an ndarray with == None is elementwise and
        # cannot be used as a condition.
        if first_frame is None:
            return count
        prvs = cv2.cvtColor(cv2.resize(first_frame, (w, h)), cv2.COLOR_BGR2GRAY)

        # Create the folder that is actually written to (including missing
        # parents) and tolerate it already existing.
        folder = '../dataset/of_images' + '/' + filename + '/'
        if not os.path.isdir(folder):
            os.makedirs(folder)

        while True:
            ret, frame = cap.read()
            if frame is None:
                break
            count += 1
            # NOTE(review): the original indentation is lost; only the
            # progress message is treated as gated by count % 5 -- confirm.
            if count % 5 == 0:
                print(filename + ':' + str(c) + '-' + str(count))

            nxt = cv2.cvtColor(cv2.resize(frame, (w, h)), cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prvs, nxt, None, 0.5, 3, 15, 3, 5, 1.2, 0)

            horz = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            vert = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)
            horz = horz.astype('uint8')
            vert = vert.astype('uint8')

            jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
            cv2.imwrite(folder + 'h' + str(count) + '_' + filename + '.jpg', horz, jpeg_params)
            cv2.imwrite(folder + 'v' + str(count) + '_' + filename + '.jpg', vert, jpeg_params)
            prvs = nxt

        cv2.destroyAllWindows()
    except Exception as e:
        # Best effort: report the failure instead of hiding it, then return
        # the number of frames handled so far.
        print(e)
    finally:
        if cap is not None:
            cap.release()
    return count
optical_flow_prep.py 文件源码
项目:Video-Classification-2-Stream-CNN
作者: wadhwasahil
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def writeOpticalFlow(path, filename, w, h, c):
    """Write Farneback optical-flow images for the frames of one video.

    The video ``path/filename`` is read frame by frame. Each frame is
    resized to ``(w, h)`` and converted to grayscale; the flow against the
    previous frame is split into horizontal and vertical components, each
    min-max normalised to 0..255 and saved as JPEG (quality 90) under
    ``./of_images/<filename>/``.

    Args:
        path: Directory containing the video.
        filename: Video file name; also used for the output folder and files.
        w, h: Target frame width and height.
        c: Value included in the progress message only.

    Returns:
        Number of frames read after the first one. 0 when the first frame
        cannot be read. Errors are printed and the count reached so far is
        returned (best effort, no exception is propagated).
    """
    count = 0
    cap = None
    try:
        cap = cv2.VideoCapture(path + '/' + filename)
        ret, first_frame = cap.read()
        # "is None": comparing an ndarray with == None is elementwise and
        # cannot be used as a condition.
        if first_frame is None:
            return count
        prvs = cv2.cvtColor(cv2.resize(first_frame, (w, h)), cv2.COLOR_BGR2GRAY)

        # Create the output folder including a missing ./of_images parent
        # and tolerate it already existing.
        folder = './of_images' + '/' + filename + '/'
        if not os.path.isdir(folder):
            os.makedirs(folder)

        while True:
            ret, frame = cap.read()
            if frame is None:
                break
            count += 1
            # NOTE(review): the original indentation is lost; only the
            # progress message is treated as gated by count % 5 -- confirm.
            if count % 5 == 0:
                print(filename + ':' + str(c) + '-' + str(count))

            nxt = cv2.cvtColor(cv2.resize(frame, (w, h)), cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prvs, nxt, None, 0.5, 3, 15, 3, 5, 1.2, 0)

            horz = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            vert = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)
            horz = horz.astype('uint8')
            vert = vert.astype('uint8')

            jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
            cv2.imwrite(folder + 'h' + str(count) + '_' + filename + '.jpg', horz, jpeg_params)
            cv2.imwrite(folder + 'v' + str(count) + '_' + filename + '.jpg', vert, jpeg_params)
            prvs = nxt

        cv2.destroyAllWindows()
    except Exception as e:
        # "except ... as e" and print(e) are valid on Python 2.6+ and 3,
        # unlike the Python 2-only "except Exception,e" / "print e".
        print(e)
    finally:
        if cap is not None:
            cap.release()
    return count