In [ ]:
import os
import math
import glob
import cv2
from collections import deque
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from moviepy.editor import VideoFileClip
%matplotlib inline
In [ ]:
class cam_util():
"""
util class for camera operations
"""
ret = None
mtx = None
dist = None
rvecs = None
tvecs = None
# Arrays to store object points and image points from all the images.
objpoints = [] # 3d points in real world space
imgpoints = [] # 2d points in image plane.
def gen_camera_points(self):
"""
generate objpoints and impoints from calibration images
"""
objp = np.zeros((6*9,3), np.float32)
objp[:,:2] = np.mgrid[0:9,0:6].T.reshape(-1,2)
# Make a list of calibration images
images = glob.glob('camera_cal/calibration*.jpg')
# Step through the list and search for chessboard corners
for fname in images:
img = cv2.imread(fname)
gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
# Find the chessboard corners
ret, corners = cv2.findChessboardCorners(gray, (9,6),None)
# If found, add object points, image points
if ret == True:
self.objpoints.append(objp)
self.imgpoints.append(corners)
def undistort(self, img):
"""
undistort an image with camera matrix
"""
if self.mtx is None:
self.ret, self.mtx, self.dist, self.rvecs, self.tvecs = cv2.calibrateCamera(self.objpoints, self.imgpoints,
img.shape[:2],None,None)
h, w = img.shape[:2]
newcameramtx, roi=cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (w,h), 1, (w,h))
dst = cv2.undistort(img, self.mtx, self.dist, None, newcameramtx)
x,y,w,h = roi
return dst[y:y+h, x:x+w]
def clean_mat(self):
"""
Reset camera calibration
"""
self.ret = None
self.mtx = None
self.dist = None
self.rvecs = None
self.tvecs = None
In [ ]:
class Line():
"""
class to store detected lane stats
"""
def __init__(self, maxSamples=15):
self.maxSamples = maxSamples
# x values of the last n fits of the line
self.recent_xfitted = deque(maxlen=self.maxSamples)
#polynomial coefficients for the most recent fit
self.current_fit = [np.array([False])]
#polynomial coefficients averaged over the last n iterations
self.best_fit = None
#difference in fit coefficients between last and new fits
self.diffs = np.array([0,0,0], dtype='float')
#average x values of the fitted line over the last n iterations
self.bestx = None
# was the line detected in the last iteration?
self.detected = False
#radius of curvature of the line in some units
self.radius_of_curvature = None
#distance in meters of vehicle center from the line
self.line_base_pos = None
def update_lane(self, ally, allx):
"""
Function to update the stats
"""
# get the mean as the best x
self.bestx = np.mean(allx, axis=0)
# fit a 2 order polynomial
new_fit = np.polyfit(ally, allx, 2)
# calculate the difference between last fit and new fit
self.diffs = np.subtract(self.current_fit, new_fit)
# update current fit
self.current_fit = new_fit
# add the new fit to the queue
self.recent_xfitted.append(self.current_fit)
# Use the queue mean as the best fit
self.best_fit = np.mean(self.recent_xfitted, axis=0)
# meters per pixel in y dimension
ym_per_pix = 30/720
# meters per pixel in x dimension
xm_per_pix = 3.7/700
# Calculate radius of curvature
fit_cr = np.polyfit(ally*ym_per_pix, allx*xm_per_pix, 2)
y_eval = np.max(ally)
self.radius_of_curvature = ((1 + (2*fit_cr[0]*y_eval*ym_per_pix + fit_cr[1])**2)**1.5) / np.absolute(2*fit_cr[0])
In [ ]:
# Utility Functions
def get_roi(img, vertices):
"""
Apply mask and get region of interest within the mask
"""
mask = np.zeros_like(img)
if len(img.shape) > 2:
channel_count = img.shape[2]
ignore_mask_color = (255,) * channel_count
else:
ignore_mask_color = 255
cv2.fillPoly(mask, vertices, ignore_mask_color)
masked_image = cv2.bitwise_and(img, mask)
return masked_image
def hide_roi(img, vertices):
"""
Apply mask and get region of interest outside the mask
"""
mask = np.zeros_like(img)
mask=mask+255
if len(img.shape) > 2:
channel_count = img.shape[2]
ignore_mask_color = (0,) * channel_count
else:
ignore_mask_color = 0
cv2.fillPoly(mask, vertices, ignore_mask_color)
masked_image = cv2.bitwise_and(img, mask)
return masked_image
def drow_on_images(img, vertices):
"""
Draw ploygon on image
"""
cv2.polylines(img, [vertices], True, (255,255,255), 2)
plot_img(img, 'img drawing', True)
def plot_img(img, step, show_stages=False):
"""
plot image
"""
if show_stages:
print('######################## '+step+' ########################')
plt.imshow(img, cmap='gray')
plt.show()
def plot_hist(histogram, show_stages=False):
"""
plot histogram
"""
if show_stages:
print('######################## histogram ########################')
plt.plot(histogram)
plt.show()
In [ ]:
def write_stats(img):
"""
Write lane stats on image
"""
font = cv2.FONT_HERSHEY_SIMPLEX
size = 1
weight = 2
color = (255,70,0)
cv2.putText(img,'Left Curve : '+ '{0:.2f}'.format(left_line.radius_of_curvature)+' m',(10,30), font, size, color, weight)
cv2.putText(img,'Right Curve : '+ '{0:.2f}'.format(right_line.radius_of_curvature)+' m',(10,60), font, size, color, weight)
cv2.putText(img,'Left Lane Pos: '+ '{0:.2f}'.format(left_line.bestx),(10,100), font, size, color, weight)
cv2.putText(img,'Right Lane Pos: '+ '{0:.2f}'.format(right_line.bestx),(10,130), font, size, color, weight)
cv2.putText(img,'Distance from center: '+ "{0:.2f}".format(left_line.line_base_pos)+' m',(10,180), font, size, color, weight)
def draw_lane(undist, img, Minv):
"""
Draw the detected lane bak on the image
"""
# Generate x and y values for plotting
ploty = np.linspace(300, 700)
# Create an image to draw the lines on
warp_zero = np.zeros_like(img).astype(np.uint8)
color_warp = np.dstack((warp_zero, warp_zero, warp_zero))
left_fit = left_line.best_fit
right_fit = right_line.best_fit
if left_fit is not None and right_fit is not None:
left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
# Recast the x and y points into usable format for cv2.fillPoly()
pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))])
right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))])
pts = np.hstack((pts_left, pts_right))
# Draw the lane onto the warped blank image
cv2.fillPoly(color_warp, np.int_([pts]), (20,120, 80))
# Warp the blank back to original image space using inverse perspective matrix (Minv)
newwarp = cv2.warpPerspective(color_warp, Minv, (img.shape[1], img.shape[0]))
# Combine the result with the original image
result = cv2.addWeighted(undist, 1, newwarp, 0.3, 0)
write_stats(result)
return result
return undist
In [ ]:
def validate_Update_lane(img, nonzero, nonzerox, nonzeroy, left_lane_inds, right_lane_inds, show_stages=False):
"""
Validate the detected lane ids and update the lane stats if valid.
"""
# Extract left and right line pixel positions
left_line_allx = nonzerox[left_lane_inds]
left_line_ally = nonzeroy[left_lane_inds]
right_line_allx = nonzerox[right_lane_inds]
right_line_ally = nonzeroy[right_lane_inds]
# Discard the detections if any of the detected lane is less than 2000 pixals.
# This is done because for very small size the poly fit function gives unpredictable results.
# A better approch would be to use the largest lane curvature to extend the other one
if len(left_line_allx) <= 2000 or len(right_line_allx) <= 2000:
left_line.detected = False
right_line.detected = False
return
left_x_mean = np.mean(left_line_allx, axis=0)
right_x_mean = np.mean(right_line_allx, axis=0)
lane_width = np.subtract(right_x_mean, left_x_mean)
# Discard the detections if the lane with is too large or too small
if left_x_mean > 450 or right_x_mean < 850:
left_line.detected = False
right_line.detected = False
return
if lane_width < 300 or lane_width > 800:
left_line.detected = False
right_line.detected = False
return
# Update the lane stats if the current detection is the first one or
# the detection is within 100 pixals of the last n detection mean
if left_line.bestx is None or np.abs(np.subtract(left_line.bestx, np.mean(left_line_allx, axis=0))) < 100:
left_line.update_lane(left_line_ally, left_line_allx)
left_line.detected = True
else:
left_line.detected = False
if right_line.bestx is None or np.abs(np.subtract(right_line.bestx, np.mean(right_line_allx, axis=0))) < 100:
right_line.update_lane(right_line_ally, right_line_allx)
right_line.detected = True
else:
right_line.detected = False
# Calculate the distance of car from center of lane
lane_center = right_line.bestx - left_line.bestx
left_line.line_base_pos = ((img.shape[1]*0.5 - lane_center)*3.7)/700
right_line.line_base_pos = left_line.line_base_pos
In [ ]:
def window_search(img, nonzero, nonzerox, nonzeroy, show_stages=False):
"""
Perform a sliding window search to detect lane pixals.
"""
# Temp image to draw detections on
out_img = np.dstack((img, img, img))*255
# Calculate histogram
histogram = np.sum(img[img.shape[0]*.75:,:], axis=0)
plot_hist(histogram, show_stages)
# Take the midpoint and use the max on each side as starting point
midpoint = np.int(histogram.shape[0]/2)
leftx_base = np.argmax(histogram[0:midpoint])
rightx_base = np.argmax(histogram[midpoint:histogram.shape[0]]) + midpoint
# Choose the number of sliding windows
nwindows = 9
# Set height of windows
window_height = np.int(img.shape[0]/nwindows)
# Current positions to be updated for each window
leftx_current = leftx_base
rightx_current = rightx_base
# Set the width of the windows +/- margin
margin = 80
# Set minimum number of pixels found to recenter window
minpix = 30
# Create empty lists to receive left and right lane pixel indices
left_lane_inds = []
right_lane_inds = []
# Step through the windows one by one
for window in range(nwindows):
# Identify window boundaries in x and y (and right and left)
win_y_low = img.shape[0] - (window+1)*window_height
win_y_high = img.shape[0] - window*window_height
win_xleft_low = leftx_current - margin
win_xleft_high = leftx_current + margin
win_xright_low = rightx_current - margin
win_xright_high = rightx_current + margin
# Draw the windows on the visualization image
cv2.rectangle(out_img,(win_xleft_low,win_y_low),(win_xleft_high,win_y_high),(0,255,0), 2)
cv2.rectangle(out_img,(win_xright_low,win_y_low),(win_xright_high,win_y_high),(0,255,0), 2)
# Identify the nonzero pixels in x and y within the window
good_left_inds = ((nonzeroy >= win_y_low)
& (nonzeroy < win_y_high)
& (nonzerox >= win_xleft_low)
& (nonzerox < win_xleft_high)).nonzero()[0]
good_right_inds = ((nonzeroy >= win_y_low)
& (nonzeroy < win_y_high)
& (nonzerox >= win_xright_low)
& (nonzerox < win_xright_high)).nonzero()[0]
left_lane_inds.append(good_left_inds)
right_lane_inds.append(good_right_inds)
# If found > minpix pixels, recenter next window on their mean position
if len(good_left_inds) > minpix:
leftx_current = np.int(np.mean(nonzerox[good_left_inds]))
if len(good_right_inds) > minpix:
rightx_current = np.int(np.mean(nonzerox[good_right_inds]))
# Concatenate the arrays of indices
left_lane_inds = np.concatenate(left_lane_inds)
right_lane_inds = np.concatenate(right_lane_inds)
plot_img(out_img, 'sliding window marked', show_stages)
return left_lane_inds, right_lane_inds
In [ ]:
def find_lanes(img, show_stages=False):
"""
Lane finding wrapper function
"""
# Get the foreground pixals
nonzero = img.nonzero()
nonzeroy = np.array(nonzero[0])
nonzerox = np.array(nonzero[1])
# If the last detection was successful take the non zero pixals within the 30 pixal margin as the new detections
if left_line.detected and right_line.detected:
margin = 30
left_lane_inds = ((nonzerox > (left_line.current_fit[0]*(nonzeroy**2) + left_line.current_fit[1]*nonzeroy + left_line.current_fit[2] - margin))
& (nonzerox < (left_line.current_fit[0]*(nonzeroy**2) + left_line.current_fit[1]*nonzeroy + left_line.current_fit[2] + margin)))
right_lane_inds = ((nonzerox > (right_line.current_fit[0]*(nonzeroy**2) + right_line.current_fit[1]*nonzeroy + right_line.current_fit[2] - margin))
& (nonzerox < (right_line.current_fit[0]*(nonzeroy**2) + right_line.current_fit[1]*nonzeroy + right_line.current_fit[2] + margin)))
# Update the lane detections
validate_Update_lane(img, nonzero, nonzerox, nonzeroy, left_lane_inds, right_lane_inds)
# If first detection or the last detection was unsuccessful perform a sliding window search
else:
#print('doing window search')
left_lane_inds, right_lane_inds = window_search(img, nonzero, nonzerox, nonzeroy, show_stages)
# Update the lane detections
validate_Update_lane(img, nonzero, nonzerox, nonzeroy, left_lane_inds, right_lane_inds)
Use source points
Destinations points
Get perpective transform
In [ ]:
def warp(img):
"""
Warp the image to get birds eye view.
"""
img_shape = img.shape
bounding_top_right = [img_shape[1]*0.5 + 90,img_shape[0]*0.70]
bounding_btm_right = [img_shape[1]*0.5 + 450,img_shape[0]]
bounding_btm_left = [img_shape[1]*0.5 - 400,img_shape[0]]
bounding_top_left = [img_shape[1]*0.5 - 60,img_shape[0]*0.70]
# Select source points
pts1 = np.float32([bounding_top_right,bounding_btm_right,bounding_btm_left,bounding_top_left])
# Select destination points
pts2 = np.float32([[img_shape[1]*0.5 + 250,img_shape[0]*0.60],
[img_shape[1]*0.5 + 390,img_shape[0]],
[img_shape[1]*0.5 - 345,img_shape[0]],
[img_shape[1]*0.5 - 205,img_shape[0]*0.60]])
# Get Perspective Transform
M = cv2.getPerspectiveTransform(pts1, pts2)
# Get inverse Perspective Transform
Minv = cv2.getPerspectiveTransform(pts2, pts1)
# Apply warp transform on source image
dst = cv2.warpPerspective(img, M, (img.shape[1], img.shape[0]), flags=cv2.INTER_LINEAR)
return dst, Minv
In [ ]:
def rec_threshold(img, roi, t_min=140, t_max=255):
"""
Funtion to apply recursive threshold with increasing/decreasing boundries
based on the area of lane within a region of interest.
"""
binary = np.zeros_like(img)
binary[(img >= t_min) & (img <= t_max)] = 1
# retrun last val if the threshold levels reach minimum or maximum.
if t_min <= 40 or t_min >= 220:
return binary
binary_1 = get_roi(binary, roi)
#print(np.sum(binary_1.nonzero()))
if np.sum(binary_1.nonzero()) > 9800000:
binary = rec_threshold(img, roi, t_min+10)
elif np.sum(binary_1.nonzero()) < 100000:
binary = rec_threshold(img, roi, t_min-10)
return binary
def threshold(img, roi, show_stages=False):
"""
Apply threshold
"""
# Convert image the HSV
hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
# Take v channel
v_channel = hsv[:,:,2]
plot_img(v_channel, 'v channel', show_stages)
# Apply threshold to find lane
v_binary = rec_threshold(v_channel, roi)
plot_img(v_binary, 'color threshold', show_stages)
# Convert image to grayscale
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
# Take the derivative in x
sobelx = cv2.Sobel(gray, cv2.CV_32F, 1, 0)
#sobelx = cv2.Sobel(sobelx, cv2.CV_32F, 0, 1) # Take the derivative
#plot_img(sobelx, show_stages)
# Absolute x derivative to
abs_sobelx = np.absolute(sobelx)
#accentuate lines away from horizontal
scaled_sobel = np.uint8(255*abs_sobelx/np.max(abs_sobelx))
#plot_img(sobelx, show_stages)
sxbinary = np.zeros_like(scaled_sobel)
# perform threshold
sxbinary[(scaled_sobel >= 100) & (scaled_sobel <= 255)] = 1
plot_img(sobelx, 'sobel', show_stages)
color_binary = np.dstack(( np.zeros_like(sxbinary), sxbinary, v_binary))
combined_binary = np.zeros_like(sxbinary)
# conbine color and sobel threshold
combined_binary[(v_binary == 1) | (sxbinary == 1)] = 1
plot_img(combined_binary, 'combined threshold', show_stages)
return combined_binary
In [ ]:
def process_image(image, show_stages=False):
"""
Wrapper function for all image processing
"""
# Undistort the image
undistorted = cam.undistort(image)
plot_img(undistorted, 'undistorted', show_stages)
# Apply perpective transform
img, Minv = warp(undistorted)
plot_img(img, 'warped', show_stages)
# Get points for region of interst
vertices = np.array([[(image.shape[1]*0.1,image.shape[0]-50),
(image.shape[1]*0.5-100,image.shape[0]*0.60),
(image.shape[1]*0.5+100,image.shape[0]*0.60),
(image.shape[1]*0.95,image.shape[0]-50)]],
dtype=np.int32)
# Apply threshold
img = threshold(img, vertices, show_stages)
vertices = np.array([[(200,img.shape[0]),
(200,0),
(1050,0),
(1050,img.shape[0])]], dtype=np.int32)
# Get roi
img = get_roi(img, vertices)
# Find Lanes
find_lanes(img, show_stages)
# Draw lanes on image
res = draw_lane(undistorted, img, Minv);
#plot_img(res, show_stages)
return res
In [ ]:
# init camera
cam = cam_util()
cam.gen_camera_points()
In [ ]:
# Undistort a sample calibration image
cal_dir = "camera_cal/"
cal_images = glob.glob(cal_dir+'*.jpg')
for cal_image in cal_images:
cimg = mpimg.imread(cal_image)
cimg_undistort = cam.undistort(cimg)
cv2.imwrite('output_images/undistort_'+cal_image.split('/')[1],cimg_undistort)
print('calibration done')
In [ ]:
# Clean camera matrix
cam.clean_mat()
In [ ]:
# Test on images
test_dir = "test_images/"
test_images = glob.glob(test_dir+'test*.jpg')
#test_images = glob.glob(test_dir+'straight_lines*.jpg')
#test_images = glob.glob(test_dir+'*.jpg')
for test_image in test_images:
left_line = Line()
right_line = Line()
image = mpimg.imread(test_image)
res = process_image(image, False)
#plot_img(res, True)
print('######################## Sample Stages ########################')
print()
# display stages for a sample image
left_line = Line()
right_line = Line()
image = mpimg.imread('test_images/test3.jpg')
plot_img(image, 'Initial', True)
res = process_image(image, True)
plot_img(res, 'Final', True)
In [ ]:
# Test on Videos
# Clean data for video
#"""
left_line = Line()
right_line = Line()
cam.clean_mat()
project_video_res = 'project_video_res.mp4'
clip1 = VideoFileClip("project_video.mp4")
project_video_clip = clip1.fl_image(process_image)
project_video_clip.write_videofile(project_video_res, audio=False)
#"""
# Clean data for video
#"""
left_line = Line()
right_line = Line()
cam.clean_mat()
challenge_video_res = 'challenge_video_res.mp4'
clip2 = VideoFileClip('challenge_video.mp4')
challenge_video_clip = clip2.fl_image(process_image)
challenge_video_clip.write_videofile(challenge_video_res, audio=False)
#"""
# Clean data for video
#"""
left_line = Line()
right_line = Line()
cam.clean_mat()
harder_challenge_video_res = 'harder_challenge_video_res.mp4'
clip2 = VideoFileClip('harder_challenge_video.mp4')
harder_challenge_video_clip = clip2.fl_image(process_image)
harder_challenge_video_clip.write_videofile(harder_challenge_video_res, audio=False)
#"""
In [ ]: