# Fuzzy time-series forecasting: c-means clustering + TSK rule-based prediction.
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
from skfuzzy import cmeans, defuzz
|
|
from sklearn.linear_model import LinearRegression
|
|
|
|
# Load the dataset (example: daily minimum temperatures time series).
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv"
data = pd.read_csv(url, header=0, index_col=0, parse_dates=True)
ts_data = data['Temp'].to_numpy().reshape(-1, 1)

# Rescale the series into [0, 1] so all fuzzy sets share a common domain.
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(ts_data).flatten()
|
|
|
|
# Fuzzy-system hyperparameters.
n_clusters = 3    # number of fuzzy sets
m = 2.0           # fuzziness exponent (typically between 1.5 and 3.0)
max_iter = 100    # iteration cap for c-means
error = 0.005     # convergence tolerance

# Fuzzy c-means clustering; skfuzzy expects data shaped (features, samples),
# hence the reshape of the 1-D series into a single feature row.
cntr, u, u0, d, jm, p, fpc = cmeans(
    scaled_data.reshape(1, -1), n_clusters, m, error, max_iter
)
|
|
|
|
# Visualize Gaussian membership functions centred on the fuzzy cluster centres.
# NOTE: the original labels/titles were mojibake (cp1251 text decoded as
# cp1252); restored as readable English strings.
x = np.linspace(0, 1, 100)
plt.figure(figsize=(10, 6))
for i in range(n_clusters):
    # Gaussian membership function with fixed sigma = 0.1 around centre i.
    membership = np.exp(-(x - cntr[i])**2 / (2 * 0.1**2))
    plt.plot(x, membership, label=f'Cluster {i+1}')
plt.title('Membership functions of the fuzzy sets')
plt.xlabel('Normalized temperature value')
plt.ylabel('Degree of membership')
plt.legend()
plt.show()
|
|
|
|
|
|
# Compute fuzzy cluster memberships for a single value.
def get_cluster_membership(x, cntr):
    """Return the normalized membership of ``x`` in each fuzzy cluster.

    Membership is the inverse squared distance to each centre,
    1 / (1 + d**2), rescaled so that the memberships sum to 1.
    """
    memberships = []
    for centre in cntr:
        dist = np.abs(x - centre)
        memberships.append(1 / (1 + dist ** 2))
    # Dividing the list by the numpy scalar broadcasts to an ndarray.
    return memberships / np.sum(memberships)
|
|
|
|
|
|
# Build the training pairs: antecedent pattern -> next scaled value.
window_size = 3  # length of the temporal pattern window
X = []
y = []

for i in range(window_size, len(scaled_data)):
    # Sliding window preceding position i.
    window = scaled_data[i - window_size:i]

    # Fuzzy memberships of every window element in each cluster.
    memberships = np.array([get_cluster_membership(v, cntr) for v in window])

    # Antecedent: the dominant cluster index of each window element.
    dominant_clusters = np.argmax(memberships, axis=1)

    X.append(dominant_clusters)
    y.append(scaled_data[i])

X = np.array(X)
y = np.array(y)
|
|
|
|
# Mine frequent rules: count identical antecedent patterns and collect the
# observed consequents for each pattern.
# NOTE: the original print strings were mojibake (cp1251 decoded as cp1252);
# restored as readable English. A dead commented-out key variant was removed.
from collections import defaultdict

rule_counts = defaultdict(int)
rule_consequences = defaultdict(list)

for pattern, consequence in zip(X, y):
    # Hashable key: each pattern row (a length-1 array of the dominant
    # cluster index, since the c-means centres are 2-D) becomes a tuple.
    rule_key = tuple(tuple(row) for row in pattern)
    rule_counts[rule_key] += 1
    rule_consequences[rule_key].append(consequence)

# Report the five most frequent rules with their average consequent.
print("Top 5 most frequent rules:")
sorted_rules = sorted(rule_counts.items(), key=lambda item: -item[1])[:5]
for rule, count in sorted_rules:
    avg_consequence = np.mean(rule_consequences[rule])
    print(f"If {rule} then y={avg_consequence:.3f} (seen {count} times)")
|
|
|
|
|
|
class FuzzyTSKSystem:
    """Takagi-Sugeno-Kang fuzzy inference system over cluster-index antecedents.

    Each rule maps a window's dominant-cluster pattern to a consequent
    function of the raw window values; a prediction is the activation-
    weighted average of the consequents of all rules.
    """

    def __init__(self, n_clusters, window_size, cntr):
        self.n_clusters = n_clusters
        self.window_size = window_size
        self.cntr = cntr              # cluster centres used for memberships
        self.rules = {}               # antecedent -> consequent function(window)
        self.rule_weights = {}        # antecedent -> rule weight

    def add_rule(self, antecedent, consequent_func):
        """Register a rule; newly added rules start with weight 1.0."""
        self.rules[antecedent] = consequent_func
        self.rule_weights[antecedent] = 1.0

    @staticmethod
    def _cluster_index(element):
        # Antecedent elements may be plain ints or length-1 tuples
        # (the rule-mining step wraps each cluster index in a tuple).
        if isinstance(element, tuple):
            element = element[0]
        return int(element)

    def predict(self, window):
        """Return a scalar forecast for the values following ``window``."""
        # Memberships of each window element in each cluster, flattened to
        # shape (window_size, n_clusters) regardless of centre array shape.
        memberships = np.array(
            [np.ravel(get_cluster_membership(v, self.cntr)) for v in window]
        )

        # Activation of each rule: product t-norm over the window, scaled
        # by the rule weight. Kept as plain floats so the final output is
        # a scalar (the original produced (1,1) ndarrays once rules existed,
        # which broke downstream plotting and MSE computation).
        rule_activations = {}
        total_activation = 0.0
        for antecedent in self.rules:
            activation = 1.0
            for i in range(self.window_size):
                activation *= float(memberships[i, self._cluster_index(antecedent[i])])
            activation *= self.rule_weights[antecedent]
            rule_activations[antecedent] = activation
            total_activation += activation

        # No rule fires: fall back to the window mean.
        if total_activation == 0:
            return np.mean(window)

        # Weighted (TSK) defuzzification.
        weighted_output = 0.0
        for antecedent, activation in rule_activations.items():
            weighted_output += activation * float(self.rules[antecedent](window))
        return weighted_output / total_activation


# Build the fuzzy system and fit one linear (first-order TSK) consequent
# model per mined antecedent pattern.
# NOTE: in the original this entire training step was disabled inside a
# triple-quoted string, so the system had zero rules and every prediction
# degenerated to the window mean; it is restored here.
fuzzy_system = FuzzyTSKSystem(n_clusters, window_size, cntr)

for rule_antecedent in rule_counts:
    X_rule = []
    y_rule = []
    for i in range(len(X)):
        # Tuple equality works for both flat-int and nested-tuple keys,
        # because each element comparison yields a single truthy value.
        if tuple(X[i]) == rule_antecedent:
            # X row i was built from the window scaled_data[i:i+window_size]
            # (the loop that built X starts at index window_size, so X rows
            # are offset; the original disabled code used
            # scaled_data[i - window_size:i], which is empty for small i).
            X_rule.append(scaled_data[i:i + window_size])
            y_rule.append(y[i])

    if X_rule:
        # Fit a linear regression on the raw windows matching this rule.
        model = LinearRegression()
        model.fit(np.array(X_rule), np.array(y_rule))

        # Bind the fitted model as a default argument to avoid the
        # late-binding closure pitfall.
        fuzzy_system.add_rule(
            rule_antecedent,
            lambda w, m=model: m.predict([w])[0],
        )
|
|
|
|
# Forecast on a held-out test segment.
# NOTE: the original labels/title were mojibake (cp1251 decoded as cp1252);
# restored as readable English strings.
test_size = 100
train_data = scaled_data[:-test_size]
test_data = scaled_data[-test_size:]

predictions = []
for i in range(window_size, len(test_data)):
    # Predict the next value from the preceding window of test data.
    window = test_data[i - window_size:i]
    pred = fuzzy_system.predict(window)
    predictions.append(pred)

# Compare actual values against the fuzzy-system forecast.
plt.figure(figsize=(12, 6))
plt.plot(test_data[window_size:], label='Actual values')
plt.plot(predictions, label='Fuzzy system forecast')
plt.title('Actual vs. predicted values')
plt.legend()
plt.show()
|
|
|
|
# Evaluate forecast quality.
# NOTE: the original print string was mojibake; restored as readable English.
from sklearn.metrics import mean_squared_error

mse = mean_squared_error(test_data[window_size:], predictions)
print(f"Mean squared error (MSE): {mse:.5f}")