89 lines
2.9 KiB
Python

import os
import uuid
import pandas as pd
from pandas import DataFrame
from pandas.errors import ParserError
from sklearn.model_selection import train_test_split
from werkzeug import utils
from backend.api import FileStorage
from backend.dataset.model import DatasetParams, SplittedDataset
class Dataset:
    """An uploaded dataset file persisted on disk under a generated name.

    The uploaded file is saved as soon as the object is constructed. The
    instance can then parse it into a ``DataFrame``, split a frame into
    train/test parts and delete the stored file.
    """

    def __init__(self, path: str | None, file: FileStorage) -> None:
        """Store *file* inside the directory *path*.

        Args:
            path: Directory the upload is written to.
            file: Uploaded file object to persist.

        Raises:
            Exception: If *path* is ``None``, or if saving fails (see
                ``__save``).
        """
        if path is None:
            raise Exception("Dataset path is not defined")
        self.__path: str = path
        self.__file_name: str = self.__save(file)

    def __get_file_name(self, file: FileStorage) -> str:
        """Build the on-disk path for a new upload.

        The stored name is a freshly generated UUID4 string; the original
        ``file.filename`` is only checked for presence and is not used in
        the resulting path.

        Raises:
            Exception: If ``file.filename`` is ``None``.
        """
        if file.filename is None:
            raise Exception("Dataset upload error")
        file_uuid: str = str(uuid.uuid4())
        file_name: str = utils.secure_filename(file_uuid)
        return os.path.join(self.__path, file_name)

    def __save(self, file: FileStorage) -> str:
        """Write *file* to disk and return the path it was saved to.

        Raises:
            Exception: If a file already exists at the generated path.
        """
        file_name: str = self.__get_file_name(file=file)
        if os.path.exists(file_name):
            raise Exception(f"File with name '{file_name}' is already exists")
        # Rewind the stream so the whole content is written even if it was
        # read before.
        file.stream.seek(0)
        file.save(file_name)
        return file_name

    def read(self, params: DatasetParams) -> DataFrame:
        """Parse the stored file as CSV and return the requested columns.

        Side effect: when ``params.target`` is falsy it is set to the name of
        the last column of the parsed file.

        Args:
            params: Supplies ``sep`` and ``decimal`` for parsing, plus the
                optional ``input`` column list and ``target`` column.

        Returns:
            The full frame when ``params.input`` is ``None``; otherwise only
            the input columns followed by the target column.

        Raises:
            Exception: If pandas cannot parse the file, or if the parsed
                frame has fewer than two columns.
        """
        try:
            df = pd.read_csv(self.__file_name, sep=params.sep, decimal=params.decimal)
        except ParserError as err:
            # Chain the original error so the parser diagnostics stay in
            # the traceback.
            raise Exception(
                "Can't parse dataset. Try to use correct 'sep' and 'decimal' values."
            ) from err
        if df.columns.size < 2:
            raise Exception(
                "Dataset contains less than 2 columns. "
                "Try to use correct 'sep' parameter value."
            )
        params.target = params.target or df.columns[-1]
        if params.input is None:
            return df
        # Select the target exactly once: if it is also listed among the
        # inputs, a plain concatenation would duplicate the column and a
        # later ``data[[target]]`` would return two columns.
        input_columns = [
            column for column in params.input if column != params.target
        ]
        return df[input_columns + [params.target]]

    def __split(
        self,
        data: DataFrame,
        params: DatasetParams,
        random_state: int,
        is_classification: bool = False,
    ) -> SplittedDataset:
        """Split *data* into train and test features/target.

        Features are every column except ``params.target``; the target is
        kept as a single-column frame. For classification the split is
        stratified by the target.

        Args:
            data: Frame containing the ``params.target`` column.
            params: Supplies ``target`` and ``train_volume``.
            random_state: Seed forwarded to ``train_test_split``.
            is_classification: Whether to stratify by the target.

        Returns:
            ``SplittedDataset`` built from ``X_train, X_test, y_train,
            y_test`` in that order.
        """
        X = data.drop([params.target], axis=1)
        y = data[[params.target]]
        stratify = None if not is_classification else y
        # NOTE(review): ``1.0 - train_volume`` is subject to float rounding
        # (e.g. 1.0 - 0.7 == 0.30000000000000004); confirm the resulting
        # test-set size is acceptable for boundary cases.
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=(1.0 - params.train_volume),
            random_state=random_state,
            stratify=stratify,
        )
        return SplittedDataset(X_train, X_test, y_train, y_test)

    def split_regression(
        self, data: DataFrame, params: DatasetParams, random_state: int
    ) -> SplittedDataset:
        """Split *data* for a regression task (no stratification)."""
        return self.__split(
            data=data, params=params, random_state=random_state, is_classification=False
        )

    def split_classification(
        self, data: DataFrame, params: DatasetParams, random_state: int
    ) -> SplittedDataset:
        """Split *data* for a classification task, stratified by the target."""
        return self.__split(
            data=data, params=params, random_state=random_state, is_classification=True
        )

    def remove(self) -> None:
        """Delete the stored dataset file from disk."""
        os.remove(self.__file_name)