Source code for O365.directory

import logging

from dateutil.parser import parse
from requests.exceptions import HTTPError

from .message import Message, RecipientType
from .utils import ME_RESOURCE, NEXT_LINK_KEYWORD, ApiComponent, Pagination

USERS_RESOURCE = 'users'

log = logging.getLogger(__name__)


[docs] class User(ApiComponent): _endpoints = { 'photo': '/photo/$value', 'photo_size': '/photos/{size}/$value' } message_constructor = Message
[docs] def __init__(self, *, parent=None, con=None, **kwargs): """ Represents an Azure AD user account :param parent: parent object :type parent: Account :param Connection con: connection to use if no parent specified :param Protocol protocol: protocol to use if no parent specified (kwargs) :param str main_resource: use this resource instead of parent resource (kwargs) """ if parent and con: raise ValueError('Need a parent or a connection but not both') self.con = parent.con if parent else con # Choose the main_resource passed in kwargs over parent main_resource main_resource = kwargs.pop('main_resource', None) or ( getattr(parent, 'main_resource', None) if parent else None) cloud_data = kwargs.get(self._cloud_data_key, {}) self.object_id = cloud_data.get('id') if main_resource == USERS_RESOURCE: main_resource += f'/{self.object_id}' super().__init__( protocol=parent.protocol if parent else kwargs.get('protocol'), main_resource=main_resource) local_tz = self.protocol.timezone cc = self._cc self.type = cloud_data.get('@odata.type') self.user_principal_name = cloud_data.get(cc('userPrincipalName')) self.display_name = cloud_data.get(cc('displayName')) self.given_name = cloud_data.get(cc('givenName'), '') self.surname = cloud_data.get(cc('surname'), '') self.mail = cloud_data.get(cc('mail')) # read only self.business_phones = cloud_data.get(cc('businessPhones'), []) self.job_title = cloud_data.get(cc('jobTitle')) self.mobile_phone = cloud_data.get(cc('mobilePhone')) self.office_location = cloud_data.get(cc('officeLocation')) self.preferred_language = cloud_data.get(cc('preferredLanguage')) # End of default properties. Next properties must be selected self.about_me = cloud_data.get(cc('aboutMe')) self.account_enabled = cloud_data.get(cc('accountEnabled')) self.age_group = cloud_data.get(cc('ageGroup')) self.assigned_licenses = cloud_data.get(cc('assignedLicenses')) self.assigned_plans = cloud_data.get(cc('assignedPlans')) # read only birthday = cloud_data.get(cc('birthday')) self.birthday = parse(birthday).astimezone(local_tz) if birthday else None self.city = cloud_data.get(cc('city')) self.company_name = cloud_data.get(cc('companyName')) self.consent_provided_for_minor = cloud_data.get(cc('consentProvidedForMinor')) self.country = cloud_data.get(cc('country')) created = cloud_data.get(cc('createdDateTime')) self.created = parse(created).astimezone( local_tz) if created else None self.department = cloud_data.get(cc('department')) self.employee_id = cloud_data.get(cc('employeeId')) self.fax_number = cloud_data.get(cc('faxNumber')) hire_date = cloud_data.get(cc('hireDate')) self.hire_date = parse(hire_date).astimezone( local_tz) if hire_date else None self.im_addresses = cloud_data.get(cc('imAddresses')) # read only self.interests = cloud_data.get(cc('interests')) self.is_resource_account = cloud_data.get(cc('isResourceAccount')) last_password_change = cloud_data.get(cc('lastPasswordChangeDateTime')) self.last_password_change = parse(last_password_change).astimezone( local_tz) if last_password_change else None self.legal_age_group_classification = cloud_data.get(cc('legalAgeGroupClassification')) self.license_assignment_states = cloud_data.get(cc('licenseAssignmentStates')) # read only self.mailbox_settings = cloud_data.get(cc('mailboxSettings')) self.mail_nickname = cloud_data.get(cc('mailNickname')) self.my_site = cloud_data.get(cc('mySite')) self.other_mails = cloud_data.get(cc('otherMails')) self.password_policies = cloud_data.get(cc('passwordPolicies')) self.password_profile = cloud_data.get(cc('passwordProfile')) self.past_projects = cloud_data.get(cc('pastProjects')) self.postal_code = cloud_data.get(cc('postalCode')) self.preferred_data_location = cloud_data.get(cc('preferredDataLocation')) self.preferred_name = cloud_data.get(cc('preferredName')) self.provisioned_plans = cloud_data.get(cc('provisionedPlans')) # read only self.proxy_addresses = cloud_data.get(cc('proxyAddresses')) # read only self.responsibilities = cloud_data.get(cc('responsibilities')) self.schools = cloud_data.get(cc('schools')) self.show_in_address_list = cloud_data.get(cc('showInAddressList'), True) self.skills = cloud_data.get(cc('skills')) sign_in_sessions_valid_from = cloud_data.get(cc('signInSessionsValidFromDateTime')) # read only self.sign_in_sessions_valid_from = parse(sign_in_sessions_valid_from).astimezone( local_tz) if sign_in_sessions_valid_from else None self.state = cloud_data.get(cc('state')) self.street_address = cloud_data.get(cc('streetAddress')) self.usage_location = cloud_data.get(cc('usageLocation')) self.user_type = cloud_data.get(cc('userType')) self.on_premises_sam_account_name = cloud_data.get(cc('onPremisesSamAccountName'))
def __str__(self): return self.__repr__() def __repr__(self): return self.display_name or self.full_name or self.user_principal_name or 'Unknown Name' def __eq__(self, other): return self.object_id == other.object_id def __hash__(self): return self.object_id.__hash__() @property def full_name(self): """ Full Name (Name + Surname) :rtype: str """ return f'{self.given_name} {self.surname}'.strip()
[docs] def new_message(self, recipient=None, *, recipient_type=RecipientType.TO): """ This method returns a new draft Message instance with this user email as a recipient :param Recipient recipient: a Recipient instance where to send this message. If None the email of this contact will be used :param RecipientType recipient_type: section to add recipient into :return: newly created message :rtype: Message or None """ if isinstance(recipient_type, str): recipient_type = RecipientType(recipient_type) recipient = recipient or self.mail if not recipient: return None new_message = self.message_constructor(parent=self, is_draft=True) target_recipients = getattr(new_message, str(recipient_type.value)) target_recipients.add(recipient) return new_message
[docs] def get_profile_photo(self, size=None): """Returns the user profile photo :param str size: 48x48, 64x64, 96x96, 120x120, 240x240, 360x360, 432x432, 504x504, and 648x648 """ if size is None: url = self.build_url(self._endpoints.get('photo')) else: url = self.build_url(self._endpoints.get('photo_size').format(size=size)) try: response = self.con.get(url) except HTTPError as e: log.debug(f'Error while retrieving the user profile photo. Error: {e}') return None if not response: return None return response.content
[docs] def update_profile_photo(self, photo): """ Updates this user profile photo :param bytes photo: the photo data in bytes """ url = self.build_url(self._endpoints.get('photo')) response = self.con.patch(url, data=photo, headers={'Content-type': 'image/jpeg'}) return bool(response)
[docs] class Directory(ApiComponent): _endpoints = { 'get_user': '/{email}' } user_constructor = User
[docs] def __init__(self, *, parent=None, con=None, **kwargs): """ Represents the Active Directory :param parent: parent object :type parent: Account :param Connection con: connection to use if no parent specified :param Protocol protocol: protocol to use if no parent specified (kwargs) :param str main_resource: use this resource instead of parent resource (kwargs) """ if parent and con: raise ValueError('Need a parent or a connection but not both') self.con = parent.con if parent else con # Choose the main_resource passed in kwargs over parent main_resource main_resource = kwargs.pop('main_resource', None) or ( getattr(parent, 'main_resource', None) if parent else None) super().__init__( protocol=parent.protocol if parent else kwargs.get('protocol'), main_resource=main_resource)
def __repr__(self): return 'Active Directory'
[docs] def get_users(self, limit=100, *, query=None, order_by=None, batch=None): """ Gets a list of users from the active directory When querying the Active Directory the Users endpoint will be used. Only a limited set of information will be available unless you have access to scope 'User.Read.All' which requires App Administration Consent. Also using endpoints has some limitations on the querying capabilities. To use query an order_by check the OData specification here: http://docs.oasis-open.org/odata/odata/v4.0/errata03/os/complete/ part2-url-conventions/odata-v4.0-errata03-os-part2-url-conventions -complete.html :param limit: max no. of contacts to get. Over 999 uses batch. :type limit: int or None :param query: applies a OData filter to the request :type query: Query or str :param order_by: orders the result set based on this condition :type order_by: Query or str :param int batch: batch size, retrieves items in batches allowing to retrieve more items than the limit. :return: list of users :rtype: list[User] or Pagination """ url = self.build_url('') # target the main_resource if limit is None or limit > self.protocol.max_top_value: batch = self.protocol.max_top_value params = {'$top': batch if batch else limit} if order_by: params['$orderby'] = order_by if query: if isinstance(query, str): params['$filter'] = query else: params.update(query.as_params()) response = self.con.get(url, params=params) if not response: return iter(()) data = response.json() # Everything received from cloud must be passed as self._cloud_data_key users = (self.user_constructor(parent=self, **{self._cloud_data_key: user}) for user in data.get('value', [])) next_link = data.get(NEXT_LINK_KEYWORD, None) if batch and next_link: return Pagination(parent=self, data=users, constructor=self.user_constructor, next_link=next_link, limit=limit) else: return users
def _get_user(self, url, query=None): """Helper method so DRY""" params = {} if query: if isinstance(query, str): params['$filter'] = query else: params.update(query.as_params()) response = self.con.get(url, params=params) if not response: return None data = response.json() # Everything received from cloud must be passed as self._cloud_data_key return self.user_constructor(parent=self, **{self._cloud_data_key: data})
[docs] def get_user(self, user, query=None): """ Returns a User by it's id or user principal name :param str user: the user id or user principal name :return: User for specified email :rtype: User """ url = self.build_url(self._endpoints.get('get_user').format(email=user)) return self._get_user(url, query=query)
[docs] def get_current_user(self, query=None): """ Returns the current logged-in user""" if self.main_resource != ME_RESOURCE: raise ValueError(f"Can't get the current user. The main resource must be set to '{ME_RESOURCE}'") url = self.build_url('') # target main_resource return self._get_user(url, query=query)
[docs] def get_user_manager(self, user, query=None): """ Returns a Users' manager by the users id, or user principal name :param str user: the user id or user principal name :return: User for specified email :rtype: User """ url = self.build_url(self._endpoints.get('get_user').format(email=user)) return self._get_user(url + '/manager', query=query)
[docs] def get_user_direct_reports(self, user, limit=100, *, query=None, order_by=None, batch=None): """ Gets a list of direct reports for the user provided from the active directory When querying the Active Directory the Users endpoint will be used. Also using endpoints has some limitations on the querying capabilities. To use query an order_by check the OData specification here: http://docs.oasis-open.org/odata/odata/v4.0/errata03/os/complete/ part2-url-conventions/odata-v4.0-errata03-os-part2-url-conventions -complete.html :param limit: max no. of contacts to get. Over 999 uses batch. :type limit: int or None :param query: applies a OData filter to the request :type query: Query or str :param order_by: orders the result set based on this condition :type order_by: Query or str :param int batch: batch size, retrieves items in batches allowing to retrieve more items than the limit. :return: list of users :rtype: list[User] or Pagination """ url = self.build_url(self._endpoints.get('get_user').format(email=user)) if limit is None or limit > self.protocol.max_top_value: batch = self.protocol.max_top_value params = {'$top': batch if batch else limit} if order_by: params['$orderby'] = order_by if query: if isinstance(query, str): params['$filter'] = query else: params.update(query.as_params()) response = self.con.get(url + '/directReports', params=params) if not response: return iter(()) data = response.json() # Everything received from cloud must be passed as self._cloud_data_key direct_reports = (self.user_constructor(parent=self, **{self._cloud_data_key: user}) for user in data.get('value', [])) next_link = data.get(NEXT_LINK_KEYWORD, None) if batch and next_link: return Pagination(parent=self, data=direct_reports, constructor=self.user_constructor, next_link=next_link, limit=limit) else: return direct_reports