Pygen

cognite.pygen

This is the main entry point for the pygen package. It contains the main functions for generating SDKs.

_QueryExecutor

Class for executing queries against the Domain Model Storage (DMS) endpoints in CDF.

Parameters:

| Name   | Type           | Description                                                                                                    | Default  |
|--------|----------------|----------------------------------------------------------------------------------------------------------------|----------|
| client | CogniteClient  | An instance of the CogniteClient.                                                                                | required |
| views  | Sequence[View] | A list of views to use for the queries. If not passed, the views will be fetched from the server when needed.   | None     |
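
A minimal usage sketch. The import path for `_QueryExecutor` and the view ID below are assumptions for illustration:

```python
from cognite.client import CogniteClient
from cognite.client import data_modeling as dm

from cognite.pygen import _QueryExecutor  # assumed import path

client = CogniteClient()  # assumes a default client configuration is already set up
executor = _QueryExecutor(client)

# Hypothetical view; replace with a view from your own data model.
view_id = dm.ViewId(space="my_space", external_id="Pump", version="v1")
pumps = executor.list(view_id, properties=["name"], limit=5)
```
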
Source code in cognite/pygen/_query/interface.py
class QueryExecutor:
    """Class for executing queries against the Domain Model Storage (DMS) endpoints in CDF.

    Args:
        client (CogniteClient): An instance of the CogniteClient.
        views (Sequence[dm.View], optional): A list of views to use for the queries. Defaults to None.
            If not passed, the views will be fetched from the server when needed.
    """

    def __init__(self, client: CogniteClient, views: Sequence[dm.View] | None = None):
        self._client = client
        # Used for aggregated logging of requests
        client.config.client_name = f"CognitePygen:{__version__}:QueryExecutor:{client.config.client_name}"
        self._view_by_id: dict[dm.ViewId, dm.View] = {view.as_id(): view for view in views or []}
        self._unpack_edges: Literal["skip", "include"] = "include"

    def search(
        self,
        view: dm.ViewId,
        properties: SelectedProperties | None = None,
        query: str | None = None,
        filter: filters.Filter | None = None,
        search_properties: str | SequenceNotStr[str] | None = None,
        sort: Sequence[dm.InstanceSort] | dm.InstanceSort | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        """Search for nodes/edges in a view.


        Args:
            view: The view in which the nodes/edges have properties.
            properties: The properties to include in the result. If None, all properties are included.
            query: The search query.
            filter: The filter to apply ahead of the search.
            search_properties: The properties to search. If None, all text properties are searched.
            sort: The sort order of the results.
            limit: The maximum number of results to return. Max 1000.

        Returns:
            list[dict[str, Any]]: The search results.

        """
        filter = self._equals_none_to_not_exists(filter)
        search_result = self._client.data_modeling.instances.search(
            view,
            query,
            properties=search_properties,  # type: ignore[arg-type]
            filter=filter,
            limit=limit or SEARCH_LIMIT,
            sort=sort,
        )

        flatten_props = self._as_property_list(properties, "list") if properties else None
        are_flat_properties = flatten_props == properties
        if properties is None or are_flat_properties:
            return self._prepare_list_result(search_result, set(flatten_props) if flatten_props else None)

        # Lookup nested properties:

        order_by_node_ids = {node.as_id(): no for no, node in enumerate(search_result)}
        # If we are sorting, then we need to ensure externalId and space are included in the properties.
        # This is because we need them for the final sorting.
        include_space = False
        include_external_id = False
        if sort is not None:
            include_space = "space" not in properties
            include_external_id = "externalId" not in properties
        if include_space:
            properties.append("space")
        if include_external_id:
            properties.append("externalId")

        result: list[dict[str, Any]] = []
        for space, space_nodes in itertools.groupby(
            sorted(order_by_node_ids.keys(), key=lambda x: x.space), key=lambda x: x.space
        ):
            is_space = filters.Equals(["node", "space"], space)
            for chunk in chunker(list(space_nodes), IN_FILTER_CHUNK_SIZE):
                batch_filter = filters.And(
                    filters.In(["node", "externalId"], [node.external_id for node in chunk]), is_space
                )
                batch_result = self.list(view, properties, batch_filter, sort, limit or SEARCH_LIMIT)
                result.extend(batch_result)

        if sort is not None:
            result.sort(key=lambda x: order_by_node_ids[dm.NodeId(x["space"], x["externalId"])])
        if include_space or include_external_id:
            for item in result:
                if include_space:
                    del item["space"]
                if include_external_id:
                    del item["externalId"]

        return result

    @overload
    def aggregate(
        self,
        view: dm.ViewId,
        aggregates: Aggregation | Sequence[Aggregation],
        group_by: None = None,
        filter: filters.Filter | None = None,
        query: str | None = None,
        search_properties: str | SequenceNotStr[str] | None = None,
        limit: int | None = None,
    ) -> dict[str, Any]: ...

    @overload
    def aggregate(
        self,
        view: dm.ViewId,
        aggregates: Aggregation | Sequence[Aggregation],
        group_by: str | SequenceNotStr[str],
        filter: filters.Filter | None = None,
        query: str | None = None,
        search_properties: str | SequenceNotStr[str] | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]: ...

    def aggregate(
        self,
        view: dm.ViewId,
        aggregates: Aggregation | Sequence[Aggregation],
        group_by: str | SequenceNotStr[str] | None = None,
        filter: filters.Filter | None = None,
        query: str | None = None,
        search_properties: str | SequenceNotStr[str] | None = None,
        limit: int | None = None,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """Aggregate nodes/edges in a view.

        Args:
            view: The view in which the nodes/edges have properties.
            aggregates: The aggregations to perform.
            group_by: The properties to group by.
            filter: The filter to apply ahead of the aggregation.
            query: The search query. This is useful when you want to show the number of results
                of a specific search query. It is useful for combining with the search method.
            search_properties: The properties to search. If None, all text properties are searched.
            limit: The maximum number of results to return. Max 1000.

        Returns:
            dict[str, Any] | list[dict[str, Any]]: The aggregation results.

        """
        filter = self._equals_none_to_not_exists(filter)
        return self._execute_aggregation(view, aggregates, search_properties, query, filter, group_by, limit)

    @classmethod
    def _equals_none_to_not_exists(cls, filter: filters.Filter | None) -> filters.Filter | None:
        """Converts all Equals([property], None) filters to Not(Exists([property])) filters.

        The motivation is that the DMS API does not support Equals([property], None) filters, and
        it is more intuitive to use Equals([property], None) filters in the query builder.
        """
        if isinstance(filter, filters.Equals) and filter._value is None:
            return filters.Not(filters.Exists(filter._property))
        elif isinstance(filter, filters.And):
            return filters.And(*[res for f in filter._filters if (res := cls._equals_none_to_not_exists(f))])
        elif isinstance(filter, filters.Or):
            return filters.Or(*[res for f in filter._filters if (res := cls._equals_none_to_not_exists(f))])
        elif isinstance(filter, filters.Not) and filter._filters:
            if res := cls._equals_none_to_not_exists(filter._filters[0]):
                return filters.Not(res)
        return filter

    def _get_view(self, view_id: dm.ViewId) -> dm.View:
        if view_id not in self._view_by_id:
            view = self._client.data_modeling.views.retrieve(view_id, all_versions=False)
            if not view:
                raise CogniteAPIError(f"View not found: {view_id!r}", code=200)
            self._view_by_id[view_id] = view[0]
        return self._view_by_id[view_id]

    @staticmethod
    def _as_property_list(properties: SelectedProperties, operation: str) -> list[str]:
        output = []
        is_nested_supported = operation == "list"
        for prop in properties:
            if isinstance(prop, str):
                output.append(prop)
            elif isinstance(prop, dict) and is_nested_supported:
                if len(prop) != 1:
                    raise ValueError(f"Unexpected nested property: {prop}")
                key = next(iter(prop.keys()))
                output.append(key)
            elif isinstance(prop, dict):
                raise ValueError(f"Nested properties are not supported for operation {operation}")
            else:
                raise ValueError(f"Unexpected property type: {type(prop)}")
        return output

    def _execute_list(
        self,
        view_id: dm.ViewId,
        properties: SelectedProperties,
        filter: filters.Filter | None = None,
        sort: Sequence[dm.InstanceSort] | dm.InstanceSort | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        view = self._get_view(view_id)
        root_properties = self._as_property_list(properties, "list")
        builder = QueryBuilder()
        factory = QueryStepFactory(builder.create_name, view=view, user_selected_properties=properties)

        if not factory.connection_properties:
            result = self._client.data_modeling.instances.list(
                instance_type="node",
                sources=[view_id],
                filter=filter,
                limit=limit,
                sort=sort,
            )
            return self._prepare_list_result(result, set(root_properties))

        reverse_views = {
            prop.through.source: self._get_view(prop.through.source)
            for prop in factory.reverse_properties.values()
            if isinstance(prop.through.source, dm.ViewId)
        }
        builder.append(factory.root(filter, limit=limit))
        for connection_id, connection in factory.connection_properties.items():
            builder.extend(factory.from_connection(connection_id, connection, reverse_views))
        _ = builder.execute_query(self._client, remove_not_connected=False)
        return QueryUnpacker(
            builder, edges=self._unpack_edges, as_data_record=False, edge_type_key="type", node_type_key="type"
        ).unpack()

    @classmethod
    def _prepare_list_result(
        cls, result: dm.NodeList[dm.Node], selected_properties: set[str] | None
    ) -> list[dict[str, Any]]:
        output: list[dict[str, Any]] = []
        for node in result:
            item = QueryUnpacker.flatten_dump(node, selected_properties)
            if item:
                # As long as you have selected properties, you will not get None.
                output.append(item)  # type: ignore[arg-type]
        return output

    def _execute_aggregation(
        self,
        view_id: dm.ViewId,
        aggregates: Aggregation | Sequence[Aggregation],
        search_properties: str | SequenceNotStr[str] | None = None,
        query: str | None = None,
        filter: filters.Filter | None = None,
        group_by: str | SequenceNotStr[str] | None = None,
        limit: int | None = None,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        aggregates_list = aggregates if isinstance(aggregates, Sequence) else [aggregates]
        metric_aggregates = [agg for agg in aggregates_list if isinstance(agg, dm.aggregations.MetricAggregation)]
        histogram_aggregates = [agg for agg in aggregates_list if isinstance(agg, dm.aggregations.Histogram)]
        if metric_aggregates and histogram_aggregates:
            raise ValueError("Cannot mix metric and histogram aggregations")

        if metric_aggregates and group_by is not None:
            group_by_result = self._client.data_modeling.instances.aggregate(  # type: ignore[call-overload]
                view=view_id,
                group_by=group_by,
                aggregates=metric_aggregates,
                query=query,
                properties=search_properties,
                filter=filter,
                limit=limit or AGGREGATION_LIMIT,
            )
            return self._grouped_metric_aggregation_to_dict(group_by_result)
        elif metric_aggregates:
            metric_results = self._client.data_modeling.instances.aggregate(
                view_id,
                aggregates=metric_aggregates,
                query=query,
                properties=search_properties,
                filter=filter,
                limit=limit or AGGREGATION_LIMIT,
            )
            return self._metric_aggregation_to_dict(metric_results)

        elif histogram_aggregates:
            histogram_results = self._client.data_modeling.instances.histogram(
                view_id,
                histograms=histogram_aggregates,
                query=query,
                properties=search_properties,  # type: ignore[arg-type]
                filter=filter,
                limit=limit or AGGREGATION_LIMIT,
            )
            return self._histogram_aggregation_to_dict(histogram_results)
        else:
            raise ValueError("No aggregation found")

    @staticmethod
    def _metric_aggregation_to_dict(aggregation: list[dm.aggregations.AggregatedNumberedValue]) -> dict[str, Any]:
        values_by_aggregations: dict[str, dict[str, Any]] = defaultdict(dict)
        for item in aggregation:
            values_by_aggregations[item._aggregate][item.property] = item.value
        return dict(values_by_aggregations)

    @classmethod
    def _grouped_metric_aggregation_to_dict(cls, aggregations: InstanceAggregationResultList) -> list[dict[str, Any]]:
        output: list[dict[str, Any]] = []
        for group in aggregations:
            group_dict = {
                "group": group.group,
                **cls._metric_aggregation_to_dict(group.aggregates),
            }
            output.append(group_dict)
        return output

    @staticmethod
    def _histogram_aggregation_to_dict(aggregation: list[dm.aggregations.HistogramValue]) -> dict[str, Any]:
        output: dict[str, dict[str, Any]] = defaultdict(dict)
        for item in aggregation:
            output[item._aggregate][item.property] = {
                "interval": item.interval,
                "buckets": [bucket.dump() for bucket in item.buckets],
            }
        return dict(output)

    def list(
        self,
        view: dm.ViewId,
        properties: SelectedProperties,
        filter: filters.Filter | None = None,
        sort: Sequence[dm.InstanceSort] | dm.InstanceSort | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        """List nodes/edges in a view.

        Args:
            view: The view in which the nodes/edges have properties.
            properties: The properties to include in the result.
            filter: The filter to apply ahead of the list operation.
            sort: The sort order of the results.
            limit: The maximum number of results to return. Pagination is handled automatically.
        """
        filter = self._equals_none_to_not_exists(filter)
        return self._execute_list(view, properties, filter, sort, limit)

_equals_none_to_not_exists(filter) classmethod

Converts all Equals([property], None) filters to Not(Exists([property])) filters.

The motivation is that the DMS API does not support Equals([property], None) filters, and it is more intuitive to use Equals([property], None) filters in the query builder.

Source code in cognite/pygen/_query/interface.py
@classmethod
def _equals_none_to_not_exists(cls, filter: filters.Filter | None) -> filters.Filter | None:
    """Converts all Equals([property], None) filters to Not(Exists([property])) filters.

    The motivation is that the DMS API does not support Equals([property], None) filters, and
    it is more intuitive to use Equals([property], None) filters in the query builder.
    """
    if isinstance(filter, filters.Equals) and filter._value is None:
        return filters.Not(filters.Exists(filter._property))
    elif isinstance(filter, filters.And):
        return filters.And(*[res for f in filter._filters if (res := cls._equals_none_to_not_exists(f))])
    elif isinstance(filter, filters.Or):
        return filters.Or(*[res for f in filter._filters if (res := cls._equals_none_to_not_exists(f))])
    elif isinstance(filter, filters.Not) and filter._filters:
        if res := cls._equals_none_to_not_exists(filter._filters[0]):
            return filters.Not(res)
    return filter
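
A small sketch of the rewrite this helper performs (the property reference is hypothetical):

```python
from cognite.client.data_classes import filters

prop = ["my_space", "Pump/v1", "manufacturer"]  # hypothetical property reference

# The DMS API rejects Equals-with-None, so the query-builder form `before`
# is rewritten into the equivalent filter `after` before being sent:
before = filters.Equals(prop, None)
after = filters.Not(filters.Exists(prop))
# And/Or/Not filters are rewritten recursively in the same way.
```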

aggregate(view, aggregates, group_by=None, filter=None, query=None, search_properties=None, limit=None)

aggregate(view: dm.ViewId, aggregates: Aggregation | Sequence[Aggregation], group_by: None = None, filter: filters.Filter | None = None, query: str | None = None, search_properties: str | SequenceNotStr[str] | None = None, limit: int | None = None) -> dict[str, Any]
aggregate(view: dm.ViewId, aggregates: Aggregation | Sequence[Aggregation], group_by: str | SequenceNotStr[str], filter: filters.Filter | None = None, query: str | None = None, search_properties: str | SequenceNotStr[str] | None = None, limit: int | None = None) -> list[dict[str, Any]]

Aggregate nodes/edges in a view.

Parameters:

| Name              | Type                                  | Description                                                                                                                 | Default  |
|-------------------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|----------|
| view              | ViewId                                | The view in which the nodes/edges have properties.                                                                            | required |
| aggregates        | Aggregation \| Sequence[Aggregation]  | The aggregations to perform.                                                                                                  | required |
| group_by          | str \| SequenceNotStr[str] \| None    | The properties to group by.                                                                                                   | None     |
| filter            | Filter \| None                        | The filter to apply ahead of the aggregation.                                                                                 | None     |
| query             | str \| None                           | The search query. Useful when you want to show the number of results of a specific search query, for example in combination with the search method. | None |
| search_properties | str \| SequenceNotStr[str] \| None    | The properties to search. If None, all text properties are searched.                                                          | None     |
| limit             | int \| None                           | The maximum number of results to return. Max 1000.                                                                            | None     |

Returns:

dict[str, Any] | list[dict[str, Any]]: The aggregation results.

Source code in cognite/pygen/_query/interface.py
def aggregate(
    self,
    view: dm.ViewId,
    aggregates: Aggregation | Sequence[Aggregation],
    group_by: str | SequenceNotStr[str] | None = None,
    filter: filters.Filter | None = None,
    query: str | None = None,
    search_properties: str | SequenceNotStr[str] | None = None,
    limit: int | None = None,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Aggregate nodes/edges in a view.

    Args:
        view: The view in which the nodes/edges have properties.
        aggregates: The aggregations to perform.
        group_by: The properties to group by.
        filter: The filter to apply ahead of the aggregation.
        query: The search query. This is useful when you want to show the number of results
            of a specific search query. It is useful for combining with the search method.
        search_properties: The properties to search. If None, all text properties are searched.
        limit: The maximum number of results to return. Max 1000.

    Returns:
        dict[str, Any] | list[dict[str, Any]]: The aggregation results.

    """
    filter = self._equals_none_to_not_exists(filter)
    return self._execute_aggregation(view, aggregates, search_properties, query, filter, group_by, limit)
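
A usage sketch, assuming an `executor` instance as constructed earlier (view and property names are hypothetical):

```python
from cognite.client import data_modeling as dm
from cognite.client.data_modeling import aggregations as aggs

view_id = dm.ViewId("my_space", "Pump", "v1")

# Without group_by, the result is a single dict, e.g. {"count": {"externalId": 42}}.
total = executor.aggregate(view_id, aggs.Count("externalId"))

# With group_by, the result is a list of dicts, one per group.
per_manufacturer = executor.aggregate(
    view_id, aggs.Count("externalId"), group_by="manufacturer"
)
```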

list(view, properties, filter=None, sort=None, limit=None)

List nodes/edges in a view.

Parameters:

| Name       | Type                                            | Description                                                                    | Default  |
|------------|-------------------------------------------------|----------------------------------------------------------------------------------|----------|
| view       | ViewId                                          | The view in which the nodes/edges have properties.                                | required |
| properties | SelectedProperties                              | The properties to include in the result.                                          | required |
| filter     | Filter \| None                                  | The filter to apply ahead of the list operation.                                  | None     |
| sort       | Sequence[InstanceSort] \| InstanceSort \| None  | The sort order of the results.                                                    | None     |
| limit      | int \| None                                     | The maximum number of results to return. Pagination is handled automatically.     | None     |
Source code in cognite/pygen/_query/interface.py
def list(
    self,
    view: dm.ViewId,
    properties: SelectedProperties,
    filter: filters.Filter | None = None,
    sort: Sequence[dm.InstanceSort] | dm.InstanceSort | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """List nodes/edges in a view.

    Args:
        view: The view in which the nodes/edges have properties.
        properties: The properties to include in the result.
        filter: The filter to apply ahead of the list operation.
        sort: The sort order of the results.
        limit: The maximum number of results to return. Pagination is handled automatically.
    """
    filter = self._equals_none_to_not_exists(filter)
    return self._execute_list(view, properties, filter, sort, limit)
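
A usage sketch (view and property names are hypothetical). A nested entry in `properties` selects properties through a connection:

```python
from cognite.client import data_modeling as dm

view_id = dm.ViewId("my_space", "Pump", "v1")

# Flat properties only:
pumps = executor.list(view_id, properties=["name", "serialNumber"], limit=100)

# A nested selection traverses the (hypothetical) manufacturer connection:
pumps_with_maker = executor.list(
    view_id,
    properties=["name", {"manufacturer": ["name"]}],
    limit=100,
)
```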

search(view, properties=None, query=None, filter=None, search_properties=None, sort=None, limit=None)

Search for nodes/edges in a view.

Parameters:

| Name              | Type                                            | Description                                                                     | Default  |
|-------------------|-------------------------------------------------|-----------------------------------------------------------------------------------|----------|
| view              | ViewId                                          | The view in which the nodes/edges have properties.                                 | required |
| properties        | SelectedProperties \| None                      | The properties to include in the result. If None, all properties are included.     | None     |
| query             | str \| None                                     | The search query.                                                                  | None     |
| filter            | Filter \| None                                  | The filter to apply ahead of the search.                                           | None     |
| search_properties | str \| SequenceNotStr[str] \| None              | The properties to search. If None, all text properties are searched.               | None     |
| sort              | Sequence[InstanceSort] \| InstanceSort \| None  | The sort order of the results.                                                     | None     |
| limit             | int \| None                                     | The maximum number of results to return. Max 1000.                                 | None     |

Returns:

list[dict[str, Any]]: The search results.

Source code in cognite/pygen/_query/interface.py
def search(
    self,
    view: dm.ViewId,
    properties: SelectedProperties | None = None,
    query: str | None = None,
    filter: filters.Filter | None = None,
    search_properties: str | SequenceNotStr[str] | None = None,
    sort: Sequence[dm.InstanceSort] | dm.InstanceSort | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Search for nodes/edges in a view.


    Args:
        view: The view in which the nodes/edges have properties.
        properties: The properties to include in the result. If None, all properties are included.
        query: The search query.
        filter: The filter to apply ahead of the search.
        search_properties: The properties to search. If None, all text properties are searched.
        sort: The sort order of the results.
        limit: The maximum number of results to return. Max 1000.

    Returns:
        list[dict[str, Any]]: The search results.

    """
    filter = self._equals_none_to_not_exists(filter)
    search_result = self._client.data_modeling.instances.search(
        view,
        query,
        properties=search_properties,  # type: ignore[arg-type]
        filter=filter,
        limit=limit or SEARCH_LIMIT,
        sort=sort,
    )

    flatten_props = self._as_property_list(properties, "list") if properties else None
    are_flat_properties = flatten_props == properties
    if properties is None or are_flat_properties:
        return self._prepare_list_result(search_result, set(flatten_props) if flatten_props else None)

    # Lookup nested properties:

    order_by_node_ids = {node.as_id(): no for no, node in enumerate(search_result)}
    # If we are sorting, then we need to ensure externalId and space are included in the properties.
    # This is because we need them for the final sorting.
    include_space = False
    include_external_id = False
    if sort is not None:
        include_space = "space" not in properties
        include_external_id = "externalId" not in properties
    if include_space:
        properties.append("space")
    if include_external_id:
        properties.append("externalId")

    result: list[dict[str, Any]] = []
    for space, space_nodes in itertools.groupby(
        sorted(order_by_node_ids.keys(), key=lambda x: x.space), key=lambda x: x.space
    ):
        is_space = filters.Equals(["node", "space"], space)
        for chunk in chunker(list(space_nodes), IN_FILTER_CHUNK_SIZE):
            batch_filter = filters.And(
                filters.In(["node", "externalId"], [node.external_id for node in chunk]), is_space
            )
            batch_result = self.list(view, properties, batch_filter, sort, limit or SEARCH_LIMIT)
            result.extend(batch_result)

    if sort is not None:
        result.sort(key=lambda x: order_by_node_ids[dm.NodeId(x["space"], x["externalId"])])
    if include_space or include_external_id:
        for item in result:
            if include_space:
                del item["space"]
            if include_external_id:
                del item["externalId"]

    return result
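
A usage sketch (the view and query text are hypothetical):

```python
from cognite.client import data_modeling as dm

view_id = dm.ViewId("my_space", "Pump", "v1")

# Free-text search over the view's text properties, returned as plain dicts:
hits = executor.search(view_id, query="booster", limit=10)
for hit in hits:
    print(hit)
```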

build_wheel(model_id, client=None, *, top_level_package=None, client_name=None, default_instance_space=None, output_dir=Path('dist'), format_code=True, config=None)

Generates a wheel containing a Python SDK tailored to the given Data Model(s).

Parameters:

| Name                   | Type                              | Description                                                                                                                 | Default      |
|------------------------|-----------------------------------|-------------------------------------------------------------------------------------------------------------------------------|--------------|
| model_id               | DataModel \| Sequence[DataModel]  | The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s) directly to avoid fetching them from CDF. | required |
| client                 | Optional[CogniteClient]           | The cognite client used for fetching the data model. This is required if you pass in data model ID(s) in the model_id argument and not a data model. | None |
| top_level_package      | Optional[str]                     | The name of the top level package for the SDK. For example, if we have top_level_package=apm and client_name=APMClient, then importing the client will be done with from apm import APMClient. If nothing is passed, the package will be [external_id:snake] of the first data model given, while the client name will be [external_id:pascal_case]. | None |
| client_name            | Optional[str]                     | The name of the client class. For example, APMClient. See above for more details.                                              | None         |
| default_instance_space | str \| None                       | The default instance space to use for the generated SDK. If not provided, the space must be specified when creating, deleting, and retrieving nodes and edges. | None |
| output_dir             | Path                              | The location to output the generated SDK wheel.                                                                                | Path('dist') |
| format_code            | bool                              | Whether to format the generated code using black.                                                                              | True         |
| config                 | Optional[PygenConfig]             | The configuration used to control how to generate the SDK.                                                                     | None         |
Source code in cognite/pygen/_build.py
def build_wheel(
    model_id: DataModel | Sequence[DataModel],
    client: Optional[CogniteClient] = None,
    *,
    top_level_package: Optional[str] = None,
    client_name: Optional[str] = None,
    default_instance_space: str | None = None,
    output_dir: Path = Path("dist"),
    format_code: bool = True,
    config: Optional[PygenConfig] = None,
) -> None:
    """
    Generates a wheel containing a Python SDK tailored to the given Data Model(s).

    Args:
        model_id: The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s)
            directly to avoid fetching them from CDF.
        client: The cognite client used for fetching the data model. This is required if you pass in
            data models ID(s) in the `model_id` argument and not a data model.
        top_level_package: The name of the top level package for the SDK. For example,
            if we have top_level_package=`apm` and client_name=`APMClient`, then
            importing the client will be done with `from apm import APMClient`. If nothing is passed,
            the package will be [external_id:snake] of the first data model given, while
            the client name will be [external_id:pascal_case].
        client_name: The name of the client class. For example, `APMClient`. See above for more details.
        default_instance_space: The default instance space to use for the generated SDK. If not provided,
            the space must be specified when creating, deleting, and retrieving nodes and edges.
        output_dir: The location to output the generated SDK wheel. Defaults to "dist".
        format_code: Whether to format the generated code using black. Defaults to True.
        config: The configuration used to control how to generate the SDK.
    """
    try:
        from build import ProjectBuilder  # type: ignore[import]
    except ImportError:
        raise ImportError(
            "'build' is required to build wheel. Install pygen with `pip install pygen[cli]` or "
            "install build directly with `pip install build`."
        ) from None

    data_model = _get_data_model(model_id, client, print)
    folder_name = _create_folder_name(
        data_model.as_id() if isinstance(data_model, dm.DataModel) else data_model.as_ids()
    )
    build_dir = Path(tempfile.gettempdir()) / "pygen_build" / folder_name
    if build_dir.exists():
        try:
            shutil.rmtree(build_dir)
        except Exception as e:
            print(f"Failed to clean temporary build directory {build_dir}: {e}")

    external_id = _extract_external_id(data_model)
    if top_level_package is None:
        top_level_package = _default_top_level_package(external_id)
    if client_name is None:
        client_name = _default_client_name(external_id)
    generate_sdk(
        data_model,
        client,
        top_level_package=top_level_package,
        client_name=client_name,
        default_instance_space=default_instance_space,
        output_dir=build_dir / _top_level_to_path(top_level_package),
        overwrite=True,
        format_code=format_code,
        config=config,
    )

    generate_pyproject_toml(build_dir, top_level_package)

    output_dir.mkdir(exist_ok=True, parents=True)
    ProjectBuilder(build_dir).build(distribution="wheel", output_directory=str(output_dir))

    print(f"Generated SDK wheel at {output_dir}")

generate_sdk(model_id, client=None, top_level_package=None, client_name=None, default_instance_space=None, output_dir=None, logger=None, overwrite=False, format_code=False, config=None, return_sdk_files=False)

generate_sdk(model_id: DataModel | Sequence[DataModel], client: Optional[CogniteClient] = None, top_level_package: Optional[str] = None, client_name: Optional[str] = None, default_instance_space: str | None = None, output_dir: Optional[Path] = None, logger: Optional[Callable[[str], None]] = None, overwrite: bool = False, format_code: bool = False, config: Optional[PygenConfig] = None, return_sdk_files: Literal[False] = False) -> None
generate_sdk(model_id: DataModel | Sequence[DataModel], client: Optional[CogniteClient] = None, top_level_package: Optional[str] = None, client_name: Optional[str] = None, default_instance_space: str | None = None, output_dir: Optional[Path] = None, logger: Optional[Callable[[str], None]] = None, overwrite: bool = False, format_code: bool = False, config: Optional[PygenConfig] = None, return_sdk_files: Literal[True] = False) -> dict[Path, str]

Generates a Python SDK tailored to the given Data Model(s).

Parameters:

| Name                   | Type                              | Description                                                                                                                 | Default  |
|------------------------|-----------------------------------|-------------------------------------------------------------------------------------------------------------------------------|----------|
| model_id               | DataModel \| Sequence[DataModel]  | The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s) directly to avoid fetching them from CDF. | required |
| client                 | Optional[CogniteClient]           | The cognite client used for fetching the data model. This is required if you pass in data model ID(s) in the model_id argument and not a data model. | None |
| top_level_package      | Optional[str]                     | The name of the top level package for the SDK. For example, if we have top_level_package=apm and client_name=APMClient, then importing the client will be done with from apm import APMClient. If nothing is passed, the package will be [external_id:snake] of the first data model given, while the client name will be [external_id:pascal_case]. In the case where pygen is part of another package, the top level package should be the full package name, for example cognite.apm. | None |
| client_name            | Optional[str]                     | The name of the client class. For example, APMClient. See above for more details.                                              | None     |
| default_instance_space | str \| None                       | The default instance space to use for the generated SDK. If not provided, the space must be specified when creating, deleting, and retrieving nodes and edges. | None |
| output_dir             | Optional[Path]                    | The location to output the generated SDK. Defaults to Path.cwd() / Path(top_level_package.replace(".", "/")).                  | None     |
| logger                 | Optional[Callable[[str], None]]   | A logger function to log progress. Defaults to print.                                                                          | None     |
| overwrite              | bool                              | Whether to overwrite the output directory if it already exists.                                                                | False    |
| format_code            | bool                              | Whether to format the generated code using black.                                                                              | False    |
| config                 | Optional[PygenConfig]             | The configuration used to control how to generate the SDK.                                                                     | None     |
| return_sdk_files       | bool                              | Whether to return the generated SDK files as a dictionary. This is useful for granular control of how to write the SDK to disk. | False    |
Source code in cognite/pygen/_generator.py
def generate_sdk(
    model_id: DataModel | Sequence[DataModel],
    client: Optional[CogniteClient] = None,
    top_level_package: Optional[str] = None,
    client_name: Optional[str] = None,
    default_instance_space: str | None = None,
    output_dir: Optional[Path] = None,
    logger: Optional[Callable[[str], None]] = None,
    overwrite: bool = False,
    format_code: bool = False,
    config: Optional[PygenConfig] = None,
    return_sdk_files: bool = False,
) -> None | dict[Path, str]:
    """
    Generates a Python SDK tailored to the given Data Model(s).

    Args:
        model_id: The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s)
            directly to avoid fetching them from CDF.
        client: The cognite client used for fetching the data model. This is required if you pass in
            data models ID(s) in the `model_id` argument and not a data model.
        top_level_package: The name of the top level package for the SDK. For example,
            if we have top_level_package=`apm` and client_name=`APMClient`, then
            importing the client will be done with `from apm import APMClient`. If nothing is passed,
            the package will be [external_id:snake] of the first data model given, while
            the client name will be [external_id:pascal_case]. In the case where pygen is part of
            another package, the top level package should be the full package name, for example `cognite.apm`.
        client_name: The name of the client class. For example, `APMClient`. See above for more details.
        default_instance_space: The default instance space to use for the generated SDK. If not provided,
            the space must be specified when creating, deleting, and retrieving nodes and edges.
        output_dir: The location to output the generated SDK.
            Defaults: Path.cwd() / Path(top_level_package.replace(".", "/")).
        logger: A logger function to log progress. Defaults to print.
        overwrite: Whether to overwrite the output directory if it already exists. Defaults to False.
        format_code: Whether to format the generated code using black. Defaults to False.
        config: The configuration used to control how to generate the SDK.
        return_sdk_files: Whether to return the generated SDK files as a dictionary. Defaults to False.
            This is useful for granular control of how to write the SDK to disk.
    """
    return _generate_sdk(
        model_id,
        client,
        top_level_package,
        client_name,
        default_instance_space,
        output_dir,
        logger,
        overwrite,
        format_code,
        config,
        return_sdk_files,
    )
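
A usage sketch (model ID, package name, and client name are hypothetical):

```python
from pathlib import Path

from cognite.client import data_modeling as dm

from cognite.pygen import generate_sdk, load_cognite_client_from_toml

client = load_cognite_client_from_toml("config.toml")
generate_sdk(
    dm.DataModelId("my_space", "MyModel", "v1"),
    client,
    top_level_package="my_model_sdk",
    client_name="MyModelClient",
    output_dir=Path.cwd() / "my_model_sdk",
    overwrite=True,
)
```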

generate_sdk_notebook(model_id, client=None, top_level_package=None, client_name=None, default_instance_space=None, config=None, clean_pygen_temp_dir=True)

Generates a Python SDK tailored to the given Data Model(s) and imports it into the current Python session.

This function is a wrapper around generate_sdk. It is intended to be used in a Jupyter notebook. The differences are that:

  • The SDK is generated in a temporary directory and added to sys.path, so that it becomes available for import in the current Python session.
  • The signature is simplified.
  • An instantiated client of the generated SDK is returned.

Parameters:

| Name                   | Type                              | Description                                                                                                                 | Default  |
|------------------------|-----------------------------------|-------------------------------------------------------------------------------------------------------------------------------|----------|
| model_id               | DataModel \| Sequence[DataModel]  | The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s) directly to avoid fetching them from CDF. | required |
| client                 | Optional[CogniteClient]           | The cognite client used for fetching the data model. This is required if you pass in data model ID(s) in the model_id argument and not a data model. | None |
| top_level_package      | Optional[str]                     | The name of the top level package for the SDK. For example, if we have top_level_package=apm and client_name=APMClient, then importing the client will be done with from apm import APMClient. If nothing is passed, the package will be [external_id:snake] of the first data model given, while the client name will be [external_id:pascal_case]. | None |
| client_name            | Optional[str]                     | The name of the client class. For example, APMClient. See above for more details.                                              | None     |
| default_instance_space | str \| None                       | The default instance space to use for the generated SDK. If not provided, the space must be specified when creating, deleting, and retrieving nodes and edges. | None |
| config                 | Optional[PygenConfig]             | The configuration used to control how to generate the SDK.                                                                     | None     |
| clean_pygen_temp_dir   | bool                              | Whether to clean the temporary directory used to store the generated SDK.                                                      | True     |

Returns:

Any: The instantiated generated client class.

Source code in cognite/pygen/_generator.py
def generate_sdk_notebook(
    model_id: DataModel | Sequence[DataModel],
    client: Optional[CogniteClient] = None,
    top_level_package: Optional[str] = None,
    client_name: Optional[str] = None,
    default_instance_space: str | None = None,
    config: Optional[PygenConfig] = None,
    clean_pygen_temp_dir: bool = True,
) -> Any:
    """
    Generates a Python SDK tailored to the given Data Model(s) and imports it into the current Python session.

    This function is a wrapper around generate_sdk. It is intended to be used in a Jupyter notebook.
    The differences are that:

    * The SDK is generated in a temporary directory and added to sys.path, so that it
      becomes available for import in the current Python session.
    * The signature is simplified.
    * An instantiated client of the generated SDK is returned.


    Args:
        model_id: The ID(s) of the data model(s) used to create a tailored SDK. You can also pass in the data model(s)
            directly to avoid fetching them from CDF.
        client: The cognite client used for fetching the data model. This is required if you pass in
            data models ID(s) in the `model_id` argument and not a data model.
        top_level_package: The name of the top level package for the SDK. For example,
            if we have top_level_package=`apm` and client_name=`APMClient`, then
            importing the client will be done with `from apm import APMClient`. If nothing is passed,
            the package will be [external_id:snake] of the first data model given, while
            the client name will be [external_id:pascal_case].
        client_name: The name of the client class. For example, `APMClient`. See above for more details.
        default_instance_space: The default instance space to use for the generated SDK. If not provided,
            the space must be specified when creating, deleting, and retrieving nodes and edges.
        config: The configuration used to control how to generate the SDK.
        clean_pygen_temp_dir: Whether to clean the temporary directory used to store the generated SDK.
            Defaults to True.

    Returns:
        The instantiated generated client class.
    """
    data_model = _get_data_model(model_id, client, print)
    folder_name = _create_folder_name(
        data_model.as_id() if isinstance(data_model, dm.DataModel) else data_model.as_ids()
    )
    output_dir = Path(tempfile.gettempdir()) / "pygen" / folder_name
    if clean_pygen_temp_dir and output_dir.exists():
        try:
            shutil.rmtree(output_dir)
        except Exception as e:
            print(f"Failed to clean temporary directory {output_dir}: {e}")
        else:
            print(f"Cleaned temporary directory {output_dir}")

    external_id = _extract_external_id(data_model)
    if top_level_package is None:
        top_level_package = _default_top_level_package(external_id)
    if client_name is None:
        client_name = _default_client_name(external_id)
    _generate_sdk(
        data_model,
        client,
        top_level_package=top_level_package,
        client_name=client_name,
        default_instance_space=default_instance_space,
        output_dir=output_dir / _top_level_to_path(top_level_package),
        overwrite=True,
        format_code=False,
        config=config,
        context="notebook",
    )
    if str(output_dir) not in sys.path:
        sys.path.append(str(output_dir))
        print(f"Added {output_dir} to sys.path to enable import")
    else:
        print(f"{output_dir} already in sys.path")
    try:
        module = vars(importlib.import_module(top_level_package))
    except SchemaError as error:
        if is_pyodide() and "recursion_loop" in {e["type"] for e in error.errors() if "type" in e}:
            print("Large SDK detected. Reached recursion limit in Pyodide. Tying again skipping schema validation.")
            os.environ["PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS"] = "true"
            module = vars(importlib.import_module(top_level_package))
        else:
            raise error

    print(f"Imported {top_level_package}")
    print("You can now use the generated SDK in the current Python session.")
    if isinstance(data_model, dm.DataModel):
        view = data_model.views[0]
    elif isinstance(data_model, Sequence):
        view = data_model[0].views[0]
    else:
        view = None

    if view:
        print(
            "The data classes are available by importing, for example, "
            f"`from {top_level_package}.data_classes import {DataClass.to_base_name(view)}Write`"
        )
    return module[client_name](client)
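
A usage sketch for a notebook cell (the model ID is hypothetical):

```python
from cognite.client import data_modeling as dm

from cognite.pygen import generate_sdk_notebook, load_cognite_client_from_toml

client = load_cognite_client_from_toml("config.toml")
pygen_client = generate_sdk_notebook(
    dm.DataModelId("my_space", "MyModel", "v1"),
    client,
)
# pygen_client is an instance of the generated client class, ready to use.
```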

load_cognite_client_from_toml(toml_file='config.toml', section='cognite')

This is a small helper function to load a CogniteClient from a toml file.

The default name of the config file is "config.toml" and it should look like this:

```toml
[cognite]
project = "<cdf-project>"
tenant_id = "<tenant-id>"
cdf_cluster = "<cdf-cluster>"
client_id = "<client-id>"
client_secret = "<client-secret>"
```

Parameters:

| Name      | Type        | Description                                                                                 | Default       |
|-----------|-------------|-----------------------------------------------------------------------------------------------|---------------|
| toml_file | Path \| str | Path to the toml file.                                                                        | 'config.toml' |
| section   | str \| None | Name of the section in the toml file to use. If None, use the top level of the toml file.     | 'cognite'     |

Returns:

CogniteClient: A CogniteClient with configurations from the toml file.

Source code in cognite/pygen/utils/cdf.py
def load_cognite_client_from_toml(
    toml_file: Path | str = "config.toml", section: str | None = "cognite"
) -> CogniteClient:
    """
    This is a small helper function to load a CogniteClient from a toml file.

    The default name of the config file is "config.toml" and it should look like this:

    ```toml
    [cognite]
    project = "<cdf-project>"
    tenant_id = "<tenant-id>"
    cdf_cluster = "<cdf-cluster>"
    client_id = "<client-id>"
    client_secret = "<client-secret>"
    ```

    Args:
        toml_file: Path to toml file
        section: Name of the section in the toml file to use. If None, use the top level of the toml file.
                 Defaults to "cognite".

    Returns:
        A CogniteClient with configurations from the toml file.
    """
    import toml

    toml_content = toml.load(toml_file)
    if section is not None:
        toml_content = toml_content[section]

    login_flow = toml_content.pop("login_flow", None)
    if login_flow == "interactive":
        return CogniteClient.default_oauth_interactive(**toml_content)
    else:
        return CogniteClient.default_oauth_client_credentials(**toml_content)
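
A usage sketch, assuming a `config.toml` like the one above exists in the working directory:

```python
from cognite.pygen import load_cognite_client_from_toml  # assumed top-level export

client = load_cognite_client_from_toml("config.toml", section="cognite")
print(client.config.project)  # quick sanity check that the config was loaded
```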