import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.preprocessing import MinMaxScaler from skfuzzy import cmeans, defuzz from sklearn.linear_model import LinearRegression # Загрузка данных (пример с данными о температуре) url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv" data = pd.read_csv(url, header=0, index_col=0, parse_dates=True) ts_data = data['Temp'].values.reshape(-1, 1) # Нормализация данных scaler = MinMaxScaler() scaled_data = scaler.fit_transform(ts_data).flatten() # Параметры нечеткой системы n_clusters = 3 # Количество нечетких множеств m = 2.0 # Параметр нечеткости (обычно между 1.5 и 3.0) max_iter = 100 # Максимальное число итераций error = 0.005 # Критерий останова # Применение нечеткой c-средней кластеризации cntr, u, u0, d, jm, p, fpc = cmeans( data=scaled_data.reshape(1, -1), c=n_clusters, m=m, error=error, maxiter=max_iter ) # Визуализация функций принадлежности x = np.linspace(0, 1, 100) plt.figure(figsize=(10, 6)) for i in range(n_clusters): membership = np.exp(-(x - cntr[i])**2 / (2 * 0.1**2)) # Гауссова функция принадлежности plt.plot(x, membership, label=f'Кластер {i+1}') plt.title('Функции принадлежности нечетких множеств') plt.xlabel('Нормализованное значение температуры') plt.ylabel('Степень принадлежности') plt.legend() plt.show() # Функция для определения принадлежности к кластерам def get_cluster_membership(x, cntr): distances = [np.abs(x - c) for c in cntr] memberships = [1 / (1 + d ** 2) for d in distances] # Используем обратное расстояние return memberships / np.sum(memberships) # Нормализация # Создание обучающего набора для правил window_size = 3 # Размер окна для временных паттернов X = [] y = [] for i in range(window_size, len(scaled_data)): # Получаем окно данных window = scaled_data[i - window_size:i] # Вычисляем принадлежности для каждого элемента окна memberships = np.array([get_cluster_membership(x, cntr) for x in window]) # Находим доминирующий кластер для каждого элемента dominant_clusters = np.argmax(memberships, axis=1) # Формируем правило и целевую переменную X.append(dominant_clusters) y.append(scaled_data[i]) X = np.array(X) y = np.array(y) # Извлечение частых правил from collections import defaultdict rule_counts = defaultdict(int) rule_consequences = defaultdict(list) for pattern, consequence in zip(X, y): #rule_key = tuple(pattern) rule_key = tuple(tuple(row) for row in pattern) rule_counts[rule_key] += 1 rule_consequences[rule_key].append(consequence) # Вывод наиболее частых правил print("Топ-5 наиболее частых правил:") sorted_rules = sorted(rule_counts.items(), key=lambda x: -x[1])[:5] for rule, count in sorted_rules: avg_consequence = np.mean(rule_consequences[rule]) print(f"Если {rule} то y={avg_consequence:.3f} (встречается {count} раз)") class FuzzyTSKSystem: def __init__(self, n_clusters, window_size, cntr): self.n_clusters = n_clusters self.window_size = window_size self.cntr = cntr self.rules = {} self.rule_weights = {} def add_rule(self, antecedent, consequent_func): self.rules[antecedent] = consequent_func self.rule_weights[antecedent] = 1.0 # Начальный вес правила def predict(self, window): # Вычисляем принадлежности для каждого элемента окна memberships = np.array([get_cluster_membership(x, self.cntr) for x in window]) # Вычисляем активацию каждого правила rule_activations = {} total_activation = 0.0 for rule_antecedent in self.rules: # Вычисляем степень соответствия окна правилу activation = 1.0 for i in range(self.window_size): cluster = rule_antecedent[i] activation *= memberships[i, cluster] rule_activations[rule_antecedent] = activation * self.rule_weights[rule_antecedent] total_activation += rule_activations[rule_antecedent] # Если ни одно правило не активировано, возвращаем среднее if total_activation == 0: return np.mean(window) # Вычисляем взвешенный вывод weighted_output = 0.0 for rule_antecedent, activation in rule_activations.items(): # Получаем вывод правила (линейная функция от входа) consequent = self.rules[rule_antecedent](window) weighted_output += activation * consequent return weighted_output / total_activation # Создаем и обучаем нечеткую систему fuzzy_system = FuzzyTSKSystem(n_clusters, window_size, cntr) # Добавляем правила на основе данных for rule_antecedent in rule_counts: # Для каждого правила создаем линейную модель X_rule = [] y_rule = [] for i in range(len(X)): if tuple(X[i]) == rule_antecedent: X_rule.append(scaled_data[i - window_size:i]) y_rule.append(y[i]) """ if len(X_rule) > 0: X_rule = np.array(X_rule) y_rule = np.array(y_rule) # Обучаем линейную регрессию для этого правила model = LinearRegression() model.fit(X_rule, y_rule) # Добавляем правило в систему fuzzy_system.add_rule( rule_antecedent, lambda x, m=model: m.predict([x])[0] ) """ # Прогнозирование на тестовых данных test_size = 100 train_data = scaled_data[:-test_size] test_data = scaled_data[-test_size:] predictions = [] for i in range(window_size, len(test_data)): window = test_data[i - window_size:i] pred = fuzzy_system.predict(window) predictions.append(pred) # Визуализация результатов plt.figure(figsize=(12, 6)) plt.plot(test_data[window_size:], label='Фактические значения') plt.plot(predictions, label='Прогноз нечеткой системы') plt.title('Сравнение фактических и прогнозируемых значений') plt.legend() plt.show() # Оценка качества прогноза from sklearn.metrics import mean_squared_error mse = mean_squared_error(test_data[window_size:], predictions) print(f"Среднеквадратичная ошибка (MSE): {mse:.5f}")