Module boosted.api.api_util

# Copyright (C) 2020 Gradient Boosted Investments, Inc. - All Rights Reserved

import logging
from collections import OrderedDict

import pandas as pd
import datetime
from typing import Optional

from boosted.api.api_type import (
    DataSetConfig,
    ColumnConfig,
    StrategyConfig,
    ColumnRole,
    ColumnSubRole,
    ColumnValueType,
    DataSetType,
    DataSetSubType,
    DataSetFrequency,
    BoostedDataSetSchemaException,
)

logger = logging.getLogger("boosted.api.api_util")


def infer_dataset_schema(
    name,
    df,
    dataset_type,
    dataset_subtype=DataSetSubType.DENSE,
    dataset_frequency=DataSetFrequency.DAILY,
    infer_dataset_report_period=False,
    infer_from_column_names=False,
):
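    # Infer a DataSetConfig from the frame: leading columns may be treated as
    # stock identifiers (positionally, or matched by name when
    # infer_from_column_names is set); remaining numeric columns become variables.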
    # Sanity checks:
    # Time index
    if not isinstance(df.index, pd.DatetimeIndex):
        raise BoostedDataSetSchemaException("Index must be a DatetimeIndex.")
    if len(df.columns) == 0:
        raise BoostedDataSetSchemaException("No feature columns exist.")

    datasetConfig = DataSetConfig(name, dataset_type, dataset_subtype, dataset_frequency)
    # Track when the leading run of stock identifier columns ends, so that
    # identifier matching stops at the first non-identifier column.
    non_stock_identifier_seen = False

    def variable(name):
        return ColumnConfig(name=name, role=ColumnRole.VARIABLE, value_type=ColumnValueType.NUMBER)

    for i, c in enumerate(zip(df.columns.values, df.dtypes.values)):
        # process stock identifiers first, ensuring that they all lie grouped
        # at the front of the column list
        if (
            infer_from_column_names
            and dataset_type == DataSetType.STOCK
            and not non_stock_identifier_seen
        ):
            # make a good effort to match column names with identifiers by stripping out
            # punctuation and lowercasing.
            sub_role_match = ColumnSubRole.get_match(c[0])
            if sub_role_match:
                f = ColumnConfig(name=c[0], role=ColumnRole.IDENTIFIER, sub_role=sub_role_match)
                datasetConfig.addColumn(f)
                continue
            # end stock identifiers processing sequence as soon as we see something that's not
            # an identifier
            non_stock_identifier_seen = True

        if dataset_type == DataSetType.STRATEGY and i < 1:
            # Don't need to add the first column to the schema.
            # It is assumed to be the security identifier.
            strategy_column_name = c[0]
        elif (
            not infer_from_column_names
            and dataset_type == DataSetType.STOCK
            and (
                (dataset_subtype == DataSetSubType.DENSE and i < 3)
                or (
                    dataset_subtype in [DataSetSubType.SPARSE_HIST, DataSetSubType.SPARSE_FWD]
                    and i < 5
                )
            )
        ):
            if i == 0:
                f = ColumnConfig(name=c[0], role=ColumnRole.IDENTIFIER, sub_role=ColumnSubRole.ISIN)
            elif i == 1:
                f = ColumnConfig(
                    name=c[0], role=ColumnRole.IDENTIFIER, sub_role=ColumnSubRole.COUNTRY
                )
            elif i == 2:
                f = ColumnConfig(
                    name=c[0], role=ColumnRole.IDENTIFIER, sub_role=ColumnSubRole.CURRENCY
                )
            elif i == 3:
                f = ColumnConfig(
                    name=c[0], role=ColumnRole.IDENTIFIER, sub_role=ColumnSubRole.REPORT_DATE
                )
            elif i == 4:
                if infer_dataset_report_period:
                    f = variable(c[0])
                else:
                    f = ColumnConfig(
                        name=c[0], role=ColumnRole.IDENTIFIER, sub_role=ColumnSubRole.REPORT_PERIOD
                    )
            datasetConfig.addColumn(f)
        elif pd.api.types.is_numeric_dtype(c[1]):
            # Is numeric
            f = variable(c[0])
            datasetConfig.addColumn(f)
        else:
            logger.info(
                'Only numeric types are currently supported.'
                ' IGNORING THIS COLUMN. Name = "%s", type = %s.',
                c[0],
                c[1],
            )
    if dataset_type == DataSetType.STRATEGY:
        strategies = list(OrderedDict.fromkeys(df[strategy_column_name].tolist()))
        for strategy in strategies:
            datasetConfig.addStrategy(StrategyConfig(name=strategy, source_name=strategy))
    return datasetConfig


def validate_start_and_end_dates(start_date: Optional[str], end_date: Optional[str]):
    start, end = None, None
    if start_date is not None:
        try:
            start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
        except ValueError:
            raise ValueError("Start date must be a valid YYYY-MM-DD string.")
    if end_date is not None:
        try:
            end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError:
            raise ValueError("End date must be a valid YYYY-MM-DD string.")
    if start is not None and end is not None and start > end:
        raise ValueError("Start date cannot be after end date.")


def estimateUploadBatchPeriod(df: pd.DataFrame):
    maxrows = 30000
    periods = ["Y", "M"]
    for p in periods:
        sizes = []
        # Partial-string indexing gives the row count for each candidate batch.
        for t in df.index.to_period(p).unique():
            sizes.append(len(df.loc[str(t)]))
        if max(sizes) < maxrows:
            return p
    # If we got here, no period worked.
    return None


def protoCubeJsonDataToDataFrame(
    pf: list, row_name: str, rows: list, column_name: str, columns: list, fields: list
):
    pc_list = []
    for row_idx, row in enumerate(rows):
        for col_idx, col in enumerate(columns):
            pc_list.append([row, col] + pf[row_idx]["columns"][col_idx]["fields"])
    df = pd.DataFrame(pc_list)
    # Assign labels directly; set_axis(..., inplace=True) was removed in pandas 2.0.
    df.columns = [row_name, column_name] + fields
    df.set_index([row_name, column_name], inplace=True)
    return df

Functions

def estimateUploadBatchPeriod(df: pandas.core.frame.DataFrame)
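
A minimal usage sketch (the index range and column name are illustrative). The helper returns the coarsest period among "Y" and "M" whose largest batch stays under 30,000 rows, or None if neither fits:

    import pandas as pd
    from boosted.api.api_util import estimateUploadBatchPeriod

    idx = pd.date_range("2020-01-01", periods=500, freq="D")
    df = pd.DataFrame({"close": range(500)}, index=idx)

    estimateUploadBatchPeriod(df)  # -> "Y": no single year exceeds 30,000 rows
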
def infer_dataset_schema(name, df, dataset_type, dataset_subtype=DataSetSubType.DENSE, dataset_frequency=DataSetFrequency.DAILY, infer_dataset_report_period=False, infer_from_column_names=False)
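
A minimal sketch for a dense stock dataset, assuming the positional layout used in the source above (the first three columns are treated as ISIN, country, and currency identifiers; all values are illustrative):

    import pandas as pd
    from boosted.api.api_type import DataSetType
    from boosted.api.api_util import infer_dataset_schema

    idx = pd.to_datetime(["2020-01-02", "2020-01-03"])
    df = pd.DataFrame(
        {
            "isin": ["US0378331005", "US0378331005"],
            "country": ["USA", "USA"],
            "currency": ["USD", "USD"],
            "momentum": [0.12, 0.08],  # numeric columns become variables
        },
        index=idx,
    )
    config = infer_dataset_schema("my_dataset", df, DataSetType.STOCK)
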
def protoCubeJsonDataToDataFrame(pf: list, row_name: str, rows: list, column_name: str, columns: list, fields: list)
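
The shape expected for pf is inferred from the indexing in the source above: a list of row entries, each carrying a "columns" list whose entries hold a "fields" list. The names and values below are illustrative:

    from boosted.api.api_util import protoCubeJsonDataToDataFrame

    pf = [
        {"columns": [{"fields": [1.0, 2.0]}, {"fields": [3.0, 4.0]}]},
        {"columns": [{"fields": [5.0, 6.0]}, {"fields": [7.0, 8.0]}]},
    ]
    df = protoCubeJsonDataToDataFrame(
        pf,
        row_name="date", rows=["2020-01-01", "2020-01-02"],
        column_name="ticker", columns=["AAA", "BBB"],
        fields=["alpha", "beta"],
    )
    # df is a MultiIndex (date, ticker) frame with columns ["alpha", "beta"].
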
def validate_start_and_end_dates(start_date: Optional[str], end_date: Optional[str])
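
Both bounds are optional; each must be a valid YYYY-MM-DD string, and the start may not fall after the end:

    from boosted.api.api_util import validate_start_and_end_dates

    validate_start_and_end_dates("2020-01-01", "2020-06-30")  # ok
    validate_start_and_end_dates(None, "2020-06-30")          # ok: open start
    validate_start_and_end_dates("2020-06-30", "2020-01-01")  # raises ValueError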