Advanced
Subclassing schemas
Subclassing schemas is a useful pattern for pipelines where each successive function adds a few columns.
[1]:
from strictly_typed_pandas import DataSet

class SchemaA:
    name: str

class SchemaB(SchemaA):
    id: int

df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})

def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(
        id=lambda df: range(df.shape[0]),
    ).pipe(DataSet[SchemaB])
Similarly, you can use it when merging (or joining or concatenating) two datasets together.
[2]:
class SchemaA:
    id: int
    name: str

class SchemaB:
    id: int
    job: str

class SchemaAB(SchemaA, SchemaB):
    pass

df1 = DataSet[SchemaA](
    {
        "id": [1, 2, 3],
        "name": ["John", "Jane", "Jack"],
    }
)

df2 = DataSet[SchemaB](
    {
        "id": [1, 2, 3],
        "job": "Data Scientist",
    }
)

df1.merge(df2, on="id").pipe(DataSet[SchemaAB])
[2]:
|   | id | name | job |
|---|----|------|-----|
| 0 | 1 | John | Data Scientist |
| 1 | 2 | Jane | Data Scientist |
| 2 | 3 | Jack | Data Scientist |
Creating an empty DataSet
Sometimes it’s useful to create a DataSet without any rows. This can be easily done as follows:
[3]:
class Schema:
    id: int
    name: str

DataSet[Schema]()
[3]:
| id | name |
|----|------|
Support for numpy and pandas data types
We also support numpy and pandas data types, as well as typing.Any. If you're missing support for any other data type, drop us a line and we'll see if we can add it!
[4]:
import numpy as np
import pandas as pd
from typing import Any
class Schema:
name: pd.StringDtype
money: np.float64
eggs: np.int64
potatoes: Any
df = DataSet[Schema](
{
"name": pd.Series(["John", "Jane", "Jack"], dtype="string"),
"money": pd.Series([100.50, 1000.23, 123.45], dtype=np.float64),
"eggs": pd.Series([1, 2, 3], dtype=np.int64),
"potatoes": ["1", 0, np.nan],
}
)
df.dtypes
[4]:
name string[python]
money float64
eggs int64
potatoes object
dtype: object
IndexedDataSet
If you’d like to also strictly type the index, you can use the IndexedDataSet class.
[5]:
from strictly_typed_pandas import IndexedDataSet
from strictly_typed_pandas import IndexedDataSet

class IndexSchema:
    id: int
    job: str

class DataSchema:
    name: str

df = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["John", "Jane", "Jack"],
            "job": "Data Scientist",
        }
    )
    .set_index(["id", "job"])
    .pipe(IndexedDataSet[IndexSchema, DataSchema])
)
Reusing a variable (e.g. df) with different schemas
Sometimes when building a pipeline, it's useful to reuse a variable (e.g. df) with different schemas. However, if we do that in the following way, we'll get a mypy error.
[6]:
class SchemaA:
    name: str

class SchemaB(SchemaA):
    id: int

def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(id=1).pipe(DataSet[SchemaB])

df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)
# mypy(error): Incompatible types in assignment (expression has type "DataSet[SchemaB]", variable has type "DataSet[SchemaA]")
To avoid this error, we need to declare that df will be of the type DataSet (implying that the schema may differ at different points in the pipeline).
[7]:
df: DataSet
df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)