Advanced
Subclassing schemas
Subclassing schemas is a useful pattern for pipelines in which each successive function adds a few columns.
[1]:
from strictly_typed_pandas import DataSet


class SchemaA:
    name: str


class SchemaB(SchemaA):
    id: int


df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})


def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(
        id=lambda df: range(df.shape[0]),
    ).pipe(DataSet[SchemaB])
Similarly, you can use it when merging (or joining or concatenating) two datasets together.
[2]:
class SchemaA:
    id: int
    name: str


class SchemaB:
    id: int
    job: str


class SchemaAB(SchemaA, SchemaB):
    pass


df1 = DataSet[SchemaA](
    {
        "id": [1, 2, 3],
        "name": ["John", "Jane", "Jack"],
    }
)
df2 = DataSet[SchemaB](
    {
        "id": [1, 2, 3],
        "job": "Data Scientist",
    }
)
df1.merge(df2, on="id").pipe(DataSet[SchemaAB])
[2]:
| | id | name | job |
|---|---|---|---|
| 0 | 1 | John | Data Scientist |
| 1 | 2 | Jane | Data Scientist |
| 2 | 3 | Jack | Data Scientist |
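To see why a subclassed schema carries the columns of all its parents, it can help to look at the merged class annotations directly. The sketch below uses only the standard library's typing.get_type_hints; that strictly_typed_pandas resolves schema columns in a comparable way is an assumption made for illustration, not something stated on this page.

```python
from typing import get_type_hints


class SchemaA:
    id: int
    name: str


class SchemaB:
    id: int
    job: str


class SchemaAB(SchemaA, SchemaB):
    pass


# get_type_hints walks the class hierarchy and merges the annotations,
# so the combined schema lists every column declared by either parent.
columns_ab = get_type_hints(SchemaAB)
print(columns_ab)
```

The shared id column appears only once in the merged mapping, which matches the single id column left after merging on "id".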
Creating an empty DataSet
Sometimes it’s useful to create a DataSet without any rows. This can be easily done as follows:
[3]:
class Schema:
    id: int
    name: str


DataSet[Schema]()
[3]:
| | id | name |
|---|---|---|
Support for numpy and pandas data types
We also support numpy types and pandas types, as well as typing.Any. If you need support for another data type, drop us a line and we’ll see if we can add it!
[4]:
import numpy as np
import pandas as pd
from typing import Any


class Schema:
    name: pd.StringDtype
    money: np.float64
    eggs: np.int64
    potatoes: Any


df = DataSet[Schema](
    {
        "name": pd.Series(["John", "Jane", "Jack"], dtype="string"),
        "money": pd.Series([100.50, 1000.23, 123.45], dtype=np.float64),
        "eggs": pd.Series([1, 2, 3], dtype=np.int64),
        "potatoes": ["1", 0, np.nan],
    }
)
df.dtypes
[4]:
name string[python]
money float64
eggs int64
potatoes object
dtype: object
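The dtypes shown above come from pandas itself, which can be checked without the library. The following sketch uses only pandas and numpy; the variable names are illustrative and not part of strictly_typed_pandas.

```python
import numpy as np
import pandas as pd

# The same kinds of columns as in the schema above, built as plain Series.
name = pd.Series(["John", "Jane", "Jack"], dtype="string")
eggs = pd.Series([1, 2, 3], dtype=np.int64)
potatoes = pd.Series(["1", 0, np.nan])  # mixed values, no explicit dtype

# A column requested as "string" is backed by pandas' StringDtype.
is_string = isinstance(name.dtype, pd.StringDtype)
# A column created with np.int64 keeps that numpy dtype.
is_int64 = eggs.dtype == np.int64
# A column holding a str, an int and a NaN falls back to object.
is_object = potatoes.dtype == object

print(is_string, is_int64, is_object)
```

A mixed column like potatoes ends up as object, which is why it is annotated with Any rather than a specific type.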
IndexedDataSet
If you’d like to also strictly type the index, you can use the IndexedDataSet class.
[5]:
from strictly_typed_pandas import IndexedDataSet


class IndexSchema:
    id: int
    job: str


class DataSchema:
    name: str


df = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["John", "Jane", "Jack"],
            "job": "Data Scientist",
        }
    )
    .set_index(["id", "job"])
    .pipe(IndexedDataSet[IndexSchema, DataSchema])
)
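To make the split between the two schemas concrete, the sketch below inspects the intermediate DataFrame using pandas only, just before it would be passed to IndexedDataSet. The names raw, index_columns and data_columns are illustrative.

```python
import pandas as pd

raw = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["John", "Jane", "Jack"],
        "job": "Data Scientist",
    }
).set_index(["id", "job"])

# The index levels are the names that IndexSchema describes in the example above.
index_columns = list(raw.index.names)
# The remaining columns are the ones that DataSchema describes.
data_columns = list(raw.columns)

print(index_columns, data_columns)
```

Setting two columns as the index produces a pandas MultiIndex, so IndexSchema declares one attribute per index level.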
Reusing a variable (e.g. df) with different schemas
Sometimes when building a pipeline, it’s useful to reuse a variable (e.g. df) with different schemas. If we do that in the following way, however, mypy reports an error.
[6]:
class SchemaA:
    name: str


class SchemaB(SchemaA):
    id: int


def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:
    return df.assign(id=1).pipe(DataSet[SchemaB])


df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)
# mypy(error): Incompatible types in assignment (expression has type "DataSet[SchemaB]", variable has type "DataSet[SchemaA]")
To avoid this error, we need to declare that df is of the type DataSet, without a schema parameter (implying that the schema may differ at different points in the pipeline).
[7]:
df: DataSet
df = DataSet[SchemaA]({"name": ["John", "Jane", "Jack"]})
df = foo(df)