dd/PA/code/fyzzy rules.py
2025-04-25 13:21:46 +04:00

190 lines
6.0 KiB
Python

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from skfuzzy import cmeans, defuzz
from sklearn.linear_model import LinearRegression
# Load the example dataset (daily minimum temperatures, Melbourne).
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv"
data = pd.read_csv(url, header=0, index_col=0, parse_dates=True)
ts_data = data['Temp'].values.reshape(-1, 1)

# Scale values into [0, 1] so the cluster centres live on a common range.
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(ts_data).flatten()

# Fuzzy system parameters.
n_clusters = 3   # number of fuzzy sets
m = 2.0          # fuzziness exponent (typically between 1.5 and 3.0)
max_iter = 100   # maximum number of iterations
error = 0.005    # stopping criterion

# Fuzzy c-means clustering over the scaled series.
# NOTE: cntr comes back with shape (n_clusters, 1); downstream code
# (rule-key construction) relies on that 2-D shape.
# seed fixes the random membership initialisation so runs are reproducible.
cntr, u, u0, d, jm, p, fpc = cmeans(
    data=scaled_data.reshape(1, -1),
    c=n_clusters,
    m=m,
    error=error,
    maxiter=max_iter,
    seed=42
)
# Visualise a Gaussian membership function around each cluster centre.
# (The mojibake labels were an encoding defect; replaced with English.)
x = np.linspace(0, 1, 100)
sigma = 0.1  # fixed width used only for this visualisation
plt.figure(figsize=(10, 6))
for i in range(n_clusters):
    # Gaussian membership centred on cluster i.
    membership = np.exp(-(x - cntr[i]) ** 2 / (2 * sigma ** 2))
    plt.plot(x, membership, label=f'Cluster {i + 1}')
plt.title('Membership functions of the fuzzy sets')
plt.xlabel('Normalised temperature value')
plt.ylabel('Degree of membership')
plt.legend()
plt.show()
# Degree of membership of a scalar value in each fuzzy cluster.
def get_cluster_membership(x, cntr):
    """Return normalised inverse-square-distance memberships of x.

    The result keeps the shape of ``cntr`` (one row per cluster) and
    sums to 1 over all clusters.
    """
    dists = np.abs(x - np.asarray(cntr))
    raw = 1.0 / (1.0 + dists ** 2)  # closeness: inverse squared distance
    return raw / raw.sum()          # normalise to a fuzzy partition
# Build the training set: antecedent pattern -> next scaled value.
window_size = 3  # length of the temporal pattern window
X = []
y = []
for t in range(window_size, len(scaled_data)):
    # The window of observations preceding position t.
    past = scaled_data[t - window_size:t]
    # Membership of every window element in every cluster.
    member = np.array([get_cluster_membership(v, cntr) for v in past])
    # The dominant cluster per element is the rule antecedent;
    # the value at t is the consequent target.
    X.append(np.argmax(member, axis=1))
    y.append(scaled_data[t])
X = np.array(X)
y = np.array(y)
# Extract frequent rules and their average consequents.
from collections import defaultdict
rule_counts = defaultdict(int)
rule_consequences = defaultdict(list)
for pattern, consequence in zip(X, y):
    # argmax over the (window, clusters, 1) membership array leaves each
    # pattern 2-D, so hash it as a nested tuple to use it as a dict key.
    rule_key = tuple(tuple(row) for row in pattern)
    rule_counts[rule_key] += 1
    rule_consequences[rule_key].append(consequence)

# Report the most frequent rules (fixed mojibake output strings).
print("Top-5 most frequent rules:")
sorted_rules = sorted(rule_counts.items(), key=lambda kv: -kv[1])[:5]
for rule, count in sorted_rules:
    avg_consequence = np.mean(rule_consequences[rule])
    print(f"If {rule} then y={avg_consequence:.3f} (seen {count} times)")
class FuzzyTSKSystem:
    """Takagi-Sugeno-Kang style fuzzy system over sliding windows.

    Each rule maps an antecedent (one cluster index per window
    position) to a consequent function evaluated on the raw window.
    """

    def __init__(self, n_clusters, window_size, cntr):
        self.n_clusters = n_clusters
        self.window_size = window_size
        self.cntr = cntr
        self.rules = {}         # antecedent -> consequent function
        self.rule_weights = {}  # antecedent -> rule weight

    def add_rule(self, antecedent, consequent_func):
        """Register a rule; every new rule starts with unit weight."""
        self.rules[antecedent] = consequent_func
        self.rule_weights[antecedent] = 1.0

    def predict(self, window):
        """Return the activation-weighted TSK output for one window."""
        # Membership of each window element in each cluster.
        member = np.array(
            [get_cluster_membership(v, self.cntr) for v in window]
        )
        # Activation of every rule (product T-norm over the window).
        activations = {}
        total = 0.0
        for antecedent in self.rules:
            strength = 1.0
            for pos in range(self.window_size):
                strength *= member[pos, antecedent[pos]]
            fired = strength * self.rule_weights[antecedent]
            activations[antecedent] = fired
            total += fired
        # No rule fired at all: fall back to the window mean.
        if total == 0:
            return np.mean(window)
        # Weighted average of the individual rule consequents.
        weighted = 0.0
        for antecedent, fired in activations.items():
            weighted += fired * self.rules[antecedent](window)
        return weighted / total
# Build and train the fuzzy system.
fuzzy_system = FuzzyTSKSystem(n_clusters, window_size, cntr)
# Learn one TSK consequent (a linear model) per observed antecedent.
# BUG FIXES vs. the original: (1) this whole step was disabled inside a
# string literal, leaving the system rule-less so predict() always
# returned the window mean; (2) tuple(X[i]) never matched the nested
# rule keys; (3) the window slice was off by window_size — X[i]
# describes scaled_data[i : i + window_size], not [i-window_size : i].
for rule_antecedent in rule_counts:
    X_rule = []
    y_rule = []
    for i in range(len(X)):
        if tuple(tuple(row) for row in X[i]) == rule_antecedent:
            X_rule.append(scaled_data[i:i + window_size])
            y_rule.append(y[i])
    if len(X_rule) > 0:
        # Fit the rule's linear consequent on its matching windows.
        model = LinearRegression()
        model.fit(np.array(X_rule), np.array(y_rule))
        # predict() indexes memberships with plain ints, so flatten the
        # nested (c,) tuples produced during rule extraction.
        flat_antecedent = tuple(int(c[0]) for c in rule_antecedent)
        fuzzy_system.add_rule(
            flat_antecedent,
            lambda w, mdl=model: float(mdl.predict([w])[0])
        )
# Forecast over the held-out tail of the series.
test_size = 100
train_data = scaled_data[:-test_size]
test_data = scaled_data[-test_size:]
# One prediction per full window inside the test segment.
predictions = [
    fuzzy_system.predict(test_data[i - window_size:i])
    for i in range(window_size, len(test_data))
]
# Plot the forecast against the held-out observations.
# (The mojibake labels were an encoding defect; replaced with English.)
plt.figure(figsize=(12, 6))
plt.plot(test_data[window_size:], label='Actual values')
plt.plot(predictions, label='Fuzzy system forecast')
plt.title('Actual vs. forecast values')
plt.legend()
plt.show()
# Forecast quality on the held-out window (fixed mojibake output string).
from sklearn.metrics import mean_squared_error
mse = mean_squared_error(test_data[window_size:], predictions)
print(f"Mean squared error (MSE): {mse:.5f}")