281 KiB
281 KiB
In [88]:
import pickle
import pandas as pd
from sklearn import tree
# Load the fitted decision-tree model from disk.
# NOTE(security): pickle.load can execute arbitrary code while deserialising;
# only open model files that come from a trusted source.
# The context manager guarantees the file handle is closed after loading.
with open("data/temp_density_tree.model.sav", "rb") as model_file:
    model = pickle.load(model_file)

# Feature names are all columns of the training set except the target "T".
# nrows=0 parses only the header row, which is all that is needed for names.
features = (
    pd.read_csv("data/density_train.csv", sep=";", decimal=",", nrows=0)
    .drop(["T"], axis=1)
    .columns.values.tolist()
)

# Text rendering of the tree, labelled with the feature names.
rules = tree.export_text(model, feature_names=features)
print(rules)
In [89]:
from src.rules import get_rules
# Rebind `rules` (previously the text export of the tree) to the result of the
# project helper get_rules, called with the loaded model and the feature names.
# NOTE(review): the structure of the returned rules is defined in src.rules.
rules = get_rules(model, features)
# Show the number of rules, then the rules themselves as the cell output.
display(len(rules))
rules
Out[89]:
In [90]:
from src.rules import normalise_rules
# Replace `rules` with the output of normalise_rules.
# NOTE(review): presumably brings each rule to a canonical form — confirm in src.rules.
# The name is overwritten, so re-running this cell feeds the already-processed
# rules back into the helper.
rules = normalise_rules(rules)
# Show the number of rules after this step, then the rules themselves.
display(len(rules))
rules
Out[90]:
In [91]:
from src.rules import delete_same_rules
# Replace `rules` with the output of delete_same_rules.
# NOTE(review): presumably drops duplicate rules — confirm in src.rules.
rules = delete_same_rules(rules)
# Show the number of rules left after this step, then the rules themselves.
display(len(rules))
rules
Out[91]:
In [92]:
# Read the train and test tables with identical parser settings
# (semicolon-separated fields, comma as the decimal mark).
density_train, density_test = (
    pd.read_csv(csv_path, sep=";", decimal=",")
    for csv_path in ("data/density_train.csv", "data/density_test.csv")
)

# Preview the first three rows of each table.
display(density_train.head(3), density_test.head(3))
In [93]:
from src.rules import simplify_rules
# Replace `rules` with the output of simplify_rules, which receives the
# training table together with the current rules.
# NOTE(review): how the training data is used for simplification is defined in
# src.rules — confirm there.
rules = simplify_rules(density_train, rules)
# Show the number of rules after simplification, then the rules themselves.
display(len(rules))
rules
Out[93]:
In [107]:
import numpy as np
from skfuzzy import control as ctrl
import skfuzzy as fuzz
# Input (antecedent) variables, each defined over an evenly spaced universe.
al = ctrl.Antecedent(np.arange(0, 0.3, 0.005), "al")
ti = ctrl.Antecedent(np.arange(0, 0.3, 0.005), "ti")
density = ctrl.Antecedent(np.arange(1.03, 1.22, 0.00001), "density")

# Output (consequent) variable: its universe is the sorted set of distinct
# "T" values found in the training table.
temp = ctrl.Consequent(density_train["T"].sort_values().unique(), "temp")
# Alternative fixed universe, kept for reference:
# temp = ctrl.Consequent(np.arange(20, 70, 5), "temp")

# Give every variable three automatically generated membership functions and
# plot it, in the order al, ti, density, temp.
for fuzzy_var in (al, ti, density, temp):
    fuzzy_var.automf(3, variable_type="quant")
    fuzzy_var.view()
In [108]:
# from skfuzzy.control.fuzzyvariable import FuzzyVariable
# from skfuzzy.control.rule import Rule as FuzzyRule
# from skfuzzy.control.term import Term
# from typing import List, Tuple
# from functools import reduce
# from operator import and_
# from src.rules import RuleAtom
# def gfa(
# fuzzy_variable: FuzzyVariable, value: float
# ) -> Tuple[Term, float]:
# values = {}
# for term in fuzzy_variable.terms:
# mval = np.interp(value, fuzzy_variable.universe, fuzzy_variable[term].mf)
# values[term] = mval
# best_value = sorted(values.items(), key=lambda x: x[1], reverse=True)[0]
# return (fuzzy_variable[best_value[0]], best_value[1])
# def dsfr(
# rules: List[Tuple[List[RuleAtom], Term, float]]
# ) -> List[Tuple[List[RuleAtom], Term, float]]:
# same_rules: List[int] = []
# for rule1_index, rule1 in enumerate(rules):
# for rule2_index, rule2 in enumerate(rules):
# if rule1_index >= rule2_index:
# continue
# # Remove the same rules
# if str(rule1[0]) == str(rule2[0]) and str(rule1[1]) == str(rule2[1]):
# same_rules.append(rule1_index)
# break
# # If the antecedents are equal but the consequents differ,
# # remove the rule with the higher antecedent weight
# if str(rule1[0]) == str(rule2[0]) and str(rule1[2]) <= str(rule2[2]):
# same_rules.append(rule2_index)
# break
# if str(rule1[0]) == str(rule2[0]) and str(rule1[2]) > str(rule2[2]):
# same_rules.append(rule1_index)
# break
# return [rule for index, rule in enumerate(rules) if index not in same_rules]
# display(rules)
# fuzzy_variables = {"Al2O3": al, "TiO2": ti, "Density": density, "consequent": temp}
# fuzzy_rules: List[Tuple[List[RuleAtom], Term, float]] = []
# for rule in rules:
# tmp_rule = []
# for atom in rule.get_antecedent():
# if fuzzy_variables.get(atom.get_varaible(), None) is None:
# continue
# variable = gfa(fuzzy_variables[atom.get_varaible()], atom.get_value())
# tmp_rule.append(variable)
# # if tmp_rule.get(variable[0].parent, None) is None:
# # tmp_rule[variable[0].parent] = variable
# # else:
# # if tmp_rule[variable[0].parent][1] > variable[1]:
# # tmp_rule[variable[0].parent] = variable
# consequent = gfa(
# fuzzy_variables["consequent"], rule.get_consequent()
# )[0]
# fuzzy_rules.append(
# (
# # FuzzyRule(reduce(and_, [atom[0] for atom in antecedent]), consequent),
# [atom[0] for atom in tmp_rule],
# consequent,
# sum([atom[1] for atom in tmp_rule]),
# )
# )
# fuzzy_rules = dsfr(fuzzy_rules)
# frules = [FuzzyRule(reduce(and_, item[0]), item[1]) for item in fuzzy_rules]
# display(len(frules))
# display(frules)
# # fuzzy_cntrl = ctrl.ControlSystem(frules)
# # sim = ctrl.ControlSystemSimulation(fuzzy_cntrl, lenient=False)
In [109]:
from src.rules import get_fuzzy_rules
# Lookup table passed to get_fuzzy_rules: the three antecedent variables keyed
# by "Al2O3", "TiO2" and "Density", plus the output variable under "consequent".
fuzzy_variables = {"Al2O3": al, "TiO2": ti, "Density": density, "consequent": temp}
# Convert the rules into fuzzy rules with the project helper.
# NOTE(review): the conversion logic lives in src.rules — confirm details there.
fuzzy_rules = get_fuzzy_rules(rules, fuzzy_variables)
# Build the control system from the fuzzy rules and a simulation object on top of it.
fuzzy_cntrl = ctrl.ControlSystem(fuzzy_rules)
# NOTE(review): lenient=False is passed explicitly; check the skfuzzy docs for its exact effect.
sim = ctrl.ControlSystemSimulation(fuzzy_cntrl, lenient=False)
# Show the number of fuzzy rules, then the rules themselves as the cell output.
display(len(fuzzy_rules))
fuzzy_rules
Out[109]:
In [110]:
# Earlier sample point, kept for reference:
# sim.input["al"] = 0.0
# sim.input["ti"] = 0.0
# sim.input["density"] = 1.05979

# Feed one sample point into the simulation, one input variable at a time.
sample_inputs = {"al": 0.0, "ti": 0.0, "density": 1.0569013636039}
for input_name, input_value in sample_inputs.items():
    sim.input[input_name] = input_value

# Run the inference and dump the simulation state.
sim.compute()
sim.print_state()

# Show the computed output value and plot the output variable for this run.
display(sim.output["temp"])
temp.view(sim=sim)
In [111]:
from sklearn import metrics
import math
def fuzzy_pred(row):
    """Run the fuzzy simulation for one data row and return its "temp" output.

    Args:
        row: mapping-like record providing the "Al2O3", "TiO2" and "Density"
            values (for example a DataFrame row from ``apply(..., axis=1)``).

    Returns:
        The value stored in ``sim.output["temp"]`` after ``sim.compute()``.

    Uses the module-level ``sim`` object; its inputs are overwritten on each call.
    """
    # Data column -> name of the simulation input it feeds.
    column_to_input = {"Al2O3": "al", "TiO2": "ti", "Density": "density"}
    for column, input_name in column_to_input.items():
        sim.input[input_name] = row[column]
    sim.compute()
    return sim.output["temp"]
def rmse(row):
    """Return the per-row error between the "Real" and "Inferred" values.

    For a single sample the root of the mean squared error is simply the
    absolute difference, so it is computed directly instead of routing two
    one-element lists through sklearn's mean_squared_error on every row.

    Args:
        row: mapping-like record with numeric "Real" and "Inferred" entries.

    Returns:
        The absolute difference ``|Real - Inferred|`` as a Python float.
        A NaN in either entry yields NaN rather than raising.
    """
    return float(abs(row["Real"] - row["Inferred"]))
# Evaluate the fuzzy system on the training data.
# Work on a copy so the loaded training table stays untouched.
result_train = density_train.copy()
# "Real" holds the target values from column "T".
result_train["Real"] = result_train["T"]
# "Inferred" is the fuzzy system's output for each row.
result_train["Inferred"] = result_train.apply(fuzzy_pred, axis=1)
# Per-row error between "Real" and "Inferred".
result_train["RMSE"] = result_train.apply(rmse, axis=1)
# Round the error column of the *training* table. This line previously
# referenced result_test, which is not defined until the next cell.
result_train = result_train.round({"RMSE": 3})
result_train.head(15)
Out[111]:
In [112]:
# Evaluate the fuzzy system on the test data in a single chain.
# assign() works on a copy and evaluates the new columns in order, so each
# lambda sees the columns added before it.
result_test = (
    density_test
    .assign(
        Real=lambda frame: frame["T"],
        Inferred=lambda frame: frame.apply(fuzzy_pred, axis=1),
        RMSE=lambda frame: frame.apply(rmse, axis=1),
    )
    # Round only the error column to three decimals.
    .round({"RMSE": 3})
)
result_test
Out[112]:
In [113]:
# Aggregate quality metrics over the train and test result tables.
# NOTE(review): "RMAE_test" is the square root of the mean absolute error,
# which is not a standard metric — confirm that plain MAE was not intended.
rmetrics = {
    "RMSE_train": math.sqrt(
        metrics.mean_squared_error(result_train["Real"], result_train["Inferred"])
    ),
    "RMSE_test": math.sqrt(
        metrics.mean_squared_error(result_test["Real"], result_test["Inferred"])
    ),
    "RMAE_test": math.sqrt(
        metrics.mean_absolute_error(result_test["Real"], result_test["Inferred"])
    ),
    "R2_test": metrics.r2_score(result_test["Real"], result_test["Inferred"]),
}
rmetrics
Out[113]: