Source code for O365.utils.utils

import datetime as dt
import logging
from collections import OrderedDict
from enum import Enum

import pytz
from dateutil.parser import parse
from stringcase import snakecase

from .windows_tz import get_iana_tz, get_windows_tz
from .decorators import fluent

ME_RESOURCE = 'me'
USERS_RESOURCE = 'users'
GROUPS_RESOURCE = 'groups'
SITES_RESOURCE = 'sites'

NEXT_LINK_KEYWORD = '@odata.nextLink'

log = logging.getLogger(__name__)

MAX_RECIPIENTS_PER_MESSAGE = 500  # Actual limit on Office 365


[docs]class CaseEnum(Enum): """ A Enum that converts the value to a snake_case casing """ def __new__(cls, value): obj = object.__new__(cls) obj._value_ = snakecase(value) # value will be transformed to snake_case return obj
[docs] @classmethod def from_value(cls, value): """ Gets a member by a snaked-case provided value""" try: return cls(snakecase(value)) except ValueError: return None
[docs]class ImportanceLevel(CaseEnum): Normal = 'normal' Low = 'low' High = 'high'
[docs]class OutlookWellKnowFolderNames(Enum): INBOX = 'Inbox' JUNK = 'JunkEmail' DELETED = 'DeletedItems' DRAFTS = 'Drafts' SENT = 'SentItems' OUTBOX = 'Outbox' ARCHIVE = 'Archive'
[docs]class OneDriveWellKnowFolderNames(Enum): DOCUMENTS = 'documents' PHOTOS = 'photos' CAMERA_ROLL = 'cameraroll' APP_ROOT = 'approot' MUSIC = 'music' ATTACHMENTS = 'attachments'
[docs]class ChainOperator(Enum): AND = 'and' OR = 'or'
[docs]class TrackerSet(set):
[docs] def __init__(self, *args, casing=None, **kwargs): """ A Custom Set that changes the casing of it's keys :param func casing: a function to convert into specified case """ self.cc = casing super().__init__(*args, **kwargs)
[docs] def add(self, value): value = self.cc(value) super().add(value)
[docs] def remove(self, value): value = self.cc(value) super().remove(value)
[docs]class Recipient: """ A single Recipient """
[docs] def __init__(self, address=None, name=None, parent=None, field=None): """ Create a recipient with provided information :param str address: email address of the recipient :param str name: name of the recipient :param HandleRecipientsMixin parent: parent recipients handler :param str field: name of the field to update back """ self._address = address or '' self._name = name or '' self._parent = parent self._field = field
def __bool__(self): return bool(self.address) def __str__(self): return self.__repr__() def __repr__(self): if self.name: return '{} ({})'.format(self.name, self.address) else: return self.address # noinspection PyProtectedMember def _track_changes(self): """ Update the track_changes on the parent to reflect a needed update on this field """ if self._field and getattr(self._parent, '_track_changes', None) is not None: self._parent._track_changes.add(self._field) @property def address(self): """ Email address of the recipient :getter: Get the email address :setter: Set and update the email address :type: str """ return self._address @address.setter def address(self, value): self._address = value self._track_changes() @property def name(self): """ Name of the recipient :getter: Get the name :setter: Set and update the name :type: str """ return self._name @name.setter def name(self, value): self._name = value self._track_changes()
[docs]class Recipients: """ A Sequence of Recipients """
[docs] def __init__(self, recipients=None, parent=None, field=None): """ Recipients must be a list of either address strings or tuples (name, address) or dictionary elements :param recipients: list of either address strings or tuples (name, address) or dictionary elements :type recipients: list[str] or list[tuple] or list[dict] or list[Recipient] :param HandleRecipientsMixin parent: parent recipients handler :param str field: name of the field to update back """ self._parent = parent self._field = field self._recipients = [] self.untrack = True if recipients: self.add(recipients) self.untrack = False
def __iter__(self): return iter(self._recipients) def __getitem__(self, key): return self._recipients[key] def __contains__(self, item): return item in {recipient.address for recipient in self._recipients} def __bool__(self): return bool(len(self._recipients)) def __len__(self): return len(self._recipients) def __str__(self): return self.__repr__() def __repr__(self): return 'Recipients count: {}'.format(len(self._recipients)) # noinspection PyProtectedMember def _track_changes(self): """ Update the track_changes on the parent to reflect a needed update on this field """ if self._field and getattr(self._parent, '_track_changes', None) is not None and self.untrack is False: self._parent._track_changes.add(self._field)
[docs] def clear(self): """ Clear the list of recipients """ self._recipients = [] self._track_changes()
[docs] def add(self, recipients): """ Add the supplied recipients to the exiting list :param recipients: list of either address strings or tuples (name, address) or dictionary elements :type recipients: list[str] or list[tuple] or list[dict] """ if recipients: if isinstance(recipients, str): self._recipients.append( Recipient(address=recipients, parent=self._parent, field=self._field)) elif isinstance(recipients, Recipient): self._recipients.append(recipients) elif isinstance(recipients, tuple): name, address = recipients if address: self._recipients.append( Recipient(address=address, name=name, parent=self._parent, field=self._field)) elif isinstance(recipients, list): for recipient in recipients: self.add(recipient) else: raise ValueError('Recipients must be an address string, a ' 'Recipient instance, a (name, address) ' 'tuple or a list') self._track_changes()
[docs] def remove(self, address): """ Remove an address or multiple addresses :param address: list of addresses to remove :type address: str or list[str] """ recipients = [] if isinstance(address, str): address = {address} # set elif isinstance(address, (list, tuple)): address = set(address) for recipient in self._recipients: if recipient.address not in address: recipients.append(recipient) if len(recipients) != len(self._recipients): self._track_changes() self._recipients = recipients
[docs] def get_first_recipient_with_address(self): """ Returns the first recipient found with a non blank address :return: First Recipient :rtype: Recipient """ recipients_with_address = [recipient for recipient in self._recipients if recipient.address] if recipients_with_address: return recipients_with_address[0] else: return None
[docs]class HandleRecipientsMixin: def _recipients_from_cloud(self, recipients, field=None): """ Transform a recipient from cloud data to object data """ recipients_data = [] for recipient in recipients: recipients_data.append( self._recipient_from_cloud(recipient, field=field)) return Recipients(recipients_data, parent=self, field=field) def _recipient_from_cloud(self, recipient, field=None): """ Transform a recipient from cloud data to object data """ if recipient: recipient = recipient.get(self._cc('emailAddress'), recipient if isinstance(recipient, dict) else {}) address = recipient.get(self._cc('address'), '') name = recipient.get(self._cc('name'), '') return Recipient(address=address, name=name, parent=self, field=field) else: return Recipient() def _recipient_to_cloud(self, recipient): """ Transforms a Recipient object to a cloud dict """ data = None if recipient: data = {self._cc('emailAddress'): { self._cc('address'): recipient.address}} if recipient.name: data[self._cc('emailAddress')][ self._cc('name')] = recipient.name return data
[docs]class ApiComponent: """ Base class for all object interactions with the Cloud Service API Exposes common access methods to the api protocol within all Api objects """ _cloud_data_key = '__cloud_data__' # wraps cloud data with this dict key _endpoints = {} # dict of all API service endpoints needed
[docs] def __init__(self, *, protocol=None, main_resource=None, **kwargs): """ Object initialization :param Protocol protocol: A protocol class or instance to be used with this connection :param str main_resource: main_resource to be used in these API communications """ self.protocol = protocol() if isinstance(protocol, type) else protocol if self.protocol is None: raise ValueError('Protocol not provided to Api Component') mr, bu = self.build_base_url(main_resource) self.main_resource = mr self._base_url = bu super().__init__()
def __str__(self): return self.__repr__() def __repr__(self): return 'Api Component on resource: {}'.format(self.main_resource) @staticmethod def _parse_resource(resource): """ Parses and completes resource information """ resource = resource.strip() if resource else resource if resource in {ME_RESOURCE, USERS_RESOURCE, GROUPS_RESOURCE, SITES_RESOURCE}: return resource elif resource.startswith('user:'): # user resource shorthand resource = resource.replace('user:', '', 1) return '{}/{}'.format(USERS_RESOURCE, resource) elif '@' in resource and not resource.startswith(USERS_RESOURCE): # user resource backup # when for example accessing a shared mailbox the # resource is set to the email address. we have to prefix # the email with the resource 'users/' so --> 'users/email_address' return '{}/{}'.format(USERS_RESOURCE, resource) elif resource.startswith('group:'): # group resource shorthand resource = resource.replace('group:', '', 1) return '{}/{}'.format(GROUPS_RESOURCE, resource) elif resource.startswith('site:'): # sharepoint site resource shorthand resource = resource.replace('site:', '', 1) return '{}/{}'.format(SITES_RESOURCE, resource) else: return resource
[docs] def build_base_url(self, resource): """ Builds the base url of this ApiComponent :param str resource: the resource to build the base url """ main_resource = self._parse_resource(resource if resource is not None else self.protocol.default_resource) # noinspection PyUnresolvedReferences base_url = '{}{}'.format(self.protocol.service_url, main_resource) if base_url.endswith('/'): # when self.main_resource is empty then remove the last slash. base_url = base_url[:-1] return main_resource, base_url
[docs] def set_base_url(self, resource): """ Sets the base urls for this ApiComponent :param str resource: the resource to build the base url """ self.main_resource, self._base_url = self.build_base_url(resource)
[docs] def build_url(self, endpoint): """ Returns a url for a given endpoint using the protocol service url :param str endpoint: endpoint to build the url for :return: final url :rtype: str """ return '{}{}'.format(self._base_url, endpoint)
def _gk(self, keyword): """ Alias for protocol.get_service_keyword """ return self.protocol.get_service_keyword(keyword) def _cc(self, dict_key): """ Alias for protocol.convert_case """ return self.protocol.convert_case(dict_key) def _parse_date_time_time_zone(self, date_time_time_zone): """ Parses and convert to protocol timezone a dateTimeTimeZone resource This resource is a dict with a date time and a windows timezone This is a common structure on Microsoft apis so it's included here. """ if date_time_time_zone is None: return None local_tz = self.protocol.timezone if isinstance(date_time_time_zone, dict): try: timezone = pytz.timezone( get_iana_tz(date_time_time_zone.get(self._cc('timeZone'), 'UTC'))) except pytz.UnknownTimeZoneError: timezone = local_tz date_time = date_time_time_zone.get(self._cc('dateTime'), None) try: date_time = timezone.localize(parse(date_time)) if date_time else None except OverflowError as e: log.debug('Could not parse dateTimeTimeZone: {}. Error: {}'.format(date_time_time_zone, str(e))) date_time = None if date_time and timezone != local_tz: date_time = date_time.astimezone(local_tz) else: # Outlook v1.0 api compatibility (fallback to datetime string) try: date_time = local_tz.localize(parse(date_time_time_zone)) if date_time_time_zone else None except Exception as e: log.debug('Could not parse dateTimeTimeZone: {}. Error: {}'.format(date_time_time_zone, str(e))) date_time = None return date_time def _build_date_time_time_zone(self, date_time): """ Converts a datetime to a dateTimeTimeZone resource """ timezone = date_time.tzinfo.zone if date_time.tzinfo is not None else None return { self._cc('dateTime'): date_time.strftime('%Y-%m-%dT%H:%M:%S'), self._cc('timeZone'): get_windows_tz(timezone or self.protocol.timezone) }
[docs] def new_query(self, attribute=None): """ Create a new query to filter results :param str attribute: attribute to apply the query for :return: new Query :rtype: Query """ return Query(attribute=attribute, protocol=self.protocol)
q = new_query # alias for new query
[docs]class Query: """ Helper to conform OData filters """ _mapping = { 'from': 'from/emailAddress/address', 'to': 'toRecipients/emailAddress/address', 'start': 'start/DateTime', 'end': 'end/DateTime', 'flag': 'flag/flagStatus' }
[docs] def __init__(self, attribute=None, *, protocol): """ Build a query to apply OData filters https://docs.microsoft.com/en-us/graph/query-parameters :param str attribute: attribute to apply the query for :param Protocol protocol: protocol to use for connecting """ self.protocol = protocol() if isinstance(protocol, type) else protocol self._attribute = None self._chain = None self.new(attribute) self._negation = False self._filters = [] # store all the filters self._order_by = OrderedDict() self._selects = set() self._expands = set() self._search = None self._open_group_flag = [] # stores if the next attribute must be grouped self._close_group_flag = [] # stores if the last attribute must be closing a group
def __str__(self): return 'Filter: {}\nOrder: {}\nSelect: {}\nExpand: {}\nSearch: {}'.format(self.get_filters(), self.get_order(), self.get_selects(), self.get_expands(), self._search) def __repr__(self): return self.__str__()
[docs] @fluent def select(self, *attributes): """ Adds the attribute to the $select parameter :param str attributes: the attributes tuple to select. If empty, the on_attribute previously set is added. :rtype: Query """ if attributes: for attribute in attributes: attribute = self.protocol.convert_case( attribute) if attribute and isinstance(attribute, str) else None if attribute: if '/' in attribute: # only parent attribute can be selected attribute = attribute.split('/')[0] self._selects.add(attribute) else: if self._attribute: self._selects.add(self._attribute) return self
[docs] @fluent def expand(self, *relationships): """ Adds the relationships (e.g. "event" or "attachments") that should be expanded with the $expand parameter Important: The ApiComponent using this should know how to handle this relationships. eg: Message knows how to handle attachments, and event (if it's an EventMessage). Important: When using expand on multi-value relationships a max of 20 items will be returned. :param str relationships: the relationships tuple to expand. :rtype: Query """ for relationship in relationships: if relationship == 'event': relationship = '{}/event'.format(self.protocol.get_service_keyword('event_message_type')) self._expands.add(relationship) return self
[docs] @fluent def search(self, text): """ Perform a search. Not from graph docs: You can currently search only message and person collections. A $search request returns up to 250 results. You cannot use $filter or $orderby in a search request. :param str text: the text to search :return: the Query instance """ if text is None: self._search = None else: # filters an order are not allowed self.clear_filters() self.clear_order() self._search = '"{}"'.format(text) return self
[docs] def as_params(self): """ Returns the filters, orders, select, expands and search as query parameters :rtype: dict """ params = {} if self.has_filters: params['$filter'] = self.get_filters() if self.has_order: params['$orderby'] = self.get_order() if self.has_expands and not self.has_selects: params['$expand'] = self.get_expands() if self.has_selects and not self.has_expands: params['$select'] = self.get_selects() if self.has_expands and self.has_selects: params['$expand'] = '{}($select={})'.format(self.get_expands(), self.get_selects()) if self._search: params['$search'] = self._search params.pop('$filter', None) params.pop('$orderby', None) return params
@property def has_filters(self): """ Whether the query has filters or not :rtype: bool """ return bool(self._filters) @property def has_order(self): """ Whether the query has order_by or not :rtype: bool """ return bool(self._order_by) @property def has_selects(self): """ Whether the query has select filters or not :rtype: bool """ return bool(self._selects) @property def has_expands(self): """ Whether the query has relationships that should be expanded or not :rtype: bool """ return bool(self._expands)
[docs] def get_filters(self): """ Returns the result filters :rtype: str or None """ if self._filters: filters_list = self._filters if isinstance(filters_list[-1], Enum): filters_list = filters_list[:-1] filters = ' '.join( [fs.value if isinstance(fs, Enum) else fs[1] for fs in filters_list] ).strip() # closing opened groups automatically open_groups = len([x for x in self._open_group_flag if x is False]) for i in range(open_groups - len(self._close_group_flag)): filters += ')' return filters else: return None
[docs] def get_order(self): """ Returns the result order by clauses :rtype: str or None """ # first get the filtered attributes in order as they must appear # in the order_by first if not self.has_order: return None return ','.join(['{} {}'.format(attribute, direction or '').strip() for attribute, direction in self._order_by.items()])
[docs] def get_selects(self): """ Returns the result select clause :rtype: str or None """ if self._selects: return ','.join(self._selects) else: return None
[docs] def get_expands(self): """ Returns the result expand clause :rtype: str or None """ if self._expands: return ','.join(self._expands) else: return None
def _get_mapping(self, attribute): if attribute: mapping = self._mapping.get(attribute) if mapping: attribute = '/'.join( [self.protocol.convert_case(step) for step in mapping.split('/')]) else: attribute = self.protocol.convert_case(attribute) return attribute return None
[docs] @fluent def new(self, attribute, operation=ChainOperator.AND): """ Combine with a new query :param str attribute: attribute of new query :param ChainOperator operation: operation to combine to new query :rtype: Query """ if isinstance(operation, str): operation = ChainOperator(operation) self._chain = operation self._attribute = self._get_mapping(attribute) if attribute else None self._negation = False return self
[docs] def clear_filters(self): """ Clear filters """ self._filters = []
[docs] def clear_order(self): """ Clears any order commands """ self._order_by = OrderedDict()
[docs] @fluent def clear(self): """ Clear everything :rtype: Query """ self._filters = [] self._order_by = OrderedDict() self._selects = set() self._negation = False self._attribute = None self._chain = None self._search = None self._open_group_flag = [] self._close_group_flag = [] return self
[docs] @fluent def negate(self): """ Apply a not operator :rtype: Query """ self._negation = not self._negation return self
[docs] @fluent def chain(self, operation=ChainOperator.AND): """ Start a chain operation :param ChainOperator, str operation: how to combine with a new one :rtype: Query """ if isinstance(operation, str): operation = ChainOperator(operation) self._chain = operation return self
[docs] @fluent def on_attribute(self, attribute): """ Apply query on attribute, to be used along with chain() :param str attribute: attribute name :rtype: Query """ self._attribute = self._get_mapping(attribute) return self
[docs] @fluent def on_list_field(self, field): """ Apply query on a list field, to be used along with chain() :param str field: field name (note: name is case sensitive) :rtype: Query """ self._attribute = 'fields/' + field return self
[docs] def remove_filter(self, filter_attr): """ Removes a filter given the attribute name """ filter_attr = self._get_mapping(filter_attr) new_filters = [] remove_chain = False for flt in self._filters: if isinstance(flt, list): if flt[0] == filter_attr: remove_chain = True else: new_filters.append(flt) else: # this is a ChainOperator if remove_chain is False: new_filters.append(flt) else: remove_chain = False self._filters = new_filters
def _add_filter(self, *filter_data): if self._attribute: if self._filters and not isinstance(self._filters[-1], ChainOperator): self._filters.append(self._chain) sentence, attrs = filter_data for i, group in enumerate(self._open_group_flag): if group is True: # Open a group sentence = '(' + sentence self._open_group_flag[i] = False # set to done self._filters.append([self._attribute, sentence, attrs]) else: raise ValueError( 'Attribute property needed. call on_attribute(attribute) ' 'or new(attribute)') def _parse_filter_word(self, word): """ Converts the word parameter into the correct format """ if isinstance(word, str): word = "'{}'".format(word) elif isinstance(word, dt.date): if isinstance(word, dt.datetime): if word.tzinfo is None: # if it's a naive datetime, localize the datetime. word = self.protocol.timezone.localize( word) # localize datetime into local tz if word.tzinfo != pytz.utc: word = word.astimezone( pytz.utc) # transform local datetime to utc if '/' in self._attribute: # TODO: this is a fix for the case when the parameter # filtered is a string instead a dateTimeOffset # but checking the '/' is not correct, but it will # differentiate for now the case on events: # start/dateTime (date is a string here) from # the case on other dates such as # receivedDateTime (date is a dateTimeOffset) word = "'{}'".format( word.isoformat()) # convert datetime to isoformat. else: word = "{}".format( word.isoformat()) # convert datetime to isoformat elif isinstance(word, bool): word = str(word).lower() elif word is None: word = 'null' return word @staticmethod def _prepare_sentence(attribute, operation, word, negation=False): negation = 'not' if negation else '' attrs = (negation, attribute, operation, word) sentence = '{} {} {} {}'.format(negation, attribute, operation, word).strip() return sentence, attrs
[docs] @fluent def logical_operator(self, operation, word): """ Apply a logical operator :param str operation: how to combine with a new one :param word: other parameter for the operation (a = b) would be like a.logical_operator('eq', 'b') :rtype: Query """ word = self._parse_filter_word(word) self._add_filter( *self._prepare_sentence(self._attribute, operation, word, self._negation)) return self
[docs] @fluent def equals(self, word): """ Add a equals check :param word: word to compare with :rtype: Query """ return self.logical_operator('eq', word)
[docs] @fluent def unequal(self, word): """ Add a unequals check :param word: word to compare with :rtype: Query """ return self.logical_operator('ne', word)
[docs] @fluent def greater(self, word): """ Add a greater than check :param word: word to compare with :rtype: Query """ return self.logical_operator('gt', word)
[docs] @fluent def greater_equal(self, word): """ Add a greater than or equal to check :param word: word to compare with :rtype: Query """ return self.logical_operator('ge', word)
[docs] @fluent def less(self, word): """ Add a less than check :param word: word to compare with :rtype: Query """ return self.logical_operator('lt', word)
[docs] @fluent def less_equal(self, word): """ Add a less than or equal to check :param word: word to compare with :rtype: Query """ return self.logical_operator('le', word)
@staticmethod def _prepare_function(function_name, attribute, word, negation=False): negation = 'not' if negation else '' attrs = (negation, attribute, function_name, word) return "{} {}({}, {})".format(negation, function_name, attribute, word).strip(), attrs
[docs] @fluent def function(self, function_name, word): """ Apply a function on given word :param str function_name: function to apply :param str word: word to apply function on :rtype: Query """ word = self._parse_filter_word(word) self._add_filter( *self._prepare_function(function_name, self._attribute, word, self._negation)) return self
[docs] @fluent def contains(self, word): """ Adds a contains word check :param str word: word to check :rtype: Query """ return self.function('contains', word)
[docs] @fluent def startswith(self, word): """ Adds a startswith word check :param str word: word to check :rtype: Query """ return self.function('startswith', word)
[docs] @fluent def endswith(self, word): """ Adds a endswith word check :param str word: word to check :rtype: Query """ return self.function('endswith', word)
[docs] @fluent def iterable(self, iterable_name, *, collection, word, attribute=None, func=None, operation=None): """ Performs a filter with the OData 'iterable_name' keyword on the collection For example: q.iterable('any', collection='email_addresses', attribute='address', operation='eq', word='george@best.com') will transform to a filter such as: emailAddresses/any(a:a/address eq 'george@best.com') :param str iterable_name: the OData name of the iterable :param str collection: the collection to apply the any keyword on :param str word: the word to check :param str attribute: the attribute of the collection to check :param str func: the logical function to apply to the attribute inside the collection :param str operation: the logical operation to apply to the attribute inside the collection :rtype: Query """ if func is None and operation is None: raise ValueError('Provide a function or an operation to apply') elif func is not None and operation is not None: raise ValueError( 'Provide either a function or an operation but not both') current_att = self._attribute self._attribute = iterable_name word = self._parse_filter_word(word) collection = self._get_mapping(collection) attribute = self._get_mapping(attribute) if attribute is None: attribute = 'a' # it's the same iterated object else: attribute = 'a/{}'.format(attribute) if func is not None: sentence = self._prepare_function(func, attribute, word) else: sentence = self._prepare_sentence(attribute, operation, word) filter_str, attrs = sentence filter_data = '{}/{}(a:{})'.format(collection, iterable_name, filter_str), attrs self._add_filter(*filter_data) self._attribute = current_att return self
[docs] @fluent def any(self, *, collection, word, attribute=None, func=None, operation=None): """ Performs a filter with the OData 'any' keyword on the collection For example: q.any(collection='email_addresses', attribute='address', operation='eq', word='george@best.com') will transform to a filter such as: emailAddresses/any(a:a/address eq 'george@best.com') :param str collection: the collection to apply the any keyword on :param str word: the word to check :param str attribute: the attribute of the collection to check :param str func: the logical function to apply to the attribute inside the collection :param str operation: the logical operation to apply to the attribute inside the collection :rtype: Query """ return self.iterable('any', collection=collection, word=word, attribute=attribute, func=func, operation=operation)
[docs] @fluent def all(self, *, collection, word, attribute=None, func=None, operation=None): """ Performs a filter with the OData 'all' keyword on the collection For example: q.any(collection='email_addresses', attribute='address', operation='eq', word='george@best.com') will transform to a filter such as: emailAddresses/all(a:a/address eq 'george@best.com') :param str collection: the collection to apply the any keyword on :param str word: the word to check :param str attribute: the attribute of the collection to check :param str func: the logical function to apply to the attribute inside the collection :param str operation: the logical operation to apply to the attribute inside the collection :rtype: Query """ return self.iterable('all', collection=collection, word=word, attribute=attribute, func=func, operation=operation)
[docs] @fluent def order_by(self, attribute=None, *, ascending=True): """ Applies a order_by clause :param str attribute: attribute to apply on :param bool ascending: should it apply ascending order or descending :rtype: Query """ attribute = self._get_mapping(attribute) or self._attribute if attribute: self._order_by[attribute] = None if ascending else 'desc' else: raise ValueError( 'Attribute property needed. call on_attribute(attribute) ' 'or new(attribute)') return self
[docs] def open_group(self): """ Applies a precedence grouping in the next filters """ self._open_group_flag.append(True) return self
[docs] def close_group(self): """ Closes a grouping for previous filters """ if self._filters: if len(self._open_group_flag) < (len(self._close_group_flag) + 1): raise RuntimeError('Not enough open groups to close.') if isinstance(self._filters[-1], ChainOperator): flt_sentence = self._filters[-2] else: flt_sentence = self._filters[-1] flt_sentence[1] = flt_sentence[1] + ')' # closing the group self._close_group_flag.append(False) # flag a close group was added else: raise RuntimeError("No filters present. Can't close a group") return self