Skip to content

Mock Generator

cognite.pygen.utils.mock_generator

This module contains the mock generator for the pygen package. It can be used to generate mock nodes, edges, timeseries, sequences, and files for a given data model/views.

GeneratorFunction

Bases: Protocol, Generic[T_DataType]

Interface for a function that generates mock data.

Source code in cognite/pygen/utils/mock_generator.py
class GeneratorFunction(typing.Protocol, Generic[T_DataType]):
    """Interface for a function that generates mock data.

    A conforming callable takes the number of values to generate (``count``)
    and returns a list of values of type ``T_DataType``.
    """

    def __call__(self, count: int) -> list[T_DataType]:
        # Protocol stub: concrete generator functions provide the implementation.
        raise NotImplementedError()

    @classmethod
    def _repr_html_(cls) -> str:
        # Returns an HTML snippet describing the interface together with an
        # example generator function.
        return """Interface for a function that generates mock data.<br />
        <br />
        <strong>Example:</strong><br />
        <code>
        def my_data_generator(count: int) -> list[T_DataType]:
            return ["".join(random.choices(string.ascii_lowercase + string.ascii_uppercase, k=7)) for _ in range(count)]
        </code>
        """

IDGeneratorFunction

Bases: Protocol

Interface for a function that generates node IDs (external IDs) for a given view.

Source code in cognite/pygen/utils/mock_generator.py
class IDGeneratorFunction(typing.Protocol):
    """Interface for a function that generates node IDs.

    A conforming callable takes the id of the view the nodes belong to
    (``view_id``) and the number of IDs to generate (``count``), and returns
    a list of string identifiers.
    """

    def __call__(self, view_id: dm.ViewId, count: int) -> list[str]:
        # Protocol stub: concrete ID generator functions provide the implementation.
        raise NotImplementedError()

    @classmethod
    def _repr_html_(cls) -> str:
        # Returns an HTML snippet describing the interface together with an
        # example ID generator function.
        return """Interface for a function that generates NodeIDs<br />
        <br />
        <strong>Example:</strong><br />
        <code>
        def my_id_generator(view_id: dm.ViewId, count: int) -> list[str]:
            return [f"{view_id.external_id.casefold()}_{no}" for no in range(count)]
        </code>
        """

MockData

Bases: UserList[ViewMockData]

Mock data for a given data model.

Source code in cognite/pygen/utils/mock_generator.py
class MockData(UserList[ViewMockData]):
    """Mock data for a given data model.

    A list of ``ViewMockData`` entries with helpers that aggregate the nodes, edges,
    timeseries, sequences and files across all entries, and that dump the data to
    YAML, deploy it to CDF or clean it from CDF.
    """

    @property
    def view_ids(self) -> list[dm.ViewId]:
        """The view id of every entry, in list order."""
        return [view_mock_data.view_id for view_mock_data in self]

    @property
    def nodes(self) -> dm.NodeApplyList:
        """All truthy nodes across all entries."""
        return dm.NodeApplyList([node for view_mock_data in self for node in view_mock_data.node if node])

    @property
    def writable_nodes(self) -> dm.NodeApplyList:
        """The nodes to write on deploy.

        Entries flagged ``is_writeable`` contribute their ``node`` list; all other
        entries contribute their ``_node_only`` list instead.
        """
        return dm.NodeApplyList(
            [
                node
                for view_mock_data in self
                for node in (view_mock_data.node if view_mock_data.is_writeable else view_mock_data._node_only)
                if node
            ]
        )

    @property
    def edges(self) -> dm.EdgeApplyList:
        """All truthy edges across all entries (may contain duplicates)."""
        return dm.EdgeApplyList([edge for view_mock_data in self for edge in view_mock_data.edge if edge])

    @property
    def timeseries(self) -> TimeSeriesList:
        """All truthy timeseries across all entries (may contain duplicates)."""
        return TimeSeriesList([ts for view_mock_data in self for ts in view_mock_data.timeseries if ts])

    @property
    def sequences(self) -> SequenceList:
        """All truthy sequences across all entries (may contain duplicates)."""
        return SequenceList([seq for view_mock_data in self for seq in view_mock_data.sequence if seq])

    @property
    def files(self) -> FileMetadataList:
        """All truthy file metadata across all entries (may contain duplicates)."""
        return FileMetadataList([file for view_mock_data in self for file in view_mock_data.file if file])

    @property
    def unique_timeseries(self) -> TimeSeriesList:
        """The timeseries de-duplicated on external id."""
        return self._unique_resources(self.timeseries)

    @property
    def unique_sequences(self) -> SequenceList:
        """The sequences de-duplicated on external id."""
        return self._unique_resources(self.sequences)

    @property
    def unique_files(self) -> FileMetadataList:
        """The file metadata de-duplicated on external id."""
        return self._unique_resources(self.files)

    @staticmethod
    def _unique_resources(resource_list: _T_ResourceList) -> _T_ResourceList:
        """Returns a new list of the same type keeping the first resource per external id.

        Resources with a falsy external id are dropped.
        """
        seen: set[str] = set()
        unique_resources = type(resource_list)([])
        for resource in resource_list:
            if resource.external_id in seen:
                continue
            if resource.external_id:
                seen.add(resource.external_id)
                unique_resources.append(resource)
        return unique_resources  # type: ignore[return-value]

    def dump_yaml(
        self, folder: Path | str, exclude: set[ResourceType | tuple[str, ResourceType] | str] | None = None
    ) -> None:
        """Dumps the mock data to a folder in yaml format.

        Args:
            folder (Path | str): The folder to dump the mock data to.
            exclude: The resources to exclude from the dump. Can either be a ResourceType,
                a view_external_id or a tuple of view_external_id and ResourceType.
        """
        for view_mock_data in self:
            exclude_view: set[str] | None
            if exclude:
                external_id = view_mock_data.view_id.external_id
                # A bare view external id excludes the entire view.
                if external_id in exclude:
                    continue
                # Keep the globally excluded resource types plus the ones excluded
                # specifically for this view through a (view_external_id, type) tuple.
                exclude_view = {
                    item if item in _ResourceTypes else item[1]  # type: ignore[misc]
                    for item in exclude
                    if (isinstance(item, tuple) and item[0] == external_id) or item in _ResourceTypes
                }
            else:
                exclude_view = None
            view_mock_data.dump_yaml(folder, exclude_view)  # type: ignore[arg-type]

    def deploy(
        self,
        client: CogniteClient,
        exclude: set[Literal["timeseries", "files", "sequences"]] | None = None,
        verbose: bool = False,
    ) -> None:
        """Deploys the mock data to CDF.

        This means calling the .apply() method for the instances (nodes and edges), and the .upsert() method for
        timeseries and sequences. Files are created using the .create() method.

        Args:
            client (CogniteClient): The client to use for deployment.
            exclude (set[Literal["timeseries", "files", "sequences"]]): The resources to exclude from deployment.
            verbose (bool): Whether to print information about the deployment.
        """
        nodes = self.writable_nodes
        edges = self.edges
        with _log_pygen_mock_call(client) as client:
            if self:
                # The instance space is taken from the first entry and created when missing.
                instance_space = self[0].instance_space
                if client.data_modeling.spaces.retrieve(instance_space) is None:
                    client.data_modeling.spaces.apply(dm.SpaceApply(instance_space, name=instance_space))
                    if verbose:
                        print(f"Created space {instance_space}")

            if nodes or edges:
                # There is an 'edge' if there is an outward and inward edge on two views, we can get duplicated edges.
                # We should remove the duplicates.
                edges = dm.EdgeApplyList({edge.as_id(): edge for edge in edges}.values())

                created = client.data_modeling.instances.apply(
                    nodes,
                    edges,
                    auto_create_start_nodes=True,
                    auto_create_end_nodes=True,
                    auto_create_direct_relations=True,
                )
                if verbose:
                    print(
                        f"Created {sum(1 for n in created.nodes if n.was_modified)} nodes "
                        f"and {sum(1 for e in created.edges if e.was_modified)} edges"
                    )
            if (timeseries := self.unique_timeseries) and (exclude is None or "timeseries" not in exclude):
                client.time_series.upsert(timeseries)
                if verbose:
                    print(f"Created/Updated {len(timeseries)} timeseries")
            if (sequences := self.unique_sequences) and (exclude is None or "sequences" not in exclude):
                client.sequences.upsert(sequences)
                if verbose:
                    print(f"Created/Updated {len(sequences)} sequences")
            if (files := self.unique_files) and (exclude is None or "files" not in exclude):
                # Only files that do not already exist are created.
                existing = set(
                    client.files.retrieve_multiple(
                        external_ids=files.as_external_ids(), ignore_unknown_ids=True
                    ).as_external_ids()
                )
                new_files = FileMetadataList([file for file in files if file.external_id not in existing])
                for file in new_files:
                    client.files.create(file)
                if verbose:
                    print(f"Created {len(new_files)} files")

    def clean(self, client: CogniteClient, delete_space: bool = False, verbose: bool = False) -> None:
        """Cleans the mock data from CDF.

        This means calling the .delete() method for the instances (nodes and edges), timeseries, sequences, and files.

        Args:
            client: The client to use for cleaning.
            delete_space: Whether to delete the instance space.
            verbose: Whether to print information about the cleaning.
        """
        nodes = self.nodes
        edges = self.edges
        with _log_pygen_mock_call(client) as client:
            if nodes or edges:
                client.data_modeling.instances.delete(nodes.as_ids(), edges.as_ids())
                if verbose:
                    print(f"Deleted {len(nodes)} nodes and {len(edges)} edges ")
            if timeseries := self.unique_timeseries:
                client.time_series.delete(external_id=timeseries.as_external_ids(), ignore_unknown_ids=True)
                if verbose:
                    print(f"Deleted {len(timeseries)} timeseries")
            if sequences := self.unique_sequences:
                client.sequences.delete(external_id=sequences.as_external_ids(), ignore_unknown_ids=True)
                if verbose:
                    print(f"Deleted {len(sequences)} sequences")
            if files := self.unique_files:
                try:
                    client.files.delete(external_id=files.as_external_ids())
                except CogniteNotFoundError as e:
                    # Retry with only the files that were not reported as missing.
                    not_existing = {file["externalId"] for file in e.not_found}
                    files = FileMetadataList([file for file in files if file.external_id not in not_existing])
                    # When every file was missing there is nothing left to delete; skip the
                    # retry instead of issuing a delete with an empty identifier list.
                    if files:
                        client.files.delete(external_id=files.as_external_ids())
                if verbose:
                    print(f"Deleted {len(files)} files")

            if self and delete_space:
                # The instance space is taken from the first entry.
                instance_space = self[0].instance_space
                client.data_modeling.spaces.delete(instance_space)
                if verbose:
                    print(f"Deleted space {instance_space}")

    def _repr_html_(self) -> str:
        """Returns an HTML table with the number of items per resource type."""
        resources = {
            "node": self.nodes,
            "edge": self.edges,
            "timeseries": self.timeseries,
            "sequence": self.sequences,
            "file": self.files,
        }
        table = pd.DataFrame([{"resource": name, "count": len(items)} for name, items in resources.items()])

        return table._repr_html_()  # type: ignore[operator]

clean(client, delete_space=False, verbose=False)

Cleans the mock data from CDF.

This means calling the .delete() method for the instances (nodes and edges), timeseries, sequences, and files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | CogniteClient | The client to use for cleaning. | required |
| delete_space | bool | Whether to delete the instance space. | False |
| verbose | bool | Whether to print information about the cleaning. | False |

Source code in cognite/pygen/utils/mock_generator.py
def clean(self, client: CogniteClient, delete_space: bool = False, verbose: bool = False) -> None:
    """Cleans the mock data from CDF.

    This means calling the .delete() method for the instances (nodes and edges), timeseries, sequences, and files.

    Args:
        client: The client to use for cleaning.
        delete_space: Whether to delete the instance space.
        verbose: Whether to print information about the cleaning.
    """
    nodes = self.nodes
    edges = self.edges
    with _log_pygen_mock_call(client) as client:
        if nodes or edges:
            client.data_modeling.instances.delete(nodes.as_ids(), edges.as_ids())
            if verbose:
                print(f"Deleted {len(nodes)} nodes and {len(edges)} edges ")
        if timeseries := self.unique_timeseries:
            client.time_series.delete(external_id=timeseries.as_external_ids(), ignore_unknown_ids=True)
            if verbose:
                print(f"Deleted {len(timeseries)} timeseries")
        if sequences := self.unique_sequences:
            client.sequences.delete(external_id=sequences.as_external_ids(), ignore_unknown_ids=True)
            if verbose:
                print(f"Deleted {len(sequences)} sequences")
        if files := self.unique_files:
            try:
                client.files.delete(external_id=files.as_external_ids())
            except CogniteNotFoundError as e:
                # Retry with only the files that were not reported as missing.
                not_existing = {file["externalId"] for file in e.not_found}
                files = FileMetadataList([file for file in files if file.external_id not in not_existing])
                # When every file was missing there is nothing left to delete; skip the
                # retry instead of issuing a delete with an empty identifier list.
                if files:
                    client.files.delete(external_id=files.as_external_ids())
            if verbose:
                print(f"Deleted {len(files)} files")

        if self and delete_space:
            # The instance space is taken from the first entry.
            instance_space = self[0].instance_space
            client.data_modeling.spaces.delete(instance_space)
            if verbose:
                print(f"Deleted space {instance_space}")

deploy(client, exclude=None, verbose=False)

Deploys the mock data to CDF.

This means calling the .apply() method for the instances (nodes and edges), and the .upsert() method for timeseries and sequences. Files are created using the .create() method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | CogniteClient | The client to use for deployment. | required |
| exclude | set[Literal['timeseries', 'files', 'sequences']] | The resources to exclude from deployment. | None |
| verbose | bool | Whether to print information about the deployment. | False |

Source code in cognite/pygen/utils/mock_generator.py
def deploy(
    self,
    client: CogniteClient,
    exclude: set[Literal["timeseries", "files", "sequences"]] | None = None,
    verbose: bool = False,
) -> None:
    """Deploys the mock data to CDF.

    Instances (nodes and edges) are written with .apply(), timeseries and sequences with
    .upsert(), and files that do not exist yet with .create(). The instance space of the
    first entry is created first when it cannot be retrieved.

    Args:
        client (CogniteClient): The client to use for deployment.
        exclude (set[Literal["timeseries", "files", "sequences"]]): The resources to exclude from deployment.
        verbose (bool): Whether to print information about the deployment.
    """
    nodes_to_write = self.writable_nodes
    all_edges = self.edges
    with _log_pygen_mock_call(client) as client:
        if self:
            instance_space = self[0].instance_space
            space_is_missing = client.data_modeling.spaces.retrieve(instance_space) is None
            if space_is_missing:
                client.data_modeling.spaces.apply(dm.SpaceApply(instance_space, name=instance_space))
                if verbose:
                    print(f"Created space {instance_space}")

        if nodes_to_write or all_edges:
            # An outward and an inward edge definition on two views can yield the same edge
            # twice; keying on the edge id keeps a single entry per edge.
            edge_by_id = {}
            for edge in all_edges:
                edge_by_id[edge.as_id()] = edge
            deduplicated_edges = dm.EdgeApplyList(edge_by_id.values())

            result = client.data_modeling.instances.apply(
                nodes_to_write,
                deduplicated_edges,
                auto_create_start_nodes=True,
                auto_create_end_nodes=True,
                auto_create_direct_relations=True,
            )
            if verbose:
                modified_nodes = sum(1 for n in result.nodes if n.was_modified)
                modified_edges = sum(1 for e in result.edges if e.was_modified)
                print(f"Created {modified_nodes} nodes and {modified_edges} edges")

        timeseries = self.unique_timeseries
        if timeseries and not (exclude is not None and "timeseries" in exclude):
            client.time_series.upsert(timeseries)
            if verbose:
                print(f"Created/Updated {len(timeseries)} timeseries")

        sequences = self.unique_sequences
        if sequences and not (exclude is not None and "sequences" in exclude):
            client.sequences.upsert(sequences)
            if verbose:
                print(f"Created/Updated {len(sequences)} sequences")

        files = self.unique_files
        if files and not (exclude is not None and "files" in exclude):
            # Look up which files already exist so that only the missing ones are created.
            already_there = client.files.retrieve_multiple(
                external_ids=files.as_external_ids(), ignore_unknown_ids=True
            )
            existing = set(already_there.as_external_ids())
            new_files = FileMetadataList([candidate for candidate in files if candidate.external_id not in existing])
            for new_file in new_files:
                client.files.create(new_file)
            if verbose:
                print(f"Created {len(new_files)} files")

dump_yaml(folder, exclude=None)

Dumps the mock data to a folder in yaml format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| folder | `Path \| str` | The folder to dump the mock data to. | required |
| exclude | `set[ResourceType \| tuple[str, ResourceType] \| str] \| None` | The resources to exclude from the dump. Can either be a ResourceType, a view_external_id or a tuple of view_external_id and ResourceType. | None |

Source code in cognite/pygen/utils/mock_generator.py
def dump_yaml(
    self, folder: Path | str, exclude: set[ResourceType | tuple[str, ResourceType] | str] | None = None
) -> None:
    """Dumps the mock data to a folder in yaml format.

    Each entry is dumped through its own ``dump_yaml`` with the resource types
    that apply to that entry's view.

    Args:
        folder (Path | str): The folder to dump the mock data to.
        exclude: The resources to exclude from the dump. Can either be a ResourceType,
            a view_external_id or a tuple of view_external_id and ResourceType.
    """
    for view_mock_data in self:
        if not exclude:
            # Nothing to exclude: dump the entry in full.
            view_mock_data.dump_yaml(folder, None)  # type: ignore[arg-type]
            continue

        view_external_id = view_mock_data.view_id.external_id
        # A bare view external id excludes the entire view.
        if view_external_id in exclude:
            continue

        excluded_types: set[str] = set()
        for entry in exclude:
            if entry in _ResourceTypes:
                # Resource type excluded for every view.
                excluded_types.add(entry)  # type: ignore[arg-type]
            elif isinstance(entry, tuple) and entry[0] == view_external_id:
                # Resource type excluded for this view only.
                excluded_types.add(entry[1])
        view_mock_data.dump_yaml(folder, excluded_types)  # type: ignore[arg-type]

MockGenerator

Mock generator for the pygen package. It can be used to generate mock nodes, edges, timeseries, sequences, and files for a given data model/views.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| views | List[View] | The views to generate mock data for. | required |
| instance_space | str | The space to use for the generated nodes and edges. | required |
| view_configs | dict[ViewId, ViewMockConfig] | Configuration for how to generate mock data for the different views. The keys are the view ids, and the values are the configuration for the view. | None |
| default_config | ViewMockConfig | Default configuration for how to generate mock data for the different views. | None |
| data_set_id | int | The data set id to use for TimeSeries, Sequences, and FileMetadata. | None |
| seed | int | The seed to use for the random number generator. | None |
| skip_interfaces | bool | Whether to skip interfaces when generating mock data. Defaults to False. | False |

Source code in cognite/pygen/utils/mock_generator.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
class MockGenerator:
    """Mock generator for the pygen package. It can be used to generate mock nodes, edges, timeseries,
    sequences, and files for a given data model/views.

    Args:
        views (List[View]): The views to generate mock data for.
        instance_space (str): The space to use for the generated nodes and edges.
        view_configs (dict[ViewId, ViewMockConfig]): Configuration for how to generate mock data for the different
            views. The keys are the view ids, and the values are the configuration for the view.
        default_config (ViewMockConfig): Default configuration for how to generate mock data for the different
            views.
        data_set_id (int): The data set id to use for TimeSeries, Sequences, and FileMetadata.
        seed (int): The seed to use for the random number generator.
        skip_interfaces (bool): Whether to skip interfaces when generating mock data. Defaults to False.
    """

    def __init__(
        self,
        views: typing.Sequence[dm.View],
        instance_space: str,
        view_configs: dict[dm.ViewId, ViewMockConfig] | None = None,
        default_config: ViewMockConfig | None = None,
        data_set_id: int | None = None,
        seed: int | None = None,
        skip_interfaces: bool = False,
    ):
        # Settings stored exactly as given.
        self._instance_space = instance_space
        self._data_set_id = data_set_id
        self._seed = seed
        self._skip_interfaces = skip_interfaces

        # Normalised inputs: views are wrapped in a ViewList, and missing (or falsy)
        # configurations fall back to an empty mapping / a default ViewMockConfig.
        self._views = dm.ViewList(views)
        self._view_configs = view_configs if view_configs else {}
        self._default_config = default_config if default_config else ViewMockConfig()

        # Starts empty; reassigned by generate_mock_data.
        self._interfaces: set[dm.ViewId] = set()

    def __str__(self) -> str:
        """Returns a compact summary of the generator's configuration.

        The summary always contains the number of views, the instance space and
        whether the default configuration equals a fresh ``ViewMockConfig()``.
        The number of custom view configurations is included when there are any,
        and the data set id and seed are included when they have been set.
        """
        args = [
            f"view_count={len(self._views)}",
            f"instance_space={self._instance_space}",
        ]
        if self._view_configs:
            args.append(f"custom_config_count={len(self._view_configs)}")
        if self._default_config == ViewMockConfig():
            args.append("default_config=True")
        else:
            args.append("default_config=False")
        # Compare against None so that an explicitly configured 0 is still shown.
        if self._data_set_id is not None:
            args.append(f"data_set_id={self._data_set_id}")
        if self._seed is not None:
            args.append(f"seed={self._seed}")

        return f"MockGenerator({', '.join(args)})"

    def _repr_html_(self) -> str:
        return str(self)

    @classmethod
    def from_data_model(
        cls,
        data_model_id: DataModelIdentifier,
        instance_space: str,
        client: CogniteClient,
        data_set_id: int | None = None,
        seed: int | None = None,
    ) -> MockGenerator:
        """Creates a MockGenerator from a data model.

        The data model is retrieved with its views inlined, and the views of its
        latest version are used to build the generator.

        Args:
            data_model_id: Identifier of the data model to generate mock data for.
            instance_space: The space to use for the generated nodes and edges.
            client: An instance of the CogniteClient class.
            data_set_id: The data set id to use for TimeSeries, Sequences, and FileMetadata.
            seed: The seed to use for the random number generator.

        Returns:
            MockGenerator: The mock generator.

        """
        with _log_pygen_mock_call(client) as client:
            retrieved_models = client.data_modeling.data_models.retrieve(
                ids=data_model_id,
                inline_views=True,
            )
            latest_model = retrieved_models.latest_version()

        generator_kwargs = {
            "views": latest_model.views,
            "instance_space": instance_space,
            "data_set_id": data_set_id,
            "seed": seed,
        }
        return cls(**generator_kwargs)

    def generate_mock_data(
        self, node_count: int = 5, max_edge_per_type: int = 5, null_values: float = 0.25
    ) -> MockData:
        """Generates mock data for the given data model/views.

        When a seed has been configured (including ``0``), the random number generator
        is seeded and every property generator exposing a callable ``reset`` is reset
        before generation starts.

        Args:
            node_count: The number of nodes to generate for each view.
            max_edge_per_type: The maximum number of edges to generate for each edge type.
            null_values: The probability of generating a null value for a nullable property.

        Returns:
            MockData: The generated mock data.
        """
        # Compare against None: 0 is a valid seed and must not be silently ignored.
        if self._seed is not None:
            random.seed(self._seed)
            for config in itertools.chain(self._view_configs.values(), [self._default_config]):
                for generator in config.property_types.values():
                    # This is for generators that have a state.
                    reset = getattr(generator, "reset", None)
                    if callable(reset):
                        reset()

        # Every view that is implemented by another view counts as an interface.
        self._interfaces = {interface for view in self._views for interface in view.implements or []}
        mock_data = MockData()
        for components in _connected_views(self._views):
            data = self._generate_views_mock_data(components, node_count, max_edge_per_type, null_values)
            mock_data.extend(data)
        return mock_data

    def _generate_views_mock_data(self, views: list[dm.View], node_count, max_edge_per_type, null_values) -> MockData:
        """Generates the nodes for the given views, then their connections, and wraps the result in MockData."""
        mock_data_by_view = self._generate_mock_nodes(views, node_count, null_values)
        # The connection step returns nothing; it updates the per-view mock data in place.
        self._generate_mock_connections(views, mock_data_by_view, max_edge_per_type, null_values)
        per_view_data = mock_data_by_view.values()
        return MockData(per_view_data)

    def _generate_mock_nodes(
        self, views: list[dm.View], default_node_count: int, default_nullable_fraction: float
    ) -> dict[dm.ViewId, ViewMockData]:
        """Generates the mock nodes (and external resources) for each of the given views.

        Views are processed in the order of their id tuples. Interface views are skipped
        when ``skip_interfaces`` is set. Direct relation properties are left out here.

        Args:
            views: The views to generate nodes for.
            default_node_count: Node count used when the view's config does not set one.
            default_nullable_fraction: Passed on to the value generation as the default
                fraction of null values.

        Returns:
            The generated ViewMockData keyed by view id.
        """
        mock_data_by_view: dict[dm.ViewId, ViewMockData] = {}
        for view in sorted(views, key=lambda v: v.as_id().as_tuple()):
            view_id = view.as_id()
            if self._skip_interfaces and view_id in self._interfaces:
                continue

            # Only mapped, non-direct-relation properties get generated values.
            value_properties = {}
            for name, prop in view.properties.items():
                if not isinstance(prop, dm.MappedProperty):
                    continue
                if isinstance(prop.type, dm.DirectRelation):
                    continue
                value_properties[name] = prop

            node_type = _find_first_node_type(view.filter)
            config = self._view_configs.get(view_id, self._default_config)

            # Values are generated before the node ids, matching the order in which the
            # generators are invoked.
            columns, external = self._generate_mock_values(
                value_properties,
                config,
                view_id,
                default_node_count,
                default_nullable_fraction,
            )
            node_ids = config.node_id_generator(view_id, config.node_count or default_node_count)

            nodes = []
            # Each zipped row is one node id followed by one value per property.
            for node_id, *row in zip(node_ids, *columns.values()):
                sources = None
                if row:
                    sources = [
                        dm.NodeOrEdgeData(
                            source=view_id,
                            properties=dict(zip(columns.keys(), row)),
                        )
                    ]
                nodes.append(
                    dm.NodeApply(
                        space=self._instance_space,
                        external_id=node_id,
                        type=node_type,
                        sources=sources,
                    )
                )

            mock_data_by_view[view_id] = ViewMockData(
                view_id,
                instance_space=self._instance_space,
                is_writeable=view.writable,
                node=dm.NodeApplyList(nodes),
                timeseries=TimeSeriesList(external.timeseries),
                sequence=SequenceList(external.sequence),
                file=FileMetadataList(external.file),
            )
        return mock_data_by_view

    def _generate_mock_connections(
        self,
        views: list[dm.View],
        outputs: dict[dm.ViewId, ViewMockData],
        default_max_edge_count: int,
        default_nullable_fraction: float,
    ) -> None:
        """Generates edges and direct relations between the already generated nodes.

        ``outputs`` is updated in place: edges are appended to the ``edge`` list of the
        view's mock data, and direct relations are set as properties on the nodes.
        Reverse direct relations are skipped, and unsupported connection types only
        produce a warning.

        Args:
            views: The views to generate connections for.
            outputs: The generated mock data per view id, as produced by the node step.
            default_max_edge_count: Maximum number of edges (or list direct relation
                targets) used when the view's config does not set one.
            default_nullable_fraction: Probability of leaving a nullable direct relation
                unset, used when the view's config does not set one.
        """
        leaf_children_by_parent = self._to_leaf_children_by_parent(views)
        for view in sorted(views, key=lambda v: v.as_id().as_tuple()):
            if self._skip_interfaces and view.as_id() in self._interfaces:
                continue
            # Direct relations and connection definitions are the only properties handled here.
            connection_properties = {
                name: prop
                for name, prop in view.properties.items()
                if (isinstance(prop, dm.MappedProperty) and isinstance(prop.type, dm.DirectRelation))
                or isinstance(prop, dm.ConnectionDefinition)
            }
            if not connection_properties:
                continue
            view_id = view.as_id()
            config = self._view_configs.get(view_id, self._default_config)
            for this_node in outputs[view_id].node:
                for property_name, connection in connection_properties.items():
                    if (
                        isinstance(connection, (MultiEdgeConnection, dm.MappedProperty))
                        and connection.source is not None
                        and connection.source not in outputs
                        and connection.source not in leaf_children_by_parent
                    ):
                        warnings.warn(
                            f"{view_id} property {property_name!r} points to a view {connection.source} "
                            f"which is not in the data model. Skipping connection generation.",
                            stacklevel=2,
                        )
                        continue

                    if isinstance(connection, EdgeConnection):
                        other_nodes = self.get_other_nodes(connection.source, outputs, leaf_children_by_parent)
                        if isinstance(connection, SingleEdgeConnection):
                            max_edge_count = 1
                        else:  # MultiEdgeConnection
                            max_edge_count = config.max_edge_per_type or default_max_edge_count
                        max_edge_count = min(max_edge_count, len(other_nodes))
                        edges = self._create_edges(connection, this_node.as_id(), other_nodes, max_edge_count)
                        outputs[view_id].edge.extend(edges)
                    elif isinstance(connection, dm.MappedProperty) and isinstance(connection.type, dm.DirectRelation):
                        if not connection.source:
                            warnings.warn(
                                f"View {view_id}: DirectRelation {property_name} is missing source, "
                                "do not know the target view the direct relation points to",
                                stacklevel=2,
                            )
                            continue
                        other_nodes = self.get_other_nodes(connection.source, outputs, leaf_children_by_parent)

                        # If the connection is nullable, we randomly decide if we should create the relation
                        create_relation = not connection.nullable or random.random() < (
                            1 - (config.null_values or default_nullable_fraction)
                        )
                        if not (create_relation and other_nodes):
                            continue
                        if connection.type.is_list:
                            max_edge_count = config.max_edge_per_type or default_max_edge_count
                        else:
                            max_edge_count = 1
                        # random.sample raises ValueError when asked for more items than the
                        # population holds, so clamp to the number of candidate nodes (as the
                        # edge branch above already does).
                        max_edge_count = min(max_edge_count, len(other_nodes))
                        if max_edge_count < 1:
                            # A non-positive maximum leaves nothing to pick (randint(1, 0) would raise).
                            continue
                        other_nodes = random.sample(other_nodes, k=randint(1, max_edge_count))
                        values = [
                            {"space": other_node.space, "externalId": other_node.external_id}
                            for other_node in other_nodes
                        ]
                        # A list relation takes all picked targets; a single relation takes the first.
                        value: dict | list[dict] = values if connection.type.is_list else values[0]
                        self._set_direct_relation_property(this_node, view_id, property_name, value)
                    elif isinstance(connection, ReverseDirectRelation):
                        continue
                    else:
                        warnings.warn(
                            f"View {view_id}: Connection {type(connection)} used by {property_name} "
                            f"is not supported by the {type(self).__name__}.",
                            stacklevel=2,
                        )

    def _generate_mock_values(
        self,
        properties: dict[str, dm.MappedProperty],
        config: ViewMockConfig,
        view_id: dm.ViewId,
        default_node_count: int,
        default_nullable_fraction: float,
    ) -> tuple[dict[str, typing.Sequence[ListAbleDataType]], ViewMockData]:
        """Generates mock values for every property of a view.

        The generator of a property is looked up by property name in ``config.properties``
        first, and by the type of the property in ``config.property_types`` second. For
        nullable properties a share of the values is replaced by ``None`` and the values are
        shuffled. For time series, file and sequence references, the matching resources are
        created as well.

        Args:
            properties: The properties to generate values for, keyed by property name.
            config: The mock configuration of the view.
            view_id: The id of the view the properties belong to.
            default_node_count: The node count used when ``config.node_count`` is not set.
            default_nullable_fraction: The null fraction used when ``config.null_values`` is
                not set.

        Returns:
            The generated values keyed by property name, and a ViewMockData holding the
            time series, files and sequences created for the reference properties.

        Raises:
            ValueError: If no generator is configured for a property.
        """

        def non_empty_external_ids(raw_values: typing.Iterable[typing.Any]) -> typing.Iterator[str]:
            """Yields every truthy external id in raw_values, flattening list-valued entries."""
            for entry in raw_values:
                for external_id in (entry if isinstance(entry, list) else [entry]):
                    if external_id:
                        yield cast(str, external_id)

        output: dict[str, typing.Sequence[ListAbleDataType]] = {}
        external = ViewMockData(view_id, self._instance_space)
        resource_source = f"Pygen{type(self).__name__}"

        # These do not depend on the property, so they are resolved once.
        config_node_count = config.node_count or default_node_count
        # Compare against None: an explicit fraction of 0.0 is valid and must not be
        # replaced by the default.
        config_null_values = (
            config.null_values if config.null_values is not None else default_nullable_fraction
        )

        values: typing.Sequence[ListAbleDataType]
        for name, prop in properties.items():
            if name in config.properties:
                generator = config.properties[name]
            elif type(prop.type) in config.property_types:
                generator = config.property_types[type(prop.type)]
            else:
                raise ValueError(f"Could not generate mock data for property {name} of type {type(prop.type)}")

            # Only nullable properties get null values.
            null_values = int(config_node_count * config_null_values) if prop.nullable else 0
            node_count = config_node_count - null_values
            if isinstance(prop.type, PropertyType) and prop.type.is_list:
                # List properties get one list of 0-5 generated values per node.
                values = [generator(random.randint(0, 5)) for _ in range(node_count)] + [None] * null_values
            else:
                values = generator(node_count) + [None] * null_values

            # The nulls are appended at the end above; shuffle to spread them over the nodes.
            if null_values and isinstance(values, list):
                random.shuffle(values)

            output[name] = values
            if isinstance(prop.type, dm.TimeSeriesReference):
                external.timeseries.extend(
                    [
                        TimeSeries(
                            external_id=ts,
                            name=ts,
                            data_set_id=self._data_set_id,
                            is_step=False,
                            is_string=False,
                            metadata={
                                "source": resource_source,
                            },
                        )
                        for ts in non_empty_external_ids(values)
                    ]
                )
            elif isinstance(prop.type, dm.FileReference):
                external.file.extend(
                    [
                        FileMetadata(
                            external_id=file,
                            name=file,
                            source=self._instance_space,
                            data_set_id=self._data_set_id,
                            mime_type="text/plain",
                            metadata={
                                "source": resource_source,
                            },
                        )
                        for file in non_empty_external_ids(values)
                    ]
                )
            elif isinstance(prop.type, dm.SequenceReference):
                external.sequence.extend(
                    [
                        Sequence(
                            external_id=seq,
                            name=seq,
                            data_set_id=self._data_set_id,
                            columns=[
                                SequenceColumn(
                                    external_id="value",
                                    value_type=cast(Literal["Double"], "DOUBLE"),
                                    metadata={
                                        "source": resource_source,
                                    },
                                )
                            ],
                            metadata={
                                "source": resource_source,
                            },
                        )
                        for seq in non_empty_external_ids(values)
                    ]
                )

        return output, external

    @staticmethod
    def get_other_nodes(
        connection: dm.ViewId,
        outputs: dict[dm.ViewId, ViewMockData],
        leaf_children_by_parent: dict[dm.ViewId, list[dm.ViewId]],
    ) -> list[dm.NodeId]:
        if connection in leaf_children_by_parent:
            sources: list[dm.NodeId] = []
            for child in leaf_children_by_parent[connection]:
                sources.extend(outputs[child].node.as_ids())
        else:
            sources = outputs[connection].node.as_ids()
        return sources

    def _create_edges(
        self, connection: EdgeConnection, this_node: dm.NodeId, sources: list[dm.NodeId], max_edge_count: int
    ) -> list[dm.EdgeApply]:
        end_nodes = random.sample(sources, k=randint(0, max_edge_count))

        edges: list[dm.EdgeApply] = []
        for end_node in end_nodes:
            start_node = this_node
            if connection.direction == "inwards":
                start_node, end_node = end_node, start_node

            edge = dm.EdgeApply(
                space=self._instance_space,
                external_id=f"{start_node.external_id}:{end_node.external_id}",
                type=connection.type,
                start_node=(start_node.space, start_node.external_id),
                end_node=(end_node.space, end_node.external_id),
            )
            edges.append(edge)
        return edges

    @staticmethod
    def _set_direct_relation_property(
        this_node: dm.NodeApply, view_id: dm.ViewId, property_name: str, value: dict | list[dict]
    ) -> None:
        if this_node.sources is None:
            this_node.sources = []
        for source in this_node.sources:
            if source.source == view_id:
                if not isinstance(source.properties, dict):
                    source.properties = dict(source.properties) if source.properties else {}
                source.properties[property_name] = value
                break
        else:
            # This is the first property residing in this view
            # for this node
            this_node.sources.append(
                dm.NodeOrEdgeData(
                    source=view_id,
                    properties={property_name: value},
                )
            )

    @staticmethod
    def _to_leaf_children_by_parent(views: list[dm.View]) -> dict[dm.ViewId, list[dm.ViewId]]:
        leaf_children_by_parent: dict[dm.ViewId, set[dm.ViewId]] = defaultdict(set)
        for view in views:
            for parent in view.implements or []:
                leaf_children_by_parent[parent].add(view.as_id())

        leafs: set[dm.ViewId] = set()
        for view_id in TopologicalSorter(leaf_children_by_parent).static_order():
            if view_id not in leaf_children_by_parent:
                leafs.add(view_id)
                continue

            parents = leaf_children_by_parent[view_id] - leafs
            for parent in parents:
                leaf_children_by_parent[view_id].remove(parent)
                leaf_children_by_parent[view_id].update(leaf_children_by_parent[parent])

        return {k: sorted(v, key=lambda x: x.as_tuple()) for k, v in leaf_children_by_parent.items()}

from_data_model(data_model_id, instance_space, client, data_set_id=None, seed=None) classmethod

Creates a MockGenerator from a data model.

Parameters:

Name Type Description Default
data_model_id DataModelIdentifier

Identifier of the data model to generate mock data for.

required
instance_space str

The space to use for the generated nodes and edges.

required
client CogniteClient

An instance of the CogniteClient class.

required
data_set_id int | None

The data set id to use for TimeSeries, Sequences, and FileMetadata.

None
seed int | None

The seed to use for the random number generator.

None

Returns:

Name Type Description
MockGenerator MockGenerator

The mock generator.

Source code in cognite/pygen/utils/mock_generator.py
@classmethod
def from_data_model(
    cls,
    data_model_id: DataModelIdentifier,
    instance_space: str,
    client: CogniteClient,
    data_set_id: int | None = None,
    seed: int | None = None,
) -> MockGenerator:
    """Builds a MockGenerator from the views of a data model.

    The data model is retrieved through the client with its views inlined, and the
    latest version is used.

    Args:
        data_model_id: Identifier of the data model to generate mock data for.
        instance_space: The space to use for the generated nodes and edges.
        client: An instance of the CogniteClient class.
        data_set_id: The data set id to use for TimeSeries, Sequences, and FileMetadata.
        seed: The seed to use for the random number generator.

    Returns:
        MockGenerator: The mock generator.

    """
    with _log_pygen_mock_call(client) as logged_client:
        retrieved = logged_client.data_modeling.data_models.retrieve(
            ids=data_model_id,
            inline_views=True,
        )
        latest = retrieved.latest_version()

    return cls(
        views=latest.views,
        instance_space=instance_space,
        data_set_id=data_set_id,
        seed=seed,
    )

generate_mock_data(node_count=5, max_edge_per_type=5, null_values=0.25)

Generates mock data for the given data model/views.

Parameters:

Name Type Description Default
node_count int

The number of nodes to generate for each view.

5
max_edge_per_type int

The maximum number of edges to generate for each edge type.

5
null_values float

The probability of generating a null value for a nullable property.

0.25

Returns:

Name Type Description
MockData MockData

The generated mock data.

Source code in cognite/pygen/utils/mock_generator.py
def generate_mock_data(
    self, node_count: int = 5, max_edge_per_type: int = 5, null_values: float = 0.25
) -> MockData:
    """Generates mock data for the given data model/views.

    When the generator has a seed, the ``random`` module is seeded with it and every
    property type generator that exposes a callable ``reset`` is reset before the data
    is generated.

    Args:
        node_count: The number of nodes to generate for each view.
        max_edge_per_type: The maximum number of edges to generate for each edge type.
        null_values: The probability of generating a null value for a nullable property.

    Returns:
        MockData: The generated mock data.
    """
    # Compare against None: 0 is a valid seed and must not be treated as "no seed".
    if self._seed is not None:
        random.seed(self._seed)
        for config in itertools.chain(self._view_configs.values(), [self._default_config]):
            for generator in config.property_types.values():
                # This is for generators that have a state.
                reset = getattr(generator, "reset", None)
                if callable(reset):
                    reset()

    self._interfaces = {interface for view in self._views for interface in view.implements or []}
    mock_data = MockData()
    for components in _connected_views(self._views):
        data = self._generate_views_mock_data(components, node_count, max_edge_per_type, null_values)
        mock_data.extend(data)
    return mock_data

ViewMockConfig dataclass

This class contains parameters for configuration of how the mock data should be generated for a given view.

This controls how many nodes and edges should be generated, and how to generate mock data for the different property types and relations (direct relations + edges).

The 'properties' and 'property_types' parameters can be used to override the default mock data generation for specific properties and property types. The 'properties' parameter takes precedence over the 'property_types'.

Note that this gives a very granular control over how to generate mock data, but it can be cumbersome to use. For most use cases, it is recommended to use the 'default_config' parameter of the MockGenerator class instead.

Parameters:

Name Type Description Default
node_count int | None

The number of nodes to generate for this view.

None
max_edge_per_type int | None

The maximum number of edges to generate per edge type for this view.

None
null_values float | None

The fraction of nullable properties that should be null for this view.

None
node_id_generator IDGeneratorFunction

How to generate node ids.

node_id
property_types dict[type[PropertyType], GeneratorFunction]

How to generate mock data for the different property types. The keys are the property types, and the values are functions that take the number of nodes as input and return a list of property values.

_create_default_property_types()
properties dict[str, GeneratorFunction]

How to generate mock data for the different properties. The keys are the property names, and the values are functions that take the number of nodes as input and return a list of property values.

lambda: {}()
Source code in cognite/pygen/utils/mock_generator.py
@dataclass
class ViewMockConfig:
    """This class contains parameters for configuration of how the mock
    data should be generated for a given view.

    This controls how many nodes and edges should be generated, and how to generate mock data for the different
    property types and relations (direct relations + edges).

    The 'properties' and 'property_types' parameters can be used to override the default mock data generation for
    specific properties and property types. The 'properties' parameter takes precedence over the 'property_types'.

    Note that this gives a very granular control over how to generate mock data, but it can be cumbersome to use.
    For most use cases, it is recommended to use the 'default_config' parameter of the MockGenerator class instead.

    Args:
        node_count: The number of nodes to generate for this view.
        max_edge_per_type: The maximum number of edges to generate per edge type for this view.
        null_values: The fraction of nullable properties that should be null for this view.
        node_id_generator: How to generate node ids.
        property_types: How to generate mock
            data for the different property types. The keys are the property types, and the values are functions
            that take the number of nodes as input and return a list of property values.
        properties: How to generate mock data for the different
            properties. The keys are the property names, and the values are functions that take the number of nodes
            as input and return a list of property values.

    Raises:
        ValueError: If null_values is outside [0, 1], node_count is not positive, max_edge_per_type is negative,
            or a custom property type generator does not return the requested number of values.

    """

    node_count: int | None = None
    max_edge_per_type: int | None = None
    null_values: float | None = None
    node_id_generator: IDGeneratorFunction = _RandomGenerator.node_id  # type: ignore[assignment]
    property_types: dict[type[dm.PropertyType], GeneratorFunction] = field(
        default_factory=_create_default_property_types
    )
    properties: dict[str, GeneratorFunction] = field(default_factory=dict)

    def __post_init__(self):
        """Validates the configuration and fills in the default generator for missing property types."""
        if self.null_values is not None and (self.null_values < 0 or self.null_values > 1):
            raise ValueError("null_values must be between 0 and 1")
        if self.node_count is not None and self.node_count <= 0:
            raise ValueError("node_count must be greater than 0")
        if self.max_edge_per_type is not None and self.max_edge_per_type < 0:
            # 0 passes this check, so the message must not claim that it is rejected.
            raise ValueError("max_edge_per_type must be greater than or equal to 0")
        for k, v in _create_default_property_types().items():
            if k not in self.property_types:
                self.property_types[k] = v
            elif self.property_types[k] is not v:
                # A generator that differs from the default is called with two different
                # counts to check that it returns exactly the number of values asked for.
                for sample_size in (3, 5):
                    generated = self.property_types[k](sample_size)
                    if len(generated) != sample_size:
                        raise ValueError(
                            f"Invalid Custom Random Generator property_types[{k}]({sample_size}) "
                            f"must return a list of length {sample_size}"
                        )

ViewMockData dataclass

Mock data for a given view.

Parameters:

Name Type Description Default
view_id ViewId

The view id.

required
instance_space str

The instance space.

required
is_writeable bool

Whether the view is writeable. Defaults to True.

True
node NodeApplyList

The nodes.

lambda: NodeApplyList([])()
edge EdgeApplyList

The edges.

lambda: EdgeApplyList([])()
timeseries TimeSeriesList

The timeseries.

lambda: TimeSeriesList([])()
sequence SequenceList

The sequences.

lambda: SequenceList([])()
file FileMetadataList

The files.

lambda: FileMetadataList([])()
Source code in cognite/pygen/utils/mock_generator.py
@dataclass
class ViewMockData:
    """Mock data for a given view.

    Args:
        view_id (dm.ViewId): The view id.
        instance_space (str): The instance space.
        is_writeable (bool): Whether the view is writeable. Defaults to True.
        node (dm.NodeApplyList): The nodes.
        edge (dm.EdgeApplyList): The edges.
        timeseries (TimeSeriesList): The timeseries.
        sequence (SequenceList): The sequences.
        file (FileMetadataList): The files.
    """

    view_id: dm.ViewId
    instance_space: str
    is_writeable: bool = True
    node: dm.NodeApplyList = field(default_factory=lambda: dm.NodeApplyList([]))
    edge: dm.EdgeApplyList = field(default_factory=lambda: dm.EdgeApplyList([]))
    timeseries: TimeSeriesList = field(default_factory=lambda: TimeSeriesList([]))
    sequence: SequenceList = field(default_factory=lambda: SequenceList([]))
    file: FileMetadataList = field(default_factory=lambda: FileMetadataList([]))

    @property
    def _node_only(self) -> dm.NodeApplyList:
        """Copies of the nodes with the ``sources`` entry removed."""
        nodes = dm.NodeApplyList([])
        for node in self.node:
            # Dumping and loading to avoid mutating the original node
            dumped = node.dump()
            dumped.pop("sources", None)
            nodes.append(dm.NodeApply.load(dumped))
        return nodes

    def dump_yaml(self, folder: Path | str, exclude: set[ResourceType] | None = None) -> None:
        """
        Dumps the mock data to the given folder in yaml format.

        One file named ``<view external id>.<resource name>.yaml`` is written for each
        resource type that has data. The folder is created if it does not exist.

        Args:
            folder: The folder to dump the mock data to.
            exclude: The resources to exclude from the dump.
        """
        folder_path = Path(folder)
        if not folder_path.exists():
            folder_path.mkdir(parents=True, exist_ok=True)
        for resource_name in ["node", "edge", "timeseries", "sequence", "file"]:
            if exclude and resource_name in exclude:
                continue
            values = getattr(self, resource_name)
            if values:
                dump_file = folder_path / f"{self.view_id.external_id}.{resource_name}.yaml"
                # Fixed encoding and newline, so the files do not depend on the platform.
                with dump_file.open("w", encoding="utf-8", newline="\n") as f:
                    f.write(values.dump_yaml())

    def deploy(self, client: CogniteClient, verbose: bool = False) -> None:
        """Deploys the mock data to CDF.

        The instance space is created if it cannot be retrieved. Nodes and edges are applied,
        timeseries and sequences are upserted, and files are created only if no file with the
        same external id is found.

        Args:
            client: The client used for all calls.
            verbose: Whether to print what was created/updated.
        """
        with _log_pygen_mock_call(client) as client:
            if client.data_modeling.spaces.retrieve(self.instance_space) is None:
                client.data_modeling.spaces.apply(dm.SpaceApply(self.instance_space, name=self.instance_space))

            if self.node or self.edge:
                # For a view that is not writeable, the nodes are applied without sources.
                nodes = self.node if self.is_writeable else self._node_only

                created = client.data_modeling.instances.apply(nodes, self.edge)
                if verbose:
                    print(
                        f"Created {sum(1 for n in created.nodes if n.was_modified)} nodes "
                        f"and {sum(1 for e in created.edges if e.was_modified)} edges"
                    )
            if self.timeseries:
                client.time_series.upsert(self.timeseries)
                if verbose:
                    print(f"Created/Updated {len(self.timeseries)} timeseries")
            if self.sequence:
                client.sequences.upsert(self.sequence)
                if verbose:
                    print(f"Created/Updated {len(self.sequence)} sequences")
            if self.file:
                existing = client.files.retrieve_multiple(
                    external_ids=self.file.as_external_ids(), ignore_unknown_ids=True
                )
                # `existing` holds file metadata objects, so membership has to be tested
                # against their external ids and not against the list itself.
                existing_ids = {file.external_id for file in existing}
                new_files = FileMetadataList([file for file in self.file if file.external_id not in existing_ids])
                for file in new_files:
                    client.files.create(file)
                if verbose:
                    print(f"Created {len(new_files)} files")

    def _repr_html_(self) -> str:
        """Returns an HTML table with the number of items for each resource type."""
        table = pd.DataFrame(
            [
                {"resource": resource_name, "count": len(getattr(self, resource_name))}
                for resource_name in ("node", "edge", "timeseries", "sequence", "file")
            ]
        )

        return table._repr_html_()  # type: ignore[operator]

deploy(client, verbose=False)

Deploys the mock data to CDF.

Source code in cognite/pygen/utils/mock_generator.py
def deploy(self, client: CogniteClient, verbose: bool = False) -> None:
    """Deploys the mock data to CDF.

    The instance space is created if it cannot be retrieved. Nodes and edges are applied,
    timeseries and sequences are upserted, and files are created only if no file with the
    same external id is found.

    Args:
        client: The client used for all calls.
        verbose: Whether to print what was created/updated.
    """
    with _log_pygen_mock_call(client) as client:
        if client.data_modeling.spaces.retrieve(self.instance_space) is None:
            client.data_modeling.spaces.apply(dm.SpaceApply(self.instance_space, name=self.instance_space))

        if self.node or self.edge:
            # For a view that is not writeable, the nodes are applied without sources.
            nodes = self.node if self.is_writeable else self._node_only

            created = client.data_modeling.instances.apply(nodes, self.edge)
            if verbose:
                print(
                    f"Created {sum(1 for n in created.nodes if n.was_modified)} nodes "
                    f"and {sum(1 for e in created.edges if e.was_modified)} edges"
                )
        if self.timeseries:
            client.time_series.upsert(self.timeseries)
            if verbose:
                print(f"Created/Updated {len(self.timeseries)} timeseries")
        if self.sequence:
            client.sequences.upsert(self.sequence)
            if verbose:
                print(f"Created/Updated {len(self.sequence)} sequences")
        if self.file:
            existing = client.files.retrieve_multiple(
                external_ids=self.file.as_external_ids(), ignore_unknown_ids=True
            )
            # `existing` holds file metadata objects, so membership has to be tested
            # against their external ids and not against the list itself.
            existing_ids = {file.external_id for file in existing}
            new_files = FileMetadataList([file for file in self.file if file.external_id not in existing_ids])
            for file in new_files:
                client.files.create(file)
            if verbose:
                print(f"Created {len(new_files)} files")

dump_yaml(folder, exclude=None)

Dumps the mock data to the given folder in yaml format.

Parameters:

Name Type Description Default
folder Path | str

The folder to dump the mock data to.

required
exclude set[ResourceType] | None

The resources to exclude from the dump.

None
Source code in cognite/pygen/utils/mock_generator.py
def dump_yaml(self, folder: Path | str, exclude: set[ResourceType] | None = None) -> None:
    """
    Writes the mock data as yaml files into the given folder.

    One file named ``<view external id>.<resource name>.yaml`` is written for each
    resource type that has data. The folder is created if it does not exist.

    Args:
        folder: The folder to dump the mock data to.
        exclude: The resources to exclude from the dump.
    """
    target_dir = Path(folder)
    if not target_dir.exists():
        target_dir.mkdir(parents=True, exist_ok=True)

    # Nothing is skipped when exclude is None or empty.
    skipped = exclude or ()
    for resource_name in ("node", "edge", "timeseries", "sequence", "file"):
        if resource_name in skipped:
            continue
        resources = getattr(self, resource_name)
        if not resources:
            continue
        target_file = target_dir / f"{self.view_id.external_id}.{resource_name}.yaml"
        with target_file.open("w", encoding="utf-8", newline="\n") as yaml_file:
            yaml_file.write(resources.dump_yaml())