From ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828 Mon Sep 17 00:00:00 2001 From: Mike Gabriel Date: Wed, 14 Sep 2016 14:10:54 +0200 Subject: Rename module tccalib to rscalib. --- Makefile | 2 +- debian/rules | 2 +- rscalib/__init__.py | 224 +++++++++++++++++++++++++++++++++++++++++++ rscalib/tests/__init__.py | 239 ++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 4 +- tccalib/__init__.py | 224 ------------------------------------------- tccalib/tests/__init__.py | 239 ---------------------------------------------- thinclient-config-agent | 2 +- 8 files changed, 468 insertions(+), 468 deletions(-) create mode 100644 rscalib/__init__.py create mode 100644 rscalib/tests/__init__.py delete mode 100644 tccalib/__init__.py delete mode 100644 tccalib/tests/__init__.py diff --git a/Makefile b/Makefile index 70c0c12..f914853 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ lint: pyflakes $$(find -name '*.py') $$(grep '^#!.*python' $$(bzr ls -Vkfile) -l) check: - python3 -m unittest tccalib.tests + python3 -m unittest rscalib.tests .PHONY: lint check diff --git a/debian/rules b/debian/rules index c4f1a51..8a9d0d8 100755 --- a/debian/rules +++ b/debian/rules @@ -17,7 +17,7 @@ override_dh_auto_build: override_dh_auto_test: ifeq ($(filter nocheck,$(DEB_BUILD_OPTIONS)),) set -ex; for python in $(PY3VERS) ; do \ - $$python -m unittest tccalib.tests ; \ + $$python -m unittest rscalib.tests ; \ done endif diff --git a/rscalib/__init__.py b/rscalib/__init__.py new file mode 100644 index 0000000..9c61844 --- /dev/null +++ b/rscalib/__init__.py @@ -0,0 +1,224 @@ +# Copyright 2012 Canonical Ltd. This software is licensed under the GNU +# General Public License version 3 (see the file LICENSE). + +__metaclass__ = type + + +import errno +from http.client import parse_headers +import socket +from io import BytesIO, StringIO +from urllib.parse import quote +import urllib.request, urllib.error +import base64 + +import pycurl + + +class UserError(Exception): + """An error message that should be presented to the user.""" + + def __init__(self, msg=None): + if msg is None: + msg = self.__doc__ + super(UserError, self).__init__(msg) + + +class Unauthorized(UserError): + """Invalid username or password""" + + status = 2 + + +class CouldNotConnect(UserError): + """Could not connect""" + + status = 3 + + +class CertificateVerificationFailed(UserError): + """Certificate verification failed""" + + status = 4 + + +class URLLibGetter: + """Get data from URLs using URLib.""" + + @staticmethod + def get_response_body(request, verify_ssl): + """Return the body of the response to the supplied request. + + :param request: A urllib2.Request + :param verify_ssl: Unused + :raises CouldNotConnect: if there is a connection error. + """ + try: + return urllib.request.urlopen(request).read() + except urllib.error.URLError as e: + if not isinstance(e.args[0], socket.error): + raise + if e.args[0].errno == errno.ECONNREFUSED: + raise CouldNotConnect + raise + + +class PycURLGetter: + """Get data from URLs using PycURL.""" + + def __init__(self, _curl=None): + if _curl is None: + _curl = pycurl.Curl() + self.curl = _curl + self.result = BytesIO() + self.response_header = BytesIO() + self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write) + self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write) + self.verify_ssl = True + + def prepare_curl(self, request): + """Prepare the curl object for the supplied request. + + :param request: a urllib2.Request instance. + """ + self.curl.setopt(pycurl.URL, request.get_full_url()) + request_headers = ['%s: %s' % item for + item in list(request.headers.items())] + self.curl.setopt(pycurl.HTTPHEADER, request_headers) + self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl) + + @classmethod + def get_response_body(cls, request, verify_ssl=True, _curl=None): + """Return the body of the response to the supplied request. + + :param request: A urllib2.Request instance. + :param verify_ssl: If true, verify SSL certificates. + :param _curl: The pycurl.Curl object to use (for testing). + :raises CouldNotConnect: if there is a connection error. + :raises CertificateVerificationFailed: if the SSL certificate could + not be verified. + :raises HTTPError: if the response status is not 200. + """ + instance = cls(_curl) + instance.verify_ssl = verify_ssl + instance.prepare_curl(request) + return instance.handle_response() + + def handle_response(self): + """Perform the curl operation and handle the response. + + :return: The body of the response on success. + :raises CouldNotConnect: if there is a connection error. + :raises CertificateVerificationFailed: if the SSL certificate could + not be verified. + :raises HTTPError: if the response status is not 200. + """ + try: + self.curl.perform() + except pycurl.error as e: + if e.args[0] in (pycurl.E_COULDNT_CONNECT, + pycurl.E_COULDNT_RESOLVE_HOST): + raise CouldNotConnect + elif e.args[0] == pycurl.E_SSL_CACERT: + raise CertificateVerificationFailed + else: + raise + status = self.curl.getinfo(pycurl.HTTP_CODE) + if status == 200: + return self.result.getvalue().decode('utf-8') + else: + lines = self.response_header.getvalue().decode('utf-8').splitlines(True) + header_ = ''.join(lines[1:]) + headers = parse_headers(BytesIO(header_.encode('ascii'))) + raise urllib.error.HTTPError( + self.curl.getinfo(pycurl.EFFECTIVE_URL), status, + self.result.getvalue(), headers, None) + + +class GetMazaData: + """Base class for retrieving data from MAZA server.""" + + @classmethod + def run(cls, username, password, server_root=None, verify_ssl=True): + """Return the requested data. + + :param username: The username of the user. + :param password: The user's password. + :param server_root: The root URL to make queries to. + :param verify_ssl: If true, verify SSL certificates. + """ + return cls(username, password, server_root).get_data(verify_ssl) + + def __init__(self, username, password, server_root=None): + self.username = username + self.password = password + if server_root is not None: + self.server_root = server_root + else: + self.server_root = 'https://uccs.landscape.canonical.com' + self.getter = PycURLGetter + + def get_api_url(self): + """Return the URL for an API version.""" + return '%s/api/%s/' % (self.server_root, self.api_version) + + def get_data(self, verify_ssl=True): + """Return the data for this version of the API.""" + try: + return self.getter.get_response_body(self.make_request(), + verify_ssl) + except urllib.error.HTTPError as e: + if e.getcode() == 401: + raise Unauthorized + else: + raise + + +class GetMazaDataAPI1(GetMazaData): + """Get the maza data for a given email and password via API v1.""" + + api_version = 1 + + def make_request(self): + path = '%s/%s' % (quote(self.username), quote(self.password)) + return urllib.request.Request(self.get_api_url() + path) + + +class GetMazaDataAPI4(GetMazaData): + """Get the maza data for a given email and password via API v4.""" + api_version = 4 + + def get_url(self): + return self.get_api_url() + + def make_request(self): + request = urllib.request.Request(self.get_url()) + credentials = '%s:%s' % (self.username, self.password) + credentials64 = base64.encodebytes(credentials.encode('ascii')) + authorization = 'Basic %s' % credentials64.decode('ascii') + request.add_header('Authorization', authorization) + return request + + +class GetMazaDataAPI3(GetMazaDataAPI4): + """Get the maza data for a given email and password via API v3.""" + + api_version = 3 + + def get_url(self): + return self.get_api_url() + quote(self.username) + + +class GetMazaDataAPI2(GetMazaDataAPI3): + """Get the maza data for a given email and password via API v2.""" + + api_version = 2 + + +api_versions = { + '1': GetMazaDataAPI1, + '2': GetMazaDataAPI2, + '3': GetMazaDataAPI3, + '4': GetMazaDataAPI4, + 'default': GetMazaDataAPI4, +} diff --git a/rscalib/tests/__init__.py b/rscalib/tests/__init__.py new file mode 100644 index 0000000..c3a8afa --- /dev/null +++ b/rscalib/tests/__init__.py @@ -0,0 +1,239 @@ +# Copyright 2012 Canonical Ltd. This software is licensed under the GNU +# General Public License version 3 (see the file LICENSE). + + +__metaclass__ = type + + +import urllib.request, urllib.error, urllib.parse +from unittest import TestCase +import base64 + +import pycurl +from io import BytesIO + +from rscalib import ( + CertificateVerificationFailed, + CouldNotConnect, + GetMazaData, + GetMazaDataAPI1, + GetMazaDataAPI2, + GetMazaDataAPI3, + GetMazaDataAPI4, + PycURLGetter, + Unauthorized, + ) + + +class FakeGetResponseBody: + """Fake to raise a 401 when get_response_body is called.""" + + def get_response_body(self, request, verify_ssl): + raise urllib.error.HTTPError(None, 401, None, None, None) + + +class GetMazaDataFaked(GetMazaData): + """Subclass of GetMazaData for testing.""" + + api_version = 34 + + def __init__(self, username=None, server_root=None): + super(GetMazaDataFaked, self).__init__(username, username, server_root) + + def make_request(self): + pass + + +class TestGetMazaData(TestCase): + + def test_get_data_unauthorized(self): + """If a 401 is encountered, Unauthorized is raised.""" + api_client = GetMazaDataFaked() + api_client.getter = FakeGetResponseBody() + with self.assertRaises(Unauthorized): + api_client.get_data() + + def test_get_api_url_default(self): + """Default URL is as expected.""" + url = GetMazaDataFaked().get_api_url() + self.assertEqual('https://uccs.landscape.canonical.com/api/34/', + url) + + def test_get_api_url_explicit(self): + """URL is as expected with specified server_root.""" + url = GetMazaDataFaked(server_root='http://foo').get_api_url() + self.assertEqual('http://foo/api/34/', url) + + +class TestGetMazaDataAPI1(TestCase): + + def test_make_request(self): + """v1 requests have correct URL and no Auth header.""" + getter = GetMazaDataAPI1('foo', 'bar') + request = getter.make_request() + self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar', + request.get_full_url()) + self.assertIs(None, request.headers.get('Authorization')) + + +class TestGetMazaDataAPI2(TestCase): + + def test_make_request(self): + """v2 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI2('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class TestGetMazaDataAPI3(TestCase): + + def test_make_request(self): + """v3 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI3('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class TestGetMazaDataAPI4(TestCase): + + def test_make_request(self): + """v4 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI4('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/4/', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class FakeCurl: + """Fake pycurl.Curl for testing PycURLGetter.""" + + def __init__(self, response_status=200, body='', header='', + effective_url='http://example.org/', perform_error=None): + self.options = {} + self.info = {} + self.response_status = response_status + self.response_header = bytes(header, 'UTF-8') + self.response_body = bytes(body, 'UTF-8') + self.effective_url=effective_url + self.perform_error = perform_error + + def setopt(self, key, value): + self.options[key] = value + + def perform(self): + if self.perform_error is not None: + raise self.perform_error + self.info[pycurl.EFFECTIVE_URL] = self.effective_url + self.info[pycurl.HTTP_CODE] = self.response_status + self.options[pycurl.HEADERFUNCTION](self.response_header) + self.options[pycurl.WRITEFUNCTION](self.response_body) + + def getinfo(self, key): + return self.info[key] + + +class TestPycURLGetter(TestCase): + + def test_init(self): + """Init should set the WRITEFUNCTION and HEADERFUNCTION.""" + getter = PycURLGetter(FakeCurl()) + options = getter.curl.options + self.assertIsNot(None, options[pycurl.WRITEFUNCTION]) + self.assertIsNot(None, options[pycurl.HEADERFUNCTION]) + + @staticmethod + def make_request(): + return GetMazaDataAPI3('pete', 'pass').make_request() + + def test_prepare_curl(self): + """prepare_curl sets URL and auth header.""" + curl = FakeCurl() + getter = PycURLGetter(curl) + request = self.make_request() + getter.prepare_curl(request) + self.assertEqual(request.get_full_url(), curl.options[pycurl.URL]) + self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'], + curl.options[pycurl.HTTPHEADER]) + + def test_prepare_curl_ssl_verify(self): + """SSL cert verification can be disabled for testing purposes.""" + curl = FakeCurl() + getter = PycURLGetter(curl) + getter.prepare_curl(self.make_request()) + self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER]) + getter.verify_ssl = False + getter.prepare_curl(self.make_request()) + self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER]) + + def test_handle_reponse(self): + """On success, handle_response returns response body.""" + curl = FakeCurl(body='My body!', header='My header!', + effective_url='http://example.com/') + getter = PycURLGetter(curl) + output = getter.handle_response() + self.assertEqual('My body!', output) + self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8')) + self.assertEqual('http://example.com/', + getter.curl.getinfo(pycurl.EFFECTIVE_URL)) + + def test_handle_response_http_error(self): + """On http error, handle_response raises urllib.error.HTTPError.""" + curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n') + getter = PycURLGetter(curl) + try: + getter.handle_response() + except urllib.error.HTTPError as e: + httpe = e + else: + self.fail('No error was raised.') + try: + httpe.geturl() + except AttributeError as attre: + self.assertEqual("'HTTPError' object has no attribute 'url'", + str(attre)) + self.assertEqual(500, httpe.getcode()) + self.assertEqual('My body!', httpe.msg.decode('UTF-8')) + self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items()) + + def test_handle_response_connection_error(self): + """On connection error, handle_response raises CouldNotConnect.""" + error = pycurl.error(pycurl.E_COULDNT_CONNECT) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(CouldNotConnect): + getter.handle_response() + + def test_cert_verification_failed(self): + """Cert verification error raises CertificateVerificationFailed.""" + error = pycurl.error(pycurl.E_SSL_CACERT) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(CertificateVerificationFailed): + getter.handle_response() + + def test_handle_response_pycurl_error(self): + """PycURLGetter allows other errors to propagate.""" + error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(pycurl.error): + getter.handle_response() + + def test_get_response_body(self): + """On success, get_response_body returns response body.""" + curl = FakeCurl(body='My body!', header='My header!', + effective_url='http://example.com/') + request = GetMazaDataAPI3('pete', 'pass').make_request() + output = PycURLGetter.get_response_body(request, _curl=curl) + self.assertEqual('My body!', output) diff --git a/setup.py b/setup.py index df43b06..c0e0d0c 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,6 @@ setup( license='GPL', description='Retrieve the list of remote desktop servers for a user.', long_description='Retrieve the list of remote desktop servers for a user.', - packages=['tccalib'], - test_suite='tccalib', + packages=['rscalib'], + test_suite='rscalib', ) diff --git a/tccalib/__init__.py b/tccalib/__init__.py deleted file mode 100644 index 9c61844..0000000 --- a/tccalib/__init__.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright 2012 Canonical Ltd. This software is licensed under the GNU -# General Public License version 3 (see the file LICENSE). - -__metaclass__ = type - - -import errno -from http.client import parse_headers -import socket -from io import BytesIO, StringIO -from urllib.parse import quote -import urllib.request, urllib.error -import base64 - -import pycurl - - -class UserError(Exception): - """An error message that should be presented to the user.""" - - def __init__(self, msg=None): - if msg is None: - msg = self.__doc__ - super(UserError, self).__init__(msg) - - -class Unauthorized(UserError): - """Invalid username or password""" - - status = 2 - - -class CouldNotConnect(UserError): - """Could not connect""" - - status = 3 - - -class CertificateVerificationFailed(UserError): - """Certificate verification failed""" - - status = 4 - - -class URLLibGetter: - """Get data from URLs using URLib.""" - - @staticmethod - def get_response_body(request, verify_ssl): - """Return the body of the response to the supplied request. - - :param request: A urllib2.Request - :param verify_ssl: Unused - :raises CouldNotConnect: if there is a connection error. - """ - try: - return urllib.request.urlopen(request).read() - except urllib.error.URLError as e: - if not isinstance(e.args[0], socket.error): - raise - if e.args[0].errno == errno.ECONNREFUSED: - raise CouldNotConnect - raise - - -class PycURLGetter: - """Get data from URLs using PycURL.""" - - def __init__(self, _curl=None): - if _curl is None: - _curl = pycurl.Curl() - self.curl = _curl - self.result = BytesIO() - self.response_header = BytesIO() - self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write) - self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write) - self.verify_ssl = True - - def prepare_curl(self, request): - """Prepare the curl object for the supplied request. - - :param request: a urllib2.Request instance. - """ - self.curl.setopt(pycurl.URL, request.get_full_url()) - request_headers = ['%s: %s' % item for - item in list(request.headers.items())] - self.curl.setopt(pycurl.HTTPHEADER, request_headers) - self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl) - - @classmethod - def get_response_body(cls, request, verify_ssl=True, _curl=None): - """Return the body of the response to the supplied request. - - :param request: A urllib2.Request instance. - :param verify_ssl: If true, verify SSL certificates. - :param _curl: The pycurl.Curl object to use (for testing). - :raises CouldNotConnect: if there is a connection error. - :raises CertificateVerificationFailed: if the SSL certificate could - not be verified. - :raises HTTPError: if the response status is not 200. - """ - instance = cls(_curl) - instance.verify_ssl = verify_ssl - instance.prepare_curl(request) - return instance.handle_response() - - def handle_response(self): - """Perform the curl operation and handle the response. - - :return: The body of the response on success. - :raises CouldNotConnect: if there is a connection error. - :raises CertificateVerificationFailed: if the SSL certificate could - not be verified. - :raises HTTPError: if the response status is not 200. - """ - try: - self.curl.perform() - except pycurl.error as e: - if e.args[0] in (pycurl.E_COULDNT_CONNECT, - pycurl.E_COULDNT_RESOLVE_HOST): - raise CouldNotConnect - elif e.args[0] == pycurl.E_SSL_CACERT: - raise CertificateVerificationFailed - else: - raise - status = self.curl.getinfo(pycurl.HTTP_CODE) - if status == 200: - return self.result.getvalue().decode('utf-8') - else: - lines = self.response_header.getvalue().decode('utf-8').splitlines(True) - header_ = ''.join(lines[1:]) - headers = parse_headers(BytesIO(header_.encode('ascii'))) - raise urllib.error.HTTPError( - self.curl.getinfo(pycurl.EFFECTIVE_URL), status, - self.result.getvalue(), headers, None) - - -class GetMazaData: - """Base class for retrieving data from MAZA server.""" - - @classmethod - def run(cls, username, password, server_root=None, verify_ssl=True): - """Return the requested data. - - :param username: The username of the user. - :param password: The user's password. - :param server_root: The root URL to make queries to. - :param verify_ssl: If true, verify SSL certificates. - """ - return cls(username, password, server_root).get_data(verify_ssl) - - def __init__(self, username, password, server_root=None): - self.username = username - self.password = password - if server_root is not None: - self.server_root = server_root - else: - self.server_root = 'https://uccs.landscape.canonical.com' - self.getter = PycURLGetter - - def get_api_url(self): - """Return the URL for an API version.""" - return '%s/api/%s/' % (self.server_root, self.api_version) - - def get_data(self, verify_ssl=True): - """Return the data for this version of the API.""" - try: - return self.getter.get_response_body(self.make_request(), - verify_ssl) - except urllib.error.HTTPError as e: - if e.getcode() == 401: - raise Unauthorized - else: - raise - - -class GetMazaDataAPI1(GetMazaData): - """Get the maza data for a given email and password via API v1.""" - - api_version = 1 - - def make_request(self): - path = '%s/%s' % (quote(self.username), quote(self.password)) - return urllib.request.Request(self.get_api_url() + path) - - -class GetMazaDataAPI4(GetMazaData): - """Get the maza data for a given email and password via API v4.""" - api_version = 4 - - def get_url(self): - return self.get_api_url() - - def make_request(self): - request = urllib.request.Request(self.get_url()) - credentials = '%s:%s' % (self.username, self.password) - credentials64 = base64.encodebytes(credentials.encode('ascii')) - authorization = 'Basic %s' % credentials64.decode('ascii') - request.add_header('Authorization', authorization) - return request - - -class GetMazaDataAPI3(GetMazaDataAPI4): - """Get the maza data for a given email and password via API v3.""" - - api_version = 3 - - def get_url(self): - return self.get_api_url() + quote(self.username) - - -class GetMazaDataAPI2(GetMazaDataAPI3): - """Get the maza data for a given email and password via API v2.""" - - api_version = 2 - - -api_versions = { - '1': GetMazaDataAPI1, - '2': GetMazaDataAPI2, - '3': GetMazaDataAPI3, - '4': GetMazaDataAPI4, - 'default': GetMazaDataAPI4, -} diff --git a/tccalib/tests/__init__.py b/tccalib/tests/__init__.py deleted file mode 100644 index 6323600..0000000 --- a/tccalib/tests/__init__.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2012 Canonical Ltd. This software is licensed under the GNU -# General Public License version 3 (see the file LICENSE). - - -__metaclass__ = type - - -import urllib.request, urllib.error, urllib.parse -from unittest import TestCase -import base64 - -import pycurl -from io import BytesIO - -from tccalib import ( - CertificateVerificationFailed, - CouldNotConnect, - GetMazaData, - GetMazaDataAPI1, - GetMazaDataAPI2, - GetMazaDataAPI3, - GetMazaDataAPI4, - PycURLGetter, - Unauthorized, - ) - - -class FakeGetResponseBody: - """Fake to raise a 401 when get_response_body is called.""" - - def get_response_body(self, request, verify_ssl): - raise urllib.error.HTTPError(None, 401, None, None, None) - - -class GetMazaDataFaked(GetMazaData): - """Subclass of GetMazaData for testing.""" - - api_version = 34 - - def __init__(self, username=None, server_root=None): - super(GetMazaDataFaked, self).__init__(username, username, server_root) - - def make_request(self): - pass - - -class TestGetMazaData(TestCase): - - def test_get_data_unauthorized(self): - """If a 401 is encountered, Unauthorized is raised.""" - api_client = GetMazaDataFaked() - api_client.getter = FakeGetResponseBody() - with self.assertRaises(Unauthorized): - api_client.get_data() - - def test_get_api_url_default(self): - """Default URL is as expected.""" - url = GetMazaDataFaked().get_api_url() - self.assertEqual('https://uccs.landscape.canonical.com/api/34/', - url) - - def test_get_api_url_explicit(self): - """URL is as expected with specified server_root.""" - url = GetMazaDataFaked(server_root='http://foo').get_api_url() - self.assertEqual('http://foo/api/34/', url) - - -class TestGetMazaDataAPI1(TestCase): - - def test_make_request(self): - """v1 requests have correct URL and no Auth header.""" - getter = GetMazaDataAPI1('foo', 'bar') - request = getter.make_request() - self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar', - request.get_full_url()) - self.assertIs(None, request.headers.get('Authorization')) - - -class TestGetMazaDataAPI2(TestCase): - - def test_make_request(self): - """v2 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI2('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class TestGetMazaDataAPI3(TestCase): - - def test_make_request(self): - """v3 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI3('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class TestGetMazaDataAPI4(TestCase): - - def test_make_request(self): - """v4 requests have correct URL and Auth header.""" - getter = GetMazaDataAPI4('foo', 'bar') - request = getter.make_request() - credentials = base64.encodebytes(b'foo:bar').decode('ascii') - expected = 'Basic %s' % credentials - self.assertEqual('GET', request.get_method()) - self.assertEqual('https://uccs.landscape.canonical.com/api/4/', - request.get_full_url()) - self.assertEqual(expected, request.headers['Authorization']) - - -class FakeCurl: - """Fake pycurl.Curl for testing PycURLGetter.""" - - def __init__(self, response_status=200, body='', header='', - effective_url='http://example.org/', perform_error=None): - self.options = {} - self.info = {} - self.response_status = response_status - self.response_header = bytes(header, 'UTF-8') - self.response_body = bytes(body, 'UTF-8') - self.effective_url=effective_url - self.perform_error = perform_error - - def setopt(self, key, value): - self.options[key] = value - - def perform(self): - if self.perform_error is not None: - raise self.perform_error - self.info[pycurl.EFFECTIVE_URL] = self.effective_url - self.info[pycurl.HTTP_CODE] = self.response_status - self.options[pycurl.HEADERFUNCTION](self.response_header) - self.options[pycurl.WRITEFUNCTION](self.response_body) - - def getinfo(self, key): - return self.info[key] - - -class TestPycURLGetter(TestCase): - - def test_init(self): - """Init should set the WRITEFUNCTION and HEADERFUNCTION.""" - getter = PycURLGetter(FakeCurl()) - options = getter.curl.options - self.assertIsNot(None, options[pycurl.WRITEFUNCTION]) - self.assertIsNot(None, options[pycurl.HEADERFUNCTION]) - - @staticmethod - def make_request(): - return GetMazaDataAPI3('pete', 'pass').make_request() - - def test_prepare_curl(self): - """prepare_curl sets URL and auth header.""" - curl = FakeCurl() - getter = PycURLGetter(curl) - request = self.make_request() - getter.prepare_curl(request) - self.assertEqual(request.get_full_url(), curl.options[pycurl.URL]) - self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'], - curl.options[pycurl.HTTPHEADER]) - - def test_prepare_curl_ssl_verify(self): - """SSL cert verification can be disabled for testing purposes.""" - curl = FakeCurl() - getter = PycURLGetter(curl) - getter.prepare_curl(self.make_request()) - self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER]) - getter.verify_ssl = False - getter.prepare_curl(self.make_request()) - self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER]) - - def test_handle_reponse(self): - """On success, handle_response returns response body.""" - curl = FakeCurl(body='My body!', header='My header!', - effective_url='http://example.com/') - getter = PycURLGetter(curl) - output = getter.handle_response() - self.assertEqual('My body!', output) - self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8')) - self.assertEqual('http://example.com/', - getter.curl.getinfo(pycurl.EFFECTIVE_URL)) - - def test_handle_response_http_error(self): - """On http error, handle_response raises urllib.error.HTTPError.""" - curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n') - getter = PycURLGetter(curl) - try: - getter.handle_response() - except urllib.error.HTTPError as e: - httpe = e - else: - self.fail('No error was raised.') - try: - httpe.geturl() - except AttributeError as attre: - self.assertEqual("'HTTPError' object has no attribute 'url'", - str(attre)) - self.assertEqual(500, httpe.getcode()) - self.assertEqual('My body!', httpe.msg.decode('UTF-8')) - self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items()) - - def test_handle_response_connection_error(self): - """On connection error, handle_response raises CouldNotConnect.""" - error = pycurl.error(pycurl.E_COULDNT_CONNECT) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(CouldNotConnect): - getter.handle_response() - - def test_cert_verification_failed(self): - """Cert verification error raises CertificateVerificationFailed.""" - error = pycurl.error(pycurl.E_SSL_CACERT) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(CertificateVerificationFailed): - getter.handle_response() - - def test_handle_response_pycurl_error(self): - """PycURLGetter allows other errors to propagate.""" - error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY) - getter = PycURLGetter(FakeCurl(perform_error=error)) - with self.assertRaises(pycurl.error): - getter.handle_response() - - def test_get_response_body(self): - """On success, get_response_body returns response body.""" - curl = FakeCurl(body='My body!', header='My header!', - effective_url='http://example.com/') - request = GetMazaDataAPI3('pete', 'pass').make_request() - output = PycURLGetter.get_response_body(request, _curl=curl) - self.assertEqual('My body!', output) diff --git a/thinclient-config-agent b/thinclient-config-agent index 1140c97..3d7b5bc 100755 --- a/thinclient-config-agent +++ b/thinclient-config-agent @@ -6,7 +6,7 @@ from optparse import OptionParser import os import sys -from tccalib import ( +from rscalib import ( api_versions, UserError, ) -- cgit v1.2.3