""" 黄历信息数据模型 基于 create_calendar_info.sql 中的DDL定义创建 """ from datetime import date, datetime from typing import Optional from sqlalchemy import Column, Integer, Date, Text, String, Boolean, DateTime from sqlalchemy.orm import Session from sqlalchemy import create_engine, text from sqlalchemy.dialects.postgresql import JSONB import json import requests import re from app import db from .calendar_config import CALENDAR_API_CONFIG from .wechat_api import get_openid_from_code, validate_openid class CalendarInfo(db.Model): """ 黄历信息表数据模型 对应数据库表: public.calendar_info 表注释: 黄历信息表 """ __tablename__ = 'calendar_info' __table_args__ = {'schema': 'public'} # 主键ID (serial primary key) id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID') # 阳历日期 (date not null) yangli = db.Column(db.Date, nullable=False, comment='阳历日期') # 阴历日期 (text not null) yinli = db.Column(db.Text, nullable=False, comment='阴历日期') # 五行 (text) wuxing = db.Column(db.Text, nullable=True, comment='五行') # 冲煞 (text) chongsha = db.Column(db.Text, nullable=True, comment='冲煞') # 彭祖百忌 (text) baiji = db.Column(db.Text, nullable=True, comment='彭祖百忌') # 吉神宜趋 (text) jishen = db.Column(db.Text, nullable=True, comment='吉神宜趋') # 宜 (text) yi = db.Column(db.Text, nullable=True, comment='宜') # 凶神宜忌 (text) xiongshen = db.Column(db.Text, nullable=True, comment='凶神宜忌') # 忌 (text) ji = db.Column(db.Text, nullable=True, comment='忌') # 颜色 (varchar(10)) color = db.Column(db.String(10), nullable=True, comment='颜色') def __init__(self, **kwargs): super().__init__(**kwargs) def __repr__(self): return f"" def to_dict(self) -> dict: """ 将模型对象转换为字典 Returns: dict: 包含所有字段的字典 """ return { 'id': self.id, 'yangli': self.yangli.isoformat() if self.yangli is not None else None, 'yinli': self.yinli, 'wuxing': self.wuxing, 'chongsha': self.chongsha, 'baiji': self.baiji, 'jishen': self.jishen, 'yi': self.yi, 'xiongshen': self.xiongshen, 'ji': self.ji, 'color': self.color } def to_json(self) -> str: """ 将模型对象转换为JSON字符串 Returns: str: JSON格式的字符串 """ return json.dumps(self.to_dict(), ensure_ascii=False, indent=2) @classmethod def from_dict(cls, data: dict) -> 'CalendarInfo': """ 从字典创建模型对象 Args: data (dict): 包含字段数据的字典 Returns: CalendarInfo: 新创建的模型对象 """ # 处理日期字段 yangli = data.get('yangli') if isinstance(yangli, str): try: yangli = date.fromisoformat(yangli) except ValueError: yangli = None # 从wuxing字段中判断五行元素并设置对应的颜色值 wuxing = data.get('wuxing', '') or '' color = data.get('color') # 先获取字典中的color值 # 如果字典中没有color值,则根据wuxing字段判断五行元素设置颜色 if not color: if '金' in wuxing: color = '白' elif '水' in wuxing: color = '黑' elif '木' in wuxing: color = '绿' elif '火' in wuxing: color = '红' elif '土' in wuxing: color = '黄' return cls( yangli=yangli, # type: ignore yinli=data.get('yinli'), # type: ignore wuxing=wuxing, # type: ignore chongsha=data.get('chongsha'), # type: ignore baiji=data.get('baiji'), # type: ignore jishen=data.get('jishen'), # type: ignore yi=data.get('yi'), # type: ignore xiongshen=data.get('xiongshen'), # type: ignore ji=data.get('ji'), # type: ignore color=color # type: ignore ) class WechatUser(db.Model): """ 微信用户信息表数据模型 对应数据库表: public.wechat_users 表注释: 微信用户信息表 """ __tablename__ = 'wechat_users' __table_args__ = {'schema': 'public'} # 主键ID (serial primary key) id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID') # 微信用户openid (varchar(255) not null unique) openid = db.Column(db.String(255), nullable=False, unique=True, comment='微信用户openid,唯一标识') # 用户手机号码 (varchar(20)) phone_number = db.Column(db.String(20), nullable=True, comment='用户手机号码') # 用户身份证号码 (varchar(18)) id_card_number = db.Column(db.String(18), nullable=True, comment='用户身份证号码') # 当前登录状态 (boolean default false not null) login_status = db.Column(db.Boolean, nullable=False, default=False, comment='当前登录状态,true表示已登录,false表示未登录') # 最后登录时间 (timestamp with time zone) login_time = db.Column(db.DateTime(timezone=True), nullable=True, comment='最后登录时间') # 用户账户状态 (varchar(20) default 'active' not null) user_status = db.Column(db.String(20), nullable=False, default='active', comment='用户账户状态:active-活跃,inactive-非活跃,suspended-暂停,deleted-已删除') # 账户创建时间 (timestamp with time zone default current_timestamp not null) created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, comment='账户创建时间') # 信息更新时间 (timestamp with time zone default current_timestamp not null) updated_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment='信息更新时间') def __init__(self, **kwargs): super().__init__(**kwargs) def __repr__(self): return f"" def to_dict(self) -> dict: """ 将模型对象转换为字典 Returns: dict: 包含所有字段的字典 """ return { 'id': self.id, 'openid': self.openid, 'phone_number': self.phone_number, 'id_card_number': self.id_card_number, 'login_status': self.login_status, 'login_time': self.login_time.isoformat() if self.login_time is not None else None, 'user_status': self.user_status, 'created_at': self.created_at.isoformat() if self.created_at is not None else None, 'updated_at': self.updated_at.isoformat() if self.updated_at is not None else None } def to_json(self) -> str: """ 将模型对象转换为JSON字符串 Returns: str: JSON格式的字符串 """ return json.dumps(self.to_dict(), ensure_ascii=False, indent=2) @classmethod def from_dict(cls, data: dict) -> 'WechatUser': """ 从字典创建模型对象 Args: data (dict): 包含字段数据的字典 Returns: WechatUser: 新创建的模型对象 """ # 处理日期时间字段 login_time = data.get('login_time') if isinstance(login_time, str): try: login_time = datetime.fromisoformat(login_time.replace('Z', '+00:00')) except ValueError: login_time = None created_at = data.get('created_at') if isinstance(created_at, str): try: created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) except ValueError: created_at = datetime.utcnow() elif created_at is None: created_at = datetime.utcnow() updated_at = data.get('updated_at') if isinstance(updated_at, str): try: updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) except ValueError: updated_at = datetime.utcnow() elif updated_at is None: updated_at = datetime.utcnow() return cls( openid=data.get('openid'), # type: ignore phone_number=data.get('phone_number'), # type: ignore id_card_number=data.get('id_card_number'), # type: ignore login_status=data.get('login_status', False), # type: ignore login_time=login_time, # type: ignore user_status=data.get('user_status', 'active'), # type: ignore created_at=created_at, # type: ignore updated_at=updated_at # type: ignore ) class CalendarRecord(db.Model): """ 日历内容记录表数据模型 对应数据库表: public.calendar_records 表注释: 日历内容记录表 """ __tablename__ = 'calendar_records' __table_args__ = {'schema': 'public'} # 主键ID (serial primary key) id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID') # 微信用户openid (varchar(255) not null) openid = db.Column(db.String(255), nullable=False, comment='微信用户openid') # 月份标识 (varchar(7) not null) month_key = db.Column(db.String(7), nullable=False, comment='月份标识,格式为YYYY-MM') # 日历内容 (jsonb not null) calendar_content = db.Column(JSONB, nullable=False, comment='日历内容,JSON数组格式') # 创建时间 (timestamp with time zone) created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, comment='记录创建时间') # 更新时间 (timestamp with time zone) updated_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment='记录更新时间') def __init__(self, **kwargs): super().__init__(**kwargs) def __repr__(self): return f"" def to_dict(self) -> dict: """ 将模型对象转换为字典 Returns: dict: 包含所有字段的字典 """ return { 'id': self.id, 'openid': self.openid, 'month_key': self.month_key, 'calendar_content': self.calendar_content, 'created_at': self.created_at.isoformat() if self.created_at is not None else None, 'updated_at': self.updated_at.isoformat() if self.updated_at is not None else None } def to_json(self) -> str: """ 将模型对象转换为JSON字符串 Returns: str: JSON格式的字符串 """ return json.dumps(self.to_dict(), ensure_ascii=False, indent=2) @classmethod def from_dict(cls, data: dict) -> 'CalendarRecord': """ 从字典创建模型对象 Args: data (dict): 包含字段数据的字典 Returns: CalendarRecord: 创建的模型对象 """ # 处理时间字段 created_at = data.get('created_at') if isinstance(created_at, str): try: created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) except ValueError: created_at = datetime.utcnow() elif created_at is None: created_at = datetime.utcnow() updated_at = data.get('updated_at') if isinstance(updated_at, str): try: updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) except ValueError: updated_at = datetime.utcnow() elif updated_at is None: updated_at = datetime.utcnow() return cls( openid=data.get('openid'), # type: ignore month_key=data.get('month_key'), # type: ignore calendar_content=data.get('calendar_content'), # type: ignore created_at=created_at, # type: ignore updated_at=updated_at # type: ignore ) @staticmethod def validate_month_key(month_key: str) -> bool: """ 验证月份格式是否正确 Args: month_key (str): 月份字符串 Returns: bool: 格式正确返回True,否则返回False """ if not month_key or not isinstance(month_key, str): return False # 检查格式是否为YYYY-MM pattern = r'^\d{4}-\d{2}$' if not re.match(pattern, month_key): return False # 检查月份是否在有效范围内 try: year, month = month_key.split('-') year_int = int(year) month_int = int(month) if year_int < 1900 or year_int > 2100: return False if month_int < 1 or month_int > 12: return False return True except (ValueError, IndexError): return False @staticmethod def validate_calendar_content(content) -> bool: """ 验证日历内容格式是否正确 Args: content: 日历内容 Returns: bool: 格式正确返回True,否则返回False """ if content is None: return False # 如果是字符串,尝试解析为JSON if isinstance(content, str): try: content = json.loads(content) except (json.JSONDecodeError, TypeError): return False # 检查是否为列表 if not isinstance(content, list): return False return True class CalendarService: """ 黄历信息服务类 提供黄历信息的增删改查操作 """ def __init__(self, engine=None): """ 初始化服务 Args: engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session """ self.engine = engine def create_calendar_info(self, calendar_data: dict) -> CalendarInfo: """ 创建新的黄历信息记录 Args: calendar_data (dict): 黄历信息数据 Returns: CalendarInfo: 创建的黄历信息对象 """ calendar_info = CalendarInfo.from_dict(calendar_data) if self.engine: with Session(self.engine) as session: session.add(calendar_info) session.commit() session.refresh(calendar_info) else: # 使用Flask-SQLAlchemy的db.session db.session.add(calendar_info) db.session.commit() db.session.refresh(calendar_info) return calendar_info def get_calendar_by_date(self, yangli_date: date) -> Optional[CalendarInfo]: """ 根据阳历日期查询黄历信息 Args: yangli_date (date): 阳历日期 Returns: Optional[CalendarInfo]: 黄历信息对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: return session.query(CalendarInfo).filter( CalendarInfo.yangli == yangli_date ).first() else: # 使用Flask-SQLAlchemy的db.session return CalendarInfo.query.filter( CalendarInfo.yangli == yangli_date ).first() def get_calendar_by_id(self, calendar_id: int) -> Optional[CalendarInfo]: """ 根据ID查询黄历信息 Args: calendar_id (int): 黄历信息ID Returns: Optional[CalendarInfo]: 黄历信息对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: return session.query(CalendarInfo).filter( CalendarInfo.id == calendar_id ).first() else: # 使用Flask-SQLAlchemy的db.session return CalendarInfo.query.filter( CalendarInfo.id == calendar_id ).first() def update_calendar_info(self, calendar_id: int, update_data: dict) -> Optional[CalendarInfo]: """ 更新黄历信息 Args: calendar_id (int): 黄历信息ID update_data (dict): 要更新的数据 Returns: Optional[CalendarInfo]: 更新后的黄历信息对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: calendar_info = session.query(CalendarInfo).filter( CalendarInfo.id == calendar_id ).first() if not calendar_info: return None # 更新字段 for key, value in update_data.items(): if hasattr(calendar_info, key): if key == 'yangli' and isinstance(value, str): try: value = date.fromisoformat(value) except ValueError: continue setattr(calendar_info, key, value) session.commit() session.refresh(calendar_info) else: # 使用Flask-SQLAlchemy的db.session calendar_info = CalendarInfo.query.filter( CalendarInfo.id == calendar_id ).first() if not calendar_info: return None # 更新字段 for key, value in update_data.items(): if hasattr(calendar_info, key): if key == 'yangli' and isinstance(value, str): try: value = date.fromisoformat(value) except ValueError: continue setattr(calendar_info, key, value) db.session.commit() db.session.refresh(calendar_info) return calendar_info def delete_calendar_info(self, calendar_id: int) -> bool: """ 删除黄历信息 Args: calendar_id (int): 黄历信息ID Returns: bool: 删除成功返回True,否则返回False """ if self.engine: with Session(self.engine) as session: calendar_info = session.query(CalendarInfo).filter( CalendarInfo.id == calendar_id ).first() if not calendar_info: return False session.delete(calendar_info) session.commit() else: # 使用Flask-SQLAlchemy的db.session calendar_info = CalendarInfo.query.filter( CalendarInfo.id == calendar_id ).first() if not calendar_info: return False db.session.delete(calendar_info) db.session.commit() return True def get_calendar_list(self, limit: int = 100, offset: int = 0) -> list[CalendarInfo]: """ 获取黄历信息列表 Args: limit (int): 限制返回数量,默认100 offset (int): 偏移量,默认0 Returns: list[CalendarInfo]: 黄历信息对象列表 """ if self.engine: with Session(self.engine) as session: return session.query(CalendarInfo).order_by( CalendarInfo.yangli.desc() ).offset(offset).limit(limit).all() else: # 使用Flask-SQLAlchemy的db.session return CalendarInfo.query.order_by( CalendarInfo.yangli.desc() ).offset(offset).limit(limit).all() def search_calendar_by_keyword(self, keyword: str, limit: int = 100) -> list[CalendarInfo]: """ 根据关键词搜索黄历信息 Args: keyword (str): 搜索关键词 limit (int): 限制返回数量,默认100 Returns: list[CalendarInfo]: 匹配的黄历信息对象列表 """ if self.engine: with Session(self.engine) as session: return session.query(CalendarInfo).filter( (CalendarInfo.yinli.contains(keyword)) | (CalendarInfo.wuxing.contains(keyword)) | (CalendarInfo.chongsha.contains(keyword)) | (CalendarInfo.baiji.contains(keyword)) | (CalendarInfo.jishen.contains(keyword)) | (CalendarInfo.yi.contains(keyword)) | (CalendarInfo.xiongshen.contains(keyword)) | (CalendarInfo.ji.contains(keyword)) | (CalendarInfo.color.contains(keyword)) ).limit(limit).all() else: # 使用Flask-SQLAlchemy的db.session return CalendarInfo.query.filter( (CalendarInfo.yinli.contains(keyword)) | (CalendarInfo.wuxing.contains(keyword)) | (CalendarInfo.chongsha.contains(keyword)) | (CalendarInfo.baiji.contains(keyword)) | (CalendarInfo.jishen.contains(keyword)) | (CalendarInfo.yi.contains(keyword)) | (CalendarInfo.xiongshen.contains(keyword)) | (CalendarInfo.ji.contains(keyword)) | (CalendarInfo.color.contains(keyword)) ).limit(limit).all() def fetch_calendar_from_api(self, yangli_date: date) -> Optional[dict]: """ 从外部API获取黄历信息 Args: yangli_date (date): 阳历日期 Returns: Optional[dict]: API返回的黄历信息,如果失败则返回None """ try: # 从配置文件获取API配置 api_url = CALENDAR_API_CONFIG['url'] api_key = CALENDAR_API_CONFIG['key'] timeout = CALENDAR_API_CONFIG['timeout'] # 格式化日期为YYYYMMDD格式 date_str = yangli_date.strftime('%Y%m%d') # 请求参数 request_params = { 'key': api_key, 'date': date_str, } # 发起API请求 response = requests.get(api_url, params=request_params, timeout=timeout) if response.status_code == 200: response_result = response.json() # 检查API返回结果 if response_result.get('error_code') == 0 and response_result.get('reason') == 'successed': return response_result.get('result') else: print(f"API返回错误: {response_result}") return None else: print(f"API请求失败,状态码: {response.status_code}") return None except requests.exceptions.RequestException as e: print(f"API请求异常: {e}") return None except Exception as e: print(f"获取API数据时发生错误: {e}") return None def save_calendar_from_api(self, api_data: dict) -> Optional[CalendarInfo]: """ 将API返回的黄历信息保存到数据库 Args: api_data (dict): API返回的黄历信息数据 Returns: Optional[CalendarInfo]: 保存后的黄历信息对象,如果失败则返回None """ try: # 解析API数据 yangli_str = api_data.get('yangli') if not yangli_str: print("API数据中缺少阳历日期") return None # 解析日期 try: yangli_date = date.fromisoformat(yangli_str) except ValueError: print(f"无效的日期格式: {yangli_str}") return None # 从wuxing字段中判断五行元素并设置对应的颜色值 wuxing = api_data.get('wuxing', '') or '' color = api_data.get('color') # 先获取API中的color值 # 如果API中没有color值,则根据wuxing字段判断五行元素设置颜色 if not color: if '金' in wuxing: color = '白' elif '水' in wuxing: color = '黑' elif '木' in wuxing: color = '绿' elif '火' in wuxing: color = '红' elif '土' in wuxing: color = '黄' # 创建CalendarInfo对象 calendar_info = CalendarInfo( yangli=yangli_date, # type: ignore yinli=api_data.get('yinli', ''), # type: ignore wuxing=wuxing, # type: ignore chongsha=api_data.get('chongsha'), # type: ignore baiji=api_data.get('baiji'), # type: ignore jishen=api_data.get('jishen'), # type: ignore yi=api_data.get('yi'), # type: ignore xiongshen=api_data.get('xionshen'), # type: ignore # 注意API返回的是xionshen ji=api_data.get('ji'), # type: ignore color=color # type: ignore ) # 保存到数据库 if self.engine: with Session(self.engine) as session: session.add(calendar_info) session.commit() session.refresh(calendar_info) else: # 使用Flask-SQLAlchemy的db.session db.session.add(calendar_info) db.session.commit() db.session.refresh(calendar_info) print(f"成功保存黄历信息到数据库,ID: {calendar_info.id}") return calendar_info except Exception as e: print(f"保存API数据到数据库时发生错误: {e}") return None class WechatUserService: """ 微信用户信息服务类 提供微信用户的注册、登录、状态管理等操作 """ def __init__(self, engine=None): """ 初始化服务 Args: engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session """ self.engine = engine def create_user(self, user_data: dict) -> WechatUser: """ 创建新的微信用户记录 Args: user_data (dict): 用户信息数据 Returns: WechatUser: 创建的用户对象 """ user = WechatUser.from_dict(user_data) if self.engine: with Session(self.engine) as session: session.add(user) session.commit() session.refresh(user) else: # 使用Flask-SQLAlchemy的db.session db.session.add(user) db.session.commit() db.session.refresh(user) return user def get_user_by_openid(self, openid: str) -> Optional[WechatUser]: """ 根据微信openid查询用户 Args: openid (str): 微信用户openid Returns: Optional[WechatUser]: 用户对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: return session.query(WechatUser).filter( WechatUser.openid == openid ).first() else: # 使用Flask-SQLAlchemy的db.session return WechatUser.query.filter( WechatUser.openid == openid ).first() def get_user_by_id(self, user_id: int) -> Optional[WechatUser]: """ 根据ID查询用户 Args: user_id (int): 用户ID Returns: Optional[WechatUser]: 用户对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: return session.query(WechatUser).filter( WechatUser.id == user_id ).first() else: # 使用Flask-SQLAlchemy的db.session return WechatUser.query.filter( WechatUser.id == user_id ).first() def get_user_by_phone(self, phone_number: str) -> Optional[WechatUser]: """ 根据手机号查询用户 Args: phone_number (str): 手机号码 Returns: Optional[WechatUser]: 用户对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: return session.query(WechatUser).filter( WechatUser.phone_number == phone_number ).first() else: # 使用Flask-SQLAlchemy的db.session return WechatUser.query.filter( WechatUser.phone_number == phone_number ).first() def register_user_by_code(self, wechat_code: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None, platform: str = 'miniprogram') -> tuple[bool, Optional[WechatUser], Optional[str]]: """ 通过微信授权码注册新用户或返回已存在用户 如果用户已存在,则返回现有用户信息;如果用户不存在,则创建新用户。 Args: wechat_code (str): 微信授权码(15分钟有效期) phone_number (str, optional): 手机号码 id_card_number (str, optional): 身份证号码 platform (str): 微信平台类型,默认为小程序 Returns: tuple[bool, Optional[WechatUser], Optional[str]]: (是否成功, 用户对象, 状态信息) - 新用户: (True, user, None) - 已存在用户: (True, user, "用户已存在") - 失败: (False, None, 错误信息) """ try: # 使用微信code换取openid success, openid, error_msg = get_openid_from_code(wechat_code, platform) if not success or not openid: return False, None, f"获取openid失败: {error_msg}" # 验证openid格式 if not validate_openid(openid): return False, None, "无效的openid格式" # 检查用户是否已存在 existing_user = self.get_user_by_openid(openid) if existing_user: # 用户已存在,返回现有用户信息 return True, existing_user, "用户已存在" # 创建用户数据 user_data = { 'openid': openid, 'phone_number': phone_number, 'id_card_number': id_card_number, 'login_status': False, 'user_status': 'active' } user = self.create_user(user_data) return True, user, None except Exception as e: return False, None, f"注册用户时发生错误: {str(e)}" def register_user_by_openid(self, openid: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None) -> WechatUser: """ 直接通过openid注册新用户(用于已知openid的情况) Args: openid (str): 微信用户openid phone_number (str, optional): 手机号码 id_card_number (str, optional): 身份证号码 Returns: WechatUser: 注册的用户对象 Raises: ValueError: 如果用户已存在或openid无效 """ # 验证openid格式 if not validate_openid(openid): raise ValueError(f"无效的openid格式: {openid}") # 检查用户是否已存在 existing_user = self.get_user_by_openid(openid) if existing_user: raise ValueError(f"用户已存在,openid: {openid}") # 创建用户数据 user_data = { 'openid': openid, 'phone_number': phone_number, 'id_card_number': id_card_number, 'login_status': False, 'user_status': 'active' } return self.create_user(user_data) def login_user_by_code(self, wechat_code: str, platform: str = 'miniprogram') -> tuple[bool, Optional[WechatUser], Optional[str]]: """ 通过微信授权码进行用户登录 Args: wechat_code (str): 微信授权码(15分钟有效期) platform (str): 微信平台类型,默认为小程序 Returns: tuple[bool, Optional[WechatUser], Optional[str]]: (是否成功, 用户对象, 错误信息) """ try: # 使用微信code换取openid success, openid, error_msg = get_openid_from_code(wechat_code, platform) if not success or not openid: return False, None, f"获取openid失败: {error_msg}" # 验证openid格式 if not validate_openid(openid): return False, None, "无效的openid格式" # 查找用户 user = self.get_user_by_openid(openid) if not user: return False, None, "用户不存在,请先注册" # 检查用户状态 if user.user_status != 'active': return False, None, f"用户账户状态异常: {user.user_status}" # 更新登录状态和登录时间 update_data = { 'login_status': True, 'login_time': datetime.utcnow() } updated_user = self.update_user(user.id, update_data) if updated_user: return True, updated_user, None else: return False, None, "更新登录状态失败" except Exception as e: return False, None, f"登录时发生错误: {str(e)}" def login_user_by_openid(self, openid: str) -> Optional[WechatUser]: """ 直接通过openid进行用户登录(用于已知openid的情况) Args: openid (str): 微信用户openid Returns: Optional[WechatUser]: 登录成功返回用户对象,否则返回None """ # 验证openid格式 if not validate_openid(openid): return None user = self.get_user_by_openid(openid) if not user: return None # 检查用户状态 if user.user_status != 'active': return None # 更新登录状态和登录时间 update_data = { 'login_status': True, 'login_time': datetime.utcnow() } return self.update_user(user.id, update_data) def logout_user_by_openid(self, openid: str) -> bool: """ 通过openid进行用户登出 Args: openid (str): 微信用户openid Returns: bool: 登出成功返回True,否则返回False """ # 验证openid格式 if not validate_openid(openid): return False user = self.get_user_by_openid(openid) if not user: return False # 更新登录状态 update_data = { 'login_status': False } updated_user = self.update_user(user.id, update_data) return updated_user is not None def update_user(self, user_id: int, update_data: dict) -> Optional[WechatUser]: """ 更新用户信息 Args: user_id (int): 用户ID update_data (dict): 要更新的数据 Returns: Optional[WechatUser]: 更新后的用户对象,如果不存在则返回None """ if self.engine: with Session(self.engine) as session: user = session.query(WechatUser).filter( WechatUser.id == user_id ).first() if not user: return None # 更新字段 for key, value in update_data.items(): if hasattr(user, key): if key in ['login_time', 'created_at', 'updated_at'] and isinstance(value, str): try: value = datetime.fromisoformat(value.replace('Z', '+00:00')) except ValueError: continue setattr(user, key, value) session.commit() session.refresh(user) else: # 使用Flask-SQLAlchemy的db.session user = WechatUser.query.filter( WechatUser.id == user_id ).first() if not user: return None # 更新字段 for key, value in update_data.items(): if hasattr(user, key): if key in ['login_time', 'created_at', 'updated_at'] and isinstance(value, str): try: value = datetime.fromisoformat(value.replace('Z', '+00:00')) except ValueError: continue setattr(user, key, value) db.session.commit() db.session.refresh(user) return user def deactivate_user(self, user_id: int) -> bool: """ 停用用户账户 Args: user_id (int): 用户ID Returns: bool: 停用成功返回True,否则返回False """ update_data = { 'user_status': 'inactive', 'login_status': False } updated_user = self.update_user(user_id, update_data) return updated_user is not None def activate_user(self, user_id: int) -> bool: """ 激活用户账户 Args: user_id (int): 用户ID Returns: bool: 激活成功返回True,否则返回False """ update_data = { 'user_status': 'active' } updated_user = self.update_user(user_id, update_data) return updated_user is not None def delete_user(self, user_id: int) -> bool: """ 删除用户(软删除,将状态设为deleted) Args: user_id (int): 用户ID Returns: bool: 删除成功返回True,否则返回False """ update_data = { 'user_status': 'deleted', 'login_status': False } updated_user = self.update_user(user_id, update_data) return updated_user is not None def get_active_users(self, limit: int = 100, offset: int = 0) -> list[WechatUser]: """ 获取活跃用户列表 Args: limit (int): 限制返回数量,默认100 offset (int): 偏移量,默认0 Returns: list[WechatUser]: 活跃用户对象列表 """ if self.engine: with Session(self.engine) as session: return session.query(WechatUser).filter( WechatUser.user_status == 'active' ).order_by( WechatUser.created_at.desc() ).offset(offset).limit(limit).all() else: # 使用Flask-SQLAlchemy的db.session return WechatUser.query.filter( WechatUser.user_status == 'active' ).order_by( WechatUser.created_at.desc() ).offset(offset).limit(limit).all() def get_logged_in_users(self, limit: int = 100) -> list[WechatUser]: """ 获取当前已登录的用户列表 Args: limit (int): 限制返回数量,默认100 Returns: list[WechatUser]: 已登录用户对象列表 """ if self.engine: with Session(self.engine) as session: return session.query(WechatUser).filter( WechatUser.login_status == True, WechatUser.user_status == 'active' ).order_by( WechatUser.login_time.desc() ).limit(limit).all() else: # 使用Flask-SQLAlchemy的db.session return WechatUser.query.filter( WechatUser.login_status == True, WechatUser.user_status == 'active' ).order_by( WechatUser.login_time.desc() ).limit(limit).all() # 便捷函数 def create_calendar_info(calendar_data: dict, engine=None) -> CalendarInfo: """ 创建黄历信息的便捷函数 Args: calendar_data (dict): 黄历信息数据 engine: SQLAlchemy引擎对象 Returns: CalendarInfo: 创建的黄历信息对象 """ service = CalendarService(engine) return service.create_calendar_info(calendar_data) def get_calendar_by_date(yangli_date: str, engine=None) -> dict: """ 根据阳历日期查询黄历信息的便捷函数 Args: yangli_date (str): 阳历日期,格式为YYYY-MM-DD engine: SQLAlchemy引擎对象 Returns: dict: 包含查询结果的JSON格式数据 """ try: # 验证日期格式 if not isinstance(yangli_date, str) or len(yangli_date) != 10: return { "reason": "failed", "return_code": 400, "result": None, "error": "日期格式错误,请使用YYYY-MM-DD格式" } # 解析日期字符串 try: parsed_date = date.fromisoformat(yangli_date) except ValueError: return { "reason": "failed", "return_code": 400, "result": None, "error": "无效的日期格式" } # 查询数据库 service = CalendarService(engine) calendar_info = service.get_calendar_by_date(parsed_date) if calendar_info: # 查询成功,返回指定格式的JSON数据 return { "reason": "successed", "return_code": 200, "result": { "id": str(calendar_info.id), "yangli": calendar_info.yangli.isoformat() if calendar_info.yangli is not None else None, "yinli": calendar_info.yinli, "wuxing": calendar_info.wuxing, "chongsha": calendar_info.chongsha, "baiji": calendar_info.baiji, "jishen": calendar_info.jishen, "yi": calendar_info.yi, "xiongshen": calendar_info.xiongshen, "ji": calendar_info.ji, "color": calendar_info.color } } else: # 数据库中没有找到记录,尝试从API获取 print(f"数据库中没有找到日期 {yangli_date} 的黄历信息,尝试从API获取...") # 从API获取数据 api_data = service.fetch_calendar_from_api(parsed_date) if api_data: # API获取成功,保存到数据库 print("API获取数据成功,正在保存到数据库...") saved_calendar = service.save_calendar_from_api(api_data) if saved_calendar: # 保存成功,返回数据 return { "reason": "successed", "return_code": 200, "result": { "id": str(saved_calendar.id), "yangli": saved_calendar.yangli.isoformat() if saved_calendar.yangli is not None else None, "yinli": saved_calendar.yinli, "wuxing": saved_calendar.wuxing, "chongsha": saved_calendar.chongsha, "baiji": saved_calendar.baiji, "jishen": saved_calendar.jishen, "yi": saved_calendar.yi, "xiongshen": saved_calendar.xiongshen, "ji": saved_calendar.ji, "color": saved_calendar.color } } else: # 保存到数据库失败 return { "reason": "failed", "return_code": 500, "result": None, "error": f"API获取数据成功但保存到数据库失败" } else: # API获取失败 return { "reason": "failed", "return_code": 404, "result": None, "error": f"未找到日期 {yangli_date} 的黄历信息,且API获取失败" } except Exception as e: # 发生异常 return { "reason": "failed", "return_code": 500, "result": None, "error": f"查询过程中发生错误: {str(e)}" } def get_calendar_by_id(calendar_id: int, engine=None) -> Optional[CalendarInfo]: """ 根据ID查询黄历信息的便捷函数 Args: calendar_id (int): 黄历信息ID engine: SQLAlchemy引擎对象 Returns: Optional[CalendarInfo]: 黄历信息对象 """ service = CalendarService(engine) return service.get_calendar_by_id(calendar_id) class CalendarRecordService: """ 日历内容记录服务类 提供日历内容记录的增删改查操作 """ def __init__(self, engine=None): """ 初始化服务 Args: engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session """ self.engine = engine def save_calendar_record(self, data: dict) -> tuple[bool, Optional[CalendarRecord], Optional[str]]: """ 保存日历记录(插入或更新) Args: data (dict): 包含openid, month_key, calendar_content的字典 Returns: tuple[bool, Optional[CalendarRecord], Optional[str]]: (是否成功, 记录对象, 错误信息) """ try: # 验证必需参数 openid = data.get('openid') month_key = data.get('month_key') calendar_content = data.get('calendar_content') if not openid: return False, None, "缺少必需参数: openid" if not month_key: return False, None, "缺少必需参数: month_key" if calendar_content is None: return False, None, "缺少必需参数: calendar_content" # 验证openid格式 if not validate_openid(openid): return False, None, f"无效的openid格式: {openid}" # 验证月份格式 if not CalendarRecord.validate_month_key(month_key): return False, None, f"无效的月份格式: {month_key}" # 验证日历内容格式 if not CalendarRecord.validate_calendar_content(calendar_content): return False, None, "无效的日历内容格式" if self.engine: with Session(self.engine) as session: # 查找是否已存在记录 existing_record = session.query(CalendarRecord).filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() if existing_record: # 更新现有记录 existing_record.calendar_content = calendar_content existing_record.updated_at = datetime.utcnow() session.commit() session.refresh(existing_record) return True, existing_record, None else: # 创建新记录 new_record = CalendarRecord( openid=openid, month_key=month_key, calendar_content=calendar_content ) session.add(new_record) session.commit() session.refresh(new_record) return True, new_record, None else: # 使用Flask-SQLAlchemy的db.session existing_record = CalendarRecord.query.filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() if existing_record: # 更新现有记录 existing_record.calendar_content = calendar_content existing_record.updated_at = datetime.utcnow() db.session.commit() db.session.refresh(existing_record) return True, existing_record, None else: # 创建新记录 new_record = CalendarRecord( openid=openid, month_key=month_key, calendar_content=calendar_content ) db.session.add(new_record) db.session.commit() db.session.refresh(new_record) return True, new_record, None except Exception as e: return False, None, f"保存日历记录时发生错误: {str(e)}" def get_calendar_record(self, openid: str, month_key: str) -> tuple[bool, Optional[CalendarRecord], Optional[str]]: """ 查找日历记录 Args: openid (str): 微信用户openid month_key (str): 月份标识(YYYY-MM格式) Returns: tuple[bool, Optional[CalendarRecord], Optional[str]]: (是否成功, 记录对象或None, 错误信息) """ try: # 验证参数 if not openid: return False, None, "缺少必需参数: openid" if not month_key: return False, None, "缺少必需参数: month_key" # 验证openid格式 if not validate_openid(openid): return False, None, f"无效的openid格式: {openid}" # 验证月份格式 if not CalendarRecord.validate_month_key(month_key): return False, None, f"无效的月份格式: {month_key}" if self.engine: with Session(self.engine) as session: record = session.query(CalendarRecord).filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() return True, record, None else: # 使用Flask-SQLAlchemy的db.session record = CalendarRecord.query.filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() return True, record, None except Exception as e: return False, None, f"查找日历记录时发生错误: {str(e)}" def get_user_records(self, openid: str, limit: int = 12) -> tuple[bool, list[CalendarRecord], Optional[str]]: """ 获取用户的所有日历记录 Args: openid (str): 微信用户openid limit (int): 限制返回数量,默认12(一年的月份数) Returns: tuple[bool, list[CalendarRecord], Optional[str]]: (是否成功, 记录列表, 错误信息) """ try: # 验证参数 if not openid: return False, [], "缺少必需参数: openid" # 验证openid格式 if not validate_openid(openid): return False, [], f"无效的openid格式: {openid}" if self.engine: with Session(self.engine) as session: records = session.query(CalendarRecord).filter( CalendarRecord.openid == openid ).order_by( CalendarRecord.month_key.desc() ).limit(limit).all() return True, records, None else: # 使用Flask-SQLAlchemy的db.session records = CalendarRecord.query.filter( CalendarRecord.openid == openid ).order_by( CalendarRecord.month_key.desc() ).limit(limit).all() return True, records, None except Exception as e: return False, [], f"获取用户日历记录时发生错误: {str(e)}" def delete_calendar_record(self, openid: str, month_key: str) -> tuple[bool, Optional[str]]: """ 删除日历记录 Args: openid (str): 微信用户openid month_key (str): 月份标识(YYYY-MM格式) Returns: tuple[bool, Optional[str]]: (是否成功, 错误信息) """ try: # 验证参数 if not openid: return False, "缺少必需参数: openid" if not month_key: return False, "缺少必需参数: month_key" # 验证openid格式 if not validate_openid(openid): return False, f"无效的openid格式: {openid}" # 验证月份格式 if not CalendarRecord.validate_month_key(month_key): return False, f"无效的月份格式: {month_key}" if self.engine: with Session(self.engine) as session: record = session.query(CalendarRecord).filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() if record: session.delete(record) session.commit() return True, None else: return False, "记录不存在" else: # 使用Flask-SQLAlchemy的db.session record = CalendarRecord.query.filter( CalendarRecord.openid == openid, CalendarRecord.month_key == month_key ).first() if record: db.session.delete(record) db.session.commit() return True, None else: return False, "记录不存在" except Exception as e: return False, f"删除日历记录时发生错误: {str(e)}" # 微信用户相关便捷函数 def register_wechat_user(wechat_code: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None, platform: str = 'miniprogram', engine=None) -> dict: """ 注册微信用户的便捷函数 如果用户已存在,则返回现有用户信息;如果用户不存在,则创建新用户。 Args: wechat_code (str): 微信授权码(15分钟有效期) phone_number (str, optional): 手机号码 id_card_number (str, optional): 身份证号码 platform (str): 微信平台类型,默认为小程序 engine: SQLAlchemy引擎对象 Returns: dict: 包含注册结果的JSON格式数据 - 新用户注册成功: return_code=201, is_new_user=True - 用户已存在: return_code=200, is_new_user=False - 注册失败: return_code=400/500 """ try: # 验证必填参数 if not wechat_code or not isinstance(wechat_code, str): return { "reason": "failed", "return_code": 400, "result": None, "error": "微信授权码不能为空" } # 创建服务实例 service = WechatUserService(engine) # 尝试注册用户 success, user, error_msg = service.register_user_by_code(wechat_code, phone_number, id_card_number, platform) if success and user: # 检查是否为已存在用户 if error_msg == "用户已存在": # 用户已存在,返回现有用户信息 return { "reason": "successed", "return_code": 200, "result": { "id": str(user.id), "openid": user.openid, "phone_number": user.phone_number, "id_card_number": user.id_card_number, "login_status": user.login_status, "user_status": user.user_status, "created_at": user.created_at.isoformat() if user.created_at is not None else None, "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None, "is_new_user": False }, "message": "用户已存在,返回现有用户信息" } else: # 新用户注册成功 return { "reason": "successed", "return_code": 201, "result": { "id": str(user.id), "openid": user.openid, "phone_number": user.phone_number, "id_card_number": user.id_card_number, "login_status": user.login_status, "user_status": user.user_status, "created_at": user.created_at.isoformat() if user.created_at is not None else None, "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None, "is_new_user": True }, "message": "新用户注册成功" } else: # 注册失败 return { "reason": "failed", "return_code": 400, "result": None, "error": error_msg or "注册失败" } except Exception as e: # 其他系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"注册过程中发生错误: {str(e)}" } def login_wechat_user(wechat_code: str, platform: str = 'miniprogram', engine=None) -> dict: """ 微信用户登录的便捷函数 Args: wechat_code (str): 微信授权码(15分钟有效期) platform (str): 微信平台类型,默认为小程序 engine: SQLAlchemy引擎对象 Returns: dict: 包含登录结果的JSON格式数据 """ try: # 验证必填参数 if not wechat_code or not isinstance(wechat_code, str): return { "reason": "failed", "return_code": 400, "result": None, "error": "微信授权码不能为空" } # 创建服务实例 service = WechatUserService(engine) # 尝试登录 success, user, error_msg = service.login_user_by_code(wechat_code, platform) if success and user: # 登录成功 return { "reason": "successed", "return_code": 200, "result": { "id": str(user.id), "openid": user.openid, "phone_number": user.phone_number, "id_card_number": user.id_card_number, "login_status": user.login_status, "login_time": user.login_time.isoformat() if user.login_time is not None else None, "user_status": user.user_status } } else: # 登录失败 error_code = 404 if "不存在" in (error_msg or "") else 401 return { "reason": "failed", "return_code": error_code, "result": None, "error": error_msg or "登录失败" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"登录过程中发生错误: {str(e)}" } def logout_wechat_user(openid: str, engine=None) -> dict: """ 微信用户登出的便捷函数 Args: openid (str): 微信用户openid engine: SQLAlchemy引擎对象 Returns: dict: 包含登出结果的JSON格式数据 """ try: # 验证必填参数 if not openid or not isinstance(openid, str): return { "reason": "failed", "return_code": 400, "result": None, "error": "微信用户openid不能为空" } # 创建服务实例 service = WechatUserService(engine) # 尝试登出 success = service.logout_user_by_openid(openid) if success: # 登出成功 return { "reason": "successed", "return_code": 200, "result": { "message": "用户已成功登出" } } else: # 登出失败 return { "reason": "failed", "return_code": 404, "result": None, "error": "用户不存在或openid无效" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"登出过程中发生错误: {str(e)}" } def get_wechat_user_info(openid: str, engine=None) -> dict: """ 获取微信用户信息的便捷函数 Args: openid (str): 微信用户openid engine: SQLAlchemy引擎对象 Returns: dict: 包含用户信息的JSON格式数据 """ try: # 验证必填参数 if not openid or not isinstance(openid, str): return { "reason": "failed", "return_code": 400, "result": None, "error": "微信用户openid不能为空" } # 创建服务实例 service = WechatUserService(engine) # 查询用户信息 user = service.get_user_by_openid(openid) if user: # 查询成功 return { "reason": "successed", "return_code": 200, "result": { "id": str(user.id), "openid": user.openid, "phone_number": user.phone_number, "id_card_number": user.id_card_number, "login_status": user.login_status, "login_time": user.login_time.isoformat() if user.login_time is not None else None, "user_status": user.user_status, "created_at": user.created_at.isoformat() if user.created_at is not None else None, "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None } } else: # 用户不存在 return { "reason": "failed", "return_code": 404, "result": None, "error": f"未找到openid为 {openid} 的用户" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"查询过程中发生错误: {str(e)}" } def update_wechat_user_info(openid: str, update_data: dict, engine=None) -> dict: """ 更新微信用户信息的便捷函数 Args: openid (str): 微信用户openid update_data (dict): 要更新的数据 engine: SQLAlchemy引擎对象 Returns: dict: 包含更新结果的JSON格式数据 """ try: # 验证必填参数 if not openid or not isinstance(openid, str): return { "reason": "failed", "return_code": 400, "result": None, "error": "微信用户openid不能为空" } if not update_data or not isinstance(update_data, dict): return { "reason": "failed", "return_code": 400, "result": None, "error": "更新数据不能为空" } # 创建服务实例 service = WechatUserService(engine) # 先查找用户 user = service.get_user_by_openid(openid) if not user: return { "reason": "failed", "return_code": 404, "result": None, "error": f"未找到openid为 {openid} 的用户" } # 更新用户信息 updated_user = service.update_user(user.id, update_data) if updated_user: # 更新成功 return { "reason": "successed", "return_code": 200, "result": { "id": str(updated_user.id), "openid": updated_user.openid, "phone_number": updated_user.phone_number, "id_card_number": updated_user.id_card_number, "login_status": updated_user.login_status, "login_time": updated_user.login_time.isoformat() if updated_user.login_time is not None else None, "user_status": updated_user.user_status, "created_at": updated_user.created_at.isoformat() if updated_user.created_at is not None else None, "updated_at": updated_user.updated_at.isoformat() if updated_user.updated_at is not None else None } } else: # 更新失败 return { "reason": "failed", "return_code": 500, "result": None, "error": "更新用户信息失败" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"更新过程中发生错误: {str(e)}" } # 日历内容记录相关便捷函数 def save_calendar_record(data: dict, engine=None) -> dict: """ 保存日历记录的便捷函数 Args: data (dict): 包含openid, month_key, calendar_content的字典 engine: SQLAlchemy引擎对象 Returns: dict: 包含保存结果的JSON格式数据 """ try: # 验证必填参数 if not data or not isinstance(data, dict): return { "reason": "failed", "return_code": 400, "result": None, "error": "请求数据不能为空" } # 创建服务实例 service = CalendarRecordService(engine) # 尝试保存记录 success, record, error_msg = service.save_calendar_record(data) if success and record: # 保存成功 return { "reason": "successed", "return_code": 200, "result": { "id": record.id, "openid": record.openid, "month_key": record.month_key, "calendar_content": record.calendar_content, "created_at": record.created_at.isoformat() if record.created_at else None, "updated_at": record.updated_at.isoformat() if record.updated_at else None } } else: # 保存失败 error_code = 400 return { "reason": "failed", "return_code": error_code, "result": None, "error": error_msg or "保存失败" } except Exception as e: # 其他系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"保存过程中发生错误: {str(e)}" } def get_calendar_record(openid: str, month_key: str, engine=None) -> dict: """ 查找日历记录的便捷函数 Args: openid (str): 微信用户openid month_key (str): 月份标识(YYYY-MM格式) engine: SQLAlchemy引擎对象 Returns: dict: 包含查询结果的JSON格式数据 """ try: # 验证必填参数 if not openid: return { "reason": "failed", "return_code": 400, "result": None, "error": "缺少必需参数: openid" } if not month_key: return { "reason": "failed", "return_code": 400, "result": None, "error": "缺少必需参数: month_key" } # 创建服务实例 service = CalendarRecordService(engine) # 尝试查找记录 success, record, error_msg = service.get_calendar_record(openid, month_key) if success: if record: # 查找成功,有记录 return { "reason": "successed", "return_code": 200, "result": { "id": record.id, "openid": record.openid, "month_key": record.month_key, "calendar_content": record.calendar_content, "created_at": record.created_at.isoformat() if record.created_at else None, "updated_at": record.updated_at.isoformat() if record.updated_at else None } } else: # 查找成功,但无记录 return { "reason": "successed", "return_code": 200, "result": { "id": None, "openid": openid, "month_key": month_key, "calendar_content": [], "created_at": None, "updated_at": None } } else: # 查找失败 return { "reason": "failed", "return_code": 400, "result": None, "error": error_msg or "查找失败" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"查找过程中发生错误: {str(e)}" } def get_user_calendar_records(openid: str, limit: int = 12, engine=None) -> dict: """ 获取用户所有日历记录的便捷函数 Args: openid (str): 微信用户openid limit (int): 限制返回数量,默认12 engine: SQLAlchemy引擎对象 Returns: dict: 包含查询结果的JSON格式数据 """ try: # 验证必填参数 if not openid: return { "reason": "failed", "return_code": 400, "result": None, "error": "缺少必需参数: openid" } # 创建服务实例 service = CalendarRecordService(engine) # 尝试获取记录 success, records, error_msg = service.get_user_records(openid, limit) if success: # 转换记录为字典列表 result_records = [] for record in records: result_records.append({ "id": record.id, "openid": record.openid, "month_key": record.month_key, "calendar_content": record.calendar_content, "created_at": record.created_at.isoformat() if record.created_at else None, "updated_at": record.updated_at.isoformat() if record.updated_at else None }) return { "reason": "successed", "return_code": 200, "result": { "openid": openid, "total": len(result_records), "records": result_records } } else: # 获取失败 return { "reason": "failed", "return_code": 400, "result": None, "error": error_msg or "获取记录失败" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"获取记录过程中发生错误: {str(e)}" } def delete_calendar_record(openid: str, month_key: str, engine=None) -> dict: """ 删除日历记录的便捷函数 Args: openid (str): 微信用户openid month_key (str): 月份标识(YYYY-MM格式) engine: SQLAlchemy引擎对象 Returns: dict: 包含删除结果的JSON格式数据 """ try: # 验证必填参数 if not openid: return { "reason": "failed", "return_code": 400, "result": None, "error": "缺少必需参数: openid" } if not month_key: return { "reason": "failed", "return_code": 400, "result": None, "error": "缺少必需参数: month_key" } # 创建服务实例 service = CalendarRecordService(engine) # 尝试删除记录 success, error_msg = service.delete_calendar_record(openid, month_key) if success: # 删除成功 return { "reason": "successed", "return_code": 200, "result": { "message": f"已删除 {openid} 在 {month_key} 的日历记录" } } else: # 删除失败 error_code = 404 if "不存在" in (error_msg or "") else 400 return { "reason": "failed", "return_code": error_code, "result": None, "error": error_msg or "删除失败" } except Exception as e: # 系统错误 return { "reason": "failed", "return_code": 500, "result": None, "error": f"删除过程中发生错误: {str(e)}" } # 导出主要类和函数 __all__ = [ 'CalendarInfo', 'WechatUser', 'CalendarRecord', 'CalendarService', 'WechatUserService', 'CalendarRecordService', 'create_calendar_info', 'get_calendar_by_date', 'get_calendar_by_id', 'register_wechat_user', 'login_wechat_user', 'logout_wechat_user', 'get_wechat_user_info', 'update_wechat_user_info', 'save_calendar_record', 'get_calendar_record', 'get_user_calendar_records', 'delete_calendar_record' ]