# Source code for dag_cbor.decoding

"""
    Decoding functions for the DAG-CBOR codec.
"""

from __future__ import annotations # See https://peps.python.org/pep-0563/

from io import BufferedIOBase, BytesIO
import math
import struct
import sys
from typing import Any, Dict, Callable, List, Optional, Sequence, Tuple, Union, cast
import unicodedata
from typing_extensions import Literal, Protocol, TypedDict
from typing_validation import validate

from multiformats import multicodec, CID, varint

from ..ipld import IPLDKind
from ..encoding import _dag_cbor_code
from .err import CBORDecodingError, DAGCBORDecodingError
from . import _err
from ._stream import Stream

__all__ = ("CBORDecodingError", "DAGCBORDecodingError")

class DecodeCallback(Protocol):
    r"""
        Type of optional callbacks for the :func:`decode` function.

        A callback is any callable accepting two positional arguments: the
        decoded value and the number of bytes read while decoding it.
        Its return value is ignored.
    """

    def __call__(self, value: IPLDKind, num_bytes_read: int) -> None:
        r"""
            Invoked once for each decoded item.

            :param value: the item which has just been decoded
            :param num_bytes_read: the number of bytes read decoding the item
        """
        ...
class _DecodeOptions(TypedDict, total=False):
    r"""
        Options passed around to decoding sub-routines.

        Both keys are optional (``total=False``): a key is present only if the
        corresponding option was explicitly set by the caller of :func:`decode`.
    """

    callback: "DecodeCallback"
    r""" An optional callback to be called on each decoded item. """

    normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD"]
    r""" Optional Unicode normalization to be performed on decoded UTF-8 strings. """
def decode(stream_or_bytes: Union[BufferedIOBase, bytes], *,
           allow_concat: bool = False,
           callback: Optional["DecodeCallback"] = None,
           require_multicodec: bool = False,
           normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD", None] = None
           ) -> IPLDKind:
    r"""
        Decodes and returns a single data item from the given ``stream_or_bytes``, with the DAG-CBOR codec.

        A simple use for the optional ``callback`` argument is to count the number of bytes read from the stream:

        >>> import dag_cbor
        >>> from io import BytesIO
        >>> class BytesReadCounter:
        ...     _num_bytes_read = 0
        ...     def __call__(self, _, num_bytes_read):
        ...         self._num_bytes_read += num_bytes_read
        ...     def __int__(self):
        ...         return self._num_bytes_read
        ...
        >>> encoded_bytes = b'\xa2aa\x0cabfhello!\x82\x00\x01'
        >>> len(encoded_bytes)
        16
        >>> stream = BytesIO(encoded_bytes)
        >>> bytes_read_cnt = BytesReadCounter()
        >>> dag_cbor.decode(stream, allow_concat=True, callback=bytes_read_cnt)
        {'a': 12, 'b': 'hello!'}
        >>> int(bytes_read_cnt)
        13
        >>> bytes_remaining = stream.read()
        >>> bytes_remaining
        b'\x82\x00\x01'
        >>> len(bytes_remaining)
        3
        >>> dag_cbor.decode(bytes_remaining)
        [0, 1]

        :param stream_or_bytes: the bytes object or bytes stream to decode
        :param allow_concat: whether to allow partial stream decoding
                             (if this is :obj:`False`, a byte stream will always be consumed in its entirety)
        :param callback: optional callback to be invoked as ``callback(item, num_bytes_read)`` every time an item is decoded,
                         where ``num_bytes_read`` is the number of bytes read decoding the item
                         (excluding sub-items, in the case of lists or dictionaries).
        :param require_multicodec: if :obj:`True`, the data being decoded must be prefixed by the multicodec code for ``'dag-cbor'``
                                   (see `multicodec.unwrap <https://multiformats.readthedocs.io/en/latest/api/multiformats.multicodec.html#unwrap>`_).
        :param normalize_strings: optional Unicode normalization form (``"NFC"``, ``"NFKC"``, ``"NFD"`` or ``"NFKD"``)
                                  to be applied to strings after decoding; if :obj:`None`, strings are not normalized

        :raises CBORDecodingError: while reading the leading byte of a data item head, if no bytes are available
        :raises CBORDecodingError: while reading the argument bytes of a data item head, if the expected number of argument bytes is not available
        :raises CBORDecodingError: while decoding the data of a bytestring or string, if the expected number of data bytes is not available
        :raises CBORDecodingError: while decoding the items of a list or a map (keys and values), if the expected number of items is not available
        :raises CBORDecodingError: if an invalid utf-8 byte sequence is encountered while attempting to decode a string
        :raises DAGCBORDecodingError: if attempting to decode the special :obj:`float` values ``NaN``, ``Infinity`` and ``-Infinity``
        :raises DAGCBORDecodingError: if the additional info is greater than 27, or different from 27 for major type 7
        :raises DAGCBORDecodingError: if an integer value was not minimally encoded
        :raises DAGCBORDecodingError: if a key of a map is not a string
        :raises DAGCBORDecodingError: if a map has repeated keys
        :raises DAGCBORDecodingError: if map keys are not in canonical order
        :raises DAGCBORDecodingError: if a tag (major type 6) different than 42 (for CID data) is encountered
        :raises DAGCBORDecodingError: if non-bytestring data is found where CID data is expected (tag 42)
        :raises DAGCBORDecodingError: if a simple value (major type 7) different from 20 (False), 21 (True) or 22 (None) is encountered
        :raises DAGCBORDecodingError: if ``require_multicodec`` is set to :obj:`True` and the bytes are not prefixed by the ``'dag-cbor'`` multicodec code
        :raises DAGCBORDecodingError: if ``allow_concat`` is set to :obj:`False` and the decoding did not use all available bytes
    """
    validate(stream_or_bytes, Union[BufferedIOBase, bytes])
    validate(allow_concat, bool)
    validate(require_multicodec, bool)
    # Only options which were explicitly set are stored: sub-routines test for key presence.
    options: _DecodeOptions = {}
    if callback is not None:
        options["callback"] = callback
    if normalize_strings is not None:
        validate(normalize_strings, Literal["NFC", "NFKC", "NFD", "NFKD"])
        options["normalize_strings"] = normalize_strings
    # Raw bytes are wrapped into an in-memory stream, so that a single code path handles both input kinds.
    if isinstance(stream_or_bytes, bytes):
        _stream: BufferedIOBase = BytesIO(stream_or_bytes)
    else:
        _stream = stream_or_bytes
    if require_multicodec:
        code, _, _stream = multicodec.unwrap_raw(_stream)
        # The stream wrapper is created before the check, passing along the prefix bytes already consumed,
        # because the error message is built from the stream wrapper.
        stream = Stream(_stream, varint.encode(code))
        if code != _dag_cbor_code:
            raise DAGCBORDecodingError(_err._required_multicodec(stream))
    else:
        stream = Stream(_stream)
    data, _ = _decode_item(stream, options)
    if not allow_concat:
        # Without concatenation support, any trailing byte after the first item is an error.
        remaining_bytes = stream.read()
        if len(remaining_bytes) > 0:
            raise DAGCBORDecodingError(_err._multiple_top_level_items(stream))
    return data
def _decode_item(stream: Stream, options: _DecodeOptions) -> Tuple[IPLDKind, int]:
    r"""
        Decodes a single data item from the stream, dispatching on the major type found in its head.

        :param stream: the stream to read from
        :param options: decoding options; if the ``"callback"`` key is present, the callback is invoked on the result
        :returns: the pair ``(value, num_bytes_read)``, where the byte count is the size of the head plus
                  whatever the decoder for the major type reports (lists, dictionaries and simple values report 0)
        :raises DAGCBORDecodingError: if a float is ``NaN`` or infinite
    """
    major_type, arg, num_bytes_read = _decode_head(stream)
    ret: Optional[Tuple[IPLDKind, int]] = None
    assert 0x0 <= major_type <= 0x7, f"Major type must be one of 0x0-0x7, found 0x{major_type:x} instead."
    if isinstance(arg, float):
        # Major type 0x7 (float case):
        assert major_type == 0x7, f"Major type for float must be 0x7, found 0x{major_type:x} instead."
        if math.isnan(arg) or math.isinf(arg):
            raise DAGCBORDecodingError(_err._invalid_float(stream, arg))
        ret = (arg, num_bytes_read)
    elif major_type <= 0x1:
        # Major types 0x0 and 0x1: the value is fully determined by the head argument
        ret = (arg if major_type == 0x0 else -1-arg, num_bytes_read)
    else:
        # Major types 0x2-0x6 and 0x7 (bool/null case):
        value, num_bytes_further_read = _decoders[major_type](stream, arg, options)
        ret = (value, num_bytes_read+num_bytes_further_read)
    if "callback" in options:
        options["callback"](*ret)
    return ret


def _decode_head(stream: Stream) -> Tuple[int, Union[int, float], int]:
    r"""
        Reads the head of a data item: the leading byte, plus 0, 1, 2, 4 or 8 argument bytes.

        :param stream: the stream to read from
        :returns: the triple ``(major_type, arg, num_bytes_read)``; ``arg`` is a :obj:`float` only for
                  major type 7 with additional info 27, and is an unsigned :obj:`int` in all other cases
        :raises CBORDecodingError: if the leading byte or the argument bytes are not available
        :raises DAGCBORDecodingError: if the additional info is greater than 27, or is 24-26 for major type 7
        :raises DAGCBORDecodingError: if an integer argument fits into fewer bytes than were used to encode it
    """
    # pylint: disable = too-many-branches
    # read leading byte
    res = stream.read(1)
    if len(res) < 1:
        raise CBORDecodingError(_err._unexpected_eof(stream, "leading byte of data item head", 1, include_prev_snapshot=False))
    leading_byte = res[0]
    major_type = leading_byte >> 5           # top 3 bits
    additional_info = leading_byte & 0b11111  # bottom 5 bits
    # read argument value and return (major_type, arg, num_bytes_read)
    if additional_info < 24:
        # argument value = additional info
        return (major_type, additional_info, 1)
    if additional_info > 27 or (major_type == 0x7 and additional_info != 27):
        raise DAGCBORDecodingError(_err._invalid_additional_info(stream, additional_info, major_type))
    # additional info 24, 25, 26, 27 -> 1, 2, 4, 8 argument bytes
    argument_nbytes = 1<<(additional_info-24)
    res = stream.read(argument_nbytes)
    if len(res) < argument_nbytes:
        raise CBORDecodingError(_err._unexpected_eof(stream, f"{argument_nbytes} byte argument of data item head", argument_nbytes))
    if additional_info == 24:
        # 1 byte of unsigned int argument value to follow
        if res[0] < 24:
            # value would have fit into the additional info itself
            raise DAGCBORDecodingError(_err._excessive_int_size(stream, res[0], 1, 0))
        return (major_type, res[0], 2)
    if additional_info == 25:
        # 2 bytes of unsigned int argument value to follow
        arg = struct.unpack(">H", res)[0]
        if arg <= 255:
            raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 2, 1))
        return (major_type, arg, 3)
    if additional_info == 26:
        # 4 bytes of unsigned int argument value to follow
        arg = struct.unpack(">L", res)[0]
        if arg <= 65535:
            if arg <= 255:
                raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 4, 1))
            raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 4, 2))
        return (major_type, arg, 5)
    # necessarily additional_info == 27
    if major_type == 0x7:
        # 8 bytes of float argument value to follow
        return (major_type, struct.unpack(">d", res)[0], 9)
    # 8 bytes of unsigned int argument value to follow
    arg = struct.unpack(">Q", res)[0]
    if arg <= 4294967295:
        if arg <= 255:
            raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 8, 1))
        if arg <= 65535:
            raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 8, 2))
        raise DAGCBORDecodingError(_err._excessive_int_size(stream, arg, 8, 4))
    return (major_type, arg, 9)


def _decode_bytes(stream: Stream, length: int, options: _DecodeOptions) -> Tuple[bytes, int]:
    r"""
        Reads the ``length`` data bytes of a bytestring (major type 2).

        :returns: the pair ``(data, length)``
        :raises CBORDecodingError: if fewer than ``length`` bytes are available
    """
    res = stream.read(length)
    if len(res) < length:
        raise CBORDecodingError(_err._unexpected_eof(stream, f"{length} bytes of bytestring", length))
    return (res, length)


def _decode_str(stream: Stream, length: int, options: _DecodeOptions) -> Tuple[str, int]:
    r"""
        Reads the ``length`` data bytes of a string (major type 3) and decodes them as strict UTF-8.
        If the ``"normalize_strings"`` option is present, the decoded string is normalized accordingly.

        :returns: the pair ``(string, length)``, where ``length`` counts the encoded bytes
        :raises CBORDecodingError: if fewer than ``length`` bytes are available, or the bytes are not valid UTF-8
    """
    res = stream.read(length)
    if len(res) < length:
        raise CBORDecodingError(_err._unexpected_eof(stream, f"{length} bytes of string", length))
    try:
        s = res.decode(encoding="utf-8", errors="strict")
    except UnicodeDecodeError as e:
        raise CBORDecodingError(_err._unicode(stream, length, e.start, e.end, e.reason)) # pylint: disable = raise-missing-from
    if "normalize_strings" in options:
        s = unicodedata.normalize(options["normalize_strings"], s)
    return (s, length)


def _decode_list(stream: Stream, length: int, options: _DecodeOptions) -> Tuple[List[Any], int]:
    r"""
        Decodes the ``length`` items of a list (major type 4).

        :returns: the pair ``(items, 0)``; the byte count is 0 because the bytes of the items are not counted for the list
        :raises CBORDecodingError: if decoding an item raises :class:`CBORDecodingError` (re-raised with list context)
    """
    # snapshot taken before reading any item, used to build error messages
    list_head_snapshot = stream.curr_snapshot
    l: List[Any] = []
    for idx in range(length):
        try:
            item, _ = _decode_item(stream, options)
            l.append(item)
        except CBORDecodingError as e:
            raise CBORDecodingError(_err._list_item(list_head_snapshot, idx, length, e)) # pylint: disable = raise-missing-from
    return (l, 0)


def _decode_dict_key(stream: Stream, key_idx: int, dict_length: int, options: _DecodeOptions) -> Tuple[str, int, bytes]:
    r"""
        Decodes a dictionary key, which must be a string (major type 3).
        If the ``"callback"`` option is present, the callback is invoked on the decoded key.

        :param key_idx: index of the key in the dictionary (currently unused)
        :param dict_length: number of items in the dictionary (currently unused)
        :returns: the triple ``(key, num_bytes_read, key_bytes)``, where ``key_bytes`` are the encoded bytes of the key,
                  taken before any normalization
        :raises DAGCBORDecodingError: if the major type of the key is not 3
        :raises CBORDecodingError: if the key bytes are not available, or are not valid UTF-8
    """
    # pylint: disable = too-many-return-statements, too-many-branches
    major_type, arg, num_bytes_read = _decode_head(stream)
    if major_type != 0x3:
        raise DAGCBORDecodingError(_err._dict_key_type(stream, major_type))
    assert not isinstance(arg, float)
    str_length = arg
    str_bytes: bytes = stream.read(str_length)
    if len(str_bytes) < str_length:
        raise CBORDecodingError(_err._unexpected_eof(stream, f"{str_length} bytes of string", str_length))
    try:
        s = str_bytes.decode(encoding="utf-8", errors="strict")
    except UnicodeDecodeError as e:
        raise CBORDecodingError(_err._unicode(stream, str_length, e.start, e.end, e.reason)) # pylint: disable = raise-missing-from
    if "normalize_strings" in options:
        s = unicodedata.normalize(options["normalize_strings"], s)
    total_bytes_read = num_bytes_read+str_length
    if "callback" in options:
        options["callback"](s, total_bytes_read)
    return (s, total_bytes_read, str_bytes)


def _decode_dict(stream: Stream, length: int, options: _DecodeOptions) -> Tuple[Dict[str, Any], int]:
    r"""
        Decodes the ``length`` key-value pairs of a dictionary (major type 5), then checks that the
        keys appear sorted by encoded length first and by encoded bytes second.

        :returns: the pair ``(dictionary, 0)``; the byte count is 0 because the bytes of keys and values are not counted for the dictionary
        :raises CBORDecodingError: if decoding a key or value raises :class:`CBORDecodingError` (re-raised with dictionary context)
        :raises DAGCBORDecodingError: if a key is repeated, or keys are not in the expected order
    """
    # pylint: disable = too-many-locals
    dict_head_snapshot = stream.curr_snapshot
    d: Dict[str, Any] = {}
    key_bytes_list: List[bytes] = []
    for i in range(length):
        try:
            k, _, k_bytes = _decode_dict_key(stream, i, length, options)
        except CBORDecodingError as e:
            raise CBORDecodingError(_err._dict_item(dict_head_snapshot, "key", i, length, e)) # pylint: disable = raise-missing-from
        if k in d:
            raise DAGCBORDecodingError(_err._duplicate_dict_key(dict_head_snapshot, stream, k, i, length))
        try:
            v, _ = _decode_item(stream, options)
        except CBORDecodingError as e:
            raise CBORDecodingError(_err._dict_item(dict_head_snapshot, "value", i, length, e)) # pylint: disable = raise-missing-from
        d[k] = v
        key_bytes_list.append(k_bytes)
    # check that keys are sorted canonically
    assert len(key_bytes_list) == length
    sorted_key_bytes_list = sorted(key_bytes_list, key=lambda e: (len(e), e))
    for idx0, (kb0, kb1) in enumerate(zip(key_bytes_list, sorted_key_bytes_list)):
        if kb0 != kb1:
            # first position where the actual order departs from the sorted order
            idx1 = key_bytes_list.index(kb1)
            raise DAGCBORDecodingError(_err._dict_key_order(dict_head_snapshot, kb0, idx0, kb1, idx1, length))
    return (d, 0)


def _decode_cid(stream: Stream, arg: int, options: _DecodeOptions) -> Tuple[CID, int]:
    r"""
        Decodes CID data (major type 6, tag 42): a bytestring item whose first byte is 0x00, followed by the CID bytes.
        The callback, if any, is not invoked on the inner bytestring item.

        :param arg: the tag number, which must be 42
        :returns: the pair ``(cid, num_bytes_read)``, where the byte count is that of the inner bytestring item
        :raises DAGCBORDecodingError: if the tag is not 42
        :raises CBORDecodingError: if decoding the inner item raises :class:`CBORDecodingError` (re-raised with CID context)
        :raises DAGCBORDecodingError: if the inner item is not a bytestring, or does not start with byte 0x00
    """
    if arg != 42:
        raise DAGCBORDecodingError(_err._invalid_tag(stream, arg))
    cid_head_snapshots = stream.prev_snapshot, stream.curr_snapshot
    try:
        if "callback" in options:
            # work on a copy, so that the caller's options keep their callback
            options = cast(_DecodeOptions, {**options})
            del options["callback"]
        cid_bytes, num_bytes_read = _decode_item(stream, options)
    except CBORDecodingError as e:
        raise CBORDecodingError(_err._cid(cid_head_snapshots, e)) # pylint: disable = raise-missing-from
    if not isinstance(cid_bytes, bytes):
        raise DAGCBORDecodingError(_err._cid_bytes(cid_head_snapshots, stream, cid_bytes))
    # Slice comparison rather than cid_bytes[0], so that an empty bytestring is rejected
    # through this check instead of raising a bare IndexError.
    if cid_bytes[:1] != b"\x00":
        raise DAGCBORDecodingError(_err._cid_multibase(cid_head_snapshots, stream, cid_bytes))
    return (CID.decode(cid_bytes[1:]), num_bytes_read)


def _decode_bool_none(stream: Stream, arg: int, options: _DecodeOptions) -> Tuple[Optional[bool], int]:
    r"""
        Decodes the simple values 20 (:obj:`False`), 21 (:obj:`True`) and 22 (:obj:`None`) of major type 7.

        :returns: the pair ``(value, 0)``; no bytes are read beyond the head
        :raises DAGCBORDecodingError: if ``arg`` is not 20, 21 or 22
    """
    if arg == 20:
        return (False, 0)
    if arg == 21:
        return (True, 0)
    if arg == 22:
        return (None, 0)
    raise DAGCBORDecodingError(_err._simple_value(stream, arg))


def _decode_dummy(stream: Stream, arg: int, options: _DecodeOptions) -> Tuple[None, int]:
    r"""
        Placeholder for the major types 0 and 1 in the decoders table: those are handled directly by
        :func:`_decode_item`, so this function always raises.

        :raises AssertionError: always (raised explicitly, so that it is not stripped when running with ``-O``)
    """
    raise AssertionError(f"Major type {arg} does not have an associated decoder.")


# Decoders indexed by major type (0x0-0x7): entries 0 and 1 are placeholders.
_decoders: Tuple[Callable[[Stream, int, _DecodeOptions], Tuple[IPLDKind, int]], ...] = (
    _decode_dummy,
    _decode_dummy,
    _decode_bytes,
    _decode_str,
    _decode_list,
    _decode_dict,
    _decode_cid,
    _decode_bool_none
)