Source code for n6sdk.data_spec.fields

# -*- coding: utf-8 -*-

# Copyright (c) 2013-2014 NASK. All rights reserved.

"""
.. note::

   For basic information on how to use the classes defined in this module
   -- please consult the :ref:`data_spec_class` chapter of the tutorial,
   in particular the :ref:`n6sdk_field_classes` and
   :ref:`custom_field_classes` sections.
"""


import collections
import datetime
import re

import ipaddr

from n6sdk.addr_helpers import (
    ip_network_as_tuple,
)
from n6sdk.datetime_helpers import (
    datetime_utc_normalize,
    parse_iso_datetime_to_utc,
)
from n6sdk.encoding_helpers import (
    ascii_str,
    as_unicode,
)
from n6sdk.exceptions import (
    FieldValueError,
    FieldValueTooLongError,
)
from n6sdk.regexes import (
    CC_SIMPLE_REGEX,
    DOMAIN_ASCII_LOWERCASE_REGEX,
    EMAIL_SIMPLIFIED_REGEX,
    IBAN_REGEX,
    IPv4_STRICT_DECIMAL_REGEX,
    IPv4_CIDR_NETWORK_REGEX,
    IPv4_ANONYMIZED_REGEX,
    SOURCE_REGEX,
)



#
# The base field specification class

class Field(object):
    """
    The base class for all data field specification classes.

    It provides two (overridable/extendable) methods:
    :meth:`clean_param_value` and :meth:`clean_result_value`
    (see below).

    Fields can be customized in two ways:

    * by subclassing (and overriding/extending some of their
      attributes/methods);

    * by specifying custom per-instance values of any of the
      class-defined attributes -- by passing them as keyword
      arguments to the constructor of a particular class.

    Constructors of all field classes accept the following
    keyword-only arguments:

    * `in_result` (default: :obj:`None`):
      One of: ``'required'``, ``'optional'``, :obj:`None`.

    * `in_params` (default: :obj:`None`):
      One of: ``'required'``, ``'optional'``, :obj:`None`.

    * `single_param` (default: :obj:`False`):
      If false: multiple query parameter values are allowed.

    * `extra_params` (default: :obj:`None`):
      A dictionary that maps parameter *subnames* (second parts of
      *dotted names*) to instances of :class:`Field` or of its
      subclass.

    * `custom_info` (default: an empty dictionary):
      A dictionary containing arbitrary data (accessible as the
      :attr:`custom_info` instance attribute).

    * **any** keyword arguments whose names are the names of
      class-level attributes (see the second point of the list of
      customization ways above).
    """

    def __init__(self, **kwargs):
        # The raw keyword arguments are remembered only for __repr__().
        self._init_kwargs = kwargs
        self._set_public_attrs(**kwargs)

    def __repr__(self):
        formatted_args = [
            '{}={!r}'.format(name, val)
            for name, val in sorted(self._init_kwargs.items())]
        return '{}({})'.format(
            self.__class__.__name__,
            ', '.join(formatted_args))

    #
    # overridable methods

    def clean_param_value(self, value):
        """
        The method called by *data specification*'s parameter cleaning
        methods.

        Args:
            `value`:
                A single parameter value (being *always* a
                :class:`str` or :class:`unicode` instance).

        Returns:
            The value after necessary cleaning
            (adjustment/coercion/etc. and validation).

        Raises:
            Any instance/subclass of :exc:`~exceptions.Exception`
            (especially a :exc:`n6sdk.exceptions.FieldValueError`).

        The default implementation only asserts that `value` is a
        string and returns it unchanged.

        This method can be extended (using :func:`super`) in
        subclasses.

        .. note::

           Although any subclass of :exc:`~exceptions.Exception` can
           be used to signal a cleaning/validation error, if you want
           to specify a *public message*, use
           :exc:`n6sdk.exceptions.FieldValueError` with the
           `public_message` constructor keyword argument specified.
        """
        assert isinstance(value, basestring)
        return value

    def clean_result_value(self, value):
        """
        The method called by *data specification*'s result cleaning
        methods.

        Args:
            `value`:
                A result item value (*not* necessarily a string;
                valid types depend on a particular implementation of
                the method).

        Returns:
            The value after necessary cleaning
            (adjustment/coercion/etc. and validation).

        Raises:
            Any instance/subclass of :exc:`~exceptions.Exception`.

        The default implementation returns the value unchanged.

        This method can be extended (using :func:`super`) in
        subclasses.  Implementations should always return a new
        object, **never** modifying the given value in-place.
        """
        return value

    #
    # non-public internals

    def _set_public_attrs(self,
                          in_result=None,
                          in_params=None,
                          single_param=False,
                          extra_params=None,
                          custom_info=None,
                          **per_instance_attrs):
        self.in_result = self._in_arg_checked(in_result)
        self.in_params = self._in_arg_checked(in_params)
        self.single_param = single_param
        # a fresh dict per instance (never a shared default)
        self.extra_params = {} if extra_params is None else extra_params
        self.custom_info = {} if custom_info is None else custom_info
        self._set_per_instance_attrs(per_instance_attrs)

    def _in_arg_checked(self, arg):
        if arg in (None, 'required', 'optional'):
            return arg
        raise ValueError("{!r} is not one of: None, 'required', 'optional'"
                         .format(arg))

    def _set_per_instance_attrs(self, per_instance_attrs):
        # Per-instance customizations are accepted only for names that
        # are already defined as attributes of the class.
        cls = self.__class__
        for attr_name, obj in per_instance_attrs.items():
            if not hasattr(cls, attr_name):
                raise TypeError(
                    '{}.__init__() got an unexpected keyword argument {!r}'
                    .format(cls.__name__, attr_name))
            setattr(self, attr_name, obj)
#
# Concrete field specification classes
class DateTimeField(Field):
    """
    For date-and-time (timestamp) values, automatically normalized to UTC.
    """

    def clean_param_value(self, value):
        """
        The input `value` should be a :class:`str`/:class:`unicode`
        string, *ISO-8601*-formatted.

        Returns: a :class:`datetime.datetime` object (a *naive* one,
        i.e. not aware of any timezone).
        """
        checked = super(DateTimeField, self).clean_param_value(value)
        return self._parse_datetime_string(checked)

    def clean_result_value(self, value):
        """
        The input `value` should be a :class:`str`/:class:`unicode`
        string (*ISO-8601*-formatted) or a :class:`datetime.datetime`
        object (timezone-aware or *naive*).

        Returns: a :class:`datetime.datetime` object (a *naive* one,
        i.e. not aware of any timezone).

        Raises: :exc:`~exceptions.TypeError` for any other input type.
        """
        value = super(DateTimeField, self).clean_result_value(value)
        if isinstance(value, basestring):
            return self._parse_datetime_string(value)
        if isinstance(value, datetime.datetime):
            return datetime_utc_normalize(value)
        raise TypeError(
            '{!r} is neither a str/unicode nor a '
            'datetime.datetime object'.format(value))

    @staticmethod
    def _parse_datetime_string(value):
        # Any parsing failure is reported as a FieldValueError with a
        # public message.
        try:
            return parse_iso_datetime_to_utc(value)
        except Exception:
            error_msg = (
                u'"{}" is not a valid date + '
                u'time specification'.format(ascii_str(value)))
            raise FieldValueError(public_message=error_msg)
class UnicodeField(Field):
    """
    For arbitrary text data.

    Class-level (per-instance overridable) attributes:

    * :attr:`encoding` -- the codec used to decode :class:`str` input;
    * :attr:`decode_error_handling` -- the error handler passed to the
      decoding call;
    * :attr:`disallow_empty` -- if true, empty values are rejected.
    """

    encoding = 'utf-8'
    decode_error_handling = 'strict'
    disallow_empty = False

    def clean_param_value(self, value):
        """Decode (if necessary) and validate a parameter value."""
        checked = super(UnicodeField, self).clean_param_value(value)
        fixed = self._fix_value(checked)
        self._validate_value(fixed)
        return fixed

    def clean_result_value(self, value):
        """
        Decode (if necessary) and validate a result value.

        Raises: :exc:`~exceptions.TypeError` if `value` is not a
        :class:`str`/:class:`unicode` instance.
        """
        checked = super(UnicodeField, self).clean_result_value(value)
        if not isinstance(checked, basestring):
            raise TypeError(
                '{!r} is not a str/unicode instance'.format(checked))
        fixed = self._fix_value(checked)
        self._validate_value(fixed)
        return fixed

    def _fix_value(self, value):
        # byte strings are decoded; unicode strings pass through as is
        if isinstance(value, str):
            try:
                value = value.decode(self.encoding,
                                     self.decode_error_handling)
            except UnicodeError:
                error_msg = (
                    u'"{}" cannot be decoded with encoding "{}"'.format(
                        ascii_str(value),
                        self.encoding))
                raise FieldValueError(public_message=error_msg)
        assert isinstance(value, unicode)
        return value

    def _validate_value(self, value):
        if self.disallow_empty and not value:
            raise FieldValueError(public_message=u'The value is empty')
class HexDigestField(UnicodeField):
    """
    For hexadecimal digests (hashes), such as *MD5*, *SHA256* or any
    other.

    Uppercase letters (``A``-``F``) that values may contain are
    normalized to lowercase.

    The constructor-arguments-or-subclass-attributes:
    :attr:`num_of_characters` (the exact number of characters each hex
    digest consists of) and :attr:`hash_algo_descr` (the digest
    algorithm label, such as ``"MD5"`` or ``"SHA256"``) are obligatory.
    """

    num_of_characters = None
    hash_algo_descr = None

    def __init__(self, **kwargs):
        super(HexDigestField, self).__init__(**kwargs)
        # Both attributes must be provided, either by a subclass or as
        # constructor arguments (checked in this order).
        for attr_name in ('num_of_characters', 'hash_algo_descr'):
            if getattr(self, attr_name) is None:
                raise TypeError("{!r} not specified for {} "
                                "(neither as a class attribute nor "
                                "as a constructor argument)"
                                .format(attr_name, self.__class__.__name__))
        # `max_length` defaults to the exact digest length if not set
        if getattr(self, 'max_length', None) is None:
            self.max_length = self.num_of_characters

    def _fix_value(self, value):
        decoded = super(HexDigestField, self)._fix_value(value)
        return decoded.lower()

    def _validate_value(self, value):
        super(HexDigestField, self)._validate_value(value)
        try:
            # the 'hex' codec rejects anything that is not a sequence
            # of hexadecimal digit pairs
            value.decode('hex')
            if len(value) != self.num_of_characters:
                raise ValueError
        except (TypeError, ValueError):
            error_msg = u'"{}" is not a valid {} hash'.format(
                ascii_str(value),
                self.hash_algo_descr)
            raise FieldValueError(public_message=error_msg)
class MD5Field(HexDigestField):
    """
    For hexadecimal MD5 digests (hashes): exactly 32 characters.
    """

    hash_algo_descr = 'MD5'
    num_of_characters = 32
class SHA1Field(HexDigestField):
    """
    For hexadecimal SHA-1 digests (hashes): exactly 40 characters.
    """

    hash_algo_descr = 'SHA1'
    num_of_characters = 40
class UnicodeEnumField(UnicodeField):
    """
    For text data limited to a finite set of possible values.

    The constructor-argument-or-subclass-attribute :attr:`enum_values`
    (a sequence or set of strings) is obligatory.
    """

    enum_values = None

    def __init__(self, **kwargs):
        super(UnicodeEnumField, self).__init__(**kwargs)
        declared_values = self.enum_values
        if declared_values is None:
            raise TypeError("'enum_values' not specified for {} "
                            "(neither as a class attribute nor "
                            "as a constructor argument)"
                            .format(self.__class__.__name__))
        # stored per instance as a tuple of as_unicode()-converted items
        self.enum_values = tuple(as_unicode(v) for v in declared_values)

    def _validate_value(self, value):
        super(UnicodeEnumField, self)._validate_value(value)
        if value in self.enum_values:
            return
        shown_value = ascii_str(value)
        allowed_listing = u', '.join(
            u'"{}"'.format(v) for v in self.enum_values)
        raise FieldValueError(public_message=(
            u'"{}" is not one of: {}'.format(shown_value, allowed_listing)))
class UnicodeLimitedField(UnicodeField):
    """
    For text data with limited length.

    The constructor-argument-or-subclass-attribute :attr:`max_length`
    (an integer number greater or equal to 1) is obligatory.
    """

    max_length = None

    #: **Experimental attribute**
    #: (can be removed in future versions,
    #: so do not rely on it, please).
    checking_bytes_length = False

    def __init__(self, **kwargs):
        super(UnicodeLimitedField, self).__init__(**kwargs)
        limit = self.max_length
        if limit is None:
            raise TypeError("'max_length' not specified for {} "
                            "(neither as a class attribute nor "
                            "as a constructor argument)"
                            .format(self.__class__.__name__))
        if limit < 1:
            raise ValueError("'max_length' specified for {} should "
                             "not be lesser than 1 ({} given)"
                             .format(self.__class__.__name__,
                                     ascii_str(limit)))

    def _validate_value(self, value):
        super(UnicodeLimitedField, self)._validate_value(value)
        if self.checking_bytes_length:
            # measure (and report) the encoded form instead of the
            # unicode string
            value = value.encode(self.encoding)
        if len(value) <= self.max_length:
            return
        error_msg = u'Length of "{}" is greater than {}'.format(
            ascii_str(value),
            self.max_length)
        raise FieldValueTooLongError(
            field=self,
            checked_value=value,
            max_length=self.max_length,
            public_message=error_msg)
class UnicodeRegexField(UnicodeField):
    """
    For text data limited by the specified regular expression.

    The constructor-argument-or-subclass-attribute :attr:`regex` (a
    regular expression specified as a string or a compiled regular
    expression object) is obligatory.

    :attr:`error_msg_template` is the public error message template;
    its ``{}`` placeholder is filled with the rejected value.
    """

    regex = None
    error_msg_template = u'"{}" is not a valid value'

    def __init__(self, **kwargs):
        super(UnicodeRegexField, self).__init__(**kwargs)
        pattern = self.regex
        if pattern is None:
            raise TypeError("'regex' not specified for {} "
                            "(neither as a class attribute "
                            "nor as a constructor argument)"
                            .format(self.__class__.__name__))
        if isinstance(pattern, basestring):
            # a pattern given as a string is compiled once per instance
            self.regex = re.compile(pattern)

    def _validate_value(self, value):
        super(UnicodeRegexField, self)._validate_value(value)
        # search() is used, so any anchoring must be part of the pattern
        matched = self.regex.search(value)
        if matched is None:
            error_msg = self.error_msg_template.format(ascii_str(value))
            raise FieldValueError(public_message=error_msg)
class SourceField(UnicodeLimitedField, UnicodeRegexField):
    """
    For dot-separated source specifications, such as ``my-org.type``.

    Values are limited to 32 characters and must match ``SOURCE_REGEX``.
    """

    max_length = 32
    regex = SOURCE_REGEX
    error_msg_template = '"{}" is not a valid source specification'
class IPv4Field(UnicodeLimitedField, UnicodeRegexField):
    """
    For IPv4 addresses, such as ``127.234.5.17``.

    (Using decimal dotted-quad notation.)
    """

    # the length limit is formally redundant (see the regex) but may
    # improve introspection
    max_length = 15
    regex = IPv4_STRICT_DECIMAL_REGEX
    error_msg_template = '"{}" is not a valid IPv4 address'
class IPv6Field(UnicodeField):
    """
    For IPv6 addresses, such as
    ``2001:0db8:85a3:0000:0000:8a2e:0370:7334``.

    Note that:

    * when cleaning a parameter value -- the address is normalized to
      an "exploded" form, such as
      ``u'2001:0db8:85a3:0000:0000:8a2e:0370:7334'``;

    * when cleaning a result value -- the address is normalized to a
      "compressed" form, such as ``u'2001:db8:85a3::8a2e:370:7334'``.
    """

    error_msg_template = '"{}" is not a valid IPv6 address'
    max_length = 39   # <- not used at all but may improve introspection

    def clean_param_value(self, value):
        """Return the address as a unicode string in "exploded" form."""
        # the inherited cleaning returns what _fix_value() produced,
        # i.e. an ipaddr.IPv6Address object
        address = super(IPv6Field, self).clean_param_value(value)
        return unicode(address.exploded)

    def clean_result_value(self, value):
        """Return the address as a unicode string in "compressed" form."""
        address = super(IPv6Field, self).clean_result_value(value)
        return unicode(address.compressed)

    def _fix_value(self, value):
        text = super(IPv6Field, self)._fix_value(value)
        try:
            return ipaddr.IPv6Address(text)
        except Exception:
            error_msg = self.error_msg_template.format(ascii_str(text))
            raise FieldValueError(public_message=error_msg)
class AnonymizedIPv4Field(UnicodeLimitedField, UnicodeRegexField):
    """
    For anonymized IPv4 addresses, such as ``x.x.5.17``.

    (Using decimal dotted-quad notation, with the leftmost octet -- and
    possibly any other octets -- replaced with "x".)

    Values are lower-cased before validation.
    """

    # the length limit is formally redundant (see the regex) but may
    # improve introspection
    max_length = 13
    regex = IPv4_ANONYMIZED_REGEX
    error_msg_template = '"{}" is not a valid anonymized IPv4 address'

    def _fix_value(self, value):
        decoded = super(AnonymizedIPv4Field, self)._fix_value(value)
        return decoded.lower()
class IPv4NetField(UnicodeLimitedField, UnicodeRegexField):
    """
    For IPv4 network specifications (CIDR), such as ``127.234.5.0/24``.

    Note that:

    * when cleaning a parameter value -- an (<address part as unicode
      string>, <net as int>) tuple is returned;

    * when cleaning a result value -- a unicode string is returned.
    """

    # the length limit is formally redundant (see the regex) but may
    # improve introspection
    max_length = 18
    regex = IPv4_CIDR_NETWORK_REGEX
    error_msg_template = ('"{}" is not a valid CIDR '
                          'IPv4 network specification')

    def clean_param_value(self, value):
        """Return an ``(ip, net)`` tuple: unicode address and int prefix."""
        cidr = super(IPv4NetField, self).clean_param_value(value)
        ip, net = ip_network_as_tuple(cidr)
        # sanity checks of what the helper returned
        assert isinstance(ip, unicode) and IPv4_STRICT_DECIMAL_REGEX.search(ip)
        assert isinstance(net, int) and 0 <= net <= 32
        return ip, net

    def clean_result_value(self, value):
        """
        Return the network as a unicode string.

        `value` may be a string or an ``(ip, net)`` pair; a non-string
        value that cannot be unpacked into two items (or formatted)
        causes :exc:`n6sdk.exceptions.FieldValueError`.
        """
        if not isinstance(value, basestring):
            try:
                ip, net = value
                value = '{}/{}'.format(ip, net)
            except (ValueError, TypeError):
                error_msg = self.error_msg_template.format(ascii_str(value))
                raise FieldValueError(public_message=error_msg)
        return super(IPv4NetField, self).clean_result_value(value)
class IPv6NetField(UnicodeField):
    """
    For IPv6 network specifications (CIDR), such as
    ``2001:0db8:85a3:0000:0000:8a2e:0370:7334/32``.

    Note that:

    * when cleaning a parameter value --

      * an (<address part as unicode string>, <net as int>) tuple is
        returned;

      * the address part is normalized to an "exploded" form, such as
        ``u'2001:0db8:85a3:0000:0000:8a2e:0370:7334'``;

    * when cleaning a result value --

      * a unicode string is returned;

      * the address part is normalized to a "compressed" form, such as
        ``u'2001:db8:85a3::8a2e:370:7334'``.
    """

    error_msg_template = ('"{}" is not a valid CIDR '
                          'IPv6 network specification')
    max_length = 43   # <- not used at all but may improve introspection

    def clean_param_value(self, value):
        """Return an ``(ipv6, net)`` tuple: unicode address and int prefix."""
        # the inherited cleaning returns what _fix_value() produced,
        # i.e. an ipaddr.IPv6Network object
        network = super(IPv6NetField, self).clean_param_value(value)
        ipv6 = unicode(network.ip.exploded)
        net = network.prefixlen
        assert isinstance(ipv6, unicode)
        assert isinstance(net, int) and 0 <= net <= 128
        return ipv6, net

    def clean_result_value(self, value):
        """
        Return the network as a unicode string (in "compressed" form).

        `value` may be a string or an ``(ip, net)`` pair; a non-string
        value that cannot be unpacked into two items (or formatted)
        causes :exc:`n6sdk.exceptions.FieldValueError`.
        """
        if not isinstance(value, basestring):
            try:
                ip, net = value
                value = '{}/{}'.format(ip, net)
            except (ValueError, TypeError):
                error_msg = self.error_msg_template.format(ascii_str(value))
                raise FieldValueError(public_message=error_msg)
        network = super(IPv6NetField, self).clean_result_value(value)
        return unicode(network.compressed)

    def _fix_value(self, value):
        text = super(IPv6NetField, self)._fix_value(value)
        try:
            # the prefix part is obligatory
            if '/' not in text:
                raise ValueError
            return ipaddr.IPv6Network(text)
        except Exception:
            error_msg = self.error_msg_template.format(ascii_str(text))
            raise FieldValueError(public_message=error_msg)
class CCField(UnicodeLimitedField, UnicodeRegexField):
    """
    For 2-letter country codes, such as ``FR`` or ``UA``.

    Values are upper-cased before validation.
    """

    # the length limit is formally redundant (see the regex) but may
    # improve introspection
    max_length = 2
    regex = CC_SIMPLE_REGEX
    error_msg_template = '"{}" is not a valid 2-character country code'

    def _fix_value(self, value):
        decoded = super(CCField, self)._fix_value(value)
        return decoded.upper()
class URLSubstringField(UnicodeLimitedField):
    """
    For substrings of URLs (such as ``xample.com/path?que``).

    Values are limited to 2048 characters; byte strings are decoded
    with the ``'surrogateescape'`` error handler instead of the
    default ``'strict'`` one.
    """

    decode_error_handling = 'surrogateescape'
    max_length = 2048
class URLField(URLSubstringField):
    """
    For URLs (such as ``http://xyz.example.com/path?query=foo#fr``).

    Defines nothing of its own: all behavior is inherited from
    :class:`URLSubstringField`.
    """
class DomainNameSubstringField(UnicodeLimitedField):
    """
    For substrings of domain names, automatically IDNA-encoded and
    lower-cased.
    """

    max_length = 255

    def _fix_value(self, value):
        decoded = super(DomainNameSubstringField, self)._fix_value(value)
        try:
            idna_encoded = decoded.encode('idna')
        except ValueError:
            error_msg = (
                u'"{}" could not be encoded using the '
                u'IDNA encoding'.format(ascii_str(decoded)))
            raise FieldValueError(public_message=error_msg)
        lowercased = idna_encoded.lower()
        return unicode(lowercased)
class DomainNameField(DomainNameSubstringField, UnicodeRegexField):
    """
    For domain names, automatically IDNA-encoded and lower-cased.

    Values must additionally match ``DOMAIN_ASCII_LOWERCASE_REGEX``.
    """

    error_msg_template = '"{}" is not a valid domain name'
    regex = DOMAIN_ASCII_LOWERCASE_REGEX
class EmailSimplifiedField(UnicodeLimitedField, UnicodeRegexField):
    """
    For e-mail addresses.

    (Note: values are *not* normalized in any way, especially the
    domain part is *not* IDNA-encoded or lower-cased.)
    """

    error_msg_template = '"{}" is not a valid e-mail address'
    regex = EMAIL_SIMPLIFIED_REGEX
    max_length = 254
class IBANSimplifiedField(UnicodeLimitedField, UnicodeRegexField):
    """
    For International Bank Account Numbers.

    Values are upper-cased before validation.
    """

    # the length limit is formally redundant (see the regex) but may
    # improve introspection
    max_length = 34
    regex = IBAN_REGEX
    error_msg_template = '"{}" is not a valid IBAN'

    def _fix_value(self, value):
        decoded = super(IBANSimplifiedField, self)._fix_value(value)
        return decoded.upper()
class IntegerField(Field):
    """
    For integer numbers (optionally with min./max. limits defined).

    Class-level (per-instance overridable) attributes:

    * :attr:`min_value`, :attr:`max_value` -- inclusive limits
      (:obj:`None` means: no limit);
    * :attr:`error_msg_template` -- if not :obj:`None`, it replaces
      the default public error messages (its ``{}`` placeholder is
      filled with the offending value).
    """

    min_value = None
    max_value = None
    error_msg_template = None

    def clean_param_value(self, value):
        """Coerce a parameter value to an integer and check its range."""
        checked = super(IntegerField, self).clean_param_value(value)
        return self._clean_value(checked)

    def clean_result_value(self, value):
        """Coerce a result value to an integer and check its range."""
        checked = super(IntegerField, self).clean_result_value(value)
        return self._clean_value(checked)

    def _clean_value(self, value):
        try:
            value = self._coerce_value(value)
            self._check_range(value)
        except FieldValueError:
            template = self.error_msg_template
            if template is not None:
                # Note: `value` is the raw input if coercion failed, or
                # the coerced number if the range check failed.
                raise FieldValueError(
                    public_message=template.format(ascii_str(value)))
            raise
        return value

    def _coerce_value(self, value):
        try:
            coerced_value = self._do_coerce(value)
            # A non-string input must be equal to its coerced form, so
            # e.g. a float is accepted *only* if it is an integer
            # number (such as 42.0).
            if not isinstance(value, basestring) and coerced_value != value:
                raise ValueError
        except (TypeError, ValueError):
            error_msg = (
                u'"{}" cannot be interpreted as an '
                u'integer number'.format(ascii_str(value)))
            raise FieldValueError(public_message=error_msg)
        # (a long is obtained for numbers greater than sys.maxint)
        assert isinstance(coerced_value, (int, long))
        return coerced_value

    def _do_coerce(self, value):
        # the actual conversion (a hook overridable in subclasses)
        return int(value)

    def _check_range(self, value):
        assert isinstance(value, (int, long))
        lower, upper = self.min_value, self.max_value
        if lower is not None and value < lower:
            raise FieldValueError(public_message=(
                u'{} is lesser than {}'.format(value, lower)))
        if upper is not None and value > upper:
            raise FieldValueError(public_message=(
                u'{} is greater than {}'.format(value, upper)))
class ASNField(IntegerField):
    """
    For AS numbers, such as ``12345``, ``123456789`` or ``12345.65432``.
    """

    min_value = 0
    max_value = 2 ** 32 - 1
    error_msg_template = '"{}" is not a valid Autonomous System Number'

    def _do_coerce(self, value):
        if isinstance(value, basestring):
            if '.' not in value:
                return int(value)
            # the '<16-bit number>.<16-bit number>' ASN notation
            high, low = map(int, value.split('.'))
            if low < 0 or low > 65535:
                # (the high part is verified later, by the range check
                # of the combined number)
                raise ValueError
            return (high << 16) + low
        if isinstance(value, (int, long)):
            return int(value)
        # Other types (e.g. floats) are not accepted, to avoid the
        # '42.0'/42.0-confusion ('42.0' gives 42 * 2**16 but 42.0
        # would give 42 if it were accepted).
        raise TypeError
class PortField(IntegerField):
    """
    For TCP/UDP port numbers, such as ``12345``.

    Accepted range: from 0 to 65535 (inclusive).
    """

    error_msg_template = '"{}" is not a valid port number'
    min_value = 0
    max_value = 2 ** 16 - 1
class ResultListFieldMixin(Field):
    """
    A mix-in class for fields whose result values are supposed to be
    a *sequence of values* and not single values.

    Its :meth:`clean_result_value` checks that its argument is a
    *non-string sequence* (:class:`list` or :class:`tuple`, or any
    other :class:`collections.Sequence` not being :class:`str` or
    :class:`unicode`) and performs result cleaning (as defined in a
    superclass) for *each item* of it.

    If :attr:`allow_empty` is false (the default), empty sequences
    are rejected.

    See: :class:`AddressField` below.
    """

    allow_empty = False

    def clean_result_value(self, value):
        """
        Clean each item of the given non-string sequence.

        Returns: a new :class:`list` of cleaned items.

        Raises: :exc:`~exceptions.TypeError` if `value` is a string or
        not a sequence; :exc:`~exceptions.ValueError` if it is empty
        and :attr:`allow_empty` is false.
        """
        is_proper_sequence = (
            not isinstance(value, basestring) and
            isinstance(value, collections.Sequence))
        if not is_proper_sequence:
            raise TypeError('{!r} is not a non-string sequence'.format(value))
        if not (self.allow_empty or value):
            raise ValueError('empty sequence given')
        clean_item = super(ResultListFieldMixin, self).clean_result_value
        return self._clean_result_list(value, clean_item)

    def _clean_result_list(self, value, do_clean):
        cleaned_items = []
        any_too_long = False
        for item in value:
            try:
                item = do_clean(item)
            except FieldValueTooLongError as exc:
                # errors raised on behalf of another field are not
                # this field's business
                if exc.field is not self:
                    raise
                any_too_long = True
                assert hasattr(self, 'max_length')
                assert exc.max_length == self.max_length
                # keep collecting, using the value carried by the error
                item = exc.checked_value
            cleaned_items.append(item)
        if not any_too_long:
            return cleaned_items
        # one collective error, carrying the whole collected list
        error_msg = (
            u'Length of at least one item of '
            u'list {} is greater than {}'.format(
                ascii_str(value),
                self.max_length))
        raise FieldValueTooLongError(
            field=self,
            checked_value=cleaned_items,
            max_length=self.max_length,
            public_message=error_msg)
class DictResultField(Field):
    """
    A base class for fields whose result values are supposed to be
    dictionaries.

    The constructor-argument-or-subclass-attribute
    :attr:`key_to_subfield_factory` can be:

    * specified as a dictionary that maps *subfield names* to
      *factories* (typically, Field subclasses) -- then result
      dictionaries are constrained and cleaned in the following way:

      * each *key* must be one of the *subfield names*,
      * each *value* is cleaned with :meth:`clean_result_value` of the
        field object produced by the corresponding *factory*;

    * left as :obj:`None` -- then there are no constraints about
      structure and content of result dictionaries.
    """

    key_to_subfield_factory = None

    def __init__(self, **kwargs):
        super(DictResultField, self).__init__(**kwargs)
        factories = self.key_to_subfield_factory
        # each factory is called once, here, to make the subfield object
        self.key_to_subfield = (
            None if factories is None
            else {
                key.decode('ascii'): factory()
                for key, factory in factories.iteritems()})

    def clean_param_value(self, value):
        """Always raises :exc:`~exceptions.TypeError`."""
        raise TypeError("it's a result-only field")

    def clean_result_value(self, value):
        """
        Clean the given mapping.

        Returns: a new :class:`dict` whose keys are ASCII-decoded and
        whose values are cleaned with the corresponding subfields (or
        left intact if no subfields are defined).

        Raises: :exc:`~exceptions.TypeError` if `value` is not a
        mapping; :exc:`~exceptions.ValueError` if it contains illegal
        keys or lacks required ones.
        """
        value = super(DictResultField, self).clean_result_value(value)
        if not isinstance(value, collections.Mapping):
            raise TypeError('{!r} is not a mapping'.format(value))
        keys = frozenset(value)
        illegal_keys_repr = self._get_illegal_keys_repr(keys)
        if illegal_keys_repr:
            raise ValueError(
                '{!r} contains illegal keys ({!r})'.format(
                    value,
                    illegal_keys_repr))
        missing_keys_repr = self._get_missing_keys_repr(keys)
        if missing_keys_repr:
            raise ValueError(
                '{!r} does not contain required keys ({!r})'.format(
                    value,
                    missing_keys_repr))
        subfields = self.key_to_subfield
        if subfields is None:
            return {k.decode('ascii'): v for k, v in value.iteritems()}
        cleaned = {}
        for k, v in value.iteritems():
            cleaned[k.decode('ascii')] = subfields[k].clean_result_value(v)
        return cleaned

    def _get_illegal_keys_repr(self, keys):
        # a hook: returns a non-empty description if `keys` contains
        # something illegal, otherwise an empty string
        if self.key_to_subfield is None:
            return ''
        illegal_keys = keys - self.key_to_subfield.viewkeys()
        return ', '.join(sorted(illegal_keys))

    def _get_missing_keys_repr(self, keys):
        # a hook: returns a non-empty description if some required keys
        # are missing; this base implementation requires no keys
        return ''
class ListOfDictsField(ResultListFieldMixin, DictResultField):
    """
    For lists of dictionaries containing arbitrary values.

    Defines nothing of its own: it just combines
    :class:`ResultListFieldMixin` with :class:`DictResultField`.
    """
class AddressField(ListOfDictsField):
    """
    For lists of dictionaries -- each containing ``"ip"`` and
    optionally ``"cc"`` and/or ``"asn"``.
    """

    key_to_subfield_factory = {
        u'ip': IPv4Field,
        u'cc': CCField,
        u'asn': ASNField,
    }

    def _get_missing_keys_repr(self, keys):
        # "ip" is the only obligatory key
        return '' if 'ip' in keys else 'ip'
class DirField(UnicodeEnumField):
    """
    For ``dir`` values in items cleaned by
    :class:`ExtendedAddressField` instances (``dir`` marks the role of
    the address in terms of the direction of the network flow in
    layers 3 or 4).

    The only legal values are: ``src`` and ``dst``.
    """

    enum_values = ('src', 'dst')
class ExtendedAddressField(ListOfDictsField):
    """
    For lists of dictionaries -- each containing either ``"ip"`` or
    ``"ipv6"`` (but not both), and optionally all or some of:
    ``"cc"``, ``"asn"``, ``"dir"``, ``"rdns"``.
    """

    key_to_subfield_factory = {
        u'ip': IPv4Field,
        u'ipv6': IPv6Field,
        u'cc': CCField,
        u'asn': ASNField,
        u'dir': DirField,
        u'rdns': DomainNameField,
    }

    def _get_illegal_keys_repr(self, keys):
        inherited_repr = super(ExtendedAddressField,
                               self)._get_illegal_keys_repr(keys)
        if not ('ip' in keys and 'ipv6' in keys):
            return inherited_repr
        # "ip" together with "ipv6" is reported as an additional problem
        conflict_note = 'ip / ipv6 [only one of these two should be specified]'
        if inherited_repr:
            return inherited_repr + '; ' + conflict_note
        return inherited_repr + conflict_note

    def _get_missing_keys_repr(self, keys):
        if 'ip' in keys or 'ipv6' in keys:
            return ''
        return 'ip / ipv6 [one of these two should be specified]'