diff options
Diffstat (limited to 'ayatanawebmail/imaplib2.py')
| -rwxr-xr-x | ayatanawebmail/imaplib2.py | 231 | 
1 files changed, 10 insertions, 221 deletions
| diff --git a/ayatanawebmail/imaplib2.py b/ayatanawebmail/imaplib2.py index aa4a1d4..e4ff61c 100755 --- a/ayatanawebmail/imaplib2.py +++ b/ayatanawebmail/imaplib2.py @@ -506,7 +506,16 @@ class IMAP4(object):              ssl_version =  TLS_MAP[self.tls_level][self.ssl_version] -            self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile, ca_certs=self.ca_certs, cert_reqs=cert_reqs, ssl_version=ssl_version) +            pContext = ssl.SSLContext (ssl_version) +            pContext.verify_mode = cert_reqs + +            if self.ca_certs: +                pContext.load_verify_locations (self.ca_certs) + +            if self.keyfile and self.certfile: +                pContext.load_cert_chain (self.certfile, self.keyfile) + +            self.sock = pContext.wrap_socket (self.sock, server_hostname=self.host)              ssl_exc = ssl.SSLError              self.read_fd = self.sock.fileno()          except ImportError: @@ -2413,223 +2422,3 @@ def ParseFlags(resp):          return ()      return tuple(mo.group('flags').split()) - - - -if __name__ == '__main__': - -    # To test: invoke either as 'python imaplib2.py [IMAP4_server_hostname]', -    # or as 'python imaplib2.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"' -    # or as 'python imaplib2.py -l keyfile[:certfile]|: [IMAP4_SSL_server_hostname]' -    # -    # Option "-d <level>" turns on debugging (use "-d 5" for everything) -    # Option "-i" tests that IDLE is interruptible -    # Option "-p <port>" allows alternate ports - -    if not __debug__: -        raise ValueError('Please run without -O') - -    import getopt, getpass - -    try: -        optlist, args = getopt.getopt(sys.argv[1:], 'd:il:s:p:') -    except getopt.error as val: -        optlist, args = (), () - -    debug, debug_buf_lvl, port, stream_command, keyfile, certfile, idle_intr = (None,)*7 -    for opt,val in optlist: -        if opt == '-d': -            debug = int(val) -            debug_buf_lvl = debug - 1 -        elif opt == '-i': -            idle_intr = 1 -        elif opt == '-l': -            try: -                keyfile,certfile = val.split(':') -            except ValueError: -                keyfile,certfile = val,val -        elif opt == '-p': -            port = int(val) -        elif opt == '-s': -            stream_command = val -            if not args: args = (stream_command,) - -    if not args: args = ('',) -    if not port: port = (keyfile is not None) and IMAP4_SSL_PORT or IMAP4_PORT - -    host = args[0] - -    USER = getpass.getuser() - -    data = open(os.path.exists("test.data") and "test.data" or __file__).read(1000) -    test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)s%(data)s' \ -                     % {'user':USER, 'lf':'\n', 'data':data} - -    test_seq1 = [ -    ('list', ('""', '""')), -    ('list', ('""', '"%"')), -    ('create', ('imaplib2_test0',)), -    ('rename', ('imaplib2_test0', 'imaplib2_test1')), -    ('CREATE', ('imaplib2_test2',)), -    ('append', ('imaplib2_test2', None, None, test_mesg)), -    ('list', ('""', '"imaplib2_test%"')), -    ('select', ('imaplib2_test2',)), -    ('search', (None, 'SUBJECT', '"IMAP4 test"')), -    ('fetch', ('1:*', '(FLAGS INTERNALDATE RFC822)')), -    ('store', ('1', 'FLAGS', '(\Deleted)')), -    ('namespace', ()), -    ('expunge', ()), -    ('recent', ()), -    ('close', ()), -    ] - -    test_seq2 = ( -    ('select', ()), -    ('response', ('UIDVALIDITY',)), -    ('response', ('EXISTS',)), -    ('append', (None, None, None, test_mesg)), -    ('examine', ()), -    ('select', ()), -    ('fetch', ('1:*', '(FLAGS UID)')), -    ('examine', ()), -    ('select', ()), -    ('uid', ('SEARCH', 'SUBJECT', '"IMAP4 test"')), -    ('uid', ('SEARCH', 'ALL')), -    ('uid', ('THREAD', 'references', 'UTF-8', '(SEEN)')), -    ('recent', ()), -    ) - - -    AsyncError, M = None, None - -    def responder(cb_arg_list): -        response, cb_arg, error = cb_arg_list -        global AsyncError -        cmd, args = cb_arg -        if error is not None: -            AsyncError = error -            M._log(0, '[cb] ERROR %s %.100s => %s' % (cmd, args, error)) -            return -        typ, dat = response -        M._log(0, '[cb] %s %.100s => %s %.100s' % (cmd, args, typ, dat)) -        if typ == 'NO': -            AsyncError = (Exception, dat[0]) - -    def run(cmd, args, cb=True): -        if AsyncError: -            M._log(1, 'AsyncError %s' % repr(AsyncError)) -            M.logout() -            typ, val = AsyncError -            raise typ(val) -        if not M.debug: M._log(0, '%s %.100s' % (cmd, args)) -        try: -            if cb: -                typ, dat = getattr(M, cmd)(callback=responder, cb_arg=(cmd, args), *args) -                M._log(1, '%s %.100s => %s %.100s' % (cmd, args, typ, dat)) -            else: -                typ, dat = getattr(M, cmd)(*args) -                M._log(1, '%s %.100s => %s %.100s' % (cmd, args, typ, dat)) -        except: -            M._log(1, '%s - %s' % sys.exc_info()[:2]) -            M.logout() -            raise -        if typ == 'NO': -            M._log(1, 'NO') -            M.logout() -            raise Exception(dat[0]) -        return dat - -    try: -        threading.currentThread().setName('main') - -        if keyfile is not None: -            if not keyfile: keyfile = None -            if not certfile: certfile = None -            M = IMAP4_SSL(host=host, port=port, keyfile=keyfile, certfile=certfile, ssl_version="tls1", debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl, tls_level="tls_no_ssl") -        elif stream_command: -            M = IMAP4_stream(stream_command, debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl) -        else: -            M = IMAP4(host=host, port=port, debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl) -        if M.state != 'AUTH':   # Login needed -            PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) -            test_seq1.insert(0, ('login', (USER, PASSWD))) -        M._log(0, 'PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) -        if 'COMPRESS=DEFLATE' in M.capabilities: -            M.enable_compression() - -        for cmd,args in test_seq1: -            run(cmd, args) - -        for ml in run('list', ('""', '"imaplib2_test%"'), cb=False): -            mo = re.match(br'.*"([^"]+)"$', ml) -            if mo: path = mo.group(1) -            else: path = ml.split()[-1] -            run('delete', (path,)) - -        if 'ID' in M.capabilities: -            run('id', ()) -            run('id', ("(name imaplib2)",)) -            run('id', ("version", __version__, "os", os.uname()[0])) - -        for cmd,args in test_seq2: -            if (cmd,args) != ('uid', ('SEARCH', 'SUBJECT', 'IMAP4 test')): -                run(cmd, args) -                continue - -            dat = run(cmd, args, cb=False) -            uid = dat[-1].split() -            if not uid: continue -            run('uid', ('FETCH', uid[-1], -                    '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) -            run('uid', ('STORE', uid[-1], 'FLAGS', '(\Deleted)')) -            run('expunge', ()) - -        if 'IDLE' in M.capabilities: -            run('idle', (2,), cb=False) -            run('idle', (99,))          # Asynchronous, to test interruption of 'idle' by 'noop' -            time.sleep(1) -            run('noop', (), cb=False) - -            run('append', (None, None, None, test_mesg), cb=False) -            num = run('search', (None, 'ALL'), cb=False)[0].split()[0] -            dat = run('fetch', (num, '(FLAGS INTERNALDATE RFC822)'), cb=False) -            M._mesg('fetch %s => %s' % (num, repr(dat))) -            run('idle', (2,)) -            run('store', (num, '-FLAGS', '(\Seen)'), cb=False), -            dat = run('fetch', (num, '(FLAGS INTERNALDATE RFC822)'), cb=False) -            M._mesg('fetch %s => %s' % (num, repr(dat))) -            run('uid', ('STORE', num, 'FLAGS', '(\Deleted)')) -            run('expunge', ()) -            if idle_intr: -                M._mesg('HIT CTRL-C to interrupt IDLE') -                try: -                    run('idle', (99,), cb=False) # Synchronous, to test interruption of 'idle' by INTR -                except KeyboardInterrupt: -                    M._mesg('Thanks!') -                    M._mesg('') -                    raise -        elif idle_intr: -            M._mesg('chosen server does not report IDLE capability') - -        run('logout', (), cb=False) - -        if debug: -            M._mesg('') -            M._print_log() -            M._mesg('') -            M._mesg('unused untagged responses in order, most recent last:') -            for typ,dat in M.pop_untagged_responses(): M._mesg('\t%s %s' % (typ, dat)) - -        print('All tests OK.') - -    except: -        if not idle_intr or M is None or not 'IDLE' in M.capabilities: -            print('Tests failed.') - -            if not debug: -                print(''' -If you would like to see debugging output, -try: %s -d5 -''' % sys.argv[0]) - -            raise | 
