import os
import uuid

import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
from werkzeug import utils

from backend.api import FileStorage
from backend.dataset.model import DatasetParams, SplittedDataset


class Dataset:
    """An uploaded dataset file stored on disk under a generated name.

    Constructing an instance immediately saves the uploaded file into the
    given directory; the other methods read, split and delete that file.
    """

    def __init__(self, path: str | None, file: FileStorage) -> None:
        """Save ``file`` into the directory ``path``.

        Args:
            path: Directory the upload is written to.
            file: Uploaded file object to persist.

        Raises:
            Exception: If ``path`` is ``None``, if the upload has no
                filename, or if the generated target path already exists.
        """
        if path is None:
            raise Exception("Dataset path is not defined")
        self.__path: str = path
        self.__file_name: str = self.__save(file)

    def __get_file_name(self, file: FileStorage) -> str:
        """Build the on-disk path for an upload.

        The stored name is a fresh UUID4, not the client-supplied filename;
        ``file.filename`` is only checked for presence.

        Raises:
            Exception: If ``file.filename`` is ``None``.
        """
        if file.filename is None:
            raise Exception("Dataset upload error")
        file_uuid: str = str(uuid.uuid4())
        file_name: str = utils.secure_filename(file_uuid)
        return os.path.join(self.__path, file_name)

    def __save(self, file: FileStorage) -> str:
        """Write the upload to disk and return the path it was saved to.

        Raises:
            Exception: If a file already exists at the generated path.
        """
        file_name: str = self.__get_file_name(file=file)
        if os.path.exists(file_name):
            raise Exception(f"File with name '{file_name}' is already exists")
        # Rewind first so the whole upload is written even if the stream
        # was already (partially) consumed.
        file.stream.seek(0)
        file.save(file_name)
        return file_name

    def read(self, params: DatasetParams) -> DataFrame:
        """Load the stored file as CSV.

        Args:
            params: Supplies ``sep`` and ``decimal`` for parsing, plus the
                optional ``input`` column list and the ``target`` column.

        Returns:
            The full frame when ``params.input`` is ``None``; otherwise only
            the input columns together with the target column.
        """
        df = pd.read_csv(self.__file_name, sep=params.sep, decimal=params.decimal)
        if params.input is None:
            return df
        columns = list(params.input)
        # Append the target only when it is not already selected; selecting
        # it twice would duplicate the column and corrupt a later split().
        if params.target not in columns:
            columns.append(params.target)
        return df[columns]

    def split(
        self, data: DataFrame, params: DatasetParams, random_state: int
    ) -> SplittedDataset:
        """Split ``data`` into train and test parts.

        Args:
            data: Frame containing the feature columns and ``params.target``.
            params: Supplies ``target`` and ``train_volume`` (the fraction
                of rows used for training).
            random_state: Seed forwarded to ``train_test_split``.

        Returns:
            ``SplittedDataset(X_train, X_test, y_train, y_test)``; the target
            parts are single-column DataFrames.
        """
        X = data.drop([params.target], axis=1)
        y = data[[params.target]]
        # Pass the training fraction directly instead of ``1.0 - fraction``:
        # the subtraction introduces float error (1.0 - 0.7 ==
        # 0.30000000000000004) and scikit-learn rounds the test count up, so
        # 10 rows would produce 4 test rows instead of 3. ``float()`` keeps
        # an integer value from being treated as an absolute sample count.
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            train_size=float(params.train_volume),
            random_state=random_state,
        )
        return SplittedDataset(X_train, X_test, y_train, y_test)

    def remove(self) -> None:
        """Delete the stored file from disk."""
        os.remove(self.__file_name)