# This library is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/> or <http://www.gnu.org/licenses/lgpl.txt>.
from ..six.moves import urllib
from ..six.moves.http_client import HTTPConnection, HTTPSConnection
import socket
import re
from . import ntlm
class AbstractNtlmAuthHandler:
    """Shared NTLM handshake logic for urllib authentication handlers.

    Subclasses are expected to define ``auth_header`` (the request header
    that carries the NTLM messages) and to route the matching HTTP error to
    :meth:`http_error_authentication_required`.
    """

    def __init__(self, password_mgr=None, debuglevel=0):
        """Store the password manager and the HTTP debug level.

        :param password_mgr: object providing ``add_password`` and
            ``find_user_password``; a new ``urllib.request.HTTPPasswordMgr``
            is created when omitted.
        :param debuglevel: value handed to ``set_debuglevel`` on the
            connection used for the handshake.
        """
        if password_mgr is None:
            password_mgr = urllib.request.HTTPPasswordMgr()
        self.passwd = password_mgr
        self.add_password = self.passwd.add_password
        self._debuglevel = debuglevel

    def set_http_debuglevel(self, level):
        """Set the debug level applied to connections opened by this handler."""
        self._debuglevel = level

    def http_error_authentication_required(self, auth_header_field, req, fp, headers):
        """Retry the request with NTLM when the server offers it.

        :param auth_header_field: name of the response header listing the
            offered authentication schemes.
        :param req: the request that was rejected.
        :param fp: the rejected response; closed before retrying.
        :param headers: headers of the rejected response.
        :returns: the result of :meth:`retry_using_http_NTLM_auth`, or
            ``None`` when the header is absent or does not mention NTLM.
        """
        auth_header_value = headers.get(auth_header_field, None)
        if not auth_header_field:
            return None
        if auth_header_value is None or 'ntlm' not in auth_header_value.lower():
            return None
        fp.close()
        return self.retry_using_http_NTLM_auth(req, auth_header_field, None, headers)

    def retry_using_http_NTLM_auth(self, req, auth_header_field, realm, headers):
        """Perform the NTLM handshake on one connection and return the reply.

        :param req: the request to repeat with NTLM credentials.
        :param auth_header_field: name of the response header that carries
            the server's NTLM challenge.
        :param realm: realm used to look up credentials in the password
            manager.
        :param headers: headers of the rejected response (not used; the
            outgoing headers are rebuilt from ``req``).
        :returns: an ``addinfourl`` wrapping the final response with ``code``
            and ``msg`` set, or ``None`` when no password is known or the
            request already carries the same NTLM negotiate header.
        :raises URLError: when the request has no host, when the server's
            reply carries no challenge header, or when the authenticated
            request fails with a socket error.
        """
        user, pw = self.passwd.find_user_password(realm, req.get_full_url())
        if pw is None:
            return None

        # A user name may carry a DOMAIN prefix separated by a backslash.
        user_parts = user.split('\\', 1)
        if len(user_parts) == 1:
            UserName = user_parts[0]
            DomainName = ''
            type1_flags = ntlm.NTLM_TYPE1_FLAGS & ~ntlm.NTLM_NegotiateOemDomainSupplied
        else:
            DomainName = user_parts[0].upper()
            UserName = user_parts[1]
            type1_flags = ntlm.NTLM_TYPE1_FLAGS

        # ntlm secures a socket, so we must use the same socket for the complete handshake
        headers = dict(req.headers)
        headers.update(req.unredirected_hdrs)
        auth = 'NTLM %s' % ntlm.create_NTLM_NEGOTIATE_MESSAGE(user, type1_flags)
        if req.headers.get(self.auth_header, None) == auth:
            return None
        headers[self.auth_header] = auth

        host = req.host
        if not host:
            # URLError lives in urllib.error; it is not exported by six's
            # lazy urllib.request module.
            raise urllib.error.URLError('no host given')

        if req.get_full_url().startswith('https://'):
            h = HTTPSConnection(host)  # will parse host:port
        else:
            h = HTTPConnection(host)  # will parse host:port
        h.set_debuglevel(self._debuglevel)

        # we must keep the connection because NTLM authenticates the connection, not single requests
        headers["Connection"] = "Keep-Alive"
        headers = dict((name.title(), val) for name, val in headers.items())

        # For some reason, six doesn't do this translation correctly
        # TODO rsanders low - find bug in six & fix it
        try:
            selector = req.selector
        except AttributeError:
            selector = req.get_selector()

        h.request(req.get_method(), selector, req.data, headers)
        r = h.getresponse()
        r.begin()
        # Drain the body so the connection can carry the next request.
        content_length = r.getheader('content-length')
        if content_length is not None:
            r._safe_read(int(content_length))
        else:
            # No Content-Length (e.g. chunked encoding): let the response
            # object work out how much to consume instead of failing on
            # int(None).
            r.read()

        set_cookie = r.getheader('set-cookie')
        if set_cookie:
            # this is important for some web applications that store authentication-related info in cookies (it took a long time to figure out)
            headers['Cookie'] = set_cookie
        r.fp = None  # remove the reference to the socket, so that it can not be closed by the response object (we want to keep the socket open)

        auth_header_value = r.getheader(auth_header_field, None)
        if auth_header_value is None:
            h.close()
            raise urllib.error.URLError(
                'no %s header in NTLM challenge response' % auth_header_field)

        # some Exchange servers send two WWW-Authenticate headers, one with the NTLM challenge
        # and another with the 'Negotiate' keyword - make sure we operate on the right one.
        # search() rather than match(): the NTLM token need not come first in the combined value.
        m = re.search(r'(NTLM [A-Za-z0-9+\-/=]+)', auth_header_value)
        if m:
            auth_header_value = m.group(1)

        # Strip the leading "NTLM " before decoding the challenge.
        (ServerChallenge, NegotiateFlags) = ntlm.parse_NTLM_CHALLENGE_MESSAGE(auth_header_value[5:])
        auth = 'NTLM %s' % ntlm.create_NTLM_AUTHENTICATE_MESSAGE(
            ServerChallenge, UserName, DomainName, pw, NegotiateFlags)
        headers[self.auth_header] = auth
        headers["Connection"] = "Close"
        headers = dict((name.title(), val) for name, val in headers.items())

        try:
            h.request(req.get_method(), selector, req.data, headers)
            # none of the configured handlers are triggered, for example redirect-responses are not handled!
            response = h.getresponse()

            def notimplemented():
                raise NotImplementedError

            response.readline = notimplemented
            infourl = urllib.response.addinfourl(response, response.msg, req.get_full_url())
            infourl.code = response.status
            infourl.msg = response.reason
            return infourl
        except socket.error as err:
            # Release the socket before reporting the failure.
            h.close()
            raise urllib.error.URLError(err)
class HTTPNtlmAuthHandler(AbstractNtlmAuthHandler, urllib.request.BaseHandler):
    """urllib handler that answers HTTP 401 responses with NTLM.

    The NTLM messages are sent in the ``Authorization`` request header.
    """

    auth_header = 'Authorization'

    def http_error_401(self, req, fp, code, msg, headers):
        """Delegate a 401 response to the shared NTLM retry logic.

        ``code`` and ``msg`` are accepted for the handler signature but not
        used; the offered schemes are read from ``www-authenticate``.
        """
        return self.http_error_authentication_required('www-authenticate', req, fp, headers)
class ProxyNtlmAuthHandler(AbstractNtlmAuthHandler, urllib.request.BaseHandler):
    """urllib handler that answers HTTP 407 responses with NTLM.

    The NTLM messages are sent in the ``Proxy-authorization`` request header.

    CAUTION: this class has NOT been tested at all!!!
    use at your own risk
    """

    auth_header = 'Proxy-authorization'

    def http_error_407(self, req, fp, code, msg, headers):
        """Delegate a 407 response to the shared NTLM retry logic.

        ``code`` and ``msg`` are accepted for the handler signature but not
        used; the offered schemes are read from ``proxy-authenticate``.
        """
        return self.http_error_authentication_required('proxy-authenticate', req, fp, headers)