Split the main script into classes

master
Aleksey Filippov 1 year ago
parent a1d45fdc84
commit 488857052d

@ -1,171 +1,13 @@
#!/usr/bin/env python3
import json
import os
import sys
from datetime import date, datetime
import numpy as np
import pandas as pd
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
# Sentinel stored as the age when no age could be determined.
EMPTY_AGE = 0
# Offsets added to the year distance computed from an education record
# (see get_age_from_education) to turn it into an age estimate.
UNIVERSITY_AGE = 21
SCHOOL_BEGIN_AGE = 7
SCHOOL_GRADUATED_AGE = 17
# Nominatim geocoder wrapped in a rate limiter (min_delay_seconds=1).
geolocator = Nominatim(user_agent="MyApp")
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)
# In-memory geocoding cache keyed by city name; loaded from and dumped to a JSON file.
geo_cache = {}
def is_empty_str(value):
    """Tell whether ``value`` is None or renders as a blank string.

    Any non-None value is converted with ``str()`` first, so it only counts
    as empty when its text form is empty or whitespace-only.
    """
    return value is None or not str(value).strip()
def is_empty_number(value):
    """Tell whether ``value`` is blank or is not a whole number.

    The text form of ``value`` may carry one leading minus sign; everything
    after it has to consist of numeric characters only.
    """
    if is_empty_str(value):
        return True
    text = str(value)
    # Drop a single leading sign before checking the remaining characters.
    digits = text[1:] if text.startswith('-') else text
    return not digits.isnumeric()
def is_empty_collection(collection):
    """Tell whether ``collection`` is anything other than a non-empty list.

    None, blank values and non-list objects are all reported as empty.
    """
    if is_empty_str(collection) or not isinstance(collection, list):
        return True
    return not collection
def get_age(date_str):
    """Return the age in full years for a birth date given as ``DD.MM.YYYY``.

    Returns EMPTY_AGE when the value is blank, is not a string, or does not
    match the expected format (for example a birth date without a year), so a
    single malformed record cannot abort processing of the whole dataset.
    """
    if is_empty_str(date_str):
        return EMPTY_AGE
    try:
        birthdate = datetime.strptime(date_str, '%d.%m.%Y')
    except (TypeError, ValueError):
        # TypeError: non-string input; ValueError: text not in DD.MM.YYYY form.
        return EMPTY_AGE
    today = date.today()
    # The comparison yields a bool that subtracts as 0 or 1: one year is taken
    # off when this year's birthday has not happened yet.
    birthday_pending = (today.month, today.day) < (birthdate.month, birthdate.day)
    return today.year - birthdate.year - birthday_pending
def get_years(year1, year2):
    """Return the absolute distance between ``year1`` and ``year2``.

    The order of the arguments does not matter.
    """
    return abs(year1 - year2)
# Estimate an age from education records: the distance in years between a
# record's `value` field and the current year, plus `additional_value`.
# Returns EMPTY_AGE when `education` is not a non-empty list or when the
# field is not a whole number.
# NOTE(review): indentation is not visible here, so it is unclear whether the
# final return sits inside the loop (first record decides) or after it (last
# record decides) - confirm against the real file before changing this code.
def get_age_from_education(education, value, additional_value):
if is_empty_collection(education):
return EMPTY_AGE
for item in education:
# Subscript access: a record without the `value` key raises here.
graduation = item[value]
if is_empty_number(graduation):
return EMPTY_AGE
return get_years(graduation, date.today().year) + additional_value
def prepare_dataset_age(df):
    """Add an 'age' column to ``df`` and return the same frame.

    The age is first derived from 'bdate'. Rows still holding EMPTY_AGE are
    then filled from education data, trying in order: university graduation
    year, school graduation year, school start year.
    """
    df['age'] = df['bdate'].apply(get_age)

    # (source column, year field inside each record, offset added to the year distance)
    fallbacks = (
        ('universities', 'graduation', UNIVERSITY_AGE),
        ('schools', 'year_graduated', SCHOOL_GRADUATED_AGE),
        ('schools', 'year_from', SCHOOL_BEGIN_AGE),
    )
    for column, year_field, offset in fallbacks:
        # Recomputed on every pass so rows resolved by an earlier fallback are skipped.
        unresolved = (df['age'] == EMPTY_AGE) & (df[column].str.len() > 0)
        df.loc[unresolved, 'age'] = df.loc[unresolved, column].apply(
            get_age_from_education, args=(year_field, offset))
    return df
def prepare_dataset_status(df):
    """Add the boolean columns 'is_university', 'is_work', 'is_student' and
    'is_schoolboy' to ``df`` and return the same frame.

    Each flag is computed from 'age', 'universities' and 'occupation_type';
    'is_work' additionally reads the freshly written 'is_university' column.
    """
    age = df['age']
    occupation = df['occupation_type']
    age_unknown = age == EMPTY_AGE

    has_university = (df['universities'].str.len() > 0) | (occupation == 'university')
    university = ((age >= UNIVERSITY_AGE) | age_unknown) & has_university
    df['is_university'] = np.where(university, True, False)

    # `&` binds tighter than `|`: the trailing age test is OR-ed with the whole conjunction.
    past_school = (age > SCHOOL_GRADUATED_AGE) | age_unknown
    work = (past_school & (df['is_university'] | (occupation == 'work'))) | (age > UNIVERSITY_AGE)
    df['is_work'] = np.where(work, True, False)

    student_age = (age >= SCHOOL_GRADUATED_AGE) & (age <= UNIVERSITY_AGE)
    student = (occupation == 'university') & student_age
    df['is_student'] = np.where(student, True, False)

    known_school_age = (age < SCHOOL_GRADUATED_AGE) & (age != EMPTY_AGE)
    schoolboy = known_school_age | (age_unknown & (occupation == 'school'))
    df['is_schoolboy'] = np.where(schoolboy, True, False)
    return df
def load_geo_cache(json_file):
    """Merge previously saved geocoding results from ``json_file`` into ``geo_cache``.

    A missing file is treated as an empty cache, so the very first run (before
    any cache has been written) does not fail with FileNotFoundError.
    """
    if not os.path.isfile(json_file):
        return
    with open(json_file, 'r') as rf:
        geo_cache.update(json.load(rf))
def save_geo_cache(json_file):
    """Overwrite ``json_file`` with the JSON form of ``geo_cache`` and report it on stdout."""
    with open(json_file, 'w') as out_file:
        out_file.write(json.dumps(geo_cache))
    print('Geocache saved')
def update_geo_cache(cities, json_file):
    """Geocode every city not yet in ``geo_cache`` and persist the cache.

    Blank names and cities that already have a cached result are skipped.
    The cache is written to ``json_file`` whenever its size reaches a multiple
    of 50, and once more at the end if anything was added.

    A city the geocoder cannot resolve is stored as None instead of raising
    AttributeError; such an entry is looked up again on the next call because
    None does not count as a cached result.
    """
    is_changed = False
    for city in cities:
        if is_empty_str(city) or geo_cache.get(city) is not None:
            continue
        print(f'{len(geo_cache)}/{len(cities)} - Try to load geocode for {city}')
        location = geocode(city)
        # geocode() yields None when nothing is found.
        geo_cache[city] = None if location is None else (location.latitude, location.longitude)
        is_changed = True
        # Periodic checkpoint so a long run does not lose everything on failure.
        if len(geo_cache) % 50 == 0:
            save_geo_cache(json_file)
    if is_changed:
        save_geo_cache(json_file)
def prepare_dataset_location(df):
    """Add a 'location' column with the cached geocode of each row's 'city'.

    The cache file 'geocache.json' is loaded, extended with any city of the
    frame that is not cached yet, and then used for the lookup. Rows with a
    blank city get an empty string. Returns the same frame.
    """
    cache_path = 'geocache.json'
    load_geo_cache(cache_path)
    unique_cities = df['city'].unique().tolist()
    update_geo_cache(unique_cities, cache_path)

    def to_location(city):
        if is_empty_str(city):
            return ''
        return geo_cache[city]

    df['location'] = df['city'].apply(to_location)
    return df
def prepare_dataset(json_file):
    """Read ``json_file`` into a DataFrame and run the age, status and location steps on it, in that order."""
    frame = pd.read_json(json_file)
    for step in (prepare_dataset_age, prepare_dataset_status, prepare_dataset_location):
        frame = step(frame)
    return frame
from src.main.df_loader import DfLoader
def __main(json_file):
    """Load and prepare the dataset stored in ``json_file`` and report completion."""
    # DfLoader reads the file and runs the age/status/location preparation in
    # its constructor, so a separate prepare_dataset() pass (whose result was
    # overwritten right away) would only repeat the same work.
    df_loader: DfLoader = DfLoader(json_file)
    # The prepared frame is exposed by DfLoader.get_clustering_data().
    df = df_loader.get_clustering_data()
    print('done')

@ -1,2 +1,3 @@
pandas==2.0.1
geopy==2.3.0
geopy==2.3.0
numpy==1.24.3

@ -0,0 +1,16 @@
class Constants:
    """Age-related constants, exposed through static accessor methods."""

    _EMPTY_AGE = 0
    _UNIVERSITY_GRADUATION_AGE = 21
    _SCHOOL_START_AGE = 7
    _SCHOOL_GRADUATION_AGE = 17

    @staticmethod
    def empty_age() -> int:
        """Return the placeholder value used for an empty age."""
        return Constants._EMPTY_AGE

    @staticmethod
    def university_gr_age() -> int:
        """Return the university graduation age."""
        return Constants._UNIVERSITY_GRADUATION_AGE

    @staticmethod
    def school_st_age() -> int:
        """Return the school start age."""
        return Constants._SCHOOL_START_AGE

    @staticmethod
    def school_gr_age() -> int:
        """Return the school graduation age."""
        return Constants._SCHOOL_GRADUATION_AGE

@ -0,0 +1,72 @@
from datetime import date
import numpy as np
import pandas as pd
from pandas import DataFrame
from src.main.constants import Constants as const
from src.main.geocache import Geocache
from src.main.utils import Utils
class DfLoader:
    """Load a JSON dataset into a DataFrame and enrich it with derived columns.

    The constructor reads the file with ``pandas.read_json`` and then adds the
    'age', 'is_university', 'is_work', 'is_student', 'is_schoolboy' and
    'location' columns. The result is available through
    ``get_clustering_data()`` (or its alias ``get_data_frame()``).
    """

    def __init__(self, json_file: str) -> None:
        """Read ``json_file`` and run all preparation steps."""
        self.__geocache: Geocache = Geocache()
        print(f'Try to load data from the {json_file} file')
        self.__df: DataFrame = pd.read_json(json_file)
        self.__prepare_dataset_age()
        self.__prepare_dataset_status()
        self.__prepare_dataset_location()
        print('Data is successfully loaded')

    @staticmethod
    def get_age_from_education(education: list, value: str, additional_value: int) -> int:
        """Estimate an age from education records.

        The estimate is the distance in years between the record's ``value``
        field and the current year, plus ``additional_value``. The empty age
        is returned when ``education`` is not a non-empty list, or when the
        field is missing or is not a whole number.
        """
        if Utils.is_empty_collection(education):
            return const.empty_age()
        # NOTE(review): only the first record is consulted. This assumes the
        # original loop returned during its first iteration - confirm.
        try:
            graduation = education[0][value]
        except (KeyError, TypeError):
            # A record without this field counts as "unknown" instead of
            # aborting the whole load.
            return const.empty_age()
        if Utils.is_empty_number(graduation):
            return const.empty_age()
        return Utils.get_years(graduation, date.today().year) + additional_value

    def __prepare_dataset_age(self) -> None:
        """Fill 'age' from 'bdate', falling back to education records."""
        df = self.__df
        df['age'] = df['bdate'].apply(Utils.get_age)

        # (source column, year field inside each record, offset added to the year distance)
        fallbacks = (
            ('universities', 'graduation', const.university_gr_age()),
            ('schools', 'year_graduated', const.school_gr_age()),
            ('schools', 'year_from', const.school_st_age()),
        )
        for column, year_field, offset in fallbacks:
            # Recomputed on every pass so rows resolved earlier are skipped.
            unresolved = (df['age'] == const.empty_age()) & (df[column].str.len() > 0)
            df.loc[unresolved, 'age'] = df.loc[unresolved, column].apply(
                self.get_age_from_education, args=(year_field, offset))

    def __prepare_dataset_status(self) -> None:
        """Derive the boolean status columns from age and occupation."""
        df = self.__df
        age = df['age']
        occupation = df['occupation_type']
        age_unknown = age == const.empty_age()

        has_university = (df['universities'].str.len() > 0) | (occupation == 'university')
        is_univer_mask = ((age >= const.university_gr_age()) | age_unknown) & has_university
        df['is_university'] = np.where(is_univer_mask, True, False)

        # `&` binds tighter than `|`: the trailing age test is OR-ed with the whole conjunction.
        past_school = (age > const.school_gr_age()) | age_unknown
        is_work_mask = (past_school & (df['is_university'] | (occupation == 'work'))) | \
                       (age > const.university_gr_age())
        df['is_work'] = np.where(is_work_mask, True, False)

        is_student_mask = (occupation == 'university') & \
                          (age >= const.school_gr_age()) & (age <= const.university_gr_age())
        df['is_student'] = np.where(is_student_mask, True, False)

        is_schoolboy_mask = ((age < const.school_gr_age()) & (age != const.empty_age())) | \
                            (age_unknown & (occupation == 'school'))
        df['is_schoolboy'] = np.where(is_schoolboy_mask, True, False)

    def __prepare_dataset_location(self) -> None:
        """Geocode the distinct cities and add the 'location' column."""
        self.__geocache.update_geo_cache(self.__df['city'].unique().tolist())
        self.__df['location'] = self.__df['city'] \
            .apply(lambda val: '' if Utils.is_empty_str(val) else self.__geocache.get_location(val))

    def get_clustering_data(self) -> DataFrame:
        """Return the prepared DataFrame (the internal object, not a copy)."""
        return self.__df

    def get_data_frame(self) -> DataFrame:
        """Alias of ``get_clustering_data()``, the name used by the entry script."""
        return self.get_clustering_data()

@ -0,0 +1,50 @@
import json
import os
from typing import List
from geopy import Point
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from src.main.utils import Utils
class Geocache:
    """Cache of city name -> (latitude, longitude), persisted in a JSON file.

    Lookups that are not cached yet are resolved through a rate-limited
    Nominatim geocoder. Coordinates loaded back from JSON are lists, while
    freshly geocoded ones are tuples.
    """

    JSON_FILE: str = 'geocache.json'

    def __init__(self) -> None:
        """Create the geocoder and load any existing cache file."""
        geolocator: Nominatim = Nominatim(user_agent="MyApp")
        self.__geocode: RateLimiter = RateLimiter(geolocator.geocode, min_delay_seconds=1)
        self.__geo_cache: dict = {}
        self.__load_geo_cache()

    def __load_geo_cache(self) -> None:
        """Merge the cache file into memory; a missing file means an empty cache."""
        if os.path.isfile(self.JSON_FILE):
            with open(self.JSON_FILE, 'r') as rf:
                self.__geo_cache.update(json.load(rf))

    def __save_geo_cache(self) -> None:
        """Overwrite the cache file with the in-memory cache."""
        with open(self.JSON_FILE, 'w') as wf:
            json.dump(self.__geo_cache, wf)
        print('Geocache saved')

    def update_geo_cache(self, cities: List[str]) -> None:
        """Geocode every city that is not cached yet and persist the cache.

        Blank names and already cached cities are skipped. A city the
        geocoder cannot resolve is left out of the cache (``get_location``
        then returns None for it) instead of raising AttributeError. The
        cache is saved whenever its size reaches a multiple of 50 and once
        more at the end if anything was added.
        """
        is_changed: bool = False
        for city in cities:
            if Utils.is_empty_str(city) or self.__geo_cache.get(city) is not None:
                continue
            print(f'{len(self.__geo_cache)}/{len(cities)} - Try to load geocode for {city}')
            location = self.__geocode(city)
            if location is None:
                # Nothing found for this name; keep going with the other cities.
                continue
            self.__geo_cache[city] = (location.latitude, location.longitude)
            is_changed = True
            # Periodic checkpoint so a long run does not lose everything on failure.
            if len(self.__geo_cache) % 50 == 0:
                self.__save_geo_cache()
        if is_changed:
            self.__save_geo_cache()

    def get_location(self, city: str):
        """Return the cached coordinates for ``city``, or None when unknown."""
        return self.__geo_cache.get(city)

@ -0,0 +1,44 @@
from datetime import date, datetime
from src.main.constants import Constants as const
class Utils:
    """Stateless helpers for validating raw values and deriving ages."""

    @staticmethod
    def is_empty_str(value: object) -> bool:
        """Tell whether ``value`` is None or its text form is blank."""
        return value is None or not str(value).strip()

    @staticmethod
    def is_empty_number(value: object) -> bool:
        """Tell whether ``value`` is blank or is not a whole number.

        One leading minus sign is allowed; the rest of the text form has to
        consist of numeric characters only.
        """
        if Utils.is_empty_str(value):
            return True
        text = str(value)
        digits = text[1:] if text.startswith('-') else text
        return not digits.isnumeric()

    @staticmethod
    def is_empty_collection(collection: object) -> bool:
        """Tell whether ``collection`` is anything other than a non-empty list."""
        if Utils.is_empty_str(collection) or not isinstance(collection, list):
            return True
        return not collection

    @staticmethod
    def get_age(date_str: str) -> int:
        """Return the age in full years for a ``DD.MM.YYYY`` birth date.

        The empty age is returned when the value is blank, is not a string,
        or does not match the format (for example a date without a year), so
        one malformed record cannot abort processing of the whole dataset.
        """
        if Utils.is_empty_str(date_str):
            return const.empty_age()
        try:
            birthdate = datetime.strptime(date_str, '%d.%m.%Y')
        except (TypeError, ValueError):
            # TypeError: non-string input; ValueError: text not in DD.MM.YYYY form.
            return const.empty_age()
        today: date = date.today()
        # The comparison yields a bool that subtracts as 0 or 1: one year is
        # taken off when this year's birthday has not happened yet.
        birthday_pending = (today.month, today.day) < (birthdate.month, birthdate.day)
        return today.year - birthdate.year - birthday_pending

    @staticmethod
    def get_years(year1: int, year2: int) -> int:
        """Return the absolute distance between two years."""
        return abs(year1 - year2)
Loading…
Cancel
Save