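I'm trying to build a script that parses newly arriving emails. I already had a script that checked existing emails from a specific sender, and it worked fine. Since I modified it to run continuously, though, it throws this error: An error occurred: 'utf-8' codec can't decode byte 0xa9 in position 802: invalid start byte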

Here is the code:

import imaplib
import email
import re
import yaml
from datetime import datetime
import time

# Function to extract the promo code from email body
def extract_promo_code(body):
    promo_code_pattern = r'(?s)Enter (?:code|promo).*?\b([A-Z\d]{10,})'
    match = re.search(promo_code_pattern, body, re.MULTILINE)
    if match:
        return match.group(1)
    else:
        return None

# Function to extract the expiry date from email body
def extract_expiry_date(body):
    expiry_date_pattern = r'Offer valid until ([A-Za-z]+ \d{1,2}, \d{4})'
    match = re.search(expiry_date_pattern, body)
    if match:
        original_date = match.group(1)
        parsed_date = datetime.strptime(original_date, '%B %d, %Y')
        formatted_date = parsed_date.strftime('%d/%m/%Y')
        return formatted_date
    else:
        return None

# Read credentials from a YAML file
with open('credentials.yaml') as f:
    content = f.read()

my_credentials = yaml.load(content, Loader=yaml.FullLoader)

user, password = my_credentials['user'], my_credentials['password']

imap_url = 'imap.gmail.com'

while True:
    try:
        my_mail = imaplib.IMAP4_SSL(imap_url)
        my_mail.login(user, password)
        my_mail.select('Inbox')

        _, data = my_mail.search(None, 'ALL')

        mail_id_list = data[0].split()
        
        for num in mail_id_list:
            typ, data = my_mail.fetch(num, '(RFC822)')
            msgs = []

            for msg in data:
                if isinstance(msg, tuple):
                    my_msg = email.message_from_bytes(msg[1])

                    # Initialize data fields
                    msg_to = my_msg['to']
                    date = my_msg['date']
                    expiry_date = None
                    subject = my_msg['subject']
                    promo_code = None
                    exclusions = None
                    supplier = "Supplier ID"
                    message_id = my_msg['Message-ID']

                    # Extract promo code from email body
                    for part in my_msg.walk():
                        if part.get_content_type() == 'text/plain':
                            body = part.get_payload(decode=True).decode('utf-8')
                            promo_code = extract_promo_code(body)

                            # Extract exclusions if present (you can modify this part)
                            exclusions_match = re.search(r'\*\s*EXCLUSIONS AND DISCLAIMERS\s*(.*?)Some exclusions apply\.', body, re.IGNORECASE | re.MULTILINE | re.DOTALL)
                            if exclusions_match:
                                exclusions = exclusions_match.group(1).strip()

                            # Extract expiry date from email body
                            expiry_date = extract_expiry_date(body)

                    # Check if a promo code was found before printing or saving the extracted data
                    if promo_code:
                        # Print or save the extracted data
                        print('______________________________')
                        print("msg_to:", msg_to)
                        print("date:", date)
                        print("expiry_date:", expiry_date)
                        print("subject:", subject)
                        print("promo_code:", promo_code)
                        print("exclusions:", exclusions)
                        print("supplier:", supplier)
                        print("message_id:", message_id)
                        print('______________________________')

        # Close the mailbox
        my_mail.logout()

        # Sleep for a while before checking for new emails again
        time.sleep(60)  # Sleep for 60 seconds before checking again
    except Exception as e:
        print(f"An error occurred: {str(e)}")

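The code raises the exception every 60 seconds, even when no new email has arrived. I only want it to check new, unread incoming emails and extract the required data from them, if it is present.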

Recommended answer

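I'm not entirely sure, but my guess is that your inbox contains an email whose body is not encoded as UTF-8. Because your script runs in a continuous loop with a 60-second sleep between iterations, and searches 'ALL' each time, it tries to read that same email on every pass, which triggers the exception again.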

I suggest wrapping your decode statement in a try/except block so the error is handled gracefully:

try:
    body = part.get_payload(decode=True).decode('utf-8')
except UnicodeDecodeError:
    print(f"Error decoding email {message_id}. Skipping.")
    continue
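
Skipping the message means its promo code is never extracted. As an alternative, here is a minimal sketch that decodes with the charset the part declares and only falls back to a lossy UTF-8 decode when that fails. The helper name decode_part is hypothetical and not part of the original script; byte 0xa9 is the copyright sign in ISO-8859-1 and Windows-1252, which is why that is a plausible cause here, but that is an assumption about your mail.

```python
import email


def decode_part(part):
    """Decode a text part to str (hypothetical helper, not in the original script).

    Tries the charset declared in the part's Content-Type header first and
    falls back to UTF-8 with replacement characters if that fails.
    """
    payload = part.get_payload(decode=True)  # bytes, or None for multipart containers
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset)
    except (UnicodeDecodeError, LookupError):
        # LookupError covers charset names Python does not know.
        return payload.decode("utf-8", errors="replace")


# Example: a Latin-1 message containing byte 0xa9
raw = (b"Content-Type: text/plain; charset=iso-8859-1\r\n"
       b"Content-Transfer-Encoding: 8bit\r\n\r\n"
       b"Copyright \xa9 2023")
sample_body = decode_part(email.message_from_bytes(raw))
```

In the loop you would then write body = decode_part(part) in place of the .decode('utf-8') call, and the try/except around it is no longer needed.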

It would also make sense to process only unread emails instead of all of them:

_, data = my_mail.search(None, 'UNSEEN')
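
To show how the UNSEEN search fits into the fetch loop, here is a rough sketch. The function name fetch_unseen_messages is hypothetical; conn is assumed to be a logged-in imaplib.IMAP4_SSL object with a mailbox already selected, as my_mail is in your script.

```python
import email


def fetch_unseen_messages(conn):
    """Return parsed messages for every unread mail in the selected mailbox.

    Hypothetical helper: `conn` is assumed to be a logged-in imaplib
    connection on which select() has already been called.
    """
    typ, data = conn.search(None, 'UNSEEN')
    if typ != 'OK' or not data or not data[0]:
        return []  # nothing unread, or the search failed

    messages = []
    for num in data[0].split():
        typ, parts = conn.fetch(num, '(RFC822)')
        if typ != 'OK':
            continue
        for item in parts:
            # Message content arrives as (envelope, raw_bytes) tuples.
            if isinstance(item, tuple):
                messages.append(email.message_from_bytes(item[1]))
    return messages
```

Fetching with '(RFC822)' on a mailbox opened read-write normally sets the \Seen flag, so a message handled in one pass should not come back from the UNSEEN search on the next one. That also means a message that fails to parse will not be retried unless you mark it unread again.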
