Source code for O365.mailbox

import datetime as dt
import logging

from .message import Message
from .utils import Pagination, NEXT_LINK_KEYWORD, \
    OutlookWellKnowFolderNames, ApiComponent

log = logging.getLogger(__name__)


[docs]class Folder(ApiComponent): """ A Mail Folder representation """ _endpoints = { 'root_folders': '/mailFolders', 'child_folders': '/mailFolders/{id}/childFolders', 'get_folder': '/mailFolders/{id}', 'root_messages': '/messages', 'folder_messages': '/mailFolders/{id}/messages', 'copy_folder': '/mailFolders/{id}/copy', 'move_folder': '/mailFolders/{id}/move', 'message': '/messages/{id}', } message_constructor = Message
[docs] def __init__(self, *, parent=None, con=None, **kwargs): """ Create an instance to represent the specified folder un given parent folder :param parent: parent folder/account for this folder :type parent: mailbox.Folder or Account :param Connection con: connection to use if no parent specified :param Protocol protocol: protocol to use if no parent specified (kwargs) :param str main_resource: use this resource instead of parent resource (kwargs) :param str name: name of the folder to get under the parent (kwargs) :param str folder_id: id of the folder to get under the parent (kwargs) """ if parent and con: raise ValueError('Need a parent or a connection but not both') self.con = parent.con if parent else con self.parent = parent if isinstance(parent, Folder) else None # This folder has no parents if root = True. self.root = kwargs.pop('root', False) # Choose the main_resource passed in kwargs over parent main_resource main_resource = kwargs.pop('main_resource', None) or ( getattr(parent, 'main_resource', None) if parent else None) super().__init__( protocol=parent.protocol if parent else kwargs.get('protocol'), main_resource=main_resource) cloud_data = kwargs.get(self._cloud_data_key, {}) # Fallback to manual folder if nothing available on cloud data self.name = cloud_data.get(self._cc('displayName'), kwargs.get('name', '')) if self.root is False: # Fallback to manual folder if nothing available on cloud data self.folder_id = cloud_data.get(self._cc('id'), kwargs.get('folder_id', None)) self.parent_id = cloud_data.get(self._cc('parentFolderId'), None) self.child_folders_count = cloud_data.get( self._cc('childFolderCount'), 0) self.unread_items_count = cloud_data.get( self._cc('unreadItemCount'), 0) self.total_items_count = cloud_data.get(self._cc('totalItemCount'), 0) self.updated_at = dt.datetime.now() else: self.folder_id = 'root'
def __str__(self): return self.__repr__() def __repr__(self): return '{} from resource: {}'.format(self.name, self.main_resource) def __eq__(self, other): return self.folder_id == other.folder_id
[docs] def get_folders(self, limit=None, *, query=None, order_by=None, batch=None): """ Returns a list of child folders matching the query :param int limit: max no. of folders to get. Over 999 uses batch. :param query: applies a filter to the request such as "displayName eq 'HelloFolder'" :type query: Query or str :param order_by: orders the result set based on this condition :type order_by: Query or str :param int batch: batch size, retrieves items in batches allowing to retrieve more items than the limit. :return: list of folders :rtype: list[mailbox.Folder] or Pagination """ if self.root: url = self.build_url(self._endpoints.get('root_folders')) else: url = self.build_url( self._endpoints.get('child_folders').format(id=self.folder_id)) if limit is None or limit > self.protocol.max_top_value: batch = self.protocol.max_top_value params = {'$top': batch if batch else limit} if order_by: params['$orderby'] = order_by if query: if isinstance(query, str): params['$filter'] = query else: params.update(query.as_params()) response = self.con.get(url, params=params) if not response: return [] data = response.json() # Everything received from cloud must be passed as self._cloud_data_key self_class = getattr(self, 'folder_constructor', type(self)) folders = [self_class(parent=self, **{self._cloud_data_key: folder}) for folder in data.get('value', [])] next_link = data.get(NEXT_LINK_KEYWORD, None) if batch and next_link: return Pagination(parent=self, data=folders, constructor=self_class, next_link=next_link, limit=limit) else: return folders
[docs] def get_message(self, object_id=None, query=None, *, download_attachments=False): """ Get one message from the query result. A shortcut to get_messages with limit=1 :param object_id: the message id to be retrieved. :param query: applies a filter to the request such as "displayName eq 'HelloFolder'" :type query: Query or str :param bool download_attachments: whether or not to download attachments :return: one Message :rtype: Message or None """ if object_id is None and query is None: raise ValueError('Must provide object id or query.') if object_id is not None: url = self.build_url(self._endpoints.get('message').format(id=object_id)) params = None if query and (query.has_selects or query.has_expands): params = query.as_params() response = self.con.get(url, params=params) if not response: return None message = response.json() return self.message_constructor(parent=self, download_attachments=download_attachments, **{self._cloud_data_key: message}) else: messages = list(self.get_messages(limit=1, query=query, download_attachments=download_attachments)) return messages[0] if messages else None
[docs] def get_messages(self, limit=25, *, query=None, order_by=None, batch=None, download_attachments=False): """ Downloads messages from this folder :param int limit: limits the result set. Over 999 uses batch. :param query: applies a filter to the request such as "displayName eq 'HelloFolder'" :type query: Query or str :param order_by: orders the result set based on this condition :type order_by: Query or str :param int batch: batch size, retrieves items in batches allowing to retrieve more items than the limit. :param bool download_attachments: whether or not to download attachments :return: list of messages :rtype: list[Message] or Pagination """ if self.root: url = self.build_url(self._endpoints.get('root_messages')) else: url = self.build_url(self._endpoints.get('folder_messages').format( id=self.folder_id)) if limit is None or limit > self.protocol.max_top_value: batch = self.protocol.max_top_value params = {'$top': batch if batch else limit} if order_by: params['$orderby'] = order_by if query: if isinstance(query, str): params['$filter'] = query else: params.update(query.as_params()) response = self.con.get(url, params=params) if not response: return iter(()) data = response.json() # Everything received from cloud must be passed as self._cloud_data_key messages = (self.message_constructor( parent=self, download_attachments=download_attachments, **{self._cloud_data_key: message}) for message in data.get('value', [])) next_link = data.get(NEXT_LINK_KEYWORD, None) if batch and next_link: return Pagination(parent=self, data=messages, constructor=self.message_constructor, next_link=next_link, limit=limit, download_attachments=download_attachments) else: return messages
[docs] def create_child_folder(self, folder_name): """ Creates a new child folder under this folder :param str folder_name: name of the folder to add :return: newly created folder :rtype: mailbox.Folder or None """ if not folder_name: return None if self.root: url = self.build_url(self._endpoints.get('root_folders')) else: url = self.build_url( self._endpoints.get('child_folders').format(id=self.folder_id)) response = self.con.post(url, data={self._cc('displayName'): folder_name}) if not response: return None folder = response.json() self_class = getattr(self, 'folder_constructor', type(self)) # Everything received from cloud must be passed as self._cloud_data_key return self_class(parent=self, **{self._cloud_data_key: folder})
[docs] def get_folder(self, *, folder_id=None, folder_name=None): """ Get a folder by it's id or name :param str folder_id: the folder_id to be retrieved. Can be any folder Id (child or not) :param str folder_name: the folder name to be retrieved. Must be a child of this folder. :return: a single folder :rtype: mailbox.Folder or None """ if folder_id and folder_name: raise RuntimeError('Provide only one of the options') if not folder_id and not folder_name: raise RuntimeError('Provide one of the options') if folder_id: # get folder by it's id, independent of the parent of this folder_id url = self.build_url( self._endpoints.get('get_folder').format(id=folder_id)) params = None else: # get folder by name. Only looks up in child folders. if self.root: url = self.build_url(self._endpoints.get('root_folders')) else: url = self.build_url( self._endpoints.get('child_folders').format( id=self.folder_id)) params = {'$filter': "{} eq '{}'".format(self._cc('displayName'), folder_name), '$top': 1} response = self.con.get(url, params=params) if not response: return None if folder_id: folder = response.json() else: folder = response.json().get('value') folder = folder[0] if folder else None if folder is None: return None self_class = getattr(self, 'folder_constructor', type(self)) # Everything received from cloud must be passed as self._cloud_data_key # We don't pass parent, as this folder may not be a child of self. return self_class(con=self.con, protocol=self.protocol, main_resource=self.main_resource, **{self._cloud_data_key: folder})
[docs] def refresh_folder(self, update_parent_if_changed=False): """ Re-download folder data Inbox Folder will be unable to download its own data (no folder_id) :param bool update_parent_if_changed: updates self.parent with new parent Folder if changed :return: Refreshed or Not :rtype: bool """ folder_id = getattr(self, 'folder_id', None) if self.root or folder_id is None: return False folder = self.get_folder(folder_id=folder_id) if folder is None: return False self.name = folder.name if folder.parent_id and self.parent_id: if folder.parent_id != self.parent_id: self.parent_id = folder.parent_id self.parent = (self.get_parent_folder() if update_parent_if_changed else None) self.child_folders_count = folder.child_folders_count self.unread_items_count = folder.unread_items_count self.total_items_count = folder.total_items_count self.updated_at = folder.updated_at return True
[docs] def get_parent_folder(self): """ Get the parent folder from attribute self.parent or getting it from the cloud :return: Parent Folder :rtype: mailbox.Folder or None """ if self.root: return None if self.parent: return self.parent if self.parent_id: self.parent = self.get_folder(folder_id=self.parent_id) return self.parent
[docs] def update_folder_name(self, name, update_folder_data=True): """ Change this folder name :param str name: new name to change to :param bool update_folder_data: whether or not to re-fetch the data :return: Updated or Not :rtype: bool """ if self.root: return False if not name: return False url = self.build_url( self._endpoints.get('get_folder').format(id=self.folder_id)) response = self.con.patch(url, data={self._cc('displayName'): name}) if not response: return False self.name = name if not update_folder_data: return True folder = response.json() self.name = folder.get(self._cc('displayName'), '') self.parent_id = folder.get(self._cc('parentFolderId'), None) self.child_folders_count = folder.get(self._cc('childFolderCount'), 0) self.unread_items_count = folder.get(self._cc('unreadItemCount'), 0) self.total_items_count = folder.get(self._cc('totalItemCount'), 0) self.updated_at = dt.datetime.now() return True
[docs] def delete(self): """ Deletes this folder :return: Deleted or Not :rtype: bool """ if self.root or not self.folder_id: return False url = self.build_url( self._endpoints.get('get_folder').format(id=self.folder_id)) response = self.con.delete(url) if not response: return False self.folder_id = None return True
[docs] def copy_folder(self, to_folder): """ Copy this folder and it's contents to into another folder :param to_folder: the destination Folder/folder_id to copy into :type to_folder: mailbox.Folder or str :return: The new folder after copying :rtype: mailbox.Folder or None """ to_folder_id = to_folder.folder_id if isinstance(to_folder, Folder) else to_folder if self.root or not self.folder_id or not to_folder_id: return None url = self.build_url( self._endpoints.get('copy_folder').format(id=self.folder_id)) response = self.con.post(url, data={self._cc('destinationId'): to_folder_id}) if not response: return None folder = response.json() self_class = getattr(self, 'folder_constructor', type(self)) # Everything received from cloud must be passed as self._cloud_data_key return self_class(con=self.con, main_resource=self.main_resource, **{self._cloud_data_key: folder})
[docs] def move_folder(self, to_folder, *, update_parent_if_changed=True): """ Move this folder to another folder :param to_folder: the destination Folder/folder_id to move into :type to_folder: mailbox.Folder or str :param bool update_parent_if_changed: updates self.parent with the new parent Folder if changed :return: The new folder after copying :rtype: mailbox.Folder or None """ to_folder_id = to_folder.folder_id if isinstance(to_folder, Folder) else to_folder if self.root or not self.folder_id or not to_folder_id: return False url = self.build_url( self._endpoints.get('move_folder').format(id=self.folder_id)) response = self.con.post(url, data={self._cc('destinationId'): to_folder_id}) if not response: return False folder = response.json() parent_id = folder.get(self._cc('parentFolderId'), None) if parent_id and self.parent_id: if parent_id != self.parent_id: self.parent_id = parent_id self.parent = (self.get_parent_folder() if update_parent_if_changed else None) return True
[docs] def new_message(self): """ Creates a new draft message under this folder :return: new Message :rtype: Message """ draft_message = self.message_constructor(parent=self, is_draft=True) if self.root: draft_message.folder_id = OutlookWellKnowFolderNames.DRAFTS.value else: draft_message.folder_id = self.folder_id return draft_message
[docs] def delete_message(self, message): """ Deletes a stored message :param message: message/message_id to delete :type message: Message or str :return: Success / Failure :rtype: bool """ message_id = message.object_id if isinstance(message, Message) else message if message_id is None: raise RuntimeError('Provide a valid Message or a message id') url = self.build_url( self._endpoints.get('message').format(id=message_id)) response = self.con.delete(url) return bool(response)
[docs]class MailBox(Folder): folder_constructor = Folder
[docs] def __init__(self, *, parent=None, con=None, **kwargs): super().__init__(parent=parent, con=con, root=True, **kwargs)
[docs] def inbox_folder(self): """ Shortcut to get Inbox Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='Inbox', folder_id=OutlookWellKnowFolderNames .INBOX.value)
[docs] def junk_folder(self): """ Shortcut to get Junk Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='Junk', folder_id=OutlookWellKnowFolderNames .JUNK.value)
[docs] def deleted_folder(self): """ Shortcut to get DeletedItems Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='DeletedItems', folder_id=OutlookWellKnowFolderNames .DELETED.value)
[docs] def drafts_folder(self): """ Shortcut to get Drafts Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='Drafts', folder_id=OutlookWellKnowFolderNames .DRAFTS.value)
[docs] def sent_folder(self): """ Shortcut to get SentItems Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='SentItems', folder_id=OutlookWellKnowFolderNames .SENT.value)
[docs] def outbox_folder(self): """ Shortcut to get Outbox Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='Outbox', folder_id=OutlookWellKnowFolderNames .OUTBOX.value)
[docs] def archive_folder(self): """ Shortcut to get Archive Folder instance :rtype: mailbox.Folder """ return self.folder_constructor(parent=self, name='Archive', folder_id=OutlookWellKnowFolderNames .ARCHIVE.value)