from collections import defaultdict
from itertools import chain
from typing import Set, Union
import numpy as np
import pandas as pd
from pandasdmx import message, model
from pandasdmx.model import (
DEFAULT_LOCALE,
AllDimensions,
Codelist,
DataAttribute,
DataSet,
DataStructureDefinition,
Dimension,
DimensionComponent,
FacetValueType as FVT,
Item,
Observation,
SeriesKey,
TimeDimension,
)
from pandasdmx.util import DictLike
from pandasdmx.writer.base import BaseWriter
#: Default return type for :func:`write_dataset` and similar methods. Either
#: 'compat' or 'rows'. See the :ref:`HOWTO <howto-rtype>`.
DEFAULT_RTYPE = "rows"

# Map SDMX facet value types to pandas dtypes (incomplete). Facet value types
# that are absent from this mapping, or mapped to None, are given a fallback
# dtype by get_component_type().
# NOTE(review): the string-like entries are dtype *instances* while the others
# are dtype *classes*; confirm that the pandas versions supported by this
# package accept both forms without warnings before normalising them.
FVT_MAP = {
    FVT.string: pd.StringDtype(),
    FVT.bigInteger: pd.Int64Dtype,
    FVT.integer: pd.Int32Dtype,
    # SDMX "Long" is a 64-bit integer (xs:long); a 32-bit dtype cannot hold
    # its full range.
    FVT.long: pd.Int64Dtype,
    FVT.short: pd.Int16Dtype,
    FVT.decimal: None,
    FVT.float: float,
    FVT.boolean: pd.BooleanDtype,
    FVT.double: np.float64,
    FVT.uri: pd.StringDtype(),
    FVT.count: pd.Int64Dtype,
    FVT.incremental: pd.Int64Dtype,
    FVT.inclusiveValueRange: pd.CategoricalDtype,
}

# Writer object used below both as a decorator on the conversion functions
# and, through ``writer.recurse``, to convert nested objects.
writer = BaseWriter("pandas")
def to_pandas(obj, *args, **kwargs):
    """Convert an SDMX *obj* to :mod:`pandas` object(s).

    Positional and keyword arguments are forwarded unchanged to
    ``writer.recurse``.

    See :ref:`pandasdmx.writer.pandas <writer-pandas>`.
    """
    converted = writer.recurse(obj, *args, **kwargs)
    return converted
# Functions for Python containers
@writer
def _list(obj: list, *args, **kwargs):
    """Convert a :class:`list` of SDMX objects.

    The type of the first element selects the conversion:

    - :class:`.Observation`: the whole list is passed to
      :func:`write_dataset`.
    - :class:`.DataSet`, and the list has exactly one element: that data set
      is converted on its own.
    - :class:`.SeriesKey`: the whole list is passed to
      :func:`write_serieskeys`; no extra arguments are accepted.
    - anything else: each element is converted separately and a list of the
      results is returned.

    An empty list is returned as an empty list.
    """
    if not obj:
        # No first element to inspect; this is what the element-wise branch
        # below would produce for an empty list.
        return []

    first = obj[0]
    if isinstance(first, Observation):
        return write_dataset(obj, *args, **kwargs)
    elif isinstance(first, DataSet) and len(obj) == 1:
        return writer.recurse(first, *args, **kwargs)
    elif isinstance(first, SeriesKey):
        # write_serieskeys() takes no options
        assert len(args) == len(kwargs) == 0
        return write_serieskeys(obj)
    else:
        return [writer.recurse(item, *args, **kwargs) for item in obj]
@writer
def _dict(obj: dict, *args, **kwargs):
    """Convert mappings.

    Every value of *obj* is converted with ``writer.recurse``; the return
    value depends on the types of the converted values:

    - no values at all: an empty :class:`.DictLike`.
    - only :class:`pandas.Series` and/or :class:`pandas.DataFrame`: if there
      is more than one and all share the same index name, they are
      concatenated with :func:`pandas.concat`; otherwise a :class:`.DictLike`
      of the individual objects is returned.
    - only :class:`str`: a :class:`pandas.Series` built from the mapping.
    - only :class:`.DictLike`: a plain :class:`dict` of the converted values.

    Raises
    ------
    ValueError
        If the converted values have any other combination of types; the
        exception argument is the set of types found.
    """
    result = {k: writer.recurse(v, *args, **kwargs) for k, v in obj.items()}

    if not result:
        # Handle the empty case explicitly. The empty set of types is a
        # subset of every set, so this input previously fell through the
        # Series/DataFrame branch below and produced an empty DictLike; a
        # later "no results" branch could never be reached.
        return DictLike(result)

    result_type = {type(v) for v in result.values()}

    if result_type <= {pd.Series, pd.DataFrame}:
        index_names = {v.index.name for v in result.values()}
        if len(index_names) == 1 and len(result) > 1:
            # Can safely concatenate these to a pd.MultiIndex'd Series.
            return pd.concat(result)
        # The individual pd.Series are indexed by different dimensions (or
        # there is only one); do not concatenate.
        return DictLike(result)
    elif result_type == {str}:
        return pd.Series(result)
    elif result_type == {DictLike}:
        return result
    else:
        raise ValueError(result_type)
@writer
def _set(obj: set, *args, **kwargs):
    """Convert :class:`set`.

    Each member is converted with ``writer.recurse`` and the results are
    collected in a new :class:`set`.
    """
    converted = set()
    for member in obj:
        converted.add(writer.recurse(member, *args, **kwargs))
    return converted
# Functions for message classes
@writer
def write_datamessage(obj: message.DataMessage, *args, rtype=None, **kwargs):
    """Convert :class:`.DataMessage`.

    Parameters
    ----------
    rtype : 'compat' or 'rows', optional
        Data type to return; default :data:`.DEFAULT_RTYPE`. See the
        :ref:`HOWTO <howto-rtype>`.
    kwargs :
        Passed to :meth:`write_dataset` for each data set.

    Returns
    -------
    :class:`pandas.Series` or :class:`pandas.DataFrame`
        if `obj` has only one data set.
    list of (:class:`pandas.Series` or :class:`pandas.DataFrame`)
        if `obj` has more than one data set.
    """
    # The message's DSD assists datetime handling; an explicitly supplied
    # 'dsd' keyword argument takes precedence.
    message_dsd = obj.dataflow.structure
    kwargs.setdefault("dsd", message_dsd)

    # Record the return type, plus the extra information needed for 'compat'
    return_type = rtype or DEFAULT_RTYPE
    kwargs["_rtype"] = return_type
    if return_type == "compat":
        kwargs.update(
            _message_class=obj.__class__,
            _observation_dimension=obj.observation_dimension,
        )

    datasets = obj.data
    if len(datasets) != 1:
        return [writer.recurse(ds, *args, **kwargs) for ds in datasets]
    # Exactly one data set: return it bare rather than in a list
    return writer.recurse(datasets[0], *args, **kwargs)
@writer
def write_structuremessage(obj: message.StructureMessage, include=None, **kwargs):
    """Convert :class:`.StructureMessage`.

    Parameters
    ----------
    obj : .StructureMessage
    include : iterable of str or str, optional
        One or more of the attributes of the StructureMessage (
        'category_scheme', 'codelist', etc.) to transform.
    kwargs :
        Passed to :meth:`write` for each attribute.

    Returns
    -------
    .DictLike
        Keys are StructureMessage attributes; values are pandas objects.
        Attributes whose converted value is empty are omitted.
    """
    all_contents = {
        "category_scheme",
        "codelist",
        "concept_scheme",
        "constraint",
        "dataflow",
        "structure",
        "organisation_scheme",
    }

    # Normalise the *include* argument to a set of names
    if include is None:
        requested = set(all_contents)
    elif isinstance(include, str):
        requested = {include}
    else:
        requested = set(include)

    # Unknown names are dropped silently; process the rest in sorted order
    selected = sorted(requested & all_contents)

    result: DictLike[str, Union[pd.Series, pd.DataFrame]] = DictLike()
    for name in selected:
        converted = writer.recurse(getattr(obj, name), **kwargs)
        if len(converted) == 0:
            # Skip empty elements
            continue
        result[name] = converted

    return result
# Functions for model classes
@writer
def _c(obj: model.Component):
    """Convert :class:`.Component` to the ID of its concept identity."""
    concept = obj.concept_identity
    # AttributeError propagates from here if the concept identity is missing
    return str(concept.id)  # type: ignore
@writer
def _cc(obj: model.ContentConstraint, **kwargs):
    """Convert :class:`.ContentConstraint`.

    Only constraints with exactly one data content region are supported; the
    result is the conversion of that region.

    Raises
    ------
    NotImplementedError
        If the number of data content regions is not 1.
    """
    regions = obj.data_content_region
    if len(regions) == 1:
        return writer.recurse(regions[0], **kwargs)
    raise NotImplementedError
@writer
def _cr(obj: model.CubeRegion, **kwargs):
    """Convert :class:`.CubeRegion`.

    Returns
    -------
    .DictLike
        One :class:`pandas.Series` per member of the region, keyed by (and
        named after) the ID of the member's dimension, holding the values of
        that member's selection.
    """
    result: DictLike[str, pd.Series] = DictLike()
    for dimension, selection in obj.member.items():
        dim_id = dimension.id
        member_values = [mv.value for mv in selection.values]
        result[dim_id] = pd.Series(member_values, name=dim_id)
    return result
def get_component_type(component):
    """Return a pandas dtype specifier for the values of *component*.

    Parameters
    ----------
    component :
        Object with a ``local_representation`` attribute; may be None.

    Returns
    -------
    str or dtype
        - ``"category"`` if the local representation is enumerated by a
          :class:`.Codelist`;
        - otherwise the :data:`FVT_MAP` entry for the value type of the first
          non-enumerated facet, or :class:`pandas.StringDtype` if that value
          type has no entry or a falsy one;
        - ``"object"`` if the representation or its first facet cannot be
          read (component or representation missing, no facets).
    """
    try:
        lr = component.local_representation
        if isinstance(lr.enumerated, Codelist):
            return "category"
        # Get the facet value type
        fvt = lr.non_enumerated[0].value_type
    except (AttributeError, IndexError, KeyError, TypeError):
        # Missing component/representation (AttributeError), empty facet
        # list (IndexError) or facet list of None (TypeError)
        return "object"
    return FVT_MAP.get(fvt) or pd.StringDtype()
@writer
def write_dataset(
    obj: model.DataSet,
    attributes="",
    dtype=np.float64,
    constraint=None,
    datetime=False,
    dtypes_from_dsd=False,
    **kwargs,
):
    """Convert :class:`~.DataSet`.

    See the :ref:`walkthrough <datetime>` for examples of using the `datetime`
    argument.

    Parameters
    ----------
    obj : :class:`~.DataSet` or iterable of :class:`~.Observation`
    attributes : str
        Types of attributes to return with the data. A string containing
        zero or more of:

        - ``'o'``: attributes attached to each :class:`~.Observation` .
        - ``'s'``: attributes attached to any (0 or 1) :class:`~.SeriesKey`
          associated with each Observation.
        - ``'g'``: attributes attached to any (0 or more) :class:`~.GroupKey`
          associated with each Observation.
        - ``'d'``: attributes attached to the :class:`~.DataSet` containing the
          Observations.

    dtype : str or :class:`numpy.dtype` or None
        Datatype for values. If None, do not return the values of a series.
        In this case, `attributes` must not be an empty string so that some
        attribute is returned.
    constraint : .ContentConstraint, optional
        If given, only Observations included by the *constraint* are returned.
    datetime : bool or str or .Dimension or dict, optional
        If given, return a DataFrame with a :class:`~pandas.DatetimeIndex`
        or :class:`~pandas.PeriodIndex` as the index and all other dimensions
        as columns. Valid `datetime` values include:

        - :class:`bool`: if :obj:`True`, determine the time dimension
          automatically by detecting a :class:`~.TimeDimension`.
        - :class:`str`: ID of the time dimension.
        - :class:`~.Dimension`: the matching Dimension is the time dimension.
        - :class:`dict`: advanced behaviour. Keys may include:

          - **dim** (:class:`~.Dimension` or :class:`str`): the time dimension
            or its ID.
          - **axis** (`{0 or 'index', 1 or 'columns'}`): axis on which to place
            the time dimension (default: 0).
          - **freq** (:obj:`True` or :class:`str` or :class:`~.Dimension`):
            produce :class:`pandas.PeriodIndex`. If :class:`str`, the ID of a
            Dimension containing a frequency specification. If a Dimension, the
            specified dimension is used for the frequency specification.

            Any Dimension used for the frequency specification does not
            appear in the returned DataFrame.

    dtypes_from_dsd : bool, optional
        If True, the dtype of each column is derived with
        :func:`get_component_type` from the data structure definition passed
        as the `dsd` keyword argument, instead of using `dtype` for values and
        ``object`` for attributes.

    Returns
    -------
    :class:`pandas.DataFrame`
        - if `attributes` is not ``''``, a data frame with one row per
          Observation, ``value`` as the first column, and additional columns
          for each attribute;
        - if `datetime` is given, various layouts as described above; or
        - if `_rtype` (passed from :func:`write_datamessage`) is 'compat',
          various layouts as described in the :ref:`HOWTO <howto-rtype>`.
    :class:`pandas.Series` with :class:`pandas.MultiIndex`
        Otherwise. If no values were collected, an empty Series named
        ``value`` is returned.

    Raises
    ------
    TypeError
        If `attributes` is not a str, or if `dtypes_from_dsd` is True but no
        :class:`.DataStructureDefinition` was given as `dsd`.
    ValueError
        If `attributes` contains characters other than 'o', 's', 'g', 'd'.
    """
    # If called directly on a DataSet (rather than a parent DataMessage),
    # cannot determine the "dimension at observation level"
    rtype = kwargs.setdefault("_rtype", "rows")

    # Validate attributes argument
    attributes = attributes or ""
    try:
        attributes = attributes.lower()
    except AttributeError:
        raise TypeError("'attributes' argument must be str") from None

    if rtype == "compat" and kwargs["_observation_dimension"] is not AllDimensions:
        # Cannot return attributes in this case
        attributes = ""
    elif set(attributes) - {"o", "s", "g", "d"}:
        raise ValueError(f"attributes must be in 'osgd'; got {attributes}")

    # Loop-invariant switches
    include_values = bool(dtype)
    include_obs_attrs = bool(attributes) and attributes != "d"
    include_ds_attrs = isinstance(obj, DataSet) and "d" in attributes

    # Iterate on observations, collecting raw values and index tuples for
    # each column
    data, indices = defaultdict(list), defaultdict(list)
    last_observation = None
    for observation in getattr(obj, "obs", obj):
        last_observation = observation

        # Check that the Observation is within the constraint, if any
        ordered_key = observation.key.order()
        if constraint and ordered_key not in constraint:
            continue

        index_key = tuple(map(str, ordered_key.get_values()))

        # Add value and attributes
        if include_values:
            data["value"].append(observation.value)
            indices["value"].append(index_key)

        if include_obs_attrs:
            # attributes at levels obs, series and group
            for k, v in observation.attrib.items():
                data[k].append(v)
                indices[k].append(index_key)

        if include_ds_attrs:
            # attributes at dataset level, repeated for every observation
            for k, v in obj.attrib.items():
                data[k].append(v)
                indices[k].append(index_key)

    # Check for a DSD
    dsd = kwargs.get("dsd")
    if dtypes_from_dsd and not isinstance(dsd, DataStructureDefinition):
        raise TypeError(
            "If `dtypes_from_dsd` is True, `dsd` must be a "
            f"DataStructureDefinition object. Got {type(dsd)}."
        )

    # Names of the index levels are the same for every column; compute once.
    # `data` is only non-empty if at least one observation was iterated.
    index_names = None
    if data:
        index_names = list(last_observation.key.order().values.keys())

    # Build one pd.Series per column
    columns = {}
    for col_name, raw_values in data.items():
        if col_name == "value":
            if dtypes_from_dsd:
                dt = get_component_type(dsd.measures.get("OBS_VALUE"))
            else:
                dt = dtype
        else:  # column for an attribute
            if dtypes_from_dsd:
                dt = get_component_type(dsd.attributes.get(col_name))
            else:
                dt = "object"

        # For dtype category, we stringify the data
        if dt == "category":
            raw_values = [str(v) for v in raw_values]

        # Make pd index adding names
        idx = pd.MultiIndex.from_tuples(indices[col_name], names=index_names)

        columns[col_name] = pd.Series(raw_values, idx, dtype=dt, name=col_name)

    # Convert to pd.DataFrame if needed
    if attributes:
        result = pd.DataFrame.from_dict(columns)
    elif "value" in columns:
        result = columns["value"]
    else:
        # Nothing collected: return an empty Series rather than a bare list
        result = pd.Series([], dtype=dtype if dtype else "object", name="value")

    # Reshape for compatibility with v0.9
    result, datetime, kwargs = _dataset_compat(result, datetime, kwargs)

    # Handle the datetime argument, if any
    return _maybe_convert_datetime(result, datetime, obj=obj, **kwargs)
def _dataset_compat(df, datetime, kwargs):
    """Helper for :meth:`.write_dataset` 0.9 compatibility.

    Removes the private ``_rtype`` entry from *kwargs* and, for 'compat'
    mode, also ``_message_class`` and ``_observation_dimension``; then
    adjusts either *df* or *datetime* according to the dimension at
    observation level.

    Returns
    -------
    tuple
        The (possibly changed) `df`, `datetime` and `kwargs`.
    """
    if kwargs.pop("_rtype") != "compat":
        # Nothing to do outside compatibility mode
        return df, datetime, kwargs

    # Remove compatibility arguments from kwargs
    kwargs.pop("_message_class")
    obs_dim = kwargs.pop("_observation_dimension")

    if isinstance(obs_dim, list) and len(obs_dim) == 1:
        # Unwrap a length-1 list
        (obs_dim,) = obs_dim

    if obs_dim in (AllDimensions, None):
        return df, datetime, kwargs

    if isinstance(obs_dim, TimeDimension):
        # Leave *df* alone; steer _maybe_convert_datetime through *datetime*
        if isinstance(datetime, bool):
            # Not given, or True without naming a dimension: overwrite
            datetime = obs_dim
        elif isinstance(datetime, dict):
            # The 'dim' key, if any, must agree with obs_dim
            if datetime.setdefault("dim", obs_dim) != obs_dim:
                raise ValueError(
                    f"datetime={datetime} conflicts with rtype='compat' and"
                    f" {obs_dim} at observation level"
                )
        else:
            assert datetime == obs_dim, (datetime, obs_dim)
    elif isinstance(obs_dim, DimensionComponent):
        # Pivot all levels except the observation dimension
        keep = obs_dim.id
        df = df.unstack([name for name in df.index.names if name != keep])
    # Otherwise (e.g. some JSON messages have two dimensions at the
    # observation level) behaviour is unspecified; leave everything as is.

    return df, datetime, kwargs
def _maybe_convert_datetime(df, arg, obj, dsd=None):
    """Helper for :meth:`.write_dataset` to handle datetime indices.

    Parameters
    ----------
    df : pandas.DataFrame
    arg : bool or str or .DimensionComponent or dict
        From the `datetime` argument to :meth:`write_dataset`. Any falsy
        value disables the conversion and `df` is returned unchanged.
    obj :
        From the `obj` argument to :meth:`write_dataset`. Its
        ``structured_by`` attribute, if present and not None, is the first
        source of dimension/attribute definitions.
    dsd: ~.DataStructureDefinition, optional
        Fallback source of dimension/attribute definitions.

    Raises
    ------
    ValueError
        If `arg` has an unsupported type or unknown dict keys; if no time
        dimension can be determined; or if the frequency cannot be resolved
        to a single value.
    """
    if not arg:
        # False, None, empty dict: no datetime conversion
        return df

    # Check argument values
    param = dict(dim=None, axis=0, freq=False)
    if isinstance(arg, bool):
        pass  # True: the time dimension is detected below
    elif isinstance(arg, str):
        param["dim"] = arg
    elif isinstance(arg, dict):
        extra_keys = set(arg.keys()) - set(param.keys())
        if extra_keys:
            raise ValueError(extra_keys)
        param.update(arg)
    elif isinstance(arg, DimensionComponent):
        param["dim"] = arg.id
    else:
        raise ValueError(arg)

    def _components(kind):
        """Return the 'dimensions' or 'attributes' components to search."""
        # *obj* may be a plain iterable of observations, or a data set whose
        # structure is not set; in both cases fall back to *dsd*.
        structure = getattr(obj, "structured_by", None)
        if structure is not None:
            components = getattr(structure, kind).components
            if len(components):
                return components
        if dsd:
            return getattr(dsd, kind).components
        return []

    if not param["dim"]:
        # Determine time dimension
        dims = _components("dimensions")
        for dim in dims:
            if isinstance(dim, TimeDimension):
                param["dim"] = dim
                break
        if not param["dim"]:
            raise ValueError(f"no TimeDimension in {dims}")

    # Unstack all but the time dimension and convert
    other_dims = [d for d in df.index.names if d != param["dim"]]
    df = df.unstack(other_dims)
    df.index = pd.to_datetime(df.index)

    if param["freq"]:
        # Determine frequency string, Dimension, or Attribute
        try:
            # pandas version prior to 1.1.0
            prefix_mapping = pd.offsets.prefix_mapping
        except AttributeError:
            # pandas version >= 1.1.0
            # See also issue #35482 in the pandas-dev repo
            prefix_mapping = pd._libs.tslibs.offsets.prefix_mapping

        freq = param["freq"]
        if isinstance(freq, str) and freq not in prefix_mapping:
            # ID of a Dimension or Attribute
            for component in chain(
                _components("dimensions"), _components("attributes")
            ):
                if component.id == freq:
                    freq = component
                    break

            # No named dimension in the DSD; but perhaps on the df
            if isinstance(freq, str):
                if freq in df.columns.names:
                    freq = Dimension(id=freq)
                else:
                    raise ValueError(freq)

        if isinstance(freq, Dimension):
            # Retrieve Dimension values from pd.MultiIndex level
            level = freq.id
            i = df.columns.names.index(level)
            values = set(df.columns.levels[i])

            if len(values) > 1:
                values = sorted(values)
                raise ValueError(
                    "cannot convert to PeriodIndex with " f"non-unique freq={values}"
                )

            # Store the unique value
            freq = values.pop()

            # Remove the index level
            df.columns = df.columns.droplevel(i)
        elif isinstance(freq, DataAttribute):  # pragma: no cover
            raise NotImplementedError

        df.index = df.index.to_period(freq=freq)

    if param["axis"] in {1, "columns"}:
        # Change axis
        df = df.transpose()

    return df
@writer
def _dd(obj: model.DimensionDescriptor):
    """Convert :class:`.DimensionDescriptor` by converting its components."""
    components = obj.components
    return writer.recurse(components)
@writer
def write_itemscheme(obj: model.ItemScheme, locale=DEFAULT_LOCALE):
    """Convert :class:`.ItemScheme`.

    Parameters
    ----------
    locale : str, optional
        Locale for names to return.

    Returns
    -------
    pandas.Series
        Item names indexed by item ID, when no item has a parent ID. The
        Series is named after the localized name of the scheme.
    pandas.DataFrame
        With columns ``name`` and ``parent``, when at least one item has a
        parent ID, or when the scheme yields no items.
    """
    rows = {}
    visited: Set[Item] = set()

    def collect(node):
        """Record *node* and then, depth-first, all of its children."""
        if node in visited:
            # Already recorded via another path
            return
        visited.add(node)

        # Localized name
        record = {"name": node.name.localized_default(locale)}
        try:
            # Parent ID
            record["parent"] = node.parent.id
        except AttributeError:
            record["parent"] = ""
        rows[node.id] = record

        for descendant in node.child:
            collect(descendant)

    for top in obj:
        collect(top)

    # Convert to DataFrame, labelling the index with the scheme's ID
    frame = pd.DataFrame.from_dict(rows, orient="index", dtype=object)
    frame = frame.rename_axis(obj.id, axis="index")

    if len(frame) == 0:
        return frame
    if frame["parent"].str.len().any():
        # Some item has a parent; keep both columns
        return frame

    # 'parent' column is empty; convert to pd.Series and rename
    return frame["name"].rename(obj.name.localized_default(locale))
@writer
def _mv(obj: model.MemberValue):
    """Convert :class:`.MemberValue` to its bare value."""
    member_value = obj.value
    return member_value
@writer
def _na(obj: model.NameableArtefact):
    """Convert :class:`.NameableArtefact` to the string form of its name."""
    artefact_name = obj.name
    return str(artefact_name)
def write_serieskeys(obj):
    """Convert an iterable of series keys to a :class:`pandas.DataFrame`.

    Each key contributes one row; the columns are the keys of the mapping
    returned by ``order().values`` and the cells are the ``value`` of each
    entry.
    """
    rows = [
        {dim: kv.value for dim, kv in series_key.order().values.items()}
        for series_key in obj
    ]
    # TODO perhaps return as a pd.MultiIndex if that is more useful
    return pd.DataFrame(rows)