Skip to content

Model

model

Package containing models, the pythonic representation of feature classes.

They inherit from BaseFeature, which extends the Pydantic BaseModel with some utility, and are usually instantiated with the model_factory method. A feature collection is represented by the BaseCollection class, which inherits from Pydantic RootModel.

Example

Load the BP_Plan model for XPlanung v6.0 and instantiate it with some data:

plan = model_factory("BP_Plan", "6.0")
instance = plan.model_validate(
    {
        "name": "Testplan",
        "gemeinde": [
            {
                "ags": "1234"
            }
        ],
        "raeumlicherGeltungsbereich": {
            "srid": 25832,
            "wkt": <WKT-String>
        }
    }
)

model_factory(model_name, model_version, data_type='xplan')

Factory method for retrieving the corresponding pydantic model representation of a feature class.

Parameters:

Name Type Description Default
model_name str

name of the feature class

required
model_version str | None

version of the specification release

required
data_type Literal['xplan', 'plu', 'def']

Specification of either XPlanung, INSPIRE PLU, or general definitions used by both application schemas.

'xplan'

Raises:

Type Description
ValueError

raises error for invalid model name and/or incompatible data version.

Returns:

Name Type Description
BaseFeature BaseFeature

The concrete feature class inheriting from BaseFeature.

Source code in xplan_tools/model/__init__.py
def model_factory(
    model_name: str,
    model_version: str | None,
    data_type: Literal["xplan", "plu", "def"] = "xplan",
) -> "BaseFeature":
    """Factory method for retrieving the corresponding pydantic model representation of a feature class.

    Args:
        model_name: name of the feature class
        model_version: version of the specification release
        data_type: Specification of either XPlanung, INSPIRE PLU, or general definitions used by both application schemas.

    Raises:
        ValueError: raises error for invalid model name and/or incompatible data version.

    Returns:
        BaseFeature: The concrete feature class inheriting from BaseFeature.
    """
    match data_type:
        case "xplan":
            if model_version[0] == "5":
                model_version = "5.4"
            model = locate(
                f"xplan_tools.model.appschema.xplan{model_version.replace('.', '')}.{model_name.replace('_', '')}"
            )
        case "plu":
            model = locate(
                f"xplan_tools.model.appschema.inspire_plu{model_version.replace('.', '')}.{model_name}"
            )
            model.model_config["extra"] = "ignore"
        case "def":
            model = locate(f"xplan_tools.model.appschema.definitions.{model_name}")

    if not model:
        raise ValueError(
            f"Invalid model name '{model_name}' or version '{model_version}'"
        )
    return model

BaseFeature

Bases: BaseModel, GMLAdapter, CoretableAdapter, JsonFGAdapter

Base class for application schema classes.

It extends pydantic BaseModel with Feature-related helper methods as well as conversion capabilities from/to other formats via inheriting from respective adapter classes.

deserialization_hook(data) classmethod

Provides deserialization for different formats/representations before validation.

Source code in xplan_tools/model/base.py
@model_validator(mode="before")
@classmethod
def deserialization_hook(cls, data: Any) -> Any:
    """Provides deserialization for different formats/representations before validation."""
    if isinstance(data, _Element):
        return cls._from_etree(data)
    if isinstance(data, Feature):
        return cls._from_coretable(data)
    if isinstance(data, dict) and data.get("featureType", None):
        return cls._from_jsonfg(data)
    return data

get_data_type()

Return the data type.

Source code in xplan_tools/model/base.py
def get_data_type(self) -> Literal["xplan", "plu"]:
    """Return the data type."""
    return self.__module__[:-2].split(".")[-1].split("_")[-1]

get_geom_field() classmethod

Returns the classes geometry field name, if any.

Source code in xplan_tools/model/base.py
@classmethod
def get_geom_field(cls) -> Optional[str]:
    """Returns the classes geometry field name, if any."""
    if attr := {
        "position",
        "geltungsbereich",
        "raeumlicherGeltungsbereich",
        "extent",
        "geometry",
    }.intersection(set(cls.model_fields.keys())):
        return attr.pop()

get_geom_srid()

Returns the object's geometry's SRID, if any.

Source code in xplan_tools/model/base.py
def get_geom_srid(self) -> Optional[int]:
    """Returns the object's geometry's SRID, if any."""
    if geom_field := self.get_geom_field():
        if geom := getattr(self, geom_field, None):
            return geom.srid

get_geom_wkt()

Returns the object's eWKT geometry's WKT representation withouth SRID, if any.

Source code in xplan_tools/model/base.py
def get_geom_wkt(self) -> Optional[str]:
    """Returns the object's eWKT geometry's WKT representation withouth SRID, if any."""
    if geom_field := self.get_geom_field():
        if geom := getattr(self, geom_field, None):
            return geom.wkt

get_name() classmethod

Returns the canonical name of the FeatureClass.

Source code in xplan_tools/model/base.py
@classmethod
def get_name(cls) -> str:
    """Returns the canonical name of the FeatureClass."""
    return get_name(cls.__name__)

get_properties() classmethod

Returns a slimmed down model with just the properties, i.e. exluding id and geometry attributes as well as utility methods.

Source code in xplan_tools/model/base.py
@classmethod
def get_properties(cls) -> BaseModel:
    """Returns a slimmed down model with just the properties, i.e. exluding id and geometry attributes as well as utility methods."""
    properties = {
        k: (v.annotation, v)
        for k, v in cls.model_fields.items()
        if k not in ["id", cls.get_geom_field()]
    }
    return create_model(cls.get_name(), **properties)

get_property_info(name) classmethod

Property information.

Returns a dict containing the following property information which might be useful e.g. for de-/serialization:

  • stereotype: the property's stereotype, e.g. DataType or Association
  • typename: the concrete type(s) of the property; may be an array, especially for associations
  • list: whether the property has a multiplicity > 1
  • nullable: whether the property is optional
  • uom: unit of measure for measure types
  • enum_info: names, aliases, description and, if available, tokens for codes from enumeration types

Parameters:

Name Type Description Default
name str

The property's name.

required

Raises:

Type Description
AttributeError

The name was not found in the model fields.

Source code in xplan_tools/model/base.py
@classmethod
def get_property_info(cls, name: str) -> dict:
    """Property information.

    Returns a dict containing the following property information which might be useful e.g. for de-/serialization:

    - `stereotype`: the property's stereotype, e.g. DataType or Association
    - `typename`: the concrete type(s) of the property; may be an array, especially for associations
    - `list`: whether the property has a multiplicity > 1
    - `nullable`: whether the property is optional
    - `uom`: unit of measure for measure types
    - `enum_info`: names, aliases, description and, if available, tokens for codes from enumeration types

    Args:
        name: The property's name.

    Raises:
        AttributeError: The name was not found in the model fields.
    """
    try:
        field_info = cls.model_fields[name]
        extra_info = field_info.json_schema_extra or {}
    except KeyError:
        raise AttributeError(f"Unknown property: {name}")
    else:
        return {
            "stereotype": extra_info.get("stereotype", None),
            "typename": extra_info.get("typename", None),
            "list": get_origin(field_info.annotation) is list
            or list
            in [
                arg.__origin__
                for arg in get_args(field_info.annotation)
                if getattr(arg, "__origin__", None)
            ],
            "nullable": NoneType in get_args(field_info.annotation),
            "uom": extra_info.get("uom", None),
            "enum_info": extra_info.get("enumDescription", None),
        }

get_version() classmethod

Returns the application schema version of the object.

Source code in xplan_tools/model/base.py
@classmethod
def get_version(cls) -> str:
    """Returns the application schema version of the object."""
    return f"{cls.__module__[-2:-1]}.{cls.__module__[-1:]}"

model_dump_coretable()

Dumps the model data to a coretable Feature object to store in a database.

Source code in xplan_tools/model/base.py
def model_dump_coretable(self) -> Feature:
    """Dumps the model data to a coretable Feature object to store in a database."""
    return self._to_coretable()

model_dump_gml(data_type='xplan')

Dumps the model data to a GML structure held in an etree.Element.

Source code in xplan_tools/model/base.py
def model_dump_gml(self, data_type: Literal["xplan", "plu"] = "xplan") -> _Element:
    """Dumps the model data to a GML structure held in an etree.Element."""
    return self._to_etree(data_type)

model_dump_jsonfg(**kwargs)

Dumps the model data to a JSON-FG object.

Source code in xplan_tools/model/base.py
def model_dump_jsonfg(
    self,
    **kwargs,
) -> dict:
    """Dumps the model data to a JSON-FG object."""
    return self._to_jsonfg(**kwargs)

BaseCollection

Bases: RootModel

Container for features that provides validation of references.

The features are stored in a dictionary with their ID as key and the feature instance as value.

add_style_properties(to_text=False)

Add styling properties to presentational objects.

This method parses object (dientZurDarstellungVon) and property (art) references from presentational objects and derives styling information (stylesheetId, schriftinhalt) based on a set of defined rules.

Parameters:

Name Type Description Default
to_text bool

Whether to convert symbolic presentational objects to textual ones. Defaults to False.

False
Source code in xplan_tools/model/base.py
def add_style_properties(self, to_text: bool = False) -> None:
    """Add styling properties to presentational objects.

    This method parses object (dientZurDarstellungVon) and property (art) references from
    presentational objects and derives styling information (stylesheetId, schriftinhalt)
    based on a set of defined rules.

    Args:
        to_text: Whether to convert symbolic presentational objects to textual ones. Defaults to False.
    """

    uom_map = {"m2": "m²", "m3": "m³"}

    def parse_art(ref_obj: BaseFeature, art: str) -> dict:
        def parse_value(value: Any) -> dict:
            if prop_info["stereotype"] == "Measure":
                value = value.value
            if prop_info["typename"] == "Boolean":
                value = str(value).lower()

            if name.startswith("Z"):
                text = toRoman(value)
            elif prop_info["stereotype"] == "Enumeration":
                text = prop_info["enum_info"][value].get(
                    "token", prop_info["enum_info"][value].get("alias", value)
                )
            elif prop_info["stereotype"] == "Measure":
                text = (
                    f"{value:n} {uom_map.get(prop_info['uom'], prop_info['uom'])}"
                )
            else:
                text = value

            return {
                "value": value,
                "text": text,
            }

        remove_subindexes = re.sub(r"(/[:\w]*)(\[\d\])", r"\g<1>", art)
        remove_namespace = re.sub(
            r"xplan:", "", remove_subindexes
        )  # xplan:|(.P_|SO_)[a-zA-Z]*\/
        attr, index, datatype, sub_attr = re.match(
            r"^(?P<attr>\w*)\[?(?P<index>\d)?]?/?(?P<datatype>\w{2}_\w*)?/?(?P<sub_attr>\w*)?$",
            remove_namespace,
        ).groups()

        if datatype:
            model = model_factory(datatype, ref_obj.get_version())
        else:
            model = ref_obj

        name = attr
        value = getattr(ref_obj, attr)
        if isinstance(value, list):
            index = 0
            if index:
                index = max(int(index) - 1, 0)
            value = value[index]
        if sub_attr:
            name = sub_attr
            value = getattr(value, sub_attr)

        prop_info = model.get_property_info(name)

        data = {
            "name": name,
            "data": parse_value(value),
            "type": prop_info["typename"],
        }
        return data

    # TODO use for XPlanung v6.1 with addition attribute massstabFaktor
    # def set_scale(obj):
    #     bereich = self.root.get(str(obj.gehoertZuBereich))
    #     plan = self.root.get(str(bereich.gehoertZuPlan))
    #     default_scale = SCALES.get(plan.get_name(), 1000)
    #     actual_scale = (
    #         bereich.erstellungsMassstab or plan.erstellungsMassstab or default_scale
    #     )
    #     if obj.skalierung <= 3:
    #         obj.skalierung = float(obj.skalierung * actual_scale / 1000)

    logger.info("adding style properties to collection")

    for obj in filter(lambda x: hasattr(x, "stylesheetId"), self.get_features()):
        logger.debug(f"Feature {obj.id}: adding style properties")
        version = obj.get_version()
        if to_text and (old_type := obj.get_name()) == "XP_PPO":
            new_type = "XP_PTO"
            obj = model_factory(
                new_type,
                version,
                "xplan",
            ).model_validate(obj.model_dump())
            logger.info(f"Feature {obj.id}: converted {old_type} to {new_type}")
        # TODO use for XPlanung v6.1 with addition attribute massstabFaktor
        # if hasattr(obj, "skalierung"):
        #     set_scale(obj)
        if not obj.dientZurDarstellungVon:
            logger.info(
                f"Feature {obj.id}: dientZurDarstellungVon not set, skipping"
            )
            continue
        elif len(obj.dientZurDarstellungVon) > 1:
            logger.warning(
                f"Feature {obj.id}: references to multiple objects '{obj.dientZurDarstellungVon}' not supported, skipping"
            )
        elif not obj.art:
            logger.info(f"Feature {obj.id}: art not set, skipping")
            continue

        ref_obj = self.root.get(str(obj.dientZurDarstellungVon[0]))
        logger.debug(
            f"parsing properties {obj.art} for referenced feature {ref_obj.get_name()} with ID {ref_obj.id}"
        )
        selectors = {}
        for art in obj.art:
            try:
                parsed_art = parse_art(ref_obj, art)
                selectors[parsed_art.pop("name")] = parsed_art
            except Exception:
                logger.error(f"Feature {obj.id}: art '{art}' could not be parsed")
        valid_rules = []
        for rule_id, rule in RULES.items():
            versioned_rule = rule["versions"].get(version, {})
            if isinstance(
                versioned_rule, str
            ):  # use other versioned rule referenced by string
                versioned_rule = rule[versioned_rule]
            valid = versioned_rule["selector"].keys() == selectors.keys() and (
                all(
                    (
                        filter.get("value", None) == ["*"]
                        or selectors.get(attr, {})
                        .get("data", {})
                        .get("value", None)
                        in filter.get("value", False)
                    )
                    and (
                        selectors.get(attr, {}).get("type", None)
                        == filter.get("type", False)
                    )
                    for attr, filter in versioned_rule["selector"].items()
                )
                if versioned_rule.get("selector", None)
                else False
            )
            if valid:
                valid_rules.append(rule_id)
                texts = {
                    attr: data["data"]["text"] for attr, data in selectors.items()
                }
                obj.stylesheetId = rule_id
                if (text := versioned_rule.get("text", None)) and hasattr(
                    obj, "schriftinhalt"
                ):
                    obj.schriftinhalt = text.format(**texts)
        if not valid_rules:
            if all(
                data["type"] in ["CharacterString", "Integer", "Decimal", "Length"]
                for data in selectors.values()
            ):
                obj.stylesheetId = "81e52187-a33b-4340-9d6e-f25533e01aa3"
                if hasattr(obj, "schriftinhalt"):
                    obj.schriftinhalt = " ".join(
                        [str(data["data"]["text"]) for data in selectors.values()]
                    ).strip()
                    logger.debug(
                        f"Feature {obj.id}: schriftinhalt set to {obj.schriftinhalt}"
                    )
            else:
                logger.warning(f"No rule found for feature {obj.id}")
        if len(valid_rules) > 1:
            raise ValueError(f"More than one rules valid: {', '.join(valid_rules)}")
        else:
            logger.debug(
                f"Feature {obj.id}: stylesheetId set to {obj.stylesheetId}"
            )
        self.root[obj.id] = obj

    logger.info("finished adding style properties to collection")

check_references()

Checks if all objects referenced via UUID are part of the collection.

Source code in xplan_tools/model/base.py
@model_validator(mode="after")
def check_references(self) -> "BaseCollection":
    """Checks if all objects referenced via UUID are part of the collection."""
    logger.debug("checking feature references")
    keys = self.root.keys()
    for feature in self.root.values():
        for name, value in feature:
            if isinstance(value, UUID):
                if str(value) not in keys:
                    raise ValueError(
                        f"reference {name}: {value} in object {feature.id} not resolvable"
                    )
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, UUID) and str(item) not in keys:
                        raise ValueError(
                            f"reference {name}: {item} in object {feature.id} not resolvable"
                        )
    logger.debug("all feature references resolvable")
    return self

get_features()

Yields features stored in the collection.

Source code in xplan_tools/model/base.py
def get_features(self) -> Iterator["BaseFeature"]:
    """Yields features stored in the collection."""
    return (feature for feature in self.root.values())

get_single_plans(with_name=False)

Yields BaseCollection objects for every plan in the original collection.

Source code in xplan_tools/model/base.py
def get_single_plans(
    self, with_name: bool = False
) -> Iterator["BaseCollection"] | Iterator[Tuple[str, "BaseCollection"]]:
    """Yields BaseCollection objects for every plan in the original collection."""
    for plan in filter(lambda x: "Plan" in x.get_name(), self.root.values()):
        collection = {plan.id: plan}
        for attr in ["texte", "begruendungsTexte"]:
            if refs := getattr(plan, attr):
                collection.update({str(ref): self.root[str(ref)] for ref in refs})
        for bereich in filter(
            lambda x: str(getattr(x, "gehoertZuPlan", "")) == plan.id,
            self.root.values(),
        ):
            collection[bereich.id] = bereich
            for feature in filter(
                lambda x: str(getattr(x, "gehoertZuBereich", "")) == bereich.id,
                self.root.values(),
            ):
                collection[feature.id] = feature
            if raster := getattr(bereich, "rasterBasis", None):
                collection[str(raster)] = self.root[str(raster)]
        yield (
            plan.name,
            BaseCollection(root=collection)
            if with_name
            else BaseCollection(root=collection),
        )

list_to_dict(data) classmethod

Takes a list of BaseFeatures and returns a BaseCollection dict.

Source code in xplan_tools/model/base.py
@model_validator(mode="before")
@classmethod
def list_to_dict(cls, data: Any) -> Any:
    """Takes a list of BaseFeatures and returns a BaseCollection dict."""
    if isinstance(data, list):
        data_dict = {}
        for feature in data:
            if not isinstance(feature, BaseFeature):
                raise TypeError(
                    f"Object is not an instance of BaseFeature: {feature}"
                )
            data_dict[feature.id] = feature
        return data_dict
    return data