1  import threading, smtpd, asyncore, socket, smtplib, time 
 2  import unittest 
 3  import pyzmail 
 4  from pyzmail.generate import * 
 5   
 6   
 7  smtpd_addr='127.0.0.1'  
 8  smtpd_port=32525 
 9  smtp_bad_port=smtpd_port-1 
10   
11  smtp_mode='normal' 
12  smtp_login=None 
13  smtp_password=None 
14       
15   
17 -    def __init__(self, localaddr, remoteaddr, received): 
 18          smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) 
19          self.set_reuse_addr() 
20           
21          self.received=received 
 22           
24          ret=None 
25          if mail_from.startswith('data_error'): 
26              ret='552 Requested mail action aborted: exceeded storage allocation' 
27          self.received.append((ret, peer, mail_from, rcpt_to, data)) 
28          return ret 
 31   
39   
40                       
41          self.payload, self.mail_from, self.rcpt_to, self.msg_id=compose_mail((u'Me', 'me@foo.com'), [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii')) 
42   
43           
44           
45          self.smtpd_thread=threading.Thread(target=asyncloop) 
46          self.smtpd_thread.daemon=True 
47          self.smtpd_thread.start() 
 48   
49   
51          self.smtp_server.close() 
52          self.smtpd_thread.join() 
 53           
55          """simple send""" 
56          ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password) 
57          self.assertEqual(ret, dict()) 
58          (ret, peer, mail_from, rcpt_to, payload)=self.received[0] 
59          self.assertEqual(self.payload, payload) 
60          self.assertEqual(self.mail_from, mail_from) 
61          self.assertEqual(self.rcpt_to, rcpt_to) 
62          self.assertEqual('127.0.0.1', peer[0]) 
 63   
69   
74   
75  if __name__ == '__main__': 
76      unittest.main() 
77