From ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828 Mon Sep 17 00:00:00 2001 From: Mike Gabriel Date: Wed, 14 Sep 2016 14:10:54 +0200 Subject: Rename module tccalib to rscalib. --- rscalib/__init__.py | 224 +++++++++++++++++++++++++++++++++++++++++++ rscalib/tests/__init__.py | 239 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 463 insertions(+) create mode 100644 rscalib/__init__.py create mode 100644 rscalib/tests/__init__.py (limited to 'rscalib') diff --git a/rscalib/__init__.py b/rscalib/__init__.py new file mode 100644 index 0000000..9c61844 --- /dev/null +++ b/rscalib/__init__.py @@ -0,0 +1,224 @@ +# Copyright 2012 Canonical Ltd. This software is licensed under the GNU +# General Public License version 3 (see the file LICENSE). + +__metaclass__ = type + + +import errno +from http.client import parse_headers +import socket +from io import BytesIO, StringIO +from urllib.parse import quote +import urllib.request, urllib.error +import base64 + +import pycurl + + +class UserError(Exception): + """An error message that should be presented to the user.""" + + def __init__(self, msg=None): + if msg is None: + msg = self.__doc__ + super(UserError, self).__init__(msg) + + +class Unauthorized(UserError): + """Invalid username or password""" + + status = 2 + + +class CouldNotConnect(UserError): + """Could not connect""" + + status = 3 + + +class CertificateVerificationFailed(UserError): + """Certificate verification failed""" + + status = 4 + + +class URLLibGetter: + """Get data from URLs using URLib.""" + + @staticmethod + def get_response_body(request, verify_ssl): + """Return the body of the response to the supplied request. + + :param request: A urllib2.Request + :param verify_ssl: Unused + :raises CouldNotConnect: if there is a connection error. + """ + try: + return urllib.request.urlopen(request).read() + except urllib.error.URLError as e: + if not isinstance(e.args[0], socket.error): + raise + if e.args[0].errno == errno.ECONNREFUSED: + raise CouldNotConnect + raise + + +class PycURLGetter: + """Get data from URLs using PycURL.""" + + def __init__(self, _curl=None): + if _curl is None: + _curl = pycurl.Curl() + self.curl = _curl + self.result = BytesIO() + self.response_header = BytesIO() + self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write) + self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write) + self.verify_ssl = True + + def prepare_curl(self, request): + """Prepare the curl object for the supplied request. + + :param request: a urllib2.Request instance. + """ + self.curl.setopt(pycurl.URL, request.get_full_url()) + request_headers = ['%s: %s' % item for + item in list(request.headers.items())] + self.curl.setopt(pycurl.HTTPHEADER, request_headers) + self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl) + + @classmethod + def get_response_body(cls, request, verify_ssl=True, _curl=None): + """Return the body of the response to the supplied request. + + :param request: A urllib2.Request instance. + :param verify_ssl: If true, verify SSL certificates. + :param _curl: The pycurl.Curl object to use (for testing). + :raises CouldNotConnect: if there is a connection error. + :raises CertificateVerificationFailed: if the SSL certificate could + not be verified. + :raises HTTPError: if the response status is not 200. + """ + instance = cls(_curl) + instance.verify_ssl = verify_ssl + instance.prepare_curl(request) + return instance.handle_response() + + def handle_response(self): + """Perform the curl operation and handle the response. + + :return: The body of the response on success. + :raises CouldNotConnect: if there is a connection error. + :raises CertificateVerificationFailed: if the SSL certificate could + not be verified. + :raises HTTPError: if the response status is not 200. + """ + try: + self.curl.perform() + except pycurl.error as e: + if e.args[0] in (pycurl.E_COULDNT_CONNECT, + pycurl.E_COULDNT_RESOLVE_HOST): + raise CouldNotConnect + elif e.args[0] == pycurl.E_SSL_CACERT: + raise CertificateVerificationFailed + else: + raise + status = self.curl.getinfo(pycurl.HTTP_CODE) + if status == 200: + return self.result.getvalue().decode('utf-8') + else: + lines = self.response_header.getvalue().decode('utf-8').splitlines(True) + header_ = ''.join(lines[1:]) + headers = parse_headers(BytesIO(header_.encode('ascii'))) + raise urllib.error.HTTPError( + self.curl.getinfo(pycurl.EFFECTIVE_URL), status, + self.result.getvalue(), headers, None) + + +class GetMazaData: + """Base class for retrieving data from MAZA server.""" + + @classmethod + def run(cls, username, password, server_root=None, verify_ssl=True): + """Return the requested data. + + :param username: The username of the user. + :param password: The user's password. + :param server_root: The root URL to make queries to. + :param verify_ssl: If true, verify SSL certificates. + """ + return cls(username, password, server_root).get_data(verify_ssl) + + def __init__(self, username, password, server_root=None): + self.username = username + self.password = password + if server_root is not None: + self.server_root = server_root + else: + self.server_root = 'https://uccs.landscape.canonical.com' + self.getter = PycURLGetter + + def get_api_url(self): + """Return the URL for an API version.""" + return '%s/api/%s/' % (self.server_root, self.api_version) + + def get_data(self, verify_ssl=True): + """Return the data for this version of the API.""" + try: + return self.getter.get_response_body(self.make_request(), + verify_ssl) + except urllib.error.HTTPError as e: + if e.getcode() == 401: + raise Unauthorized + else: + raise + + +class GetMazaDataAPI1(GetMazaData): + """Get the maza data for a given email and password via API v1.""" + + api_version = 1 + + def make_request(self): + path = '%s/%s' % (quote(self.username), quote(self.password)) + return urllib.request.Request(self.get_api_url() + path) + + +class GetMazaDataAPI4(GetMazaData): + """Get the maza data for a given email and password via API v4.""" + api_version = 4 + + def get_url(self): + return self.get_api_url() + + def make_request(self): + request = urllib.request.Request(self.get_url()) + credentials = '%s:%s' % (self.username, self.password) + credentials64 = base64.encodebytes(credentials.encode('ascii')) + authorization = 'Basic %s' % credentials64.decode('ascii') + request.add_header('Authorization', authorization) + return request + + +class GetMazaDataAPI3(GetMazaDataAPI4): + """Get the maza data for a given email and password via API v3.""" + + api_version = 3 + + def get_url(self): + return self.get_api_url() + quote(self.username) + + +class GetMazaDataAPI2(GetMazaDataAPI3): + """Get the maza data for a given email and password via API v2.""" + + api_version = 2 + + +api_versions = { + '1': GetMazaDataAPI1, + '2': GetMazaDataAPI2, + '3': GetMazaDataAPI3, + '4': GetMazaDataAPI4, + 'default': GetMazaDataAPI4, +} diff --git a/rscalib/tests/__init__.py b/rscalib/tests/__init__.py new file mode 100644 index 0000000..c3a8afa --- /dev/null +++ b/rscalib/tests/__init__.py @@ -0,0 +1,239 @@ +# Copyright 2012 Canonical Ltd. This software is licensed under the GNU +# General Public License version 3 (see the file LICENSE). + + +__metaclass__ = type + + +import urllib.request, urllib.error, urllib.parse +from unittest import TestCase +import base64 + +import pycurl +from io import BytesIO + +from rscalib import ( + CertificateVerificationFailed, + CouldNotConnect, + GetMazaData, + GetMazaDataAPI1, + GetMazaDataAPI2, + GetMazaDataAPI3, + GetMazaDataAPI4, + PycURLGetter, + Unauthorized, + ) + + +class FakeGetResponseBody: + """Fake to raise a 401 when get_response_body is called.""" + + def get_response_body(self, request, verify_ssl): + raise urllib.error.HTTPError(None, 401, None, None, None) + + +class GetMazaDataFaked(GetMazaData): + """Subclass of GetMazaData for testing.""" + + api_version = 34 + + def __init__(self, username=None, server_root=None): + super(GetMazaDataFaked, self).__init__(username, username, server_root) + + def make_request(self): + pass + + +class TestGetMazaData(TestCase): + + def test_get_data_unauthorized(self): + """If a 401 is encountered, Unauthorized is raised.""" + api_client = GetMazaDataFaked() + api_client.getter = FakeGetResponseBody() + with self.assertRaises(Unauthorized): + api_client.get_data() + + def test_get_api_url_default(self): + """Default URL is as expected.""" + url = GetMazaDataFaked().get_api_url() + self.assertEqual('https://uccs.landscape.canonical.com/api/34/', + url) + + def test_get_api_url_explicit(self): + """URL is as expected with specified server_root.""" + url = GetMazaDataFaked(server_root='http://foo').get_api_url() + self.assertEqual('http://foo/api/34/', url) + + +class TestGetMazaDataAPI1(TestCase): + + def test_make_request(self): + """v1 requests have correct URL and no Auth header.""" + getter = GetMazaDataAPI1('foo', 'bar') + request = getter.make_request() + self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar', + request.get_full_url()) + self.assertIs(None, request.headers.get('Authorization')) + + +class TestGetMazaDataAPI2(TestCase): + + def test_make_request(self): + """v2 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI2('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class TestGetMazaDataAPI3(TestCase): + + def test_make_request(self): + """v3 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI3('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class TestGetMazaDataAPI4(TestCase): + + def test_make_request(self): + """v4 requests have correct URL and Auth header.""" + getter = GetMazaDataAPI4('foo', 'bar') + request = getter.make_request() + credentials = base64.encodebytes(b'foo:bar').decode('ascii') + expected = 'Basic %s' % credentials + self.assertEqual('GET', request.get_method()) + self.assertEqual('https://uccs.landscape.canonical.com/api/4/', + request.get_full_url()) + self.assertEqual(expected, request.headers['Authorization']) + + +class FakeCurl: + """Fake pycurl.Curl for testing PycURLGetter.""" + + def __init__(self, response_status=200, body='', header='', + effective_url='http://example.org/', perform_error=None): + self.options = {} + self.info = {} + self.response_status = response_status + self.response_header = bytes(header, 'UTF-8') + self.response_body = bytes(body, 'UTF-8') + self.effective_url=effective_url + self.perform_error = perform_error + + def setopt(self, key, value): + self.options[key] = value + + def perform(self): + if self.perform_error is not None: + raise self.perform_error + self.info[pycurl.EFFECTIVE_URL] = self.effective_url + self.info[pycurl.HTTP_CODE] = self.response_status + self.options[pycurl.HEADERFUNCTION](self.response_header) + self.options[pycurl.WRITEFUNCTION](self.response_body) + + def getinfo(self, key): + return self.info[key] + + +class TestPycURLGetter(TestCase): + + def test_init(self): + """Init should set the WRITEFUNCTION and HEADERFUNCTION.""" + getter = PycURLGetter(FakeCurl()) + options = getter.curl.options + self.assertIsNot(None, options[pycurl.WRITEFUNCTION]) + self.assertIsNot(None, options[pycurl.HEADERFUNCTION]) + + @staticmethod + def make_request(): + return GetMazaDataAPI3('pete', 'pass').make_request() + + def test_prepare_curl(self): + """prepare_curl sets URL and auth header.""" + curl = FakeCurl() + getter = PycURLGetter(curl) + request = self.make_request() + getter.prepare_curl(request) + self.assertEqual(request.get_full_url(), curl.options[pycurl.URL]) + self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'], + curl.options[pycurl.HTTPHEADER]) + + def test_prepare_curl_ssl_verify(self): + """SSL cert verification can be disabled for testing purposes.""" + curl = FakeCurl() + getter = PycURLGetter(curl) + getter.prepare_curl(self.make_request()) + self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER]) + getter.verify_ssl = False + getter.prepare_curl(self.make_request()) + self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER]) + + def test_handle_reponse(self): + """On success, handle_response returns response body.""" + curl = FakeCurl(body='My body!', header='My header!', + effective_url='http://example.com/') + getter = PycURLGetter(curl) + output = getter.handle_response() + self.assertEqual('My body!', output) + self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8')) + self.assertEqual('http://example.com/', + getter.curl.getinfo(pycurl.EFFECTIVE_URL)) + + def test_handle_response_http_error(self): + """On http error, handle_response raises urllib.error.HTTPError.""" + curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n') + getter = PycURLGetter(curl) + try: + getter.handle_response() + except urllib.error.HTTPError as e: + httpe = e + else: + self.fail('No error was raised.') + try: + httpe.geturl() + except AttributeError as attre: + self.assertEqual("'HTTPError' object has no attribute 'url'", + str(attre)) + self.assertEqual(500, httpe.getcode()) + self.assertEqual('My body!', httpe.msg.decode('UTF-8')) + self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items()) + + def test_handle_response_connection_error(self): + """On connection error, handle_response raises CouldNotConnect.""" + error = pycurl.error(pycurl.E_COULDNT_CONNECT) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(CouldNotConnect): + getter.handle_response() + + def test_cert_verification_failed(self): + """Cert verification error raises CertificateVerificationFailed.""" + error = pycurl.error(pycurl.E_SSL_CACERT) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(CertificateVerificationFailed): + getter.handle_response() + + def test_handle_response_pycurl_error(self): + """PycURLGetter allows other errors to propagate.""" + error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY) + getter = PycURLGetter(FakeCurl(perform_error=error)) + with self.assertRaises(pycurl.error): + getter.handle_response() + + def test_get_response_body(self): + """On success, get_response_body returns response body.""" + curl = FakeCurl(body='My body!', header='My header!', + effective_url='http://example.com/') + request = GetMazaDataAPI3('pete', 'pass').make_request() + output = PycURLGetter.get_response_body(request, _curl=curl) + self.assertEqual('My body!', output) -- cgit v1.2.3