Source code for arcresthelper.securityhandlerhelper

from __future__ import print_function
from __future__ import absolute_import


from arcrest import security
import arcrest
from arcrest.manageags import AGSAdministration
from arcrest.manageorg import Administration
from .packages import six
from .packages.six.moves.urllib.error import HTTPError
import os
from . import common
import copy

########################################################################
def lineno():
    """Returns the current line number in our program.

    Returns:
        int: The line number of the statement that called this function.
    """
    # Imported locally (as trace() does): the module-level imports do not
    # bring ``inspect`` into scope, so referencing it here without this
    # import raises NameError.
    import inspect

    # f_back is the caller's frame; its f_lineno is the line being executed.
    return inspect.currentframe().f_back.f_lineno
#----------------------------------------------------------------------
def trace():
    """Determines information about where an error was thrown.

    Intended to be called from inside an ``except`` block. When no exception
    is being handled, placeholder values are returned instead of raising.

    Returns:
        tuple: line number, filename, error message

        * line number (str): ``'line N'`` for the outermost traceback frame,
          or ``'line unknown'`` when no exception is active.
        * filename (str): the file that defines this function.
        * error message (str): the last line of the formatted exception, or
          ``'No active exception'`` when no exception is active.

    Examples:
        >>> try:
        ...     1/0
        ... except:
        ...     print("Error on '{}'\\nin file '{}'\\nwith error '{}'".format(*trace()))
        ...
        Error on 'line 1234'
        in file 'C:\\foo\\baz.py'
        with error 'ZeroDivisionError: integer division or modulo by zero'

    """
    import inspect
    import sys
    import traceback

    # script name
    filename = inspect.getfile(inspect.currentframe())

    tb = sys.exc_info()[2]
    frames = traceback.extract_tb(tb) if tb is not None else []
    if not frames:
        # Nothing is being handled: indexing the (empty) traceback would
        # raise IndexError and mask whatever the caller wanted to report.
        return "line unknown", filename, "No active exception"

    # Read the line number from the structured frame entry (index 1) rather
    # than splitting the formatted text on ", ", which picks the wrong field
    # when the file path itself contains a comma followed by a space.
    line = "line {0}".format(frames[0][1])

    # Get Python syntax error
    synerror = traceback.format_exc().splitlines()[-1]
    return line, filename, synerror
class securityhandlerhelper(object):
    r"""A number of different security handlers are supported by ArcGIS; this
    class simplifies the creation of the handlers.

    Args:
        securityinfo (dict): A ``dict`` with the type of handler and
            information to be created. A path to a JSON configuration file,
            another :py:class:`securityhandlerhelper` (its state is copied),
            or ``None`` are also accepted.

    +-----------------+-------------------------------+-------------------------+
    |       Key       |          Description          |        Required?        |
    +-----------------+-------------------------------+-------------------------+
    | security_type   | security type used to connect | See Values *            |
    +-----------------+-------------------------------+-------------------------+
    | username        | username for log in           | ``LDAP|NTLM`` **        |
    +-----------------+-------------------------------+-------------------------+
    | password        | password for log in           | ``LDAP|NTLM`` **        |
    +-----------------+-------------------------------+-------------------------+
    | proxy_url       | url for proxy server          | Optional                |
    +-----------------+-------------------------------+-------------------------+
    | proxy_port      | port for proxy server         | Optional                |
    +-----------------+-------------------------------+-------------------------+
    | token_url       | url for token server          | Optional                |
    +-----------------+-------------------------------+-------------------------+
    | referer_url     | url for referer               | Optional                |
    +-----------------+-------------------------------+-------------------------+
    | certificatefile |                               | ``PKI``                 |
    +-----------------+-------------------------------+-------------------------+
    | keyfile         |                               | ``PKI``                 |
    +-----------------+-------------------------------+-------------------------+
    | client_id       |                               | ``OAuth``               |
    +-----------------+-------------------------------+-------------------------+
    | secret_id       |                               | ``OAuth``               |
    +-----------------+-------------------------------+-------------------------+
    | \* Values: ``ArcGIS|Portal|LDAP|NTLM|PKI|OAuth`` (defaults to ``Portal``) |
    +-----------------+-------------------------------+-------------------------+
    | \*\* Also required for ``ArcGIS|Portal`` unless using anonymous logins    |
    +-----------------+-------------------------------+-------------------------+

    The security type is matched case-insensitively, and ``AGOL`` is accepted
    as an alias for ``Portal``. An optional ``org_url`` key is also read; a
    missing scheme defaults to ``http://`` for ``org_url`` and ``https://``
    for ``token_url``.

    Examples:
        >>> securityinfo = {'username' : 'secret', 'password' : 'really_secret'}
        >>> shh = arcresthelper.securityhandlerhelper.securityhandlerhelper(securityinfo)
        >>> print(shh._org_url)
        http://myorg.maps.arcgis.com

    """
    _org_url = None
    _username = None
    _password = None
    _proxy_url = None
    _proxy_port = None
    _token_url = None
    _securityHandler = None
    _security_type = None
    _featureServiceFieldCase = None
    _keyfile = None
    _certificatefile = None
    _referer_url = None
    _client_id = None
    _secret_id = None
    _is_portal = False
    _supported_types = ['LDAP', 'NTLM', 'OAuth', 'Portal', 'PKI', "ArcGIS"]
    _valid = None
    _message = None
    #----------------------------------------------------------------------
    def __init__(self, securityinfo):
        """Constructor

        Args:
            securityinfo: ``dict`` of settings, path to a JSON config file,
                another :py:class:`securityhandlerhelper`, or ``None``.

        Raises:
            ValueError: Re-raised unchanged when raised while building the
                handler.
            common.ArcRestHelperError: Wraps any other exception raised while
                building the handler.
        """
        try:
            if securityinfo is None:
                self._message = 'Security info not set'
                self._valid = True
                return

            if isinstance(securityinfo, securityhandlerhelper):
                self._copy_from(securityinfo)
                return

            if isinstance(securityinfo, str) and os.path.isfile(securityinfo):
                securityinfo = common.init_config_json(config_file=securityinfo)
            # Settings may be wrapped in a 'Credentials' section.
            if isinstance(securityinfo, dict) and 'Credentials' in securityinfo:
                securityinfo = securityinfo['Credentials']

            if not self._load_settings(securityinfo):
                return

            self._create_handler()

            if self._securityHandler is not None:
                self._detect_field_case()

                handler_message = self._securityHandler.message
                # Guard against a None message before the membership test;
                # ``'error' in None`` would raise TypeError.
                if handler_message is not None and 'error' in handler_message:
                    self._message = handler_message
                    self._valid = False
                else:
                    if handler_message is not None:
                        self._message = handler_message
                    self._valid = True
        except ValueError:
            # Bare raise keeps the original traceback on Python 2 as well.
            raise
        except Exception:
            line, filename, synerror = trace()
            raise common.ArcRestHelperError({
                "function": "securityhandlerhelper_init",
                "line": line,
                "filename": filename,
                "synerror": synerror,
            })
    #----------------------------------------------------------------------
    def _copy_from(self, other):
        """Copies the state of another :py:class:`securityhandlerhelper`."""
        self._securityHandler = other.securityhandler
        # _org_url must be copied too, otherwise the copy loses its
        # organization URL.
        self._org_url = other._org_url
        self._username = other._username
        self._password = other._password
        self._proxy_url = other._proxy_url
        self._proxy_port = other._proxy_port
        self._token_url = other._token_url
        self._security_type = other._security_type
        self._featureServiceFieldCase = other._featureServiceFieldCase
        self._keyfile = other._keyfile
        self._certificatefile = other._certificatefile
        self._referer_url = other._referer_url
        self._client_id = other._client_id
        self._secret_id = other._secret_id
        self._is_portal = other._is_portal
        self._message = other._message
        self._valid = other._valid
    #----------------------------------------------------------------------
    @staticmethod
    def _is_blank(value):
        """Returns ``True`` when ``value`` is ``None`` or an empty string."""
        return value is None or value == ''
    #----------------------------------------------------------------------
    @staticmethod
    def _with_scheme(url, default_scheme):
        """Prefixes ``url`` with ``default_scheme`` unless it is http(s)."""
        if url.startswith(('http://', 'https://')):
            return url
        return default_scheme + url
    #----------------------------------------------------------------------
    @staticmethod
    def _parse_connection_string(connection_string):
        """Splits a ``key=value;key=value`` string into a ``dict``.

        Segments without ``=`` (for example the empty segment after a
        trailing ``;``) are skipped instead of raising ``IndexError``.
        """
        pairs = {}
        for segment in connection_string.split(";"):
            key, separator, value = segment.partition("=")
            if separator:
                pairs[key] = value
        return pairs
    #----------------------------------------------------------------------
    def _load_settings(self, securityinfo):
        """Reads the connection settings out of ``securityinfo``.

        Returns:
            bool: ``False`` when the security type is not supported (in which
            case ``_message``/``_valid`` are set), ``True`` otherwise.
        """
        if 'security_type' in securityinfo:
            self._security_type = securityinfo['security_type']
        else:
            self._security_type = 'Portal'

        # Exact, case-insensitive match. A substring test would accept
        # fragments such as '' or 'Port' and reject 'portal', although the
        # handler selection itself compares upper-cased names.
        accepted = set(str(name).upper() for name in self._supported_types)
        accepted.add('AGOL')  # alias handled together with 'Portal'
        if str(self._security_type).upper() not in accepted:
            self._message = 'Security type not supported: ' + str(self._security_type)
            self._valid = False
            return False

        for key in ('proxy_url', 'proxy_port', 'referer_url'):
            if key in securityinfo:
                setattr(self, '_' + key, securityinfo[key])

        if 'token_url' in securityinfo and securityinfo['token_url'] is not None:
            self._token_url = self._with_scheme(securityinfo['token_url'], 'https://')

        if 'org_url' in securityinfo and securityinfo['org_url'] is not None:
            self._org_url = self._with_scheme(securityinfo['org_url'], 'http://')

        for key in ('username', 'password', 'certificatefile', 'keyfile',
                    'client_id', 'secret_id'):
            if key in securityinfo:
                setattr(self, '_' + key, securityinfo[key])
        return True
    #----------------------------------------------------------------------
    def _create_handler(self):
        """Creates the security handler matching ``_security_type``.

        Sets ``_securityHandler`` on success; sets ``_message`` and, where a
        required setting is missing, ``_valid``.
        """
        sec_type = str(self._security_type).upper()
        missing_login = self._is_blank(self._username) or self._is_blank(self._password)

        if sec_type == 'ARCGIS':
            self._securityHandler = security.ArcGISTokenSecurityHandler(proxy_url=self._proxy_url,
                                                                        proxy_port=self._proxy_port)
            self._org_url = self._securityHandler.org_url
            self._username = self._securityHandler.username
            self._valid = True
            self._message = "ArcGIS security handler created"
        elif sec_type in ('PORTAL', 'AGOL'):
            if self._is_blank(self._org_url):
                self._org_url = 'http://www.arcgis.com'
            if missing_login:
                # Anonymous access: valid, but no handler is created.
                self._message = "No username or password, no security handler generated"
                self._valid = True
            elif self._org_url is None or '.arcgis.com' in self._org_url:
                self._securityHandler = security.AGOLTokenSecurityHandler(username=self._username,
                                                                          password=self._password,
                                                                          org_url=self._org_url,
                                                                          token_url=self._token_url,
                                                                          proxy_url=self._proxy_url,
                                                                          proxy_port=self._proxy_port)
                self._org_url = self._securityHandler.org_url
                self._message = "ArcGIS Online security handler created"
            else:
                self._securityHandler = security.PortalTokenSecurityHandler(username=self._username,
                                                                            password=self._password,
                                                                            org_url=self._org_url,
                                                                            proxy_url=self._proxy_url,
                                                                            proxy_port=self._proxy_port)
                self._message = "Portal security handler created"
        elif sec_type == 'NTLM':
            if missing_login:
                self._message = "Username and password required for NTLM"
                self._valid = False
            else:
                self._securityHandler = security.NTLMSecurityHandler(username=self._username,
                                                                     password=self._password,
                                                                     org_url=self._org_url,
                                                                     proxy_url=self._proxy_url,
                                                                     proxy_port=self._proxy_port,
                                                                     referer_url=self._referer_url)
                self._message = "NTLM security handler created"
        elif sec_type == 'LDAP':
            if missing_login:
                self._message = "Username and password required for LDAP"
                self._valid = False
            else:
                self._securityHandler = security.LDAPSecurityHandler(username=self._username,
                                                                     password=self._password,
                                                                     org_url=self._org_url,
                                                                     proxy_url=self._proxy_url,
                                                                     proxy_port=self._proxy_port,
                                                                     referer_url=self._referer_url)
                self._message = "LDAP security handler created"
        elif sec_type == 'PKI':
            if self._is_blank(self._keyfile) or self._is_blank(self._certificatefile):
                self._message = "Key file and certification file required for PKI"
                self._valid = False
            else:
                self._securityHandler = security.PKISecurityHandler(keyfile=self._keyfile,
                                                                    certificatefile=self._certificatefile,
                                                                    org_url=self._org_url,
                                                                    proxy_url=self._proxy_url,
                                                                    proxy_port=self._proxy_port,
                                                                    referer_url=self._referer_url)
                self._message = "PKI security handler created"
        elif sec_type == 'OAUTH':
            if self._is_blank(self._secret_id) or self._is_blank(self._client_id):
                self._message = "client_id and secret_id required for OAUTH"
                self._valid = False
            else:
                self._securityHandler = security.OAuthSecurityHandler(client_id=self._client_id,
                                                                      secret_id=self._secret_id,
                                                                      org_url=self._org_url,
                                                                      proxy_url=self._proxy_url,
                                                                      proxy_port=self._proxy_port)
                self._message = "OAuth security handler created"
        else:
            # Only reachable if _supported_types lists a name that has no
            # branch above.
            print ("No valid security type set")
            self._message = "No valid security type set"
    #----------------------------------------------------------------------
    def _detect_field_case(self):
        """Sets ``_featureServiceFieldCase`` to ``'lower'`` when a hosting
        server has a managed PostgreSQL database registered.

        Best effort: HTTP and other errors raised while querying are printed,
        not raised.
        """
        admin = Administration(url=self._org_url,
                               securityHandler=self._securityHandler)
        try:
            portal = admin.portals.portalSelf
            feature_servers = portal.featureServers
            if feature_servers is None:
                return
            for hosting_server in feature_servers:
                if not isinstance(hosting_server, AGSAdministration):
                    continue
                try:
                    self._check_hosting_server(hosting_server)
                except HTTPError as err:
                    if err.code == 403:
                        print ("Admistrative access denied, unable to check if hosting servers")
                    else:
                        print (err)
                except Exception as e:
                    print (e)
        except HTTPError as err:
            if err.code == 403:
                print ("Admistrative access denied, unable to check if hosting servers")
            else:
                print (err)
        except Exception as e:
            print (e)
    #----------------------------------------------------------------------
    def _check_hosting_server(self, hosting_server):
        """Inspects one hosting server's enterprise databases for a managed
        PostgreSQL connection."""
        server_data = hosting_server.data
        data_items = server_data.rootDataItems
        if data_items is None or 'rootItems' not in data_items:
            return
        for root_item in data_items['rootItems']:
            if root_item != '/enterpriseDatabases':
                continue
            found = server_data.findDataItems(ancestorPath=root_item,
                                              type='fgdb,egdb')
            if found is None or 'items' not in found:
                continue
            for item in found['items']:
                if 'info' not in item:
                    continue
                info = item['info']
                if 'isManaged' in info and info['isManaged'] == True:
                    settings = self._parse_connection_string(info['connectionString'])
                    if 'DBCLIENT' in settings:
                        if str(settings['DBCLIENT']).upper() == 'postgresql'.upper():
                            self._featureServiceFieldCase = 'lower'
    #----------------------------------------------------------------------
    def dispose(self):
        """Disposes the :py:class:`securityhandlerhelper` object.

        Removes the instance-level login, URL, proxy, handler, validity and
        message values, so each of these attributes falls back to its
        class-level default of ``None``. Safe to call more than once.
        """
        for name in ('_username', '_password', '_org_url', '_proxy_url',
                     '_proxy_port', '_token_url', '_securityHandler',
                     '_valid', '_message'):
            vars(self).pop(name, None)
    #----------------------------------------------------------------------
    @property
    def message(self):
        """
        Returns:
            str: Any messages generated by the object.
        """
        return self._message
    #----------------------------------------------------------------------
    @message.setter
    def message(self, message):
        """
        Args:
            message (str): The message to store on the object.
        """
        self._message = message
    #----------------------------------------------------------------------
    @property
    def valid(self):
        """
        Returns:
            bool: ``True`` if the object is valid, ``False`` otherwise.
        """
        return self._valid
    #----------------------------------------------------------------------
    @valid.setter
    def valid(self, valid):
        """
        Args:
            valid (bool): ``True`` if the object is valid, ``False`` otherwise.
        """
        self._valid = valid
    #----------------------------------------------------------------------
    @property
    def securityhandler(self):
        """
        Returns:
            The security handler object created for the configured security
            type, or ``None`` when no handler was created.
        """
        return self._securityHandler
    #@property
    #def is_portal(self):
        #return self._is_portal