123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- # import json
- # import threading
- #
- # from flask import Flask
- # from nacos import NacosClient
- #
- # app = Flask(__name__)
- #
- # class nacos:
- # def __init__(self,address,namespace,username,password,data_id,group):
- # self.SERVER_ADDRESSES = address
- # self.NAMESPACE = namespace
- # self.USERNAME = username
- # self.PASSWORD = password
- # self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
- # self.data_id = data_id
- # self.group = group
- # def load_config(self):
- # """从Nacos加载配置并更新应用状态"""
- # config_data = self.client.get_config(data_id=self.data_id, group=self.group)
- # app.config['CONFIG_DATA'] = config_data
- # print(config_data)
- # return json.loads(config_data)
- #
- # def on_config_change(self,cfg):
- # """当Nacos中的配置发生变化时,更新应用配置中的配置数据"""
- # config_data = json.loads(cfg['content']) # Parse the JSON string to a dictionary
- # app.config['CONFIG_DATA'] = config_data
- # return config_data
- #
- # def watch_config_changes(self):
- # try:
- # self.client.add_config_watcher(data_id='python-mendunr', group='DEFAULT_GROUP', cb=self.on_config_change)
- # self.client.add_naming_instance(service_name='python-mendunr',
- # ip='192.168.3.175',
- # port=3333,
- # group_name='DEFAULT_GROUP',
- # ephemeral=False)
- # except Exception as e:
- # raise e
- #
- #
- # # def start_background_tasks(self):
- # # """启动后台任务线程来监控配置变化"""
- # # t = threading.Thread(target=self.watch_config_changes)
- # # t.daemon = True
- # # t.start()
- # # # self.watch_config_changes()
- #
- # nacos_single = nacos(address="192.168.3.80:8848",namespace = "dev",username = 'nacos',
- # password ='nacos',data_id = 'python-mendunr',group = 'DEFAULT_GROUP')
- # nacos_single.watch_config_changes()
- # configs = nacos_single.load_config()
- import json
- from flask import Flask, request
- from nacos import NacosClient
- app = Flask(__name__)
- class NacosConfig:
- def __init__(self, address, namespace, username, password, data_id, group):
- self.SERVER_ADDRESSES = address
- self.NAMESPACE = namespace
- self.USERNAME = username
- self.PASSWORD = password
- self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE,
- username=self.USERNAME, password=self.PASSWORD)
- self.data_id = data_id
- self.group = group
- self.config_data = None
- def load_config(self):
- """从Nacos加载配置"""
- config_str = self.client.get_config(data_id=self.data_id, group=self.group)
- self.config_data = json.loads(config_str) if config_str else {}
- return self.config_data
- def get_config(self):
- """返回当前加载的配置数据"""
- if self.config_data is None:
- self.load_config()
- return self.config_data
- nacos_config = NacosConfig(address="192.168.3.80:8848", namespace="dev", username='nacos', password='nacos',
- data_id='python-mendunr', group='DEFAULT_GROUP')
- configs = nacos_config.get_config()
- @app.route('/config')
- def get_config_route():
- """返回当前配置,或者可以选择在这里重新加载配置"""
- # 可以在这里选择是否重新加载配置,或者根据需要进行其他逻辑处理
- # nacos_config.load_config() # 如果需要每次请求都检查更新,可以取消注释这行代码
- return json.dumps(nacos_config.get_config())
- if __name__ == '__main__':
- app.run(debug=True)
|