Source code for O365.utils.attachment

import base64
import logging
from pathlib import Path
from io import BytesIO

from .utils import ApiComponent

log = logging.getLogger(__name__)


[docs]class AttachableMixin:
[docs] def __init__(self, attachment_name_property=None, attachment_type=None): """ Defines the functionality for an object to be attachable. Any object that inherits from this class will be attachable (if the underlying api allows that) """ self.__attachment_name = None self.__attachment_name_property = attachment_name_property self.__attachment_type = self._gk(attachment_type)
@property def attachment_name(self): """ Name of the attachment :getter: get attachment name :setter: set new name for the attachment :type: str """ if self.__attachment_name is not None: return self.__attachment_name if self.__attachment_name_property: return getattr(self, self.__attachment_name_property, '') else: # property order resolution: # 1) try property 'subject' # 2) try property 'name' try: attachment_name = getattr(self, 'subject') except AttributeError: attachment_name = getattr(self, 'name', '') return attachment_name @attachment_name.setter def attachment_name(self, value): self.__attachment_name = value @property def attachment_type(self): """ Type of attachment :rtype: str """ return self.__attachment_type
[docs] def to_api_data(self): """ Returns a dict to communicate with the server :rtype: dict """ raise NotImplementedError()
[docs]class BaseAttachment(ApiComponent): """ BaseAttachment class is the base object for dealing with attachments """ _endpoints = {'attach': '/messages/{id}/attachments'}
[docs] def __init__(self, attachment=None, *, parent=None, **kwargs): """ Creates a new attachment, optionally from existing cloud data :param attachment: attachment data (dict = cloud data, other = user data) :type attachment: dict or str or Path or list[str] or AttachableMixin :param BaseAttachments parent: the parent Attachments :param Protocol protocol: protocol to use if no parent specified (kwargs) :param str main_resource: use this resource instead of parent resource (kwargs) """ kwargs.setdefault('protocol', getattr(parent, 'protocol', None)) kwargs.setdefault('main_resource', getattr(parent, 'main_resource', None)) super().__init__(**kwargs) self.name = None self.attachment_type = 'file' self.attachment_id = None self.content_id = None self.is_inline = False self.attachment = None self.content = None self.on_disk = False self.on_cloud = kwargs.get('on_cloud', False) self.size = None if attachment: if isinstance(attachment, dict): if self._cloud_data_key in attachment: # data from the cloud attachment = attachment.get(self._cloud_data_key) self.attachment_id = attachment.get(self._cc('id'), None) self.content_id = attachment.get(self._cc('contentId'), None) self.is_inline = attachment.get(self._cc('IsInline'), False) self.name = attachment.get(self._cc('name'), None) self.content = attachment.get(self._cc('contentBytes'), None) self.attachment_type = 'item' if 'item' in attachment.get( '@odata.type', '').lower() else 'file' self.on_disk = False self.size = attachment.get(self._cc('size'), None) else: file_path = attachment.get('path', attachment.get('name')) if file_path is None: raise ValueError('Must provide a valid "path" or ' '"name" for the attachment') self.content = attachment.get('content') self.on_disk = attachment.get('on_disk') self.attachment_id = attachment.get('attachment_id') self.attachment = Path(file_path) if self.on_disk else None self.name = (self.attachment.name if self.on_disk else attachment.get('name')) self.size = self.attachment.stat().st_size if self.attachment else None elif isinstance(attachment, str): self.attachment = Path(attachment) self.name = self.attachment.name elif isinstance(attachment, Path): self.attachment = attachment self.name = self.attachment.name elif isinstance(attachment, (tuple, list)): # files with custom names or Inmemory objects file_obj, custom_name = attachment if isinstance(file_obj, BytesIO): # in memory objects self.content = base64.b64encode(file_obj.getvalue()).decode('utf-8') else: self.attachment = Path(file_obj) self.name = custom_name elif isinstance(attachment, AttachableMixin): # Object that can be attached (Message for example) self.attachment_type = 'item' self.attachment = attachment self.name = attachment.attachment_name self.content = attachment.to_api_data() self.content['@odata.type'] = attachment.attachment_type if self.content is None and self.attachment and self.attachment.exists(): with self.attachment.open('rb') as file: self.content = base64.b64encode(file.read()).decode('utf-8') self.on_disk = True self.size = self.attachment.stat().st_size
def __len__(self): """ Returns the size of this attachment """ return self.size def __eq__(self, other): return self.attachment_id == other.attachment_id
[docs] def to_api_data(self): """ Returns a dict to communicate with the server :rtype: dict """ data = {'@odata.type': self._gk( '{}_attachment_type'.format(self.attachment_type)), self._cc('name'): self.name} if self.is_inline: data[self._cc('isInline')] = self.is_inline if self.attachment_type == 'file': data[self._cc('contentBytes')] = self.content if self.content_id is not None: data[self._cc('contentId')] = self.content_id else: data[self._cc('item')] = self.content return data
[docs] def save(self, location=None, custom_name=None): """ Save the attachment locally to disk :param str location: path string to where the file is to be saved. :param str custom_name: a custom name to be saved as :return: Success / Failure :rtype: bool """ if not self.content: return False location = Path(location or '') if not location.exists(): log.debug('the location provided does not exist') return False name = custom_name or self.name name = name.replace('/', '-').replace('\\', '') try: path = location / name with path.open('wb') as file: file.write(base64.b64decode(self.content)) self.attachment = path self.on_disk = True self.size = self.attachment.stat().st_size log.debug('file saved locally.') except Exception as e: log.error('file failed to be saved: %s', str(e)) return False return True
[docs] def attach(self, api_object, on_cloud=False): """ Attach this attachment to an existing api_object. This BaseAttachment object must be an orphan BaseAttachment created for the sole purpose of attach it to something and therefore run this method. :param api_object: object to attach to :param on_cloud: if the attachment is on cloud or not :return: Success / Failure :rtype: bool """ if self.on_cloud: # item is already saved on the cloud. return True # api_object must exist and if implements attachments # then we can attach to it. if api_object and getattr(api_object, 'attachments', None): if on_cloud: if not api_object.object_id: raise RuntimeError( 'A valid object id is needed in order to attach a file') # api_object builds its own url using its # resource and main configuration url = api_object.build_url(self._endpoints.get('attach').format( id=api_object.object_id)) response = api_object.con.post(url, data=self.to_api_data()) return bool(response) else: if self.attachment_type == 'file': api_object.attachments.add([{ 'attachment_id': self.attachment_id, # TODO: copy attachment id? or set to None? 'path': str( self.attachment) if self.attachment else None, 'name': self.name, 'content': self.content, 'on_disk': self.on_disk }]) else: raise RuntimeError('Only file attachments can be attached')
def __str__(self): return self.__repr__() def __repr__(self): return 'Attachment: {}'.format(self.name)
[docs]class BaseAttachments(ApiComponent): """ A Collection of BaseAttachments """ _endpoints = { 'attachments': '/messages/{id}/attachments', 'attachment': '/messages/{id}/attachments/{ida}' } _attachment_constructor = BaseAttachment
[docs] def __init__(self, parent, attachments=None): """ Attachments must be a list of path strings or dictionary elements :param Account parent: parent object :param attachments: list of attachments :type attachments: list[str] or list[Path] or str or Path or dict """ super().__init__(protocol=parent.protocol, main_resource=parent.main_resource) self._parent = parent self.__attachments = [] # holds on_cloud attachments removed from the parent object self.__removed_attachments = [] self.untrack = True if attachments: self.add(attachments) self.untrack = False
def __iter__(self): return iter(self.__attachments) def __getitem__(self, key): return self.__attachments[key] def __contains__(self, item): return item in {attachment.name for attachment in self.__attachments} def __len__(self): return len(self.__attachments) def __str__(self): attachments = len(self.__attachments) parent_has_attachments = getattr(self._parent, 'has_attachments', False) if parent_has_attachments and attachments == 0: return 'Number of Attachments: unknown' else: return 'Number of Attachments: {}'.format(attachments) def __repr__(self): return self.__str__() def __bool__(self): return bool(len(self.__attachments))
[docs] def to_api_data(self): """ Returns a dict to communicate with the server :rtype: dict """ return [attachment.to_api_data() for attachment in self.__attachments if attachment.on_cloud is False]
[docs] def clear(self): """ Clear the attachments """ for attachment in self.__attachments: if attachment.on_cloud: self.__removed_attachments.append(attachment) self.__attachments = [] self._update_parent_attachments() self._track_changes()
def _track_changes(self): """ Update the track_changes on the parent to reflect a needed update on this field """ if getattr(self._parent, '_track_changes', None) is not None and self.untrack is False: # noinspection PyProtectedMember self._parent._track_changes.add('attachments') def _update_parent_attachments(self): """ Tries to update the parent property 'has_attachments' """ try: self._parent.has_attachments = bool(len(self.__attachments)) except AttributeError: pass
[docs] def add(self, attachments): """ Add more attachments :param attachments: list of attachments :type attachments: list[str] or list[Path] or str or Path or dict """ if attachments: if isinstance(attachments, (str, Path)): attachments = [attachments] if isinstance(attachments, (list, tuple, set)): # User provided attachments attachments_temp = [ self._attachment_constructor(attachment, parent=self) for attachment in attachments] elif isinstance(attachments, dict) and self._cloud_data_key in attachments: # Cloud downloaded attachments. We pass on_cloud=True # to track if this attachment is saved on the server attachments_temp = [self._attachment_constructor( {self._cloud_data_key: attachment}, parent=self, on_cloud=True) for attachment in attachments.get(self._cloud_data_key, [])] else: raise ValueError('Attachments must be a str or Path or a ' 'list, tuple or set of the former') self.__attachments.extend(attachments_temp) self._update_parent_attachments() self._track_changes()
[docs] def remove(self, attachments): """ Remove the specified attachments :param attachments: list of attachments :type attachments: list[str] or list[Path] or str or Path or dict """ if isinstance(attachments, (list, tuple)): attachments = ({attachment.name if isinstance(attachment, BaseAttachment) else attachment for attachment in attachments}) elif isinstance(attachments, str): attachments = {attachments} elif isinstance(attachments, BaseAttachment): attachments = {attachments.name} else: raise ValueError('Incorrect parameter type for attachments') new_attachments = [] for attachment in self.__attachments: if attachment.name not in attachments: new_attachments.append(attachment) else: if attachment.on_cloud: # add to removed_attachments so later we can delete them self.__removed_attachments.append( attachment) self.__attachments = new_attachments self._update_parent_attachments() self._track_changes()
[docs] def download_attachments(self): """ Downloads this message attachments into memory. Need a call to 'attachment.save' to save them on disk. :return: Success / Failure :rtype: bool """ if not self._parent.has_attachments: log.debug( 'Parent {} has no attachments, skipping out early.'.format( self._parent.__class__.__name__)) return False if not self._parent.object_id: raise RuntimeError( 'Attempted to download attachments of an unsaved {}'.format( self._parent.__class__.__name__)) url = self.build_url(self._endpoints.get('attachments').format( id=self._parent.object_id)) response = self._parent.con.get(url) if not response: return False attachments = response.json().get('value', []) # Everything received from cloud must be passed as self._cloud_data_key self.untrack = True self.add({self._cloud_data_key: attachments}) self.untrack = False # TODO: when it's a item attachment the attachment itself # is not downloaded. We must download it... # TODO: idea: retrieve the attachments ids' only with # select and then download one by one. return True
def _update_attachments_to_cloud(self): """ Push new, unsaved attachments to the cloud and remove removed attachments. This method should not be called for non draft messages. """ url = self.build_url(self._endpoints.get('attachments').format( id=self._parent.object_id)) # ! potentially several api requests can be made by this method. for attachment in self.__attachments: if attachment.on_cloud is False: # upload attachment: response = self._parent.con.post(url, data=attachment.to_api_data()) if not response: return False data = response.json() # update attachment data attachment.attachment_id = data.get('id') attachment.content = data.get(self._cc('contentBytes'), None) attachment.on_cloud = True for attachment in self.__removed_attachments: if attachment.on_cloud and attachment.attachment_id is not None: # delete attachment url = self.build_url(self._endpoints.get('attachment').format( id=self._parent.object_id, ida=attachment.attachment_id)) response = self._parent.con.delete(url) if not response: return False self.__removed_attachments = [] # reset the removed attachments log.debug('Successfully updated attachments on {}'.format( self._parent.object_id)) return True