{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Advanced\n", "\n", "## Subclassing schemas\n", "\n", "Subclassing schemas is a useful pattern for pipelines where every next function adds a few columns." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from strictly_typed_pandas import DataSet\n", "\n", "\n", "class SchemaA:\n", " name: str\n", "\n", "\n", "class SchemaB(SchemaA):\n", " id: int\n", "\n", "\n", "df = DataSet[SchemaA]({\"name\": [\"John\", \"Jane\", \"Jack\"]})\n", "\n", "\n", "def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:\n", " return df.assign(\n", " id=lambda df: range(df.shape[0]),\n", " ).pipe(DataSet[SchemaB])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, you can use it when merging (or joining or concatenating) two datasets together." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class SchemaA:\n", " id: int\n", " name: str\n", "\n", "\n", "class SchemaB:\n", " id: int\n", " job: str\n", "\n", "\n", "class SchemaAB(SchemaA, SchemaB):\n", " pass\n", "\n", "\n", "df1 = DataSet[SchemaA](\n", " {\n", " \"id\": [1, 2, 3],\n", " \"name\": [\"John\", \"Jane\", \"Jack\"],\n", " }\n", ")\n", "df2 = DataSet[SchemaB](\n", " {\n", " \"id\": [1, 2, 3],\n", " \"job\": \"Data Scientist\",\n", " }\n", ")\n", "df1.merge(df2, on=\"id\").pipe(DataSet[SchemaAB])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating an empty DataSet\n", "Sometimes it's useful to create a DataSet without any rows. This can be easily done as follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Schema:\n", " id: int\n", " name: str\n", "\n", "\n", "DataSet[Schema]()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Support for numpy and pandas data types\n", "We also support using numpy types and pandas types, as well as `typing.Any`. 
If you need support for any other data type, drop us a line and we'll see if we can add it!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", "from typing import Any\n", "\n", "\n", "class Schema:\n", " name: pd.StringDtype\n", " money: np.float64\n", " eggs: np.int64\n", " potatoes: Any\n", "\n", "\n", "df = DataSet[Schema](\n", " {\n", " \"name\": pd.Series([\"John\", \"Jane\", \"Jack\"], dtype=\"string\"),\n", " \"money\": pd.Series([100.50, 1000.23, 123.45], dtype=np.float64),\n", " \"eggs\": pd.Series([1, 2, 3], dtype=np.int64),\n", " \"potatoes\": [\"1\", 0, np.nan],\n", " }\n", ")\n", "\n", "df.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ClassVar variables\n", "\n", "Variables annotated with `typing.ClassVar` are not included in the schema, so they can be used, for example, to store metadata about the DataSet." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import ClassVar\n", "\n", "class Schema:\n", " id: int\n", " name: str\n", " file_name: ClassVar[str] = \"schema_data.csv\"\n", "\n", "df1 = DataSet[Schema](\n", " {\n", " \"id\": [1, 2, 3],\n", " \"name\": [\"John\", \"Jane\", \"Jack\"],\n", " }\n", ")\n", "\n", "print(Schema.file_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## IndexedDataSet\n", "\n", "If you'd like to also strictly type the index, you can use the `IndexedDataSet` class." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from strictly_typed_pandas import IndexedDataSet\n", "\n", "\n", "class IndexSchema:\n", " id: int\n", " job: str\n", "\n", "\n", "class DataSchema:\n", " name: str\n", "\n", "\n", "df = (\n", " pd.DataFrame(\n", " {\n", " \"id\": [1, 2, 3],\n", " \"name\": [\"John\", \"Jane\", \"Jack\"],\n", " \"job\": \"Data Scientist\",\n", " }\n", " )\n", " .set_index([\"id\", \"job\"])\n", " .pipe(IndexedDataSet[IndexSchema, DataSchema])\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reusing a variable (e.g. `df`) with different schemas\n", "Sometimes when building a pipeline, it's useful to reuse a variable (e.g. `df`) with different schemas. However, if we do that in the following way, we'll get a mypy error." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class SchemaA:\n", " name: str\n", "\n", "\n", "class SchemaB(SchemaA):\n", " id: int\n", "\n", "\n", "def foo(df: DataSet[SchemaA]) -> DataSet[SchemaB]:\n", " return df.assign(id=1).pipe(DataSet[SchemaB])\n", "\n", "\n", "df = DataSet[SchemaA]({\"name\": [\"John\", \"Jane\", \"Jack\"]})\n", "df = foo(df)\n", "# mypy(error): Incompatible types in assignment (expression has type \"DataSet[SchemaB]\", variable has type \"DataSet[SchemaA]\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To avoid this error, we need to declare that `df` will be of the type `DataSet` (implying that the schema may be different at different points)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df: DataSet\n", "df = DataSet[SchemaA]({\"name\": [\"John\", \"Jane\", \"Jack\"]})\n", "df = foo(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "interpreter": { "hash": "21955bae40816b58329a864495bd83642121ab031d49eff86d34b7b0569c6cea" }, 
"kernelspec": { "display_name": "Python 3.8.5 64-bit ('base': conda)", "name": "python3" }, "language_info": { "name": "python", "version": "" }, "orig_nbformat": 2 }, "nbformat": 4, "nbformat_minor": 2 }