Source code for O365.address_book

import datetime as dt
import logging

from dateutil.parser import parse
from requests.exceptions import HTTPError

from .utils import Recipients
from .utils import AttachableMixin, TrackerSet
from .utils import Pagination, NEXT_LINK_KEYWORD, ApiComponent
from .message import Message, RecipientType
from .category import Category


log = logging.getLogger(__name__)


class Contact(ApiComponent, AttachableMixin):
    """ A single contact stored in an Office 365 address book or
    contact folder.

    Every public setter records the changed cloud field name in
    ``_track_changes`` so that :meth:`save` only patches what was modified.
    """

    _endpoints = {
        'contact': '/contacts',
        'root_contact': '/contacts/{id}',
        'child_contact': '/contactFolders/{folder_id}/contacts',
        'photo': '/contacts/{id}/photo/$value',
        'photo_size': '/contacts/{id}/photos/{size}/$value',
    }

    message_constructor = Message

    def __init__(self, *, parent=None, con=None, **kwargs):
        """ Create a contact API component

        :param parent: parent account for this folder
        :type parent: Account
        :param Connection con: connection to use if no parent specified
        :param Protocol protocol: protocol to use if no parent specified
         (kwargs)
        :param str main_resource: use this resource instead of parent resource
         (kwargs)
        :raises ValueError: if both parent and con are given
        """
        if parent and con:
            raise ValueError('Need a parent or a connection but not both')
        self.con = parent.con if parent else con

        # Choose the main_resource passed in kwargs over parent main_resource
        main_resource = kwargs.pop('main_resource', None) or (
            getattr(parent, 'main_resource', None) if parent else None)

        super().__init__(
            protocol=parent.protocol if parent else kwargs.get('protocol'),
            main_resource=main_resource)

        cloud_data = kwargs.get(self._cloud_data_key, {})
        cc = self._cc  # alias to shorten the code

        # internal to know which properties need to be updated on the server
        self._track_changes = TrackerSet(casing=cc)

        self.object_id = cloud_data.get(cc('id'), None)
        self.__created = cloud_data.get(cc('createdDateTime'), None)
        self.__modified = cloud_data.get(cc('lastModifiedDateTime'), None)

        # timestamps arrive as strings; convert them to the protocol timezone
        local_tz = self.protocol.timezone
        self.__created = parse(self.__created).astimezone(
            local_tz) if self.__created else None
        self.__modified = parse(self.__modified).astimezone(
            local_tz) if self.__modified else None

        self.__display_name = cloud_data.get(cc('displayName'), '')
        self.__name = cloud_data.get(cc('givenName'), '')
        self.__surname = cloud_data.get(cc('surname'), '')

        self.__title = cloud_data.get(cc('title'), '')
        self.__job_title = cloud_data.get(cc('jobTitle'), '')
        self.__company_name = cloud_data.get(cc('companyName'), '')
        self.__department = cloud_data.get(cc('department'), '')
        self.__office_location = cloud_data.get(cc('officeLocation'), '')
        self.__business_phones = cloud_data.get(cc('businessPhones'), []) or []
        self.__mobile_phone = cloud_data.get(cc('mobilePhone'), '')
        self.__home_phones = cloud_data.get(cc('homePhones'), []) or []

        # ``or []`` guards against an explicit null sent by the server
        emails = cloud_data.get(cc('emailAddresses'), []) or []
        self.__emails = Recipients(
            recipients=[(rcp.get(cc('name'), ''), rcp.get(cc('address'), ''))
                        for rcp in emails],
            parent=self, field=cc('emailAddresses'))

        # Addresses added while loading must not be flagged as local changes
        self.__emails.untrack = True
        email = cloud_data.get(cc('email'))
        if email and email not in self.__emails:
            # a Contact from OneDrive?
            self.__emails.add(email)

        self.__business_address = cloud_data.get(cc('businessAddress'), {})
        # the cloud field is 'homeAddress' (it used to be misspelled here)
        self.__home_address = cloud_data.get(cc('homeAddress'), {})
        self.__other_address = cloud_data.get(cc('otherAddress'), {})
        self.__preferred_language = cloud_data.get(cc('preferredLanguage'),
                                                   None)

        self.__categories = cloud_data.get(cc('categories'), [])
        self.__folder_id = cloud_data.get(cc('parentFolderId'), None)

        self.__personal_notes = cloud_data.get(cc('personalNotes'), '')

        # When using Users endpoints (GAL)
        # Missing keys: ['mail', 'userPrincipalName']
        mail = cloud_data.get(cc('mail'), None)
        user_principal_name = cloud_data.get(cc('userPrincipalName'), None)
        if mail and mail not in self.emails:
            self.emails.add(mail)
        if user_principal_name and user_principal_name not in self.emails:
            self.emails.add(user_principal_name)
        self.__emails.untrack = False

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return self.display_name or self.full_name or 'Unknown Name'

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared against something that is not a Contact.
        if not isinstance(other, Contact):
            return NotImplemented
        return self.object_id == other.object_id

    @property
    def created(self):
        """ Created Time

        :rtype: datetime
        """
        return self.__created

    @property
    def modified(self):
        """ Last Modified Time

        :rtype: datetime
        """
        return self.__modified

    @property
    def display_name(self):
        """ Display Name

        :getter: Get the display name of the contact
        :setter: Update the display name
        :type: str
        """
        return self.__display_name

    @display_name.setter
    def display_name(self, value):
        self.__display_name = value
        self._track_changes.add(self._cc('displayName'))

    @property
    def name(self):
        """ First Name

        :getter: Get the name of the contact
        :setter: Update the name
        :type: str
        """
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value
        self._track_changes.add(self._cc('givenName'))

    @property
    def surname(self):
        """ Surname of Contact

        :getter: Get the surname of the contact
        :setter: Update the surname
        :type: str
        """
        return self.__surname

    @surname.setter
    def surname(self, value):
        self.__surname = value
        self._track_changes.add(self._cc('surname'))

    @property
    def full_name(self):
        """ Full Name (Name + Surname)

        :rtype: str
        """
        return '{} {}'.format(self.name, self.surname).strip()

    @property
    def title(self):
        """ Title (Mr., Ms., etc..)

        :getter: Get the title of the contact
        :setter: Update the title
        :type: str
        """
        return self.__title

    @title.setter
    def title(self, value):
        self.__title = value
        self._track_changes.add(self._cc('title'))

    @property
    def job_title(self):
        """ Job Title

        :getter: Get the job title of contact
        :setter: Update the job title
        :type: str
        """
        return self.__job_title

    @job_title.setter
    def job_title(self, value):
        self.__job_title = value
        self._track_changes.add(self._cc('jobTitle'))

    @property
    def company_name(self):
        """ Name of the company

        :getter: Get the company name of contact
        :setter: Update the company name
        :type: str
        """
        return self.__company_name

    @company_name.setter
    def company_name(self, value):
        self.__company_name = value
        self._track_changes.add(self._cc('companyName'))

    @property
    def department(self):
        """ Department

        :getter: Get the department of contact
        :setter: Update the department
        :type: str
        """
        return self.__department

    @department.setter
    def department(self, value):
        self.__department = value
        self._track_changes.add(self._cc('department'))

    @property
    def office_location(self):
        """ Office Location

        :getter: Get the office location of contact
        :setter: Update the office location
        :type: str
        """
        return self.__office_location

    @office_location.setter
    def office_location(self, value):
        self.__office_location = value
        self._track_changes.add(self._cc('officeLocation'))

    @property
    def business_phones(self):
        """ Business Contact numbers

        :getter: Get the contact numbers of contact
        :setter: Update the contact numbers (a tuple is converted to a list,
         any other single value is wrapped in a list)
        :type: list[str]
        """
        return self.__business_phones

    @business_phones.setter
    def business_phones(self, value):
        if isinstance(value, tuple):
            value = list(value)
        if not isinstance(value, list):
            value = [value]
        self.__business_phones = value
        self._track_changes.add(self._cc('businessPhones'))

    @property
    def mobile_phone(self):
        """ Mobile phone number

        :getter: Get the mobile number of contact
        :setter: Update the mobile number
        :type: str
        """
        return self.__mobile_phone

    @mobile_phone.setter
    def mobile_phone(self, value):
        self.__mobile_phone = value
        self._track_changes.add(self._cc('mobilePhone'))

    @property
    def home_phones(self):
        """ Home Contact numbers

        :getter: Get the contact numbers of contact
        :setter: Update the contact numbers (accepts a list, a tuple or a
         single str; anything else raises ValueError)
        :type: list[str]
        """
        return self.__home_phones

    @home_phones.setter
    def home_phones(self, value):
        if isinstance(value, list):
            self.__home_phones = value
        elif isinstance(value, str):
            self.__home_phones = [value]
        elif isinstance(value, tuple):
            self.__home_phones = list(value)
        else:
            raise ValueError('home_phones must be a list')
        self._track_changes.add(self._cc('homePhones'))

    @property
    def emails(self):
        """ List of email ids of the Contact

        :rtype: Recipients
        """
        return self.__emails

    @property
    def main_email(self):
        """ Primary(First) email id of the Contact

        :rtype: str or None
        """
        if not self.emails:
            return None
        return self.emails[0].address

    @property
    def business_address(self):
        """ Business Address

        :getter: Get the address of contact
        :setter: Update the address
        :type: dict
        """
        return self.__business_address

    @business_address.setter
    def business_address(self, value):
        if not isinstance(value, dict):
            raise ValueError('"business_address" must be dict')
        self.__business_address = value
        self._track_changes.add(self._cc('businessAddress'))

    @property
    def home_address(self):
        """ Home Address

        :getter: Get the address of contact
        :setter: Update the address
        :type: dict
        """
        return self.__home_address

    @home_address.setter
    def home_address(self, value):
        if not isinstance(value, dict):
            raise ValueError('"home_address" must be dict')
        self.__home_address = value
        # must match the key emitted by to_api_data
        self._track_changes.add(self._cc('homeAddress'))

    @property
    def other_address(self):
        """ Other Address

        :getter: Get the address of contact
        :setter: Update the address
        :type: dict
        """
        return self.__other_address

    @other_address.setter
    def other_address(self, value):
        if not isinstance(value, dict):
            raise ValueError('"other_address" must be dict')
        self.__other_address = value
        self._track_changes.add(self._cc('otherAddress'))

    @property
    def preferred_language(self):
        """ Preferred Language

        :getter: Get the language of contact
        :setter: Update the language
        :type: str
        """
        return self.__preferred_language

    @preferred_language.setter
    def preferred_language(self, value):
        self.__preferred_language = value
        self._track_changes.add(self._cc('preferredLanguage'))

    @property
    def categories(self):
        """ Assigned Categories

        :getter: Get the categories
        :setter: Update the categories (Category instances are stored by
         their name)
        :type: list[str]
        """
        return self.__categories

    @categories.setter
    def categories(self, value):
        if isinstance(value, list):
            self.__categories = [
                val.name if isinstance(val, Category) else val
                for val in value]
        elif isinstance(value, str):
            self.__categories = [value]
        elif isinstance(value, Category):
            self.__categories = [value.name]
        else:
            raise ValueError('categories must be a list')
        self._track_changes.add(self._cc('categories'))

    @property
    def personal_notes(self):
        """ Personal notes

        :getter: Get the personal notes of contact
        :setter: Update the personal notes
        :type: str
        """
        return self.__personal_notes

    @personal_notes.setter
    def personal_notes(self, value):
        self.__personal_notes = value
        self._track_changes.add(self._cc('personalNotes'))

    @property
    def folder_id(self):
        """ ID of the folder

        :rtype: str
        """
        return self.__folder_id

    def to_api_data(self, restrict_keys=None):
        """ Returns a dictionary in cloud format

        :param restrict_keys: a set of keys to restrict the returned data to.
         The given name key is always kept. The passed collection is
         not modified.
        :return: contact data keyed by cloud field names
        :rtype: dict
        """
        cc = self._cc  # alias

        data = {
            cc('displayName'): self.__display_name,
            cc('givenName'): self.__name,
            cc('surname'): self.__surname,
            cc('title'): self.__title,
            cc('jobTitle'): self.__job_title,
            cc('companyName'): self.__company_name,
            cc('department'): self.__department,
            cc('officeLocation'): self.__office_location,
            cc('businessPhones'): self.__business_phones,
            cc('mobilePhone'): self.__mobile_phone,
            cc('homePhones'): self.__home_phones,
            cc('emailAddresses'): [{cc('name'): recipient.name or '',
                                    cc('address'): recipient.address}
                                   for recipient in self.emails],
            cc('businessAddress'): self.__business_address,
            cc('homeAddress'): self.__home_address,
            cc('otherAddress'): self.__other_address,
            cc('categories'): self.__categories,
            cc('personalNotes'): self.__personal_notes,
        }

        if restrict_keys:
            # GivenName is required by the api all the time. Keep it without
            # adding it to the caller's set (usually self._track_changes).
            given_name_key = cc('givenName')
            data = {key: value for key, value in data.items()
                    if key == given_name_key or key in restrict_keys}
        return data

    def delete(self):
        """ Deletes this contact

        :return: Success or Failure
        :rtype: bool
        :raises RuntimeError: if contact is not yet saved to cloud
        """
        if not self.object_id:
            raise RuntimeError('Attempting to delete an unsaved Contact')

        url = self.build_url(
            self._endpoints.get('root_contact').format(id=self.object_id))

        response = self.con.delete(url)

        return bool(response)

    def save(self):
        """ Saves this contact to the cloud (create or update existing one
        based on what values have changed)

        :return: Saved or Not
        :rtype: bool
        """
        if self.object_id:
            # Update Contact
            if not self._track_changes:
                return True  # there's nothing to update
            url = self.build_url(
                self._endpoints.get('root_contact').format(id=self.object_id))
            method = self.con.patch
            data = self.to_api_data(restrict_keys=self._track_changes)
        else:
            # Save new Contact
            if self.__folder_id:
                url = self.build_url(
                    self._endpoints.get('child_contact').format(
                        folder_id=self.__folder_id))
            else:
                url = self.build_url(self._endpoints.get('contact'))
            method = self.con.post
            data = self.to_api_data(restrict_keys=self._track_changes)

        response = method(url, data=data)

        if not response:
            return False

        if not self.object_id:
            # New Contact: adopt the id and timestamps assigned by the server
            contact = response.json()

            self.object_id = contact.get(self._cc('id'), None)

            self.__created = contact.get(self._cc('createdDateTime'), None)
            self.__modified = contact.get(self._cc('lastModifiedDateTime'),
                                          None)

            local_tz = self.protocol.timezone
            self.__created = parse(self.created).astimezone(
                local_tz) if self.__created else None
            self.__modified = parse(self.modified).astimezone(
                local_tz) if self.__modified else None
        else:
            # the update response is not parsed; use the local clock instead
            self.__modified = self.protocol.timezone.localize(
                dt.datetime.now())

        return True

    def new_message(self, recipient=None, *, recipient_type=RecipientType.TO):
        """ This method returns a new draft Message instance with
        contacts first email as a recipient

        :param Recipient recipient: a Recipient instance where to send this
         message. If None first email of this contact will be used
        :param RecipientType recipient_type: section to add recipient into
        :return: newly created message
        :rtype: Message or None
        """
        if isinstance(recipient_type, str):
            recipient_type = RecipientType(recipient_type)

        recipient = recipient or self.emails.get_first_recipient_with_address()
        if not recipient:
            return None

        new_message = self.message_constructor(parent=self, is_draft=True)

        # the enum value names the recipients attribute on the message
        target_recipients = getattr(new_message, str(recipient_type.value))
        target_recipients.add(recipient)

        return new_message

    def get_profile_photo(self, size=None):
        """ Returns this contact profile photo

        :param str size: 48x48, 64x64, 96x96, 120x120, 240x240,
         360x360, 432x432, 504x504, and 648x648
        :return: the raw photo content, or None if it could not be retrieved
        :rtype: bytes or None
        """
        if size is None:
            url = self.build_url(
                self._endpoints.get('photo').format(id=self.object_id))
        else:
            url = self.build_url(
                self._endpoints.get('photo_size').format(id=self.object_id,
                                                         size=size))

        try:
            response = self.con.get(url)
        except HTTPError as e:
            log.debug('Error while retrieving the contact profile photo. '
                      'Error: {}'.format(e))
            return None

        if not response:
            return None

        return response.content

    def update_profile_photo(self, photo):
        """ Updates this contact profile photo

        :param bytes photo: the photo data in bytes
        :return: Success or Failure
        :rtype: bool
        """
        url = self.build_url(
            self._endpoints.get('photo').format(id=self.object_id))
        response = self.con.patch(url, data=photo,
                                  headers={'Content-type': 'image/jpeg'})

        return bool(response)
class BaseContactFolder(ApiComponent):
    """ Base Contact Folder Grouping Functionality """

    _endpoints = {
        'root_contacts': '/contacts',
        'folder_contacts': '/contactFolders/{id}/contacts',
        'get_folder': '/contactFolders/{id}',
        'root_folders': '/contactFolders',
        'child_folders': '/contactFolders/{id}/childFolders'
    }

    contact_constructor = Contact
    message_constructor = Message

    def __init__(self, *, parent=None, con=None, **kwargs):
        """ Create a contact folder component

        :param parent: parent folder/account for this folder
        :type parent: BaseContactFolder or Account
        :param Connection con: connection to use if no parent specified
        :param Protocol protocol: protocol to use if no parent specified
         (kwargs)
        :param str main_resource: use this resource instead of parent resource
         (kwargs)
        :param bool root: True if this folder has no parent folder (kwargs)
        :param str name: folder name used when the cloud data has none
         (kwargs)
        :raises ValueError: if both parent and con are given
        """
        if parent and con:
            raise ValueError('Need a parent or a connection but not both')
        self.con = parent.con if parent else con

        # Choose the main_resource passed in kwargs over parent main_resource
        main_resource = kwargs.pop('main_resource', None) or (
            getattr(parent, 'main_resource', None) if parent else None)

        super().__init__(
            protocol=parent.protocol if parent else kwargs.get('protocol'),
            main_resource=main_resource)

        # This folder has no parents if root = True.
        self.root = kwargs.pop('root', False)

        cloud_data = kwargs.get(self._cloud_data_key, {})

        # Fallback to manual folder if nothing available on cloud data
        self.name = cloud_data.get(self._cc('displayName'),
                                   kwargs.get('name', ''))
        # TODO: Most of above code is same as mailbox.Folder __init__

        self.folder_id = cloud_data.get(self._cc('id'), None)
        self.parent_id = cloud_data.get(self._cc('parentFolderId'), None)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return 'Contact Folder: {}'.format(self.name)

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared against something that is not a contact folder.
        if not isinstance(other, BaseContactFolder):
            return NotImplemented
        return self.folder_id == other.folder_id

    def get_contacts(self, limit=100, *, query=None, order_by=None,
                     batch=None):
        """ Gets a list of contacts from this address book

        To use query an order_by check the OData specification here:
        http://docs.oasis-open.org/odata/odata/v4.0/errata03/os/complete/
        part2-url-conventions/odata-v4.0-errata03-os-part2-url-conventions
        -complete.html

        :param limit: max no. of contacts to get. Over 999 uses batch.
        :type limit: int or None
        :param query: applies a OData filter to the request
        :type query: Query or str
        :param order_by: orders the result set based on this condition
        :type order_by: Query or str
        :param int batch: batch size, retrieves items in
         batches allowing to retrieve more items than the limit.
        :return: an iterable of contacts (empty if the request failed)
        :rtype: list[Contact] or Pagination
        """
        if self.root:
            url = self.build_url(self._endpoints.get('root_contacts'))
        else:
            url = self.build_url(
                self._endpoints.get('folder_contacts').format(
                    id=self.folder_id))

        # No limit, or a limit above what one request may return: page
        # through the results using the largest allowed page size.
        if limit is None or limit > self.protocol.max_top_value:
            batch = self.protocol.max_top_value

        params = {'$top': batch if batch else limit}

        if order_by:
            params['$orderby'] = order_by

        if query:
            if isinstance(query, str):
                params['$filter'] = query
            else:
                params.update(query.as_params())

        response = self.con.get(url, params=params)
        if not response:
            return iter(())

        data = response.json()

        # Everything received from cloud must be passed as self._cloud_data_key
        contacts = (self.contact_constructor(parent=self,
                                             **{self._cloud_data_key: contact})
                    for contact in data.get('value', []))

        next_link = data.get(NEXT_LINK_KEYWORD, None)

        if batch and next_link:
            return Pagination(parent=self, data=contacts,
                              constructor=self.contact_constructor,
                              next_link=next_link, limit=limit)
        else:
            return contacts

    def get_contact_by_email(self, email):
        """ Returns a Contact by it's email

        :param email: email to get contact for
        :return: Contact for specified email, or None if the email is empty
         or nothing matched
        :rtype: Contact or None
        """
        if not email:
            return None

        query = self.q().any(collection='email_addresses', attribute='address',
                             word=email.strip(), operation='eq')
        contacts = list(self.get_contacts(limit=1, query=query))
        return contacts[0] if contacts else None
class ContactFolder(BaseContactFolder):
    """ A Contact Folder representation """

    def get_folder(self, folder_id=None, folder_name=None):
        """ Returns a Contact Folder by it's id or child folders by name

        :param folder_id: the folder_id to be retrieved.
         Can be any folder Id (child or not)
        :param folder_name: the folder name to be retrieved.
         Must be a child of this folder
        :return: a single contact folder, or None if it was not found
        :rtype: ContactFolder or None
        :raises RuntimeError: if both or none of the options are provided
        """
        if folder_id and folder_name:
            raise RuntimeError('Provide only one of the options')

        if not folder_id and not folder_name:
            raise RuntimeError('Provide one of the options')

        if folder_id:
            # get folder by it's id, independent of the parent of this folder_id
            url = self.build_url(
                self._endpoints.get('get_folder').format(id=folder_id))
            params = None
        else:
            # get folder by name. Only looks up in child folders.
            if self.root:
                url = self.build_url(self._endpoints.get('root_folders'))
            else:
                url = self.build_url(
                    self._endpoints.get('child_folders').format(
                        id=self.folder_id))

            # OData string literals escape a single quote by doubling it;
            # without this a name such as "Bob's" yields an invalid filter.
            escaped_name = str(folder_name).replace("'", "''")
            params = {
                '$filter': "{} eq '{}'".format(self._cc('displayName'),
                                               escaped_name),
                '$top': 1}

        response = self.con.get(url, params=params)
        if not response:
            return None

        if folder_id:
            folder = response.json()
        else:
            # a name lookup answers with a collection: take the first match
            folder = response.json().get('value')
            folder = folder[0] if folder else None
            if folder is None:
                return None

        # Everything received from cloud must be passed as self._cloud_data_key
        # we don't pass parent, as this folder may not be a child of self.
        return self.__class__(con=self.con, protocol=self.protocol,
                              main_resource=self.main_resource,
                              **{self._cloud_data_key: folder})

    def get_folders(self, limit=None, *, query=None, order_by=None):
        """ Returns a list of child folders

        :param int limit: max no. of folders to get. Over 999 uses batch.
        :param query: applies a OData filter to the request
        :type query: Query or str
        :param order_by: orders the result set based on this condition
        :type order_by: Query or str
        :return: list of folders (empty if the request failed)
        :rtype: list[ContactFolder]
        """
        if self.root:
            url = self.build_url(self._endpoints.get('root_folders'))
        else:
            url = self.build_url(
                self._endpoints.get('child_folders').format(id=self.folder_id))

        params = {}

        if limit:
            params['$top'] = limit

        if order_by:
            params['$orderby'] = order_by

        if query:
            if isinstance(query, str):
                params['$filter'] = query
            else:
                params.update(query.as_params())

        response = self.con.get(url, params=params or None)
        if not response:
            return []

        data = response.json()

        return [self.__class__(parent=self, **{self._cloud_data_key: folder})
                for folder in data.get('value', [])]

    def create_child_folder(self, folder_name):
        """ Creates a new child folder

        :param str folder_name: name of the new folder to create
        :return: newly created folder
        :rtype: ContactFolder or None
        """
        if not folder_name:
            return None

        if self.root:
            url = self.build_url(self._endpoints.get('root_folders'))
        else:
            url = self.build_url(
                self._endpoints.get('child_folders').format(id=self.folder_id))

        response = self.con.post(url,
                                 data={self._cc('displayName'): folder_name})
        if not response:
            return None

        folder = response.json()

        # Everything received from cloud must be passed as self._cloud_data_key
        return self.__class__(parent=self, **{self._cloud_data_key: folder})

    def update_folder_name(self, name):
        """ Change this folder name

        :param str name: new name to change to
        :return: Updated or Not (always False for the root folder or an
         empty name)
        :rtype: bool
        """
        if self.root:
            return False
        if not name:
            return False

        url = self.build_url(
            self._endpoints.get('get_folder').format(id=self.folder_id))

        response = self.con.patch(url, data={self._cc('displayName'): name})
        if not response:
            return False

        folder = response.json()

        # refresh local state from what the server reports
        self.name = folder.get(self._cc('displayName'), '')
        self.parent_id = folder.get(self._cc('parentFolderId'), None)

        return True

    def move_folder(self, to_folder):
        """ Move this folder into another folder

        :param to_folder: folder_id/ContactFolder to move into
        :type to_folder: str or ContactFolder
        :return: Moved or Not (always False for the root folder, an empty
         target or a target of an unsupported type)
        :rtype: bool
        """
        if self.root:
            return False
        if not to_folder:
            return False

        url = self.build_url(
            self._endpoints.get('get_folder').format(id=self.folder_id))

        if isinstance(to_folder, ContactFolder):
            folder_id = to_folder.folder_id
        elif isinstance(to_folder, str):
            folder_id = to_folder
        else:
            return False

        response = self.con.patch(url,
                                  data={self._cc('parentFolderId'): folder_id})
        if not response:
            return False

        folder = response.json()

        # refresh local state from what the server reports
        self.name = folder.get(self._cc('displayName'), '')
        self.parent_id = folder.get(self._cc('parentFolderId'), None)

        return True

    def delete(self):
        """ Deletes this folder

        :return: Deleted or Not (always False for the root folder or a
         folder without an id)
        :rtype: bool
        """
        if self.root or not self.folder_id:
            return False

        url = self.build_url(
            self._endpoints.get('get_folder').format(id=self.folder_id))

        response = self.con.delete(url)
        if not response:
            return False

        self.folder_id = None

        return True

    def new_contact(self):
        """ Creates a new contact to be saved into it's parent folder

        :return: newly created contact
        :rtype: Contact
        """
        contact = self.contact_constructor(parent=self)
        if not self.root:
            # Contact keeps its folder id in a name-mangled private attribute.
            # Writing ``contact.__folder_id`` here would be mangled with
            # *this* class name (_ContactFolder__folder_id) and never be seen
            # by Contact.save, so the Contact-mangled name is spelled out.
            contact._Contact__folder_id = self.folder_id

        return contact

    def new_message(self, recipient_type=RecipientType.TO, *, query=None):
        """ This method returns a new draft Message instance with all the
        contacts first email as a recipient

        :param RecipientType recipient_type: section to add recipient into
        :param query: applies a OData filter to the request
        :type query: Query or str
        :return: newly created message
        :rtype: Message or None
        """
        if isinstance(recipient_type, str):
            recipient_type = RecipientType(recipient_type)

        # first email of every contact that has one with a non-empty address
        recipients = [contact.emails[0] for contact in
                      self.get_contacts(limit=None, query=query)
                      if contact.emails and contact.emails[0].address]

        if not recipients:
            return None

        new_message = self.message_constructor(parent=self, is_draft=True)

        # the enum value names the recipients attribute on the message
        target_recipients = getattr(new_message, str(recipient_type.value))
        target_recipients.add(recipients)

        return new_message
class AddressBook(ContactFolder):
    """ The root contact folder of a resource: its address book """

    def __init__(self, *, parent=None, con=None, **kwargs):
        """ Create the address book as a root contact folder

        :param parent: parent account for this address book
        :param Connection con: connection to use if no parent specified
        :param kwargs: forwarded unchanged to the contact folder constructor
        """
        # An address book is always built as a root instance
        super(AddressBook, self).__init__(
            root=True,
            parent=parent,
            con=con,
            **kwargs
        )

    def __repr__(self):
        template = 'Address Book resource: {}'
        return template.format(self.main_resource)