diff options
author | Mike Gabriel <mike.gabriel@das-netzwerkteam.de> | 2016-09-14 14:10:54 +0200 |
---|---|---|
committer | Mike Gabriel <mike.gabriel@das-netzwerkteam.de> | 2016-09-14 14:12:30 +0200 |
commit | ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828 (patch) | |
tree | f25471db93024725929ddda0165066307f16e33e /tccalib | |
parent | 254c1b80cda0e79083c3107288dd5a17e156a1ad (diff) | |
download | remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.gz remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.bz2 remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.zip |
Rename module tccalib to rscalib.
Diffstat (limited to 'tccalib')
-rw-r--r-- | tccalib/__init__.py | 224 | ||||
-rw-r--r-- | tccalib/tests/__init__.py | 239 |
2 files changed, 0 insertions, 463 deletions
diff --git a/tccalib/__init__.py b/tccalib/__init__.py deleted file mode 100644 index 9c61844..0000000 --- a/tccalib/__init__.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright 2012 Canonical Ltd. This software is licensed under the GNU -# General Public License version 3 (see the file LICENSE). - -__metaclass__ = type - - -import errno -from http.client import parse_headers -import socket -from io import BytesIO, StringIO -from urllib.parse import quote -import urllib.request, urllib.error -import base64 - -import pycurl - - -class UserError(Exception): - """An error message that should be presented to the user.""" - - def __init__(self, msg=None): - if msg is None: - msg = self.__doc__ - super(UserError, self).__init__(msg) - - -class Unauthorized(UserError): - """Invalid username or password""" - - status = 2 - - -class CouldNotConnect(UserError): - """Could not connect""" - - status = 3 - - -class CertificateVerificationFailed(UserError): - """Certificate verification failed""" - - status = 4 - - -class URLLibGetter: - """Get data from URLs using URLib.""" - - @staticmethod - def get_response_body(request, verify_ssl): - """Return the body of the response to the supplied request. - - :param request: A urllib2.Request - :param verify_ssl: Unused - :raises CouldNotConnect: if there is a connection error. - """ - try: - return urllib.request.urlopen(request).read() - except urllib.error.URLError as e: - if not isinstance(e.args[0], socket.error): - raise - if e.args[0].errno == errno.ECONNREFUSED: - raise CouldNotConnect - raise - - -class PycURLGetter: - """Get data from URLs using PycURL.""" - - def __init__(self, _curl=None): - if _curl is None: - _curl = pycurl.Curl() - self.curl = _curl - self.result = BytesIO() - self.response_header = BytesIO() - self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write) - self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write) - self.verify_ssl = True - - def prepare_curl(self, request): - """Prepare the curl object for the supplied request. - - :param request: a urllib2.Request instance. - """ - self.curl.setopt(pycurl.URL, request.get_full_url()) - request_headers = ['%s: %s' % item for - item in list(request.headers.items())] - self.curl.setopt(pycurl.HTTPHEADER, request_headers) - self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl) - - @classmethod - def get_response_body(cls, request, verify_ssl=True, _curl=None): - """Return the body of the response to the supplied request. - - :param request: A urllib2.Request instance. - :param verify_ssl: If true, verify SSL certificates. - :param _curl: The pycurl.Curl object to use (for testing). - :raises CouldNotConnect: if there is a connection error. - :raises CertificateVerificationFailed: if the SSL certificate could - not be verified. - :raises HTTPError: if the response status is not 200. - """ - instance = cls(_curl) - instance.verify_ssl = verify_ssl - instance.prepare_curl(request) - return instance.handle_response() - - def handle_response(self): - """Perform the curl operation and handle the response. - - :return: The body of the response on success. - :raises CouldNotConnect: if there is a connection error. - :raises CertificateVerificationFailed: if the SSL certificate could - not be verified. - :raises HTTPError: if the response status is not 200. - """ - try: - self.curl.perform() - except pycurl.error as e: - if e.args[0] in (pycurl.E_COULDNT_CONNECT, - pycurl.E_COULDNT_RESOLVE_HOST): - raise CouldNotConnect - elif e.args[0] == pycurl.E_SSL_CACERT: - raise CertificateVerificationFailed - else: - raise - status = self.curl.getinfo(pycurl.HTTP_CODE) - if status == 200: - return self.result.getvalue().decode('utf-8') - else: - lines = self.response_header.getvalue().decode('utf-8').splitlines(True) - header_ = ''.join(lines[1:]) - headers = parse_headers(BytesIO(header_.encode('ascii'))) - raise urllib.error.HTTPError( - self.curl.getinfo(pycurl.EFFECTIVE_URL), status, - self.result.getvalue(), headers, None) - - -class GetMazaData: - """Base class for retrieving data from MAZA server.""" - - @classmethod - def run(cls, username, password, server_root=None, verify_ssl=True): - """Return the requested data. - - :param username: The username of the user. - :param password: The user's password. - :param server_root: The root URL to make queries to. - :param verify_ssl: If true, verify SSL certificates. - """ - return cls(username, password, server_root).get_data(verify_ssl) - - def __init__(self, username, password, server_root=None): - self.username = username - self.password = password - if server_root is not None: - self.server_root = server_root - else: - self.server_root = 'https://uccs.landscape.canonical.com' - self.getter = PycURLGetter - - def get_api_url(self): - """Return the URL for an API version.""" - return '%s/api/%s/' % (self.server_root, self.api_version) - - def get_data(self, verify_ssl=True): - """Return the data for this version of the API.""" - try: - return self.getter.get_response_body(self.make_request(), - verify_ssl) - except urllib.error.HTTPError as e: - if e.getcode() == 401: - raise Unauthorized - else: - raise - - -class GetMazaDataAPI1(GetMazaData): - """Get the maza data for a given email and password via API v1.""" - - api_version = 1 - - def make_request(self): - path = '%s/%s' % (quote(self.username), quote(self.password)) - return urllib.request.Request(self.get_api_url() + path) - - -class GetMazaDataAPI4(GetMazaData): - """Get the maza data for a given email and password via API v4.""" - api_version = 4 - - def get_url(self): - return self.get_api_url() - - def make_request(self): - request = urllib.request.Request(self.get_url()) - credentials = '%s:%s' % (self.username, self.password) - credentials64 = base64.encodebytes(credentials.encode('ascii')) - authorization = 'Basic %s' % credentials64.decode('ascii') - request.add_header('Authorization', authorization) - return request - - -class GetMazaDataAPI3(GetMazaDataAPI4): - """Get the maza data for a given email and password via API v3.""" - - api_version = 3 - - def get_url(self): - return self.get_api_url() + quote(self.username) - - -class GetMazaDataAPI2(GetMazaDataAPI3): - """Get the maza data for a given email and password via API v2.""" - - api_version = 2 - - -api_versions = { - '1': GetMazaDataAPI1, - '2': GetMazaDataAPI2, - '3': GetMazaDataAPI3, - '4': GetMazaDataAPI4, - 'default': GetMazaDataAPI4, -} diff --git a/tccalib/tests/__init__.py b/tccalib/tests/__init__.py deleted file mode 100644 index 6323600..0000000 --- a/tccalib/tests/__init__.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2012 Canonical Ltd. This software is licensed under the GNU -# General Public License version 3 (see the file LICENSE). - - -__metaclass__ = type - - -import urllib.request, urllib.error, urllib.parse -from unittest import TestCase -import base64 - -import pycurl -from io import BytesIO - -from tccalib import ( - CertificateVerificationFailed, - CouldNotConnect, - GetMazaData, - GetMazaDataAPI1, - GetMazaDataAPI2, - GetMazaDataAPI3, - GetMazaDataAPI4, - PycURLGetter, - Unauthorized, - ) - - -class FakeGetResponseBody: - """Fake to raise a 401 when get_response_body is called.""" - - def get_response_body(self, request, verify_ssl): - raise urllib.error.HTTPError(None, 401, None, None, None) - - -class GetMazaDataFaked(GetMazaData): - """Subclass of GetMazaData for testing.""" - - api_version = 34 - - def __init__(self, username=None, server_root=None): - super(GetMazaDataFaked, self).__init__(username, username, server_root) - - def make_request(self): - pass - - -class TestGetMazaData(TestCase): - - def test_get_data_unauthorized(self): - """If a 401 is encountered, Unauthorized is raised.""" - api_client = GetMazaDataFaked() - api_client.getter = FakeGetResponseBody() - with self.assertRaises(Unauthorized): - api_client.get_data() - - def test_get_api_url_default(self): - """Default URL is as expected.""" - url = GetMazaDataFaked().get_api_url() - self.assertEqual('https://uccs.landscape.canonical.com/api/34/', - url) - - def test_get_api_url_explicit(self): - """URL is as expected with specified server_root.""" - url = GetMazaDataFaked(server_root='http://foo').get_api_url() - self.assertEqual('http://foo/api/34/', url) - - -class TestGetMazaDataAPI1(TestCase): - - def test_make_request(self): - """v1 requests have correct URL and no Auth header.""" - getter = GetMazaDataAPI1('foo', 'bar') - request = getter.make_request() - self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar', - request.get_full_url()) - self.assertIs(None, request.headers.get('Authorization')) - - -class TestGetMazaDataAPI2(TestCase): - - def test_make_request(self): - """v2 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI2('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class TestGetMazaDataAPI3(TestCase): - - def test_make_request(self): - """v3 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI3('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class TestGetMazaDataAPI4(TestCase): - - def test_make_request(self): - """v4 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI4('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/4/', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class FakeCurl: - """Fake pycurl.Curl for testing PycURLGetter.""" - - def __init__(self, response_status=200, body='', header='', - effective_url='http://example.org/', perform_error=None): - self.options = {} - self.info = {} - self.response_status = response_status - self.response_header = bytes(header, 'UTF-8') - self.response_body = bytes(body, 'UTF-8') - self.effective_url=effective_url - self.perform_error = perform_error - - def setopt(self, key, value): - self.options[key] = value - - def perform(self): - if self.perform_error is not None: - raise self.perform_error - self.info[pycurl.EFFECTIVE_URL] = self.effective_url - self.info[pycurl.HTTP_CODE] = self.response_status - self.options[pycurl.HEADERFUNCTION](self.response_header) - self.options[pycurl.WRITEFUNCTION](self.response_body) - - def getinfo(self, key): - return self.info[key] - - -class TestPycURLGetter(TestCase): - - def test_init(self): - """Init should set the WRITEFUNCTION and HEADERFUNCTION.""" - getter = PycURLGetter(FakeCurl()) - options = getter.curl.options - self.assertIsNot(None, options[pycurl.WRITEFUNCTION]) - self.assertIsNot(None, options[pycurl.HEADERFUNCTION]) - - @staticmethod - def make_request(): - return GetMazaDataAPI3('pete', 'pass').make_request() - - def test_prepare_curl(self): - """prepare_curl sets URL and auth header.""" - curl = FakeCurl() - getter = PycURLGetter(curl) - request = self.make_request() - getter.prepare_curl(request) - self.assertEqual(request.get_full_url(), curl.options[pycurl.URL]) - self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'], - curl.options[pycurl.HTTPHEADER]) - - def test_prepare_curl_ssl_verify(self): - """SSL cert verification can be disabled for testing purposes.""" - curl = FakeCurl() - getter = PycURLGetter(curl) - getter.prepare_curl(self.make_request()) - self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER]) - getter.verify_ssl = False - getter.prepare_curl(self.make_request()) - self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER]) - - def test_handle_reponse(self): - """On success, handle_response returns response body.""" - curl = FakeCurl(body='My body!', header='My header!', - effective_url='http://example.com/') - getter = PycURLGetter(curl) - output = getter.handle_response() - self.assertEqual('My body!', output) - self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8')) - self.assertEqual('http://example.com/', - getter.curl.getinfo(pycurl.EFFECTIVE_URL)) - - def test_handle_response_http_error(self): - """On http error, handle_response raises urllib.error.HTTPError.""" - curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n') - getter = PycURLGetter(curl) - try: - getter.handle_response() - except urllib.error.HTTPError as e: - httpe = e - else: - self.fail('No error was raised.') - try: - httpe.geturl() - except AttributeError as attre: - self.assertEqual("'HTTPError' object has no attribute 'url'", - str(attre)) - self.assertEqual(500, httpe.getcode()) - self.assertEqual('My body!', httpe.msg.decode('UTF-8')) - self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items()) - - def test_handle_response_connection_error(self): - """On connection error, handle_response raises CouldNotConnect.""" - error = pycurl.error(pycurl.E_COULDNT_CONNECT) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(CouldNotConnect): - getter.handle_response() - - def test_cert_verification_failed(self): - """Cert verification error raises CertificateVerificationFailed.""" - error = pycurl.error(pycurl.E_SSL_CACERT) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(CertificateVerificationFailed): - getter.handle_response() - - def test_handle_response_pycurl_error(self): - """PycURLGetter allows other errors to propagate.""" - error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(pycurl.error): - getter.handle_response() - - def test_get_response_body(self): - """On success, get_response_body returns response body.""" - curl = FakeCurl(body='My body!', header='My header!', - effective_url='http://example.com/') - request = GetMazaDataAPI3('pete', 'pass').make_request() - output = PycURLGetter.get_response_body(request, _curl=curl) - self.assertEqual('My body!', output) |