In [70]:
import cv2
import numpy as np
def _hist(img, hmax, x, y, h, w, type):
    """Rescale ``img`` so the top histogram edge of a rectangular ROI maps to ``hmax``.

    The reference level is the largest bin edge of an automatic histogram
    (``np.histogram(..., bins='auto')``) computed over ``img[y:y + h, x:x + w]``.
    Values at or below the reference are multiplied by ``hmax / reference``;
    values above it are replaced by ``hmax``.

    :param img: ndarray to correct
    :param hmax: value the reference level is mapped to
    :param x: first column of the ROI
    :param y: first row of the ROI
    :param h: number of ROI rows
    :param w: number of ROI columns
    :param type: dtype of the returned array
    :return: corrected array converted to ``type``
    """
    roi_pixels = img[y:y + h, x:x + w]
    # Only the bin edges are needed; the counts are discarded.
    _, bin_edges = np.histogram(roi_pixels, bins='auto')
    reference = bin_edges.max()
    gain = hmax / float(reference)
    rescaled = np.where(img <= reference, gain * img, hmax)
    return np.asarray(rescaled, dtype=type)
def _max(img, hmax, mask, x, y, h, w, type):
    """Rescale ``img`` so the maximum of the masked ROI maps to ``hmax``.

    A filled white rectangle from ``(x, y)`` to ``(x + w, y + h)`` is drawn on
    ``mask`` (the caller's array is modified in place), its first plane is
    thresholded into a binary mask, and that mask is handed to ``apply_mask``
    together with a copy of ``img``.  The maximum of the returned image is the
    reference level: values at or below it are multiplied by
    ``hmax / reference`` and values above it are replaced by ``hmax``.

    NOTE(review): ``apply_mask`` is defined outside this block; presumably it
    blacks out everything outside the mask -- confirm against its definition.

    :param img: ndarray to correct
    :param hmax: value the reference level is mapped to
    :param mask: 3-plane uint8 array the ROI rectangle is drawn on
    :param x: ROI left column
    :param y: ROI top row
    :param h: ROI height
    :param w: ROI width
    :param type: dtype of the returned array
    :return: corrected array converted to ``type``
    """
    working_copy = np.copy(img)
    # Thickness -1 asks OpenCV for a filled rectangle.
    cv2.rectangle(mask, (x, y), (x + w, y + h), (255, 255, 255), -1)
    first_plane = mask[:, :, 0]
    _, roi_mask = cv2.threshold(first_plane, 254, 255, cv2.THRESH_BINARY)
    _, roi_only = apply_mask(working_copy, roi_mask, 'black', 0, debug=None)
    reference = np.amax(roi_only)
    gain = hmax / float(reference)
    rescaled = np.where(img <= reference, gain * img, hmax)
    return np.asarray(rescaled, dtype=type)
def white_balance(device, img, mode='hist', debug=None, roi=None):
    """Corrects the exposure of an image based on a white reference region.

    Inputs:
    device = pipeline step counter
    img    = An RGB image on which to perform the correction, correction is done on each channel and then
             reassembled, alternatively a single channel can be input but is not recommended.
             Three-dimensional input is always processed as uint8 (hmax 255); two-dimensional input
             must be uint8 or uint16.
    mode   = 'hist' or 'max'. Any value other than 'hist' selects the 'max' method.
    debug  = None, print, or plot. Accepted for interface compatibility; this function does not read it.
    roi    = A list of 4 integers (x, y, width, height) that form the rectangular ROI of the white color
             standard. If roi is None, the whole image is used.

    Returns:
    device = pipeline step counter (incremented by one)
    img    = Image after exposure correction

    :param device: int
    :param img: ndarray
    :param mode: str
    :param debug: str
    :param roi: list
    :return device: int
    :return finalcorrected: ndarray
    """
    device += 1

    if roi is not None:
        # `or` / `not` are required here: `len(roi) != 4 | flag is False` is a
        # chained comparison against `4 | flag` and can never be true.
        all_integers = all(isinstance(item, (int, np.integer)) for item in roi)
        if len(roi) != 4 or not all_integers:
            fatal_error('If ROI is used ROI must have 4 elements as a list and all must be integers')

    is_multichannel = len(np.shape(img)) == 3
    if is_multichannel:
        iy, ix, _ = np.shape(img)
        hmax = 255
        out_dtype = np.uint8
    else:
        iy, ix = np.shape(img)
        if img.dtype == 'uint8':
            hmax = 255
            out_dtype = np.uint8
        elif img.dtype == 'uint16':
            # 65535 is the largest uint16 value; 65536 overflows on the final cast.
            hmax = 65535
            out_dtype = np.uint16
        else:
            fatal_error('Single-channel images must have dtype uint8 or uint16')

    # Scratch mask that _max() draws the ROI rectangle on.
    mask = np.zeros((iy, ix, 3), dtype=np.uint8)

    if roi is None:
        x, y, w, h = 0, 0, ix, iy
    else:
        x, y, w, h = roi

    def _correct(channel):
        # One dispatch point so colour and grayscale input treat `mode` identically.
        if mode == 'hist':
            return _hist(channel, hmax, x, y, h, w, out_dtype)
        return _max(channel, hmax, mask, x, y, h, w, out_dtype)

    if is_multichannel:
        # Only the first three planes are corrected and restacked.
        finalcorrected = np.dstack([_correct(img[:, :, index]) for index in range(3)])
    else:
        finalcorrected = _correct(img)

    return device, finalcorrected
In [113]:
def chalk_print(text):
    '''Print ``text``, then ask IPython to clear the cell output once new output arrives.

    Because ``clear_output`` is called with ``wait=True``, the text stays
    visible until the next thing is printed, which then replaces it.
    '''
    print(text)
    # Imported lazily so the dependency is only needed when the function runs.
    from IPython.display import clear_output
    clear_output(wait=True)


# Demo: the second call's output takes the place of the first.
chalk_print(3)
chalk_print(4)
In [ ]:
# Minkowski style metrics
# euclidean
# manhattan
# chebyshev
# minkowski
# Miscellaneous spatial metrics
# canberra
# braycurtis
# haversine
# Normalized spatial metrics
# mahalanobis
# wminkowski
# seuclidean
# Angular and correlation metrics
# cosine
# correlation
# Metrics for binary data
# hamming
# jaccard
# dice
# russellrao
# kulsinski
# rogerstanimoto
# sokalmichener
# sokalsneath
# yule
#pip install umap-learn
# Configure the UMAP reducer. n_neighbors, min_dist, n_components, metric and
# data are expected to be defined in an earlier cell.
fit = umap.UMAP(
n_neighbors=n_neighbors, # number of neighbours to consider (like t-SNE)
min_dist=min_dist, # minimum distance between points in the embedding (small for local detail, large for global)
n_components=n_components,# number of embedding components
metric=metric # distance metric to use (see the list above)
)
# Fit on `data` and keep the transformed result in `u`.
u = fit.fit_transform(data)
In [ ]:
from functools import reduce
# Double every element of old_list. In Python 3, map() returns a lazy iterator,
# so wrap it in list() if an actual list is needed.
doubled_list = map(lambda x: x * 2, old_list)
# Fold `numbers` into a single value by adding the elements left to right.
summation = reduce(lambda x, y: x + y, numbers)
In [ ]:
# Squares of the even numbers below 10, built eagerly as a list.
list_comp = [x ** 2 for x in range(10) if x % 2 == 0]
# The same values as a generator expression; it can be iterated only once.
gen_exp = (x ** 2 for x in range(10) if x % 2 == 0) # Uses less memory, computed on the fly
Github Markdown
**bold text**
*Italic text*
~~strikethrough~~
> quote
> long quote here...
```bash
sudo apt update
sudo apt upgrade
```
- List
1. Numbered List
   - Nested Lists
- [ ] To-do List
In [101]:
# Import locally so the cell runs on its own, whatever ran before it.
import sys

# True when the running interpreter is Python 3.5 or newer.
sys.version_info >= (3, 5)
Out[101]:
In [109]:
#pip install line-profiler
%load_ext line_profiler
def sub_function(a, b):
    """Small arithmetic helper used as the target of the line profiler.

    Prints a marker, then returns ``(a + 1) * b ** (a + 1)``.

    :param a: number that is incremented by one
    :param b: base that is raised to the incremented ``a``
    :return: product of the incremented ``a`` and the power
    """
    print('in sub-function')
    incremented = a + 1
    power = b ** incremented
    return incremented * power
def full_function(listy):
    """Outer function for the profiling demo.

    Prints a marker and forwards the elements of ``listy`` as positional
    arguments to ``sub_function``.

    :param listy: iterable unpacked into ``sub_function``'s arguments
    :return: whatever ``sub_function`` returns
    """
    print('in some large function')
    outcome = sub_function(*listy)
    return outcome
# %lprun -f sub_function full_function(**params)
%lprun -f sub_function full_function([1,2])
In [111]:
# Static Typing
from typing import Dict, List, Union


# def function_name(parameter1: type) -> return_type:
def f(a: int, b: int) -> Dict[int, int]:
    """Return a one-entry dictionary mapping ``a`` to ``b``."""
    return {a: b}


# Type aliases
T = Union[int, float, complex]


# Allow multiple types
def g(a: T) -> float:
    """Convert ``a`` to ``float``.

    NOTE(review): ``T`` admits ``complex``, but ``float()`` raises
    ``TypeError`` for complex input -- narrow the alias if that matters.
    """
    return float(a)
In [1]:
%%time
# Map in parallel
from multiprocessing.pool import ThreadPool

n_cores = 4
strings_list = ['a', 'be', 'cee']
pool = ThreadPool(n_cores)
try:
    # map() blocks until every item has been processed and keeps input order.
    results = pool.map(len, strings_list)
finally:
    # close the pool and wait for the work to finish, even if map() raised
    pool.close()
    pool.join()
print(results)
In [2]:
%%time
import concurrent.futures

# Guard the pool: with the "spawn" start method each worker re-imports this
# module, and an unguarded pool at module level would start itself again.
if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # strings_list comes from the ThreadPool cell above.
        print(list(executor.map(len, strings_list)))
In [7]:
import multiprocessing
from itertools import product
def merge_names(a, b):
    """Join two values into one string separated by `` & ``.

    :param a: first value (formatted with ``str.format``)
    :param b: second value
    :return: the combined string
    """
    template = '{} & {}'
    return template.format(a, b)
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']

# Worker processes may re-import this module (the "spawn" start method does),
# so pool creation must only happen when the module is the entry point.
if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # starmap unpacks each (a, b) pair from the Cartesian product into merge_names(a, b).
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)
In [ ]:
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
def parallel_process(function, array, n_jobs=16, use_kwargs=False, front_num=0):
    """
    A parallel version of the map function with a progress bar.

    Args:
        function (function): A python function to apply to the elements of array
        array (array-like): A sliceable sequence to iterate over.
        n_jobs (int, default=16): The number of worker processes to use
        use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of
            keyword arguments to function
        front_num (int, default=0): The number of iterations to run serially before kicking off the
            parallel job. Useful for catching bugs
    Returns:
        [function(array[0]), function(array[1]), ...]
        In the parallel path, a call that raised contributes the exception object in place of its
        result. The serial calls (the first ``front_num`` items, and everything when ``n_jobs == 1``)
        let exceptions propagate.
    """
    def call(item):
        # Single place that decides between keyword and positional invocation.
        return function(**item) if use_kwargs else function(item)

    # We run the first few iterations serially to catch bugs
    front = [call(item) for item in array[:front_num]] if front_num > 0 else []
    remaining = array[front_num:]

    # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
    if n_jobs == 1:
        return front + [call(item) for item in tqdm(remaining)]

    # Assemble the workers
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        # Pass the elements of array into function
        if use_kwargs:
            futures = [pool.submit(function, **item) for item in remaining]
        else:
            futures = [pool.submit(function, item) for item in remaining]
        progress_options = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True,
        }
        # Print out the progress as tasks complete
        for _ in tqdm(as_completed(futures), **progress_options):
            pass

    out = []
    # Get the results from the futures, in submission order.
    for future in tqdm(futures):
        try:
            out.append(future.result())
        except Exception as e:
            # Deliberate best effort: record the failure and keep the other results.
            out.append(e)
    return front + out
In [ ]:
# Filemagic
# pip install filemagic
import magic
with magic.Magic() as m:
    # A bare expression inside a `with` block is not echoed by the notebook,
    # so print the identification result explicitly.
    print(m.id_filename('setup.py'))
In [ ]:
# Coloured Strings
# pip install crayons
# red, green, yellow, blue, black, magenta, cyan, and white
import crayons
# Wrap the text with a colour helper before printing it.
print(crayons.red('WARNING!'))
# A coloured string can be interpolated into an f-string like any other value.
print(f'Username: {crayons.green("Goliath")}')
In [131]:
from subprocess import call
# Run `mkdir test_dir` as a child process; call() waits for it and returns its exit code.
call(["mkdir", 'test_dir'])
Out[131]:
In [130]:
Out[130]:
In [133]:
from itertools import permutations, combinations, combinations_with_replacement
from itertools import product
# Ordered pairs, unordered pairs, and unordered pairs with repetition of the
# letters W, X, Y, Z -- each group is printed and followed by a blank line.
pair_generators = (
    permutations('WXYZ', 2),
    combinations('WXYZ', 2),
    combinations_with_replacement('WXYZ', 2),
)
for pairs in pair_generators:
    for item in pairs:
        print(item)
    print()

# Cartesian product: one element taken from each tuple in `arrays`.
arrays = [(-1,1), (-3,3), (-5,5)]
for item in product(*arrays):
    print(item)
In [ ]:
# Log in to email account.
import smtplib

# NOTE(review): the address and password are hard-coded placeholders; load real
# credentials from the environment or a secrets store, never from source.
# The `with` block sends QUIT and closes the socket even if a step fails.
with smtplib.SMTP('smtp.gmail.com', 587) as smtpObj:
    smtpObj.ehlo()
    smtpObj.starttls()
    # No trailing space in the login name.
    smtpObj.login('my_email_address@gmail.com', 'password')
    # NOTE(review): the %s placeholder is never filled in before sending.
    body = 'Subject: %s dues unpaid.\n <Body>'
    target_email = 'god@wishfulthinking.com'
    # sendmail() returns a dict of refused recipients (empty when all were accepted).
    sendmailStatus = smtpObj.sendmail('my_email_address@gmail.com',
                                      target_email,
                                      body)
In [ ]:
#Simple OCR
# install tesseract
# install pyocr
from PIL import Image
import sys
import pyocr
import pyocr.builders
import glob
import yaml
from numba import jit
import concurrent.futures
# Discover the OCR back-ends pyocr can drive; stop if there are none.
tools = pyocr.get_available_tools()
if not tools:
    print("No OCR tool found")
    sys.exit(1)

# The tools are returned in the recommended order of usage
tool = tools[0]
print("Will use tool '%s'" % (tool.get_name()))
# Ex: Will use tool 'libtesseract'

# Use the first language the chosen tool reports.
langs = tool.get_available_languages()
print("Available languages: %s" % ", ".join(langs))
lang = langs[0]
print("Will use lang '%s'" % (lang))
def get_txt(fpath):
    """Run OCR on the image file at ``fpath`` using the module-level ``tool`` and ``lang``.

    The ``@jit`` decorator was removed: the body only calls PIL and pyocr
    objects, which numba cannot compile.  The image is opened in a ``with``
    block so its file handle is released once OCR has finished.

    :param fpath: path of the image file to read
    :return: the value returned by ``tool.image_to_string`` with a ``TextBuilder``
    """
    with Image.open(fpath) as image:
        return tool.image_to_string(
            image,
            lang=lang,
            builder=pyocr.builders.TextBuilder()
        )
# Only start worker processes when this module is the entry point; workers may
# re-import the module and would otherwise try to start the pool again.
if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # OCR every matching PNG in parallel; results keep the order the paths are yielded in.
        results = list(executor.map(get_txt, glob.iglob('/home/giladamar/Desktop/Conf leads/*.PNG')))
In [ ]:
#White balance:
import cv2
def _hist(img, hmax, x, y, h, w, type):
    """Rescale ``img`` so the top histogram edge of a rectangular ROI maps to ``hmax``.

    The reference level is the largest bin edge of an automatic histogram
    (``np.histogram(..., bins='auto')``) computed over ``img[y:y + h, x:x + w]``.
    Values at or below the reference are multiplied by ``hmax / reference``;
    values above it are replaced by ``hmax``.

    :param img: ndarray to correct
    :param hmax: value the reference level is mapped to
    :param x: first column of the ROI
    :param y: first row of the ROI
    :param h: number of ROI rows
    :param w: number of ROI columns
    :param type: dtype of the returned array
    :return: corrected array converted to ``type``
    """
    roi_pixels = img[y:y + h, x:x + w]
    # Only the bin edges are needed; the counts are discarded.
    _, bin_edges = np.histogram(roi_pixels, bins='auto')
    reference = bin_edges.max()
    gain = hmax / float(reference)
    rescaled = np.where(img <= reference, gain * img, hmax)
    return np.asarray(rescaled, dtype=type)
def _max(img, hmax, mask, x, y, h, w, type):
    """Rescale ``img`` so the maximum of the masked ROI maps to ``hmax``.

    A filled white rectangle from ``(x, y)`` to ``(x + w, y + h)`` is drawn on
    ``mask`` (the caller's array is modified in place), its first plane is
    thresholded into a binary mask, and that mask is handed to ``apply_mask``
    together with a copy of ``img``.  The maximum of the returned image is the
    reference level: values at or below it are multiplied by
    ``hmax / reference`` and values above it are replaced by ``hmax``.

    NOTE(review): ``apply_mask`` is defined outside this block; presumably it
    blacks out everything outside the mask -- confirm against its definition.

    :param img: ndarray to correct
    :param hmax: value the reference level is mapped to
    :param mask: 3-plane uint8 array the ROI rectangle is drawn on
    :param x: ROI left column
    :param y: ROI top row
    :param h: ROI height
    :param w: ROI width
    :param type: dtype of the returned array
    :return: corrected array converted to ``type``
    """
    working_copy = np.copy(img)
    # Thickness -1 asks OpenCV for a filled rectangle.
    cv2.rectangle(mask, (x, y), (x + w, y + h), (255, 255, 255), -1)
    first_plane = mask[:, :, 0]
    _, roi_mask = cv2.threshold(first_plane, 254, 255, cv2.THRESH_BINARY)
    _, roi_only = apply_mask(working_copy, roi_mask, 'black', 0, debug=None)
    reference = np.amax(roi_only)
    gain = hmax / float(reference)
    rescaled = np.where(img <= reference, gain * img, hmax)
    return np.asarray(rescaled, dtype=type)
def white_balance(device, img, mode='hist', debug=None, roi=None):
    """Corrects the exposure of an image based on a white reference region.

    Inputs:
    device = pipeline step counter
    img    = An RGB image on which to perform the correction, correction is done on each channel and then
             reassembled, alternatively a single channel can be input but is not recommended.
             Three-dimensional input is always processed as uint8 (hmax 255); two-dimensional input
             must be uint8 or uint16.
    mode   = 'hist' or 'max'. Any value other than 'hist' selects the 'max' method.
    debug  = None, print, or plot. Accepted for interface compatibility; this function does not read it.
    roi    = A list of 4 integers (x, y, width, height) that form the rectangular ROI of the white color
             standard. If roi is None, the whole image is used.

    Returns:
    device = pipeline step counter (incremented by one)
    img    = Image after exposure correction

    :param device: int
    :param img: ndarray
    :param mode: str
    :param debug: str
    :param roi: list
    :return device: int
    :return finalcorrected: ndarray
    """
    device += 1

    if roi is not None:
        # `or` / `not` are required here: `len(roi) != 4 | flag is False` is a
        # chained comparison against `4 | flag` and can never be true.
        all_integers = all(isinstance(item, (int, np.integer)) for item in roi)
        if len(roi) != 4 or not all_integers:
            fatal_error('If ROI is used ROI must have 4 elements as a list and all must be integers')

    is_multichannel = len(np.shape(img)) == 3
    if is_multichannel:
        iy, ix, _ = np.shape(img)
        hmax = 255
        out_dtype = np.uint8
    else:
        iy, ix = np.shape(img)
        if img.dtype == 'uint8':
            hmax = 255
            out_dtype = np.uint8
        elif img.dtype == 'uint16':
            # 65535 is the largest uint16 value; 65536 overflows on the final cast.
            hmax = 65535
            out_dtype = np.uint16
        else:
            fatal_error('Single-channel images must have dtype uint8 or uint16')

    # Scratch mask that _max() draws the ROI rectangle on.
    mask = np.zeros((iy, ix, 3), dtype=np.uint8)

    if roi is None:
        x, y, w, h = 0, 0, ix, iy
    else:
        x, y, w, h = roi

    def _correct(channel):
        # One dispatch point so colour and grayscale input treat `mode` identically.
        if mode == 'hist':
            return _hist(channel, hmax, x, y, h, w, out_dtype)
        return _max(channel, hmax, mask, x, y, h, w, out_dtype)

    if is_multichannel:
        # Only the first three planes are corrected and restacked.
        finalcorrected = np.dstack([_correct(img[:, :, index]) for index in range(3)])
    else:
        finalcorrected = _correct(img)

    return device, finalcorrected