593 lines
19 KiB
Python
593 lines
19 KiB
Python
#!/usr/bin/python3
|
|
|
|
# Path tools
|
|
import pathlib
|
|
import glob
|
|
import datetime
|
|
import copy
|
|
|
|
# File IO
|
|
from io import open
|
|
import os
|
|
import json
|
|
|
|
# OpenCV
|
|
import numpy as np
|
|
import cv2
|
|
from sklearn.metrics import confusion_matrix, matthews_corrcoef
|
|
from sklearn.preprocessing import (
|
|
MinMaxScaler,
|
|
StandardScaler,
|
|
RobustScaler,
|
|
MaxAbsScaler,
|
|
)
|
|
import joblib
|
|
|
|
# GUI
|
|
import pygubu
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
|
|
# Helpers
|
|
from helpers.statistics import imgStats
|
|
from helpers.logger import CVSuiteLogger, C_INFO, C_DBUG, C_WARN, C_DONE
|
|
from helpers.canvas import CVSuiteCanvas
|
|
from helpers.sift import getSiftData
|
|
from helpers.tags import Tree
|
|
|
|
# Tests
|
|
from helpers.test.knn import CVSuiteTestKNN
|
|
from helpers.test.decision_tree import (
|
|
CVSuiteTestDecisionTree,
|
|
CVSuiteTestRandomForest,
|
|
CVSuiteTestExtraTrees,
|
|
)
|
|
|
|
## UI config load
# Directory containing this script; registered as a pygubu resource path.
PROJECT_PATH = pathlib.Path(__file__).parent
# UI definition file, given relative to the working directory.
PROJECT_UI = "./src/helpers/gui/main.ui"
# Window title, also used in the welcome banner.
TITLE = "Tree Recogniser 7000"

## Config file load
CONFIG_PATH = "./src/config/config.json"
# Read the configuration inside a context manager so the file handle is
# released as soon as the JSON has been parsed (it used to stay open for the
# whole lifetime of the process).
with open(CONFIG_PATH, encoding="utf-8") as config_file:
    config_json = json.load(config_file)

print(
    f"""
Welcome to CVSuite aka {TITLE}!
Powered by ARNweb.nl and TomSelier.com
"""
)
|
|
|
|
|
|
## UI class setup
|
|
class CVSuite:
    """Main window of the tree recogniser.

    Builds the pygubu UI, steps through the ``*.jpg`` / ``*.png`` files found
    under the configured image path, renders a set of preprocessing views and
    plots for the current image, logs the extracted statistics and - when a
    scaler and at least one model are configured - feeds those statistics to
    the loaded models.
    """

    # Class ids used for the confusion matrices; ``Tree(i)`` must resolve for
    # each of them.
    _CLASS_IDS = tuple(range(8))

    def __init__(self, master=None):
        """Build the UI, bind the UI variables and load scaler and models.

        Args:
            master: Optional parent object passed on to the pygubu builder
                when the main window is created.
        """
        ### UI setup ###
        # Pygubu builder
        self.builder = builder = pygubu.Builder()
        builder.add_resource_path(PROJECT_PATH)
        builder.add_from_file(PROJECT_UI)

        # Main window
        self.mainwindow = builder.get_object("main", master)

        # Canvas for output images
        self.canvas = CVSuiteCanvas(builder.get_object("output_canvas"))

        # Log file
        self.log = CVSuiteLogger(config_json["out"]["log"])

        # Keep track of images in dataset
        self.img_current = 0
        self.img_name = ""
        self.img_old = -1  # -1 so the very first update is a full update
        self.img_max = 0

        # Plots
        self.axs = self.createPlot(2, 2)
        self.axs_cm = None

        # UI variables; declared here and then bound by import_variables()
        self.canny_thr1 = None
        self.canny_thr2 = None
        self.img_path = None
        self.contrast = None

        self.img_size = None
        # Render size used for the last full update; a change means the
        # analysis has to be run again.
        self.img_size_old = 0

        self.sobel_select = None
        self.export_id = None
        self.brightness = None
        builder.import_variables(
            self,
            [
                "canny_thr1",
                "canny_thr2",
                "img_path",
                "contrast",
                "img_size",
                "sobel_select",
                "export_id",
                "brightness",
            ],
        )
        builder.connect_callbacks(self)

        # Load values from config after UI has been initialised
        self.img_path.set(config_json["path"])
        self.img_size.set(config_json["size"])
        ### End of UI setup ###

        ### Model tests setup ###
        # Attempt to load scaler.
        # NOTE(review): joblib.load unpickles the file, so the configured
        # path must point to a trusted file.
        if config_json["scaler"] != "":
            self.scaler = joblib.load(config_json["scaler"])
        else:
            self.scaler = None

        # Models are only useful together with a scaler (see runTest)
        self.models = []
        if self.scaler is not None:
            self._loadModels()

        print(C_DONE, "CVSuite initialised!\n")
        ### End of model tests setup ###

    def _loadModels(self):
        """Instantiate every model with a non-empty path in ``config_json["models"]``."""
        # Built at call time because the model classes are module-level imports
        model_types = {
            "knn": ("KNN", CVSuiteTestKNN),
            "dectree": ("Decision Tree", CVSuiteTestDecisionTree),
            "randforest": ("Random Forest", CVSuiteTestRandomForest),
            "extratree": ("Extra tree", CVSuiteTestExtraTrees),
        }

        for model, mpath in config_json["models"].items():
            if mpath == "":
                print(C_WARN, f"Model {model} path not configured!")
                continue

            print(C_INFO, f"Loading model {model}")
            if model not in model_types:
                print(C_WARN, f"Model {model} does not exist or is not supported!")
                continue

            title, model_cls = model_types[model]
            # Tuple with name, class instance and array of guesses for confusion matrix
            self.models.append((title, model_cls(mpath), []))

        print(C_DONE, f"{len(self.models)} models loaded!")

    def on_quit(self, event=None):
        """Close the plot windows and the log file, then quit the main window.

        Args:
            event: Unused by this method.
        """
        # Two figures are created by this class (num=100 and num=101); a bare
        # plt.close() would only close the current one.
        plt.close("all")
        self.log.file.close()  # Close log file

        self.mainwindow.quit()  # Close main

    def run(self):
        """Enter the main window's event loop."""
        self.mainwindow.mainloop()

    def imgCtl(self, widget_id):
        """Step to another image of the dataset and refresh the UI.

        Args:
            widget_id: ``"<direction>_<mode>"`` where direction is ``next`` or
                ``prev`` and mode is ``img`` (move one image) or ``tree``
                (keep moving until the part of the file name before the first
                underscore changes).

        Raises:
            ValueError: If ``widget_id`` does not have the form above.
        """
        cmd = widget_id.split("_")
        directions = {"next": 1, "prev": -1}
        if len(cmd) < 2 or cmd[0] not in directions or cmd[1] not in ("img", "tree"):
            # Previously this ended in an unbound local or an endless loop
            raise ValueError(f"Unsupported image control id: {widget_id!r}")
        cdir = directions[cmd[0]]
        per_tree = cmd[1] == "tree"

        # Prefix of the image we start from
        start = self.img_name.split("_")[0]
        steps = 0

        while True:
            # Move and wrap around at both ends of the dataset
            self.img_current += cdir
            if self.img_current == self.img_max:
                self.img_current = 0
            elif self.img_current == -1:
                self.img_current = self.img_max - 1
            steps += 1

            if not per_tree:
                break  # Stop if only one image should be skipped

            if not self.updatePath():
                break  # No path / no images: nothing to compare against

            # Stop once the prefix changed, or after one full lap so a dataset
            # with a single prefix cannot loop forever.
            if self.img_name.split("_")[0] != start or steps >= self.img_max:
                break

        # Update UI
        self.update()

    def imgExport(self, event=None, path=None):
        """Export the preprocessing view selected by ``export_id`` to file.

        Args:
            event: Unused by this method.
            path: Output location handed to the canvas; defaults to
                ``config_json["out"]["img"]``.
        """
        if path is None:
            # Resolved at call time instead of at class-definition time
            path = config_json["out"]["img"]

        iid = self.export_id.get()
        self.canvas.export(iid, self.img_name.split("_")[0], path)

    def imgCycle(self, widget_id):
        """GUI button callback: cycle once through all images in the dataset.

        Args:
            widget_id: ``"export"`` to export the selected view of every
                image into a fresh time-stamped directory, ``"analyse"`` to
                only step through (and thereby update/analyse) every image.

        Raises:
            ValueError: If ``widget_id`` is neither of the two values.
        """
        if widget_id not in ("export", "analyse"):
            raise ValueError(f"Unsupported cycle id: {widget_id!r}")
        export = widget_id == "export"

        img_id = self.export_id.get()
        start_index = self.img_current

        path = None
        if export:
            now = datetime.datetime.now()
            path = pathlib.Path(
                config_json["out"]["img"],
                f"{self.canvas.tags[img_id]}-all-{now.strftime('%Y-%m-%dT%H.%M.%S')}/",
            )
            # Also create the configured output directory if it is missing
            path.mkdir(parents=True, exist_ok=True)

        visited = 0
        while True:
            if export:
                self.imgExport(path=path)

            self.imgCtl("next_img")
            visited += 1
            # Done when we are back at the start; the visit counter keeps the
            # loop finite when there are no images or the start index was stale.
            if self.img_current == start_index or visited >= self.img_max:
                break

        ## Ensure display is always correct with image
        self.update()

    def createPlot(self, columns, rows):
        """Create figure 100 with a grid of subplots and return its axes.

        Args:
            columns: Passed as first positional argument to ``plt.subplots``.
            rows: Passed as second positional argument to ``plt.subplots``.

        Returns:
            The axes object(s) returned by ``plt.subplots``.
        """
        # NOTE(review): plt.subplots takes (nrows, ncols); the parameter names
        # here are swapped relative to that, which is harmless for the 2x2
        # grid used by this class.
        _, axs = plt.subplots(columns, rows, num=100)
        return axs

    def drawHist(self, image, labels, column, row):
        """Plot one 256-bin histogram per channel of ``image``.

        Args:
            image: Image handed to ``cv2.calcHist``.
            labels: One legend label per channel; channel ``i`` is plotted
                for ``labels[i]``.
            column: First index into ``self.axs``.
            row: Second index into ``self.axs``.
        """
        ax = self.axs[column, row]
        ax.clear()
        for channel, lab in enumerate(labels):
            hist = cv2.calcHist([image], [channel], None, [256], [0, 256])
            ax.plot(hist, label=lab)
        # Explicit True: a bare grid() call toggles the current state
        ax.grid(True)
        ax.legend()

    def drawCannyHM(self, img, column, row):
        """Draw and log how many edge pixels Canny finds per threshold pair.

        Canny is run for every combination of threshold1/threshold2 in
        ``range(0, 500, 20)``. The counts are shown as a heat map on
        ``self.axs[column, row]``; the diagonal (threshold1 == threshold2)
        and its first difference are plotted on ``self.axs[column, row - 1]``
        and summarised in the log.

        Args:
            img: Image handed to ``cv2.Canny``.
            column: First index into ``self.axs``.
            row: Second index into ``self.axs`` for the heat map.
        """
        heat_ax = self.axs[column, row]
        curve_ax = self.axs[column, row - 1]
        heat_ax.clear()

        canny_max = 500
        canny_step = 20
        thresholds = range(0, canny_max, canny_step)

        # results[i][j]: non-zero edge pixels for threshold1 = thresholds[i],
        # threshold2 = thresholds[j]
        results = np.array(
            [
                [
                    cv2.countNonZero(
                        cv2.Canny(image=img, threshold1=th1, threshold2=th2)
                    )
                    for th2 in thresholds
                ]
                for th1 in thresholds
            ]
        )

        func = np.diag(results)
        diff = np.diff(func)
        area = func.sum()

        curve_ax.clear()
        curve_ax.title.set_text(f"Area: {area}")
        curve_ax.plot(func)
        curve_ax.plot(diff)

        heat_ax.title.set_text(f"Mean: {results.mean()}\nStd: {results.std()}")
        heat_ax.imshow(results)
        # Show threshold values instead of grid indices on both axes
        heat_ax.xaxis.set_major_formatter(lambda x, pos: str(x * canny_step))
        heat_ax.yaxis.set_major_formatter(lambda x, pos: str(x * canny_step))

        # The order of these entries is part of the logged row layout
        self.log.add("Canny Mean", func.mean())
        self.log.add("Canny Std", func.std())
        self.log.add("Canny Min", func.min())
        self.log.add("Canny Max x", func.argmax())  # index of first maximum
        self.log.add("Canny Max y", func.max())
        self.log.add("Canny Diff max y", diff.max())
        self.log.add("Canny Diff min x", diff.argmin())  # index of first minimum
        self.log.add("Canny Diff min y", diff.min())
        self.log.add("Canny Area", area)

    def writeStats(self, img, labels, column, row):
        """Put mean/std of ``img`` in a subplot title and add them to the log.

        Args:
            img: Image handed to ``imgStats``.
            labels: Channel labels; the title uses the first three, the log
                gets a ``Mean``/``Std`` entry for every label.
            column: First index into ``self.axs``.
            row: Second index into ``self.axs``.
        """
        mean, std = imgStats(img)

        # Flatten to (label, value, label, value, ...) for mean and then std
        title_args = []
        for values in (mean, std):
            for idx in range(3):
                title_args.extend((labels[idx], values[idx]))

        self.axs[column, row].title.set_text(
            "Mean: %c:%d %c:%d %c:%d \nStd: %c:%d %c:%d %c:%d" % tuple(title_args)
        )

        for idx, label in enumerate(labels):
            self.log.add(f"Mean {label}", mean[idx])
            self.log.add(f"Std {label}", std[idx])

    def runTest(self, data, event=None):
        """Scale ``data``, run every loaded model on it and show the results.

        Does nothing when no scaler is loaded. The prediction of each model
        is written to the ``testdata`` widget and its first result is stored,
        together with the actual class, in that model's guesses list.

        Args:
            data: List starting with the tag followed by the logged values.
                It is modified in place (entries are popped and the rest is
                replaced by scaled values).
            event: Unused by this method.
        """
        # Don't run the test if there's no scaler
        if self.scaler is None:
            return

        output = self.builder.get_object("testdata")
        output.configure(state="normal")
        try:
            output.delete(1.0, "end")

            # Remove tag and photoId
            tag = data.pop(0)
            # NOTE(review): after pop(0) the element at index 1 is the third
            # original entry, not the second one. Left unchanged because the
            # scaler and models are indexed against the resulting layout -
            # confirm against the training pipeline.
            data.pop(1)

            # Add actual name
            output.insert("end", f"Actual type:\n\t{tag.upper()}\n")

            # Normalise data using loaded scalers (one scaler per position)
            for idx, value in enumerate(data):
                sample = np.array(value).astype(np.float32).reshape(1, -1)
                data[idx] = self.scaler[idx].transform(sample)[0][0]

            features = np.array([data], dtype=np.float32)

            for name, ins, guesses in self.models:
                output.insert("end", f"{name} Result:\n")

                # Predict result using model instance
                result = ins.predict(features)

                # Prediction result should be an array
                for idx, value in enumerate(result):
                    if idx == 0:
                        # [actual, predicted] pair for the confusion matrix
                        guesses.append([Tree[tag.upper()].value, value])
                    output.insert("end", f" [{idx + 1}]\t{Tree(value).name}\n")

                print(C_DBUG, f"Guesses for {name}:", guesses)
        finally:
            # Lock the widget again even if a model or scaler raised
            output.configure(state="disabled")

    def drawConfusionMatrix(self, event=None):
        """Redraw figure 101 with one normalised confusion matrix per model.

        The 2x2 grid is filled in loading order; cells without a model are
        switched off and models without guesses only get a title.

        Args:
            event: Unused by this method.
        """
        # Drop the axes of the previous call before new ones are added
        if self.axs_cm is not None:
            for graph in self.axs_cm.flat:
                graph.remove()

        _, axs = plt.subplots(2, 2, num=101)
        self.axs_cm = axs

        class_ids = list(self._CLASS_IDS)
        labels = [Tree(tag).name for tag in class_ids]

        # .flat walks the grid row by row: (0,0), (0,1), (1,0), (1,1)
        for modelnr, graph in enumerate(axs.flat):
            if modelnr >= len(self.models):
                # Fewer than four models configured: leave the cell empty
                graph.axis("off")
                continue

            name, _, guesses = self.models[modelnr]
            if not guesses:
                graph.set_title(f"{name}; no predictions yet")
                continue

            # Get accuracy
            guess_total = len(guesses)
            guess_ok = sum(1 for guess in guesses if guess[0] == guess[1])

            # Convert guess array
            tag_true = [guess[0] for guess in guesses]
            tag_predict = [guess[1] for guess in guesses]

            # Weight every sample with the share of its class in the data
            data_len = len(tag_true)
            data_len_class = [tag_true.count(tag) for tag in class_ids]
            data_class_weight = [count / data_len for count in data_len_class]
            tag_weighted = [data_class_weight[tag] for tag in tag_true]

            print(C_DBUG, f"Data length per class: {data_len_class}; Total: {data_len}")

            # Get MCC
            mcc = matthews_corrcoef(tag_true, tag_predict, sample_weight=tag_weighted)

            # Fix the class set so the matrix always matches the tick labels,
            # also while only a few classes have been seen.
            cm = confusion_matrix(tag_true, tag_predict, labels=class_ids)
            row_totals = cm.sum(axis=1)[:, np.newaxis]
            # Row-normalise; classes without samples keep an all-zero row
            cmn = np.divide(
                cm.astype("float"),
                row_totals,
                out=np.zeros(cm.shape, dtype=float),
                where=row_totals != 0,
            )
            sns.heatmap(
                cmn,
                xticklabels=labels,
                yticklabels=labels,
                ax=graph,
                annot=True,
                cbar=False,
                fmt=".2f",
            )

            graph.set_title(
                f"{name}; MCC: {mcc:.2f}; Acc: {((guess_ok / guess_total) * 100):.2f}%"
            )
            graph.set_xlabel("Predicted")
            graph.set_ylabel("Actual")
            graph.set_xticklabels(labels, rotation=0)
            graph.set_yticklabels(labels, rotation=0)

    def updatePath(self):
        """Refresh ``img_max`` and ``img_name`` from the configured image path.

        Returns:
            ``True`` when an image name was selected, ``False`` when no path
            is set or the path contains no ``*.jpg`` / ``*.png`` files.
        """
        path = self.img_path.get()
        if path is None or path == "":
            return False

        # Get all images at current path: jpg files first, then png files.
        # Sorted so an index always refers to the same file between calls.
        images = sorted(glob.glob(path + "/*.jpg")) + sorted(glob.glob(path + "/*.png"))

        self.img_max = len(images)
        if not images:
            return False

        # Wrap an out-of-range index (-1 -> last image, too large -> wraps)
        self.img_current %= self.img_max
        self.img_name = os.path.split(images[self.img_current])[1]
        return True

    def update(self, event=None, part_update=None):
        """Render all views of the current image and refresh plots and log.

        A full update additionally redraws the HSV histogram and the Canny
        heat map, runs the models (when loaded) and writes the log row. A
        partial update clears the log instead of writing it.

        Args:
            event: Unused by this method.
            part_update: ``None`` to decide automatically (full update when
                the image index or the render size changed since the last
                automatic decision), ``True`` to force a partial update,
                ``False`` to force a full update.
        """
        ## Check if hist and canny hm have to be rerendered
        if part_update is None:
            # Compare the size *value*; comparing the UI variable object
            # itself never detected a changed size after the first render.
            size_now = self.img_size.get()
            if self.img_current != self.img_old or size_now != self.img_size_old:
                part_update = False
                self.img_old = self.img_current
                self.img_size_old = size_now
            else:
                part_update = True
        elif part_update:
            print("Partial update forced!")
        else:
            print("Full update forced!")

        if not self.updatePath():
            return

        print(C_INFO, f"Processing {self.img_name}")

        # cv2.imread signals an unreadable file by returning None
        img = cv2.imread(os.path.join(self.img_path.get(), self.img_name))
        if img is None:
            print(C_WARN, f"Could not read image {self.img_name}, skipping!")
            return

        self.mainwindow.title(f"{TITLE} - {self.img_name}")
        self.log.add("Tree", self.img_name.split("_")[0])
        self.log.add("ID", self.img_name.split("_")[1].split(".")[0])

        # Get all user vars
        ct1 = self.canny_thr1.get()
        ct2 = self.canny_thr2.get()
        sxy = self.sobel_select.get()
        size = self.img_size.get()
        contrast = self.contrast.get()
        bright = self.brightness.get()

        # Clear output
        self.canvas.clear()

        # Resize image
        img = cv2.resize(img, (size, size), interpolation=cv2.INTER_AREA)
        self.canvas.add(img, "Original")

        # Set grayscale
        img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        self.canvas.add(img_gray, "Grayscale")

        # Contrast / brightness boost
        contrast_val = contrast / 100
        bright_val = bright / 100
        img_contrast = np.clip(
            contrast_val * (img_gray + bright_val), 0, 255
        ).astype(np.uint8)
        self.canvas.add(img_contrast, "BCG")

        # Blurred edition
        img_blur = cv2.GaussianBlur(img_gray, (3, 3), 0)
        self.canvas.add(img_blur, "Blurred_k3")

        # Sobel edge; any other selection shows the grayscale image instead
        sobel_orders = {"x": (1, 0), "y": (0, 1), "both": (1, 1)}
        if sxy in sobel_orders:
            dx, dy = sobel_orders[sxy]
            img_sobel = cv2.Sobel(
                src=img_blur, ddepth=cv2.CV_8U, dx=dx, dy=dy, ksize=5
            )
        else:
            img_sobel = img_gray
        self.canvas.add(img_sobel, "Sobel_edge")

        # Canny edge
        img_canny = cv2.Canny(image=img_blur, threshold1=ct1, threshold2=ct2)
        self.canvas.add(img_canny, "Canny_edge")

        # BGR (histogram and stats are refreshed on every update)
        self.canvas.add(img[:, :, 0], "BGR_B")
        self.canvas.add(img[:, :, 1], "BGR_G")
        self.canvas.add(img[:, :, 2], "BGR_R")
        self.drawHist(img, ("B", "G", "R"), 0, 0)
        self.writeStats(img, ("B", "G", "R"), 0, 0)

        # HSV
        img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        self.canvas.add(img_hsv, "HSV")
        self.canvas.add(img_hsv[:, :, 0], "HSV_H")  # H
        self.canvas.add(img_hsv[:, :, 1], "HSV_S")  # S
        self.canvas.add(img_hsv[:, :, 2], "HSV_V")  # V

        if not part_update:
            self.drawHist(img_hsv, ("H", "S", "V"), 0, 1)
            self.writeStats(img_hsv, ("H", "S", "V"), 0, 1)

            # Canny Heatmap
            self.drawCannyHM(img, 1, 1)

        # SIFT; entries are logged in this fixed order
        siftData = getSiftData(img)
        sift_labels = (
            "SIFT total magnitude",
            "SIFT maximum magnitude",
            "SIFT average magnitude",
            "SIFT std magnitude",
            "SIFT counts",
            "SIFT total response",
            "SIFT average response",
        )
        for idx, label in enumerate(sift_labels):
            self.log.add(label, siftData[idx])

        # Write results to CSV file
        if not part_update:
            if self.models:
                self.runTest(self.log.data)
                self.drawConfusionMatrix()

            self.log.update()
        else:
            self.log.clear()  # Prevent partial updates from breaking log

        # Show all data
        plt.show(block=False)  ## Graphs
        self.canvas.draw(size)  ## Images
|
|
|
|
|
|
if __name__ == "__main__":
    # Started as a script: create the application window and hand control to
    # its run loop.
    app = CVSuite()
    app.run()
|