aboutsummaryrefslogtreecommitdiff
path: root/tccalib
diff options
context:
space:
mode:
authorMike Gabriel <mike.gabriel@das-netzwerkteam.de>2016-09-14 14:10:54 +0200
committerMike Gabriel <mike.gabriel@das-netzwerkteam.de>2016-09-14 14:12:30 +0200
commitca3bc5e49f2965ed365aa17cf8c8adb55fdaa828 (patch)
treef25471db93024725929ddda0165066307f16e33e /tccalib
parent254c1b80cda0e79083c3107288dd5a17e156a1ad (diff)
downloadremote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.gz
remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.tar.bz2
remote-logon-config-agent-ca3bc5e49f2965ed365aa17cf8c8adb55fdaa828.zip
Rename module tccalib to rscalib.
Diffstat (limited to 'tccalib')
-rw-r--r--tccalib/__init__.py224
-rw-r--r--tccalib/tests/__init__.py239
2 files changed, 0 insertions, 463 deletions
diff --git a/tccalib/__init__.py b/tccalib/__init__.py
deleted file mode 100644
index 9c61844..0000000
--- a/tccalib/__init__.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
-# General Public License version 3 (see the file LICENSE).
-
-__metaclass__ = type
-
-
-import errno
-from http.client import parse_headers
-import socket
-from io import BytesIO, StringIO
-from urllib.parse import quote
-import urllib.request, urllib.error
-import base64
-
-import pycurl
-
-
-class UserError(Exception):
- """An error message that should be presented to the user."""
-
- def __init__(self, msg=None):
- if msg is None:
- msg = self.__doc__
- super(UserError, self).__init__(msg)
-
-
-class Unauthorized(UserError):
- """Invalid username or password"""
-
- status = 2
-
-
-class CouldNotConnect(UserError):
- """Could not connect"""
-
- status = 3
-
-
-class CertificateVerificationFailed(UserError):
- """Certificate verification failed"""
-
- status = 4
-
-
-class URLLibGetter:
- """Get data from URLs using URLib."""
-
- @staticmethod
- def get_response_body(request, verify_ssl):
- """Return the body of the response to the supplied request.
-
- :param request: A urllib2.Request
- :param verify_ssl: Unused
- :raises CouldNotConnect: if there is a connection error.
- """
- try:
- return urllib.request.urlopen(request).read()
- except urllib.error.URLError as e:
- if not isinstance(e.args[0], socket.error):
- raise
- if e.args[0].errno == errno.ECONNREFUSED:
- raise CouldNotConnect
- raise
-
-
-class PycURLGetter:
- """Get data from URLs using PycURL."""
-
- def __init__(self, _curl=None):
- if _curl is None:
- _curl = pycurl.Curl()
- self.curl = _curl
- self.result = BytesIO()
- self.response_header = BytesIO()
- self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write)
- self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write)
- self.verify_ssl = True
-
- def prepare_curl(self, request):
- """Prepare the curl object for the supplied request.
-
- :param request: a urllib2.Request instance.
- """
- self.curl.setopt(pycurl.URL, request.get_full_url())
- request_headers = ['%s: %s' % item for
- item in list(request.headers.items())]
- self.curl.setopt(pycurl.HTTPHEADER, request_headers)
- self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl)
-
- @classmethod
- def get_response_body(cls, request, verify_ssl=True, _curl=None):
- """Return the body of the response to the supplied request.
-
- :param request: A urllib2.Request instance.
- :param verify_ssl: If true, verify SSL certificates.
- :param _curl: The pycurl.Curl object to use (for testing).
- :raises CouldNotConnect: if there is a connection error.
- :raises CertificateVerificationFailed: if the SSL certificate could
- not be verified.
- :raises HTTPError: if the response status is not 200.
- """
- instance = cls(_curl)
- instance.verify_ssl = verify_ssl
- instance.prepare_curl(request)
- return instance.handle_response()
-
- def handle_response(self):
- """Perform the curl operation and handle the response.
-
- :return: The body of the response on success.
- :raises CouldNotConnect: if there is a connection error.
- :raises CertificateVerificationFailed: if the SSL certificate could
- not be verified.
- :raises HTTPError: if the response status is not 200.
- """
- try:
- self.curl.perform()
- except pycurl.error as e:
- if e.args[0] in (pycurl.E_COULDNT_CONNECT,
- pycurl.E_COULDNT_RESOLVE_HOST):
- raise CouldNotConnect
- elif e.args[0] == pycurl.E_SSL_CACERT:
- raise CertificateVerificationFailed
- else:
- raise
- status = self.curl.getinfo(pycurl.HTTP_CODE)
- if status == 200:
- return self.result.getvalue().decode('utf-8')
- else:
- lines = self.response_header.getvalue().decode('utf-8').splitlines(True)
- header_ = ''.join(lines[1:])
- headers = parse_headers(BytesIO(header_.encode('ascii')))
- raise urllib.error.HTTPError(
- self.curl.getinfo(pycurl.EFFECTIVE_URL), status,
- self.result.getvalue(), headers, None)
-
-
-class GetMazaData:
- """Base class for retrieving data from MAZA server."""
-
- @classmethod
- def run(cls, username, password, server_root=None, verify_ssl=True):
- """Return the requested data.
-
- :param username: The username of the user.
- :param password: The user's password.
- :param server_root: The root URL to make queries to.
- :param verify_ssl: If true, verify SSL certificates.
- """
- return cls(username, password, server_root).get_data(verify_ssl)
-
- def __init__(self, username, password, server_root=None):
- self.username = username
- self.password = password
- if server_root is not None:
- self.server_root = server_root
- else:
- self.server_root = 'https://uccs.landscape.canonical.com'
- self.getter = PycURLGetter
-
- def get_api_url(self):
- """Return the URL for an API version."""
- return '%s/api/%s/' % (self.server_root, self.api_version)
-
- def get_data(self, verify_ssl=True):
- """Return the data for this version of the API."""
- try:
- return self.getter.get_response_body(self.make_request(),
- verify_ssl)
- except urllib.error.HTTPError as e:
- if e.getcode() == 401:
- raise Unauthorized
- else:
- raise
-
-
-class GetMazaDataAPI1(GetMazaData):
- """Get the maza data for a given email and password via API v1."""
-
- api_version = 1
-
- def make_request(self):
- path = '%s/%s' % (quote(self.username), quote(self.password))
- return urllib.request.Request(self.get_api_url() + path)
-
-
-class GetMazaDataAPI4(GetMazaData):
- """Get the maza data for a given email and password via API v4."""
- api_version = 4
-
- def get_url(self):
- return self.get_api_url()
-
- def make_request(self):
- request = urllib.request.Request(self.get_url())
- credentials = '%s:%s' % (self.username, self.password)
- credentials64 = base64.encodebytes(credentials.encode('ascii'))
- authorization = 'Basic %s' % credentials64.decode('ascii')
- request.add_header('Authorization', authorization)
- return request
-
-
-class GetMazaDataAPI3(GetMazaDataAPI4):
- """Get the maza data for a given email and password via API v3."""
-
- api_version = 3
-
- def get_url(self):
- return self.get_api_url() + quote(self.username)
-
-
-class GetMazaDataAPI2(GetMazaDataAPI3):
- """Get the maza data for a given email and password via API v2."""
-
- api_version = 2
-
-
-api_versions = {
- '1': GetMazaDataAPI1,
- '2': GetMazaDataAPI2,
- '3': GetMazaDataAPI3,
- '4': GetMazaDataAPI4,
- 'default': GetMazaDataAPI4,
-}
diff --git a/tccalib/tests/__init__.py b/tccalib/tests/__init__.py
deleted file mode 100644
index 6323600..0000000
--- a/tccalib/tests/__init__.py
+++ /dev/null
@@ -1,239 +0,0 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
-# General Public License version 3 (see the file LICENSE).
-
-
-__metaclass__ = type
-
-
-import urllib.request, urllib.error, urllib.parse
-from unittest import TestCase
-import base64
-
-import pycurl
-from io import BytesIO
-
-from tccalib import (
- CertificateVerificationFailed,
- CouldNotConnect,
- GetMazaData,
- GetMazaDataAPI1,
- GetMazaDataAPI2,
- GetMazaDataAPI3,
- GetMazaDataAPI4,
- PycURLGetter,
- Unauthorized,
- )
-
-
-class FakeGetResponseBody:
- """Fake to raise a 401 when get_response_body is called."""
-
- def get_response_body(self, request, verify_ssl):
- raise urllib.error.HTTPError(None, 401, None, None, None)
-
-
-class GetMazaDataFaked(GetMazaData):
- """Subclass of GetMazaData for testing."""
-
- api_version = 34
-
- def __init__(self, username=None, server_root=None):
- super(GetMazaDataFaked, self).__init__(username, username, server_root)
-
- def make_request(self):
- pass
-
-
-class TestGetMazaData(TestCase):
-
- def test_get_data_unauthorized(self):
- """If a 401 is encountered, Unauthorized is raised."""
- api_client = GetMazaDataFaked()
- api_client.getter = FakeGetResponseBody()
- with self.assertRaises(Unauthorized):
- api_client.get_data()
-
- def test_get_api_url_default(self):
- """Default URL is as expected."""
- url = GetMazaDataFaked().get_api_url()
- self.assertEqual('https://uccs.landscape.canonical.com/api/34/',
- url)
-
- def test_get_api_url_explicit(self):
- """URL is as expected with specified server_root."""
- url = GetMazaDataFaked(server_root='http://foo').get_api_url()
- self.assertEqual('http://foo/api/34/', url)
-
-
-class TestGetMazaDataAPI1(TestCase):
-
- def test_make_request(self):
- """v1 requests have correct URL and no Auth header."""
- getter = GetMazaDataAPI1('foo', 'bar')
- request = getter.make_request()
- self.assertEqual('https://uccs.landscape.canonical.com/api/1/foo/bar',
- request.get_full_url())
- self.assertIs(None, request.headers.get('Authorization'))
-
-
-class TestGetMazaDataAPI2(TestCase):
-
- def test_make_request(self):
- """v2 requests have correct URL and Auth header."""
- getter = GetMazaDataAPI2('foo', 'bar')
- request = getter.make_request()
- credentials = base64.encodebytes(b'foo:bar').decode('ascii')
- expected = 'Basic %s' % credentials
- self.assertEqual('GET', request.get_method())
- self.assertEqual('https://uccs.landscape.canonical.com/api/2/foo',
- request.get_full_url())
- self.assertEqual(expected, request.headers['Authorization'])
-
-
-class TestGetMazaDataAPI3(TestCase):
-
- def test_make_request(self):
- """v3 requests have correct URL and Auth header."""
- getter = GetMazaDataAPI3('foo', 'bar')
- request = getter.make_request()
- credentials = base64.encodebytes(b'foo:bar').decode('ascii')
- expected = 'Basic %s' % credentials
- self.assertEqual('GET', request.get_method())
- self.assertEqual('https://uccs.landscape.canonical.com/api/3/foo',
- request.get_full_url())
- self.assertEqual(expected, request.headers['Authorization'])
-
-
-class TestGetMazaDataAPI4(TestCase):
-
- def test_make_request(self):
- """v4 requests have correct URL and Auth header."""
- getter = GetMazaDataAPI4('foo', 'bar')
- request = getter.make_request()
- credentials = base64.encodebytes(b'foo:bar').decode('ascii')
- expected = 'Basic %s' % credentials
- self.assertEqual('GET', request.get_method())
- self.assertEqual('https://uccs.landscape.canonical.com/api/4/',
- request.get_full_url())
- self.assertEqual(expected, request.headers['Authorization'])
-
-
-class FakeCurl:
- """Fake pycurl.Curl for testing PycURLGetter."""
-
- def __init__(self, response_status=200, body='', header='',
- effective_url='http://example.org/', perform_error=None):
- self.options = {}
- self.info = {}
- self.response_status = response_status
- self.response_header = bytes(header, 'UTF-8')
- self.response_body = bytes(body, 'UTF-8')
- self.effective_url=effective_url
- self.perform_error = perform_error
-
- def setopt(self, key, value):
- self.options[key] = value
-
- def perform(self):
- if self.perform_error is not None:
- raise self.perform_error
- self.info[pycurl.EFFECTIVE_URL] = self.effective_url
- self.info[pycurl.HTTP_CODE] = self.response_status
- self.options[pycurl.HEADERFUNCTION](self.response_header)
- self.options[pycurl.WRITEFUNCTION](self.response_body)
-
- def getinfo(self, key):
- return self.info[key]
-
-
-class TestPycURLGetter(TestCase):
-
- def test_init(self):
- """Init should set the WRITEFUNCTION and HEADERFUNCTION."""
- getter = PycURLGetter(FakeCurl())
- options = getter.curl.options
- self.assertIsNot(None, options[pycurl.WRITEFUNCTION])
- self.assertIsNot(None, options[pycurl.HEADERFUNCTION])
-
- @staticmethod
- def make_request():
- return GetMazaDataAPI3('pete', 'pass').make_request()
-
- def test_prepare_curl(self):
- """prepare_curl sets URL and auth header."""
- curl = FakeCurl()
- getter = PycURLGetter(curl)
- request = self.make_request()
- getter.prepare_curl(request)
- self.assertEqual(request.get_full_url(), curl.options[pycurl.URL])
- self.assertEqual(['Authorization: Basic cGV0ZTpwYXNz\n'],
- curl.options[pycurl.HTTPHEADER])
-
- def test_prepare_curl_ssl_verify(self):
- """SSL cert verification can be disabled for testing purposes."""
- curl = FakeCurl()
- getter = PycURLGetter(curl)
- getter.prepare_curl(self.make_request())
- self.assertTrue(curl.options[pycurl.SSL_VERIFYPEER])
- getter.verify_ssl = False
- getter.prepare_curl(self.make_request())
- self.assertFalse(curl.options[pycurl.SSL_VERIFYPEER])
-
- def test_handle_reponse(self):
- """On success, handle_response returns response body."""
- curl = FakeCurl(body='My body!', header='My header!',
- effective_url='http://example.com/')
- getter = PycURLGetter(curl)
- output = getter.handle_response()
- self.assertEqual('My body!', output)
- self.assertEqual('My header!', getter.response_header.getvalue().decode('UTF-8'))
- self.assertEqual('http://example.com/',
- getter.curl.getinfo(pycurl.EFFECTIVE_URL))
-
- def test_handle_response_http_error(self):
- """On http error, handle_response raises urllib.error.HTTPError."""
- curl = FakeCurl(500, 'My body!', '\nContent-type: fake\n\n')
- getter = PycURLGetter(curl)
- try:
- getter.handle_response()
- except urllib.error.HTTPError as e:
- httpe = e
- else:
- self.fail('No error was raised.')
- try:
- httpe.geturl()
- except AttributeError as attre:
- self.assertEqual("'HTTPError' object has no attribute 'url'",
- str(attre))
- self.assertEqual(500, httpe.getcode())
- self.assertEqual('My body!', httpe.msg.decode('UTF-8'))
- self.assertEqual([('Content-type', 'fake')], httpe.hdrs.items())
-
- def test_handle_response_connection_error(self):
- """On connection error, handle_response raises CouldNotConnect."""
- error = pycurl.error(pycurl.E_COULDNT_CONNECT)
- getter = PycURLGetter(FakeCurl(perform_error=error))
- with self.assertRaises(CouldNotConnect):
- getter.handle_response()
-
- def test_cert_verification_failed(self):
- """Cert verification error raises CertificateVerificationFailed."""
- error = pycurl.error(pycurl.E_SSL_CACERT)
- getter = PycURLGetter(FakeCurl(perform_error=error))
- with self.assertRaises(CertificateVerificationFailed):
- getter.handle_response()
-
- def test_handle_response_pycurl_error(self):
- """PycURLGetter allows other errors to propagate."""
- error = pycurl.error(pycurl.E_MULTI_OUT_OF_MEMORY)
- getter = PycURLGetter(FakeCurl(perform_error=error))
- with self.assertRaises(pycurl.error):
- getter.handle_response()
-
- def test_get_response_body(self):
- """On success, get_response_body returns response body."""
- curl = FakeCurl(body='My body!', header='My header!',
- effective_url='http://example.com/')
- request = GetMazaDataAPI3('pete', 'pass').make_request()
- output = PycURLGetter.get_response_body(request, _curl=curl)
- self.assertEqual('My body!', output)