import logging
import os
import socket
import threading
import time

import nacos
import requests


class NacosService:
    """Process-wide singleton that registers this service in Nacos and keeps it alive.

    The instance talks to the Nacos HTTP Open API (``/nacos/v1/...``) via
    ``requests`` for registration, heartbeats, service discovery and config
    retrieval, and additionally holds a ``nacos.NacosClient`` (created by
    :meth:`start`) that is used by :meth:`getConfig`.

    Every call to ``NacosService()`` returns the same object; ``__init__`` runs
    again on each call and re-applies the default settings.
    """

    CLIENT = None
    DATA_ID = None
    GROUP = None
    LOCAL_IP = None
    SERVER_ADDRESSES = None
    NAMESPACE = None
    # Port advertised to Nacos in both the registration and heartbeat requests.
    PORT = 3333
    # Cluster name sent with the registration and heartbeat requests.
    CLUSTER_NAME = "DEFAULT"
    # Seconds between two heartbeats.
    HEARTBEAT_INTERVAL = 4
    # Seconds before an HTTP call to Nacos is abandoned, so that a hung server
    # cannot block the heartbeat thread (and therefore stop()) forever.
    REQUEST_TIMEOUT = 5
    logger = logging.getLogger('app')  # logger name is set here
    __instance = None
    _registered = False  # set once the instance has been registered successfully
    _stop_event = threading.Event()  # signals the heartbeat loop to stop
    heartbeat_thread = None  # created by start(); None until then

    def __init__(self):
        """Populate the connection settings with their defaults."""
        if os.name == 'posix':  # 'posix' means a Unix-like system, including Linux
            default_address = "127.0.0.1:8848"
        else:
            default_address = "192.168.3.80:8848"
        # Resolve the hostname only when the override is absent: resolving it
        # eagerly would raise on hosts whose name cannot be resolved even
        # though NACOS_REGISTER_IP is provided.
        local_ip = os.getenv("NACOS_REGISTER_IP")
        if local_ip is None:
            local_ip = socket.gethostbyname(socket.gethostname())
        self.LOCAL_IP = str(local_ip)
        self.SERVER_ADDRESSES = default_address
        self.NAMESPACE = "dev"
        self.DATA_ID = "python-mendunr"
        self.GROUP = "DEFAULT_GROUP"

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if NacosService.__instance is None:
            # object.__new__ rejects extra arguments when __new__ is overridden,
            # so they are deliberately not forwarded.
            NacosService.__instance = super().__new__(cls)
        return NacosService.__instance

    def getConfig(self):
        """Fetch the configuration for DATA_ID/GROUP through the Nacos client.

        Requires :meth:`start` to have created ``CLIENT`` beforehand.
        """
        return self.CLIENT.get_config(self.DATA_ID, self.GROUP)

    def register_instance(self):
        """Register this service instance with Nacos.

        Does nothing when the instance is already marked as registered. On an
        HTTP 200 response the registered flag is set; any other status is
        logged as an error. Network errors from ``requests`` propagate to the
        caller.
        """
        if self._registered:
            return
        res = requests.post(
            url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance",
            data={
                'groupName': self.GROUP,
                'metadata': '{"preserved.register.source":"SPRING_CLOUD","preserved.heart.beat.interval": 5500, "preserved.heart.beat.timeout": 30000, "preserved.ip.delete.timeout": 60000}',
                'namespaceId': self.NAMESPACE,
                'port': self.PORT,
                'enable': True,
                'healthy': True,
                'clusterName': self.CLUSTER_NAME,
                'ip': self.LOCAL_IP,
                'weight': 1,
                'ephemeral': False,  # deliberately changed to False
                'serviceName': self.GROUP + "@@" + self.DATA_ID,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        if res.status_code == 200:
            self.logger.info("register_instance res %s", str(res.content, 'UTF-8'))
            self._registered = True  # mark the registration as successful
        else:
            self.logger.error("Failed to register instance: %s - %s", res.status_code, res.text)

    def send_heartbeat(self):
        """Send a heartbeat to Nacos every HEARTBEAT_INTERVAL seconds until stopped.

        Runs until ``_stop_event`` is set. When Nacos answers with code 20404
        (resource not found) the instance is registered again. All errors are
        logged and the loop continues.
        """
        while not self._stop_event.is_set():
            try:
                res = requests.put(
                    url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
                    data={
                        "ip": self.LOCAL_IP,
                        "port": str(self.PORT),
                        "namespaceId": self.NAMESPACE,
                        "serviceName": self.GROUP + "@@" + self.DATA_ID,
                        "clusterName": self.CLUSTER_NAME,
                    },
                    timeout=self.REQUEST_TIMEOUT,
                )
                if res.status_code == 200:
                    if res.json().get("code") == 20404:  # resource not found: register again
                        # register_instance() is a no-op while the flag is
                        # set, so it has to be cleared first.
                        self._registered = False
                        self.register_instance()
                else:
                    self.logger.error("Failed to send heartbeat: %s - %s", res.status_code, res.text)
            except Exception as e:
                self.logger.error("send_heartbeat error: %s", e)
            # Wait outside the try block so a failing request cannot turn the
            # loop into a busy spin; wait() returns early once stop() is called.
            self._stop_event.wait(self.HEARTBEAT_INTERVAL)

    def getServer(self, serverName):
        """Look up a service in Nacos and return its first listed host.

        Args:
            serverName: Service name (without the group prefix) to look up in
                this instance's GROUP and NAMESPACE.

        Returns:
            The first entry of the ``hosts`` list in the response, or ``None``
            when the list is empty, the request fails, or the status is not 200.
        """
        try:
            res = requests.get(
                url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/list",
                params={
                    'namespaceId': self.NAMESPACE,
                    'serviceName': self.GROUP + "@@" + serverName,
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            if res.status_code == 200:
                hosts = res.json()["hosts"]
                if hosts:
                    return hosts[0]
            else:
                self.logger.error("Failed to get server list: %s - %s", res.status_code, res.text)
        except Exception as e:
            self.logger.error("getServer error: %s", e)
        return None

    def start(self):
        """Create the Nacos client, register the instance and start the heartbeat thread.

        Errors are logged rather than raised.
        """
        try:
            self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
            self.register_instance()
            running = self.heartbeat_thread
            if running is not None and running.is_alive():
                # A heartbeat loop is already active; do not start a second one.
                return
            # Allow a restart after stop(): a still-set event would make the
            # new heartbeat loop exit immediately.
            self._stop_event.clear()
            self.heartbeat_thread = threading.Thread(target=self.send_heartbeat)
            self.heartbeat_thread.start()
        except Exception as e:
            self.logger.error("start error: %s", e)

    def stop(self):
        """Signal the heartbeat loop to stop and wait for its thread to finish."""
        self._stop_event.set()
        worker = self.heartbeat_thread
        # start() may never have run or may have failed before creating the thread.
        if worker is not None and worker.is_alive():
            worker.join()

    def get_nacos_configs(self):
        """Fetch this service's configuration from Nacos over HTTP.

        Returns:
            The response body parsed as JSON, or ``None`` when the status is
            not 200 or the request or the parsing fails (the error is logged).
        """
        url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
        params = {
            'tenant': self.NAMESPACE,
            'dataId': self.DATA_ID,
            'group': self.GROUP,
        }
        try:
            r = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            r.encoding = r.apparent_encoding
            if r.status_code == 200:
                return r.json()
            self.logger.error("Failed to get Nacos configs: %s - %s", r.status_code, r.text)
        except Exception as e:
            self.logger.error("get_nacos_configs error: %s", e)
        return None


nacos_service_single = NacosService()


# Send an HTTP request to fetch the configuration stored in Nacos.
def get_nacos_configs():
    """Return the Nacos configuration obtained through the shared service instance."""
    return nacos_service_single.get_nacos_configs()