The program has an image processing pipeline that supports both RGB image and BGR video input.
RGB image processing consists of the following steps:
- reading the image file (the file is expected to exist on disk)
- RGB to BGR conversion
- common image processing pipeline
- processing result conversion (BGR to RGB)
- result image output
BGR video processing consists of the following steps:
- capturing the video file (the file is expected to exist on disk)
- video frame processing loop:
* common image processing pipeline
* result frame output
- releasing resources
The image processing pipeline performs the following steps:
- BGR to HSV conversion
- white and yellow color filtering, which produces a b/w image in which objects of any other color are black
- detection area filtering, which creates a new b/w image from the color filter result
- edge detection
- line detection with the Hough transform (cv2.HoughLines)
- lane detection with a custom algorithm that detects groups of lines and calculates the median line for each group found
- conversion of lane lines from polar to Cartesian coordinates and drawing of the lane lines
- composition of the result image from the initial frame and the image of detected lane lines
- return of the result image to the program for output or further operations
Color detection on the HSV image efficiently removes noise caused by shadows and road surface color artifacts. Processing b/w images in the later steps may also save processor time.
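To illustrate why HSV helps with shadows, here is a small sketch that uses only the standard library. The helper `to_opencv_hsv` and the two sample colors are my own illustration, not part of the pipeline. It relies on the fact that OpenCV stores 8-bit hue as degrees divided by two, and it reuses the yellow bounds from DetectionAreaFilter.

```python
import colorsys

def to_opencv_hsv(r, g, b):
    """Convert 8-bit RGB to OpenCV-style 8-bit HSV (H in 0..179, S and V in 0..255)."""
    h, s, v = colorsys.rgb_to_hsv(r / 255.0, g / 255.0, b / 255.0)
    return round(h * 180) % 180, round(s * 255), round(v * 255)

def in_range(hsv, lower, upper):
    """Per-channel bounds check, the same idea as cv2.inRange for a single pixel."""
    return all(lo <= c <= hi for c, lo, hi in zip(hsv, lower, upper))

# Yellow bounds copied from DetectionAreaFilter in this notebook.
LOWER_YELLOW = (20, 0, 170)
UPPER_YELLOW = (55, 255, 255)

# Hypothetical sample colors: the same yellow paint in sunlight and in light shadow.
sunlit_yellow = to_opencv_hsv(255, 255, 0)
shaded_yellow = to_opencv_hsv(204, 204, 0)

print(sunlit_yellow, in_range(sunlit_yellow, LOWER_YELLOW, UPPER_YELLOW))
print(shaded_yellow, in_range(shaded_yellow, LOWER_YELLOW, UPPER_YELLOW))
```

Both samples share the same hue and differ only in the value channel, so a single hue range catches them as long as the shadow is not darker than the lower value bound.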
As the next improvement, I would extract the configuration parameters into a separate entity and use a single instance of it for on-the-fly configuration and adjustment. It could become an interface for another system :)
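One possible shape for such a configuration entity is sketched here. The names `PipelineConfig` and `EdgeStage` are hypothetical and do not exist in this notebook; the default values are the ones currently hard-coded in the cells that follow.

```python
import math
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    """Hypothetical container for the values hard-coded across the notebook."""
    lower_yellow: tuple = (20, 0, 170)
    upper_yellow: tuple = (55, 255, 255)
    lower_white: tuple = (0, 0, 220)
    upper_white: tuple = (255, 25, 255)
    canny_low_threshold: int = 50
    canny_high_threshold: int = 150
    hough_threshold: int = 40
    max_distance_delta: float = 40
    max_angle_delta: float = math.radians(4)
    min_lines_in_group: int = 3

class EdgeStage:
    """Hypothetical stage that reads the shared config on every call."""
    def __init__(self, config):
        self._config = config

    def thresholds(self):
        return self._config.canny_low_threshold, self._config.canny_high_threshold

config = PipelineConfig()
stage = EdgeStage(config)
before = stage.thresholds()
config.canny_high_threshold = 200  # adjusted while the pipeline is running
after = stage.thresholds()
print(before, after)
```

Because every stage holds a reference to the same instance, a change made from outside is picked up on the next frame without rebuilding the pipeline.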
Imports and a function that composes the full file path
In [1]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import cv2
import pandas
import os
def getPathFor(file_path):
    # os.getcwd() replaces the IPython-only %pwd magic, so this also runs outside a notebook
    current_directory = os.getcwd()
    path = os.path.join(current_directory, file_path)
    print("About to open file: {}\n".format(path))
    return path
The next class is responsible for filtering out the lane detection area:
In [2]:
class DetectionAreaFilter:
    def __init__(self):
        # HSV bounds for yellow and white lane markings
        self._lower_yellow = np.array([20, 0, 170], dtype=np.uint8)
        self._upper_yellow = np.array([55, 255, 255], dtype=np.uint8)
        self._lower_white = np.array([0, 0, 220], dtype=np.uint8)
        self._upper_white = np.array([255, 25, 255], dtype=np.uint8)
        self._ignore_mask_color = 255

    def getColorMask(self, hsv_image):
        # white pixels in the result mark yellow or white areas of the input
        mask_yellow = cv2.inRange(hsv_image, self._lower_yellow, self._upper_yellow)
        mask_white = cv2.inRange(hsv_image, self._lower_white, self._upper_white)
        mask = cv2.add(mask_white, mask_yellow)
        return mask

    def applyDetectionArea(self, bw_image, width_adjustment=60, height_adjustment=65):
        im_height = bw_image.shape[0]
        im_half_height = im_height // 2
        im_width = bw_image.shape[1]
        im_half_width = im_width // 2

        # trapezoid that covers the road area in front of the car
        area_left_bottom = (0, im_height)
        area_left_top = (im_half_width - width_adjustment, im_half_height + height_adjustment)
        area_right_top = (im_half_width + width_adjustment, im_half_height + height_adjustment)
        area_right_bottom = (im_width, im_height)
        detection_area = [area_left_bottom, area_left_top, area_right_top, area_right_bottom]
        vertices = np.array([detection_area], dtype=np.int32)

        # keep only the pixels that fall inside the trapezoid
        mask = np.zeros_like(bw_image)
        cv2.fillPoly(mask, vertices, self._ignore_mask_color)
        masked_image = cv2.bitwise_and(bw_image, mask)
        return masked_image
Here is the result of the getColorMask function, which turns all white and yellow objects white and makes objects of any other color black:
And the result of the applyDetectionArea function, which creates a new b/w image by applying a trapezoid-shaped mask to the given b/w image:
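To make the trapezoid concrete, the vertex arithmetic from applyDetectionArea can be reproduced without OpenCV. The function name `detection_area_vertices` and the 960x540 frame size are assumptions used only for this illustration.

```python
def detection_area_vertices(im_width, im_height, width_adjustment=60, height_adjustment=65):
    """Return the trapezoid corners in the same order applyDetectionArea builds them."""
    half_width = im_width // 2
    half_height = im_height // 2
    return [
        (0, im_height),                                               # left bottom
        (half_width - width_adjustment, half_height + height_adjustment),  # left top
        (half_width + width_adjustment, half_height + height_adjustment),  # right top
        (im_width, im_height),                                        # right bottom
    ]

# Assumed frame size of 960x540 pixels.
vertices = detection_area_vertices(960, 540)
print(vertices)
```

The top edge is 2 * width_adjustment pixels wide and sits height_adjustment pixels below the middle of the frame, so increasing either default narrows the area toward the bottom of the image.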
Next comes Canny edge detection:
In [3]:
def getEdges(image, low_threshold=50, high_threshold=150):
    edges = cv2.Canny(image, low_threshold, high_threshold)
    return edges
The result of Canny edge detection:
Line detection with the Hough transform
In [4]:
def getLaneLines(edges):
    deg = np.pi/180
    # rho resolution of 1 pixel, theta resolution of 1 degree, accumulator threshold of 40 votes
    lines = cv2.HoughLines(edges, 1, 1*deg, 40)
    if lines is None:
        return np.array([])
    points_array = list()
    for line in lines:
        for rho, theta in line:
            points_array.append((rho, theta))
    return np.array(points_array)
The result of getLaneLines drawn as lines:
Next comes line group detection, which accepts the getLaneLines result as its input parameter.
The CoordinateSorter class detects groups of lines and calculates the median line for each group found. The influence of its input parameters on the behavior of the 'sort' function can be described with the Gherkin language:
CoordinateSorter(max_distance_delta, max_angle_delta, threshold)
Scenario: the 'max_distance_delta' and 'max_angle_delta' parameters control line group detection
Given: 5 lines have been given to sort
And: it is possible to create a chain 'chain_1' of lines line1, line2, line3
Where: the distance between neighboring links is less than or equal to (max_distance_delta, max_angle_delta)
And: it is possible to create a chain 'chain_2' of lines line4, line5
Where: the distance between neighboring links is less than or equal to (max_distance_delta, max_angle_delta)
And: the distance between the edges of chain_1 and chain_2 is more than (max_distance_delta, max_angle_delta)
Then: chain_1 and chain_2 are considered two separate lines
Scenario: the 'threshold' parameter filters out noise lines
Given: threshold = 4, set of lines
When: the sorter found 3 groups of lines
And: the first group contains 10 lines, the second 5 lines
But: the third group contains 3 lines
Then: the third group is considered noise and is not present in the sorting result
The resulting line is calculated as the median of all lines in a group
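The two scenarios above can be tried out with a simplified, standard-library sketch. It is not the CoordinateSorter implementation: the function `group_lines` is hypothetical and always merges chains transitively, and the five sample lines are made up. It only shows the intended effect of the three parameters.

```python
from statistics import median

def group_lines(lines, max_distance_delta, max_angle_delta, threshold):
    """Group (rho, theta) pairs into chains and return the median line of each chain."""
    def close(a, b):
        return (abs(a[0] - b[0]) <= max_distance_delta
                and abs(a[1] - b[1]) <= max_angle_delta)

    unvisited = set(range(len(lines)))
    results = []
    while unvisited:
        start = min(unvisited)
        unvisited.remove(start)
        chain = [start]
        queue = [start]
        # grow the chain through every line that is close to a line already in it
        while queue:
            current = queue.pop()
            neighbours = [i for i in unvisited if close(lines[current], lines[i])]
            for i in neighbours:
                unvisited.remove(i)
                chain.append(i)
                queue.append(i)
        # chains shorter than the threshold are treated as noise
        if len(chain) >= threshold:
            results.append((median(lines[i][0] for i in chain),
                            median(lines[i][1] for i in chain)))
    return results

# Made-up input: line1..line3 form chain_1, line4..line5 form chain_2.
lines = [(100, 0.50), (120, 0.52), (140, 0.54), (300, 1.00), (320, 1.02)]

both_chains = group_lines(lines, max_distance_delta=25, max_angle_delta=0.05, threshold=2)
only_chain_1 = group_lines(lines, max_distance_delta=25, max_angle_delta=0.05, threshold=3)
print(both_chains)
print(only_chain_1)
```

Note that line1 and line3 are 40 units apart, which is more than max_distance_delta, yet they end up in the same chain because line2 links them. With threshold=3 the two-line chain is dropped as noise.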
In [5]:
class CoordinateSorter:
    def __init__(self, max_distance_delta, max_angle_delta, threshold):
        if max_distance_delta < 0:
            raise ValueError("[max_distance_delta] must be a non-negative number")
        if max_angle_delta < 0:
            raise ValueError("[max_angle_delta] must be a non-negative number")
        if not isinstance(threshold, int) or threshold < 1:
            raise ValueError("[threshold] expected to be an integer greater than or equal to 1")
        self._max_point_distance = (max_distance_delta, max_angle_delta)
        self._min_points_amount = threshold

    def _sortPointsByDistance(self, points_dict):
        # for every point, collect the indexes of all points that are close enough to it
        set_list = list()
        for key, value in points_dict.items():
            indexes_set = set()
            set_list.append(indexes_set)
            indexes_set.add(key)
            for inner_key, inner_value in points_dict.items():
                point_distance = abs(np.subtract(value, inner_value))
                if point_distance[0] <= self._max_point_distance[0] \
                        and point_distance[1] <= self._max_point_distance[1]:
                    indexes_set.add(inner_key)
        return set_list

    def _splitOnGroups(self, set_list_source):
        sorted_source = list(set_list_source)
        sorted_source.sort(key=len, reverse=True)
        extremums = list()

        def find_extremums(ordered_list_of_set_items):
            if len(ordered_list_of_set_items) == 0:
                return
            first_extremum = ordered_list_of_set_items[0]
            items_for_further_sorting = list()
            for dot_set in ordered_list_of_set_items:
                if dot_set.issubset(first_extremum):
                    continue
                else:
                    # merge sets that share at least one point with the current group
                    if len(first_extremum.intersection(dot_set)):
                        first_extremum = first_extremum.union(dot_set)
                    else:
                        items_for_further_sorting.append(dot_set)
            extremums.append(first_extremum)
            find_extremums(items_for_further_sorting)

        find_extremums(sorted_source)
        # groups smaller than the threshold are considered noise
        filtered_extremums = [group for group in extremums if len(group) >= self._min_points_amount]
        return filtered_extremums

    @staticmethod
    def _getMedian(source_dict, key_set):
        point_array = [source_dict[item] for item in key_set]
        data_frame = pandas.DataFrame(data=point_array, columns=["distance", "angle"])
        return data_frame["distance"].median(), data_frame["angle"].median()

    def sort(self, points_array):
        if len(points_array) < self._min_points_amount:
            return []
        points_dictionary = dict()
        for index, coordinates in enumerate(points_array):
            points_dictionary[index] = (int(coordinates[0]), coordinates[1])
        point_set_list = self._sortPointsByDistance(points_dictionary)
        point_groups = self._splitOnGroups(point_set_list)
        resulting_points = [self._getMedian(points_dictionary, point_group) for point_group in point_groups]
        return resulting_points
Result of the line sorting function in the Cartesian coordinate system:
Drawing lines of the needed length:
In [6]:
def convert(rho, theta, y_min, y_max):
    def create_point(y):
        # solve rho = x*cos(theta) + y*sin(theta) for x
        x = (rho - y*np.sin(theta))/np.cos(theta)
        return int(x), int(y)
    d1 = create_point(y_max)
    d2 = create_point(y_min)
    return d1, d2

def drawLines(polar_coordinates_array, image, color, line_weight=10):
    # lines are drawn from the bottom of the image up to two thirds of its height
    y_max = image.shape[0]
    y_min = int(y_max * 2 / 3)
    lines = [convert(rho, theta, y_min, y_max) for rho, theta in polar_coordinates_array]
    for d1, d2 in lines:
        cv2.line(image, d1, d2, color, line_weight)
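The formula in create_point follows from the line equation in polar form, rho = x*cos(theta) + y*sin(theta). The sketch below checks it on two lines whose answers are easy to verify by hand. The function `x_at_y` and the guard for nearly horizontal lines are my additions for illustration; the notebook's convert has no such guard.

```python
import math

def x_at_y(rho, theta, y):
    """Return x of the point at height y on the line (rho, theta), or None if the line is horizontal."""
    cos_theta = math.cos(theta)
    # Assumed guard: for theta close to pi/2 the line never crosses most y values,
    # and dividing by a value close to zero gives meaningless coordinates.
    if abs(cos_theta) < 1e-6:
        return None
    return (rho - y * math.sin(theta)) / cos_theta

# Vertical line x = 100: theta is 0, so x does not depend on y.
vertical_x = x_at_y(100, 0.0, 250)

# Line through the point (60, 40) whose normal is at 45 degrees.
theta = math.pi / 4
rho = 60 * math.cos(theta) + 40 * math.sin(theta)
diagonal_x = x_at_y(rho, theta, 40)

horizontal_x = x_at_y(50, math.pi / 2, 10)
print(vertical_x, diagonal_x, horizontal_x)
```

Lane lines seen from a forward-facing camera are rarely horizontal, which is probably why the missing guard does not show up in the test images, but that is an assumption about the input rather than a guarantee.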
The result:
The pipeline itself:
In [7]:
class ImageProcessor:
    def __init__(self, detection_area_filter, coordinate_sorter):
        self._bgr_line_color = (0, 0, 255)
        self._detection_area_filter = detection_area_filter
        self._coordinate_sorter = coordinate_sorter

    def processFrame(self, bgr_frame):
        frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2HSV)
        bw_color_mask = self._detection_area_filter.getColorMask(frame)
        bw_area = self._detection_area_filter.applyDetectionArea(bw_color_mask)
        bw_edges = getEdges(bw_area)
        polar_lane_coordinates = getLaneLines(bw_edges)
        average_polar_lane_coordinates = self._coordinate_sorter.sort(polar_lane_coordinates)
        # draw lane lines on a black image and blend it with the original frame
        lines_image = np.zeros(bgr_frame.shape, dtype=np.uint8)
        drawLines(average_polar_lane_coordinates, lines_image, self._bgr_line_color)
        result_image = cv2.addWeighted(lines_image, 0.9, bgr_frame, 1, 0)
        return result_image

    def _convert_bw_2_color(self, bw_image):
        return np.dstack((bw_image, bw_image, bw_image))
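The last step of processFrame blends the lines image with the original frame. cv2.addWeighted computes src1*alpha + src2*beta + gamma per channel and saturates the result to the 0..255 range. The per-pixel sketch below mimics that arithmetic in plain Python; the helper names and the sample pixel values are made up for illustration, and rounding of exact .5 values may differ from OpenCV.

```python
def saturate(value):
    """Clamp a blended channel value to the 8-bit range."""
    return max(0, min(255, int(round(value))))

def add_weighted_pixel(pixel1, alpha, pixel2, beta, gamma=0):
    """Blend two pixels channel by channel: pixel1*alpha + pixel2*beta + gamma."""
    return tuple(saturate(c1 * alpha + c2 * beta + gamma) for c1, c2 in zip(pixel1, pixel2))

frame_pixel = (100, 100, 100)   # made-up gray road pixel (BGR)
line_pixel = (0, 0, 255)        # red lane line in BGR, as in ImageProcessor
no_line_pixel = (0, 0, 0)       # black background of the lines image

on_line = add_weighted_pixel(line_pixel, 0.9, frame_pixel, 1)
off_line = add_weighted_pixel(no_line_pixel, 0.9, frame_pixel, 1)
print(on_line, off_line)
```

Where the lines image is black, the frame passes through unchanged; where a line was drawn, the red channel saturates and the road stays visible in the other two channels.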
And the result of processFrame:
RGB image processing entry point
In [8]:
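def showImage(file_path):
    def convert(image):
        # reverse the channel order; the same call converts RGB to BGR and BGR to RGB
        return image[..., [2, 1, 0]]
    image_path = getPathFor(file_path)
    rgb_image = mpimg.imread(image_path)
    bgr_frame = convert(rgb_image)
    # img_processor is the global instance created in the last cell
    frame = img_processor.processFrame(bgr_frame)
    rgb_frame = convert(frame)
    plt.imshow(rgb_frame)
    plt.show()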
Video processing entry point
In [9]:
def playVideo(file_path):
    video_path = getPathFor(file_path)
    video = cv2.VideoCapture(video_path)
    print("About to start video playback...")
    while video.isOpened():
        # read() returns a success flag and the frame; the flag is False at the end of the stream
        has_frame, bgr_frame = video.read()
        if not has_frame:
            break
        frame = img_processor.processFrame(bgr_frame)
        cv2.imshow("output", frame)
        key = cv2.waitKey(1) & 0xFF
        # stop video on ESC key pressed
        if key == 27:
            break
    video.release()
    cv2.destroyAllWindows()
    print("Video has been closed successfully.")
Constants with image/video paths for testing, and pipeline initialization
In [14]:
image1 = "input/test_images/solidWhiteCurve.jpg"
image2 = "input/test_images/solidWhiteRight.jpg"
image3 = "input/test_images/solidYellowCurve.jpg"
image4 = "input/test_images/solidYellowCurve2.jpg"
image5 = "input/test_images/solidYellowLeft.jpg"
image6 = "input/test_images/whiteCarLaneSwitch.jpg"
video1 = "input/test_videos/challenge.mp4"
video2 = "input/test_videos/solidYellowLeft.mp4"
video3 = "input/test_videos/solidWhiteRight.mp4"
detection_area_filter = DetectionAreaFilter()
max_distance_delta = 40 # max distance between lines (rho1 - rho2) in polar coordinate system
max_angle_delta = np.radians(4) # max angle between lines (theta1 - theta2) in polar coordinate system
threshold = 3 # min amount of lines in set filter
coordinate_sorter = CoordinateSorter(max_distance_delta, max_angle_delta, threshold)
img_processor = ImageProcessor(detection_area_filter, coordinate_sorter)
showImage(image4)
#playVideo(video1)