From 7a893853f6db3989a4cc2da8830bc81110b016a4 Mon Sep 17 00:00:00 2001 From: Robert Tari Date: Mon, 13 May 2024 17:29:17 +0200 Subject: ayatanawebmail/imaplib2.py: Fix Python 3.12 bug and drop unused code --- ayatanawebmail/imaplib2.py | 231 ++------------------------------------------- 1 file changed, 10 insertions(+), 221 deletions(-) (limited to 'ayatanawebmail') diff --git a/ayatanawebmail/imaplib2.py b/ayatanawebmail/imaplib2.py index aa4a1d4..e4ff61c 100755 --- a/ayatanawebmail/imaplib2.py +++ b/ayatanawebmail/imaplib2.py @@ -506,7 +506,16 @@ class IMAP4(object): ssl_version = TLS_MAP[self.tls_level][self.ssl_version] - self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile, ca_certs=self.ca_certs, cert_reqs=cert_reqs, ssl_version=ssl_version) + pContext = ssl.SSLContext (ssl_version) + pContext.verify_mode = cert_reqs + + if self.ca_certs: + pContext.load_verify_locations (self.ca_certs) + + if self.keyfile and self.certfile: + pContext.load_cert_chain (self.certfile, self.keyfile) + + self.sock = pContext.wrap_socket (self.sock, server_hostname=self.host) ssl_exc = ssl.SSLError self.read_fd = self.sock.fileno() except ImportError: @@ -2413,223 +2422,3 @@ def ParseFlags(resp): return () return tuple(mo.group('flags').split()) - - - -if __name__ == '__main__': - - # To test: invoke either as 'python imaplib2.py [IMAP4_server_hostname]', - # or as 'python imaplib2.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"' - # or as 'python imaplib2.py -l keyfile[:certfile]|: [IMAP4_SSL_server_hostname]' - # - # Option "-d " turns on debugging (use "-d 5" for everything) - # Option "-i" tests that IDLE is interruptible - # Option "-p " allows alternate ports - - if not __debug__: - raise ValueError('Please run without -O') - - import getopt, getpass - - try: - optlist, args = getopt.getopt(sys.argv[1:], 'd:il:s:p:') - except getopt.error as val: - optlist, args = (), () - - debug, debug_buf_lvl, port, stream_command, keyfile, certfile, idle_intr = (None,)*7 - for opt,val in optlist: - if opt == '-d': - debug = int(val) - debug_buf_lvl = debug - 1 - elif opt == '-i': - idle_intr = 1 - elif opt == '-l': - try: - keyfile,certfile = val.split(':') - except ValueError: - keyfile,certfile = val,val - elif opt == '-p': - port = int(val) - elif opt == '-s': - stream_command = val - if not args: args = (stream_command,) - - if not args: args = ('',) - if not port: port = (keyfile is not None) and IMAP4_SSL_PORT or IMAP4_PORT - - host = args[0] - - USER = getpass.getuser() - - data = open(os.path.exists("test.data") and "test.data" or __file__).read(1000) - test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)s%(data)s' \ - % {'user':USER, 'lf':'\n', 'data':data} - - test_seq1 = [ - ('list', ('""', '""')), - ('list', ('""', '"%"')), - ('create', ('imaplib2_test0',)), - ('rename', ('imaplib2_test0', 'imaplib2_test1')), - ('CREATE', ('imaplib2_test2',)), - ('append', ('imaplib2_test2', None, None, test_mesg)), - ('list', ('""', '"imaplib2_test%"')), - ('select', ('imaplib2_test2',)), - ('search', (None, 'SUBJECT', '"IMAP4 test"')), - ('fetch', ('1:*', '(FLAGS INTERNALDATE RFC822)')), - ('store', ('1', 'FLAGS', '(\Deleted)')), - ('namespace', ()), - ('expunge', ()), - ('recent', ()), - ('close', ()), - ] - - test_seq2 = ( - ('select', ()), - ('response', ('UIDVALIDITY',)), - ('response', ('EXISTS',)), - ('append', (None, None, None, test_mesg)), - ('examine', ()), - ('select', ()), - ('fetch', ('1:*', '(FLAGS UID)')), - ('examine', ()), - ('select', ()), - ('uid', ('SEARCH', 'SUBJECT', '"IMAP4 test"')), - ('uid', ('SEARCH', 'ALL')), - ('uid', ('THREAD', 'references', 'UTF-8', '(SEEN)')), - ('recent', ()), - ) - - - AsyncError, M = None, None - - def responder(cb_arg_list): - response, cb_arg, error = cb_arg_list - global AsyncError - cmd, args = cb_arg - if error is not None: - AsyncError = error - M._log(0, '[cb] ERROR %s %.100s => %s' % (cmd, args, error)) - return - typ, dat = response - M._log(0, '[cb] %s %.100s => %s %.100s' % (cmd, args, typ, dat)) - if typ == 'NO': - AsyncError = (Exception, dat[0]) - - def run(cmd, args, cb=True): - if AsyncError: - M._log(1, 'AsyncError %s' % repr(AsyncError)) - M.logout() - typ, val = AsyncError - raise typ(val) - if not M.debug: M._log(0, '%s %.100s' % (cmd, args)) - try: - if cb: - typ, dat = getattr(M, cmd)(callback=responder, cb_arg=(cmd, args), *args) - M._log(1, '%s %.100s => %s %.100s' % (cmd, args, typ, dat)) - else: - typ, dat = getattr(M, cmd)(*args) - M._log(1, '%s %.100s => %s %.100s' % (cmd, args, typ, dat)) - except: - M._log(1, '%s - %s' % sys.exc_info()[:2]) - M.logout() - raise - if typ == 'NO': - M._log(1, 'NO') - M.logout() - raise Exception(dat[0]) - return dat - - try: - threading.currentThread().setName('main') - - if keyfile is not None: - if not keyfile: keyfile = None - if not certfile: certfile = None - M = IMAP4_SSL(host=host, port=port, keyfile=keyfile, certfile=certfile, ssl_version="tls1", debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl, tls_level="tls_no_ssl") - elif stream_command: - M = IMAP4_stream(stream_command, debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl) - else: - M = IMAP4(host=host, port=port, debug=debug, identifier='', timeout=10, debug_buf_lvl=debug_buf_lvl) - if M.state != 'AUTH': # Login needed - PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) - test_seq1.insert(0, ('login', (USER, PASSWD))) - M._log(0, 'PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) - if 'COMPRESS=DEFLATE' in M.capabilities: - M.enable_compression() - - for cmd,args in test_seq1: - run(cmd, args) - - for ml in run('list', ('""', '"imaplib2_test%"'), cb=False): - mo = re.match(br'.*"([^"]+)"$', ml) - if mo: path = mo.group(1) - else: path = ml.split()[-1] - run('delete', (path,)) - - if 'ID' in M.capabilities: - run('id', ()) - run('id', ("(name imaplib2)",)) - run('id', ("version", __version__, "os", os.uname()[0])) - - for cmd,args in test_seq2: - if (cmd,args) != ('uid', ('SEARCH', 'SUBJECT', 'IMAP4 test')): - run(cmd, args) - continue - - dat = run(cmd, args, cb=False) - uid = dat[-1].split() - if not uid: continue - run('uid', ('FETCH', uid[-1], - '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) - run('uid', ('STORE', uid[-1], 'FLAGS', '(\Deleted)')) - run('expunge', ()) - - if 'IDLE' in M.capabilities: - run('idle', (2,), cb=False) - run('idle', (99,)) # Asynchronous, to test interruption of 'idle' by 'noop' - time.sleep(1) - run('noop', (), cb=False) - - run('append', (None, None, None, test_mesg), cb=False) - num = run('search', (None, 'ALL'), cb=False)[0].split()[0] - dat = run('fetch', (num, '(FLAGS INTERNALDATE RFC822)'), cb=False) - M._mesg('fetch %s => %s' % (num, repr(dat))) - run('idle', (2,)) - run('store', (num, '-FLAGS', '(\Seen)'), cb=False), - dat = run('fetch', (num, '(FLAGS INTERNALDATE RFC822)'), cb=False) - M._mesg('fetch %s => %s' % (num, repr(dat))) - run('uid', ('STORE', num, 'FLAGS', '(\Deleted)')) - run('expunge', ()) - if idle_intr: - M._mesg('HIT CTRL-C to interrupt IDLE') - try: - run('idle', (99,), cb=False) # Synchronous, to test interruption of 'idle' by INTR - except KeyboardInterrupt: - M._mesg('Thanks!') - M._mesg('') - raise - elif idle_intr: - M._mesg('chosen server does not report IDLE capability') - - run('logout', (), cb=False) - - if debug: - M._mesg('') - M._print_log() - M._mesg('') - M._mesg('unused untagged responses in order, most recent last:') - for typ,dat in M.pop_untagged_responses(): M._mesg('\t%s %s' % (typ, dat)) - - print('All tests OK.') - - except: - if not idle_intr or M is None or not 'IDLE' in M.capabilities: - print('Tests failed.') - - if not debug: - print(''' -If you would like to see debugging output, -try: %s -d5 -''' % sys.argv[0]) - - raise -- cgit v1.2.3