Source code for arcresthelper.common

from __future__ import print_function
from __future__ import absolute_import


import os
import sys
import json

import random
import string
import datetime
import time

from .packages.six.moves import urllib_parse as urlparse
import gc
import operator
#----------------------------------------------------------------------
[docs]def trace(): """Determine information about where an error was thrown. Returns: tuple: line number, filename, error message """ import traceback, inspect tb = sys.exc_info()[2] tbinfo = traceback.format_tb(tb)[0] filename = inspect.getfile(inspect.currentframe()) # script name + line number line = tbinfo.split(", ")[1] # Get Python syntax error # synerror = traceback.format_exc().splitlines()[-1] return line, filename, synerror
[docs]class ArcRestHelperError(Exception): """Raised when error occurs in utility module functions.""" pass
[docs]def merge_dicts(dicts, op=operator.add): """Merge a list of dictionaries. Args: dicts (list): a list of dictionary objects op (operator): an operator item used to merge the dictionaries. Defaults to :py:func:`operator.add`. Returns: dict: the merged dictionary """ a = None for b in dicts: if a is None: a = b.copy() else: a = dict(a.items() + b.items() + [(k, op(a[k], b[k])) for k in set(b) & set(a)]) return a
##---------------------------------------------------------------------- #def merge_dicts(dicts): #''' #Given any number of dicts, shallow copy and merge into a new dict, #precedence goes to key value pairs in latter dicts. #''' ## result = {} #z = None #for dictionary in dicts: ##result.update(dictionary) #if z is None: #z = dictionary.copy() #else: #z.update(dictionary) #return z #def merge_two_dicts(x, y): #'''Given two dicts, merge them into a new dict as a shallow copy.''' #z = x.copy() #z.update(y) #return z #----------------------------------------------------------------------
[docs]def noneToValue(value, newValue): """Convert ``None`` to a different value. Args: value: The value to convert. This can be anything. newValue: The resultant value. This can be anything. Returns: newValue """ if value is None: return newValue else: return value
#----------------------------------------------------------------------
[docs]def getLayerIndex(url): """Extract the layer index from a url. Args: url (str): The url to parse. Returns: int: The layer index. Examples: >>> url = "http://services.arcgis.com/<random>/arcgis/rest/services/test/FeatureServer/12" >>> arcresthelper.common.getLayerIndex(url) 12 """ urlInfo = None urlSplit = None inx = None try: urlInfo = urlparse.urlparse(url) urlSplit = str(urlInfo.path).split('/') inx = urlSplit[len(urlSplit)-1] if is_number(inx): return int(inx) except: return 0 finally: urlInfo = None urlSplit = None del urlInfo del urlSplit gc.collect()
#----------------------------------------------------------------------
[docs]def getLayerName(url): """Extract the layer name from a url. Args: url (str): The url to parse. Returns: str: The layer name. Examples: >>> url = "http://services.arcgis.com/<random>/arcgis/rest/services/test/FeatureServer/12" >>> arcresthelper.common.getLayerIndex(url) 'test' """ urlInfo = None urlSplit = None try: urlInfo = urlparse.urlparse(url) urlSplit = str(urlInfo.path).split('/') name = urlSplit[len(urlSplit)-3] return name except: return url finally: urlInfo = None urlSplit = None del urlInfo del urlSplit gc.collect()
#----------------------------------------------------------------------
[docs]def getOrgId(url): """Extract the org ID from a url. Args: url (str): The url to parse. Returns: str: The org ID. Examples: >>> url = "http://services.arcgis.com/<random>/arcgis/rest/services/test/FeatureServer/12" >>> arcresthelper.common.getLayerIndex(url) '<random>' """ urlInfo = None urlSplit = None try: urlInfo = urlparse.urlparse(url) urlSplit = str(urlInfo.path).split('/') name = urlSplit[len(urlSplit)-7] return name except: return url finally: urlInfo = None urlSplit = None del urlInfo del urlSplit gc.collect()
#----------------------------------------------------------------------
[docs]def random_string_generator(size=6, chars=string.ascii_uppercase): """Generates a random string from a set of characters. Args: size (int): The length of the resultant string. Defaults to 6. chars (str): The characters to be used by :py:func:`random.choice`. Defaults to :py:const:`string.ascii_uppercase`. Returns: str: The randomly generated string. Examples: >>> arcresthelper.common.random_string_generator() 'DCNYWU' >>> arcresthelper.common.random_string_generator(12, "arcREST") 'cESaTTEacTES' """ try: return ''.join(random.choice(chars) for _ in range(size)) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "random_string_generator", "line": line, "filename": filename, "synerror": synerror, } ) finally: pass
#----------------------------------------------------------------------
[docs]def random_int_generator(maxrange): """Generates a random integer from 0 to `maxrange`, inclusive. Args: maxrange (int): The upper range of integers to randomly choose. Returns: int: The randomly generated integer from :py:func:`random.randint`. Examples: >>> arcresthelper.common.random_int_generator(15) 9 """ try: return random.randint(0,maxrange) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "random_int_generator", "line": line, "filename": filename, "synerror": synerror, } ) finally: pass
#----------------------------------------------------------------------
[docs]def local_time_to_online(dt=None): """Converts datetime object to a UTC timestamp for AGOL. Args: dt (datetime): The :py:class:`datetime.datetime` object to convert. Defaults to ``None``, i.e., :py:func:`datetime.datetime.now`. Returns: float: A UTC timestamp as understood by AGOL (time in ms since Unix epoch * 1000) Examples: >>> arcresthelper.common.local_time_to_online() # PST 1457167261000.0 >>> dt = datetime.datetime(1993, 3, 5, 12, 35, 15) # PST >>> arcresthelper.common.local_time_to_online(dt) 731392515000.0 See Also: :py:func:`online_time_to_string` for converting a UTC timestamp """ is_dst = None utc_offset = None try: if dt is None: dt = datetime.datetime.now() is_dst = time.daylight > 0 and time.localtime().tm_isdst > 0 utc_offset = (time.altzone if is_dst else time.timezone) return (time.mktime(dt.timetuple()) * 1000) + (utc_offset * 1000) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "local_time_to_online", "line": line, "filename": filename, "synerror": synerror, } ) finally: is_dst = None utc_offset = None del is_dst del utc_offset
#----------------------------------------------------------------------
[docs]def online_time_to_string(value, timeFormat, utcOffset=0): """Converts AGOL timestamp to formatted string. Args: value (float): A UTC timestamp as reported by AGOL (time in ms since Unix epoch * 1000) timeFormat (str): Date/Time format string as parsed by :py:func:`datetime.strftime`. utcOffset (int): Hours difference from UTC and desired output. Default is 0 (remain in UTC). Returns: str: A string representation of the timestamp. Examples: >>> arcresthelper.common.online_time_to_string(1457167261000.0, "%Y-%m-%d %H:%M:%S") '2016-03-05 00:41:01' >>> arcresthelper.common.online_time_to_string(731392515000.0, '%m/%d/%Y %H:%M:%S', -8) # PST is UTC-8:00 '03/05/1993 12:35:15' See Also: :py:func:`local_time_to_online` for converting a :py:class:`datetime.datetime` object to AGOL timestamp """ try: return datetime.datetime.fromtimestamp(value/1000 + utcOffset*3600).strftime(timeFormat) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "online_time_to_string", "line": line, "filename": filename, "synerror": synerror, } ) finally: pass
#----------------------------------------------------------------------
[docs]def is_number(s): """Determines if the input is numeric Args: s: The value to check. Returns: bool: ``True`` if the input is numeric, ``False`` otherwise. """ try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False
#----------------------------------------------------------------------
[docs]def init_config_json(config_file): """Deserializes a JSON configuration file. Args: config_file (str): The path to the JSON file. Returns: dict: A dictionary object containing the JSON data. If ``config_file`` does not exist, returns ``None``. """ json_data = None try: if os.path.exists(config_file): #Load the config file with open(config_file) as json_file: json_data = json.load(json_file) return unicode_convert(json_data) else: return None except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "init_config_json", "line": line, "filename": filename, "synerror": synerror, } ) finally: json_data = None del json_data gc.collect()
#----------------------------------------------------------------------
[docs]def write_config_json(config_file, data): """Serializes an object to disk. Args: config_file (str): The path on disk to save the file. data (object): The object to serialize. """ outfile = None try: with open(config_file, 'w') as outfile: json.dump(data, outfile) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "init_config_json", "line": line, "filename": filename, "synerror": synerror, } ) finally: outfile = None del outfile gc.collect()
#----------------------------------------------------------------------
[docs]def unicode_convert(obj): """Converts unicode objects to anscii. Args: obj (object): The object to convert. Returns: The object converted to anscii, if possible. For ``dict`` and ``list``, the object type is maintained. """ try: if isinstance(obj, dict): return {unicode_convert(key): unicode_convert(value) for key, value in obj.items()} elif isinstance(obj, list): return [unicode_convert(element) for element in obj] elif isinstance(obj, unicode): return obj.encode('utf-8') else: return obj except: return obj
[docs]def find_replace_string(obj, find, replace): """Performs a string.replace() on the input object. Args: obj (object): The object to find/replace. It will be cast to ``str``. find (str): The string to search for. replace (str): The string to replace with. Returns: str: The replaced string. """ try: strobj = str(obj) newStr = string.replace(strobj, find, replace) if newStr == strobj: return obj else: return newStr except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "find_replace_string", "line": line, "filename": filename, "synerror": synerror, } ) finally: pass
[docs]def find_replace(obj, find, replace): """ Searches an object and performs a find and replace. Args: obj (object): The object to iterate and find/replace. find (str): The string to search for. replace (str): The string to replace with. Returns: object: The object with replaced strings. """ try: if isinstance(obj, dict): return {find_replace(key,find,replace): find_replace(value,find,replace) for key, value in obj.items()} elif isinstance(obj, list): return [find_replace(element,find,replace) for element in obj] elif obj == find: return unicode_convert(replace) else: try: return unicode_convert(find_replace_string(obj, find, replace)) #obj = unicode_convert(json.loads(obj)) #return find_replace(obj,find,replace) except: return unicode_convert(obj) except: line, filename, synerror = trace() raise ArcRestHelperError({ "function": "find_replace", "line": line, "filename": filename, "synerror": synerror, } ) finally: pass
#----------------------------------------------------------------------
[docs]def chunklist(l, n): """Yield successive n-sized chunks from l. Args: l (object): The object to chunk. n (int): The size of the chunks. Yields: The next chunk in the object. Raises: TypeError: if ``l`` has no :py:func:`len`. Examples: >>> for c in arcresthelper.common.chunklist(list(range(20)), 6): ... print(c) [0, 1, 2, 3, 4, 5] [6, 7, 8, 9, 10, 11] [12, 13, 14, 15, 16, 17] [18, 19] >>> list(arcresthelper.common.chunklist(string.ascii_uppercase, 7)) ['ABCDEFG', 'HIJKLMN', 'OPQRSTU', 'VWXYZ'] """ n = max(1, n) for i in range(0, len(l), n): yield l[i:i+n]
#----------------------------------------------------------------------
[docs]def init_log(log_file): """ Creates log file on disk and "Tees" :py:class:`sys.stdout` to console and disk Args: log_file (str): The path on disk to append or create the log file. Returns: file: The opened log file. """ #Create the log file log = None try: log = open(log_file, 'a') #Change the output to both the windows and log file #original = sys.stdout sys.stdout = Tee(sys.stdout, log) except: pass return log
[docs]def close_log(log_file): """ Closes the open file and returns :py:class:`sys.stdout` to the default (i.e., console output). Args: log_file (file): The file object to close. """ sys.stdout = sys.__stdout__ if log_file is not None: log_file.close() del log_file
#----------------------------------------------------------------------
[docs]class Tee(object): """Combines standard output with a file for logging.""" def __init__(self, *files): self.files = files
[docs] def write(self, obj): for f in self.files: f.write(obj)