"""
Functions to generate random data.
"""
# pylint: disable = global-statement
from __future__ import annotations # See https://peps.python.org/pep-0563/
from contextlib import contextmanager
import math
from random import Random # pylint: disable = import-self
import sys
from types import MappingProxyType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple
from typing_validation import validate
from multiformats import multicodec, multibase, multihash, CID
from .ipld import IPLDKind
from .encoding import encode, _canonical_order_dict
_min_int = -18446744073709551616
_max_int = 18446744073709551615
_min_float = -sys.float_info.max
_max_float = sys.float_info.max
_min_codepoint = 0x00
_max_codepoint = 0x10FFFF
_default_options: Dict[str, Any] = {
"min_int": -100,
"max_int": 100,
"min_bytes": 0,
"max_bytes": 8,
"min_chars": 0,
"max_chars": 8,
"min_codepoint": 0x21,
"max_codepoint": 0x7e,
"min_len": 0,
"max_len": 8,
"max_nesting": 2,
"canonical": True,
"min_float": -100.0,
"max_float": 100.0,
"float_decimals": 3,
"include_cid": True,
}
_options = _default_options
_rand = Random(0)
[docs]
def reset_options() -> None:
"""
Resets random generation options to their default values.
"""
global _options
global _rand
_options = _default_options
_rand = Random(0)
[docs]
def default_options() -> Mapping[str, Any]:
"""
Readonly view of the default random generation options.
"""
return MappingProxyType(_default_options)
[docs]
def get_options() -> Mapping[str, Any]:
"""
Readonly view of the current random generation options.
"""
return MappingProxyType(_options)
[docs]
@contextmanager
def options(*,
seed: Optional[int] = None,
min_int: Optional[int] = None,
max_int: Optional[int] = None,
min_bytes: Optional[int] = None,
max_bytes: Optional[int] = None,
min_chars: Optional[int] = None,
max_chars: Optional[int] = None,
min_codepoint: Optional[int] = None,
max_codepoint: Optional[int] = None,
min_len: Optional[int] = None,
max_len: Optional[int] = None,
max_nesting: Optional[int] = None,
canonical: Optional[bool] = None,
min_float: Optional[float] = None,
max_float: Optional[float] = None,
float_decimals: Optional[int] = None,
include_cid: Optional[bool] = None,) -> Iterator[None]:
"""
Returns with-statement context manager for temporary option setting:
.. code-block:: python
with options(**options):
for value in rand_data(num_samples):
...
Options available:
.. code-block::
seed: int # set new random number generator, with this seed
min_int: int # smallest `int` value
max_int: int # largest `int` value
min_bytes: int # min length of `bytes` value
max_bytes: int # max length of `bytes` value
min_chars: int # min length of `str` value
max_chars: int # max length of `str` value
min_codepoint: int # min utf-8 codepoint in `str` value
max_codepoint: int # max utf-8 codepoint in `str` value
min_len: int # min length of `list` and `dict` values
max_len: int # max length of `list` and `dict` values
max_nesting: int # max nesting of collections
canonical: bool # whether `dict` values have canonically ordered keys
min_float: float # smallest `float` value
max_float: float # largest `float` value
float_decimals: int # number of decimals to keep in floats
include_cid: bool # whether to generate CID values
"""
# pylint: disable = too-many-locals, too-many-arguments
global _options
global _rand
_old_options = _options
_old_rand = _rand
try:
set_options(seed=seed,
min_int=min_int, max_int=max_int,
min_bytes=min_bytes, max_bytes=max_bytes,
min_chars=min_chars, max_chars=max_chars,
min_codepoint=min_codepoint, max_codepoint=max_codepoint,
min_len=min_len, max_len=max_len,
max_nesting=max_nesting, canonical=canonical,
min_float=min_float, max_float=max_float,
float_decimals=float_decimals, include_cid=include_cid)
yield
finally:
_options = _old_options
_rand = _old_rand
[docs]
def set_options(*,
seed: Optional[int] = None,
min_int: Optional[int] = None,
max_int: Optional[int] = None,
min_bytes: Optional[int] = None,
max_bytes: Optional[int] = None,
min_chars: Optional[int] = None,
max_chars: Optional[int] = None,
min_codepoint: Optional[int] = None,
max_codepoint: Optional[int] = None,
min_len: Optional[int] = None,
max_len: Optional[int] = None,
max_nesting: Optional[int] = None,
canonical: Optional[bool] = None,
min_float: Optional[float] = None,
max_float: Optional[float] = None,
float_decimals: Optional[int] = None,
include_cid: Optional[bool] = None,) -> None:
"""
Permanently sets random generation options. See :func:`options` for the available options.
"""
# pylint: disable = too-many-branches, too-many-locals, too-many-statements, too-many-arguments
for iarg in (seed, min_int, max_int, min_bytes, max_bytes, min_chars, max_chars,
min_codepoint, max_codepoint, min_len, max_len, max_nesting, float_decimals):
validate(iarg, Optional[int])
for barg in (canonical, include_cid):
validate(barg, Optional[bool])
for farg in (min_float, max_float):
validate(farg, Optional[float])
global _options
global _rand
# set newly passed options
_new_options: Dict[str, Any] = {}
if seed is not None:
_rand = Random(seed)
if min_int is not None:
if min_int < _min_int:
raise ValueError("Value for min_int is not a valid CBOR integer.")
_new_options["min_int"] = min_int
if max_int is not None:
if max_int > _max_int:
raise ValueError("Value for max_int is not a valid CBOR integer.")
_new_options["max_int"] = max_int
if min_bytes is not None:
if min_bytes < 0:
raise ValueError("Value for min_bytes is negative.")
_new_options["min_bytes"] = min_bytes
if max_bytes is not None:
if max_bytes < 0:
raise ValueError("Value for max_bytes is negative.")
_new_options["max_bytes"] = max_bytes
if min_chars is not None:
if min_chars < 0:
raise ValueError("Value for min_chars is negative.")
_new_options["min_chars"] = min_chars
if max_chars is not None:
if max_chars < 0:
raise ValueError("Value for max_chars is negative.")
_new_options["max_chars"] = max_chars
if min_codepoint is not None:
if min_codepoint < _min_codepoint or min_codepoint > _max_codepoint:
raise ValueError("Value for min_codepoint not a valid utf-8 codepoint.")
_new_options["min_codepoint"] = min_codepoint
if max_codepoint is not None:
if max_codepoint < _min_codepoint or max_codepoint > _max_codepoint:
raise ValueError("Value for max_codepoint not a valid utf-8 codepoint.")
_new_options["max_codepoint"] = max_codepoint
if min_len is not None:
if min_len < 0:
raise ValueError("Value for min_len is negative.")
_new_options["min_len"] = min_len
if max_len is not None:
if max_len < 0:
raise ValueError("Value for max_len is negative.")
_new_options["max_len"] = max_len
if max_nesting is not None:
if max_nesting < 0:
raise ValueError("Value for max_nesting is negative.")
_new_options["max_nesting"] = max_nesting
if canonical is not None:
_new_options["canonical"] = canonical
if min_float is not None:
if math.isnan(min_float) or math.isinf(min_float):
raise ValueError("Value for min_float is not a valid CBOR float.")
_new_options["min_float"] = min_float
if max_float is not None:
if math.isnan(max_float) or math.isinf(max_float):
raise ValueError("Value for max_float is not a valid CBOR float.")
_new_options["max_float"] = max_float
if float_decimals is not None:
if float_decimals < 0:
raise ValueError("Value for float_decimals is negative.")
_new_options["float_decimals"] = float_decimals
if include_cid is not None:
_new_options["include_cid"] = include_cid
# pass-through other options with former values
for k, v in _options.items():
if k not in _new_options:
_new_options[k] = v
# check compatibility conditions
if _new_options["min_bytes"] > _new_options["max_bytes"]:
raise ValueError("Value for min_bytes is larger than value for max_bytes.")
if _new_options["min_chars"] > _new_options["max_chars"]:
raise ValueError("Value for min_chars is larger than value for max_chars.")
if _new_options["min_codepoint"] > _new_options["max_codepoint"]:
raise ValueError("Value for min_codepoint is larger than value for max_codepoint.")
if _new_options["min_len"] > _new_options["max_len"]:
raise ValueError("Value for min_len is larger than value for max_len.")
# update options
_options = _new_options
[docs]
def rand_data(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[IPLDKind]:
r"""
Generates a stream of random data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param max_nesting: the maximum nesting level for containers; if :obj:`None`, value from :func:`get_options` is used
Maximum nesting level for containers:
- the integer value -1 means no containers will be generated
- integer values >= 0 mean that containers will be generated, with items generated by ``random_data(max_nesting=max_nesting-1)``
- no other values are valid
"""
validate(n, Optional[int])
validate(max_nesting, Optional[int])
return _rand_data(n, max_nesting=max_nesting)
def _rand_data(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[IPLDKind]:
if n is not None and n < 0:
raise ValueError()
if max_nesting is None:
max_nesting = _options["max_nesting"]
elif max_nesting < -1:
raise ValueError("Value for max_nesting must be >= -1 (with -1 indicating no containers).")
include_cid = _options["include_cid"]
data_generators: List[Iterator[Any]] = [
_rand_list(max_nesting=max_nesting) if max_nesting >= 0 else iter([]),
_rand_dict(max_nesting=max_nesting) if max_nesting >= 0 else iter([]),
_rand_int(),
_rand_bytes(),
_rand_str(),
_rand_bool_none(),
_rand_float(),
_rand_cid()
]
num_data_generators = len(data_generators) if include_cid else len(data_generators)-1
i = 0
while n is None or i < n:
if max_nesting == -1:
# exclude containers
datatype = _rand.randrange(0x2, num_data_generators)
else:
# include containers
datatype = _rand.randrange(0x0, num_data_generators)
try:
yield next(data_generators[datatype])
except StopIteration as e:
raise RuntimeError("All random streams are infinite, this should not happen.") from e
i += 1
[docs]
def rand_list(n: Optional[int] = None, *, length: Optional[int] = None, max_nesting: Optional[int] = None) -> Iterator[List[Any]]:
"""
Generates a stream of random :obj:`list` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param length: size for the lists; if :obj:`None`, a random value is sampled according to the :func:`options`
:param max_nesting: the maximum nesting level for containers; if :obj:`None`, value from :func:`get_options` is used
"""
validate(n, Optional[int])
validate(length, Optional[int])
validate(max_nesting, Optional[int])
return _rand_list(n, length=length, max_nesting=max_nesting)
def _rand_list(n: Optional[int] = None, *, length: Optional[int] = None, max_nesting: Optional[int] = None) -> Iterator[List[Any]]:
if n is not None and n < 0:
raise ValueError()
if length is not None and length < 0:
raise ValueError()
if max_nesting is None:
max_nesting = _options["max_nesting"]
elif max_nesting < 0:
raise ValueError("Value for max_nesting is negative.")
min_len = _options["min_len"]
max_len = _options["max_len"]
i = 0
while n is None or i < n:
_length = length if length is not None else _rand.randint(min_len, max_len)
yield list(_rand_data(_length, max_nesting=max_nesting-1))
i += 1
[docs]
def rand_dict(n: Optional[int] = None, *, length: Optional[int] = None, max_nesting: Optional[int] = None) -> Iterator[Dict[str, Any]]:
"""
Generates a stream of random :obj:`dict` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param length: size for the dicts; if :obj:`None`, a random value is sampled according to the :func:`options`
:param max_nesting: the maximum nesting level for containers; if :obj:`None`, value from :func:`get_options` is used
"""
validate(n, Optional[int])
validate(length, Optional[int])
validate(max_nesting, Optional[int])
return _rand_dict(n, length=length, max_nesting=max_nesting)
def _rand_dict(n: Optional[int] = None, *, length: Optional[int] = None, max_nesting: Optional[int] = None) -> Iterator[Dict[str, Any]]:
# pylint: disable = too-many-locals, too-many-branches
if n is not None and n < 0:
raise ValueError()
if length is not None and length < 0:
raise ValueError()
if max_nesting is None:
max_nesting = _options["max_nesting"]
elif max_nesting < 0:
raise ValueError("Value for max_nesting is negative.")
min_len = _options["min_len"]
max_len = _options["max_len"]
canonical = _options["canonical"]
min_chars = _options["min_chars"]
max_chars = _options["max_chars"]
max_codepoint = _options["max_codepoint"]
num_codepoints = max_codepoint-_options["min_codepoint"]
i = 0
while n is None or i < n:
_length = length if length is not None else _rand.randint(min_len, max_len)
# check whether we have enough distinct strings to generate a random dictionary of desired length
if num_codepoints == 1:
num_strings = max_chars-min_chars+1
else:
num_strings = (num_codepoints**min_chars)*(num_codepoints**(max_chars-min_chars+1)-1)//(num_codepoints-1)
if num_strings < _length:
raise ValueError(f"Not enough distinct strings available to make a dictionary of length {_length}")
# generate distinct dictionary keys
if num_codepoints == 1:
key_lengths = _rand.sample(range(min_chars, max_chars+1), _length)
keys = [chr(max_codepoint)*l for l in key_lengths]
else:
keys = []
keys_set = set()
str_generator = _rand_str()
while len(keys) < _length:
try:
s = next(str_generator)
except StopIteration as e:
raise RuntimeError("Random string stream is infinite, this should not happen.") from e
if s not in keys_set:
keys.append(s)
keys_set.add(s)
# generate dictionary
raw_dict = dict(zip(keys, _rand_data(_length, max_nesting=max_nesting-1)))
if canonical:
yield _canonical_order_dict(raw_dict)
else:
yield raw_dict
i += 1
[docs]
def rand_int(n: Optional[int] = None) -> Iterator[int]:
"""
Generates a stream of random :obj:`int` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
"""
validate(n, Optional[int])
return _rand_int(n)
def _rand_int(n: Optional[int] = None) -> Iterator[int]:
if n is not None and n < 0:
raise ValueError()
min_int = _options["min_int"]
max_int = _options["max_int"]
i = 0
while n is None or i < n:
yield _rand.randint(min_int, max_int)
i += 1
[docs]
def rand_bytes(n: Optional[int] = None, *, length: Optional[int] = None) -> Iterator[bytes]:
"""
Generates a stream of random :obj:`bytes` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param length: length of the bytestrings; if :obj:`None`, a random value is sampled according to the :func:`options`
"""
validate(n, Optional[int])
validate(length, Optional[int])
return _rand_bytes(n, length=length)
def _rand_bytes(n: Optional[int] = None, *, length: Optional[int] = None) -> Iterator[bytes]:
if n is not None and n < 0:
raise ValueError()
if length is not None and length < 0:
raise ValueError()
min_bytes = _options["min_bytes"]
max_bytes = _options["max_bytes"]
i = 0
while n is None or i < n:
_length = length if length is not None else _rand.randint(min_bytes, max_bytes)
yield bytes([_rand.randint(0, 255) for _ in range(_length)])
i += 1
[docs]
def rand_str(n: Optional[int] = None, *, length: Optional[int] = None) -> Iterator[str]:
"""
Generates a stream of random :obj:`str` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param length: length of the strings; if :obj:`None`, a random value is sampled according to the :func:`options`
"""
validate(n, Optional[int])
validate(length, Optional[int])
return _rand_str(n, length=length)
def _rand_str(n: Optional[int] = None, *, length: Optional[int] = None) -> Iterator[str]:
if n is not None and n < 0:
raise ValueError()
if length is not None and length < 0:
raise ValueError()
min_chars = _options["min_chars"]
max_chars = _options["max_chars"]
min_codepoint = _options["min_codepoint"]
max_codepoint = _options["max_codepoint"]
i = 0
while n is None or i < n:
_length = length if length is not None else _rand.randint(min_chars, max_chars)
codepoints = [_rand.randint(min_codepoint, max_codepoint) for _ in range(_length)]
try:
string = "".join(chr(c) for c in codepoints)
string.encode("utf-8", errors="strict")
yield string
i += 1
except UnicodeError:
continue
[docs]
def rand_bool(n: Optional[int] = None) -> Iterator[bool]:
"""
Generates a stream of random :obj:`bool` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
"""
validate(n, Optional[int])
return _rand_bool(n)
def _rand_bool(n: Optional[int] = None) -> Iterator[bool]:
if n is not None and n < 0:
raise ValueError()
i = 0
while n is None or i < n:
x = _rand.randint(0, 1)
yield x == 1
i += 1
[docs]
def rand_bool_none(n: Optional[int] = None) -> Iterator[Optional[bool]]:
"""
Generates a stream of random :obj:`bool` or :obj:`None` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
"""
validate(n, Optional[int])
return _rand_bool_none(n)
def _rand_bool_none(n: Optional[int] = None) -> Iterator[Optional[bool]]:
if n is not None and n < 0:
raise ValueError()
i = 0
while n is None or i < n:
x = _rand.randint(0, 2)
yield None if x == 2 else x == 1
i += 1
[docs]
def rand_float(n: Optional[int] = None) -> Iterator[float]:
"""
Generates a stream of random :obj:`float` data.
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
"""
validate(n, Optional[int])
return _rand_float(n)
def _rand_float(n: Optional[int] = None) -> Iterator[float]:
if n is not None and n < 0:
raise ValueError()
min_float = _options["min_float"]
max_float = _options["max_float"]
float_decimals = _options["float_decimals"]
eps = 10.0**-float_decimals
if min_float >= 0 or max_float <= 0:
# no overflow in `min_float + (max_float-min_float) * random()`, can use `Random.uniform`
i = 0
while n is None or i < n:
x = _rand.uniform(min_float, max_float)
yield x-x%eps
i += 1
else:
# overflow in `min_float + (max_float-min_float) * random()`, cannot use `Random.uniform`
i = 0
while n is None or i < n:
x = 1/(1+max_float/(-min_float))
# x is (-min_float)/(max_float-min_float), the probability of sampling a number in (-min_float, 0)
if _rand.random() < x:
x = _rand.random()*min_float
else:
x = _rand.random()*max_float
yield x-x%eps
i += 1
_cid_multibase = multibase.get("base58btc") # the default base for binary CIDs
_cid_version = 1
_cid_multicodec = multicodec.get("dag-cbor")
_cid_multihash = multihash.get("sha3-512")
[docs]
def rand_data_cid(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[Tuple[IPLDKind, CID]]:
r"""
Generates a stream of random DAG-CBOR data and associated CIDs:
- multibase 'base32'
- CIDv1
- multicodec 'dag-cbor'
- multihash 'sha3-512', with full 512-bit digest
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param max_nesting: the maximum nesting level for containers; if :obj:`None`, value from :func:`get_options` is used
"""
validate(n, Optional[int])
return _rand_data_cid(n, max_nesting=max_nesting)
[docs]
def rand_cid(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[CID]:
"""
Generates a stream of random CIDs:
- multibase 'base32'
- CIDv1
- multicodec 'dag-cbor'
- multihash 'sha3-512', with full 512-bit digest
:param n: the number of samples to be yielded; if :obj:`None`, an infinite stream is yielded
:param max_nesting: the maximum nesting level for containers; if :obj:`None`, value from :func:`get_options` is used
"""
validate(n, Optional[int])
return _rand_cid(n, max_nesting=max_nesting)
def _rand_cid(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[CID]:
return (cid for _, cid in _rand_data_cid(n, max_nesting=max_nesting))
def _rand_data_cid(n: Optional[int] = None, *, max_nesting: Optional[int] = None) -> Iterator[Tuple[IPLDKind, CID]]:
if n is not None and n < 0:
raise ValueError()
if max_nesting is None:
max_nesting = _options["max_nesting"]
elif max_nesting < 0:
raise ValueError("Value for max_nesting is negative.")
i = 0
rand_data_generator = _rand_data(max_nesting=max_nesting-1)
while n is None or i < n:
try:
dag_cbor_data = next(rand_data_generator)
binary_data = encode(dag_cbor_data)
except StopIteration as e:
raise RuntimeError("Random digest stream is infinite, this should not happen.") from e
yield (dag_cbor_data, CID(_cid_multibase, _cid_version, _cid_multicodec, _cid_multihash.digest(binary_data)))
i += 1