auth.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. """
  2. 系统用户认证模块
  3. 提供用户注册、登录验证等功能
  4. """
  5. import base64
  6. import logging
  7. import time
  8. import uuid
  9. from functools import wraps
  10. from urllib.parse import unquote, urlparse
  11. import psycopg2
  12. from flask import current_app, jsonify, request
  13. logger = logging.getLogger(__name__)
  14. # PostgreSQL连接池
  15. pg_pool = None
  16. def get_pg_connection():
  17. """
  18. 获取PostgreSQL数据库连接
  19. Returns:
  20. connection: PostgreSQL连接对象
  21. """
  22. global pg_pool
  23. if pg_pool is None:
  24. try:
  25. # 解析SQLAlchemy URI,处理包含特殊字符的密码
  26. db_uri = current_app.config["SQLALCHEMY_DATABASE_URI"]
  27. # 尝试使用urlparse解析
  28. uri = urlparse(db_uri)
  29. # 如果解析失败(缺少用户名或主机名)或密码包含特殊字符导致解析错误,使用手动解析
  30. if uri.username is None or uri.hostname is None:
  31. # 手动解析URI: postgresql://username:password@host:port/database
  32. scheme_end = db_uri.find("://")
  33. if scheme_end == -1:
  34. raise ValueError("Invalid database URI format")
  35. auth_and_host = db_uri[scheme_end + 3 :] # 跳过 '://'
  36. at_pos = auth_and_host.rfind("@") # 从右向左查找最后一个@
  37. if at_pos == -1:
  38. raise ValueError("Invalid database URI: missing @ separator")
  39. auth_part = auth_and_host[:at_pos]
  40. host_part = auth_and_host[at_pos + 1 :]
  41. # 解析用户名和密码(可能包含特殊字符)
  42. colon_pos = auth_part.find(":")
  43. if colon_pos == -1:
  44. username = unquote(auth_part)
  45. password = None
  46. else:
  47. username = unquote(auth_part[:colon_pos])
  48. password = unquote(auth_part[colon_pos + 1 :])
  49. # 解析主机、端口和数据库
  50. slash_pos = host_part.find("/")
  51. if slash_pos == -1:
  52. raise ValueError("Invalid database URI: missing database name")
  53. host_port = host_part[:slash_pos]
  54. database = unquote(host_part[slash_pos + 1 :])
  55. # 解析主机和端口
  56. colon_pos = host_port.find(":")
  57. if colon_pos == -1:
  58. hostname = host_port
  59. port = 5432
  60. else:
  61. hostname = host_port[:colon_pos]
  62. port = int(host_port[colon_pos + 1 :])
  63. else:
  64. # urlparse解析成功,解码可能被URL编码的字段
  65. username = unquote(uri.username) if uri.username else None
  66. password = unquote(uri.password) if uri.password else None
  67. database = (
  68. unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
  69. )
  70. hostname = uri.hostname
  71. port = uri.port or 5432
  72. # 验证必需的字段(username, database, hostname 是必需的,password 是可选的)
  73. if not all([username, database, hostname]):
  74. raise ValueError(
  75. "Missing required database connection parameters: username, database, and hostname are required"
  76. )
  77. # 创建连接池
  78. pg_pool = psycopg2.pool.SimpleConnectionPool( # type: ignore[attr-defined]
  79. 1,
  80. 20,
  81. host=hostname,
  82. database=database,
  83. user=username,
  84. password=password,
  85. port=str(port),
  86. )
  87. logger.info("PostgreSQL连接池初始化成功")
  88. except Exception as e:
  89. logger.error(f"PostgreSQL连接池初始化失败: {str(e)}")
  90. raise
  91. return pg_pool.getconn()
  92. def release_pg_connection(conn):
  93. """
  94. 释放PostgreSQL连接到连接池
  95. Args:
  96. conn: 数据库连接对象
  97. """
  98. global pg_pool
  99. if pg_pool and conn:
  100. pg_pool.putconn(conn)
  101. def encode_password(password):
  102. """
  103. 对密码进行base64编码
  104. Args:
  105. password: 原始密码
  106. Returns:
  107. str: 编码后的密码
  108. """
  109. return base64.b64encode(password.encode("utf-8")).decode("utf-8")
  110. def create_user_table():
  111. """
  112. 创建用户表,如果不存在
  113. Returns:
  114. bool: 是否成功创建
  115. """
  116. conn = None
  117. try:
  118. conn = get_pg_connection()
  119. cursor = conn.cursor()
  120. # 创建用户表
  121. create_table_query = """
  122. CREATE TABLE IF NOT EXISTS users (
  123. id VARCHAR(100) PRIMARY KEY,
  124. username VARCHAR(50) UNIQUE NOT NULL,
  125. password VARCHAR(100) NOT NULL,
  126. created_at FLOAT NOT NULL,
  127. last_login FLOAT,
  128. is_admin BOOLEAN DEFAULT FALSE
  129. );
  130. """
  131. cursor.execute(create_table_query)
  132. # 创建索引加速查询
  133. create_index_query = """
  134. CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
  135. """
  136. cursor.execute(create_index_query)
  137. conn.commit()
  138. cursor.close()
  139. logger.info("用户表创建成功")
  140. return True
  141. except Exception as e:
  142. logger.error(f"创建用户表失败: {str(e)}")
  143. if conn:
  144. conn.rollback()
  145. return False
  146. finally:
  147. if conn:
  148. release_pg_connection(conn)
  149. def register_user(username, password):
  150. """
  151. 注册新用户
  152. Args:
  153. username: 用户名
  154. password: 密码
  155. Returns:
  156. tuple: (是否成功, 消息)
  157. """
  158. conn = None
  159. try:
  160. # 确保表已创建
  161. create_user_table()
  162. # 对密码进行编码
  163. encoded_password = encode_password(password)
  164. # 生成用户ID
  165. user_id = str(uuid.uuid4())
  166. conn = get_pg_connection()
  167. cursor = conn.cursor()
  168. # 检查用户名是否存在
  169. check_query = "SELECT username FROM users WHERE username = %s"
  170. cursor.execute(check_query, (username,))
  171. if cursor.fetchone():
  172. return False, "用户名已存在"
  173. # 创建用户
  174. insert_query = """
  175. INSERT INTO users (id, username, password, created_at, last_login)
  176. VALUES (%s, %s, %s, %s, %s)
  177. """
  178. cursor.execute(
  179. insert_query, (user_id, username, encoded_password, time.time(), None)
  180. )
  181. conn.commit()
  182. cursor.close()
  183. return True, "注册成功"
  184. except Exception as e:
  185. logger.error(f"用户注册失败: {str(e)}")
  186. if conn:
  187. conn.rollback()
  188. return False, f"注册失败: {str(e)}"
  189. finally:
  190. if conn:
  191. release_pg_connection(conn)
  192. def login_user(username, password):
  193. """
  194. 用户登录验证
  195. Args:
  196. username: 用户名
  197. password: 密码
  198. Returns:
  199. tuple: (是否成功, 用户信息/错误消息)
  200. """
  201. conn = None
  202. try:
  203. # 对输入的密码进行编码
  204. encoded_password = encode_password(password)
  205. conn = get_pg_connection()
  206. cursor = conn.cursor()
  207. # 查询用户
  208. query = """
  209. SELECT id, username, password, created_at, last_login, is_admin
  210. FROM users WHERE username = %s
  211. """
  212. cursor.execute(query, (username,))
  213. user = cursor.fetchone()
  214. # 检查用户是否存在
  215. if not user:
  216. return False, "用户名或密码错误"
  217. # 验证密码
  218. if user[2] != encoded_password:
  219. return False, "用户名或密码错误"
  220. # 更新最后登录时间
  221. current_time = time.time()
  222. update_query = """
  223. UPDATE users SET last_login = %s WHERE username = %s
  224. """
  225. cursor.execute(update_query, (current_time, username))
  226. conn.commit()
  227. # 构建用户信息
  228. user_info = {
  229. "id": user[0],
  230. "username": user[1],
  231. "created_at": user[3],
  232. "last_login": current_time,
  233. "is_admin": user[5] if len(user) > 5 else False,
  234. }
  235. cursor.close()
  236. return True, user_info
  237. except Exception as e:
  238. logger.error(f"用户登录失败: {str(e)}")
  239. if conn:
  240. conn.rollback()
  241. return False, f"登录失败: {str(e)}"
  242. finally:
  243. if conn:
  244. release_pg_connection(conn)
  245. def get_user_by_username(username):
  246. """
  247. 根据用户名获取用户信息
  248. Args:
  249. username: 用户名
  250. Returns:
  251. dict: 用户信息(不包含密码)
  252. """
  253. conn = None
  254. try:
  255. conn = get_pg_connection()
  256. cursor = conn.cursor()
  257. query = """
  258. SELECT id, username, created_at, last_login, is_admin
  259. FROM users WHERE username = %s
  260. """
  261. cursor.execute(query, (username,))
  262. user = cursor.fetchone()
  263. cursor.close()
  264. if not user:
  265. return None
  266. user_info = {
  267. "id": user[0],
  268. "username": user[1],
  269. "created_at": user[2],
  270. "last_login": user[3],
  271. "is_admin": user[4] if user[4] is not None else False,
  272. }
  273. return user_info
  274. except Exception as e:
  275. logger.error(f"获取用户信息失败: {str(e)}")
  276. return None
  277. finally:
  278. if conn:
  279. release_pg_connection(conn)
  280. def init_db():
  281. """
  282. 初始化数据库,创建用户表
  283. Returns:
  284. bool: 是否成功初始化
  285. """
  286. return create_user_table()
  287. def require_auth(f):
  288. @wraps(f)
  289. def decorated(*args, **kwargs):
  290. auth_header = request.headers.get("Authorization")
  291. if not auth_header:
  292. return jsonify({"message": "缺少认证头"}), 401
  293. try:
  294. # 验证认证头
  295. if auth_header != current_app.config["SECRET_KEY"]:
  296. return jsonify({"message": "无效的认证信息"}), 401
  297. return f(*args, **kwargs)
  298. except Exception:
  299. return jsonify({"message": "认证失败"}), 401
  300. return decorated