115 lines
4.7 KiB
Python
115 lines
4.7 KiB
Python
"""stac-fastapi utility methods."""
|
|
|
|
from typing import Any, Dict, Optional, Set, Union
|
|
|
|
from stac_fastapi.types.stac import Item
|
|
|
|
|
|
def filter_fields(  # noqa: C901
    item: Union[Item, Dict[str, Any]],
    include: Optional[Set[str]] = None,
    exclude: Optional[Set[str]] = None,
) -> Item:
    """Preserve and remove fields as indicated by the fields extension include/exclude sets.

    Field names are dot-separated key paths (e.g. ``"properties.datetime"``).
    A path whose root key is not present on the item is ignored.

    Returns a shallow copy of the Item with the fields filtered. Values of the
    original item are referenced in the returned item rather than deep-copied,
    but the original item is never modified: any nested dict that an exclude
    path has to prune is copied first.

    Args:
        item: The item (or plain dict) to filter.
        include: Key paths to keep. When empty or None, every field is kept
            before the excludes are applied.
        exclude: Key paths to remove after the includes have been applied.

    Returns:
        ``item`` itself, unchanged, when both sets are empty. Otherwise a new
        Item holding only the requested fields. If ``include`` is given but
        matches nothing, an Item holding just ``id`` and ``collection`` is
        returned and ``exclude`` is not applied.

    Raises:
        KeyError: If ``include`` matches nothing and the item has no ``id`` or
            ``collection`` key.
    """
    if not include and not exclude:
        return item

    # Build a shallow copy of included fields on an item, or a sub-tree of an item
    def include_fields(
        source: Dict[str, Any], fields: Optional[Set[str]]
    ) -> Dict[str, Any]:
        if not fields:
            return source

        clean_item: Dict[str, Any] = {}
        for key_path in fields:
            key_root, has_sub_key, sub_path = key_path.partition(".")
            if key_root not in source:
                # The key, or root key of a multi-part key, is not present in
                # the item, so it is ignored.
                continue

            value = source[key_root]
            if not (isinstance(value, dict) and has_sub_key):
                # The value to include is not a dict, or it is a dict but the
                # key path is for the whole value, not a sub-key. Include the
                # entire value in the cleaned item.
                clean_item[key_root] = value
                continue

            # The root of this key path is a dict and the key path names a
            # sub-key: walk the dict and collect just the nested value.
            nested = include_fields(value, fields={sub_path})
            if isinstance(clean_item.get(key_root), dict):
                # A previously specified key and sub-keys may have been
                # included already, so deep merge into the existing entry.
                dict_deep_update(clean_item[key_root], nested)
            else:
                # The root key does not exist yet, so add it. The fields
                # extension only allows nested referencing on dicts, so this
                # won't overwrite anything.
                clean_item[key_root] = nested
        return clean_item

    # For an item built up for included fields, remove excluded fields. This
    # modifies `source` in place, but never the dicts nested inside it: those
    # may still be the caller's own objects, so they are copied before pruning.
    def exclude_fields(source: Dict[str, Any], fields: Optional[Set[str]]) -> None:
        for key_path in fields or ():
            key_root, has_sub_key, sub_path = key_path.partition(".")
            if key_root not in source:
                # The key to remove does not exist on the source; ignore it.
                continue

            value = source[key_root]
            if not (isinstance(value, dict) and has_sub_key):
                # The key's value is not a dict, or there is no sub-key to
                # remove. The entire key can be removed from the source.
                del source[key_root]
                continue

            # Copy-on-write: prune a private copy of the nested dict so the
            # original item's nested values are left untouched.
            pruned = dict(value)
            exclude_fields(pruned, fields={sub_path})
            if pruned:
                source[key_root] = pruned
            else:
                # Removing the leaf key emptied the dict (or it was already
                # empty), so drop the root key entirely.
                del source[key_root]

    # Coalesce incoming type to a dict
    item = dict(item)

    clean_item = include_fields(item, include)

    # If, after including all the specified fields, there are no included properties,
    # return just id and collection.
    if not clean_item:
        return Item({"id": item["id"], "collection": item["collection"]})

    exclude_fields(clean_item, exclude)

    return Item(**clean_item)
|
|
|
|
|
|
def dict_deep_update(merge_to: Dict[str, Any], merge_from: Dict[str, Any]) -> None:
    """Recursively merge one dict into another.

    merge_to is modified in-place; nothing is returned. Where both dicts hold
    a dict under the same key, the two are merged recursively. In every other
    case the value from merge_from replaces (or adds) the entry in merge_to.
    """
    for key, incoming in merge_from.items():
        existing = merge_to.get(key)
        if isinstance(existing, dict) and isinstance(incoming, dict):
            # Both sides are dicts: descend rather than overwrite.
            dict_deep_update(existing, incoming)
            continue
        # New key, or at least one side is not a dict: merge_from wins.
        merge_to[key] = incoming
|