Source code for O365.subscriptions

import datetime as dt
from typing import Iterable, Mapping, Optional, Union

from .utils import ApiComponent, NEXT_LINK_KEYWORD, Pagination


[docs] class Subscriptions(ApiComponent): """Subscription operations for Microsoft Graph webhooks.""" _endpoints = { "subscriptions": "/subscriptions", }
[docs] def __init__(self, *, parent=None, con=None, **kwargs): if parent and con: raise ValueError("Need a parent or a connection but not both") self.con = parent.con if parent else con main_resource = kwargs.pop("main_resource", None) or ( getattr(parent, "main_resource", None) if parent else None ) super().__init__( protocol=parent.protocol if parent else kwargs.get("protocol"), main_resource=main_resource, )
def _build_subscription_url(self, subscription_id: Optional[str] = None) -> str: """Build the Microsoft Graph subscriptions endpoint.""" endpoint = self._endpoints.get("subscriptions") if endpoint is None: raise ValueError("Subscriptions endpoint is not configured.") base_url = self.protocol.service_url.rstrip("/") if subscription_id: return f"{base_url}{endpoint}/{subscription_id}" return f"{base_url}{endpoint}" @staticmethod def _format_subscription_expiration( expiration_datetime: Optional[dt.datetime] = None, expiration_minutes: Optional[int] = None, ) -> str: """Return an ISO 8601 UTC expiration string as required by Graph webhooks.""" if expiration_datetime and expiration_minutes is not None: raise ValueError( "Provide either expiration_datetime or expiration_minutes, not both." ) if expiration_datetime is None: minutes = expiration_minutes if expiration_minutes is not None else 60 if minutes <= 0: raise ValueError("expiration_minutes must be a positive integer.") expiration_datetime = dt.datetime.now(dt.timezone.utc) + dt.timedelta( minutes=minutes ) else: if expiration_datetime.tzinfo is None: expiration_datetime = expiration_datetime.replace(tzinfo=dt.timezone.utc) else: expiration_datetime = expiration_datetime.astimezone(dt.timezone.utc) return expiration_datetime.isoformat(timespec="microseconds").replace("+00:00", "Z") @staticmethod def _stringify_change_type(change_type: Union[str, Iterable[str]]) -> str: """Normalize changeType into the comma-separated string Graph expects.""" if isinstance(change_type, str): value = change_type.strip() else: try: parts = [str(part).strip() for part in change_type] except TypeError as exc: raise ValueError( "change_type must be a string or an iterable of strings." ) from exc value = ",".join(part for part in parts if part) if not value: raise ValueError("change_type must contain at least one value.") return value
[docs] def get_subscription( self, subscription_id: str, *, params: Optional[Mapping[str, object]] = None, **request_kwargs, ) -> Optional[dict]: """Retrieve a single webhook subscription by id.""" if not subscription_id: raise ValueError("subscription_id must be provided.") if params is not None and not isinstance(params, Mapping): raise ValueError("params must be a mapping if provided.") url = self._build_subscription_url(subscription_id) response = self.con.get(url, params=params, **request_kwargs) if not response: return None return response.json()
[docs] def create_subscription( self, notification_url: str, resource: Optional[str] = None, change_type: Union[str, Iterable[str]] = "created", *, expiration_datetime: Optional[dt.datetime] = None, expiration_minutes: Optional[int] = None, client_state: Optional[str] = None, include_resource_data: Optional[bool] = None, encryption_certificate: Optional[str] = None, encryption_certificate_id: Optional[str] = None, lifecycle_notification_url: Optional[str] = None, latest_supported_tls_version: Optional[str] = None, additional_data: Optional[Mapping[str, object]] = None, **request_kwargs, ) -> Optional[dict]: """Create a Microsoft Graph webhook subscription. See subscriptions usage documentation for webhook setup requirements. """ if not notification_url: raise ValueError("notification_url must be provided.") resource = resource or self.main_resource if not resource: raise ValueError("resource must be provided.") if not resource.startswith("/"): resource = f"/{resource}" expiration_value = self._format_subscription_expiration( expiration_datetime=expiration_datetime, expiration_minutes=expiration_minutes, ) change_type_value = self._stringify_change_type(change_type) payload = { self._cc("change_type"): change_type_value, self._cc("notification_url"): notification_url, self._cc("resource"): resource, self._cc("expiration_date_time"): expiration_value, } if client_state is not None: payload[self._cc("client_state")] = client_state if include_resource_data is not None: payload[self._cc("include_resource_data")] = include_resource_data if encryption_certificate is not None: payload[self._cc("encryption_certificate")] = encryption_certificate if encryption_certificate_id is not None: payload[self._cc("encryption_certificate_id")] = encryption_certificate_id if lifecycle_notification_url is not None: payload[self._cc("lifecycle_notification_url")] = lifecycle_notification_url if latest_supported_tls_version is not None: payload[ self._cc("latest_supported_tls_version") ] = latest_supported_tls_version if additional_data: if not isinstance(additional_data, Mapping): raise ValueError("additional_data must be a mapping if provided.") payload.update({str(key): value for key, value in additional_data.items()}) url = self._build_subscription_url() response = self.con.post(url, data=payload, **request_kwargs) if not response: return None return response.json()
[docs] def list_subscriptions( self, *, limit: Optional[int] = None, **request_kwargs, ) -> Union[Iterable[dict], Pagination]: """List webhook subscriptions visible to the current app/context.""" if limit is not None and limit <= 0: raise ValueError("limit must be a positive integer.") url = self._build_subscription_url() response = self.con.get(url, **request_kwargs) if not response: return iter(()) data = response.json() subscriptions = data.get("value", []) next_link = data.get(NEXT_LINK_KEYWORD) if next_link: return Pagination( parent=self, data=subscriptions, next_link=next_link, limit=limit, ) if limit is not None: return subscriptions[:limit] return subscriptions
[docs] def renew_subscription( self, subscription_id: str, *, expiration_datetime: Optional[dt.datetime] = None, expiration_minutes: Optional[int] = None, **request_kwargs, ) -> Optional[dict]: """Renew an existing webhook subscription.""" if not subscription_id: raise ValueError("subscription_id must be provided.") expiration_value = self._format_subscription_expiration( expiration_datetime=expiration_datetime, expiration_minutes=expiration_minutes, ) payload = { self._cc("expiration_date_time"): expiration_value, } url = self._build_subscription_url(subscription_id) response = self.con.patch(url, data=payload, **request_kwargs) if not response: return None return response.json()
[docs] def update_subscription( self, subscription_id: str, *, notification_url: Optional[str] = None, expiration_datetime: Optional[dt.datetime] = None, expiration_minutes: Optional[int] = None, **request_kwargs, ) -> Optional[dict]: """Update subscription fields (expiration and/or notification URL).""" if not subscription_id: raise ValueError("subscription_id must be provided.") payload = {} if expiration_datetime is not None or expiration_minutes is not None: payload[self._cc("expiration_date_time")] = self._format_subscription_expiration( expiration_datetime=expiration_datetime, expiration_minutes=expiration_minutes, ) if notification_url is not None: if not notification_url: raise ValueError("notification_url, if provided, cannot be empty.") payload[self._cc("notification_url")] = notification_url if not payload: raise ValueError("At least one of expiration or notification_url must be provided.") url = self._build_subscription_url(subscription_id) response = self.con.patch(url, data=payload, **request_kwargs) if not response: return None return response.json()
[docs] def delete_subscription( self, subscription_id: str, **request_kwargs, ) -> bool: """Delete an existing webhook subscription.""" if not subscription_id: raise ValueError("subscription_id must be provided.") url = self._build_subscription_url(subscription_id) response = self.con.delete(url, **request_kwargs) return bool(response)