# nacos_bak.py (5.5 KB)
  1. import nacos
  2. import socket
  3. import logging
  4. import time
  5. import os
  6. import requests
  7. import threading
  8. class NacosService:
  9. CLIENT = None
  10. DATA_ID = None
  11. GROUP = None
  12. LOCAL_IP = None
  13. SERVER_ADDRESSES = None
  14. NAMESPACE = None
  15. logger = logging.getLogger('app') # 这里设置名字
  16. __instance = None
  17. _registered = False # 添加注册标志位
  18. _stop_event = threading.Event() # 用于控制心跳任务的停止
  19. def __init__(self):
  20. if os.name == 'posix': # 'posix' 表示类Unix系统,包括Linux
  21. default_address = "127.0.0.1:8848"
  22. else:
  23. default_address = "192.168.3.80:8848"
  24. self.LOCAL_IP = str(os.getenv("NACOS_REGISTER_IP", default=socket.gethostbyname(socket.gethostname())))
  25. self.SERVER_ADDRESSES = default_address
  26. self.NAMESPACE = "dev"
  27. self.DATA_ID = "python-mendunr"
  28. self.GROUP = "DEFAULT_GROUP"
  29. def __new__(cls, *args, **kwargs):
  30. if NacosService.__instance is None:
  31. NacosService.__instance = object.__new__(cls, *args, **kwargs)
  32. return NacosService.__instance
  33. ### 获取配置
  34. def getConfig(self):
  35. return self.CLIENT.get_config(self.DATA_ID, self.GROUP)
  36. ### 注册实例
  37. def register_instance(self):
  38. if self._registered:
  39. return
  40. res = requests.post(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance",
  41. data={
  42. 'groupName': self.GROUP,
  43. 'metadata': '{"preserved.register.source":"SPRING_CLOUD","preserved.heart.beat.interval": 5500, "preserved.heart.beat.timeout": 30000, "preserved.ip.delete.timeout": 60000}',
  44. 'namespaceId': self.NAMESPACE,
  45. 'port': 3333,
  46. 'enable': True,
  47. 'healthy': True,
  48. 'clusterName': 'DEFAULT',
  49. 'ip': self.LOCAL_IP,
  50. 'weight': 1,
  51. 'ephemeral': False, # 修改这里为 False
  52. 'serviceName': self.GROUP + "@@" + self.DATA_ID
  53. })
  54. if res.status_code == 200:
  55. self.logger.info("register_instance res %s", str(res.content, 'UTF-8'))
  56. self._registered = True # 设置注册成功标志位
  57. else:
  58. self.logger.error(f"Failed to register instance: {res.status_code} - {res.text}")
  59. def send_heartbeat(self):
  60. while not self._stop_event.is_set():
  61. try:
  62. res = requests.put(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
  63. data={
  64. "ip": self.LOCAL_IP,
  65. "port": "3333",
  66. "namespaceId": self.NAMESPACE,
  67. "serviceName": self.GROUP + "@@" + self.DATA_ID,
  68. "clusterName": "DEFAULT"}
  69. )
  70. if res.status_code == 200:
  71. res_json = res.json()
  72. if res_json["code"] == 20404: ### 找不到资源重新注册
  73. self.register_instance()
  74. else:
  75. self.logger.error(f"Failed to send heartbeat: {res.status_code} - {res.text}")
  76. self._stop_event.wait(4) ## 4秒执行一次
  77. except Exception as e:
  78. self.logger.error(f"send_heartbeat error: {e}")
  79. def getServer(self, serverName):
  80. try:
  81. res = requests.get(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/list",
  82. params={'namespaceId': self.NAMESPACE, 'serviceName': self.GROUP + "@@" + serverName})
  83. if res.status_code == 200:
  84. hosts = res.json()["hosts"]
  85. if len(hosts) > 0:
  86. return hosts[0]
  87. else:
  88. self.logger.error(f"Failed to get server list: {res.status_code} - {res.text}")
  89. except Exception as e:
  90. self.logger.error(f"getServer error: {e}")
  91. return None
  92. def start(self):
  93. try:
  94. self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
  95. self.register_instance()
  96. self.heartbeat_thread = threading.Thread(target=self.send_heartbeat)
  97. self.heartbeat_thread.start()
  98. except Exception as e:
  99. self.logger.error(f"start error: {e}")
  100. def stop(self):
  101. self._stop_event.set()
  102. self.heartbeat_thread.join()
  103. def get_nacos_configs(self):
  104. url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
  105. params = {
  106. 'tenant': self.NAMESPACE,
  107. 'dataId': self.DATA_ID,
  108. 'group': self.GROUP
  109. }
  110. try:
  111. r = requests.get(url, params)
  112. r.encoding = r.apparent_encoding
  113. if r.status_code == 200:
  114. r_dict = r.json()
  115. return r_dict
  116. else:
  117. self.logger.error(f"Failed to get Nacos configs: {r.status_code} - {r.text}")
  118. except Exception as e:
  119. self.logger.error(f"get_nacos_configs error: {e}")
  120. return None
  121. nacos_service_single = NacosService()
  122. # 发送url请求,获取nacos上的配置
  123. def get_nacos_configs():
  124. return nacos_service_single.get_nacos_configs()