aboutsummaryrefslogtreecommitdiff
path: root/rscalib
diff options
context:
space:
mode:
authorMike Gabriel <mike.gabriel@das-netzwerkteam.de>2016-09-14 14:10:54 +0200
committerMike Gabriel <mike.gabriel@das-netzwerkteam.de>2016-09-14 14:12:30 +0200
commitca3bc5e49f2965ed365aa17cf8c8adb55fdaa828 (patch)
treef25471db93024725929ddda0165066307f16e33e /rscalib
parent254c1b80cda0e79083c3107288dd5a17e156a1ad (diff)
downloadremote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.gz
remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.bz2
remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.zip
Rename module tccalib to rscalib.
Diffstat (limited to 'rscalib')
-rw-r--r--rscalib/__init__.py224
-rw-r--r--rscalib/tests/__init__.py239
2 files changed, 463 insertions, 0 deletions
diff --git a/rscalib/__init__.py b/rscalib/__init__.py
new file mode 100644
index 0000000..9c61844
--- /dev/null
+++ b/rscalib/__init__.py
@@ -0,0 +1,224 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
+# General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+
+import errno
+from http.client import parse_headers
+import socket
+from io import BytesIO, StringIO
+from urllib.parse import quote
+import urllib.request, urllib.error
+import base64
+
+import pycurl
+
+
+class UserError(Exception):
+ """An error message that should be presented to the user."""
+
+ def __init__(self, msg=None):
+ if msg is None:
+ msg = self.__doc__
+ super(UserError, self).__init__(msg)
+
+
+class Unauthorized(UserError):
+ """Invalid username or password"""
+
+ status = 2
+
+
+class CouldNotConnect(UserError):
+ """Could not connect"""
+
+ status = 3
+
+
+class CertificateVerificationFailed(UserError):
+ """Certificate verification failed"""
+
+ status = 4
+
+
+class URLLibGetter:
+ """Get data from URLs using URLib."""
+
+ @staticmethod
+ def get_response_body(request, verify_ssl):
+ """Return the body of the response to the supplied request.
+
+ :param request: A urllib2.Request
+ :param verify_ssl: Unused
+ :raises CouldNotConnect: if there is a connection error.
+ """
+ try:
+ return urllib.request.urlopen(request).read()
+ except urllib.error.URLError as e:
+ if not isinstance(e.args[0], socket.error):
+ raise
+ if e.args[0].errno == errno.ECONNREFUSED:
+ raise CouldNotConnect
+ raise
+
+
+class PycURLGetter:
+ """Get data from URLs using PycURL."""
+
+ def __init__(self, _curl=None):
+ if _curl is None:
+ _curl = pycurl.Curl()
+ self.curl = _curl
+ self.result = BytesIO()
+ self.response_header = BytesIO()
+ self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write)
+ self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write)
+ self.verify_ssl = True
+
+ def prepare_curl(self, request):
+ """Prepare the curl object for the supplied request.
+
+ :param request: a urllib2.Request instance.
+ """
+ self.curl.setopt(pycurl.URL, request.get_full_url())
+ request_headers = ['%s: %s' % item for
+ item in list(request.headers.items())]
+ self.curl.setopt(pycurl.HTTPHEADER, request_headers)
+ self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl)
+
+ @classmethod
+ def get_response_body(cls, request, verify_ssl=True, _curl=None):
+ """Return the body of the response to the supplied request.
+
+ :param request: A urllib2.Request instance.
+ :param verify_ssl: If true, verify SSL certificates.
+ :param _curl: The pycurl.Curl object to use (for testing).
+ :raises CouldNotConnect: if there is a connection error.
+ :raises CertificateVerificationFailed: if the SSL certificate could
+ not be verified.
+ :raises HTTPError: if the response status is not 200.
+ """
+ instance = cls(_curl)
+ instance.verify_ssl = verify_ssl
+ instance.prepare_curl(request)
+ return instance.handle_response()
+
+ def handle_response(self):
+ """Perform the curl operation and handle the response.
+
+ :return: The body of the response on success.
+ :raises CouldNotConnect: if there is a connection error.
+ :raises CertificateVerificationFailed: if the SSL certificate could
+ not be verified.
+ :raises HTTPError: if the response status is not 200.
+ """
+ try:
+ self.curl.perform()
+ except pycurl.error as e:
+ if e.args[0] in (pycurl.E_COULDNT_CONNECT,
+ pycurl.E_COULDNT_RESOLVE_HOST):
+ raise CouldNotConnect
+ elif e.args[0] == pycurl.E_SSL_CACERT:
+ raise CertificateVerificationFailed
+ else:
+ raise
+ status = self.curl.getinfo(pycurl.HTTP_CODE)
+ if status == 200:
+ return self.result.getvalue().decode('utf-8')
+ else:
+ lines = self.response_header.getvalue().decode('utf-8').splitlines(True)
+ header_ = ''.join(lines[1:])
+ headers = parse_headers(BytesIO(header_.encode('ascii')))
+ raise urllib.error.HTTPError(
+ self.curl.getinfo(pycurl.EFFECTIVE_URL), status,
+ self.result.getvalue(), headers, None)
+
+
+class GetMazaData:
+ """Base class for retrieving data from MAZA server."""
+
+ @classmethod
+ def run(cls, username, password, server_root=None, verify_ssl=True):
+ """Return the requested data.
+
+ :param username: The username of the user.
+ :param password: The user's password.
+ :param server_root: The root URL to make queries to.
+ :param verify_ssl: If true, verify SSL certificates.
+ """
+ return cls(username, password, server_root).get_data(verify_ssl)
+
+ def __init__(self, username, password, server_root=None):
+ self.username = username
+ self.password = password
+ if server_root is not None:
+ self.server_root = server_root
+ else:
+ self.server_root = 'https://uccs.landscape.canonical.com'
+ self.getter = PycURLGetter
+
+ def get_api_url(self):
+ """Return the URL for an API version."""
+ return '%s/api/%s/' % (self.server_root, self.api_version)
+
+ def get_data(self, verify_ssl=True):
+ """Return the data for this version of the API."""
+ try:
+ return self.getter.get_response_body(self.make_request(),
+ verify_ssl)
+ except urllib.error.HTTPError as e:
+ if e.getcode() == 401:
+ raise Unauthorized
+ else:
+ raise
+
+
+class GetMazaDataAPI1(GetMazaData):
+ """Get the maza data for a given email and password via API v1."""
+
+ api_version = 1
+
+ def make_request(self):
+ path = '%s/%s' % (quote(self.username), quote(self.password))
+ return urllib.request.Request(self.get_api_url() + path)
+
+
+class GetMazaDataAPI4(GetMazaData):
+ """Get the maza data for a given email and password via API v4."""
+ api_version = 4
+
+ def get_url(self):
+ return self.get_api_url()
+
+ def make_request(self):
+ request = urllib.request.Request(self.get_url())
+ credentials = '%s:%s' % (self.username, self.password)
+ credentials64 = base64.encodebytes(credentials.encode('ascii'))
+ authorization = 'Basic %s' % credentials64.decode('ascii')
+ request.add_header('Authorization', authorization)
+ return request
+
+
+class GetMazaDataAPI3(GetMazaDataAPI4):
+ """Get the maza data for a given email and password via API v3."""
+
+ api_version = 3
+
+ def get_url(self):
+ return self.get_api_url() + quote(self.username)
+
+
+class GetMazaDataAPI2(GetMazaDataAPI3):
+ """Get the maza data for a given email and password via API v2."""
+
+ api_version = 2
+
+
+api_versions = {
+ '1': GetMazaDataAPI1,
+ '2': GetMazaDataAPI2,
+ '3': GetMazaDataAPI3,
+ '4': GetMazaDataAPI4,
+ 'default': GetMazaDataAPI4,
+}
diff --git a/rscalib/tests/__init__.py b/rscalib/tests/__init__.py
new file mode 100644
index 0000000..c3a8afa
--- /dev/null
+++ b/rscalib/tests/__init__.py
@@ -0,0 +1,239 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
+# General Public License version 3 (see the file LICENSE).
+
+
+__metaclass__ = type
+
+
+import urllib.request, urllib.error, urllib.parse
+from unittest import TestCase
+import base64
+
+import pycurl
+from io import BytesIO
+
+from rscalib import (
+ CertificateVerificationFailed,
+ CouldNotConnect,
+ GetMazaData,
+ GetMazaDataAPI1,
+ GetMazaDataAPI2,
+ GetMazaDataAPI3,
+ GetMazaDataAPI4,
+ PycURLGetter,
+ Unauthorized,
+ )
+
+
+class FakeGetResponseBody:
+ """Fake to raise a 401 when get_response_body is called."""
+
+ def get_response_body(self, request, verify_ssl):
+ raise urllib.error.HTTPError(None, 401, None, None, None)
+
+
+class GetMazaDataFaked(GetMazaData):
+ """Subclass of GetMazaData for testing."""
+
+ api_version = 34
+
+ def __init__(self, username=None, server_root=None):
+ super(GetMazaDataFaked, self).__init__(username, username, server_root)
+
+ def make_request(self):
+ pass
+
+
+class TestGetMazaData(TestCase):
+
+ def test_get_data_unauthorized(self):
+ """If a 401 is encountered, Unauthorized is raised."""
+ api_client = GetMazaDataFaked()
+ api_client.getter = FakeGetResponseBody()
+ with self.assertRaises(Unauthorized):
+ api_client.get_data()
+
+ def test_get_api_url_default(self):
+ """Default URL is as expected."""
+ url = GetMazaDataFaked().get_api_url()
+ self.assertEqual('https://uccs.landscape.canonical.com/api/34/',
+ url)
+
+ def test_get_api_url_explicit(self):
+ """URL is as expected with specified server_root."""
+ url = GetMazaDataFaked(server_root='http://foo').get_api_url()
+ self.assertEqual('http://foo/api/34/', url)
+
+
+class TestGetMazaDataAPI1(TestCase):
+
+ def test_make_request(self):
+ """v1 requests have correct URL and no Auth header."""
+ getter = GetMazaDataAPI1('foo', 'bar')
+ request = getter.make_request()
+ self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar',
+ request.get_full_url())
+ self.assertIs(None, request.headers.get('Authorization'))
+
+
+class TestGetMazaDataAPI2(TestCase):
+
+ def test_make_request(self):
+ """v2 requests have correct URL and Auth header."""
+ getter = GetMazaDataAPI2('foo', 'bar')
+ request = getter.make_request()
+ credentials = base64.encodebytes(b'foo:bar').decode('ascii')
+ expected = 'Basic %s' % credentials
+ self.assertEqual('GET', request.get_method())
+ self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo',
+ request.get_full_url())
+ self.assertEqual(expected, request.headers['Authorization'])
+
+
+class TestGetMazaDataAPI3(TestCase):
+
+ def test_make_request(self):
+ """v3 requests have correct URL and Auth header."""
+ getter = GetMazaDataAPI3('foo', 'bar')
+ request = getter.make_request()
+ credentials = base64.encodebytes(b'foo:bar').decode('ascii')
+ expected = 'Basic %s' % credentials
+ self.assertEqual('GET', request.get_method())
+ self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo',
+ request.get_full_url())
+ self.assertEqual(expected, request.headers['Authorization'])
+
+
+class TestGetMazaDataAPI4(TestCase):
+
+ def test_make_request(self):
+ """v4 requests have correct URL and Auth header."""
+ getter = GetMazaDataAPI4('foo', 'bar')
+ request = getter.make_request()
+ credentials = base64.encodebytes(b'foo:bar').decode('ascii')
+ expected = 'Basic %s' % credentials
+ self.assertEqual('GET', request.get_method())
+ self.assertEqual('https://uccs.landscape.canonical.com/api/4/',
+ request.get_full_url())
+ self.assertEqual(expected, request.headers['Authorization'])
+
+
+class FakeCurl:
+ """Fake pycurl.Curl for testing PycURLGetter."""
+
+ def __init__(self, response_status=200, body='', header='',
+ effective_url='http://example.org/', perform_error=None):
+ self.options = {}
+ self.info = {}
+ self.response_status = response_status
+ self.response_header = bytes(header, 'UTF-8')
+ self.response_body = bytes(body, 'UTF-8')
+ self.effective_url=effective_url
+ self.perform_error = perform_error
+
+ def setopt(self, key, value):
+ self.options[key] = value
+
+ def perform(self):
+ if self.perform_error is not None:
+ raise self.perform_error
+ self.info[pycurl.EFFECTIVE_URL] = self.effective_url
+ self.info[pycurl.HTTP_CODE] = self.response_status
+ self.options[pycurl.HEADERFUNCTION](self.response_header)
+ self.options[pycurl.WRITEFUNCTION](self.response_body)
+
+ def getinfo(self, key):
+ return self.info[key]
+
+
+class TestPycURLGetter(TestCase):
+
+ def test_init(self):
+ """Init should set the WRITEFUNCTION and HEADERFUNCTION."""
+ getter = PycURLGetter(FakeCurl())
+ options = getter.curl.options
+ self.assertIsNot(None, options[pycurl.WRITEFUNCTION])
+ self.assertIsNot(None, options[pycurl.HEADERFUNCTION])
+
+ @staticmethod
+ def make_request():
+ return GetMazaDataAPI3('pete', 'pass').make_request()
+
+ def test_prepare_curl(self):
+ """prepare_curl sets URL and auth header."""
+ curl = FakeCurl()
+ getter = PycURLGetter(curl)
+ request = self.make_request()
+ getter.prepare_curl(request)
+ self.assertEqual(request.get_full_url(), curl.options[pycurl.URL])
+ self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'],
+ curl.options[pycurl.HTTPHEADER])
+
+ def test_prepare_curl_ssl_verify(self):
+ """SSL cert verification can be disabled for testing purposes."""
+ curl = FakeCurl()
+ getter = PycURLGetter(curl)
+ getter.prepare_curl(self.make_request())
+ self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER])
+ getter.verify_ssl = False
+ getter.prepare_curl(self.make_request())
+ self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER])
+
+ def test_handle_reponse(self):
+ """On success, handle_response returns response body."""
+ curl = FakeCurl(body='My body!', header='My header!',
+ effective_url='http://example.com/')
+ getter = PycURLGetter(curl)
+ output = getter.handle_response()
+ self.assertEqual('My body!', output)
+ self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8'))
+ self.assertEqual('http://example.com/',
+ getter.curl.getinfo(pycurl.EFFECTIVE_URL))
+
+ def test_handle_response_http_error(self):
+ """On http error, handle_response raises urllib.error.HTTPError."""
+ curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n')
+ getter = PycURLGetter(curl)
+ try:
+ getter.handle_response()
+ except urllib.error.HTTPError as e:
+ httpe = e
+ else:
+ self.fail('No error was raised.')
+ try:
+ httpe.geturl()
+ except AttributeError as attre:
+ self.assertEqual("'HTTPError' object has no attribute 'url'",
+ str(attre))
+ self.assertEqual(500, httpe.getcode())
+ self.assertEqual('My body!', httpe.msg.decode('UTF-8'))
+ self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items())
+
+ def test_handle_response_connection_error(self):
+ """On connection error, handle_response raises CouldNotConnect."""
+ error = pycurl.error(pycurl.E_COULDNT_CONNECT)
+ getter = PycURLGetter(FakeCurl(perform_error=error))
+ with self.assertRaises(CouldNotConnect):
+ getter.handle_response()
+
+ def test_cert_verification_failed(self):
+ """Cert verification error raises CertificateVerificationFailed."""
+ error = pycurl.error(pycurl.E_SSL_CACERT)
+ getter = PycURLGetter(FakeCurl(perform_error=error))
+ with self.assertRaises(CertificateVerificationFailed):
+ getter.handle_response()
+
+ def test_handle_response_pycurl_error(self):
+ """PycURLGetter allows other errors to propagate."""
+ error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY)
+ getter = PycURLGetter(FakeCurl(perform_error=error))
+ with self.assertRaises(pycurl.error):
+ getter.handle_response()
+
+ def test_get_response_body(self):
+ """On success, get_response_body returns response body."""
+ curl = FakeCurl(body='My body!', header='My header!',
+ effective_url='http://example.com/')
+ request = GetMazaDataAPI3('pete', 'pass').make_request()
+ output = PycURLGetter.get_response_body(request, _curl=curl)
+ self.assertEqual('My body!', output)