123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- # import nacos
- # import socket
- # import logging
- # import time
- # import os
- # import requests
- # import threading
- #
- # class NacosService:
- # CLIENT = None
- # DATA_ID = None
- # GROUP = None
- # LOCAL_IP = None
- # SERVER_ADDRESSES = None
- # NAMESPACE = None
- # logger = logging.getLogger('app') # 这里设置名字
- #
- # __instance = None
- # _registered = False # 添加注册标志位
- # _stop_event = threading.Event() # 用于控制心跳任务的停止
- #
- # def __init__(self):
- # if os.name == 'posix': # 'posix' 表示类Unix系统,包括Linux
- # default_address = "127.0.0.1:8848"
- # else:
- # default_address = "192.168.3.80:8848"
- # self.LOCAL_IP = str(os.getenv("NACOS_REGISTER_IP", default=socket.gethostbyname(socket.gethostname())))
- # self.SERVER_ADDRESSES = default_address
- # self.NAMESPACE = "dev"
- # self.DATA_ID = "python-mendunr"
- # self.GROUP = "DEFAULT_GROUP"
- #
- # def __new__(cls, *args, **kwargs):
- # if NacosService.__instance is None:
- # NacosService.__instance = object.__new__(cls, *args, **kwargs)
- # return NacosService.__instance
- #
- # ### 获取配置
- # def getConfig(self):
- # return self.CLIENT.get_config(self.DATA_ID, self.GROUP)
- #
- # ### 注册实例
- # def register_instance(self):
- # if self._registered:
- # return
- #
- # res = requests.post(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance",
- # data={
- # 'groupName': self.GROUP,
- # 'metadata': '{"preserved.register.source":"SPRING_CLOUD","preserved.heart.beat.interval": 5500, "preserved.heart.beat.timeout": 30000, "preserved.ip.delete.timeout": 60000}',
- # 'namespaceId': self.NAMESPACE,
- # 'port': 3333,
- # 'enable': True,
- # 'healthy': True,
- # 'clusterName': 'DEFAULT',
- # 'ip': self.LOCAL_IP,
- # 'weight': 1,
- # 'ephemeral': False, # 修改这里为 False
- # 'serviceName': self.GROUP + "@@" + self.DATA_ID
- # })
- # if res.status_code == 200:
- # self.logger.info("register_instance res %s", str(res.content, 'UTF-8'))
- # self._registered = True # 设置注册成功标志位
- # else:
- # self.logger.error(f"Failed to register instance: {res.status_code} - {res.text}")
- #
- # def send_heartbeat(self):
- # while not self._stop_event.is_set():
- # try:
- # res = requests.put(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
- # data={
- # "ip": self.LOCAL_IP,
- # "port": "3333",
- # "namespaceId": self.NAMESPACE,
- # "serviceName": self.GROUP + "@@" + self.DATA_ID,
- # "clusterName": "DEFAULT"}
- # )
- # if res.status_code == 200:
- # res_json = res.json()
- # if res_json["code"] == 20404: ### 找不到资源重新注册
- # self.register_instance()
- # else:
- # self.logger.error(f"Failed to send heartbeat: {res.status_code} - {res.text}")
- # self._stop_event.wait(4) ## 4秒执行一次
- # except Exception as e:
- # self.logger.error(f"send_heartbeat error: {e}")
- #
- # def getServer(self, serverName):
- # try:
- # res = requests.get(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/list",
- # params={'namespaceId': self.NAMESPACE, 'serviceName': self.GROUP + "@@" + serverName})
- # if res.status_code == 200:
- # hosts = res.json()["hosts"]
- # if len(hosts) > 0:
- # return hosts[0]
- # else:
- # self.logger.error(f"Failed to get server list: {res.status_code} - {res.text}")
- # except Exception as e:
- # self.logger.error(f"getServer error: {e}")
- # return None
- #
- # def start(self):
- # try:
- # self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
- # self.register_instance()
- # self.heartbeat_thread = threading.Thread(target=self.send_heartbeat)
- # self.heartbeat_thread.start()
- # except Exception as e:
- # self.logger.error(f"start error: {e}")
- #
- # def stop(self):
- # self._stop_event.set()
- # self.heartbeat_thread.join()
- #
- # def get_nacos_configs(self):
- # url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
- # params = {
- # 'tenant': self.NAMESPACE,
- # 'dataId': self.DATA_ID,
- # 'group': self.GROUP
- # }
- # try:
- # r = requests.get(url, params)
- # r.encoding = r.apparent_encoding
- # if r.status_code == 200:
- # r_dict = r.json()
- # return r_dict
- # else:
- # self.logger.error(f"Failed to get Nacos configs: {r.status_code} - {r.text}")
- # except Exception as e:
- # self.logger.error(f"get_nacos_configs error: {e}")
- # return None
- #
- #
- # nacos_service_single = NacosService()
- #
- #
- # # 发送url请求,获取nacos上的配置
- # def get_nacos_configs():
- # return nacos_service_single.get_nacos_configs()
|