1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
|
# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
# General Public License version 3 (see the file LICENSE).
# Python 2 idiom that made every class defined in this module new-style.
# NOTE(review): this module imports Python 3-only packages (http.client,
# urllib.request), where a module-level __metaclass__ has no effect --
# presumably a leftover from before the port; confirm before removing.
__metaclass__ = type
import errno
from http.client import parse_headers
import socket
from io import BytesIO, StringIO
from urllib.parse import quote
import urllib.request, urllib.error
import base64
import pycurl
class UserError(Exception):
    """An error message that should be presented to the user."""

    def __init__(self, msg=None):
        # The class docstring doubles as the default message, so a subclass
        # only needs its own docstring to get its own user-facing text.
        message = self.__doc__ if msg is None else msg
        super().__init__(message)
# Raised by GetMazaData.get_data when the server answers with HTTP 401.
class Unauthorized(UserError):
    """Invalid username or password"""
    # The docstring above is runtime text: UserError.__init__ falls back to
    # self.__doc__ when no message is supplied, so it is what the user sees.

    # NOTE(review): presumably the process exit status reported for this
    # error -- no consumer of `status` is visible here; confirm with caller.
    status = 2
# Raised by URLLibGetter when the connection is refused, and by
# PycURLGetter.handle_response when curl cannot connect to or resolve the
# host.
class CouldNotConnect(UserError):
    """Could not connect"""
    # The docstring above is runtime text: UserError.__init__ falls back to
    # self.__doc__ when no message is supplied, so it is what the user sees.

    # NOTE(review): presumably the process exit status reported for this
    # error -- no consumer of `status` is visible here; confirm with caller.
    status = 3
# Raised by PycURLGetter.handle_response when curl reports E_SSL_CACERT.
class CertificateVerificationFailed(UserError):
    """Certificate verification failed"""
    # The docstring above is runtime text: UserError.__init__ falls back to
    # self.__doc__ when no message is supplied, so it is what the user sees.

    # NOTE(review): presumably the process exit status reported for this
    # error -- no consumer of `status` is visible here; confirm with caller.
    status = 4
class URLLibGetter:
    """Get data from URLs using urllib."""

    @staticmethod
    def get_response_body(request, verify_ssl):
        """Return the body of the response to the supplied request.

        :param request: A urllib.request.Request instance.
        :param verify_ssl: Unused; accepted so this getter can be called the
            same way as PycURLGetter.get_response_body.
        :return: The raw response body, as bytes.
        :raises CouldNotConnect: if the connection was refused.
        :raises urllib.error.URLError: for any other URL or HTTP error.
        """
        try:
            # Use the response as a context manager so the underlying
            # connection is closed even when read() fails part-way.
            with urllib.request.urlopen(request) as response:
                return response.read()
        except urllib.error.URLError as e:
            reason = e.reason
            # Only a refused socket connection is translated into the
            # user-facing error; everything else propagates unchanged.
            if (isinstance(reason, socket.error)
                    and reason.errno == errno.ECONNREFUSED):
                raise CouldNotConnect from e
            raise
class PycURLGetter:
    """Get data from URLs using PycURL.

    Each instance wraps one curl handle and collects the response body and
    the raw response headers in in-memory byte buffers.
    """

    def __init__(self, _curl=None):
        """Wire the curl handle up to the body and header buffers.

        :param _curl: The pycurl.Curl object to use (for testing).  A new
            handle is created when omitted.
        """
        if _curl is None:
            _curl = pycurl.Curl()
        self.curl = _curl
        self.result = BytesIO()
        self.response_header = BytesIO()
        self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write)
        self.verify_ssl = True

    def prepare_curl(self, request):
        """Prepare the curl object for the supplied request.

        Sets the URL, the request headers and the peer-verification flag.

        :param request: a urllib.request.Request instance.
        """
        self.curl.setopt(pycurl.URL, request.get_full_url())
        request_headers = ['%s: %s' % item
                           for item in request.headers.items()]
        self.curl.setopt(pycurl.HTTPHEADER, request_headers)
        # NOTE(review): only SSL_VERIFYPEER is toggled; SSL_VERIFYHOST is not
        # set anywhere in this class.
        self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl)

    @classmethod
    def get_response_body(cls, request, verify_ssl=True, _curl=None):
        """Return the body of the response to the supplied request.

        :param request: A urllib.request.Request instance.
        :param verify_ssl: If true, verify SSL certificates.
        :param _curl: The pycurl.Curl object to use (for testing).
        :return: The response body decoded as UTF-8 text.
        :raises CouldNotConnect: if there is a connection error.
        :raises CertificateVerificationFailed: if the SSL certificate could
            not be verified.
        :raises HTTPError: if the response status is not 200.
        """
        instance = cls(_curl)
        instance.verify_ssl = verify_ssl
        instance.prepare_curl(request)
        return instance.handle_response()

    def handle_response(self):
        """Perform the curl operation and handle the response.

        :return: The body of the response, decoded as UTF-8, on success.
        :raises CouldNotConnect: if there is a connection error.
        :raises CertificateVerificationFailed: if the SSL certificate could
            not be verified.
        :raises HTTPError: if the response status is not 200.
        """
        try:
            self.curl.perform()
        except pycurl.error as e:
            curl_code = e.args[0]
            if curl_code in (pycurl.E_COULDNT_CONNECT,
                             pycurl.E_COULDNT_RESOLVE_HOST):
                raise CouldNotConnect from e
            if curl_code == pycurl.E_SSL_CACERT:
                raise CertificateVerificationFailed from e
            raise

        status = self.curl.getinfo(pycurl.HTTP_CODE)
        if status == 200:
            return self.result.getvalue().decode('utf-8')

        # Skip the first captured line (the status line) and parse the rest.
        # Stay in bytes throughout: parse_headers() does its own decoding,
        # and a str round-trip (decode as UTF-8, re-encode as ASCII) raises
        # UnicodeError on any non-ASCII header byte, hiding the HTTP error.
        header_lines = self.response_header.getvalue().splitlines(True)
        headers = parse_headers(BytesIO(b''.join(header_lines[1:])))
        raise urllib.error.HTTPError(
            self.curl.getinfo(pycurl.EFFECTIVE_URL),
            code=status,
            msg=self.result.getvalue(),
            hdrs=headers,
            fp=StringIO()
        )
class GetMazaData:
    """Common machinery for retrieving data from a MAZA server.

    This class uses ``self.api_version`` and ``self.make_request()`` but
    does not define them; the versioned subclasses supply both.
    """

    @classmethod
    def run(cls, username, password, server_root=None, verify_ssl=True):
        """Build a fetcher for the given credentials and return its data.

        :param username: The username of the user.
        :param password: The user's password.
        :param server_root: The root URL to make queries to.
        :param verify_ssl: If true, verify SSL certificates.
        """
        fetcher = cls(username, password, server_root)
        return fetcher.get_data(verify_ssl)

    def __init__(self, username, password, server_root=None):
        """Store the credentials and choose the server root and getter.

        :param username: The username of the user.
        :param password: The user's password.
        :param server_root: The root URL to make queries to; a built-in
            default is used when None.
        """
        self.username = username
        self.password = password
        self.server_root = (
            'https://uccs.landscape.canonical.com'
            if server_root is None
            else server_root
        )
        self.getter = PycURLGetter

    def get_api_url(self):
        """Return the base URL for this object's API version."""
        url_parts = (self.server_root, self.api_version)
        return '%s/api/%s/' % url_parts

    def get_data(self, verify_ssl=True):
        """Fetch and return the response body for this API version.

        :param verify_ssl: Passed through to the getter.
        :raises Unauthorized: if the server responds with HTTP 401.
        :raises HTTPError: for any other HTTP error status.
        """
        try:
            fetch = self.getter.get_response_body
            return fetch(self.make_request(), verify_ssl)
        except urllib.error.HTTPError as e:
            # Anything other than 401 is passed on untouched.
            if e.getcode() != 401:
                raise
            raise Unauthorized
class GetMazaDataAPI1(GetMazaData):
    """Fetch the maza data for an email and password through API v1.

    This version carries both credentials in the URL path.
    """
    api_version = 1

    def make_request(self):
        """Return a Request for ``<api url><username>/<password>``.

        Both components are URL-quoted with quote()'s default settings.
        """
        quoted_user = quote(self.username)
        quoted_password = quote(self.password)
        url = self.get_api_url() + '%s/%s' % (quoted_user, quoted_password)
        return urllib.request.Request(url)
class GetMazaDataAPI4(GetMazaData):
    """Get the maza data for a given email and password via API v4.

    Credentials are sent in an HTTP Basic ``Authorization`` header rather
    than in the URL.
    """
    api_version = 4

    def get_url(self):
        """Return the URL to request; for v4 this is the bare API URL."""
        return self.get_api_url()

    def make_request(self):
        """Return a Request carrying a Basic ``Authorization`` header.

        :return: A urllib.request.Request for ``self.get_url()``.
        """
        request = urllib.request.Request(self.get_url())
        credentials = '%s:%s' % (self.username, self.password)
        # b64encode, not encodebytes: encodebytes appends a trailing newline
        # and inserts one every 76 characters, which produces a malformed
        # header value.  UTF-8 keeps non-ASCII credentials from raising
        # UnicodeEncodeError and is identical to ASCII for ASCII input.
        token = base64.b64encode(credentials.encode('utf-8')).decode('ascii')
        request.add_header('Authorization', 'Basic %s' % token)
        return request
class GetMazaDataAPI3(GetMazaDataAPI4):
    """Fetch the maza data for an email and password through API v3.

    Request construction is inherited from GetMazaDataAPI4; only the URL
    differs, with the URL-quoted username appended to it.
    """
    api_version = 3

    def get_url(self):
        """Return the API URL followed by the quoted username."""
        base_url = self.get_api_url()
        return base_url + quote(self.username)
class GetMazaDataAPI2(GetMazaDataAPI3):
    """Get the maza data for a given email and password via API v2.

    Everything is inherited from GetMazaDataAPI3; only the version number
    that get_api_url() puts in the URL differs.
    """
    api_version = 2
# Lookup table from an API version string to the class implementing it.
# The numeric keys are derived from each class's own ``api_version`` so the
# two cannot drift apart; 'default' selects the v4 implementation.
api_versions = {
    str(implementation.api_version): implementation
    for implementation in (
        GetMazaDataAPI1,
        GetMazaDataAPI2,
        GetMazaDataAPI3,
        GetMazaDataAPI4,
    )
}
api_versions['default'] = GetMazaDataAPI4
|