国产邮件系统攻防演练(用python做批量发送邮件脚本)

2025-10-05 14:16:41

上海辰童科技有限公司

直接把encoding改成utf-8就可
def load_config(self):
self.config = {}
if os.path.exists(SAVE_FILE):
with open(SAVE_FILE, 'r', encoding='utf-8') as f:
self.config = json.load(f)

1. 导入模块部分

import base64import loggingimport mimetypesimport smtplibimport osimport tracebackfrom email import encodersfrom email.mime.base import MIMEBasefrom email.mime.multipart import MIMEMultipartimport tkinter as tkfrom tkinter import messagebox, filedialog, ttkimport threadingimport timeimport jsonfrom queue import Queuefrom urllib.parse import quote
  • base64:用于对数据进行 Base64 编码,在处理邮件附件时会用到。

  • logging:用于记录程序运行过程中的信息,方便调试和监控。

  • mimetypes:用于猜测文件的 MIME 类型,在处理邮件附件时确定附件的类型。

  • smtplib:用于实现 SMTP 协议,实现邮件的发送功能。

  • os:提供了与操作系统进行交互的功能,如文件路径处理、文件存在性检查等。

  • traceback:用于打印异常的堆栈跟踪信息,方便调试。

  • email 相关模块:用于构建和处理邮件消息,包括邮件的主体、附件等。

  • tkinter 相关模块:用于创建图形用户界面(GUI),包括窗口、标签、输入框、按钮等。

  • threading:用于实现多线程编程,避免在发送邮件时阻塞 GUI 界面。

  • time:用于实现时间相关的操作,如发送间隔的控制。

  • json:用于处理 JSON 数据,实现配置文件的读写。

  • Queue:用于线程间的通信,在多线程环境下传递消息。

  • quote:用于对 URL 进行编码,在处理邮件附件的文件名时使用。

2. 常量配置和全局状态变量部分

# 常量配置DEFAULT_SMTP_SERVER = 'smtp.qq.com'DEFAULT_SMTP_PORT = 465DAILY_SEND_LIMIT = 500BUFFER_SIZE = 512 * 1024  # 512KB分块处理MAX_RETRIES = 3SEND_INTERVAL = 5  # 秒# 全局状态变量sent_count = 0success_count = 0SAVE_FILE = 'email_config.json'total_emails = 0logging.basicConfig(filename="email.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
  • 常量配置

    • DEFAULT_SMTP_SERVER:默认的 SMTP 服务器地址,这里使用的是 QQ 邮箱的 SMTP 服务器。

    • DEFAULT_SMTP_PORT:默认的 SMTP 服务器端口,465 是 QQ 邮箱 SMTP 服务器使用的 SSL 端口。

    • DAILY_SEND_LIMIT:每日发送邮件的限制数量。

    • BUFFER_SIZE:处理文件时的缓冲区大小,用于分块读取文件。

    • MAX_RETRIES:发送邮件失败时的最大重试次数。

    • SEND_INTERVAL:每封邮件发送之间的间隔时间,单位为秒。

  • 全局状态变量

    • sent_count:记录已经发送的邮件数量。

    • success_count:记录成功发送的邮件数量。

    • SAVE_FILE:保存配置信息的 JSON 文件的文件名。

    • total_emails:记录总共发起的邮件发送请求数量。

  • 日志配置:使用 logging.basicConfig 配置日志,将日志信息记录到 email.log 文件中,日志级别为 INFO

3. EmailSenderApp 类部分

  • self.select_attachments 是选择附件按钮的回调函数,用于选择附件文件。

  • self.save_info_var 是一个 IntVar 类型的变量,用于记录是否保存配置信息。

  • self.start_sending 是发送邮件按钮的回调函数,用于启动邮件发送过程。

load_config 方法

    def load_config(self):
        self.config = {}
        if os.path.exists(SAVE_FILE):
            with open(SAVE_FILE, 'r', encoding='utf-8') as f:
                self.config = json.load(f)
  • 该方法用于加载之前保存的配置信息。

  • 首先检查 SAVE_FILE 文件是否存在,如果存在则以 UTF - 8 编码打开文件,使用 json.load 方法将 JSON 数据解析为 Python 字典,并保存到 self.config 中。

save_config 方法

    def save_config(self):
        config = {
            'email': self.email_entry.get(),
            'password': self.password_entry.get(),
            'recipients': [r.strip() for r in self.recipients_entry.get().split(',') if r.strip()],
            'smtp_server': self.server_entry.get(),
            'port': int(self.port_entry.get()),
            'attachments': [a.strip() for a in self.attachment_text.get("1.0", tk.END).split('
') if a.strip()]
        }
        with open(SAVE_FILE, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False)
  • 该方法用于保存当前的配置信息到 SAVE_FILE 文件中。

  • 首先将各个输入框中的信息整理成一个字典 config,然后以 UTF - 8 编码打开文件,使用 json.dump 方法将字典数据转换为 JSON 字符串并写入文件中,ensure_ascii=False 用于确保中文字符能正确保存。

select_attachments 方法

    def select_attachments(self):
        files = filedialog.askopenfilenames(filetypes=[("压缩文件", "*.zip")])
        if files:
            self.attachment_text.delete("1.0", tk.END)
            for file in files:
                self.attachment_text.insert(tk.END, file + '
')
  • 该方法是选择附件按钮的回调函数。

  • 使用 filedialog.askopenfilenames 方法弹出文件选择对话框,只允许选择 ZIP 压缩文件。(这个后期其实可以改)

  • 如果选择了文件,则清空 self.attachment_text 文本框中的内容,并将选择的文件路径逐行插入到文本框中。

send_single_email 方法

  • 发送邮件后检查是否达到每日发送限制,如果达到则弹出警告消息框并返回 False

  • 如果发送过程中出现异常,打印异常堆栈信息,重试次数加 1,暂停 SEND_INTERVAL 秒。如果重试次数达到上限,则将错误信息放入队列中并返回 False

4. 主程序部分

完整代码在这,自己记录一下:

import loggingimport mimetypesimport smtplibimport osimport tracebackfrom email import encodersfrom email.mime.base import MIMEBasefrom email.mime.multipart import MIMEMultipartimport tkinter as tkfrom tkinter import messagebox, filedialog, ttkimport threadingimport timeimport jsonfrom queue import Queuefrom urllib.parse import quote# 常量配置DEFAULT_SMTP_SERVER = 'smtp.qq.com'DEFAULT_SMTP_PORT = 465DAILY_SEND_LIMIT = 500BUFFER_SIZE = 512 * 1024  # 512KB分块处理MAX_RETRIES = 3SEND_INTERVAL = 5  # 秒# 全局状态变量sent_count = 0success_count = 0SAVE_FILE = 'email_config.json'total_emails = 0logging.basicConfig(filename="email.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")class EmailSenderApp:
    def __init__(self, root):
        self.root = root
        self.queue = Queue()
        self.attachments = []
        self.status_label = None
        self.load_config()
        self.create_widgets()

    def create_widgets(self):
        # 发件人邮箱
        tk.Label(self.root, text="发件人邮箱:").pack()
        self.email_entry = tk.Entry(self.root)
        self.email_entry.insert(0, self.config.get('email', ''))
        self.email_entry.pack()

        # 授权码
        tk.Label(self.root, text="邮箱授权码:").pack()
        self.password_entry = tk.Entry(self.root, show="*")
        self.password_entry.insert(0, self.config.get('password', ''))
        self.password_entry.pack()

        # 收件人邮箱
        tk.Label(self.root, text="收件人邮箱(逗号分隔):").pack()
        self.recipients_entry = tk.Entry(self.root, width=50)
        self.recipients_entry.insert(0, ','.join(self.config.get('recipients', [])))
        self.recipients_entry.pack()

        # SMTP配置
        tk.Label(self.root, text="SMTP 服务器:").pack()
        self.server_entry = tk.Entry(self.root)
        self.server_entry.insert(0, self.config.get('smtp_server', DEFAULT_SMTP_SERVER))
        self.server_entry.pack()

        tk.Label(self.root, text="SMTP 端口:").pack()
        self.port_entry = tk.Entry(self.root)
        self.port_entry.insert(0, str(self.config.get('port', DEFAULT_SMTP_PORT)))
        self.port_entry.pack()

        # 附件
        tk.Label(self.root, text="附件路径(每行一个):").pack()
        self.attachment_text = tk.Text(self.root, height=5, width=50)
        for att in self.config.get('attachments', []):
            self.attachment_text.insert(tk.END, att + '
')
        self.attachment_text.pack()

        tk.Button(self.root, text="选择附件", command=self.select_attachments).pack()

        # 保存配置
        self.save_info_var = tk.IntVar()
        tk.Checkbutton(self.root, text="保存配置", variable=self.save_info_var).pack()

        # 发送按钮
        tk.Button(self.root, text="发送邮件", command=self.start_sending).pack()

    def load_config(self):
        self.config = {}
        if os.path.exists(SAVE_FILE):
            with open(SAVE_FILE, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        config = {
            'email': self.email_entry.get(),
            'password': self.password_entry.get(),
            'recipients': [r.strip() for r in self.recipients_entry.get().split(',') if r.strip()],
          'smtp_server': self.server_entry.get(),
            'port': int(self.port_entry.get()),
            'attachments': [a.strip() for a in self.attachment_text.get("1.0", tk.END).split('
') if a.strip()]
        }
        with open(SAVE_FILE, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False)

    def select_attachments(self):
        files = filedialog.askopenfilenames(filetypes=[("压缩文件", "*.zip")])
        if files:
            self.attachment_text.delete("1.0", tk.END)
            for file in files:
                self.attachment_text.insert(tk.END, file + '
')

    def start_sending(self):
        global total_emails
        total_emails += 1
        threading.Thread(target=self.send_emails_background, daemon=True).start()

    def send_emails_background(self):
        global sent_count, success_count

        email = self.email_entry.get().strip()
        password = self.password_entry.get().strip()
        recipients = [r.strip() for r in self.recipients_entry.get().strip().split(',') if r.strip()]
        smtp_server = self.server_entry.get().strip()
        port = int(self.port_entry.get().strip())
        attachments = [a.strip() for a in self.attachment_text.get("1.0", tk.END).strip().split('
') if a.strip()]
        save_info = self.save_info_var.get()

        if not all([email, password, recipients, smtp_server, port, attachments]):
            self.queue.put(('error', "请输入完整信息"))
            return

        if save_info:
            self.save_config()

        for recipient in recipients:
            for attachment in attachments:
                if sent_count >= DAILY_SEND_LIMIT:
                    self.queue.put(('warning', f"已达每日发送限制 {DAILY_SEND_LIMIT} 封"))
                    return

                result = self.send_single_email(
                    smtp_server, port, email, password,
                    recipient, attachment                )

                sent_count += 1
                if result:
                    success_count += 1
                    logging.info(f"{recipient}---{attachment}发送成功")
                else:
                    logging.info(f"{recipient}---{attachment}发送失败")
                logging.info(f"已发送/已发起 {sent_count}/{total_emails}")
                logging.info(f"成功/已发送 {success_count}/{sent_count}")
                time.sleep(SEND_INTERVAL)

    def send_single_email(self, smtp_server, port, email, password, recipient, attachment):
        retries = 0
        if not os.path.exists(attachment):
            messagebox.showerror("错误", f"附件 {attachment} 不存在,请检查路径")
            return False

        while retries < MAX_RETRIES:
            try:
                msg = MIMEMultipart()
                msg['From'] = email
                msg['To'] = recipient
                msg['Subject'] = "正式邮件主题"

                with open(attachment, 'rb') as data:
                    ctype, encoding = mimetypes.guess_type(attachment)
                    if ctype is None or encoding is not None:
                        ctype = 'application/octet-stream'
                    maintype, subtype = ctype.split('/', 1)
                    file_msg = MIMEBase(maintype, subtype)
                    file_msg.set_payload(data.read())
                    encoders.encode_base64(file_msg)  # 把附件编码

                    filename_encoded = quote(os.path.basename(attachment))
                    content_disposition = f'attachment; filename*=utf-8''{filename_encoded}'
                    file_msg.add_header('Content-Disposition', content_disposition)  # 修改邮件头
                    msg.attach(file_msg)  #这一部分代码最重要

                server = smtplib.SMTP_SSL(smtp_server, port)
                server.login(email, password)
                server.sendmail(email, [recipient], msg.as_string())
                if sent_count >= DAILY_SEND_LIMIT:
                    messagebox.showwarning("提示", f"已达每日发送限制 {DAILY_SEND_LIMIT} 封,停止发送")
                    return False
                server.quit()
                return True
            except Exception as e:
                traceback.print_exc()
                retries += 1
                if retries >= MAX_RETRIES:
                    self.queue.put(('error', f"发送给 {recipient} 失败:{str(e)}"))
                    return False
                time.sleep(SEND_INTERVAL)if __name__ == "__main__":
    root = tk.Tk()
    root.title("邮件发送工具")
    app = EmailSenderApp(root)
    root.mainloop()```