Advanced

Subclassing schemas

Subclassing schemas is a useful pattern for pipelines where each successive function adds a few columns.

[1]:
from strictly_typed_pandas import DataSet


class SchemaA:
    name: str


class SchemaB(SchemaA):
    id: int


df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})


def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(
        id=lambda df: range(df.shape[0]),
    ).pipe(DataSet[SchemaB])
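For illustration, the assign step behaves the same in plain pandas; a minimal sketch without the DataSet validation (plain pandas only, so no schema is enforced):

```python
import pandas as pd

# Sketch of the assign step above, using a plain DataFrame
# (no schema validation, unlike DataSet).
df = pd.DataFrame({"name": ["John", "Jane", "Jack"]})
out = df.assign(id=lambda d: range(d.shape[0]))

print(list(out.columns))  # ['name', 'id']
print(out["id"].tolist())  # [0, 1, 2]
```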

The same pattern works when merging (or joining, or concatenating) two datasets.

[2]:
class SchemaA:
    id: int
    name: str


class SchemaB:
    id: int
    job: str


class SchemaAB(SchemaA, SchemaB):
    pass


df1 = DataSet[SchemaA](
    {
        "id": [1, 2, 3],
        "name": ["John", "Jane", "Jack"],
    }
)
df2 = DataSet[SchemaB](
    {
        "id": [1, 2, 3],
        "job": "Data Scientist",
    }
)
df1.merge(df2, on="id").pipe(DataSet[SchemaAB])
[2]:
id name job
0 1 John Data Scientist
1 2 Jane Data Scientist
2 3 Jack Data Scientist
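The merge itself is ordinary pandas behavior; a minimal plain-pandas sketch (without the DataSet wrapper) showing the resulting columns, including the scalar job value broadcast to every row:

```python
import pandas as pd

df1 = pd.DataFrame({"id": [1, 2, 3], "name": ["John", "Jane", "Jack"]})
# A scalar value is broadcast to every row when other columns are lists.
df2 = pd.DataFrame({"id": [1, 2, 3], "job": "Data Scientist"})

merged = df1.merge(df2, on="id")
print(list(merged.columns))  # ['id', 'name', 'job']
print(merged["job"].unique().tolist())  # ['Data Scientist']
```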

Creating an empty DataSet

Sometimes it’s useful to create a DataSet without any rows. This can be easily done as follows:

[3]:
class Schema:
    id: int
    name: str


DataSet[Schema]()
[3]:
id name
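A plain-pandas sketch of the same idea: an empty frame with the columns (and, as an assumption here, the dtypes) the schema would give you, minus the validation that DataSet adds:

```python
import pandas as pd

# Empty frame with typed columns; the dtype choices are an assumption
# to mirror `id: int` and `name: str` from the schema.
empty = pd.DataFrame({"id": pd.Series(dtype="int64"), "name": pd.Series(dtype="object")})

print(len(empty))  # 0
print(list(empty.columns))  # ['id', 'name']
```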

Support for numpy and pandas data types

We also support numpy and pandas data types, as well as typing.Any. If you're missing support for another data type, drop us a line and we'll see if we can add it!

[4]:
import numpy as np
import pandas as pd
from typing import Any


class Schema:
    name: pd.StringDtype
    money: np.float64
    eggs: np.int64
    potatoes: Any


df = DataSet[Schema](
    {
        "name": pd.Series(["John", "Jane", "Jack"], dtype="string"),
        "money": pd.Series([100.50, 1000.23, 123.45], dtype=np.float64),
        "eggs": pd.Series([1, 2, 3], dtype=np.int64),
        "potatoes": ["1", 0, np.nan],
    }
)

df.dtypes
[4]:
name        string[python]
money              float64
eggs                 int64
potatoes            object
dtype: object
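The object dtype for potatoes follows from ordinary pandas inference on mixed values, which is why typing.Any maps onto it; a small plain-pandas check:

```python
import numpy as np
import pandas as pd

# Mixed Python objects fall back to the object dtype.
potatoes = pd.Series(["1", 0, np.nan])
print(potatoes.dtype)  # object

# Explicit dtypes carry through unchanged.
print(pd.Series([1, 2, 3], dtype=np.int64).dtype)  # int64
```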

IndexedDataSet

If you’d like to also strictly type the index, you can use the IndexedDataSet class.

[5]:
from strictly_typed_pandas import IndexedDataSet


class IndexSchema:
    id: int
    job: str


class DataSchema:
    name: str


df = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["John", "Jane", "Jack"],
            "job": "Data Scientist",
        }
    )
    .set_index(["id", "job"])
    .pipe(IndexedDataSet[IndexSchema, DataSchema])
)
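Under the hood this is an ordinary pandas MultiIndex; a plain-pandas sketch (without the IndexedDataSet validation) showing how the index schema and data schema columns are split:

```python
import pandas as pd

df = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["John", "Jane", "Jack"],
            "job": "Data Scientist",
        }
    )
    .set_index(["id", "job"])
)

print(list(df.index.names))  # ['id', 'job']  (the index schema columns)
print(list(df.columns))      # ['name']       (the data schema columns)
```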

Reusing a variable (e.g. df) with different schemas

Sometimes when building a pipeline, it's useful to reuse a variable (e.g. df) with different schemas. However, if we do that in the following way, we'll get a mypy error.

[6]:
class SchemaA:
    name: str


class SchemaB(SchemaA):
    id: int


def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(id=1).pipe(DataSet[SchemaB])


df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)
# mypy(error): Incompatible types in assignment (expression has type "DataSet[SchemaB]", variable has type "DataSet[SchemaA]")

To avoid this error, we need to declare that df will be of the type DataSet (implying that the schema may differ at different points in the pipeline):

[7]:
df: DataSet
df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)