import json
import logging
import os
import time
from oauthlib.oauth2 import TokenExpiredError, WebApplicationClient, BackendApplicationClient
from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import HTTPError, RequestException, ProxyError
from requests.exceptions import SSLError, Timeout, ConnectionError
# Dynamic loading of module Retry by requests.packages
# noinspection PyUnresolvedReferences
from requests.packages.urllib3.util.retry import Retry
from requests_oauthlib import OAuth2Session
from stringcase import pascalcase, camelcase, snakecase
from tzlocal import get_localzone
from pytz import UnknownTimeZoneError, UTC, timezone as get_timezone
from .utils import ME_RESOURCE, BaseTokenBackend, FileSystemTokenBackend, Token
log = logging.getLogger(__name__)
O365_API_VERSION = 'v2.0'
GRAPH_API_VERSION = 'v1.0'
OAUTH_REDIRECT_URL = 'https://login.microsoftonline.com/common/oauth2/nativeclient' # version <= 1.1.3. : 'https://outlook.office365.com/owa/'
RETRIES_STATUS_LIST = (
429, # Status code for TooManyRequests
500, 502, 503, 504 # Server errors
)
RETRIES_BACKOFF_FACTOR = 0.5
DEFAULT_SCOPES = {
# wrap any scope in a 1 element tuple to avoid prefixing
'basic': [('offline_access',), 'User.Read'],
'mailbox': ['Mail.Read'],
'mailbox_shared': ['Mail.Read.Shared'],
'message_send': ['Mail.Send'],
'message_send_shared': ['Mail.Send.Shared'],
'message_all': ['Mail.ReadWrite', 'Mail.Send'],
'message_all_shared': ['Mail.ReadWrite.Shared', 'Mail.Send.Shared'],
'address_book': ['Contacts.Read'],
'address_book_shared': ['Contacts.Read.Shared'],
'address_book_all': ['Contacts.ReadWrite'],
'address_book_all_shared': ['Contacts.ReadWrite.Shared'],
'calendar': ['Calendars.Read'],
'calendar_shared': ['Calendars.Read.Shared'],
'calendar_all': ['Calendars.ReadWrite'],
'calendar_shared_all': ['Calendars.ReadWrite.Shared'],
'users': ['User.ReadBasic.All'],
'onedrive': ['Files.Read.All'],
'onedrive_all': ['Files.ReadWrite.All'],
'sharepoint': ['Sites.Read.All'],
'sharepoint_dl': ['Sites.ReadWrite.All'],
'settings_all': ['MailboxSettings.ReadWrite'],
}
[docs]class Protocol:
""" Base class for all protocols """
# Override these in subclass
_protocol_url = 'not_defined' # Main url to request.
_oauth_scope_prefix = '' # Prefix for scopes
_oauth_scopes = {} # Dictionary of {scopes_name: [scope1, scope2]}
[docs] def __init__(self, *, protocol_url=None, api_version=None,
default_resource=None,
casing_function=None, protocol_scope_prefix=None,
timezone=None, **kwargs):
""" Create a new protocol object
:param str protocol_url: the base url used to communicate with the
server
:param str api_version: the api version
:param str default_resource: the default resource to use when there is
nothing explicitly specified during the requests
:param function casing_function: the casing transform function to be
used on api keywords (camelcase / pascalcase)
:param str protocol_scope_prefix: prefix url for scopes
:param pytz.UTC or str timezone: preferred timezone, defaults to the
system timezone
:raises ValueError: if protocol_url or api_version are not supplied
"""
if protocol_url is None or api_version is None:
raise ValueError(
'Must provide valid protocol_url and api_version values')
self.protocol_url = protocol_url or self._protocol_url
self.protocol_scope_prefix = protocol_scope_prefix or ''
self.api_version = api_version
self.service_url = '{}{}/'.format(protocol_url, api_version)
self.default_resource = default_resource or ME_RESOURCE
self.use_default_casing = True if casing_function is None else False
self.casing_function = casing_function or camelcase
if timezone and isinstance(timezone, str):
timezone = get_timezone(timezone)
try:
self.timezone = timezone or get_localzone() # pytz timezone
except UnknownTimeZoneError as e:
log.info('Timezone not provided and the local timezone could not be found. Default to UTC.')
self.timezone = UTC # pytz.timezone('UTC')
self.max_top_value = 500 # Max $top parameter value
# define any keyword that can be different in this protocol
# for example, attachments Odata type differs between Outlook
# rest api and graph: (graph = #microsoft.graph.fileAttachment and
# outlook = #Microsoft.OutlookServices.FileAttachment')
self.keyword_data_store = {}
[docs] def get_service_keyword(self, keyword):
""" Returns the data set to the key in the internal data-key dict
:param str keyword: key to get value for
:return: value of the keyword
"""
return self.keyword_data_store.get(keyword, None)
[docs] def convert_case(self, key):
""" Returns a key converted with this protocol casing method
Converts case to send/read from the cloud
When using Microsoft Graph API, the keywords of the API use
lowerCamelCase Casing
When using Office 365 API, the keywords of the API use PascalCase Casing
Default case in this API is lowerCamelCase
:param str key: a dictionary key to convert
:return: key after case conversion
:rtype: str
"""
return key if self.use_default_casing else self.casing_function(key)
[docs] @staticmethod
def to_api_case(key):
""" Converts key to snake_case
:param str key: key to convert into snake_case
:return: key after case conversion
:rtype: str
"""
return snakecase(key)
[docs] def get_scopes_for(self, user_provided_scopes):
""" Returns a list of scopes needed for each of the
scope_helpers provided, by adding the prefix to them if required
:param user_provided_scopes: a list of scopes or scope helpers
:type user_provided_scopes: list or tuple or str
:return: scopes with url prefix added
:rtype: list
:raises ValueError: if unexpected datatype of scopes are passed
"""
if user_provided_scopes is None:
# return all available scopes
user_provided_scopes = [app_part for app_part in self._oauth_scopes]
elif isinstance(user_provided_scopes, str):
user_provided_scopes = [user_provided_scopes]
if not isinstance(user_provided_scopes, (list, tuple)):
raise ValueError(
"'user_provided_scopes' must be a list or a tuple of strings")
scopes = set()
for app_part in user_provided_scopes:
for scope in self._oauth_scopes.get(app_part, [(app_part,)]):
scopes.add(self.prefix_scope(scope))
return list(scopes)
[docs] def prefix_scope(self, scope):
""" Inserts the protocol scope prefix if required"""
if self.protocol_scope_prefix:
if isinstance(scope, tuple):
return scope[0]
elif scope.startswith(self.protocol_scope_prefix):
return scope
else:
return '{}{}'.format(self.protocol_scope_prefix, scope)
else:
if isinstance(scope, tuple):
return scope[0]
else:
return scope
[docs]class MSGraphProtocol(Protocol):
""" A Microsoft Graph Protocol Implementation
https://docs.microsoft.com/en-us/outlook/rest/compare-graph-outlook
"""
_protocol_url = 'https://graph.microsoft.com/'
_oauth_scope_prefix = 'https://graph.microsoft.com/'
_oauth_scopes = DEFAULT_SCOPES
[docs] def __init__(self, api_version='v1.0', default_resource=None,
**kwargs):
""" Create a new Microsoft Graph protocol object
_protocol_url = 'https://graph.microsoft.com/'
_oauth_scope_prefix = 'https://graph.microsoft.com/'
:param str api_version: api version to use
:param str default_resource: the default resource to use when there is
nothing explicitly specified during the requests
"""
super().__init__(protocol_url=self._protocol_url,
api_version=api_version,
default_resource=default_resource,
casing_function=camelcase,
protocol_scope_prefix=self._oauth_scope_prefix,
**kwargs)
self.keyword_data_store['message_type'] = 'microsoft.graph.message'
self.keyword_data_store['event_message_type'] = 'microsoft.graph.eventMessage'
self.keyword_data_store[
'file_attachment_type'] = '#microsoft.graph.fileAttachment'
self.keyword_data_store[
'item_attachment_type'] = '#microsoft.graph.itemAttachment'
self.max_top_value = 999 # Max $top parameter value
[docs]class MSOffice365Protocol(Protocol):
""" A Microsoft Office 365 Protocol Implementation
https://docs.microsoft.com/en-us/outlook/rest/compare-graph-outlook
"""
_protocol_url = 'https://outlook.office.com/api/'
_oauth_scope_prefix = 'https://outlook.office.com/'
_oauth_scopes = DEFAULT_SCOPES
[docs] def __init__(self, api_version='v2.0', default_resource=None,
**kwargs):
""" Create a new Office 365 protocol object
_protocol_url = 'https://outlook.office.com/api/'
_oauth_scope_prefix = 'https://outlook.office.com/'
:param str api_version: api version to use
:param str default_resource: the default resource to use when there is
nothing explicitly specified during the requests
"""
super().__init__(protocol_url=self._protocol_url,
api_version=api_version,
default_resource=default_resource,
casing_function=pascalcase,
protocol_scope_prefix=self._oauth_scope_prefix,
**kwargs)
self.keyword_data_store[
'message_type'] = 'Microsoft.OutlookServices.Message'
self.keyword_data_store[
'event_message_type'] = 'Microsoft.OutlookServices.EventMessage'
self.keyword_data_store[
'file_attachment_type'] = '#Microsoft.OutlookServices.' \
'FileAttachment'
self.keyword_data_store[
'item_attachment_type'] = '#Microsoft.OutlookServices.' \
'ItemAttachment'
self.max_top_value = 999 # Max $top parameter value
[docs]class MSBusinessCentral365Protocol(Protocol):
""" A Microsoft Business Central Protocol Implementation
https://docs.microsoft.com/en-us/dynamics-nav/api-reference/v1.0/endpoints-apis-for-dynamics
"""
_protocol_url = 'https://api.businesscentral.dynamics.com/'
_oauth_scope_prefix = 'https://api.businesscentral.dynamics.com/'
_oauth_scopes = DEFAULT_SCOPES
_protocol_scope_prefix = 'https://api.businesscentral.dynamics.com/'
[docs] def __init__(self, api_version='v1.0', default_resource=None,environment=None,
**kwargs):
""" Create a new Microsoft Graph protocol object
_protocol_url = 'https://api.businesscentral.dynamics.com/'
_oauth_scope_prefix = 'https://api.businesscentral.dynamics.com/'
:param str api_version: api version to use
:param str default_resource: the default resource to use when there is
nothing explicitly specified during the requests
"""
if environment:
_version = "2.0"
_environment = "/"+environment
else:
_version = "1.0"
_environment = ''
self._protocol_url = "{}v{}{}/api/".format(self._protocol_url, _version, _environment)
super().__init__(protocol_url=self._protocol_url,
api_version=api_version,
default_resource=default_resource,
casing_function=camelcase,
protocol_scope_prefix=self._protocol_scope_prefix,
**kwargs)
self.keyword_data_store['message_type'] = 'microsoft.graph.message'
self.keyword_data_store['event_message_type'] = 'microsoft.graph.eventMessage'
self.keyword_data_store[
'file_attachment_type'] = '#microsoft.graph.fileAttachment'
self.keyword_data_store[
'item_attachment_type'] = '#microsoft.graph.itemAttachment'
self.max_top_value = 999 # Max $top parameter value
[docs]class Connection:
""" Handles all communication (requests) between the app and the server """
_allowed_methods = ['get', 'post', 'put', 'patch', 'delete']
[docs] def __init__(self, credentials, *, scopes=None,
proxy_server=None, proxy_port=8080, proxy_username=None,
proxy_password=None, requests_delay=200, raise_http_errors=True,
request_retries=3, token_backend=None,
tenant_id='common',
auth_flow_type='authorization',
timeout=None, json_encoder=None,
verify_ssl=True, **kwargs):
""" Creates an API connection object
:param tuple credentials: a tuple of (client_id, client_secret)
Generate client_id and client_secret in https://apps.dev.microsoft.com
:param list[str] scopes: list of scopes to request access to
:param str proxy_server: the proxy server
:param int proxy_port: the proxy port, defaults to 8080
:param str proxy_username: the proxy username
:param str proxy_password: the proxy password
:param int requests_delay: number of milliseconds to wait between api
calls.
The Api will respond with 429 Too many requests if more than
17 requests are made per second. Defaults to 200 milliseconds
just in case more than 1 connection is making requests
across multiple processes.
:param bool raise_http_errors: If True Http 4xx and 5xx status codes
will raise as exceptions
:param int request_retries: number of retries done when the server
responds with 5xx error codes.
:param BaseTokenBackend token_backend: the token backend used to get
and store tokens
:param str tenant_id: use this specific tenant id, defaults to common
:param str auth_flow_type: the auth method flow style used: Options:
- 'authorization': 2 step web style grant flow using an authentication url
- 'public': 2 step web style grant flow using an authentication url for public apps where
client secret cannot be secured
- 'credentials': also called client credentials grant flow using only the cliend id and secret
:param float or tuple timeout: How long to wait for the server to send
data before giving up, as a float, or a tuple (connect timeout, read timeout)
:param JSONEncoder json_encoder: The JSONEnocder to use during the JSON serialization on the request.
:param bool verify_ssl: set the verify flag on the requests library
:param dict kwargs: any extra params passed to Connection
:raises ValueError: if credentials is not tuple of
(client_id, client_secret)
"""
if auth_flow_type == 'public': # allow client id only for public flow
if not isinstance(credentials, tuple) or len(credentials) != 1 or (not credentials[0]):
raise ValueError('Provide client id only for public flow credentials')
else:
if not isinstance(credentials, tuple) or len(credentials) != 2 or (not credentials[0] and not credentials[1]):
raise ValueError('Provide valid auth credentials')
self._auth_flow_type = auth_flow_type # 'authorization' or 'credentials' or 'public'
if auth_flow_type == 'credentials' and tenant_id == 'common':
raise ValueError('When using the "credentials" auth_flow the "tenant_id" must be set')
self.tenant_id = tenant_id
self.auth = credentials
self.scopes = scopes
self.store_token = True
token_backend = token_backend or FileSystemTokenBackend()
if not isinstance(token_backend, BaseTokenBackend):
raise ValueError('"token_backend" must be an instance of a subclass of BaseTokenBackend')
self.token_backend = token_backend
self.session = None # requests Oauth2Session object
self.proxy = {}
self.set_proxy(proxy_server, proxy_port, proxy_username, proxy_password)
self.requests_delay = requests_delay or 0
self._previous_request_at = None # store previous request time
self.raise_http_errors = raise_http_errors
self.request_retries = request_retries
self.timeout = timeout
self.json_encoder = json_encoder
self.verify_ssl = verify_ssl
self.naive_session = None # lazy loaded: holds a requests Session object
self._oauth2_authorize_url = 'https://login.microsoftonline.com/' \
'{}/oauth2/v2.0/authorize'.format(tenant_id)
self._oauth2_token_url = 'https://login.microsoftonline.com/' \
'{}/oauth2/v2.0/token'.format(tenant_id)
self.oauth_redirect_url = 'https://login.microsoftonline.com/common/oauth2/nativeclient'
@property
def auth_flow_type(self):
return self._auth_flow_type
[docs] def set_proxy(self, proxy_server, proxy_port, proxy_username,
proxy_password):
""" Sets a proxy on the Session
:param str proxy_server: the proxy server
:param int proxy_port: the proxy port, defaults to 8080
:param str proxy_username: the proxy username
:param str proxy_password: the proxy password
"""
if proxy_server and proxy_port:
if proxy_username and proxy_password:
self.proxy = {
"http": "http://{}:{}@{}:{}".format(proxy_username,
proxy_password,
proxy_server,
proxy_port),
"https": "https://{}:{}@{}:{}".format(proxy_username,
proxy_password,
proxy_server,
proxy_port),
}
else:
self.proxy = {
"http": "http://{}:{}".format(proxy_server, proxy_port),
"https": "https://{}:{}".format(proxy_server, proxy_port),
}
[docs] def get_authorization_url(self, requested_scopes=None,
redirect_uri=None, **kwargs):
""" Initializes the oauth authorization flow, getting the
authorization url that the user must approve.
:param list[str] requested_scopes: list of scopes to request access for
:param str redirect_uri: redirect url configured in registered app
:param kwargs: allow to pass unused params in conjunction with Connection
:return: authorization url
:rtype: str
"""
redirect_uri = redirect_uri or self.oauth_redirect_url
scopes = requested_scopes or self.scopes
if not scopes:
raise ValueError('Must provide at least one scope')
self.session = oauth = self.get_session(redirect_uri=redirect_uri,
scopes=scopes)
# TODO: access_type='offline' has no effect according to documentation
# This is done through scope 'offline_access'.
auth_url, state = oauth.authorization_url(
url=self._oauth2_authorize_url, access_type='offline')
return auth_url, state
[docs] def request_token(self, authorization_url, *,
state=None,
redirect_uri=None,
requested_scopes=None,
store_token=True,
**kwargs):
""" Authenticates for the specified url and gets the token, save the
token for future based if requested
:param str or None authorization_url: url given by the authorization flow
:param str state: session-state identifier for web-flows
:param str redirect_uri: callback url for web-flows
:param lst requested_scopes: a list of scopes to be requested.
Only used when auth_flow_type is 'credentials'
:param bool store_token: whether or not to store the token,
so you don't have to keep opening the auth link and
authenticating every time
:param kwargs: allow to pass unused params in conjunction with Connection
:return: Success/Failure
:rtype: bool
"""
redirect_uri = redirect_uri or self.oauth_redirect_url
# Allow token scope to not match requested scope.
# (Other auth libraries allow this, but Requests-OAuthlib
# raises exception on scope mismatch by default.)
os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
os.environ['OAUTHLIB_IGNORE_SCOPE_CHANGE'] = '1'
scopes = requested_scopes or self.scopes
if self.session is None:
if self.auth_flow_type in ('authorization', 'public'):
self.session = self.get_session(state=state,
redirect_uri=redirect_uri)
elif self.auth_flow_type == 'credentials':
self.session = self.get_session(scopes=scopes)
else:
raise ValueError('"auth_flow_type" must be "authorization", "public" or "credentials"')
try:
if self.auth_flow_type == 'authorization':
self.token_backend.token = Token(self.session.fetch_token(
token_url=self._oauth2_token_url,
authorization_response=authorization_url,
include_client_id=True,
client_secret=self.auth[1]))
elif self.auth_flow_type == 'public':
self.token_backend.token = Token(self.session.fetch_token(
token_url=self._oauth2_token_url,
authorization_response=authorization_url,
include_client_id=True))
elif self.auth_flow_type == 'credentials':
self.token_backend.token = Token(self.session.fetch_token(
token_url=self._oauth2_token_url,
include_client_id=True,
client_secret=self.auth[1],
scope=scopes))
except Exception as e:
log.error('Unable to fetch auth token. Error: {}'.format(str(e)))
return False
if store_token:
self.token_backend.save_token()
return True
[docs] def get_session(self, *, state=None,
redirect_uri=None,
load_token=False,
scopes=None):
""" Create a requests Session object
:param str state: session-state identifier to rebuild OAuth session (CSRF protection)
:param str redirect_uri: callback URL specified in previous requests
:param list(str) scopes: list of scopes we require access to
:param bool load_token: load and ensure token is present
:return: A ready to use requests session, or a rebuilt in-flow session
:rtype: OAuth2Session
"""
redirect_uri = redirect_uri or self.oauth_redirect_url
client_id = self.auth[0]
if self.auth_flow_type in ('authorization', 'public'):
oauth_client = WebApplicationClient(client_id=client_id)
elif self.auth_flow_type == 'credentials':
oauth_client = BackendApplicationClient(client_id=client_id)
else:
raise ValueError('"auth_flow_type" must be "authorization", "credentials" or "public"')
requested_scopes = scopes or self.scopes
if load_token:
# gets a fresh token from the store
token = self.token_backend.get_token()
if token is None:
raise RuntimeError('No auth token found. Authentication Flow needed')
oauth_client.token = token
if self.auth_flow_type in ('authorization', 'public'):
requested_scopes = None # the scopes are already in the token (Not if type is backend)
session = OAuth2Session(client_id=client_id,
client=oauth_client,
token=token,
scope=requested_scopes)
else:
session = OAuth2Session(client_id=client_id,
client=oauth_client,
state=state,
redirect_uri=redirect_uri,
scope=requested_scopes)
session.verify = self.verify_ssl
session.proxies = self.proxy
if self.request_retries:
retry = Retry(total=self.request_retries, read=self.request_retries,
connect=self.request_retries,
backoff_factor=RETRIES_BACKOFF_FACTOR,
status_forcelist=RETRIES_STATUS_LIST)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
[docs] def get_naive_session(self):
""" Creates and returns a naive session """
naive_session = Session() # requests Session object
naive_session.proxies = self.proxy
naive_session.verify = self.verify_ssl
if self.request_retries:
retry = Retry(total=self.request_retries, read=self.request_retries,
connect=self.request_retries,
backoff_factor=RETRIES_BACKOFF_FACTOR,
status_forcelist=RETRIES_STATUS_LIST)
adapter = HTTPAdapter(max_retries=retry)
naive_session.mount('http://', adapter)
naive_session.mount('https://', adapter)
return naive_session
[docs] def refresh_token(self):
"""
Refresh the OAuth authorization token.
This will be called automatically when the access token
expires, however, you can manually call this method to
request a new refresh token.
:return bool: Success / Failure
"""
if self.session is None:
self.session = self.get_session(load_token=True)
token = self.token_backend.token
if not token:
raise RuntimeError('Token not found.')
if token.is_long_lived or self.auth_flow_type == 'credentials':
log.info('Refreshing token')
if self.auth_flow_type == 'authorization':
client_id, client_secret = self.auth
self.token_backend.token = Token(
self.session.refresh_token(
self._oauth2_token_url,
client_id=client_id,
client_secret=client_secret)
)
elif self.auth_flow_type == 'public':
client_id = self.auth[0]
self.token_backend.token = Token(
self.session.refresh_token(
self._oauth2_token_url,
client_id=client_id)
)
elif self.auth_flow_type == 'credentials':
if self.request_token(None, store_token=False) is False:
log.error('Refresh for Client Credentials Grant Flow failed.')
return False
log.info('New oauth token fetched by refresh method')
else:
log.error('You can not refresh an access token that has no "refreh_token" available.'
'Include "offline_access" scope when authenticating to get a "refresh_token"')
return False
if self.store_token:
self.token_backend.save_token()
return True
def _check_delay(self):
""" Checks if a delay is needed between requests and sleeps if True """
if self._previous_request_at:
dif = round(time.time() - self._previous_request_at,
2) * 1000 # difference in miliseconds
if dif < self.requests_delay:
sleep_for = (self.requests_delay - dif)
log.info('Sleeping for {} miliseconds'.format(sleep_for))
time.sleep(sleep_for / 1000) # sleep needs seconds
self._previous_request_at = time.time()
def _internal_request(self, request_obj, url, method, **kwargs):
""" Internal handling of requests. Handles Exceptions.
:param request_obj: a requests session.
:param str url: url to send request to
:param str method: type of request (get/put/post/patch/delete)
:param kwargs: extra params to send to the request api
:return: Response of the request
:rtype: requests.Response
"""
method = method.lower()
if method not in self._allowed_methods:
raise ValueError('Method must be one of the allowed ones')
if method == 'get':
kwargs.setdefault('allow_redirects', True)
elif method in ['post', 'put', 'patch']:
if 'headers' not in kwargs:
kwargs['headers'] = {}
if kwargs.get('headers') is not None and kwargs['headers'].get(
'Content-type') is None:
kwargs['headers']['Content-type'] = 'application/json'
if 'data' in kwargs and kwargs['data'] is not None and kwargs['headers'].get(
'Content-type') == 'application/json':
kwargs['data'] = json.dumps(kwargs['data'], cls=self.json_encoder) # convert to json
if self.timeout is not None:
kwargs['timeout'] = self.timeout
request_done = False
token_refreshed = False
while not request_done:
self._check_delay() # sleeps if needed
try:
log.info('Requesting ({}) URL: {}'.format(method.upper(), url))
log.info('Request parameters: {}'.format(kwargs))
# auto_retry will occur inside this function call if enabled
response = request_obj.request(method, url, **kwargs)
response.raise_for_status() # raise 4XX and 5XX error codes.
log.info('Received response ({}) from URL {}'.format(
response.status_code, response.url))
request_done = True
return response
except TokenExpiredError as e:
# Token has expired, try to refresh the token and try again on the next loop
log.info('Oauth Token is expired')
if self.token_backend.token.is_long_lived is False and self.auth_flow_type == 'authorization':
raise e
if token_refreshed:
# Refresh token done but still TokenExpiredError raise
raise RuntimeError('Token Refresh Operation not working')
should_rt = self.token_backend.should_refresh_token(self)
if should_rt is True:
# The backend has checked that we can refresh the token
if self.refresh_token() is False:
raise RuntimeError('Token Refresh Operation not working')
token_refreshed = True
elif should_rt is False:
# the token was refreshed by another instance and updated into
# this instance, so: update the session token and
# go back to the loop and try the request again.
request_obj.token = self.token_backend.token
else:
# the refresh was performed by the tokend backend.
token_refreshed = True
except (ConnectionError, ProxyError, SSLError, Timeout) as e:
# We couldn't connect to the target url, raise error
log.debug('Connection Error calling: {}.{}'
''.format(url, ('Using proxy: {}'.format(self.proxy)
if self.proxy else '')))
raise e # re-raise exception
except HTTPError as e:
# Server response with 4XX or 5XX error status codes
# try to extract the error message:
try:
error = response.json()
error_message = error.get('error', {}).get('message', '')
except ValueError:
error_message = ''
status_code = int(e.response.status_code / 100)
if status_code == 4:
# Client Error
# Logged as error. Could be a library error or Api changes
log.error('Client Error: {} | Error Message: {}'.format(str(e), error_message))
else:
# Server Error
log.debug('Server Error: {}'.format(str(e)))
if self.raise_http_errors:
if error_message:
raise HTTPError('{} | Error Message: {}'.format(e.args[0], error_message), response=response) from None
else:
raise e
else:
return e.response
except RequestException as e:
# catch any other exception raised by requests
log.debug('Request Exception: {}'.format(str(e)))
raise e
[docs] def naive_request(self, url, method, **kwargs):
""" Makes a request to url using an without oauth authorization
session, but through a normal session
:param str url: url to send request to
:param str method: type of request (get/put/post/patch/delete)
:param kwargs: extra params to send to the request api
:return: Response of the request
:rtype: requests.Response
"""
if self.naive_session is None:
# lazy creation of a naive session
self.naive_session = self.get_naive_session()
return self._internal_request(self.naive_session, url, method, **kwargs)
[docs] def oauth_request(self, url, method, **kwargs):
""" Makes a request to url using an oauth session
:param str url: url to send request to
:param str method: type of request (get/put/post/patch/delete)
:param kwargs: extra params to send to the request api
:return: Response of the request
:rtype: requests.Response
"""
# oauth authentication
if self.session is None:
self.session = self.get_session(load_token=True)
return self._internal_request(self.session, url, method, **kwargs)
[docs] def get(self, url, params=None, **kwargs):
""" Shorthand for self.oauth_request(url, 'get')
:param str url: url to send get oauth request to
:param dict params: request parameter to get the service data
:param kwargs: extra params to send to request api
:return: Response of the request
:rtype: requests.Response
"""
return self.oauth_request(url, 'get', params=params, **kwargs)
[docs] def post(self, url, data=None, **kwargs):
""" Shorthand for self.oauth_request(url, 'post')
:param str url: url to send post oauth request to
:param dict data: post data to update the service
:param kwargs: extra params to send to request api
:return: Response of the request
:rtype: requests.Response
"""
return self.oauth_request(url, 'post', data=data, **kwargs)
[docs] def put(self, url, data=None, **kwargs):
""" Shorthand for self.oauth_request(url, 'put')
:param str url: url to send put oauth request to
:param dict data: put data to update the service
:param kwargs: extra params to send to request api
:return: Response of the request
:rtype: requests.Response
"""
return self.oauth_request(url, 'put', data=data, **kwargs)
[docs] def patch(self, url, data=None, **kwargs):
""" Shorthand for self.oauth_request(url, 'patch')
:param str url: url to send patch oauth request to
:param dict data: patch data to update the service
:param kwargs: extra params to send to request api
:return: Response of the request
:rtype: requests.Response
"""
return self.oauth_request(url, 'patch', data=data, **kwargs)
[docs] def delete(self, url, **kwargs):
""" Shorthand for self.request(url, 'delete')
:param str url: url to send delete oauth request to
:param kwargs: extra params to send to request api
:return: Response of the request
:rtype: requests.Response
"""
return self.oauth_request(url, 'delete', **kwargs)
def __del__(self):
"""
Clear the session by closing it
This should be called manually by the user "del account.con"
There is no guarantee that this method will be called by the garbage collection
But this is not an issue because this connections will be automatically closed.
"""
if self.session:
self.session.close()
[docs]def oauth_authentication_flow(client_id, client_secret, scopes=None,
protocol=None, **kwargs):
""" A helper method to perform the OAuth2 authentication flow.
Authenticate and get the oauth token
:param str client_id: the client_id
:param str client_secret: the client_secret
:param list[str] scopes: a list of protocol user scopes to be converted
by the protocol or raw scopes
:param Protocol protocol: the protocol to be used.
Defaults to MSGraphProtocol
:param kwargs: other configuration to be passed to the Connection instance,
connection.get_authorization_url or connection.request_token
:return: Success or Failure
:rtype: bool
"""
credentials = (client_id, client_secret)
protocol = protocol or MSGraphProtocol()
con = Connection(credentials, scopes=protocol.get_scopes_for(scopes),
**kwargs)
consent_url, _ = con.get_authorization_url(**kwargs)
print('Visit the following url to give consent:')
print(consent_url)
token_url = input('Paste the authenticated url here:\n')
if token_url:
result = con.request_token(token_url, **kwargs) # no need to pass state as the session is the same
if result:
print('Authentication Flow Completed. Oauth Access Token Stored. '
'You can now use the API.')
else:
print('Something go wrong. Please try again.')
return bool(result)
else:
print('Authentication Flow aborted.')
return False