"""Extraction, normalisation, clustering and fuzzification of decision-tree rules.

Turns a fitted sklearn decision tree into crisp ``Rule`` objects, simplifies and
deduplicates them, groups them by cluster label, and finally converts them into
scikit-fuzzy ``Rule`` objects.
"""

import enum
import sys
from functools import reduce
from operator import and_
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from skfuzzy.control.fuzzyvariable import FuzzyVariable
from skfuzzy.control.rule import Rule as FuzzyRule
from skfuzzy.control.term import Term
from sklearn.tree._tree import TREE_UNDEFINED  # type: ignore


class ComparisonType(enum.Enum):
    """Comparison operator used inside a rule atom."""

    LESS = "<="
    GREATER = ">"
    EQUALS = "="


class RuleAtom:
    """A single condition ``<variable> <op> <value>`` of a rule antecedent."""

    def __init__(self, variable: str, type: ComparisonType, value: float) -> None:
        self._variable = variable
        self._type = type
        self._value = value

    def get_variable(self) -> str:
        """Return the variable (feature) name."""
        return self._variable

    # Backward-compatible alias: the original accessor was misspelled and may
    # still be used by external callers.
    get_varaible = get_variable

    def get_type(self) -> ComparisonType:
        """Return the comparison operator."""
        return self._type

    def get_value(self) -> float:
        """Return the threshold value."""
        return self._value

    def __repr__(self) -> str:
        return f"({self._variable} {self._type.value} {np.round(self._value, 3)})"

    def __eq__(self, other: object) -> bool:
        if id(self) == id(other):
            return True
        if not isinstance(other, RuleAtom):
            return False
        return (
            self._variable == other._variable
            and self._type == other._type
            and self._value == other._value
        )


class Rule:
    """A crisp rule: a conjunction of ``RuleAtom``s implying a consequent value."""

    def __init__(self, antecedent: List[RuleAtom], consequent: float) -> None:
        self._antecedent = antecedent
        self._consequent = consequent

    def get_antecedent(self) -> List[RuleAtom]:
        """Return the list of antecedent atoms."""
        return self._antecedent

    def set_antecedent(self, antecedent: List[RuleAtom]) -> None:
        """Replace the antecedent with a defensive copy of ``antecedent``."""
        self._antecedent = list(antecedent)

    def get_consequent(self) -> float:
        """Return the consequent value (leaf value or class label)."""
        return self._consequent

    def set_consequent(self, value: float) -> None:
        """Replace the consequent value."""
        self._consequent = value

    def __repr__(self) -> str:
        # Built via an intermediate variable: nesting double quotes inside a
        # double-quoted f-string is only legal from Python 3.12 onwards.
        body = " and ".join(str(atom) for atom in self._antecedent)
        return f"if {body} -> {np.round(self._consequent, 3)}"


# https://mljar.com/blog/extract-rules-decision-tree/
def get_rules(tree, feature_names, classes=None) -> List[Rule]:
    """Extract one ``Rule`` per leaf from a fitted sklearn decision tree.

    For regression trees (``classes is None``) the consequent is the leaf
    value; for classification trees it is ``classes[argmax(counts)]``.
    Rules are returned sorted by consequent value.
    """
    tree_ = tree.tree_
    feature_name = [
        feature_names[i] if i != TREE_UNDEFINED else "undefined!"
        for i in tree_.feature
    ]
    rules: List[Rule] = []

    def recurse(node, antecedent, rules):
        # Internal nodes carry a split; leaves have TREE_UNDEFINED as feature.
        if tree_.feature[node] != TREE_UNDEFINED:
            name = feature_name[node]
            threshold = tree_.threshold[node]
            left_path, right_path = list(antecedent), list(antecedent)
            left_path.append(RuleAtom(name, ComparisonType.LESS, threshold))
            recurse(tree_.children_left[node], left_path, rules)
            right_path.append(RuleAtom(name, ComparisonType.GREATER, threshold))
            recurse(tree_.children_right[node], right_path, rules)
        else:
            if classes is None:
                rules.append(Rule(antecedent, tree_.value[node][0][0]))
            else:
                value = np.argmax(tree_.value[node][0])
                rules.append(Rule(antecedent, classes[value]))

    recurse(0, [], rules)

    # Sort by consequent value.  The original used np.argpartition(values, 1),
    # which only guarantees the element at index 1 ends up in sorted position
    # (and raises for fewer than two rules); argsort performs the intended
    # full sort.
    values = [rule.get_consequent() for rule in rules]
    return [rules[i] for i in np.argsort(values)]


# from
# if (Al2O3 <= 0.175) and (TiO2 <= 0.175) and (T > 32.5) and (TiO2 <= 0.025)
# and (Al2O3 <= 0.025) and (T > 55.0) and (T > 62.5)
# to
# if (Al2O3 <= 0.175) and (TiO2 <= 0.175) and (T > 32.5)
# max(<=)
# min(>)
def normalise_rules(rules: List[Rule]) -> List[Rule]:
    """Collapse repeated conditions on the same variable, in place.

    Per variable, keeps the max threshold over "<=" atoms and the min over
    ">" atoms (see the example above).  Returns the same list.
    """
    for rule in rules:
        # variable -> comparison type -> collapsed threshold
        bounds: Dict[str, Dict[ComparisonType, float]] = {}
        for atom in rule.get_antecedent():
            per_var = bounds.setdefault(atom.get_variable(), {})
            old_value = per_var.get(atom.get_type())
            new_value: float = 0
            if atom.get_type() == ComparisonType.GREATER:
                new_value = min(
                    old_value if old_value is not None else sys.maxsize,
                    atom.get_value(),
                )
            if atom.get_type() == ComparisonType.LESS:
                new_value = max(
                    old_value if old_value is not None else -sys.maxsize - 1,
                    atom.get_value(),
                )
            per_var[atom.get_type()] = new_value
        rule.set_antecedent(
            [
                RuleAtom(key_var, key_type, value)
                for key_var, per_var in bounds.items()
                for key_type, value in per_var.items()
            ]
        )
    return rules


def _is_same_rules(rule1: Rule, rule2: Rule) -> bool:
    """Return True if the two rules have equal antecedents (order-insensitive)."""
    antecedent1 = rule1.get_antecedent()
    antecedent2 = rule2.get_antecedent()
    if len(antecedent1) != len(antecedent2):
        return False
    return all(atom in antecedent2 for atom in antecedent1)


def _get_rules_accum(rules: List[Rule]) -> Dict[str, Dict[str, float]]:
    """Accumulate consequent sum ("V") and count ("C") keyed by antecedent repr."""
    accum_dict: Dict[str, Dict[str, float]] = {}
    for rule in rules:
        key = str(rule.get_antecedent())
        entry = accum_dict.setdefault(key, {})
        entry["V"] = entry.get("V", 0) + rule.get_consequent()
        entry["C"] = entry.get("C", 0) + 1
    return accum_dict


def _recalculate_consequents(
    accum_dict: Dict[str, Dict[str, float]], rules: List[Rule]
) -> List[Rule]:
    """Set each rule's consequent to the mean over its duplicate group."""
    for rule in rules:
        key: str = str(rule.get_antecedent())
        value: float = accum_dict[key]["V"]
        count: int = int(accum_dict[key]["C"])
        if count == 1:
            # Unique antecedent: nothing to average.
            continue
        rule.set_consequent(value / count)
    return rules


def delete_same_rules(rules: List[Rule]) -> List[Rule]:
    """Drop rules with duplicate antecedents, averaging their consequents.

    For each group of rules sharing an antecedent, only the last occurrence
    survives, carrying the mean consequent of the whole group.  O(n^2) in the
    number of rules.
    """
    duplicate_indices: List[int] = []
    accum_dict: Dict[str, Dict[str, float]] = _get_rules_accum(rules)
    for rule1_index, rule1 in enumerate(rules):
        for rule2_index, rule2 in enumerate(rules):
            if rule1_index >= rule2_index:
                continue
            if _is_same_rules(rule1, rule2):
                # A later duplicate exists, so the earlier copy is removed.
                duplicate_indices.append(rule1_index)
                break
    cleared_rules = [
        rule for index, rule in enumerate(rules) if index not in duplicate_indices
    ]
    return _recalculate_consequents(accum_dict, cleared_rules)


def get_features(rules: List[Rule], exclude: List[str] | None = None) -> List[str]:
    """Return the sorted, unique string forms of all antecedent atoms.

    Atoms whose variable name appears in ``exclude`` are skipped.
    """
    atoms: List[str] = []
    for rule in rules:
        for atom in rule.get_antecedent():
            if exclude is not None and atom.get_variable() in exclude:
                continue
            if str(atom) in atoms:
                continue
            atoms.append(str(atom))
    atoms.sort()
    return atoms


def vectorize_rules(rules: List[Rule], features: List[str]) -> pd.DataFrame:
    """One-hot encode each rule's antecedent atoms against ``features``.

    Returns a DataFrame indexed by the rule's string form, with one 0/1
    column per feature atom plus a ``consequent`` column.
    """
    columns: List[str] = ["rule", *features, "consequent"]
    df = pd.DataFrame(columns=columns)
    for rule in rules:
        atom_strings = [str(atom) for atom in rule.get_antecedent()]
        mask = np.isin(list(features), atom_strings)
        data = np.append([str(rule)], mask.astype(int))
        data = np.append(data, rule.get_consequent())
        df.loc[len(df)] = pd.Series(data=data, index=df.columns)
    return df.set_index("rule")


def _get_clustered_rules(
    rules: List[Rule], clusters_num: int, labels: np.ndarray
) -> List[List[Rule]]:
    """Split ``rules`` into ``clusters_num`` lists according to ``labels``."""
    clustered_rules: List[List[Rule]] = []
    for cluster_id in range(clusters_num):
        cluster_indices = np.where(labels == cluster_id)[0]
        clustered_rules.append([rules[idx] for idx in cluster_indices])
    return clustered_rules


def _get_variables_minmax(X: pd.DataFrame) -> Dict[str, Tuple[float, float]]:
    """Return per-column (min, max) over the training data."""
    intervals: Dict[str, Tuple[float, float]] = {}
    for column in X.columns:
        intervals[column] = (X[column].min(), X[column].max())
    return intervals


def _get_varibles_interval(
    antecedent: List[RuleAtom],
) -> Dict[str, Tuple[float | None, float | None]]:
    """Return per-variable (lower, upper) bounds implied by the antecedent.

    A missing bound is ``None``; ">" atoms set the lower bound, "<=" atoms
    the upper bound.
    """
    intervals: Dict[str, Tuple[float | None, float | None]] = {}
    for atom in antecedent:
        if intervals.get(atom.get_variable()) is None:
            intervals[atom.get_variable()] = (None, None)
        if atom.get_type() == ComparisonType.GREATER:
            intervals[atom.get_variable()] = (
                atom.get_value(),
                intervals[atom.get_variable()][1],
            )
        if atom.get_type() == ComparisonType.LESS:
            intervals[atom.get_variable()] = (
                intervals[atom.get_variable()][0],
                atom.get_value(),
            )
    return intervals


def simplify_and_group_rules(
    X: pd.DataFrame, rules: List[Rule], clusters_num: int, clusters_labels: np.ndarray
):
    """Reduce each rule's interval conditions to single "=" atoms, per cluster.

    A one-sided interval is replaced by the data min (no lower bound) or max
    (no upper bound) of that variable; a two-sided interval by its midpoint.
    Returns a list of rule lists, one per cluster.
    """
    minmax = _get_variables_minmax(X)
    new_rules: List[List[Rule]] = []
    for cluster in _get_clustered_rules(rules, clusters_num, clusters_labels):
        cl_rules: List[Rule] = []
        for rule in cluster:
            intervals = _get_varibles_interval(rule.get_antecedent())
            new_atoms = []
            for key, (lower, upper) in intervals.items():
                val: float = 0
                if lower is None and upper is not None:
                    val = minmax[key][0]
                if upper is None and lower is not None:
                    val = minmax[key][1]
                if lower is not None and upper is not None:
                    val = (lower + upper) / 2
                new_atoms.append(RuleAtom(key, ComparisonType.EQUALS, val))
            cl_rules.append(Rule(new_atoms, rule.get_consequent()))
        new_rules.append(cl_rules)
    return new_rules


def _get_fuzzy_rule_atom(
    fuzzy_variable: FuzzyVariable, value: float
) -> Tuple[Term, float]:
    """Return the term of ``fuzzy_variable`` with the highest membership at ``value``.

    Membership is obtained by linear interpolation of each term's membership
    function over the variable's universe.
    """
    memberships = {
        term: np.interp(value, fuzzy_variable.universe, fuzzy_variable[term].mf)
        for term in fuzzy_variable.terms
    }
    best_term, best_mval = max(memberships.items(), key=lambda item: item[1])
    return (fuzzy_variable[best_term], best_mval)


def _get_fuzzy_rules(
    rules: List[Rule], fuzzy_variables: Dict[str, FuzzyVariable]
) -> List[Tuple[List[RuleAtom], Term, float]]:
    """Map each crisp rule to (antecedent terms, consequent term, weight).

    The weight is the sum of the antecedent membership values; variables
    without a fuzzy counterpart are skipped.  The consequent is fuzzified
    against ``fuzzy_variables["consequent"]``.
    """
    fuzzy_rules: List[Tuple[List[RuleAtom], Term, float]] = []
    for rule in rules:
        antecedent = []
        for atom in rule.get_antecedent():
            if fuzzy_variables.get(atom.get_variable()) is None:
                continue
            antecedent.append(
                _get_fuzzy_rule_atom(
                    fuzzy_variables[atom.get_variable()], atom.get_value()
                )
            )
        consequent = _get_fuzzy_rule_atom(
            fuzzy_variables["consequent"], rule.get_consequent()
        )[0]
        fuzzy_rules.append(
            (
                [atom[0] for atom in antecedent],
                consequent,
                sum(atom[1] for atom in antecedent),
            )
        )
    return fuzzy_rules


def _delete_same_fuzzy_rules(
    rules_cluster: List[Tuple[List[RuleAtom], Term, float]]
) -> List[Tuple[List[RuleAtom], Term, float]]:
    """Drop duplicate fuzzy rules within a cluster.

    Exact duplicates (same antecedent and consequent) keep the later copy.
    When antecedents match but consequents differ, the rule with the higher
    antecedent weight is removed.  The original compared weights as *strings*
    (so e.g. "10.0" < "9.0"); the comparison is now numeric.
    """
    removed: List[int] = []
    for rule1_index, rule1 in enumerate(rules_cluster):
        for rule2_index, rule2 in enumerate(rules_cluster):
            if rule1_index >= rule2_index:
                continue
            if str(rule1[0]) != str(rule2[0]):
                continue
            # Remove the same rules
            if str(rule1[1]) == str(rule2[1]):
                removed.append(rule1_index)
                break
            # If antecedents is equals, but consequents is not equals then
            # Remove rule with the higher antecedent weight
            if rule1[2] <= rule2[2]:
                removed.append(rule2_index)
            else:
                removed.append(rule1_index)
            break
    return [rule for index, rule in enumerate(rules_cluster) if index not in removed]


def get_fuzzy_rules(
    clustered_rules: List[List[Rule]], fuzzy_variables: Dict[str, FuzzyVariable]
) -> List[FuzzyRule]:
    """Convert clustered crisp rules into deduplicated scikit-fuzzy rules.

    Each rule's antecedent terms are AND-combined with ``operator.and_``.
    """
    fuzzy_clusters = [
        _delete_same_fuzzy_rules(_get_fuzzy_rules(rules, fuzzy_variables))
        for rules in clustered_rules
    ]
    return [
        FuzzyRule(reduce(and_, antecedent_terms), consequent_term)
        for cluster in fuzzy_clusters
        for antecedent_terms, consequent_term, _weight in cluster
    ]