calendar.py 78 KB


  1. """
  2. 黄历信息数据模型
  3. 基于 create_calendar_info.sql 中的DDL定义创建
  4. """
  5. from datetime import date, datetime
  6. from typing import Optional
  7. from sqlalchemy import Column, Integer, Date, Text, String, Boolean, DateTime
  8. from sqlalchemy.orm import Session
  9. from sqlalchemy import create_engine, text
  10. from sqlalchemy.dialects.postgresql import JSONB
  11. import json
  12. import requests
  13. import re
  14. from app import db
  15. from .calendar_config import CALENDAR_API_CONFIG
  16. from .wechat_api import get_openid_from_code, validate_openid
  17. class CalendarInfo(db.Model):
  18. """
  19. 黄历信息表数据模型
  20. 对应数据库表: public.calendar_info
  21. 表注释: 黄历信息表
  22. """
  23. __tablename__ = 'calendar_info'
  24. __table_args__ = {'schema': 'public'}
  25. # 主键ID (serial primary key)
  26. id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID')
  27. # 阳历日期 (date not null)
  28. yangli = db.Column(db.Date, nullable=False, comment='阳历日期')
  29. # 阴历日期 (text not null)
  30. yinli = db.Column(db.Text, nullable=False, comment='阴历日期')
  31. # 五行 (text)
  32. wuxing = db.Column(db.Text, nullable=True, comment='五行')
  33. # 冲煞 (text)
  34. chongsha = db.Column(db.Text, nullable=True, comment='冲煞')
  35. # 彭祖百忌 (text)
  36. baiji = db.Column(db.Text, nullable=True, comment='彭祖百忌')
  37. # 吉神宜趋 (text)
  38. jishen = db.Column(db.Text, nullable=True, comment='吉神宜趋')
  39. # 宜 (text)
  40. yi = db.Column(db.Text, nullable=True, comment='宜')
  41. # 凶神宜忌 (text)
  42. xiongshen = db.Column(db.Text, nullable=True, comment='凶神宜忌')
  43. # 忌 (text)
  44. ji = db.Column(db.Text, nullable=True, comment='忌')
  45. # 颜色 (varchar(10))
  46. color = db.Column(db.String(10), nullable=True, comment='颜色')
  47. def __init__(self, **kwargs):
  48. super().__init__(**kwargs)
  49. def __repr__(self):
  50. return f"<CalendarInfo(id={self.id}, yangli='{self.yangli}', yinli='{self.yinli}')>"
  51. def to_dict(self) -> dict:
  52. """
  53. 将模型对象转换为字典
  54. Returns:
  55. dict: 包含所有字段的字典
  56. """
  57. return {
  58. 'id': self.id,
  59. 'yangli': self.yangli.isoformat() if self.yangli is not None else None,
  60. 'yinli': self.yinli,
  61. 'wuxing': self.wuxing,
  62. 'chongsha': self.chongsha,
  63. 'baiji': self.baiji,
  64. 'jishen': self.jishen,
  65. 'yi': self.yi,
  66. 'xiongshen': self.xiongshen,
  67. 'ji': self.ji,
  68. 'color': self.color
  69. }
  70. def to_json(self) -> str:
  71. """
  72. 将模型对象转换为JSON字符串
  73. Returns:
  74. str: JSON格式的字符串
  75. """
  76. return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
  77. @classmethod
  78. def from_dict(cls, data: dict) -> 'CalendarInfo':
  79. """
  80. 从字典创建模型对象
  81. Args:
  82. data (dict): 包含字段数据的字典
  83. Returns:
  84. CalendarInfo: 新创建的模型对象
  85. """
  86. # 处理日期字段
  87. yangli = data.get('yangli')
  88. if isinstance(yangli, str):
  89. try:
  90. yangli = date.fromisoformat(yangli)
  91. except ValueError:
  92. yangli = None
  93. # 从wuxing字段中判断五行元素并设置对应的颜色值
  94. wuxing = data.get('wuxing', '') or ''
  95. color = data.get('color') # 先获取字典中的color值
  96. # 如果字典中没有color值,则根据wuxing字段判断五行元素设置颜色
  97. if not color:
  98. if '金' in wuxing:
  99. color = '白'
  100. elif '水' in wuxing:
  101. color = '黑'
  102. elif '木' in wuxing:
  103. color = '绿'
  104. elif '火' in wuxing:
  105. color = '红'
  106. elif '土' in wuxing:
  107. color = '黄'
  108. return cls(
  109. yangli=yangli, # type: ignore
  110. yinli=data.get('yinli'), # type: ignore
  111. wuxing=wuxing, # type: ignore
  112. chongsha=data.get('chongsha'), # type: ignore
  113. baiji=data.get('baiji'), # type: ignore
  114. jishen=data.get('jishen'), # type: ignore
  115. yi=data.get('yi'), # type: ignore
  116. xiongshen=data.get('xiongshen'), # type: ignore
  117. ji=data.get('ji'), # type: ignore
  118. color=color # type: ignore
  119. )
  120. class WechatUser(db.Model):
  121. """
  122. 微信用户信息表数据模型
  123. 对应数据库表: public.wechat_users
  124. 表注释: 微信用户信息表
  125. """
  126. __tablename__ = 'wechat_users'
  127. __table_args__ = {'schema': 'public'}
  128. # 主键ID (serial primary key)
  129. id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID')
  130. # 微信用户openid (varchar(255) not null unique)
  131. openid = db.Column(db.String(255), nullable=False, unique=True, comment='微信用户openid,唯一标识')
  132. # 用户手机号码 (varchar(20))
  133. phone_number = db.Column(db.String(20), nullable=True, comment='用户手机号码')
  134. # 用户身份证号码 (varchar(18))
  135. id_card_number = db.Column(db.String(18), nullable=True, comment='用户身份证号码')
  136. # 当前登录状态 (boolean default false not null)
  137. login_status = db.Column(db.Boolean, nullable=False, default=False, comment='当前登录状态,true表示已登录,false表示未登录')
  138. # 最后登录时间 (timestamp with time zone)
  139. login_time = db.Column(db.DateTime(timezone=True), nullable=True, comment='最后登录时间')
  140. # 用户账户状态 (varchar(20) default 'active' not null)
  141. user_status = db.Column(db.String(20), nullable=False, default='active', comment='用户账户状态:active-活跃,inactive-非活跃,suspended-暂停,deleted-已删除')
  142. # 账户创建时间 (timestamp with time zone default current_timestamp not null)
  143. created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, comment='账户创建时间')
  144. # 信息更新时间 (timestamp with time zone default current_timestamp not null)
  145. updated_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment='信息更新时间')
  146. def __init__(self, **kwargs):
  147. super().__init__(**kwargs)
  148. def __repr__(self):
  149. return f"<WechatUser(id={self.id}, openid='{self.openid}', user_status='{self.user_status}')>"
  150. def to_dict(self) -> dict:
  151. """
  152. 将模型对象转换为字典
  153. Returns:
  154. dict: 包含所有字段的字典
  155. """
  156. return {
  157. 'id': self.id,
  158. 'openid': self.openid,
  159. 'phone_number': self.phone_number,
  160. 'id_card_number': self.id_card_number,
  161. 'login_status': self.login_status,
  162. 'login_time': self.login_time.isoformat() if self.login_time is not None else None,
  163. 'user_status': self.user_status,
  164. 'created_at': self.created_at.isoformat() if self.created_at is not None else None,
  165. 'updated_at': self.updated_at.isoformat() if self.updated_at is not None else None
  166. }
  167. def to_json(self) -> str:
  168. """
  169. 将模型对象转换为JSON字符串
  170. Returns:
  171. str: JSON格式的字符串
  172. """
  173. return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
  174. @classmethod
  175. def from_dict(cls, data: dict) -> 'WechatUser':
  176. """
  177. 从字典创建模型对象
  178. Args:
  179. data (dict): 包含字段数据的字典
  180. Returns:
  181. WechatUser: 新创建的模型对象
  182. """
  183. # 处理日期时间字段
  184. login_time = data.get('login_time')
  185. if isinstance(login_time, str):
  186. try:
  187. login_time = datetime.fromisoformat(login_time.replace('Z', '+00:00'))
  188. except ValueError:
  189. login_time = None
  190. created_at = data.get('created_at')
  191. if isinstance(created_at, str):
  192. try:
  193. created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
  194. except ValueError:
  195. created_at = datetime.utcnow()
  196. elif created_at is None:
  197. created_at = datetime.utcnow()
  198. updated_at = data.get('updated_at')
  199. if isinstance(updated_at, str):
  200. try:
  201. updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
  202. except ValueError:
  203. updated_at = datetime.utcnow()
  204. elif updated_at is None:
  205. updated_at = datetime.utcnow()
  206. return cls(
  207. openid=data.get('openid'), # type: ignore
  208. phone_number=data.get('phone_number'), # type: ignore
  209. id_card_number=data.get('id_card_number'), # type: ignore
  210. login_status=data.get('login_status', False), # type: ignore
  211. login_time=login_time, # type: ignore
  212. user_status=data.get('user_status', 'active'), # type: ignore
  213. created_at=created_at, # type: ignore
  214. updated_at=updated_at # type: ignore
  215. )
  216. class CalendarRecord(db.Model):
  217. """
  218. 日历内容记录表数据模型
  219. 对应数据库表: public.calendar_records
  220. 表注释: 日历内容记录表
  221. """
  222. __tablename__ = 'calendar_records'
  223. __table_args__ = {'schema': 'public'}
  224. # 主键ID (serial primary key)
  225. id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键ID')
  226. # 微信用户openid (varchar(255) not null)
  227. openid = db.Column(db.String(255), nullable=False, comment='微信用户openid')
  228. # 月份标识 (varchar(7) not null)
  229. month_key = db.Column(db.String(7), nullable=False, comment='月份标识,格式为YYYY-MM')
  230. # 日历内容 (jsonb not null)
  231. calendar_content = db.Column(JSONB, nullable=False, comment='日历内容,JSON数组格式')
  232. # 创建时间 (timestamp with time zone)
  233. created_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, comment='记录创建时间')
  234. # 更新时间 (timestamp with time zone)
  235. updated_at = db.Column(db.DateTime(timezone=True), nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment='记录更新时间')
  236. def __init__(self, **kwargs):
  237. super().__init__(**kwargs)
  238. def __repr__(self):
  239. return f"<CalendarRecord(id={self.id}, openid='{self.openid}', month_key='{self.month_key}')>"
  240. def to_dict(self) -> dict:
  241. """
  242. 将模型对象转换为字典
  243. Returns:
  244. dict: 包含所有字段的字典
  245. """
  246. return {
  247. 'id': self.id,
  248. 'openid': self.openid,
  249. 'month_key': self.month_key,
  250. 'calendar_content': self.calendar_content,
  251. 'created_at': self.created_at.isoformat() if self.created_at is not None else None,
  252. 'updated_at': self.updated_at.isoformat() if self.updated_at is not None else None
  253. }
  254. def to_json(self) -> str:
  255. """
  256. 将模型对象转换为JSON字符串
  257. Returns:
  258. str: JSON格式的字符串
  259. """
  260. return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
  261. @classmethod
  262. def from_dict(cls, data: dict) -> 'CalendarRecord':
  263. """
  264. 从字典创建模型对象
  265. Args:
  266. data (dict): 包含字段数据的字典
  267. Returns:
  268. CalendarRecord: 创建的模型对象
  269. """
  270. # 处理时间字段
  271. created_at = data.get('created_at')
  272. if isinstance(created_at, str):
  273. try:
  274. created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
  275. except ValueError:
  276. created_at = datetime.utcnow()
  277. elif created_at is None:
  278. created_at = datetime.utcnow()
  279. updated_at = data.get('updated_at')
  280. if isinstance(updated_at, str):
  281. try:
  282. updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
  283. except ValueError:
  284. updated_at = datetime.utcnow()
  285. elif updated_at is None:
  286. updated_at = datetime.utcnow()
  287. return cls(
  288. openid=data.get('openid'), # type: ignore
  289. month_key=data.get('month_key'), # type: ignore
  290. calendar_content=data.get('calendar_content'), # type: ignore
  291. created_at=created_at, # type: ignore
  292. updated_at=updated_at # type: ignore
  293. )
  294. @staticmethod
  295. def validate_month_key(month_key: str) -> bool:
  296. """
  297. 验证月份格式是否正确
  298. Args:
  299. month_key (str): 月份字符串
  300. Returns:
  301. bool: 格式正确返回True,否则返回False
  302. """
  303. if not month_key or not isinstance(month_key, str):
  304. return False
  305. # 检查格式是否为YYYY-MM
  306. pattern = r'^\d{4}-\d{2}$'
  307. if not re.match(pattern, month_key):
  308. return False
  309. # 检查月份是否在有效范围内
  310. try:
  311. year, month = month_key.split('-')
  312. year_int = int(year)
  313. month_int = int(month)
  314. if year_int < 1900 or year_int > 2100:
  315. return False
  316. if month_int < 1 or month_int > 12:
  317. return False
  318. return True
  319. except (ValueError, IndexError):
  320. return False
  321. @staticmethod
  322. def validate_calendar_content(content) -> bool:
  323. """
  324. 验证日历内容格式是否正确
  325. Args:
  326. content: 日历内容
  327. Returns:
  328. bool: 格式正确返回True,否则返回False
  329. """
  330. if content is None:
  331. return False
  332. # 如果是字符串,尝试解析为JSON
  333. if isinstance(content, str):
  334. try:
  335. content = json.loads(content)
  336. except (json.JSONDecodeError, TypeError):
  337. return False
  338. # 检查是否为列表
  339. if not isinstance(content, list):
  340. return False
  341. return True
  342. class CalendarService:
  343. """
  344. 黄历信息服务类
  345. 提供黄历信息的增删改查操作
  346. """
  347. def __init__(self, engine=None):
  348. """
  349. 初始化服务
  350. Args:
  351. engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session
  352. """
  353. self.engine = engine
  354. def create_calendar_info(self, calendar_data: dict) -> CalendarInfo:
  355. """
  356. 创建新的黄历信息记录
  357. Args:
  358. calendar_data (dict): 黄历信息数据
  359. Returns:
  360. CalendarInfo: 创建的黄历信息对象
  361. """
  362. calendar_info = CalendarInfo.from_dict(calendar_data)
  363. if self.engine:
  364. with Session(self.engine) as session:
  365. session.add(calendar_info)
  366. session.commit()
  367. session.refresh(calendar_info)
  368. else:
  369. # 使用Flask-SQLAlchemy的db.session
  370. db.session.add(calendar_info)
  371. db.session.commit()
  372. db.session.refresh(calendar_info)
  373. return calendar_info
  374. def get_calendar_by_date(self, yangli_date: date) -> Optional[CalendarInfo]:
  375. """
  376. 根据阳历日期查询黄历信息
  377. Args:
  378. yangli_date (date): 阳历日期
  379. Returns:
  380. Optional[CalendarInfo]: 黄历信息对象,如果不存在则返回None
  381. """
  382. if self.engine:
  383. with Session(self.engine) as session:
  384. return session.query(CalendarInfo).filter(
  385. CalendarInfo.yangli == yangli_date
  386. ).first()
  387. else:
  388. # 使用Flask-SQLAlchemy的db.session
  389. return CalendarInfo.query.filter(
  390. CalendarInfo.yangli == yangli_date
  391. ).first()
  392. def get_calendar_by_id(self, calendar_id: int) -> Optional[CalendarInfo]:
  393. """
  394. 根据ID查询黄历信息
  395. Args:
  396. calendar_id (int): 黄历信息ID
  397. Returns:
  398. Optional[CalendarInfo]: 黄历信息对象,如果不存在则返回None
  399. """
  400. if self.engine:
  401. with Session(self.engine) as session:
  402. return session.query(CalendarInfo).filter(
  403. CalendarInfo.id == calendar_id
  404. ).first()
  405. else:
  406. # 使用Flask-SQLAlchemy的db.session
  407. return CalendarInfo.query.filter(
  408. CalendarInfo.id == calendar_id
  409. ).first()
  410. def update_calendar_info(self, calendar_id: int, update_data: dict) -> Optional[CalendarInfo]:
  411. """
  412. 更新黄历信息
  413. Args:
  414. calendar_id (int): 黄历信息ID
  415. update_data (dict): 要更新的数据
  416. Returns:
  417. Optional[CalendarInfo]: 更新后的黄历信息对象,如果不存在则返回None
  418. """
  419. if self.engine:
  420. with Session(self.engine) as session:
  421. calendar_info = session.query(CalendarInfo).filter(
  422. CalendarInfo.id == calendar_id
  423. ).first()
  424. if not calendar_info:
  425. return None
  426. # 更新字段
  427. for key, value in update_data.items():
  428. if hasattr(calendar_info, key):
  429. if key == 'yangli' and isinstance(value, str):
  430. try:
  431. value = date.fromisoformat(value)
  432. except ValueError:
  433. continue
  434. setattr(calendar_info, key, value)
  435. session.commit()
  436. session.refresh(calendar_info)
  437. else:
  438. # 使用Flask-SQLAlchemy的db.session
  439. calendar_info = CalendarInfo.query.filter(
  440. CalendarInfo.id == calendar_id
  441. ).first()
  442. if not calendar_info:
  443. return None
  444. # 更新字段
  445. for key, value in update_data.items():
  446. if hasattr(calendar_info, key):
  447. if key == 'yangli' and isinstance(value, str):
  448. try:
  449. value = date.fromisoformat(value)
  450. except ValueError:
  451. continue
  452. setattr(calendar_info, key, value)
  453. db.session.commit()
  454. db.session.refresh(calendar_info)
  455. return calendar_info
  456. def delete_calendar_info(self, calendar_id: int) -> bool:
  457. """
  458. 删除黄历信息
  459. Args:
  460. calendar_id (int): 黄历信息ID
  461. Returns:
  462. bool: 删除成功返回True,否则返回False
  463. """
  464. if self.engine:
  465. with Session(self.engine) as session:
  466. calendar_info = session.query(CalendarInfo).filter(
  467. CalendarInfo.id == calendar_id
  468. ).first()
  469. if not calendar_info:
  470. return False
  471. session.delete(calendar_info)
  472. session.commit()
  473. else:
  474. # 使用Flask-SQLAlchemy的db.session
  475. calendar_info = CalendarInfo.query.filter(
  476. CalendarInfo.id == calendar_id
  477. ).first()
  478. if not calendar_info:
  479. return False
  480. db.session.delete(calendar_info)
  481. db.session.commit()
  482. return True
  483. def get_calendar_list(self, limit: int = 100, offset: int = 0) -> list[CalendarInfo]:
  484. """
  485. 获取黄历信息列表
  486. Args:
  487. limit (int): 限制返回数量,默认100
  488. offset (int): 偏移量,默认0
  489. Returns:
  490. list[CalendarInfo]: 黄历信息对象列表
  491. """
  492. if self.engine:
  493. with Session(self.engine) as session:
  494. return session.query(CalendarInfo).order_by(
  495. CalendarInfo.yangli.desc()
  496. ).offset(offset).limit(limit).all()
  497. else:
  498. # 使用Flask-SQLAlchemy的db.session
  499. return CalendarInfo.query.order_by(
  500. CalendarInfo.yangli.desc()
  501. ).offset(offset).limit(limit).all()
  502. def search_calendar_by_keyword(self, keyword: str, limit: int = 100) -> list[CalendarInfo]:
  503. """
  504. 根据关键词搜索黄历信息
  505. Args:
  506. keyword (str): 搜索关键词
  507. limit (int): 限制返回数量,默认100
  508. Returns:
  509. list[CalendarInfo]: 匹配的黄历信息对象列表
  510. """
  511. if self.engine:
  512. with Session(self.engine) as session:
  513. return session.query(CalendarInfo).filter(
  514. (CalendarInfo.yinli.contains(keyword)) |
  515. (CalendarInfo.wuxing.contains(keyword)) |
  516. (CalendarInfo.chongsha.contains(keyword)) |
  517. (CalendarInfo.baiji.contains(keyword)) |
  518. (CalendarInfo.jishen.contains(keyword)) |
  519. (CalendarInfo.yi.contains(keyword)) |
  520. (CalendarInfo.xiongshen.contains(keyword)) |
  521. (CalendarInfo.ji.contains(keyword)) |
  522. (CalendarInfo.color.contains(keyword))
  523. ).limit(limit).all()
  524. else:
  525. # 使用Flask-SQLAlchemy的db.session
  526. return CalendarInfo.query.filter(
  527. (CalendarInfo.yinli.contains(keyword)) |
  528. (CalendarInfo.wuxing.contains(keyword)) |
  529. (CalendarInfo.chongsha.contains(keyword)) |
  530. (CalendarInfo.baiji.contains(keyword)) |
  531. (CalendarInfo.jishen.contains(keyword)) |
  532. (CalendarInfo.yi.contains(keyword)) |
  533. (CalendarInfo.xiongshen.contains(keyword)) |
  534. (CalendarInfo.ji.contains(keyword)) |
  535. (CalendarInfo.color.contains(keyword))
  536. ).limit(limit).all()
  537. def fetch_calendar_from_api(self, yangli_date: date) -> Optional[dict]:
  538. """
  539. 从外部API获取黄历信息
  540. Args:
  541. yangli_date (date): 阳历日期
  542. Returns:
  543. Optional[dict]: API返回的黄历信息,如果失败则返回None
  544. """
  545. try:
  546. # 从配置文件获取API配置
  547. api_url = CALENDAR_API_CONFIG['url']
  548. api_key = CALENDAR_API_CONFIG['key']
  549. timeout = CALENDAR_API_CONFIG['timeout']
  550. # 格式化日期为YYYYMMDD格式
  551. date_str = yangli_date.strftime('%Y%m%d')
  552. # 请求参数
  553. request_params = {
  554. 'key': api_key,
  555. 'date': date_str,
  556. }
  557. # 发起API请求
  558. response = requests.get(api_url, params=request_params, timeout=timeout)
  559. if response.status_code == 200:
  560. response_result = response.json()
  561. # 检查API返回结果
  562. if response_result.get('error_code') == 0 and response_result.get('reason') == 'successed':
  563. return response_result.get('result')
  564. else:
  565. print(f"API返回错误: {response_result}")
  566. return None
  567. else:
  568. print(f"API请求失败,状态码: {response.status_code}")
  569. return None
  570. except requests.exceptions.RequestException as e:
  571. print(f"API请求异常: {e}")
  572. return None
  573. except Exception as e:
  574. print(f"获取API数据时发生错误: {e}")
  575. return None
  576. def save_calendar_from_api(self, api_data: dict) -> Optional[CalendarInfo]:
  577. """
  578. 将API返回的黄历信息保存到数据库
  579. Args:
  580. api_data (dict): API返回的黄历信息数据
  581. Returns:
  582. Optional[CalendarInfo]: 保存后的黄历信息对象,如果失败则返回None
  583. """
  584. try:
  585. # 解析API数据
  586. yangli_str = api_data.get('yangli')
  587. if not yangli_str:
  588. print("API数据中缺少阳历日期")
  589. return None
  590. # 解析日期
  591. try:
  592. yangli_date = date.fromisoformat(yangli_str)
  593. except ValueError:
  594. print(f"无效的日期格式: {yangli_str}")
  595. return None
  596. # 从wuxing字段中判断五行元素并设置对应的颜色值
  597. wuxing = api_data.get('wuxing', '') or ''
  598. color = api_data.get('color') # 先获取API中的color值
  599. # 如果API中没有color值,则根据wuxing字段判断五行元素设置颜色
  600. if not color:
  601. if '金' in wuxing:
  602. color = '白'
  603. elif '水' in wuxing:
  604. color = '黑'
  605. elif '木' in wuxing:
  606. color = '绿'
  607. elif '火' in wuxing:
  608. color = '红'
  609. elif '土' in wuxing:
  610. color = '黄'
  611. # 创建CalendarInfo对象
  612. calendar_info = CalendarInfo(
  613. yangli=yangli_date, # type: ignore
  614. yinli=api_data.get('yinli', ''), # type: ignore
  615. wuxing=wuxing, # type: ignore
  616. chongsha=api_data.get('chongsha'), # type: ignore
  617. baiji=api_data.get('baiji'), # type: ignore
  618. jishen=api_data.get('jishen'), # type: ignore
  619. yi=api_data.get('yi'), # type: ignore
  620. xiongshen=api_data.get('xionshen'), # type: ignore # 注意API返回的是xionshen
  621. ji=api_data.get('ji'), # type: ignore
  622. color=color # type: ignore
  623. )
  624. # 保存到数据库
  625. if self.engine:
  626. with Session(self.engine) as session:
  627. session.add(calendar_info)
  628. session.commit()
  629. session.refresh(calendar_info)
  630. else:
  631. # 使用Flask-SQLAlchemy的db.session
  632. db.session.add(calendar_info)
  633. db.session.commit()
  634. db.session.refresh(calendar_info)
  635. print(f"成功保存黄历信息到数据库,ID: {calendar_info.id}")
  636. return calendar_info
  637. except Exception as e:
  638. print(f"保存API数据到数据库时发生错误: {e}")
  639. return None
  640. class WechatUserService:
  641. """
  642. 微信用户信息服务类
  643. 提供微信用户的注册、登录、状态管理等操作
  644. """
  645. def __init__(self, engine=None):
  646. """
  647. 初始化服务
  648. Args:
  649. engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session
  650. """
  651. self.engine = engine
  652. def create_user(self, user_data: dict) -> WechatUser:
  653. """
  654. 创建新的微信用户记录
  655. Args:
  656. user_data (dict): 用户信息数据
  657. Returns:
  658. WechatUser: 创建的用户对象
  659. """
  660. user = WechatUser.from_dict(user_data)
  661. if self.engine:
  662. with Session(self.engine) as session:
  663. session.add(user)
  664. session.commit()
  665. session.refresh(user)
  666. else:
  667. # 使用Flask-SQLAlchemy的db.session
  668. db.session.add(user)
  669. db.session.commit()
  670. db.session.refresh(user)
  671. return user
  672. def get_user_by_openid(self, openid: str) -> Optional[WechatUser]:
  673. """
  674. 根据微信openid查询用户
  675. Args:
  676. openid (str): 微信用户openid
  677. Returns:
  678. Optional[WechatUser]: 用户对象,如果不存在则返回None
  679. """
  680. if self.engine:
  681. with Session(self.engine) as session:
  682. return session.query(WechatUser).filter(
  683. WechatUser.openid == openid
  684. ).first()
  685. else:
  686. # 使用Flask-SQLAlchemy的db.session
  687. return WechatUser.query.filter(
  688. WechatUser.openid == openid
  689. ).first()
  690. def get_user_by_id(self, user_id: int) -> Optional[WechatUser]:
  691. """
  692. 根据ID查询用户
  693. Args:
  694. user_id (int): 用户ID
  695. Returns:
  696. Optional[WechatUser]: 用户对象,如果不存在则返回None
  697. """
  698. if self.engine:
  699. with Session(self.engine) as session:
  700. return session.query(WechatUser).filter(
  701. WechatUser.id == user_id
  702. ).first()
  703. else:
  704. # 使用Flask-SQLAlchemy的db.session
  705. return WechatUser.query.filter(
  706. WechatUser.id == user_id
  707. ).first()
  708. def get_user_by_phone(self, phone_number: str) -> Optional[WechatUser]:
  709. """
  710. 根据手机号查询用户
  711. Args:
  712. phone_number (str): 手机号码
  713. Returns:
  714. Optional[WechatUser]: 用户对象,如果不存在则返回None
  715. """
  716. if self.engine:
  717. with Session(self.engine) as session:
  718. return session.query(WechatUser).filter(
  719. WechatUser.phone_number == phone_number
  720. ).first()
  721. else:
  722. # 使用Flask-SQLAlchemy的db.session
  723. return WechatUser.query.filter(
  724. WechatUser.phone_number == phone_number
  725. ).first()
  726. def register_user_by_code(self, wechat_code: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None, platform: str = 'miniprogram') -> tuple[bool, Optional[WechatUser], Optional[str]]:
  727. """
  728. 通过微信授权码注册新用户或返回已存在用户
  729. 如果用户已存在,则返回现有用户信息;如果用户不存在,则创建新用户。
  730. Args:
  731. wechat_code (str): 微信授权码(15分钟有效期)
  732. phone_number (str, optional): 手机号码
  733. id_card_number (str, optional): 身份证号码
  734. platform (str): 微信平台类型,默认为小程序
  735. Returns:
  736. tuple[bool, Optional[WechatUser], Optional[str]]:
  737. (是否成功, 用户对象, 状态信息)
  738. - 新用户: (True, user, None)
  739. - 已存在用户: (True, user, "用户已存在")
  740. - 失败: (False, None, 错误信息)
  741. """
  742. try:
  743. # 使用微信code换取openid
  744. success, openid, error_msg = get_openid_from_code(wechat_code, platform)
  745. if not success or not openid:
  746. return False, None, f"获取openid失败: {error_msg}"
  747. # 验证openid格式
  748. if not validate_openid(openid):
  749. return False, None, "无效的openid格式"
  750. # 检查用户是否已存在
  751. existing_user = self.get_user_by_openid(openid)
  752. if existing_user:
  753. # 用户已存在,返回现有用户信息
  754. return True, existing_user, "用户已存在"
  755. # 创建用户数据
  756. user_data = {
  757. 'openid': openid,
  758. 'phone_number': phone_number,
  759. 'id_card_number': id_card_number,
  760. 'login_status': False,
  761. 'user_status': 'active'
  762. }
  763. user = self.create_user(user_data)
  764. return True, user, None
  765. except Exception as e:
  766. return False, None, f"注册用户时发生错误: {str(e)}"
  767. def register_user_by_openid(self, openid: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None) -> WechatUser:
  768. """
  769. 直接通过openid注册新用户(用于已知openid的情况)
  770. Args:
  771. openid (str): 微信用户openid
  772. phone_number (str, optional): 手机号码
  773. id_card_number (str, optional): 身份证号码
  774. Returns:
  775. WechatUser: 注册的用户对象
  776. Raises:
  777. ValueError: 如果用户已存在或openid无效
  778. """
  779. # 验证openid格式
  780. if not validate_openid(openid):
  781. raise ValueError(f"无效的openid格式: {openid}")
  782. # 检查用户是否已存在
  783. existing_user = self.get_user_by_openid(openid)
  784. if existing_user:
  785. raise ValueError(f"用户已存在,openid: {openid}")
  786. # 创建用户数据
  787. user_data = {
  788. 'openid': openid,
  789. 'phone_number': phone_number,
  790. 'id_card_number': id_card_number,
  791. 'login_status': False,
  792. 'user_status': 'active'
  793. }
  794. return self.create_user(user_data)
  795. def login_user_by_code(self, wechat_code: str, platform: str = 'miniprogram') -> tuple[bool, Optional[WechatUser], Optional[str]]:
  796. """
  797. 通过微信授权码进行用户登录
  798. Args:
  799. wechat_code (str): 微信授权码(15分钟有效期)
  800. platform (str): 微信平台类型,默认为小程序
  801. Returns:
  802. tuple[bool, Optional[WechatUser], Optional[str]]:
  803. (是否成功, 用户对象, 错误信息)
  804. """
  805. try:
  806. # 使用微信code换取openid
  807. success, openid, error_msg = get_openid_from_code(wechat_code, platform)
  808. if not success or not openid:
  809. return False, None, f"获取openid失败: {error_msg}"
  810. # 验证openid格式
  811. if not validate_openid(openid):
  812. return False, None, "无效的openid格式"
  813. # 查找用户
  814. user = self.get_user_by_openid(openid)
  815. if not user:
  816. return False, None, "用户不存在,请先注册"
  817. # 检查用户状态
  818. if user.user_status != 'active':
  819. return False, None, f"用户账户状态异常: {user.user_status}"
  820. # 更新登录状态和登录时间
  821. update_data = {
  822. 'login_status': True,
  823. 'login_time': datetime.utcnow()
  824. }
  825. updated_user = self.update_user(user.id, update_data)
  826. if updated_user:
  827. return True, updated_user, None
  828. else:
  829. return False, None, "更新登录状态失败"
  830. except Exception as e:
  831. return False, None, f"登录时发生错误: {str(e)}"
  832. def login_user_by_openid(self, openid: str) -> Optional[WechatUser]:
  833. """
  834. 直接通过openid进行用户登录(用于已知openid的情况)
  835. Args:
  836. openid (str): 微信用户openid
  837. Returns:
  838. Optional[WechatUser]: 登录成功返回用户对象,否则返回None
  839. """
  840. # 验证openid格式
  841. if not validate_openid(openid):
  842. return None
  843. user = self.get_user_by_openid(openid)
  844. if not user:
  845. return None
  846. # 检查用户状态
  847. if user.user_status != 'active':
  848. return None
  849. # 更新登录状态和登录时间
  850. update_data = {
  851. 'login_status': True,
  852. 'login_time': datetime.utcnow()
  853. }
  854. return self.update_user(user.id, update_data)
  855. def logout_user_by_openid(self, openid: str) -> bool:
  856. """
  857. 通过openid进行用户登出
  858. Args:
  859. openid (str): 微信用户openid
  860. Returns:
  861. bool: 登出成功返回True,否则返回False
  862. """
  863. # 验证openid格式
  864. if not validate_openid(openid):
  865. return False
  866. user = self.get_user_by_openid(openid)
  867. if not user:
  868. return False
  869. # 更新登录状态
  870. update_data = {
  871. 'login_status': False
  872. }
  873. updated_user = self.update_user(user.id, update_data)
  874. return updated_user is not None
  875. def update_user(self, user_id: int, update_data: dict) -> Optional[WechatUser]:
  876. """
  877. 更新用户信息
  878. Args:
  879. user_id (int): 用户ID
  880. update_data (dict): 要更新的数据
  881. Returns:
  882. Optional[WechatUser]: 更新后的用户对象,如果不存在则返回None
  883. """
  884. if self.engine:
  885. with Session(self.engine) as session:
  886. user = session.query(WechatUser).filter(
  887. WechatUser.id == user_id
  888. ).first()
  889. if not user:
  890. return None
  891. # 更新字段
  892. for key, value in update_data.items():
  893. if hasattr(user, key):
  894. if key in ['login_time', 'created_at', 'updated_at'] and isinstance(value, str):
  895. try:
  896. value = datetime.fromisoformat(value.replace('Z', '+00:00'))
  897. except ValueError:
  898. continue
  899. setattr(user, key, value)
  900. session.commit()
  901. session.refresh(user)
  902. else:
  903. # 使用Flask-SQLAlchemy的db.session
  904. user = WechatUser.query.filter(
  905. WechatUser.id == user_id
  906. ).first()
  907. if not user:
  908. return None
  909. # 更新字段
  910. for key, value in update_data.items():
  911. if hasattr(user, key):
  912. if key in ['login_time', 'created_at', 'updated_at'] and isinstance(value, str):
  913. try:
  914. value = datetime.fromisoformat(value.replace('Z', '+00:00'))
  915. except ValueError:
  916. continue
  917. setattr(user, key, value)
  918. db.session.commit()
  919. db.session.refresh(user)
  920. return user
  921. def deactivate_user(self, user_id: int) -> bool:
  922. """
  923. 停用用户账户
  924. Args:
  925. user_id (int): 用户ID
  926. Returns:
  927. bool: 停用成功返回True,否则返回False
  928. """
  929. update_data = {
  930. 'user_status': 'inactive',
  931. 'login_status': False
  932. }
  933. updated_user = self.update_user(user_id, update_data)
  934. return updated_user is not None
  935. def activate_user(self, user_id: int) -> bool:
  936. """
  937. 激活用户账户
  938. Args:
  939. user_id (int): 用户ID
  940. Returns:
  941. bool: 激活成功返回True,否则返回False
  942. """
  943. update_data = {
  944. 'user_status': 'active'
  945. }
  946. updated_user = self.update_user(user_id, update_data)
  947. return updated_user is not None
  948. def delete_user(self, user_id: int) -> bool:
  949. """
  950. 删除用户(软删除,将状态设为deleted)
  951. Args:
  952. user_id (int): 用户ID
  953. Returns:
  954. bool: 删除成功返回True,否则返回False
  955. """
  956. update_data = {
  957. 'user_status': 'deleted',
  958. 'login_status': False
  959. }
  960. updated_user = self.update_user(user_id, update_data)
  961. return updated_user is not None
  962. def get_active_users(self, limit: int = 100, offset: int = 0) -> list[WechatUser]:
  963. """
  964. 获取活跃用户列表
  965. Args:
  966. limit (int): 限制返回数量,默认100
  967. offset (int): 偏移量,默认0
  968. Returns:
  969. list[WechatUser]: 活跃用户对象列表
  970. """
  971. if self.engine:
  972. with Session(self.engine) as session:
  973. return session.query(WechatUser).filter(
  974. WechatUser.user_status == 'active'
  975. ).order_by(
  976. WechatUser.created_at.desc()
  977. ).offset(offset).limit(limit).all()
  978. else:
  979. # 使用Flask-SQLAlchemy的db.session
  980. return WechatUser.query.filter(
  981. WechatUser.user_status == 'active'
  982. ).order_by(
  983. WechatUser.created_at.desc()
  984. ).offset(offset).limit(limit).all()
  985. def get_logged_in_users(self, limit: int = 100) -> list[WechatUser]:
  986. """
  987. 获取当前已登录的用户列表
  988. Args:
  989. limit (int): 限制返回数量,默认100
  990. Returns:
  991. list[WechatUser]: 已登录用户对象列表
  992. """
  993. if self.engine:
  994. with Session(self.engine) as session:
  995. return session.query(WechatUser).filter(
  996. WechatUser.login_status == True,
  997. WechatUser.user_status == 'active'
  998. ).order_by(
  999. WechatUser.login_time.desc()
  1000. ).limit(limit).all()
  1001. else:
  1002. # 使用Flask-SQLAlchemy的db.session
  1003. return WechatUser.query.filter(
  1004. WechatUser.login_status == True,
  1005. WechatUser.user_status == 'active'
  1006. ).order_by(
  1007. WechatUser.login_time.desc()
  1008. ).limit(limit).all()
  1009. # 便捷函数
  1010. def create_calendar_info(calendar_data: dict, engine=None) -> CalendarInfo:
  1011. """
  1012. 创建黄历信息的便捷函数
  1013. Args:
  1014. calendar_data (dict): 黄历信息数据
  1015. engine: SQLAlchemy引擎对象
  1016. Returns:
  1017. CalendarInfo: 创建的黄历信息对象
  1018. """
  1019. service = CalendarService(engine)
  1020. return service.create_calendar_info(calendar_data)
  1021. def get_calendar_by_date(yangli_date: str, engine=None) -> dict:
  1022. """
  1023. 根据阳历日期查询黄历信息的便捷函数
  1024. Args:
  1025. yangli_date (str): 阳历日期,格式为YYYY-MM-DD
  1026. engine: SQLAlchemy引擎对象
  1027. Returns:
  1028. dict: 包含查询结果的JSON格式数据
  1029. """
  1030. try:
  1031. # 验证日期格式
  1032. if not isinstance(yangli_date, str) or len(yangli_date) != 10:
  1033. return {
  1034. "reason": "failed",
  1035. "return_code": 400,
  1036. "result": None,
  1037. "error": "日期格式错误,请使用YYYY-MM-DD格式"
  1038. }
  1039. # 解析日期字符串
  1040. try:
  1041. parsed_date = date.fromisoformat(yangli_date)
  1042. except ValueError:
  1043. return {
  1044. "reason": "failed",
  1045. "return_code": 400,
  1046. "result": None,
  1047. "error": "无效的日期格式"
  1048. }
  1049. # 查询数据库
  1050. service = CalendarService(engine)
  1051. calendar_info = service.get_calendar_by_date(parsed_date)
  1052. if calendar_info:
  1053. # 查询成功,返回指定格式的JSON数据
  1054. return {
  1055. "reason": "successed",
  1056. "return_code": 200,
  1057. "result": {
  1058. "id": str(calendar_info.id),
  1059. "yangli": calendar_info.yangli.isoformat() if calendar_info.yangli is not None else None,
  1060. "yinli": calendar_info.yinli,
  1061. "wuxing": calendar_info.wuxing,
  1062. "chongsha": calendar_info.chongsha,
  1063. "baiji": calendar_info.baiji,
  1064. "jishen": calendar_info.jishen,
  1065. "yi": calendar_info.yi,
  1066. "xiongshen": calendar_info.xiongshen,
  1067. "ji": calendar_info.ji,
  1068. "color": calendar_info.color
  1069. }
  1070. }
  1071. else:
  1072. # 数据库中没有找到记录,尝试从API获取
  1073. print(f"数据库中没有找到日期 {yangli_date} 的黄历信息,尝试从API获取...")
  1074. # 从API获取数据
  1075. api_data = service.fetch_calendar_from_api(parsed_date)
  1076. if api_data:
  1077. # API获取成功,保存到数据库
  1078. print("API获取数据成功,正在保存到数据库...")
  1079. saved_calendar = service.save_calendar_from_api(api_data)
  1080. if saved_calendar:
  1081. # 保存成功,返回数据
  1082. return {
  1083. "reason": "successed",
  1084. "return_code": 200,
  1085. "result": {
  1086. "id": str(saved_calendar.id),
  1087. "yangli": saved_calendar.yangli.isoformat() if saved_calendar.yangli is not None else None,
  1088. "yinli": saved_calendar.yinli,
  1089. "wuxing": saved_calendar.wuxing,
  1090. "chongsha": saved_calendar.chongsha,
  1091. "baiji": saved_calendar.baiji,
  1092. "jishen": saved_calendar.jishen,
  1093. "yi": saved_calendar.yi,
  1094. "xiongshen": saved_calendar.xiongshen,
  1095. "ji": saved_calendar.ji,
  1096. "color": saved_calendar.color
  1097. }
  1098. }
  1099. else:
  1100. # 保存到数据库失败
  1101. return {
  1102. "reason": "failed",
  1103. "return_code": 500,
  1104. "result": None,
  1105. "error": f"API获取数据成功但保存到数据库失败"
  1106. }
  1107. else:
  1108. # API获取失败
  1109. return {
  1110. "reason": "failed",
  1111. "return_code": 404,
  1112. "result": None,
  1113. "error": f"未找到日期 {yangli_date} 的黄历信息,且API获取失败"
  1114. }
  1115. except Exception as e:
  1116. # 发生异常
  1117. return {
  1118. "reason": "failed",
  1119. "return_code": 500,
  1120. "result": None,
  1121. "error": f"查询过程中发生错误: {str(e)}"
  1122. }
  1123. def get_calendar_by_id(calendar_id: int, engine=None) -> Optional[CalendarInfo]:
  1124. """
  1125. 根据ID查询黄历信息的便捷函数
  1126. Args:
  1127. calendar_id (int): 黄历信息ID
  1128. engine: SQLAlchemy引擎对象
  1129. Returns:
  1130. Optional[CalendarInfo]: 黄历信息对象
  1131. """
  1132. service = CalendarService(engine)
  1133. return service.get_calendar_by_id(calendar_id)
  1134. class CalendarRecordService:
  1135. """
  1136. 日历内容记录服务类
  1137. 提供日历内容记录的增删改查操作
  1138. """
  1139. def __init__(self, engine=None):
  1140. """
  1141. 初始化服务
  1142. Args:
  1143. engine: SQLAlchemy引擎对象,如果为None则使用Flask-SQLAlchemy的db.session
  1144. """
  1145. self.engine = engine
  1146. def save_calendar_record(self, data: dict) -> tuple[bool, Optional[CalendarRecord], Optional[str]]:
  1147. """
  1148. 保存日历记录(插入或更新)
  1149. Args:
  1150. data (dict): 包含openid, month_key, calendar_content的字典
  1151. Returns:
  1152. tuple[bool, Optional[CalendarRecord], Optional[str]]:
  1153. (是否成功, 记录对象, 错误信息)
  1154. """
  1155. try:
  1156. # 验证必需参数
  1157. openid = data.get('openid')
  1158. month_key = data.get('month_key')
  1159. calendar_content = data.get('calendar_content')
  1160. if not openid:
  1161. return False, None, "缺少必需参数: openid"
  1162. if not month_key:
  1163. return False, None, "缺少必需参数: month_key"
  1164. if calendar_content is None:
  1165. return False, None, "缺少必需参数: calendar_content"
  1166. # 验证openid格式
  1167. if not validate_openid(openid):
  1168. return False, None, f"无效的openid格式: {openid}"
  1169. # 验证月份格式
  1170. if not CalendarRecord.validate_month_key(month_key):
  1171. return False, None, f"无效的月份格式: {month_key}"
  1172. # 验证日历内容格式
  1173. if not CalendarRecord.validate_calendar_content(calendar_content):
  1174. return False, None, "无效的日历内容格式"
  1175. if self.engine:
  1176. with Session(self.engine) as session:
  1177. # 查找是否已存在记录
  1178. existing_record = session.query(CalendarRecord).filter(
  1179. CalendarRecord.openid == openid,
  1180. CalendarRecord.month_key == month_key
  1181. ).first()
  1182. if existing_record:
  1183. # 更新现有记录
  1184. existing_record.calendar_content = calendar_content
  1185. existing_record.updated_at = datetime.utcnow()
  1186. session.commit()
  1187. session.refresh(existing_record)
  1188. return True, existing_record, None
  1189. else:
  1190. # 创建新记录
  1191. new_record = CalendarRecord(
  1192. openid=openid,
  1193. month_key=month_key,
  1194. calendar_content=calendar_content
  1195. )
  1196. session.add(new_record)
  1197. session.commit()
  1198. session.refresh(new_record)
  1199. return True, new_record, None
  1200. else:
  1201. # 使用Flask-SQLAlchemy的db.session
  1202. existing_record = CalendarRecord.query.filter(
  1203. CalendarRecord.openid == openid,
  1204. CalendarRecord.month_key == month_key
  1205. ).first()
  1206. if existing_record:
  1207. # 更新现有记录
  1208. existing_record.calendar_content = calendar_content
  1209. existing_record.updated_at = datetime.utcnow()
  1210. db.session.commit()
  1211. db.session.refresh(existing_record)
  1212. return True, existing_record, None
  1213. else:
  1214. # 创建新记录
  1215. new_record = CalendarRecord(
  1216. openid=openid,
  1217. month_key=month_key,
  1218. calendar_content=calendar_content
  1219. )
  1220. db.session.add(new_record)
  1221. db.session.commit()
  1222. db.session.refresh(new_record)
  1223. return True, new_record, None
  1224. except Exception as e:
  1225. return False, None, f"保存日历记录时发生错误: {str(e)}"
  1226. def get_calendar_record(self, openid: str, month_key: str) -> tuple[bool, Optional[CalendarRecord], Optional[str]]:
  1227. """
  1228. 查找日历记录
  1229. Args:
  1230. openid (str): 微信用户openid
  1231. month_key (str): 月份标识(YYYY-MM格式)
  1232. Returns:
  1233. tuple[bool, Optional[CalendarRecord], Optional[str]]:
  1234. (是否成功, 记录对象或None, 错误信息)
  1235. """
  1236. try:
  1237. # 验证参数
  1238. if not openid:
  1239. return False, None, "缺少必需参数: openid"
  1240. if not month_key:
  1241. return False, None, "缺少必需参数: month_key"
  1242. # 验证openid格式
  1243. if not validate_openid(openid):
  1244. return False, None, f"无效的openid格式: {openid}"
  1245. # 验证月份格式
  1246. if not CalendarRecord.validate_month_key(month_key):
  1247. return False, None, f"无效的月份格式: {month_key}"
  1248. if self.engine:
  1249. with Session(self.engine) as session:
  1250. record = session.query(CalendarRecord).filter(
  1251. CalendarRecord.openid == openid,
  1252. CalendarRecord.month_key == month_key
  1253. ).first()
  1254. return True, record, None
  1255. else:
  1256. # 使用Flask-SQLAlchemy的db.session
  1257. record = CalendarRecord.query.filter(
  1258. CalendarRecord.openid == openid,
  1259. CalendarRecord.month_key == month_key
  1260. ).first()
  1261. return True, record, None
  1262. except Exception as e:
  1263. return False, None, f"查找日历记录时发生错误: {str(e)}"
  1264. def get_user_records(self, openid: str, limit: int = 12) -> tuple[bool, list[CalendarRecord], Optional[str]]:
  1265. """
  1266. 获取用户的所有日历记录
  1267. Args:
  1268. openid (str): 微信用户openid
  1269. limit (int): 限制返回数量,默认12(一年的月份数)
  1270. Returns:
  1271. tuple[bool, list[CalendarRecord], Optional[str]]:
  1272. (是否成功, 记录列表, 错误信息)
  1273. """
  1274. try:
  1275. # 验证参数
  1276. if not openid:
  1277. return False, [], "缺少必需参数: openid"
  1278. # 验证openid格式
  1279. if not validate_openid(openid):
  1280. return False, [], f"无效的openid格式: {openid}"
  1281. if self.engine:
  1282. with Session(self.engine) as session:
  1283. records = session.query(CalendarRecord).filter(
  1284. CalendarRecord.openid == openid
  1285. ).order_by(
  1286. CalendarRecord.month_key.desc()
  1287. ).limit(limit).all()
  1288. return True, records, None
  1289. else:
  1290. # 使用Flask-SQLAlchemy的db.session
  1291. records = CalendarRecord.query.filter(
  1292. CalendarRecord.openid == openid
  1293. ).order_by(
  1294. CalendarRecord.month_key.desc()
  1295. ).limit(limit).all()
  1296. return True, records, None
  1297. except Exception as e:
  1298. return False, [], f"获取用户日历记录时发生错误: {str(e)}"
  1299. def delete_calendar_record(self, openid: str, month_key: str) -> tuple[bool, Optional[str]]:
  1300. """
  1301. 删除日历记录
  1302. Args:
  1303. openid (str): 微信用户openid
  1304. month_key (str): 月份标识(YYYY-MM格式)
  1305. Returns:
  1306. tuple[bool, Optional[str]]: (是否成功, 错误信息)
  1307. """
  1308. try:
  1309. # 验证参数
  1310. if not openid:
  1311. return False, "缺少必需参数: openid"
  1312. if not month_key:
  1313. return False, "缺少必需参数: month_key"
  1314. # 验证openid格式
  1315. if not validate_openid(openid):
  1316. return False, f"无效的openid格式: {openid}"
  1317. # 验证月份格式
  1318. if not CalendarRecord.validate_month_key(month_key):
  1319. return False, f"无效的月份格式: {month_key}"
  1320. if self.engine:
  1321. with Session(self.engine) as session:
  1322. record = session.query(CalendarRecord).filter(
  1323. CalendarRecord.openid == openid,
  1324. CalendarRecord.month_key == month_key
  1325. ).first()
  1326. if record:
  1327. session.delete(record)
  1328. session.commit()
  1329. return True, None
  1330. else:
  1331. return False, "记录不存在"
  1332. else:
  1333. # 使用Flask-SQLAlchemy的db.session
  1334. record = CalendarRecord.query.filter(
  1335. CalendarRecord.openid == openid,
  1336. CalendarRecord.month_key == month_key
  1337. ).first()
  1338. if record:
  1339. db.session.delete(record)
  1340. db.session.commit()
  1341. return True, None
  1342. else:
  1343. return False, "记录不存在"
  1344. except Exception as e:
  1345. return False, f"删除日历记录时发生错误: {str(e)}"
  1346. # 微信用户相关便捷函数
  1347. def register_wechat_user(wechat_code: str, phone_number: Optional[str] = None, id_card_number: Optional[str] = None, platform: str = 'miniprogram', engine=None) -> dict:
  1348. """
  1349. 注册微信用户的便捷函数
  1350. 如果用户已存在,则返回现有用户信息;如果用户不存在,则创建新用户。
  1351. Args:
  1352. wechat_code (str): 微信授权码(15分钟有效期)
  1353. phone_number (str, optional): 手机号码
  1354. id_card_number (str, optional): 身份证号码
  1355. platform (str): 微信平台类型,默认为小程序
  1356. engine: SQLAlchemy引擎对象
  1357. Returns:
  1358. dict: 包含注册结果的JSON格式数据
  1359. - 新用户注册成功: return_code=201, is_new_user=True
  1360. - 用户已存在: return_code=200, is_new_user=False
  1361. - 注册失败: return_code=400/500
  1362. """
  1363. try:
  1364. # 验证必填参数
  1365. if not wechat_code or not isinstance(wechat_code, str):
  1366. return {
  1367. "reason": "failed",
  1368. "return_code": 400,
  1369. "result": None,
  1370. "error": "微信授权码不能为空"
  1371. }
  1372. # 创建服务实例
  1373. service = WechatUserService(engine)
  1374. # 尝试注册用户
  1375. success, user, error_msg = service.register_user_by_code(wechat_code, phone_number, id_card_number, platform)
  1376. if success and user:
  1377. # 检查是否为已存在用户
  1378. if error_msg == "用户已存在":
  1379. # 用户已存在,返回现有用户信息
  1380. return {
  1381. "reason": "successed",
  1382. "return_code": 200,
  1383. "result": {
  1384. "id": str(user.id),
  1385. "openid": user.openid,
  1386. "phone_number": user.phone_number,
  1387. "id_card_number": user.id_card_number,
  1388. "login_status": user.login_status,
  1389. "user_status": user.user_status,
  1390. "created_at": user.created_at.isoformat() if user.created_at is not None else None,
  1391. "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None,
  1392. "is_new_user": False
  1393. },
  1394. "message": "用户已存在,返回现有用户信息"
  1395. }
  1396. else:
  1397. # 新用户注册成功
  1398. return {
  1399. "reason": "successed",
  1400. "return_code": 201,
  1401. "result": {
  1402. "id": str(user.id),
  1403. "openid": user.openid,
  1404. "phone_number": user.phone_number,
  1405. "id_card_number": user.id_card_number,
  1406. "login_status": user.login_status,
  1407. "user_status": user.user_status,
  1408. "created_at": user.created_at.isoformat() if user.created_at is not None else None,
  1409. "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None,
  1410. "is_new_user": True
  1411. },
  1412. "message": "新用户注册成功"
  1413. }
  1414. else:
  1415. # 注册失败
  1416. return {
  1417. "reason": "failed",
  1418. "return_code": 400,
  1419. "result": None,
  1420. "error": error_msg or "注册失败"
  1421. }
  1422. except Exception as e:
  1423. # 其他系统错误
  1424. return {
  1425. "reason": "failed",
  1426. "return_code": 500,
  1427. "result": None,
  1428. "error": f"注册过程中发生错误: {str(e)}"
  1429. }
  1430. def login_wechat_user(wechat_code: str, platform: str = 'miniprogram', engine=None) -> dict:
  1431. """
  1432. 微信用户登录的便捷函数
  1433. Args:
  1434. wechat_code (str): 微信授权码(15分钟有效期)
  1435. platform (str): 微信平台类型,默认为小程序
  1436. engine: SQLAlchemy引擎对象
  1437. Returns:
  1438. dict: 包含登录结果的JSON格式数据
  1439. """
  1440. try:
  1441. # 验证必填参数
  1442. if not wechat_code or not isinstance(wechat_code, str):
  1443. return {
  1444. "reason": "failed",
  1445. "return_code": 400,
  1446. "result": None,
  1447. "error": "微信授权码不能为空"
  1448. }
  1449. # 创建服务实例
  1450. service = WechatUserService(engine)
  1451. # 尝试登录
  1452. success, user, error_msg = service.login_user_by_code(wechat_code, platform)
  1453. if success and user:
  1454. # 登录成功
  1455. return {
  1456. "reason": "successed",
  1457. "return_code": 200,
  1458. "result": {
  1459. "id": str(user.id),
  1460. "openid": user.openid,
  1461. "phone_number": user.phone_number,
  1462. "id_card_number": user.id_card_number,
  1463. "login_status": user.login_status,
  1464. "login_time": user.login_time.isoformat() if user.login_time is not None else None,
  1465. "user_status": user.user_status
  1466. }
  1467. }
  1468. else:
  1469. # 登录失败
  1470. error_code = 404 if "不存在" in (error_msg or "") else 401
  1471. return {
  1472. "reason": "failed",
  1473. "return_code": error_code,
  1474. "result": None,
  1475. "error": error_msg or "登录失败"
  1476. }
  1477. except Exception as e:
  1478. # 系统错误
  1479. return {
  1480. "reason": "failed",
  1481. "return_code": 500,
  1482. "result": None,
  1483. "error": f"登录过程中发生错误: {str(e)}"
  1484. }
  1485. def logout_wechat_user(openid: str, engine=None) -> dict:
  1486. """
  1487. 微信用户登出的便捷函数
  1488. Args:
  1489. openid (str): 微信用户openid
  1490. engine: SQLAlchemy引擎对象
  1491. Returns:
  1492. dict: 包含登出结果的JSON格式数据
  1493. """
  1494. try:
  1495. # 验证必填参数
  1496. if not openid or not isinstance(openid, str):
  1497. return {
  1498. "reason": "failed",
  1499. "return_code": 400,
  1500. "result": None,
  1501. "error": "微信用户openid不能为空"
  1502. }
  1503. # 创建服务实例
  1504. service = WechatUserService(engine)
  1505. # 尝试登出
  1506. success = service.logout_user_by_openid(openid)
  1507. if success:
  1508. # 登出成功
  1509. return {
  1510. "reason": "successed",
  1511. "return_code": 200,
  1512. "result": {
  1513. "message": "用户已成功登出"
  1514. }
  1515. }
  1516. else:
  1517. # 登出失败
  1518. return {
  1519. "reason": "failed",
  1520. "return_code": 404,
  1521. "result": None,
  1522. "error": "用户不存在或openid无效"
  1523. }
  1524. except Exception as e:
  1525. # 系统错误
  1526. return {
  1527. "reason": "failed",
  1528. "return_code": 500,
  1529. "result": None,
  1530. "error": f"登出过程中发生错误: {str(e)}"
  1531. }
  1532. def get_wechat_user_info(openid: str, engine=None) -> dict:
  1533. """
  1534. 获取微信用户信息的便捷函数
  1535. Args:
  1536. openid (str): 微信用户openid
  1537. engine: SQLAlchemy引擎对象
  1538. Returns:
  1539. dict: 包含用户信息的JSON格式数据
  1540. """
  1541. try:
  1542. # 验证必填参数
  1543. if not openid or not isinstance(openid, str):
  1544. return {
  1545. "reason": "failed",
  1546. "return_code": 400,
  1547. "result": None,
  1548. "error": "微信用户openid不能为空"
  1549. }
  1550. # 创建服务实例
  1551. service = WechatUserService(engine)
  1552. # 查询用户信息
  1553. user = service.get_user_by_openid(openid)
  1554. if user:
  1555. # 查询成功
  1556. return {
  1557. "reason": "successed",
  1558. "return_code": 200,
  1559. "result": {
  1560. "id": str(user.id),
  1561. "openid": user.openid,
  1562. "phone_number": user.phone_number,
  1563. "id_card_number": user.id_card_number,
  1564. "login_status": user.login_status,
  1565. "login_time": user.login_time.isoformat() if user.login_time is not None else None,
  1566. "user_status": user.user_status,
  1567. "created_at": user.created_at.isoformat() if user.created_at is not None else None,
  1568. "updated_at": user.updated_at.isoformat() if user.updated_at is not None else None
  1569. }
  1570. }
  1571. else:
  1572. # 用户不存在
  1573. return {
  1574. "reason": "failed",
  1575. "return_code": 404,
  1576. "result": None,
  1577. "error": f"未找到openid为 {openid} 的用户"
  1578. }
  1579. except Exception as e:
  1580. # 系统错误
  1581. return {
  1582. "reason": "failed",
  1583. "return_code": 500,
  1584. "result": None,
  1585. "error": f"查询过程中发生错误: {str(e)}"
  1586. }
  1587. def update_wechat_user_info(openid: str, update_data: dict, engine=None) -> dict:
  1588. """
  1589. 更新微信用户信息的便捷函数
  1590. Args:
  1591. openid (str): 微信用户openid
  1592. update_data (dict): 要更新的数据
  1593. engine: SQLAlchemy引擎对象
  1594. Returns:
  1595. dict: 包含更新结果的JSON格式数据
  1596. """
  1597. try:
  1598. # 验证必填参数
  1599. if not openid or not isinstance(openid, str):
  1600. return {
  1601. "reason": "failed",
  1602. "return_code": 400,
  1603. "result": None,
  1604. "error": "微信用户openid不能为空"
  1605. }
  1606. if not update_data or not isinstance(update_data, dict):
  1607. return {
  1608. "reason": "failed",
  1609. "return_code": 400,
  1610. "result": None,
  1611. "error": "更新数据不能为空"
  1612. }
  1613. # 创建服务实例
  1614. service = WechatUserService(engine)
  1615. # 先查找用户
  1616. user = service.get_user_by_openid(openid)
  1617. if not user:
  1618. return {
  1619. "reason": "failed",
  1620. "return_code": 404,
  1621. "result": None,
  1622. "error": f"未找到openid为 {openid} 的用户"
  1623. }
  1624. # 更新用户信息
  1625. updated_user = service.update_user(user.id, update_data)
  1626. if updated_user:
  1627. # 更新成功
  1628. return {
  1629. "reason": "successed",
  1630. "return_code": 200,
  1631. "result": {
  1632. "id": str(updated_user.id),
  1633. "openid": updated_user.openid,
  1634. "phone_number": updated_user.phone_number,
  1635. "id_card_number": updated_user.id_card_number,
  1636. "login_status": updated_user.login_status,
  1637. "login_time": updated_user.login_time.isoformat() if updated_user.login_time is not None else None,
  1638. "user_status": updated_user.user_status,
  1639. "created_at": updated_user.created_at.isoformat() if updated_user.created_at is not None else None,
  1640. "updated_at": updated_user.updated_at.isoformat() if updated_user.updated_at is not None else None
  1641. }
  1642. }
  1643. else:
  1644. # 更新失败
  1645. return {
  1646. "reason": "failed",
  1647. "return_code": 500,
  1648. "result": None,
  1649. "error": "更新用户信息失败"
  1650. }
  1651. except Exception as e:
  1652. # 系统错误
  1653. return {
  1654. "reason": "failed",
  1655. "return_code": 500,
  1656. "result": None,
  1657. "error": f"更新过程中发生错误: {str(e)}"
  1658. }
  1659. # 日历内容记录相关便捷函数
  1660. def save_calendar_record(data: dict, engine=None) -> dict:
  1661. """
  1662. 保存日历记录的便捷函数
  1663. Args:
  1664. data (dict): 包含openid, month_key, calendar_content的字典
  1665. engine: SQLAlchemy引擎对象
  1666. Returns:
  1667. dict: 包含保存结果的JSON格式数据
  1668. """
  1669. try:
  1670. # 验证必填参数
  1671. if not data or not isinstance(data, dict):
  1672. return {
  1673. "reason": "failed",
  1674. "return_code": 400,
  1675. "result": None,
  1676. "error": "请求数据不能为空"
  1677. }
  1678. # 创建服务实例
  1679. service = CalendarRecordService(engine)
  1680. # 尝试保存记录
  1681. success, record, error_msg = service.save_calendar_record(data)
  1682. if success and record:
  1683. # 保存成功
  1684. return {
  1685. "reason": "successed",
  1686. "return_code": 200,
  1687. "result": {
  1688. "id": record.id,
  1689. "openid": record.openid,
  1690. "month_key": record.month_key,
  1691. "calendar_content": record.calendar_content,
  1692. "created_at": record.created_at.isoformat() if record.created_at else None,
  1693. "updated_at": record.updated_at.isoformat() if record.updated_at else None
  1694. }
  1695. }
  1696. else:
  1697. # 保存失败
  1698. error_code = 400
  1699. return {
  1700. "reason": "failed",
  1701. "return_code": error_code,
  1702. "result": None,
  1703. "error": error_msg or "保存失败"
  1704. }
  1705. except Exception as e:
  1706. # 其他系统错误
  1707. return {
  1708. "reason": "failed",
  1709. "return_code": 500,
  1710. "result": None,
  1711. "error": f"保存过程中发生错误: {str(e)}"
  1712. }
  1713. def get_calendar_record(openid: str, month_key: str, engine=None) -> dict:
  1714. """
  1715. 查找日历记录的便捷函数
  1716. Args:
  1717. openid (str): 微信用户openid
  1718. month_key (str): 月份标识(YYYY-MM格式)
  1719. engine: SQLAlchemy引擎对象
  1720. Returns:
  1721. dict: 包含查询结果的JSON格式数据
  1722. """
  1723. try:
  1724. # 验证必填参数
  1725. if not openid:
  1726. return {
  1727. "reason": "failed",
  1728. "return_code": 400,
  1729. "result": None,
  1730. "error": "缺少必需参数: openid"
  1731. }
  1732. if not month_key:
  1733. return {
  1734. "reason": "failed",
  1735. "return_code": 400,
  1736. "result": None,
  1737. "error": "缺少必需参数: month_key"
  1738. }
  1739. # 创建服务实例
  1740. service = CalendarRecordService(engine)
  1741. # 尝试查找记录
  1742. success, record, error_msg = service.get_calendar_record(openid, month_key)
  1743. if success:
  1744. if record:
  1745. # 查找成功,有记录
  1746. return {
  1747. "reason": "successed",
  1748. "return_code": 200,
  1749. "result": {
  1750. "id": record.id,
  1751. "openid": record.openid,
  1752. "month_key": record.month_key,
  1753. "calendar_content": record.calendar_content,
  1754. "created_at": record.created_at.isoformat() if record.created_at else None,
  1755. "updated_at": record.updated_at.isoformat() if record.updated_at else None
  1756. }
  1757. }
  1758. else:
  1759. # 查找成功,但无记录
  1760. return {
  1761. "reason": "successed",
  1762. "return_code": 200,
  1763. "result": {
  1764. "id": None,
  1765. "openid": openid,
  1766. "month_key": month_key,
  1767. "calendar_content": [],
  1768. "created_at": None,
  1769. "updated_at": None
  1770. }
  1771. }
  1772. else:
  1773. # 查找失败
  1774. return {
  1775. "reason": "failed",
  1776. "return_code": 400,
  1777. "result": None,
  1778. "error": error_msg or "查找失败"
  1779. }
  1780. except Exception as e:
  1781. # 系统错误
  1782. return {
  1783. "reason": "failed",
  1784. "return_code": 500,
  1785. "result": None,
  1786. "error": f"查找过程中发生错误: {str(e)}"
  1787. }
  1788. def get_user_calendar_records(openid: str, limit: int = 12, engine=None) -> dict:
  1789. """
  1790. 获取用户所有日历记录的便捷函数
  1791. Args:
  1792. openid (str): 微信用户openid
  1793. limit (int): 限制返回数量,默认12
  1794. engine: SQLAlchemy引擎对象
  1795. Returns:
  1796. dict: 包含查询结果的JSON格式数据
  1797. """
  1798. try:
  1799. # 验证必填参数
  1800. if not openid:
  1801. return {
  1802. "reason": "failed",
  1803. "return_code": 400,
  1804. "result": None,
  1805. "error": "缺少必需参数: openid"
  1806. }
  1807. # 创建服务实例
  1808. service = CalendarRecordService(engine)
  1809. # 尝试获取记录
  1810. success, records, error_msg = service.get_user_records(openid, limit)
  1811. if success:
  1812. # 转换记录为字典列表
  1813. result_records = []
  1814. for record in records:
  1815. result_records.append({
  1816. "id": record.id,
  1817. "openid": record.openid,
  1818. "month_key": record.month_key,
  1819. "calendar_content": record.calendar_content,
  1820. "created_at": record.created_at.isoformat() if record.created_at else None,
  1821. "updated_at": record.updated_at.isoformat() if record.updated_at else None
  1822. })
  1823. return {
  1824. "reason": "successed",
  1825. "return_code": 200,
  1826. "result": {
  1827. "openid": openid,
  1828. "total": len(result_records),
  1829. "records": result_records
  1830. }
  1831. }
  1832. else:
  1833. # 获取失败
  1834. return {
  1835. "reason": "failed",
  1836. "return_code": 400,
  1837. "result": None,
  1838. "error": error_msg or "获取记录失败"
  1839. }
  1840. except Exception as e:
  1841. # 系统错误
  1842. return {
  1843. "reason": "failed",
  1844. "return_code": 500,
  1845. "result": None,
  1846. "error": f"获取记录过程中发生错误: {str(e)}"
  1847. }
  1848. def delete_calendar_record(openid: str, month_key: str, engine=None) -> dict:
  1849. """
  1850. 删除日历记录的便捷函数
  1851. Args:
  1852. openid (str): 微信用户openid
  1853. month_key (str): 月份标识(YYYY-MM格式)
  1854. engine: SQLAlchemy引擎对象
  1855. Returns:
  1856. dict: 包含删除结果的JSON格式数据
  1857. """
  1858. try:
  1859. # 验证必填参数
  1860. if not openid:
  1861. return {
  1862. "reason": "failed",
  1863. "return_code": 400,
  1864. "result": None,
  1865. "error": "缺少必需参数: openid"
  1866. }
  1867. if not month_key:
  1868. return {
  1869. "reason": "failed",
  1870. "return_code": 400,
  1871. "result": None,
  1872. "error": "缺少必需参数: month_key"
  1873. }
  1874. # 创建服务实例
  1875. service = CalendarRecordService(engine)
  1876. # 尝试删除记录
  1877. success, error_msg = service.delete_calendar_record(openid, month_key)
  1878. if success:
  1879. # 删除成功
  1880. return {
  1881. "reason": "successed",
  1882. "return_code": 200,
  1883. "result": {
  1884. "message": f"已删除 {openid} 在 {month_key} 的日历记录"
  1885. }
  1886. }
  1887. else:
  1888. # 删除失败
  1889. error_code = 404 if "不存在" in (error_msg or "") else 400
  1890. return {
  1891. "reason": "failed",
  1892. "return_code": error_code,
  1893. "result": None,
  1894. "error": error_msg or "删除失败"
  1895. }
  1896. except Exception as e:
  1897. # 系统错误
  1898. return {
  1899. "reason": "failed",
  1900. "return_code": 500,
  1901. "result": None,
  1902. "error": f"删除过程中发生错误: {str(e)}"
  1903. }
  1904. # 导出主要类和函数
  1905. __all__ = [
  1906. 'CalendarInfo',
  1907. 'WechatUser',
  1908. 'CalendarRecord',
  1909. 'CalendarService',
  1910. 'WechatUserService',
  1911. 'CalendarRecordService',
  1912. 'create_calendar_info',
  1913. 'get_calendar_by_date',
  1914. 'get_calendar_by_id',
  1915. 'register_wechat_user',
  1916. 'login_wechat_user',
  1917. 'logout_wechat_user',
  1918. 'get_wechat_user_info',
  1919. 'update_wechat_user_info',
  1920. 'save_calendar_record',
  1921. 'get_calendar_record',
  1922. 'get_user_calendar_records',
  1923. 'delete_calendar_record'
  1924. ]