In [32]:
import datetime
import functools
import json
import time

import requests
def send_slack(channel, username, icon_emoji, message):
    """Post a message to the Slack incoming webhook and print the raw reply.

    Args:
        channel: Value sent as the ``channel`` field of the payload.
        username: Value sent as the ``username`` field of the payload.
        icon_emoji: Value sent as the ``icon_emoji`` field (e.g. ``":ghost:"``).
        message: Value sent as the ``text`` field of the payload.

    Raises:
        requests.RequestException: if the HTTP request fails or does not
            complete within the timeout.
    """
    # SECURITY(review): a webhook URL is a credential; it should live in
    # configuration or an environment variable rather than in source control.
    # change base_url your slack url
    base_url = "https://hooks.slack.com/services/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"
    payload = {
        "channel": channel,
        "username": username,
        "icon_emoji": icon_emoji,
        "text": message,
    }
    # requests has no default timeout; without one a stalled connection would
    # block the caller (and any job wrapped by a notifier) indefinitely.
    response = requests.post(base_url, data=json.dumps(payload), timeout=10)
    print(response.content)
def slack(function):
    """Decorator that announces the start and end of ``function`` on Slack.

    A "Start" message with the current timestamp is sent before the call and
    an "End" message with the elapsed whole seconds is sent after it returns.
    If ``function`` raises, the exception propagates and no "End" message is
    sent.

    Args:
        function: The callable to wrap.

    Returns:
        A wrapper with the same name/docstring as ``function`` that forwards
        all arguments and returns whatever ``function`` returns.
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        name = function.__name__
        start_time = time.time()
        current_time = str(datetime.datetime.now())
        send_slack(
            "dss", "databot", ":ghost:",
            "Start {name} - {time}".format(name=name, time=current_time),
        )
        result = function(*args, **kwargs)
        elapsed = int(time.time() - start_time)
        send_slack(
            "dss", "databot", ":ghost:",
            "End {name} - total time {time}s ".format(name=name, time=elapsed),
        )
        # Hand the wrapped function's result back to the caller; dropping it
        # would make every decorated function return None.
        return result
    return wrapper
In [33]:
import PIL
from PIL import Image
import pandas as pd
import numpy as np
import os
import pickle
import shutil
class Image_processing():
    """Turn the image folders into flattened grayscale pixel DataFrames.

    Layout used under ``self.path``:
        imgs/train/c0 ... c9/   training images, one folder per class
        imgs/test/              test images
        data/                   pickled DataFrames written by this class

    Attributes:
        path: Base directory, defaults to the current directory.
        img_size: ``[width, height]`` every image is resized to.
        img_cut: ``[left, right, top, bottom]`` crop applied after resizing;
            an empty value disables cropping.
    """

    # Number of training classes (folders c0..c9).
    _N_CLASSES = 10
    # The test folder is processed in fixed-size slices to bound memory use.
    _TEST_CHUNK_SIZE = 5000
    _TEST_CHUNKS = 16

    def __init__(self):
        self.path = os.curdir
        self.img_size = [640, 480]
        self.img_cut = [10, 110, 10, 80]

    @property
    def img_flatten(self):
        """Number of pixels per processed image (columns X0..Xn-1).

        Derived on every access so it always matches the current
        ``img_cut`` / ``img_size`` instead of the values seen in __init__.
        """
        if self.img_cut:
            left, right, top, bottom = self.img_cut
            return (bottom - top) * (right - left)
        width, height = self.img_size
        return width * height

    def __image_processing(self, path):
        """
        grayscale & resize & cut image, return it as a flat 1-D array
        """
        # The context manager releases the file handle promptly; this runs
        # once per image over very large folders.
        with Image.open(path) as img:
            # LANCZOS is the filter formerly exposed as ANTIALIAS, which was
            # removed in Pillow 10.
            resize_img = img.resize(tuple(self.img_size), Image.LANCZOS)
            gray_img = resize_img.convert("L", dither=1)
        pixels = np.array(gray_img)
        if self.img_cut:
            left, right, top, bottom = self.img_cut
            pixels = pixels[top:bottom, left:right]
        return pixels.ravel()

    def __frame_from_files(self, files):
        """Process every file and return a DataFrame with columns X0..Xn-1."""
        data = [self.__image_processing(file_path) for file_path in files]
        columns = ["X" + str(i) for i in range(self.img_flatten)]
        return pd.DataFrame(data, columns=columns)

    def __make_train_dataframe(self, class_number):
        """
        make train_image dataframe for one class folder, labelled in column Y
        """
        class_dir = os.path.join(
            self.path, "imgs", "train", "c" + str(class_number))
        files = [os.path.join(class_dir, name)
                 for name in os.listdir(class_dir)]
        df = self.__frame_from_files(files)
        df["Y"] = class_number
        return df

    def __make_test_dataframe(self, number):
        """
        make test_image dataframe for the ``number``-th slice of the test folder
        """
        test_dir = os.path.join(self.path, "imgs", "test")
        begin = self._TEST_CHUNK_SIZE * number
        # os.listdir returns names in arbitrary order; sorting keeps the
        # slices disjoint and reproducible from one call to the next.
        names = sorted(os.listdir(test_dir))[begin:begin + self._TEST_CHUNK_SIZE]
        return self.__frame_from_files(
            [os.path.join(test_dir, name) for name in names])

    def __check(self, path, size, cut):
        """
        apply overrides, then check folder list & image size

        Unpacks ``imgs.zip`` when the ``imgs`` folder is missing and creates
        the ``data`` folder when needed, both under ``self.path``.

        Returns:
            Number of pixel columns per image for the active settings.

        Raises:
            FileNotFoundError: if ``imgs`` is missing and ``imgs.zip`` cannot
                be unpacked.
        """
        # Apply the overrides first so every check below looks at the
        # directory and geometry that will actually be used.
        if path:
            self.path = path
        if size:
            self.img_size = list(size)
        if cut:
            self.img_cut = list(cut)

        imgs_dir = os.path.join(self.path, "imgs")
        if not os.path.isdir(imgs_dir):
            archive = os.path.join(self.path, "imgs.zip")
            print("Unpacking...")
            try:
                shutil.unpack_archive(archive, imgs_dir, "zip")
            except (OSError, shutil.ReadError) as err:
                print("You can download 'imgs.zip' in \n"
                      "https://www.kaggle.com/c/state-farm-distracted-driver-detection/data")
                raise FileNotFoundError(archive) from err
            print("Unpack Complete!")

        # Make 'data' directory
        os.makedirs(os.path.join(self.path, "data"), exist_ok=True)
        return self.img_flatten

    def __load_or_build(self, file_name, use_cache, build, arg):
        """Return the cached pickle ``data/file_name`` or build and cache it."""
        cache_path = os.path.join(self.path, "data", file_name)
        if use_cache and os.path.isfile(cache_path):
            return pd.read_pickle(cache_path)
        frame = build(arg)
        frame.to_pickle(cache_path)
        return frame

    @slack
    def make_train_data(self, save=True, path="", size=None, cut=None):
        """
        All data save as path/data/, default path is current directory
        If you want to change save directory, change self.path value

        Asks on stdin whether existing per-class pickles may be reused; only
        an answer of "y"/"Y" reuses them.

        save: also write the combined frame to data/train_df.pickle,
              default True (per-class pickles are always written)
        path: default path is current directory
        size: resize image as [width, height], default = [640, 480]
              NOTE(review): earlier documentation said [120, 90] - confirm
              which default is intended
        cut: cut img as [left, right, top, bottom], default = [10,110,10,80]

        Returns the combined DataFrame with columns X0..Xn-1 and Y.
        """
        self.__check(path, size, cut)
        answer = input("If pickle file in folder, do you want to use this file? (Y/n)")
        use_cache = answer.upper() == "Y"
        print("Start working...")
        frames = []
        for i in range(self._N_CLASSES):
            file_name = "train_class_" + str(i) + ".pickle"
            frames.append(self.__load_or_build(
                file_name, use_cache, self.__make_train_dataframe, i))
            print("{0} / {1} Complete".format(i+1, self._N_CLASSES))
        # A single concat avoids re-copying the growing frame on every pass.
        train_df = pd.concat(frames, ignore_index=True)
        if save:
            train_df.to_pickle(os.path.join(self.path, "data", "train_df.pickle"))
        print("All Complete!")
        return train_df

    def load_train_data(self):
        """
        load train dataframe from data/train_df.pickle

        Only load pickles this class produced; unpickling untrusted files
        can execute arbitrary code.
        """
        return pd.read_pickle(os.path.join(self.path, "data", "train_df.pickle"))

    @slack
    def make_test_data(self, save=True, path="", size=None, cut=None):
        """
        All data save as path/data/, default path is current directory
        If you want to change save directory, change self.path value

        Asks on stdin whether existing per-chunk pickles may be reused; only
        an answer of "y"/"Y" reuses them.

        save: also write the combined frame to data/test_df.pickle,
              default True (per-chunk pickles are always written)
        path: default path is current directory
        size: resize image as [width, height], default = [640, 480]
              NOTE(review): earlier documentation said [120, 90] - confirm
              which default is intended
        cut: cut img as [left, right, top, bottom], default = [10,110,10,80]

        Returns the combined DataFrame with columns X0..Xn-1 plus an
        all-NaN Y column.
        """
        self.__check(path, size, cut)
        answer = input("If pickle file in folder, do you want to use this file? (Y/n)")
        use_cache = answer.upper() == "Y"
        frames = []
        for i in range(self._TEST_CHUNKS):
            file_name = "test_" + str(i) + ".pickle"
            frames.append(self.__load_or_build(
                file_name, use_cache, self.__make_test_dataframe, i))
            print("{0} / {1} Complete".format(i+1, self._TEST_CHUNKS))
        test_df = pd.concat(frames, ignore_index=True)
        # NOTE(review): test images have no label, yet this frame has always
        # carried a trailing all-NaN "Y" column; it is kept so the returned
        # shape stays the same for existing consumers.
        test_df["Y"] = np.nan
        if save:
            test_df.to_pickle(os.path.join(self.path, "data", "test_df.pickle"))
        print("All Complete!")
        return test_df

    def load_test_data(self):
        """
        load test dataframe from data/test_df.pickle

        Only load pickles this class produced; unpickling untrusted files
        can execute arbitrary code.
        """
        return pd.read_pickle(os.path.join(self.path, "data", "test_df.pickle"))
In [30]:
# Create the processor with the defaults set in Image_processing.__init__
# (base path = current directory).
process = Image_processing()
In [26]:
# Build the training pickles under ./data; prompts on stdin about reusing
# existing pickles and, via @slack, posts start/end messages.
process.make_train_data()
Out[26]:
In [31]:
# Build the test pickles under ./data in 16 chunks; prompts on stdin about
# reusing existing pickles and, via @slack, posts start/end messages.
process.make_test_data()
In [ ]: