# Source code for dag_cbor.encoding

"""
    Encoding functions for DAG-CBOR codec.
"""

from __future__ import annotations # See https://peps.python.org/pep-0563/

from io import BufferedIOBase, BytesIO
import math
import struct
from typing import Any, Dict, List, Optional, overload, Union
import unicodedata

from typing_extensions import Literal, TypedDict
from typing_validation import validate

from multiformats import varint, multicodec, CID

from ..ipld import IPLDKind, IPLDObjPath
from .err import CBOREncodingError, DAGCBOREncodingError

# Names exported by ``from <this module> import *``.
__all__ = ("CBOREncodingError", "DAGCBOREncodingError")

# Multicodec entry for the 'dag-cbor' codec, looked up once at import time.
_dag_cbor_multicodec = multicodec.get("dag-cbor")
# Integer code of the 'dag-cbor' multicodec.
_dag_cbor_code: int = _dag_cbor_multicodec.code
# Varint encoding of the code: this is the prefix written by `encode` when `include_multicodec` is set.
_dag_cbor_code_bytes: bytes = varint.encode(_dag_cbor_code)
# Length of the prefix, added to the byte count that `encode` returns when writing to a stream.
_dag_cbor_code_nbytes: int = len(_dag_cbor_code_bytes)


def check_key_compliance(value: Dict[str, Any]) -> None:
    """
        Checks that the keys of the given mapping are acceptable as DAG-CBOR map keys.

        The argument is first validated against ``Dict[str, Any]``, then every key is
        checked to be a string.

        :param value: the dictionary whose keys should be checked
        :raises DAGCBOREncodingError: if the key check finds a key which is not a string
    """
    validate(value, Dict[str, Any])
    _check_key_compliance(value, None)

def canonical_order_dict(value: Dict[str, Any]) -> Dict[str, Any]:
    """
        Builds a new dictionary with the same items as the one given, inserted
        in the canonical key order of the DAG-CBOR specification.

        Keys are compared through their UTF-8 encoding: shorter bytestrings come first,
        and bytestrings of equal length are ordered lexicographically.

        :param value: the dictionary to be reordered (it is not modified)
        :raises DAGCBOREncodingError: if the key check finds a key which is not a string
    """
    validate(value, Dict[str, Any])
    _check_key_compliance(value)
    # The input has been checked, so the actual sorting is delegated to the private helper.
    ordered = _canonical_order_dict(value)
    return ordered

@overload
def encode(data: IPLDKind, stream: None = None, *,
           include_multicodec: bool = False,
           normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
           ) -> bytes:
    ... # pragma: no cover

@overload
def encode(data: IPLDKind, stream: BufferedIOBase, *,
           include_multicodec: bool = False,
           normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
           ) -> int:
    ... # pragma: no cover

def encode(data: IPLDKind, stream: Optional[BufferedIOBase] = None, *,
           include_multicodec: bool = False,
           normalize_strings: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None
           ) -> Union[bytes, int]:
    r"""
        Encodes the given data with the DAG-CBOR codec.

        When no ``stream`` is passed, the data is encoded into an internal buffer
        and the resulting :obj:`bytes` are returned:

        .. code-block:: python

            def encode(data: IPLDKind, stream: None = None) -> bytes:
                ...

        Example usage:

        >>> dag_cbor.encode({'a': 12, 'b': 'hello!'})
        b'\xa2aa\x0cabfhello!'

        When a ``stream`` is passed, the encoded data is written to it
        and the number of bytes written is returned instead:

        .. code-block:: python

            def encode(data: IPLDKind, stream: BufferedIOBase) -> int:
                ...

        Example usage with a stream:

        >>> from io import BytesIO
        >>> stream = BytesIO()
        >>> dag_cbor.encode({'a': 12, 'b': 'hello!'}, stream=stream)
        13
        >>> stream.getvalue()
        b'\xa2aa\x0cabfhello!'

        :param data: the DAG data to be encoded
        :param stream: an optional stream into which the encoded data should be written
        :param include_multicodec: if :obj:`True`, the encoded data is prefixed by the multicodec code for ``'dag-cbor'``
                                   (see `multicodec.wrap <https://multiformats.readthedocs.io/en/latest/api/multiformats.multicodec.html#wrap>`_).
        :param normalize_strings: the Unicode normalization form to apply to strings prior to encoding,
                                  or :obj:`None` (default) to encode strings as they are

        :raises CBOREncodingError: if an :obj:`int` outside of ``range(-2**64, 2**64)`` is encountered
        :raises DAGCBOREncodingError: if a value of type other than :obj:`None`, :obj:`bool`, :obj:`int`, :obj:`float`,
                                      :obj:`str`, :obj:`bytes`, :obj:`list`, :obj:`dict`, or :class:`~multiformats.cid.CID` is encountered
        :raises DAGCBOREncodingError: if attempting to encode the special :obj:`float` values ``NaN``, ``Infinity`` and ``-Infinity``
        :raises DAGCBOREncodingError: if a key of a dictionary is not a string
    """
    # Validate the arguments which are not checked by the encoding routine itself.
    validate(stream, Optional[BufferedIOBase])
    validate(include_multicodec, bool)
    options: _EncodeOptions = {}
    if normalize_strings is not None:
        validate(normalize_strings, Literal["NFC", "NFKC", "NFD", "NFKD"])
        options["normalize_strings"] = normalize_strings
    root = IPLDObjPath()
    if stream is not None:
        # Caller-supplied stream: report how many bytes were written to it.
        if include_multicodec:
            stream.write(_dag_cbor_code_bytes)
            prefix_nbytes = _dag_cbor_code_nbytes
        else:
            prefix_nbytes = 0
        return prefix_nbytes + _encode(stream, data, root, options)
    # No stream given: encode into a private buffer and return its contents.
    buffer = BytesIO()
    if include_multicodec:
        buffer.write(_dag_cbor_code_bytes)
    _encode(buffer, data, root, options)
    return buffer.getvalue()

class _EncodeOptions(TypedDict, total=False): r""" Options passed around to encoding sub-routines. """ normalize_strings: Literal["NFC", "NFKC", "NFD", "NFKD"] r""" Optional Unicode normalization to be performed on UTF-8 strings prior to byte encoding. """ def _encode(stream: BufferedIOBase, value: IPLDKind, path: IPLDObjPath, options: _EncodeOptions) -> int: # pylint: disable = too-many-return-statements, too-many-branches if isinstance(value, bool): # must go before int check # major type 0x7 (additional info 20 and 21) return _encode_bool(stream, value, path, options) if isinstance(value, int): # major types 0x0 and 0x1 return _encode_int(stream, value, path, options) if isinstance(value, bytes): # major type 0x2 return _encode_bytes(stream, value, path, options) if isinstance(value, str): # major type 0x3 return _encode_str(stream, value, path, options) if isinstance(value, list): # major type 0x4 return _encode_list(stream, value, path, options) if isinstance(value, dict): # major type 0x5 return _encode_dict(stream, value, path, options) if isinstance(value, CID): # major type 0x6 return _encode_cid(stream, value, path, options) if value is None: # major type 0x7 (additional info 22) return _encode_none(stream, value, path, options) if isinstance(value, float): # major type 0x7 (additional info 27) return _encode_float(stream, value, path, options) err = f"Error encoding value at {path}: value is not of IPLD kind (found type {type(value)})." 
raise DAGCBOREncodingError(err) def _encode_head(stream: BufferedIOBase, major_type: int, arg: int) -> int: if arg < 24: # argument value as additional info in leading byte head = struct.pack(">B", (major_type<<5)|arg) elif arg <= 255: # leading byte + 1 byte argument value (additional info = 24) head = struct.pack(">BB", (major_type<<5)|24, arg) elif arg <= 65535: # leading byte + 2 bytes argument value (additional info = 25) head = struct.pack(">BH", (major_type<<5)|25, arg) elif arg <= 4294967295: # leading byte + 4 bytes argument value (additional info = 26) head = struct.pack(">BL", (major_type<<5)|26, arg) else: # leading byte + 8 bytes argument value (additional info = 27) head = struct.pack(">BQ", (major_type<<5)|27, arg) stream.write(head) return len(head) def _encode_int(stream: BufferedIOBase, value: int, path: IPLDObjPath, options: _EncodeOptions) -> int: if value >= 18446744073709551616: # unsigned int must be < 2**64 err = f"Error encoding integer value at {path}: Unsigned integer out of range." raise CBOREncodingError(err) if value < -18446744073709551616: # negative int must be >= -2**64 err = f"Error encoding integer value at {path}: Negative integer out of range." 
raise CBOREncodingError(err) if value >= 0: # unsigned int return _encode_head(stream, 0x0, value) # negative int return _encode_head(stream, 0x1, -1-value) def _encode_bytes(stream: BufferedIOBase, value: bytes, path: IPLDObjPath, options: _EncodeOptions) -> int: num_head_bytes = _encode_head(stream, 0x2, len(value)) stream.write(value) return num_head_bytes+len(value) def _encode_str(stream: BufferedIOBase, value: str, path: IPLDObjPath, options: _EncodeOptions) -> int: if "normalize_strings" in options: value = unicodedata.normalize(options["normalize_strings"], value) utf8_value: bytes = value.encode("utf-8", errors="strict") num_head_bytes = _encode_head(stream, 0x3, len(utf8_value)) stream.write(utf8_value) return num_head_bytes+len(utf8_value) def _encode_list(stream: BufferedIOBase, value: List[Any], path: IPLDObjPath, options: _EncodeOptions) -> int: num_bytes_written = _encode_head(stream, 0x4, len(value)) for idx, item in enumerate(value): num_bytes_written += _encode(stream, item, path/idx, options) return num_bytes_written def _encode_dict(stream: BufferedIOBase, value: Dict[str, Any], path: IPLDObjPath, options: _EncodeOptions) -> int: _check_key_compliance(value, path) if "normalize_strings" in options: nf = options["normalize_strings"] value = {unicodedata.normalize(nf, k): v for k, v in value.items()} utf8key_val_pairs = [(k, k.encode("utf-8", errors="strict"), v) for k, v in value.items()] # 1. sort keys canonically: sorted_utf8key_val_pairs = sorted(utf8key_val_pairs, key=lambda i: (len(i[1]), i[1])) # 2. 
encode key-value pairs (keys already utf-8 encoded): num_bytes_written = _encode_head(stream, 0x5, len(value)) for k, utf8k, v in sorted_utf8key_val_pairs: num_bytes_written += _encode_head(stream, 0x3, len(utf8k)) stream.write(utf8k) num_bytes_written += len(utf8k) num_bytes_written += _encode(stream, v, path/k, options) return num_bytes_written def _encode_cid(stream: BufferedIOBase, value: CID, path: IPLDObjPath, options: _EncodeOptions) -> int: num_bytes_written = _encode_head(stream, 0x6, 42) num_bytes_written += _encode_bytes(stream, b"\0" + bytes(value), path, options) return num_bytes_written def _encode_bool(stream: BufferedIOBase, value: bool, path: IPLDObjPath, options: _EncodeOptions) -> int: return _encode_head(stream, 0x7, 21 if value else 20) def _encode_none(stream: BufferedIOBase, value: None, path: IPLDObjPath, options: _EncodeOptions) -> int: return _encode_head(stream, 0x7, 22) def _encode_float(stream: BufferedIOBase, value: float, path: IPLDObjPath, options: _EncodeOptions) -> int: if math.isnan(value): err = f"Error encoding float value at {path}: NaN is not allowed." raise DAGCBOREncodingError(err) if math.isinf(value): s = "" if value > 0 else "-" err = f"Error encoding float value at {path}: {s}Infinity is not allowed." raise DAGCBOREncodingError(err) # special head, with double encoding for 4B argument value head = struct.pack(">Bd", (0x7<<5)|27, value) stream.write(head) return len(head) def _check_key_compliance(value: Dict[str, Any], path: Optional[IPLDObjPath] = None) -> None: """ Check keys for DAG-CBOR compliance. """ for idx, k in enumerate(value.keys()): if not isinstance(k, str): err = "" if path is None else f"Error encoding value of map kind at {path}: " err += f"key for key-value pair at position {idx} is not a string." 
raise DAGCBOREncodingError(err) def _canonical_order_dict(value: Dict[str, Any]) -> Dict[str, Any]: utf8key_key_val_pairs = [(k.encode("utf-8", errors="strict"), k, v) for k, v in value.items()] sorted_utf8key_key_val_pairs = sorted(utf8key_key_val_pairs, key=lambda i: (len(i[0]), i[0])) return {k: v for _, k, v in sorted_utf8key_key_val_pairs}