# nacos_config.py
  1. # import json
  2. # import threading
  3. #
  4. # from flask import Flask
  5. # from nacos import NacosClient
  6. #
  7. # app = Flask(__name__)
  8. #
  9. # class nacos:
  10. # def __init__(self,address,namespace,username,password,data_id,group):
  11. # self.SERVER_ADDRESSES = address
  12. # self.NAMESPACE = namespace
  13. # self.USERNAME = username
  14. # self.PASSWORD = password
  15. # self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
  16. # self.data_id = data_id
  17. # self.group = group
  18. # def load_config(self):
  19. # """从Nacos加载配置并更新应用状态"""
  20. # config_data = self.client.get_config(data_id=self.data_id, group=self.group)
  21. # app.config['CONFIG_DATA'] = config_data
  22. # print(config_data)
  23. # return json.loads(config_data)
  24. #
  25. # def on_config_change(self,cfg):
  26. # """当Nacos中的配置发生变化时,更新应用配置中的配置数据"""
  27. # config_data = json.loads(cfg['content']) # Parse the JSON string to a dictionary
  28. # app.config['CONFIG_DATA'] = config_data
  29. # return config_data
  30. #
  31. # def watch_config_changes(self):
  32. # try:
  33. # self.client.add_config_watcher(data_id='python-mendunr', group='DEFAULT_GROUP', cb=self.on_config_change)
  34. # self.client.add_naming_instance(service_name='python-mendunr',
  35. # ip='192.168.3.175',
  36. # port=3333,
  37. # group_name='DEFAULT_GROUP',
  38. # ephemeral=False)
  39. # except Exception as e:
  40. # raise e
  41. #
  42. #
  43. # # def start_background_tasks(self):
  44. # # """启动后台任务线程来监控配置变化"""
  45. # # t = threading.Thread(target=self.watch_config_changes)
  46. # # t.daemon = True
  47. # # t.start()
  48. # # # self.watch_config_changes()
  49. #
  50. # nacos_single = nacos(address="192.168.3.80:8848",namespace = "dev",username = 'nacos',
  51. # password ='nacos',data_id = 'python-mendunr',group = 'DEFAULT_GROUP')
  52. # nacos_single.watch_config_changes()
  53. # configs = nacos_single.load_config()
  54. import json
  55. from flask import Flask, request
  56. from nacos import NacosClient
  57. app = Flask(__name__)
  58. class NacosConfig:
  59. def __init__(self, address, namespace, username, password, data_id, group):
  60. self.SERVER_ADDRESSES = address
  61. self.NAMESPACE = namespace
  62. self.USERNAME = username
  63. self.PASSWORD = password
  64. self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE,
  65. username=self.USERNAME, password=self.PASSWORD)
  66. self.data_id = data_id
  67. self.group = group
  68. self.config_data = None
  69. def load_config(self):
  70. """从Nacos加载配置"""
  71. config_str = self.client.get_config(data_id=self.data_id, group=self.group)
  72. self.config_data = json.loads(config_str) if config_str else {}
  73. return self.config_data
  74. def get_config(self):
  75. """返回当前加载的配置数据"""
  76. if self.config_data is None:
  77. self.load_config()
  78. return self.config_data
  79. nacos_config = NacosConfig(address="192.168.3.80:8848", namespace="dev", username='nacos', password='nacos',
  80. data_id='python-mendunr', group='DEFAULT_GROUP')
  81. configs = nacos_config.get_config()
  82. @app.route('/config')
  83. def get_config_route():
  84. """返回当前配置,或者可以选择在这里重新加载配置"""
  85. # 可以在这里选择是否重新加载配置,或者根据需要进行其他逻辑处理
  86. # nacos_config.load_config() # 如果需要每次请求都检查更新,可以取消注释这行代码
  87. return json.dumps(nacos_config.get_config())
  88. if __name__ == '__main__':
  89. app.run(debug=True)