auth.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. """
  2. 系统用户认证模块
  3. 提供用户注册、登录验证等功能
  4. """
  5. import logging
  6. import base64
  7. import time
  8. import uuid
  9. import psycopg2
  10. from psycopg2 import pool
  11. from urllib.parse import urlparse, unquote
  12. from flask import current_app, request, jsonify
  13. from functools import wraps
  14. logger = logging.getLogger(__name__)
  15. # PostgreSQL连接池
  16. pg_pool = None
  17. def get_pg_connection():
  18. """
  19. 获取PostgreSQL数据库连接
  20. Returns:
  21. connection: PostgreSQL连接对象
  22. """
  23. global pg_pool
  24. if pg_pool is None:
  25. try:
  26. # 解析SQLAlchemy URI,处理包含特殊字符的密码
  27. db_uri = current_app.config['SQLALCHEMY_DATABASE_URI']
  28. # 尝试使用urlparse解析
  29. uri = urlparse(db_uri)
  30. # 如果解析失败(缺少用户名或主机名)或密码包含特殊字符导致解析错误,使用手动解析
  31. if uri.username is None or uri.hostname is None:
  32. # 手动解析URI: postgresql://username:password@host:port/database
  33. scheme_end = db_uri.find('://')
  34. if scheme_end == -1:
  35. raise ValueError("Invalid database URI format")
  36. auth_and_host = db_uri[scheme_end + 3:] # 跳过 '://'
  37. at_pos = auth_and_host.rfind('@') # 从右向左查找最后一个@
  38. if at_pos == -1:
  39. raise ValueError("Invalid database URI: missing @ separator")
  40. auth_part = auth_and_host[:at_pos]
  41. host_part = auth_and_host[at_pos + 1:]
  42. # 解析用户名和密码(可能包含特殊字符)
  43. colon_pos = auth_part.find(':')
  44. if colon_pos == -1:
  45. username = unquote(auth_part)
  46. password = None
  47. else:
  48. username = unquote(auth_part[:colon_pos])
  49. password = unquote(auth_part[colon_pos + 1:])
  50. # 解析主机、端口和数据库
  51. slash_pos = host_part.find('/')
  52. if slash_pos == -1:
  53. raise ValueError("Invalid database URI: missing database name")
  54. host_port = host_part[:slash_pos]
  55. database = unquote(host_part[slash_pos + 1:])
  56. # 解析主机和端口
  57. colon_pos = host_port.find(':')
  58. if colon_pos == -1:
  59. hostname = host_port
  60. port = 5432
  61. else:
  62. hostname = host_port[:colon_pos]
  63. port = int(host_port[colon_pos + 1:])
  64. else:
  65. # urlparse解析成功,解码可能被URL编码的字段
  66. username = unquote(uri.username) if uri.username else None
  67. password = unquote(uri.password) if uri.password else None
  68. database = unquote(uri.path[1:]) if uri.path and len(uri.path) > 1 else None
  69. hostname = uri.hostname
  70. port = uri.port or 5432
  71. # 验证必需的字段(username, database, hostname 是必需的,password 是可选的)
  72. if not all([username, database, hostname]):
  73. raise ValueError("Missing required database connection parameters: username, database, and hostname are required")
  74. # 创建连接池
  75. pg_pool = psycopg2.pool.SimpleConnectionPool(
  76. 1, 20,
  77. host=hostname,
  78. database=database,
  79. user=username,
  80. password=password,
  81. port=str(port)
  82. )
  83. logger.info("PostgreSQL连接池初始化成功")
  84. except Exception as e:
  85. logger.error(f"PostgreSQL连接池初始化失败: {str(e)}")
  86. raise
  87. return pg_pool.getconn()
  88. def release_pg_connection(conn):
  89. """
  90. 释放PostgreSQL连接到连接池
  91. Args:
  92. conn: 数据库连接对象
  93. """
  94. global pg_pool
  95. if pg_pool and conn:
  96. pg_pool.putconn(conn)
  97. def encode_password(password):
  98. """
  99. 对密码进行base64编码
  100. Args:
  101. password: 原始密码
  102. Returns:
  103. str: 编码后的密码
  104. """
  105. return base64.b64encode(password.encode('utf-8')).decode('utf-8')
  106. def create_user_table():
  107. """
  108. 创建用户表,如果不存在
  109. Returns:
  110. bool: 是否成功创建
  111. """
  112. conn = None
  113. try:
  114. conn = get_pg_connection()
  115. cursor = conn.cursor()
  116. # 创建用户表
  117. create_table_query = """
  118. CREATE TABLE IF NOT EXISTS users (
  119. id VARCHAR(100) PRIMARY KEY,
  120. username VARCHAR(50) UNIQUE NOT NULL,
  121. password VARCHAR(100) NOT NULL,
  122. created_at FLOAT NOT NULL,
  123. last_login FLOAT,
  124. is_admin BOOLEAN DEFAULT FALSE
  125. );
  126. """
  127. cursor.execute(create_table_query)
  128. # 创建索引加速查询
  129. create_index_query = """
  130. CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
  131. """
  132. cursor.execute(create_index_query)
  133. conn.commit()
  134. cursor.close()
  135. logger.info("用户表创建成功")
  136. return True
  137. except Exception as e:
  138. logger.error(f"创建用户表失败: {str(e)}")
  139. if conn:
  140. conn.rollback()
  141. return False
  142. finally:
  143. if conn:
  144. release_pg_connection(conn)
  145. def register_user(username, password):
  146. """
  147. 注册新用户
  148. Args:
  149. username: 用户名
  150. password: 密码
  151. Returns:
  152. tuple: (是否成功, 消息)
  153. """
  154. conn = None
  155. try:
  156. # 确保表已创建
  157. create_user_table()
  158. # 对密码进行编码
  159. encoded_password = encode_password(password)
  160. # 生成用户ID
  161. user_id = str(uuid.uuid4())
  162. conn = get_pg_connection()
  163. cursor = conn.cursor()
  164. # 检查用户名是否存在
  165. check_query = "SELECT username FROM users WHERE username = %s"
  166. cursor.execute(check_query, (username,))
  167. if cursor.fetchone():
  168. return False, "用户名已存在"
  169. # 创建用户
  170. insert_query = """
  171. INSERT INTO users (id, username, password, created_at, last_login)
  172. VALUES (%s, %s, %s, %s, %s)
  173. """
  174. cursor.execute(
  175. insert_query,
  176. (user_id, username, encoded_password, time.time(), None)
  177. )
  178. conn.commit()
  179. cursor.close()
  180. return True, "注册成功"
  181. except Exception as e:
  182. logger.error(f"用户注册失败: {str(e)}")
  183. if conn:
  184. conn.rollback()
  185. return False, f"注册失败: {str(e)}"
  186. finally:
  187. if conn:
  188. release_pg_connection(conn)
  189. def login_user(username, password):
  190. """
  191. 用户登录验证
  192. Args:
  193. username: 用户名
  194. password: 密码
  195. Returns:
  196. tuple: (是否成功, 用户信息/错误消息)
  197. """
  198. conn = None
  199. try:
  200. # 对输入的密码进行编码
  201. encoded_password = encode_password(password)
  202. conn = get_pg_connection()
  203. cursor = conn.cursor()
  204. # 查询用户
  205. query = """
  206. SELECT id, username, password, created_at, last_login, is_admin
  207. FROM users WHERE username = %s
  208. """
  209. cursor.execute(query, (username,))
  210. user = cursor.fetchone()
  211. # 检查用户是否存在
  212. if not user:
  213. return False, "用户名或密码错误"
  214. # 验证密码
  215. if user[2] != encoded_password:
  216. return False, "用户名或密码错误"
  217. # 更新最后登录时间
  218. current_time = time.time()
  219. update_query = """
  220. UPDATE users SET last_login = %s WHERE username = %s
  221. """
  222. cursor.execute(update_query, (current_time, username))
  223. conn.commit()
  224. # 构建用户信息
  225. user_info = {
  226. "id": user[0],
  227. "username": user[1],
  228. "created_at": user[3],
  229. "last_login": current_time,
  230. "is_admin": user[5] if len(user) > 5 else False
  231. }
  232. cursor.close()
  233. return True, user_info
  234. except Exception as e:
  235. logger.error(f"用户登录失败: {str(e)}")
  236. if conn:
  237. conn.rollback()
  238. return False, f"登录失败: {str(e)}"
  239. finally:
  240. if conn:
  241. release_pg_connection(conn)
  242. def get_user_by_username(username):
  243. """
  244. 根据用户名获取用户信息
  245. Args:
  246. username: 用户名
  247. Returns:
  248. dict: 用户信息(不包含密码)
  249. """
  250. conn = None
  251. try:
  252. conn = get_pg_connection()
  253. cursor = conn.cursor()
  254. query = """
  255. SELECT id, username, created_at, last_login, is_admin
  256. FROM users WHERE username = %s
  257. """
  258. cursor.execute(query, (username,))
  259. user = cursor.fetchone()
  260. cursor.close()
  261. if not user:
  262. return None
  263. user_info = {
  264. "id": user[0],
  265. "username": user[1],
  266. "created_at": user[2],
  267. "last_login": user[3],
  268. "is_admin": user[4] if user[4] is not None else False
  269. }
  270. return user_info
  271. except Exception as e:
  272. logger.error(f"获取用户信息失败: {str(e)}")
  273. return None
  274. finally:
  275. if conn:
  276. release_pg_connection(conn)
  277. def init_db():
  278. """
  279. 初始化数据库,创建用户表
  280. Returns:
  281. bool: 是否成功初始化
  282. """
  283. return create_user_table()
  284. def require_auth(f):
  285. @wraps(f)
  286. def decorated(*args, **kwargs):
  287. auth_header = request.headers.get('Authorization')
  288. if not auth_header:
  289. return jsonify({'message': '缺少认证头'}), 401
  290. try:
  291. # 验证认证头
  292. if auth_header != current_app.config['SECRET_KEY']:
  293. return jsonify({'message': '无效的认证信息'}), 401
  294. return f(*args, **kwargs)
  295. except Exception as e:
  296. return jsonify({'message': '认证失败'}), 401
  297. return decorated