1 import threading, smtpd, asyncore, socket, smtplib, time
2 import unittest
3 import pyzmail
4 from pyzmail.generate import *
5
6
7 smtpd_addr='127.0.0.1'
8 smtpd_port=32525
9 smtp_bad_port=smtpd_port-1
10
11 smtp_mode='normal'
12 smtp_login=None
13 smtp_password=None
14
15
17 - def __init__(self, localaddr, remoteaddr, received):
18 smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
19 self.set_reuse_addr()
20
21 self.received=received
22
24 ret=None
25 if mail_from.startswith('data_error'):
26 ret='552 Requested mail action aborted: exceeded storage allocation'
27 self.received.append((ret, peer, mail_from, rcpt_to, data))
28 return ret
29
31
39
40
41 self.payload, self.mail_from, self.rcpt_to, self.msg_id=compose_mail((u'Me', 'me@foo.com'), [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'))
42
43
44
45 self.smtpd_thread=threading.Thread(target=asyncloop)
46 self.smtpd_thread.daemon=True
47 self.smtpd_thread.start()
48
49
51 self.smtp_server.close()
52 self.smtpd_thread.join()
53
55 """simple send"""
56 ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password)
57 self.assertEqual(ret, dict())
58 (ret, peer, mail_from, rcpt_to, payload)=self.received[0]
59 self.assertEqual(self.payload, payload)
60 self.assertEqual(self.mail_from, mail_from)
61 self.assertEqual(self.rcpt_to, rcpt_to)
62 self.assertEqual('127.0.0.1', peer[0])
63
69
74
75 if __name__ == '__main__':
76 unittest.main()
77