# EV5_Beeldherk_Bomen/src/helpers/test/knn.py
# (119 lines, 3.9 KiB, Python — page-scrape metadata converted to a comment header)

import cv2 as cv
import numpy as np
import csv
from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler, MaxAbsScaler
import argparse
from enum import Enum
import yaml
import joblib
import datetime
import os
from ..logger import C_DBUG
from ..tags import Tree
# Command-line interface for training a KNN model from a CVSuite CSV export.
parser = argparse.ArgumentParser(prog='KNN Train CLI')
for flags, description in (
    (('-i', '--input'), 'Input CSV file'),
    (('-o', '--output'), 'Output model file'),
):
    parser.add_argument(*flags, help=description, required=True)
class CVSuiteTestKNN:
    """Wrapper around OpenCV's ml.KNearest classifier.

    Handles loading CVSuite CSV exports, per-column feature scaling
    (MaxAbsScaler), and persistence of both the model and its scalers.
    """

    def __init__(self, model=None):
        """Create an untrained model, or load a trained one from the path `model`."""
        # One fitted scaler per feature column, in column order; dumped via
        # joblib so predictions can apply the identical transform later.
        self.scale = []
        if model is None:
            self.knn = cv.ml.KNearest_create()
            self.trained = False
        else:
            self.knn = cv.ml.KNearest_load(model)
            self.trained = True

    def trainCSV(self, path, output):
        '''
        Takes preprocessed data from CVSuite, normalises it and trains the model.
        Output should be a folder path.
        Function expects first two columns of the dataset to be tag and photoId,
        the first row should be the CSV header.
        '''
        # `with` guarantees the file is closed even if csv parsing raises
        with open(path, mode='r') as file:
            data = list(csv.reader(file, delimiter=","))
        header = data.pop(0)
        print("CSV tags: ", header)

        # Strip the tag and photoId columns from each row; collect the tag ids.
        tags_int = []
        for row in data:
            tree = row.pop(0)
            # BUGFIX: after popping the tag, photoId sits at index 0 — the
            # original `row.pop(1)` discarded a feature column and left
            # photoId inside the training data.
            photo_id = row.pop(0)
            tree_id = Tree[tree.upper()]  # renamed: `id` shadows the builtin
            tags_int.append(tree_id.value)

        # OpenCV requires numpy arrays with explicit dtypes
        tags_int = np.array(tags_int, dtype=np.int32)
        data = np.array(data, dtype=np.float32)

        # Scale each feature column independently, keeping the fitted scaler.
        for idx in range(data.shape[1]):
            # Scalers want 2-D input: one column, many rows
            column = data[:, idx].reshape(-1, 1)
            scaler = MaxAbsScaler()
            # fit_transform fits the scaler in place, so appending it after
            # the call stores the fitted parameters (the original fitted the
            # same data twice via separate fit + fit_transform calls).
            column = scaler.fit_transform(column)
            self.scale.append(scaler)
            # Write the scaled values back into the data matrix
            data[:, idx] = column.reshape(len(column))

        # Dump the scalers with a timestamp so they pair with the saved model
        now = datetime.datetime.now()
        joblib.dump(self.scale, os.path.join(output, F"scale_{now.strftime('%Y-%m-%dT%H.%M.%S')}.pkl"))
        # Pass data to train function
        self.train(data, tags_int, output)

    def train(self, data, tags, output):
        '''
        Data should be normalised before being passed to this function.
        This function should not be run from within the suite.

        Raises EnvironmentError when the model was already trained.
        '''
        if self.trained:
            raise EnvironmentError("Model already trained!")
        print(data)
        print(data.shape)
        self.knn.train(data, cv.ml.ROW_SAMPLE, tags)
        # BUGFIX: record the trained state so a second train() call raises,
        # matching the guard above (the original never set this flag).
        self.trained = True
        # Save the model with a timestamped filename alongside the scalers
        now = datetime.datetime.now()
        self.knn.save(os.path.join(output, F"model_knn_{now.strftime('%Y-%m-%dT%H.%M.%S')}.yaml"))

    def predict(self, data, nr=3):
        """Run findNearest on `data` and return the neighbour labels of the first sample.

        `data` must already be scaled with the same scalers used at training
        time (see self.scale). `nr` is the K in KNN.
        """
        ret, results, neighbours, dist = self.knn.findNearest(data, nr)
        print(C_DBUG, "KNN Raw:")
        print("\t\tresult: \t{}".format(results))
        print("\t\tneighbours:\t{}".format(neighbours))
        print("\t\tdistance:\t{}".format(dist))
        return neighbours[0]
if __name__ == "__main__":
    # Train a fresh KNN model from the CLI-supplied CSV and output folder.
    cli_args = parser.parse_args()
    suite = CVSuiteTestKNN()
    suite.trainCSV(cli_args.input, cli_args.output)