Source code for cfl_data_utils.utils.utils

"""
This module is for smaller utility function which are useful across many projects.

Todo:
    * better type testing using ``isinstance`` or ``assert``

"""
from datetime import datetime, timedelta
from os import get_terminal_size
from platform import system
from sys import stdout, modules
from time import time

from cfl_data_utils.references.constants import TYPE_TESTS, SQL_TYPE_DICT, WINDOWS

if not system() == WINDOWS:
    try:
        from blessings import Terminal
    except ModuleNotFoundError:
        from warnings import warn
        warn('Unable to import curses.')


[docs]def assert_date_is_yesterday(date): """Checks that a date is yesterday for data validation Args: date (datetime): The date to be validated Raises: AssertionError: if the date isn't today """ yesterday = datetime.today() - timedelta(days=1) assert date.date() == yesterday.date()
[docs]def sqlize(string): """SQL-izes a string to ensure the characters are all legal Args: string (str): The string to be processed Returns: the processed SQL-friendly string """ return string.upper().replace(' ', '_').replace('-', '_').replace(':', '_')
[docs]def get_var_type(value, sql=False): """Gets a Python type from a string variable Args: value (Union[int, float, str, datetime]): The variable to be type-checked sql (bool): Flag to decide if return type should be SQL-ized (e.g. int vs INT) Returns: Type of value passed in, either as SQL format ready for query of as Python type Examples: >>> get_var_type(123, sql=True) 'INT' """ for typ, test in TYPE_TESTS: try: test(value) return SQL_TYPE_DICT[typ] if sql else typ except (ValueError, AssertionError, TypeError): continue return 'TEXT' if sql else str
[docs]def increment_progress_display(processed=None, goal=100, start_time=None, downloaded=None, print_line=None, terminal_width=None): """Displays a progress bar to track data processing progress Args: processed (int, optional): the amount of processing done so far (e.g. number of iterations) goal (int): the total amount of processing to be done start_time (float, optional): the time at which the processing was started downloaded (float, optional): amount of data downloaded print_line (int, optional): the line of the terminal to print the progress bar on terminal_width (int, optional): width of terminal used for sizing progress bar Returns: If the processed arg is passed in, it is incremented by 1 for use in while loops etc. where the counter can be incremented as part of this function. Alternatively, None is returned. """ try: terminal_width = get_terminal_size()[0] if not terminal_width else terminal_width except OSError: terminal_width = 100 def output(): """Prints the progress bar to the terminal and increments processed parameter Returns: If the processed arg is passed in, it is incremented by 1 for use in while loops etc. where the counter can be incremented as part of this function. Alternatively, None is returned. """ if processed: progressbar_width = 64 if terminal_width > 74 else terminal_width - 10 progress = int(processed / (goal / progressbar_width)) stdout.write( '|' + '#' * progress + '-' * (progressbar_width - progress) + f'| {(processed / goal) * 100:.2f}% | ' + f'{processed}/{goal} items | ' ) if start_time: time_elapsed = time() - start_time stdout.write(f'Time Elapsed: {timedelta(seconds=int(time_elapsed))} | ') if processed: stdout.write(f'Time remaining: ' f'{timedelta(seconds=int((time_elapsed / processed) * (goal - processed)))}' f' | ') if downloaded: speed = f'{float((downloaded // (time() - start_time)) / 1000000):.3f}' stdout.write(f'Avg Speed: {speed} MB/s | ') if downloaded: stdout.write(f'Data processed: {downloaded / 1000000:.2f} MB') stdout.flush() return processed + 1 if processed is not None else None if not system() == WINDOWS and 'blessings' in modules: term = Terminal() with term.location(0, print_line): return output() else: return output()
[docs]def time_to_epoch(human_time=None, year=None, month=None, day=None, hour=None, minute=None, second=None): """Converts a time to an epoch timestamp It can take arguments of several different formats: - nothing can be passed, and the current epoch will be returned - each component part of the timestamp can be passed (e.g. year, month, day) - human_time is for more easily type-able time formats, e.g. 19700101120000 or 1970-01-01 12:00:00 Args: human_time (str): a more human-readable time to allow easier entry year (int): year to be converted month (int): month to be converted day (int): day to be converted hour (int): hour to be converted minute (int): minute to be converted second (int): second to be converted Returns: The time passed in (or the current time otherwise) as time since epoch in seconds """ year = datetime.now().year if not year else year month = datetime.now().month if not month else month day = datetime.now().day if not day else day hour = datetime.now().hour if not hour else hour minute = datetime.now().minute if not minute else minute second = datetime.now().second if not second else second if human_time: if isinstance(human_time, int) or get_var_type(human_time) == int: human_time_str = str(human_time) if len(human_time_str) == 8: # YYYYMMDD time_elem_list = [human_time_str[:4], human_time_str[4:6], human_time_str[6:8], '00', '00', '00'] elif len(human_time_str) == 12: # YYYYMMDDHHMM time_elem_list = [human_time_str[:4], human_time_str[4:6], human_time_str[6:8], human_time_str[8:10], human_time_str[10:], '00'] elif len(human_time_str) == 13: # Probably passed epoch time by accident return human_time elif len(human_time_str) == 14: # YYYYMMDDHHMMSS time_elem_list = [human_time_str[:4], human_time_str[4:6], human_time_str[6:8], human_time_str[8:10], human_time_str[10:12], human_time_str[12:]] else: raise ValueError(f'Invalid human_time passed: {human_time}\n' f'Use this format: YYYYMMDDHHMMSS | YYYY-MM-DD HH:MM:SS') else: time_elem_list = human_time.split('-')[:-1] + human_time.split('-')[-1].split()[:-1] + \ human_time.split('-')[-1].split()[-1].split(':') if not len(time_elem_list) == 6: raise ValueError(f'Invalid human_time passed: {human_time}\n' f'Use this format: YYYYMMDDHHMMSS | YYYY-MM-DD HH:MM:SS') str_year = time_elem_list[0] str_month = time_elem_list[1] str_day = time_elem_list[2] str_hour = time_elem_list[3] str_minute = time_elem_list[4] str_second = time_elem_list[5] else: str_year = str(year) str_month = str(month).rjust(2, '0') str_day = str(day).rjust(2, '0') str_hour = str(hour).rjust(2, '0') str_minute = str(minute).rjust(2, '0') str_second = str(second).rjust(2, '0') try: return int( datetime.strptime( f'{str_year} {str_month} {str_day} {str_hour} {str_minute} {str_second}', '%Y %m %d %H %M %S' ).timestamp() ) * 1000 except ValueError: raise ValueError( f'Invalid arguments passed to time_to_epoch function. Strings: ' f'{str_year} {str_month} {str_day} {str_hour} {str_minute} {str_second}' )
[docs]def get_col_types(data, sql=False): """Returns column headers and their types from a CSV or JSON file Args: data (Union[BufferedReader, BufferedWriter, TextIOWrapper, str, dict]): the file to be parsed sql (bool): flag to say whether the types should be in SQL dialect or not Returns: List: A list of two-element dictionaries (column name and value type). For example:: [{name: 'col1', type: 'typ1'}, {name: 'col2', type: 'typ2'}, {name: 'col3', type: 'typ3'}] The types can either be Python types or their SQL dialect counterparts (str vs 'TEXT') """ cols = None first_rows = [] try: with open(data) as f: for i, row in enumerate(f): if i == 0: cols = row.rstrip('\n').split(',') elif 0 < i < 20: first_rows.append(row.rstrip('\n').split(',')) else: break except TypeError: cols = list(data[0].keys()) first_rows = [None] * 20 i = 0 while i < len(first_rows): first_rows[i] = list(data[i].values()) i += 1 col_type_list = [] for i, col in enumerate(cols): type_found = False for row in first_rows: if not (row[i] == '' or row[i] is None): # pylint: disable=unsubscriptable-object type_found = True col_type_list.append( {'name': col, 'type': get_var_type(row[i], sql)} # pylint: disable=unsubscriptable-object ) break if not type_found: col_type_list.append({'name': col, 'type': get_var_type('', sql)}) return col_type_list