Reference Documentation

Auto-generated Python API documentation for pydantic_kedro.

pydantic_kedro.PydanticAutoDataset

Bases: AbstractDataset[BaseModel, BaseModel]

Dataset for self-describing Pydantic models.

This allows fields with arbitrary types. When loading, it automatically detects the dataset type. When saving, it saves 'pure' models as YAML datasets and arbitrary models as Zip datasets. This behavior can be changed when creating the dataset object.

Example:
class MyModel(BaseModel):
    x: str

# using memory to avoid tempfile
ds_write = PydanticZipDataset('memory://path/to/model.zip')
ds_write.save(MyModel(x="example"))

ds_load = PydanticAutoDataset('memory://path/to/model.zip')
assert ds_load.load().x == "example"
Example:
class MyModel(BaseModel):
    x: str

# using memory to avoid tempfile
ds = PydanticAutoDataset('memory://path/to/model')
ds.save(MyModel(x="example"))  # selects YAML by default

ds2 = PydanticAutoDataset(
    'memory://path/to/model',
    default_format_pure="json",
    default_format_arbitrary="folder",
)
ds2.save(MyModel(x="example"))  # selects JSON
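
Since this is a standard Kedro dataset, it can also be registered in a DataCatalog; a minimal sketch (the dataset name my_model and the path are illustrative):

```python
from kedro.io import DataCatalog
from pydantic import BaseModel

from pydantic_kedro import PydanticAutoDataset


class MyModel(BaseModel):
    x: str


# Register the dataset under an illustrative name
catalog = DataCatalog({"my_model": PydanticAutoDataset("memory://path/to/model")})

catalog.save("my_model", MyModel(x="example"))
assert catalog.load("my_model").x == "example"
```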
Source code in src/pydantic_kedro/datasets/auto.py
class PydanticAutoDataset(AbstractDataset[BaseModel, BaseModel]):
    """Dataset for self-describing Pydantic models.

    This allows fields with arbitrary types.
    When loading, it automatically detects the dataset type.
    When saving, it saves 'pure' models as YAML datasets, and arbitrary models as Zip datasets.
    This can be changed in the dataset object creation.

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    # using memory to avoid tempfile
    ds_write = PydanticZipDataset('memory://path/to/model.zip')
    ds_write.save(MyModel(x="example"))

    ds_load = PydanticAutoDataset('memory://path/to/model.zip')
    assert ds_load.load().x == "example"
    ```

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    # using memory to avoid tempfile
    ds = PydanticAutoDataset('memory://path/to/model')
    ds.save(MyModel(x="example"))  # selects YAML by default

    ds2 = PydanticAutoDataset(
        'memory://path/to/model',
        default_format_pure="json",
        default_format_arbitrary="folder",
    )
    ds2.save(MyModel(x="example"))  # selects JSON
    ```
    """

    def __init__(
        self,
        filepath: str,
        default_format_pure: Literal["yaml", "json", "zip", "folder"] = "yaml",
        default_format_arbitrary: Literal["zip", "folder"] = "zip",
    ) -> None:
        """Create a new instance of PydanticAutoDataset to load/save Pydantic models for given filepath.

        Args:
        ----
        filepath : The location of the file or folder.
        default_format_pure : Default format for saving "pure" models.
        default_format_arbitrary : Default format for saving "arbitrary" models.
        """
        assert default_format_pure in ["yaml", "json", "zip", "folder"]
        assert default_format_arbitrary in ["zip", "folder"]
        self._filepath = str(filepath)
        self._default_format_pure: Literal["yaml", "json", "zip", "folder"] = default_format_pure
        self._default_format_arbitrary: Literal["zip", "folder"] = default_format_arbitrary

    @property
    def filepath(self) -> str:
        """File path name."""
        return str(self._filepath)

    @property
    def default_format_pure(self) -> Literal["yaml", "json", "zip", "folder"]:
        """The default saving format used for 'pure' pydantic models."""
        return self._default_format_pure

    @property
    def default_format_arbitrary(self) -> Literal["zip", "folder"]:
        """The default saving format used for 'arbitrary' pydantic models."""
        return self._default_format_arbitrary

    def _get_ds(
        self, name: Literal["yaml", "json", "zip", "folder"]
    ) -> Union[PydanticYamlDataset, PydanticJsonDataset, PydanticFolderDataset, PydanticZipDataset]:
        """Map the format name to dataset type, and create it."""
        if name == "yaml":
            return PydanticYamlDataset(self.filepath)
        if name == "json":
            return PydanticJsonDataset(self.filepath)
        if name == "zip":
            return PydanticZipDataset(self.filepath)
        if name == "folder":
            return PydanticFolderDataset(self.filepath)
        raise ValueError(f"Unknown dataset keyword: {name}")

    def _load(self) -> BaseModel:
        """Load Pydantic model from the filepath.

        Returns
        -------
        Pydantic model.
        """
        filepath = self._filepath
        of = fsspec.open(filepath)
        fs: AbstractFileSystem = of.fs  # type: ignore
        _, path = get_protocol_and_path(filepath)

        # If it's a directory, try to open as a folder
        if fs.isdir(path):
            try:
                return PydanticFolderDataset(filepath).load()
            except Exception as exc:
                raise RuntimeError(
                    f"Path {filepath} is a directory, but failed to load PydanticFolderDataset from it."
                ) from exc

        # Try other datasets
        # Yes, this looks hacky
        errors: list[Exception] = []
        try:
            return PydanticJsonDataset(filepath).load()
        except Exception as e1:
            errors.append(e1)

        try:
            return PydanticYamlDataset(filepath).load()
        except Exception as e2:
            errors.append(e2)

        try:
            return PydanticZipDataset(filepath).load()
        except Exception as e3:
            errors.append(e3)

        err_info = "\n".join([str(e) for e in errors])
        raise RuntimeError(f"Failed to load any dataset from the path {filepath!r}.\n{err_info}")

    def _save(self, data: BaseModel) -> None:
        """Save Pydantic model to the filepath."""
        try:
            self._get_ds(self.default_format_pure).save(data)
            return
        except Exception:
            pass
        self._get_ds(self.default_format_arbitrary).save(data)

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self.filepath,
            default_format_pure=self.default_format_pure,
            default_format_arbitrary=self.default_format_arbitrary,
        )

default_format_arbitrary: Literal['zip', 'folder'] property

The default saving format used for 'arbitrary' pydantic models.

default_format_pure: Literal['yaml', 'json', 'zip', 'folder'] property

The default saving format used for 'pure' pydantic models.

filepath: str property

File path name.

pydantic_kedro.ArbModel

Bases: BaseModel

Base Pydantic Model with arbitrary types allowed in the config.

This also supports type hints for pydantic_kedro in the configuration:

  • kedro_map, which maps a type to a dataset constructor to use.
  • kedro_default, which specifies the default dataset type to use (kedro_datasets.pickle.PickleDataset).

These are pseudo-inherited; see the documentation on configuration inheritance. You do not actually need to inherit from ArbModel for this to work, but it can help with type completion in your IDE.
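
As a concrete sketch of these hints (a minimal example, assuming pandas and kedro_datasets are installed; the model, field names, and the lambda are illustrative):

```python
import pandas as pd
from kedro_datasets.pandas import ParquetDataset

from pydantic_kedro import ArbModel


class MyFlexiModel(ArbModel):
    """Model with an arbitrary-typed field."""

    name: str
    data: pd.DataFrame

    class Config(ArbModel.Config):
        # Save DataFrame fields via Parquet instead of the default pickle dataset
        kedro_map = {pd.DataFrame: lambda path: ParquetDataset(filepath=path)}
```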

Source code in src/pydantic_kedro/models.py
class ArbModel(BaseModel):
    """Base Pydantic Model with arbitrary types allowed in the config.

    This also supports type hints for `pydantic_kedro` in the configuration:

    - `kedro_map`, which maps a type to a dataset constructor to use.
    - `kedro_default`, which specifies the default dataset type to use
      ([kedro_datasets.pickle.PickleDataset][])

    These are pseudo-inherited, see [config-inheritence][].
    You do not actually need to inherit from `ArbModel` for this to work, however it can help with
    type completion in your IDE.
    """

    Config = ArbConfig

pydantic_kedro.load_model(uri: str, supercls: Type[T] = BaseModel) -> T

Load a Pydantic model from a given URI.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | str | The path or URI to load the model from. | required |
| supercls | type | Ensure that the loaded model is of this type. By default, this is just BaseModel. | BaseModel |
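
A short usage sketch (the MyModel class and URI are illustrative):

```python
from pydantic import BaseModel

from pydantic_kedro import load_model, save_model


class MyModel(BaseModel):
    x: str


save_model(MyModel(x="example"), "memory://path/to/model")

# Plain load; the dataset type is detected automatically
model = load_model("memory://path/to/model")
assert model.x == "example"

# Optionally enforce the loaded type; raises TypeError on mismatch
typed = load_model("memory://path/to/model", supercls=MyModel)
```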
Source code in src/pydantic_kedro/utils.py
def load_model(uri: str, supercls: Type[T] = BaseModel) -> T:  # type: ignore
    """Load a Pydantic model from a given URI.

    Parameters
    ----------
    uri : str
        The path or URI to load the model from.
    supercls : type
        Ensure that the loaded model is of this type.
        By default, this is just BaseModel.
    """
    ds = PydanticAutoDataset(filepath=uri)
    model = ds.load()
    if not isinstance(model, supercls):
        raise TypeError(f"Expected {supercls}, but got {type(model)}.")
    return model  # type: ignore

pydantic_kedro.save_model(model: BaseModel, uri: str, *, format: Literal['auto', 'zip', 'folder', 'yaml', 'json'] = 'auto') -> None

Save a Pydantic model to a given URI.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | BaseModel | Pydantic model to save. This can be 'pure' (JSON-safe) or 'arbitrary'. | required |
| uri | str | The path or URI to save the model to. | required |
| format | {"auto", "zip", "folder", "yaml", "json"} | The dataset format to use. "auto" will use PydanticAutoDataset. | "auto" |
Source code in src/pydantic_kedro/utils.py
def save_model(
    model: BaseModel,
    uri: str,
    *,
    format: Literal["auto", "zip", "folder", "yaml", "json"] = "auto",
) -> None:
    """Save a Pydantic model to a given URI.

    Parameters
    ----------
    model : BaseModel
        Pydantic model to save. This can be 'pure' (JSON-safe) or 'arbitrary'.
    uri : str
        The path or URI to save the model to.
    format : {"auto", "zip", "folder", "yaml", "json"}
        The dataset format to use.
        "auto" will use [PydanticAutoDataset][pydantic_kedro.PydanticAutoDataset].
    """
    if not isinstance(model, BaseModel):
        raise TypeError(f"Expected Pydantic model, but got {model!r}")
    ds: AbstractDataset
    if format == "auto":
        ds = PydanticAutoDataset(uri)
    elif format == "zip":
        ds = PydanticZipDataset(uri)
    elif format == "folder":
        ds = PydanticFolderDataset(uri)
    elif format == "yaml":
        ds = PydanticYamlDataset(uri)
    elif format == "json":
        ds = PydanticJsonDataset(uri)
    else:
        raise ValueError(
            f"Unknown dataset format {format}, "
            'expected one of: ["auto", "zip", "folder", "yaml", "json"]'
        )
    ds.save(model)

pydantic_kedro.PydanticJsonDataset

Bases: AbstractDataset[BaseModel, BaseModel]

Dataset for saving/loading Pydantic models, based on JSON.

Please note that the Pydantic model must be JSON-serializable. That means the fields are "pure" Pydantic fields, or you have added json_encoders to the model config.

Example:
class MyModel(BaseModel):
    x: str

ds = PydanticJsonDataset('memory://path/to/model.json')  # using memory to avoid tempfile
ds.save(MyModel(x="example"))
assert ds.load().x == "example"
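
The json_encoders escape hatch mentioned above can round-trip a custom type if a matching validator rebuilds it on load; a minimal sketch using pydantic v1-style config (the Point class, its encoding, and the validator are illustrative):

```python
from pydantic import BaseModel, validator

from pydantic_kedro import PydanticJsonDataset


class Point:
    """A custom type that is not JSON-serializable by default."""

    def __init__(self, x: float, y: float):
        self.x, self.y = x, y


class MyModel(BaseModel):
    pt: Point

    class Config:
        arbitrary_types_allowed = True
        # Encode Point as a JSON-safe list when saving
        json_encoders = {Point: lambda p: [p.x, p.y]}

    @validator("pt", pre=True)
    def _parse_pt(cls, v):
        # Rebuild Point from the encoded list when loading
        return Point(*v) if isinstance(v, (list, tuple)) else v


ds = PydanticJsonDataset("memory://path/to/model.json")
ds.save(MyModel(pt=Point(1.0, 2.0)))
assert ds.load().pt.x == 1.0
```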
Source code in src/pydantic_kedro/datasets/json.py
class PydanticJsonDataset(AbstractDataset[BaseModel, BaseModel]):
    """Dataset for saving/loading Pydantic models, based on JSON.

    Please note that the Pydantic model must be JSON-serializable.
    That means the fields are "pure" Pydantic fields,
    or you have added `json_encoders` to the model config.

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    ds = PydanticJsonDataset('memory://path/to/model.json')  # using memory to avoid tempfile
    ds.save(MyModel(x="example"))
    assert ds.load().x == "example"
    ```
    """

    def __init__(self, filepath: str) -> None:
        """Create a new instance of PydanticJsonDataset to load/save Pydantic models for given filepath.

        Args:
        ----
        filepath : The location of the JSON file.
        """
        # parse the path and protocol (e.g. file, http, s3, etc.)
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._filepath = PurePosixPath(path)
        self._fs: AbstractFileSystem = fsspec.filesystem(self._protocol)

    @property
    def filepath(self) -> str:
        """File path name."""
        return str(self._filepath)

    def _load(self) -> BaseModel:
        """Load Pydantic model from the filepath.

        Returns
        -------
        Pydantic model.
        """
        # using get_filepath_str ensures that the protocol and path
        # are appended correctly for different filesystems
        load_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(load_path, mode="r") as f:
            dct = json.load(f)
        assert isinstance(dct, dict), "JSON root must be a mapping."
        res = dict_to_model(dct)
        return res  # type: ignore

    @no_type_check
    def _save(self, data: BaseModel) -> None:
        """Save Pydantic model to the filepath."""
        # Open file and write to it
        save_path = get_filepath_str(self._filepath, self._protocol)

        # Ensure parent directory exists
        try:
            if "/" in save_path:
                parent_path, *_ = save_path.rsplit("/", maxsplit=1)
                self._fs.makedirs(parent_path, exist_ok=True)
        except Exception:
            warnings.warn(f"Failed to create parent path for {save_path}")

        with PatchPydanticIter():
            with self._fs.open(save_path, mode="w") as f:
                f.write(data.json())

    def _describe(self) -> Dict[str, Any]:
        """Return a dict that describes the attributes of the dataset."""
        return dict(filepath=self.filepath, protocol=self._protocol)

filepath: str property

File path name.

pydantic_kedro.PydanticYamlDataset

Bases: AbstractDataset[BaseModel, BaseModel]

Dataset for saving/loading Pydantic models, based on YAML.

Please note that the Pydantic model must be JSON-serializable. That means the fields are "pure" Pydantic fields, or you have added json_encoders to the model config.

Example:
class MyModel(BaseModel):
    x: str

ds = PydanticYamlDataset('memory://path/to/model.yaml')  # using memory to avoid tempfile
ds.save(MyModel(x="example"))
assert ds.load().x == "example"
Source code in src/pydantic_kedro/datasets/yaml.py
class PydanticYamlDataset(AbstractDataset[BaseModel, BaseModel]):
    """Dataset for saving/loading Pydantic models, based on YAML.

    Please note that the Pydantic model must be JSON-serializable.
    That means the fields are "pure" Pydantic fields,
    or you have added `json_encoders` to the model config.

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    ds = PydanticYamlDataset('memory://path/to/model.yaml')  # using memory to avoid tempfile
    ds.save(MyModel(x="example"))
    assert ds.load().x == "example"
    ```
    """

    def __init__(self, filepath: str) -> None:
        """Create a new instance of PydanticYamlDataset to load/save Pydantic models for given filepath.

        Args:
        ----
        filepath : The location of the YAML file.
        """
        # TODO: Update to just save the path and open it with `fsspec` directly
        # parse the path and protocol (e.g. file, http, s3, etc.)
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._filepath = PurePosixPath(path)
        self._fs: AbstractFileSystem = fsspec.filesystem(self._protocol)

    @property
    def filepath(self) -> str:
        """File path name."""
        return str(self._filepath)

    def _load(self) -> BaseModel:
        """Load Pydantic model from the filepath.

        Returns
        -------
        Pydantic model.
        """
        # using get_filepath_str ensures that the protocol and path
        # are appended correctly for different filesystems
        load_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(load_path, mode="r") as f:
            dct = yaml.safe_load(f)

        assert isinstance(dct, dict), "YAML root must be a mapping."
        res = dict_to_model(dct)
        return res  # type: ignore

    @no_type_check
    def _save(self, data: BaseModel) -> None:
        """Save Pydantic model to the filepath."""
        # Open file and write to it
        save_path = get_filepath_str(self._filepath, self._protocol)

        # Ensure parent directory exists
        try:
            if "/" in save_path:
                parent_path, *_ = save_path.rsplit("/", maxsplit=1)
                self._fs.makedirs(parent_path, exist_ok=True)
        except Exception:
            warnings.warn(f"Failed to create parent path for {save_path}")

        with PatchPydanticIter():
            with self._fs.open(save_path, mode="w") as f:
                to_yaml_file(f, data)

    def _describe(self) -> Dict[str, Any]:
        """Return a dict that describes the attributes of the dataset."""
        return dict(filepath=self.filepath, protocol=self._protocol)

filepath: str property

File path name.

pydantic_kedro.PydanticFolderDataset

Bases: AbstractDataset[BaseModel, BaseModel]

Dataset for saving/loading Pydantic models, based on saving sub-datasets in a folder.

This allows fields with arbitrary types.

Example:
class MyModel(BaseModel):
    x: str

ds = PydanticFolderDataset('memory://path/to/model')  # using in-memory to avoid tempfile
ds.save(MyModel(x="example"))
assert ds.load().x == "example"
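
For a model with arbitrary-typed fields, the folder holds a meta.json (model class plus a catalog of sub-datasets) alongside one sub-dataset per non-JSON field. A hedged sketch, assuming pandas and kedro_datasets are installed (the path is illustrative; the DataFrame falls back to the default pickle dataset, with a warning):

```python
import pandas as pd

from pydantic_kedro import ArbModel, PydanticFolderDataset


class MyArbModel(ArbModel):
    x: str
    df: pd.DataFrame  # arbitrary type; saved via the default pickle dataset unless mapped


ds = PydanticFolderDataset("/tmp/pyd_kedro_example")  # illustrative local path
ds.save(MyArbModel(x="example", df=pd.DataFrame({"a": [1, 2]})))
# The folder now contains `meta.json` plus a file for the `df` field.

loaded = ds.load()
assert loaded.x == "example"
assert list(loaded.df["a"]) == [1, 2]
```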
Source code in src/pydantic_kedro/datasets/folder.py
class PydanticFolderDataset(AbstractDataset[BaseModel, BaseModel]):
    """Dataset for saving/loading Pydantic models, based on saving sub-datasets in a folder.

    This allows fields with arbitrary types.

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    ds = PydanticFolderDataset('memory://path/to/model')  # using in-memory to avoid tempfile
    ds.save(MyModel(x="example"))
    assert ds.load().x == "example"
    ```
    """

    def __init__(self, filepath: str) -> None:
        """Create a new instance of PydanticFolderDataset to load/save Pydantic models for given path.

        Args:
        ----
        filepath : The location of the folder.
        """
        self._filepath = filepath

    @property
    def filepath(self) -> str:
        """File path name."""
        return str(self._filepath)

    def _save(self, data: BaseModel) -> None:
        """Save Pydantic model to the filepath."""
        fs: AbstractFileSystem = fsspec.open(self._filepath).fs  # type: ignore
        if isinstance(fs, LocalFileSystem):
            self._save_local(data, self._filepath)
        else:
            from tempfile import TemporaryDirectory

            with TemporaryDirectory(prefix="pyd_kedro_") as tmpdir:
                self._save_local(data, tmpdir)
                # Copy to remote
                m_local = fsspec.get_mapper(tmpdir)
                m_remote = fsspec.get_mapper(self._filepath, create=True)
                for k, v in m_local.items():
                    m_remote[k] = v

            # Close (this might be required for some filesystems)
            try:
                fs.close()  # type: ignore
            except AttributeError:
                pass

    def _load(self) -> BaseModel:
        """Load Pydantic model from the filepath.

        Returns
        -------
        Pydantic model.
        """
        fs: AbstractFileSystem = fsspec.open(self._filepath).fs  # type: ignore
        if isinstance(fs, LocalFileSystem):
            return self._load_local(self._filepath)
        else:
            # Making a temp directory in the current cache dir location
            tmpdir = get_cache_dir() / str(uuid4()).replace("-", "")
            tmpdir.mkdir(exist_ok=False, parents=True)

            # Copy from remote... yes, this is not ideal!
            m_remote = fsspec.get_mapper(self._filepath)
            m_local = fsspec.get_mapper(str(tmpdir))
            for k, v in m_remote.items():
                m_local[k] = v

            # Load locally
            return self._load_local(str(tmpdir))

    def _load_local(self, filepath: str) -> BaseModel:
        """Load Pydantic model from the local filepath.

        Returns
        -------
        Pydantic model.
        """
        with fsspec.open(f"{filepath}/meta.json") as f:
            meta = FolderFormatMetadata.parse_raw(f.read())  # type: ignore

        # Ensure model type is importable
        model_cls = import_string(meta.model_class)
        assert issubclass(model_cls, BaseModel)

        # Check jsonpath? or maybe in validator?

        # Load data objects and mutate in-place
        model_data: Union[Dict[str, Any], List[Any]] = deepcopy(meta.model_info)
        for jsp_str, ds_spec in meta.catalog.items():
            jsp = jsp_str.split(".")[1:]
            ds_i = ds_spec.to_dataset(base_path=filepath)
            obj_i = ds_i.load()
            mutate_jsp(model_data, jsp, obj_i)

        res = dict_to_model(model_data)
        return res

    def _save_local(self, data: BaseModel, filepath: str) -> None:
        # Prepare fields for final metadata
        kls = type(data)
        model_class_str = get_import_name(kls)
        model_info: Union[Dict[str, Any], List[Any]] = {}
        catalog: Dict[JsonPath, KedroDatasetSpec] = {}

        # These are used to make datasets for various types
        # See the `kls.Config` class - this is inherited
        kedro_map: Dict[Type, Callable[[str], AbstractDataset]] = get_kedro_map(kls)
        kedro_default: Callable[[str], AbstractDataset] = get_kedro_default(kls)

        def make_ds_for(obj: Any, path: str) -> AbstractDataset:
            for k, v in kedro_map.items():
                if isinstance(obj, k):
                    return v(path)
            warnings.warn(
                f"No dataset defined for {get_import_name(type(obj))} in `Config.kedro_map`;"
                f" using `Config.kedro_default`: {kedro_default}"
            )
            return kedro_default(path)

        # We need to create `model_info` and `catalog`
        starter = str(uuid4()).replace("-", "")
        data_map: Dict[str, Any] = {}  # "starter_UUID" -> data_object

        def fake_encoder(obj: Any) -> Any:
            """Encode data objects as UUID strings, populating `data_map` as a side-effect."""
            try:
                return kls.__json_encoder__(obj)
            except TypeError:
                val = f"{starter}__{uuid4()}".replace("-", "")
                data_map[val] = obj
                return val

        # Roundtrip to apply the encoder and get UUID
        with PatchPydanticIter():
            rt = json.loads(data.json(encoder=fake_encoder))

        # This will map the data to a dataset and actually save it

        def visit3(obj: Any, jsp: str, base_path: str) -> Any:
            """Map the data to a dataset in `catalog` and actually saves it."""
            if isinstance(obj, str):
                if obj in data_map:
                    # We got a data point
                    data = data_map[obj]
                    # Make a dataset for it
                    full_path = f"{base_path}/{jsp}"
                    ds = make_ds_for(data, full_path)
                    # Get the spec (or fail because of non-JSON-able types...)
                    dss = KedroDatasetSpec.from_dataset(ds, jsp)
                    dss.json()  # to fail early
                    catalog[jsp] = dss  # add to catalog
                    # Save the data
                    ds.save(data)
                    # Return the spec in dict form
                    return DATA_PLACEHOLDER
            elif isinstance(obj, list):
                return [visit3(sub, f"{jsp}.{i}", base_path) for i, sub in enumerate(obj)]
            elif isinstance(obj, dict):
                return {k: visit3(v, f"{jsp}.{k}", base_path) for k, v in obj.items()}
            return obj

        # Ensure directory exists
        Path(filepath).mkdir(parents=True, exist_ok=True)

        model_info = visit3(rt, "", base_path=filepath)
        if not isinstance(model_info, dict):
            raise NotImplementedError("Only dict root is supported for now.")

        # Create and write metadata
        meta = FolderFormatMetadata(model_class=model_class_str, model_info=model_info, catalog=catalog)
        with fsspec.open(f"{filepath}/meta.json", mode="w") as f:
            f.write(meta.json())  # type: ignore

    def _describe(self) -> Dict[str, Any]:
        return dict(filepath=self.filepath)

filepath: str property

File path name.

pydantic_kedro.PydanticZipDataset

Bases: AbstractDataset[BaseModel, BaseModel]

Dataset for saving/loading Pydantic models, based on saving sub-datasets in a ZIP file.

This allows fields with arbitrary types.

Example:
class MyModel(BaseModel):
    x: str

ds = PydanticZipDataset('memory://path/to/model.zip')  # using memory to avoid tempfile
ds.save(MyModel(x="example"))
assert ds.load().x == "example"
Source code in src/pydantic_kedro/datasets/zip.py
class PydanticZipDataset(AbstractDataset[BaseModel, BaseModel]):
    """Dataset for saving/loading Pydantic models, based on saving sub-datasets in a ZIP file.

    This allows fields with arbitrary types.

    Example:
    -------
    ```python
    class MyModel(BaseModel):
        x: str

    ds = PydanticZipDataset('memory://path/to/model.zip')  # using memory to avoid tempfile
    ds.save(MyModel(x="example"))
    assert ds.load().x == "example"
    ```
    """

    def __init__(self, filepath: str) -> None:
        """Create a new instance of PydanticZipDataset to load/save Pydantic models for given filepath.

        Args:
        ----
        filepath : The location of the Zip file.
        """
        self._filepath = filepath  # NOTE: This is not checked when created.

    @property
    def filepath(self) -> str:
        """File path name."""
        return str(self._filepath)

    def _load(self) -> BaseModel:
        """Load Pydantic model from the filepath.

        Returns
        -------
        Pydantic model.
        """
        filepath = self._filepath
        # Making a temp directory in the current cache dir location
        tmpdir = get_cache_dir() / str(uuid4()).replace("-", "")
        tmpdir.mkdir(exist_ok=False, parents=True)
        m_local = fsspec.get_mapper(str(tmpdir))
        # Unzip via copying to folder
        with fsspec.open(filepath) as zip_file:
            zip_fs = ZipFileSystem(fo=zip_file)  # type: ignore
            m_zip = zip_fs.get_mapper()
            for k, v in m_zip.items():
                m_local[k] = v
            zip_fs.close()
        # Load folder dataset
        pfds = PydanticFolderDataset(str(tmpdir))
        res = pfds.load()
        return res

    def _save(self, data: BaseModel) -> None:
        """Save Pydantic model to the filepath."""
        filepath = self._filepath
        # Ensure parent directory exists
        try:
            if "/" in filepath:
                parent_path, *_ = filepath.rsplit("/", maxsplit=1)
                xfs = fsspec.open(filepath).fs
                xfs.makedirs(parent_path, exist_ok=True)
        except Exception:
            warnings.warn(f"Failed to create parent path for {filepath}")

        with TemporaryDirectory(prefix="pyd_kedro_") as tmpdir:
            # Save folder dataset
            pfds = PydanticFolderDataset(tmpdir)
            pfds.save(data)
            # Zip by copying the folder contents into the archive
            m_local = fsspec.get_mapper(tmpdir)
            with fsspec.open(filepath, mode="wb") as zip_file:
                zip_fs = ZipFileSystem(fo=zip_file, mode="w")  # type: ignore
                m_zip = zip_fs.get_mapper()
                for k, v in m_local.items():
                    m_zip[k] = v
                zip_fs.close()

    def _describe(self) -> Dict[str, Any]:
        return dict(filepath=self.filepath)

filepath: str property

File path name.