"""Methods for accessing MS Tasks/Todos via the MS Graph api."""
import datetime as dt
import logging
# noinspection PyPep8Naming
from bs4 import BeautifulSoup as bs
from dateutil.parser import parse
from .utils import ApiComponent, TrackerSet
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Keys used to look up URL templates in the ``_endpoints`` dict of each class
# in this module (Task, Folder and ToDo).
CONST_FOLDER = "folder"
CONST_GET_FOLDER = "get_folder"
CONST_GET_TASK = "get_task"
CONST_GET_TASKS = "get_tasks"
CONST_ROOT_FOLDERS = "root_folders"
CONST_TASK = "task"
CONST_TASK_FOLDER = "task_folder"
class Task(ApiComponent):
    """A Microsoft To-Do task."""

    _endpoints = {
        CONST_TASK: "/todo/lists/{folder_id}/tasks/{id}",
        CONST_TASK_FOLDER: "/todo/lists/{folder_id}/tasks",
    }

    def __init__(self, *, parent=None, con=None, **kwargs):
        """Representation of a Microsoft To-Do task.

        :param parent: parent object
        :type parent: Folder
        :param Connection con: connection to use if no parent specified
        :param Protocol protocol: protocol to use if no parent specified
         (kwargs)
        :param str main_resource: use this resource instead of parent resource
         (kwargs)
        :param str folder_id: id of the folder (task list) to add this task in;
         falls back to the parent's ``folder_id`` when omitted (kwargs)
        :param str subject: subject of the task (kwargs)
        :raises ValueError: if both ``parent`` and ``con`` are given
        """
        if parent and con:
            raise ValueError("Need a parent or a connection but not both")
        self.con = parent.con if parent else con

        # Choose the main_resource passed in kwargs over parent main_resource
        main_resource = kwargs.pop("main_resource", None) or (
            getattr(parent, "main_resource", None) if parent else None
        )

        super().__init__(
            protocol=parent.protocol if parent else kwargs.get("protocol"),
            main_resource=main_resource,
        )

        cc = self._cc  # pylint: disable=invalid-name

        # internal to know which properties need to be updated on the server
        self._track_changes = TrackerSet(casing=cc)

        # An explicit folder_id wins; otherwise inherit it from the parent so
        # that tasks built from cloud data can still be saved / deleted.
        self.folder_id = kwargs.get("folder_id") or getattr(
            parent, "folder_id", None
        )

        cloud_data = kwargs.get(self._cloud_data_key, {})

        self.task_id = cloud_data.get(cc("id"), None)
        self.__subject = cloud_data.get(cc("title"), kwargs.get("subject", "") or "")

        body = cloud_data.get(cc("body"), {})
        self.__body = body.get(cc("content"), "")
        # default to HTML when the cloud data carries no content type
        self.body_type = body.get(cc("contentType"), "html")

        self.__status = cloud_data.get(cc("status"), None)
        self.__is_completed = self.__status == "completed"
        self.__importance = cloud_data.get(cc("importance"), None)

        # created / modified arrive as plain date strings: parse them and
        # convert to the protocol timezone
        local_tz = self.protocol.timezone
        created = cloud_data.get(cc("createdDateTime"), None)
        modified = cloud_data.get(cc("lastModifiedDateTime"), None)
        self.__created = parse(created).astimezone(local_tz) if created else None
        self.__modified = parse(modified).astimezone(local_tz) if modified else None

        due_obj = cloud_data.get(cc("dueDateTime"), {})
        self.__due = self._parse_date_time_time_zone(due_obj)

        reminder_obj = cloud_data.get(cc("reminderDateTime"), {})
        self.__reminder = self._parse_date_time_time_zone(reminder_obj)
        self.__is_reminder_on = cloud_data.get(cc("isReminderOn"), False)

        completed_obj = cloud_data.get(cc("completedDateTime"), {})
        self.__completed = self._parse_date_time_time_zone(completed_obj)

    def __str__(self):
        """Representation of the Task via the Graph api as a string."""
        return self.__repr__()

    def __repr__(self):
        """Representation of the Task via the Graph api."""
        marker = "x" if self.__is_completed else "o"
        if self.__due:
            due_str = f"(due: {self.__due.date()} at {self.__due.time()}) "
        else:
            due_str = ""

        if self.__completed:
            compl_str = (
                f"(completed: {self.__completed.date()} at {self.__completed.time()}) "
            )
        else:
            compl_str = ""

        return f"Task: ({marker}) {self.__subject} {due_str} {compl_str}"

    def __eq__(self, other):
        """Comparison of tasks: two tasks are equal when their ids match."""
        if not isinstance(other, Task):
            # let Python fall back to its default handling instead of raising
            # AttributeError on objects that have no ``task_id``
            return NotImplemented
        return self.task_id == other.task_id

    def to_api_data(self, restrict_keys=None):
        """Return a dict to communicate with the server.

        :param restrict_keys: a set of keys to restrict the returned data to
        :rtype: dict
        """
        cc = self._cc  # pylint: disable=invalid-name

        data = {
            cc("title"): self.__subject,
            cc("status"): "completed" if self.__is_completed else "notStarted",
        }

        if self.__body:
            data[cc("body")] = {
                cc("contentType"): self.body_type,
                cc("content"): self.__body,
            }
        else:
            data[cc("body")] = None

        if self.__due:
            data[cc("dueDateTime")] = self._build_date_time_time_zone(self.__due)
        else:
            data[cc("dueDateTime")] = None

        if self.__reminder:
            data[cc("reminderDateTime")] = self._build_date_time_time_zone(
                self.__reminder
            )
        else:
            data[cc("reminderDateTime")] = None

        # unlike the fields above, the completed time is only sent when set
        if self.__completed:
            data[cc("completedDateTime")] = self._build_date_time_time_zone(
                self.__completed
            )

        if restrict_keys:
            data = {key: value for key, value in data.items() if key in restrict_keys}
        return data

    def _to_protocol_datetime(self, value, field_name):
        """Validate a date/datetime and express it in the protocol timezone.

        :param value: a ``datetime.date`` or ``datetime.datetime``
        :param str field_name: property name used in the error message
        :return: timezone aware datetime in ``self.protocol.timezone``
        :rtype: datetime
        :raises ValueError: if ``value`` is not a date or datetime
        """
        if not isinstance(value, dt.date):
            raise ValueError(f"'{field_name}' must be a valid datetime object")
        if not isinstance(value, dt.datetime):
            # force datetime
            value = dt.datetime(value.year, value.month, value.day)
        if value.tzinfo is None:
            # localize datetime
            value = value.replace(tzinfo=self.protocol.timezone)
        elif value.tzinfo != self.protocol.timezone:
            value = value.astimezone(self.protocol.timezone)
        return value

    @property
    def created(self):
        """Return Created time of the task.

        :rtype: datetime
        """
        return self.__created

    @property
    def modified(self):
        """Return Last modified time of the task.

        :rtype: datetime
        """
        return self.__modified

    @property
    def body(self):
        """Return Body of the task.

        :getter: Get body text
        :setter: Set body of task
        :type: str
        """
        return self.__body

    @body.setter
    def body(self, value):
        self.__body = value
        self._track_changes.add(self._cc("body"))

    @property
    def importance(self):
        """Return Task importance.

        :getter: Get importance level (Low, Normal, High)
        :type: str
        """
        return self.__importance

    @property
    def is_starred(self):
        """Is the task starred (high importance).

        :getter: Check if importance is high
        :type: bool
        """
        # importance is None for tasks that were not built from cloud data
        return (self.__importance or "").casefold() == "high".casefold()

    @property
    def subject(self):
        """Subject of the task.

        :getter: Get subject
        :setter: Set subject of task
        :type: str
        """
        return self.__subject

    @subject.setter
    def subject(self, value):
        self.__subject = value
        self._track_changes.add(self._cc("title"))

    @property
    def due(self):
        """Due Time of task.

        :getter: get the due time
        :setter: set the due time (a falsy value clears it)
        :type: datetime
        """
        return self.__due

    @due.setter
    def due(self, value):
        if value:
            value = self._to_protocol_datetime(value, "due")
        self.__due = value
        self._track_changes.add(self._cc("dueDateTime"))

    @property
    def reminder(self):
        """Reminder Time of task.

        :getter: get the reminder time
        :setter: set the reminder time (a falsy value clears it)
        :type: datetime
        """
        return self.__reminder

    @reminder.setter
    def reminder(self, value):
        if value:
            value = self._to_protocol_datetime(value, "reminder")
        self.__reminder = value
        self._track_changes.add(self._cc("reminderDateTime"))

    @property
    def is_reminder_on(self):
        """Return isReminderOn of the task.

        :getter: Get isReminderOn
        :type: bool
        """
        return self.__is_reminder_on

    @property
    def status(self):
        """Status of task as received from the server.

        :getter: get status
        :type: string
        """
        return self.__status

    @property
    def completed(self):
        """Completed Time of task.

        :getter: get the completed time
        :setter: set the completed time; ``None`` marks the task as
         uncompleted, any other value marks it as completed
        :type: datetime
        """
        return self.__completed

    @completed.setter
    def completed(self, value):
        if value is None:
            self.mark_uncompleted()
        else:
            value = self._to_protocol_datetime(value, "completed")
            self.mark_completed()

        self.__completed = value
        self._track_changes.add(self._cc("completedDateTime"))

    @property
    def is_completed(self):
        """Is task completed or not.

        Use :meth:`mark_completed` / :meth:`mark_uncompleted` to change it.

        :getter: Is completed
        :type: bool
        """
        return self.__is_completed

    def mark_completed(self):
        """Mark the task as completed."""
        self.__is_completed = True
        self._track_changes.add(self._cc("status"))

    def mark_uncompleted(self):
        """Mark the task as uncompleted."""
        self.__is_completed = False
        self._track_changes.add(self._cc("status"))

    def delete(self):
        """Delete a stored task.

        :return: Success / Failure
        :rtype: bool
        :raises RuntimeError: if the task was never saved (has no id)
        """
        if self.task_id is None:
            raise RuntimeError("Attempting to delete an unsaved task")

        url = self.build_url(
            self._endpoints.get(CONST_TASK).format(
                folder_id=self.folder_id, id=self.task_id
            )
        )

        response = self.con.delete(url)

        return bool(response)

    def save(self):
        """Create a new task or update an existing one.

        Does update by checking what values have changed and update them on
        the server.

        :return: Success / Failure
        :rtype: bool
        """
        cc = self._cc  # pylint: disable=invalid-name
        is_new = not self.task_id

        if is_new:
            url = self.build_url(
                self._endpoints.get(CONST_TASK_FOLDER).format(folder_id=self.folder_id)
            )
            method = self.con.post
            data = self.to_api_data()
        else:
            if not self._track_changes:
                return True  # there's nothing to update
            url = self.build_url(
                self._endpoints.get(CONST_TASK).format(
                    folder_id=self.folder_id, id=self.task_id
                )
            )
            method = self.con.patch
            data = self.to_api_data(restrict_keys=self._track_changes)

        response = method(url, data=data)
        if not response:
            return False

        self._track_changes.clear()  # clear the tracked changes

        if is_new:
            # adopt the server side values of the freshly created task
            task = response.json()
            local_tz = self.protocol.timezone

            self.task_id = task.get(cc("id"), None)

            created = task.get(cc("createdDateTime"), None)
            modified = task.get(cc("lastModifiedDateTime"), None)
            self.__created = parse(created).astimezone(local_tz) if created else None
            self.__modified = (
                parse(modified).astimezone(local_tz) if modified else None
            )

            # the completion time comes as a dateTimeTimeZone object under
            # "completedDateTime", exactly as parsed in __init__
            self.__completed = self._parse_date_time_time_zone(
                task.get(cc("completedDateTime"), {})
            )
            self.__is_completed = task.get(cc("status"), None) == "completed"
        else:
            # take "now" directly in the protocol timezone instead of
            # relabelling the system's local wall clock time
            self.__modified = dt.datetime.now(tz=self.protocol.timezone)

        return True

    def get_body_text(self):
        """Parse the body html and returns the body text using bs4.

        :return: body text
        :rtype: str
        """
        if self.body_type != "html":
            return self.body

        try:
            soup = bs(self.body, "html.parser")
        except RuntimeError:
            return self.body

        # html.parser does not add a <body> tag to fragments, so fall back to
        # the text of the whole document when there is none
        return (soup.body or soup).text

    def get_body_soup(self):
        """Return the beautifulsoup4 of the html body.

        :return: Html body, or None when the body is not html
        :rtype: BeautifulSoup
        """
        return bs(self.body, "html.parser") if self.body_type == "html" else None
class Folder(ApiComponent):
    """A Microsoft To-Do folder."""

    _endpoints = {
        CONST_FOLDER: "/todo/lists/{id}",
        CONST_GET_TASKS: "/todo/lists/{id}/tasks",
        CONST_GET_TASK: "/todo/lists/{id}/tasks/{ide}",
    }
    task_constructor = Task

    def __init__(self, *, parent=None, con=None, **kwargs):
        """Representation of a Microsoft To-Do Folder.

        :param parent: parent object
        :type parent: ToDo
        :param Connection con: connection to use if no parent specified
        :param Protocol protocol: protocol to use if no parent specified
         (kwargs)
        :param str main_resource: use this resource instead of parent resource
         (kwargs)
        :raises ValueError: if both ``parent`` and ``con`` are given
        """
        if parent and con:
            raise ValueError("Need a parent or a connection but not both")
        self.con = parent.con if parent else con

        # Choose the main_resource passed in kwargs over parent main_resource
        main_resource = kwargs.pop("main_resource", None) or (
            getattr(parent, "main_resource", None) if parent else None
        )

        super().__init__(
            protocol=parent.protocol if parent else kwargs.get("protocol"),
            main_resource=main_resource,
        )

        cloud_data = kwargs.get(self._cloud_data_key, {})

        self.name = cloud_data.get(self._cc("displayName"), "")
        self.folder_id = cloud_data.get(self._cc("id"), None)
        # the default folder is flagged through its well known list name
        self.is_default = (
            cloud_data.get(self._cc("wellknownListName"), "") == "defaultList"
        )

    def __str__(self):
        """Representation of the Folder via the Graph api as a string."""
        return self.__repr__()

    def __repr__(self):
        """Representation of the folder via the Graph api."""
        suffix = " (default)" if self.is_default else ""
        return f"Folder: {self.name}{suffix}"

    def __eq__(self, other):
        """Comparison of folders: equal when their ids match."""
        if not isinstance(other, Folder):
            # avoid AttributeError on objects that have no ``folder_id``
            return NotImplemented
        return self.folder_id == other.folder_id

    def update(self):
        """Update this folder. Only name can be changed.

        :return: Success / Failure
        :rtype: bool
        """
        if not self.folder_id:
            return False

        url = self.build_url(
            self._endpoints.get(CONST_FOLDER).format(id=self.folder_id)
        )

        data = {
            self._cc("displayName"): self.name,
        }

        response = self.con.patch(url, data=data)

        return bool(response)

    def delete(self):
        """Delete this folder.

        On success the local ``folder_id`` is reset to None.

        :return: Success / Failure
        :rtype: bool
        """
        if not self.folder_id:
            return False

        url = self.build_url(
            self._endpoints.get(CONST_FOLDER).format(id=self.folder_id)
        )

        response = self.con.delete(url)
        if not response:
            return False

        self.folder_id = None

        return True

    def get_tasks(self, query=None, batch=None, order_by=None):
        """Return the tasks of this folder.

        :param query: the query string or object to query tasks
        :param batch: sent to the server as ``$top``
        :param order_by: the order clause to apply to returned tasks.
        :return: an iterator of tasks (empty when the request fails)
        :rtype: Iterator[Task]
        """
        url = self.build_url(
            self._endpoints.get(CONST_GET_TASKS).format(id=self.folder_id)
        )

        # get tasks by the folder id
        params = {}
        if batch:
            params["$top"] = batch

        if order_by:
            params["$orderby"] = order_by

        if query:
            if isinstance(query, str):
                params["$filter"] = query
            else:
                params |= query.as_params()

        response = self.con.get(url, params=params)

        if not response:
            return iter(())

        data = response.json()

        # pass folder_id along so the tasks know which folder to save to
        return (
            self.task_constructor(
                parent=self,
                folder_id=self.folder_id,
                **{self._cloud_data_key: task},
            )
            for task in data.get("value", [])
        )

    def new_task(self, subject=None):
        """Create a new (unsaved) task within this folder.

        :param str subject: subject text for the new task
        :return: new task
        :rtype: Task
        """
        return self.task_constructor(
            parent=self, subject=subject, folder_id=self.folder_id
        )

    def get_task(self, param):
        """Return a Task instance by its id or by a query.

        :param param: a task_id or a Query instance
        :return: task for the specified info, or None when ``param`` is None,
         the request fails or the query matches nothing
        :rtype: Task
        """
        if param is None:
            return None

        by_id = isinstance(param, str)
        if by_id:
            url = self.build_url(
                self._endpoints.get(CONST_GET_TASK).format(id=self.folder_id, ide=param)
            )
            params = None
        else:
            url = self.build_url(
                self._endpoints.get(CONST_GET_TASKS).format(id=self.folder_id)
            )
            # only the first match of the query is of interest
            params = {"$top": 1}
            params |= param.as_params()

        response = self.con.get(url, params=params)

        if not response:
            return None

        if by_id:
            task = response.json()
        else:
            matches = response.json().get("value", [])
            if not matches:
                return None
            task = matches[0]

        # pass folder_id along so the task knows which folder to save to
        return self.task_constructor(
            parent=self,
            folder_id=self.folder_id,
            **{self._cloud_data_key: task},
        )
class ToDo(ApiComponent):
    """A Microsoft To-Do class for MS Graph API.

    In order to use the API following permissions are required.
    Delegated (work or school account) - Tasks.Read, Tasks.ReadWrite
    """

    _endpoints = {
        CONST_ROOT_FOLDERS: "/todo/lists",
        CONST_GET_FOLDER: "/todo/lists/{id}",
    }

    folder_constructor = Folder
    task_constructor = Task

    def __init__(self, *, parent=None, con=None, **kwargs):
        """Initialise the ToDo object.

        :param parent: parent object
        :type parent: Account
        :param Connection con: connection to use if no parent specified
        :param Protocol protocol: protocol to use if no parent specified
         (kwargs)
        :param str main_resource: use this resource instead of parent resource
         (kwargs)
        :raises ValueError: if both ``parent`` and ``con`` are given
        """
        if parent and con:
            raise ValueError("Need a parent or a connection but not both")
        self.con = parent.con if parent else con

        # Choose the main_resource passed in kwargs over parent main_resource
        main_resource = kwargs.pop("main_resource", None) or (
            getattr(parent, "main_resource", None) if parent else None
        )

        super().__init__(
            protocol=parent.protocol if parent else kwargs.get("protocol"),
            main_resource=main_resource,
        )

    def __str__(self):
        """Representation of the ToDo via the Graph api as a string."""
        return self.__repr__()

    def __repr__(self):
        """Representation of the ToDo via the Graph api."""
        return "Microsoft To-Do"

    def list_folders(self, query=None, limit=None):
        """Return a list of folders.

        To use query an order_by check the OData specification here:
        https://docs.oasis-open.org/odata/odata/v4.0/errata03/os/complete/
        part2-url-conventions/odata-v4.0-errata03-os-part2-url-conventions
        -complete.html

        :param query: the query string or object to list folders
        :param int limit: max no. of folders to get (sent as ``$top``)
        :return: the folders found (empty list when the request fails)
        :rtype: list[Folder]
        """
        url = self.build_url(self._endpoints.get(CONST_ROOT_FOLDERS))

        params = {}
        if limit:
            params["$top"] = limit
        if query:
            if isinstance(query, str):
                params["$filter"] = query
            else:
                params |= query.as_params()

        response = self.con.get(url, params=params or None)
        if not response:
            return []

        data = response.json()

        return [
            self.folder_constructor(parent=self, **{self._cloud_data_key: x})
            for x in data.get("value", [])
        ]

    def new_folder(self, folder_name):
        """Create a new folder.

        :param str folder_name: name of the new folder
        :return: a new Folder instance, or None when no name is given or the
         request fails
        :rtype: Folder
        """
        if not folder_name:
            return None

        url = self.build_url(self._endpoints.get(CONST_ROOT_FOLDERS))

        response = self.con.post(url, data={self._cc("displayName"): folder_name})
        if not response:
            return None

        data = response.json()

        # Everything received from cloud must be passed as self._cloud_data_key
        return self.folder_constructor(parent=self, **{self._cloud_data_key: data})

    def get_folder(self, folder_id=None, folder_name=None):
        """Return a folder by its id or name.

        :param str folder_id: the folder id to be retrieved.
        :param str folder_name: the folder name to be retrieved.
        :return: folder for the given info, or None when it can't be found
        :rtype: Folder
        :raises RuntimeError: if both or none of the options are provided
        """
        if folder_id and folder_name:
            raise RuntimeError("Provide only one of the options")

        if not folder_id and not folder_name:
            raise RuntimeError("Provide one of the options")

        if folder_id:
            url = self.build_url(
                self._endpoints.get(CONST_GET_FOLDER).format(id=folder_id)
            )
            response = self.con.get(url)

            return (
                self.folder_constructor(
                    parent=self, **{self._cloud_data_key: response.json()}
                )
                if response
                else None
            )

        query = self.new_query("displayName").equals(folder_name)
        folders = self.list_folders(query=query)
        # no match (or a failed request) yields None, like the lookup by id,
        # instead of an IndexError
        return folders[0] if folders else None

    def get_default_folder(self):
        """Return the default folder for the current user.

        :return: the default folder, or None when no listed folder is flagged
         as default
        :rtype: Folder
        """
        return next(
            (folder for folder in self.list_folders() if folder.is_default), None
        )

    def get_tasks(self, batch=None, order_by=None):
        """Get tasks from the default folder.

        :param order_by: orders the result set based on this condition
        :param int batch: forwarded to :meth:`Folder.get_tasks` as ``batch``
        :return: an iterator of the tasks in the default folder (empty when
         there is no default folder)
        :rtype: Iterator[Task]
        """
        default_folder = self.get_default_folder()
        if default_folder is None:
            # same result Folder.get_tasks gives for a failed request
            return iter(())

        return default_folder.get_tasks(order_by=order_by, batch=batch)

    def new_task(self, subject=None):
        """Return a new (unsaved) Task object in the default folder.

        :param str subject: subject text for the new task
        :return: new task, or None when there is no default folder
        :rtype: Task
        """
        default_folder = self.get_default_folder()
        if default_folder is None:
            return None

        return default_folder.new_task(subject=subject)