1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
import time
from typing import List, Optional

from xhs import XhsClient

from comment_generator import CommentGenerator
from config import XHS_CONFIG, WECOM_CONFIG, MONITOR_CONFIG
from db import Database
from utils import xhs_sign
from wecom import WecomMessage
class XHSMonitor:
    """Poll a Xiaohongshu user's notes, optionally like/comment on new ones,
    and report new notes and errors through WeCom text messages."""

    def __init__(self, cookie: str, corpid: str, agentid: int, secret: str):
        """
        Create the API client, the WeCom notifier, the note store and the
        comment generator.

        :param cookie: Xiaohongshu cookie passed to ``XhsClient``
        :param corpid: WeCom corp ID
        :param agentid: WeCom application (agent) ID
        :param secret: WeCom application secret
        """
        self.client = XhsClient(cookie=cookie, sign=xhs_sign)
        self.wecom = WecomMessage(corpid, agentid, secret)
        self.db = Database()
        # Consecutive failures of get_latest_notes(); reset to 0 on success.
        self.error_count = 0
        self.comment_generator = CommentGenerator()

    def send_error_notification(self, error_msg: str):
        """
        Send an error alert through WeCom.

        :param error_msg: error text to include in the alert
        """
        time_str = time.strftime('%Y-%m-%d %H:%M:%S')
        content = (
            "小红书监控异常告警\n"
            f"错误信息:{error_msg}\n"
            f"告警时间:{time_str}"
        )
        self.wecom.send_text(content)

    def get_latest_notes(self, user_id: str) -> List[dict]:
        """
        Fetch the notes currently listed for a user.

        On failure the method sleeps 60 seconds and returns an empty list.
        Once ``MONITOR_CONFIG["ERROR_COUNT"]`` consecutive failures have been
        reached it sends an alert and terminates the process.

        :param user_id: user ID
        :return: list of note dicts (empty on failure or when none are returned)
        :raises SystemExit: with code -1 after too many consecutive failures
        """
        try:
            res_data = self.client.get_user_notes(user_id)
            self.error_count = 0
            # ``or []`` also covers a response whose "notes" value is None,
            # so callers can always take len() of / iterate the result.
            return res_data.get('notes') or []
        except Exception as e:
            error_msg = str(e)
            print(f"获取用户笔记失败: {error_msg}")
            time.sleep(60)
            self.error_count += 1
            if self.error_count >= MONITOR_CONFIG["ERROR_COUNT"]:
                self.send_error_notification(f"API 请求失败\n详细信息:{error_msg}")
                # The ``exit`` builtin comes from the ``site`` module and is
                # not always present; raising SystemExit is equivalent and is
                # not swallowed by ``except Exception`` in monitor_user().
                raise SystemExit(-1)
            return []

    def like_note(self, note_id: str) -> bool:
        """
        Like a note after waiting ``MONITOR_CONFIG["LIKE_DELAY"]`` seconds.

        :param note_id: note ID
        :return: True when the like call did not raise, False otherwise
        """
        try:
            time.sleep(MONITOR_CONFIG["LIKE_DELAY"])
            self.client.like_note(note_id)
            print(f"点赞成功: {note_id}")
            return True
        except Exception as e:
            print(f"点赞失败: {e}")
            return False

    def get_note_detail(self, note_id: str, xsec: str) -> dict:
        """
        Fetch the detail card of a note through the web feed endpoint.

        :param note_id: note ID
        :param xsec: value sent as ``xsec_token`` in the request body
        :return: the ``note_card`` dict of the first returned item, or ``{}``
                 when the request or the response lookup fails
        """
        try:
            uri = '/api/sns/web/v1/feed'
            data = {
                "source_note_id": note_id,
                "image_formats": ["jpg", "webp", "avif"],
                "extra": {"need_body_topic": "1"},
                "xsec_source": "pc_search",
                "xsec_token": xsec,
            }
            res = self.client.post(uri, data=data)
            note_detail = res["items"][0]["note_card"]
            return note_detail
        except Exception as e:
            print(f"获取笔记详情失败: {e}")
            return {}

    def comment_note(self, note_id: str, note_data: dict) -> dict:
        """
        Generate a comment for a note and post it.

        Waits ``MONITOR_CONFIG["COMMENT_DELAY"]`` seconds first, then builds
        the generator input from the note's title and description.

        :param note_id: note ID
        :param note_data: note dict; its ``xsec_token`` is used for the detail lookup
        :return: dict with ``comment_status`` (bool) and ``comment_content`` (str)
        """
        try:
            time.sleep(MONITOR_CONFIG["COMMENT_DELAY"])
            # NOTE(review): get_note_detail() returns {} on failure, in which
            # case a comment is still generated from an empty title/desc.
            note_detail = self.get_note_detail(note_id, note_data.get('xsec_token', ''))
            title = note_detail.get('title', '')
            content = note_detail.get('desc', '')
            note_type = '视频' if note_detail.get('type') == 'video' else '图文'
            content = f"这是一个{note_type}笔记。{content}"
            comment = self.comment_generator.generate_comment(title, content)
            self.client.comment_note(note_id, comment)
            print(f"评论成功: {note_id} - {comment}")
            return {
                "comment_status": True,
                "comment_content": comment,
            }
        except Exception as e:
            print(f"评论失败: {e}")
            return {
                "comment_status": False,
                "comment_content": "",
            }

    def interact_with_note(self, note_data: dict) -> dict:
        """
        Like and comment on a note when ``AUTO_INTERACT`` is enabled.

        :param note_data: note dict; must contain ``note_id`` for anything to happen
        :return: dict with ``like_status``, ``comment_status`` and ``comment_content``
        """
        result = {
            "like_status": False,
            "comment_status": False,
            "comment_content": "",
        }
        if not MONITOR_CONFIG.get("AUTO_INTERACT"):
            return result

        note_id = note_data.get('note_id')
        if not note_id:
            return result

        result["like_status"] = self.like_note(note_id)
        comment_result = self.comment_note(note_id, note_data)

        result["comment_status"] = comment_result["comment_status"]
        result["comment_content"] = comment_result["comment_content"]
        return result

    def send_note_notification(self, note_data: dict, interact_result: Optional[dict] = None):
        """
        Send a WeCom message describing a newly found note.

        :param note_data: note dict (``note_id``, ``user``, ``display_title``, ``type``)
        :param interact_result: result of interact_with_note(); its like/comment
                                lines are added only when ``AUTO_INTERACT`` is enabled
        """
        note_url = f"https://www.xiaohongshu.com/explore/{note_data.get('note_id')}"
        user_name = note_data.get('user', {}).get('nickname', '未知用户')
        title = note_data.get('display_title', '无标题')
        # Named note_type so the builtin ``type`` is not shadowed.
        note_type = note_data.get('type', '未知类型')
        time_str = time.strftime('%Y-%m-%d %H:%M:%S')
        content = [
            "小红书用户发布新笔记",
            f"用户:{user_name}",
            f"标题:{title}",
            f"链接:{note_url}",
            f"类型:{note_type}",
        ]
        if interact_result and MONITOR_CONFIG.get("AUTO_INTERACT"):
            like_status = "成功" if interact_result["like_status"] else "失败"
            content.append(f"点赞:{like_status}")
            if interact_result["comment_status"]:
                content.append("评论:成功")
                content.append(f"评论内容:{interact_result['comment_content']}")
            else:
                content.append("评论:失败")
        content.append(f"监控时间:{time_str}")
        self.wecom.send_text("\n".join(content))

    def monitor_user(self, user_id: str, interval: int):
        """
        Poll a user's notes forever, handling each note not yet stored.

        :param user_id: user ID
        :param interval: seconds to sleep between polls
        """
        print(f"开始监控用户: {user_id}")
        while True:
            try:
                latest_notes = self.get_latest_notes(user_id)
                existing_notes = self.db.get_user_notes_count(user_id)
                # First run for this user: nothing stored yet but several
                # notes returned. These are only recorded, never interacted with.
                is_first_monitor = existing_notes == 0 and len(latest_notes) > 1
                if is_first_monitor:
                    welcome_msg = (
                        "欢迎使用 xhs-monitor 系统\n"
                        f"监控用户:{latest_notes[0].get('user', {}).get('nickname', user_id)}\n"
                        "首次监控某用户时,不会对历史笔记进行自动点赞和评论,仅保存笔记记录\n"
                        "以防止被系统以及用户发现"
                    )
                    self.wecom.send_text(welcome_msg)
                    for note in latest_notes:
                        self.db.add_note_if_not_exists(note)
                else:
                    for note in latest_notes:
                        # A truthy return is treated as "newly inserted".
                        if self.db.add_note_if_not_exists(note):
                            print(f"发现新笔记: {note.get('display_title')}")
                            interact_result = self.interact_with_note(note)
                            self.send_note_notification(note, interact_result)
            except Exception as e:
                # Best effort: log and keep polling. SystemExit raised by
                # get_latest_notes() is not an Exception and still propagates.
                error_msg = str(e)
                print(f"监控过程发生错误: {error_msg}")
            time.sleep(interval)
def main():
    """Build an XHSMonitor from the config modules and start polling the configured user."""
    # Credentials are read in the same order as the constructor's parameters.
    credentials = {
        "cookie": XHS_CONFIG["COOKIE"],
        "corpid": WECOM_CONFIG["CORPID"],
        "agentid": WECOM_CONFIG["AGENTID"],
        "secret": WECOM_CONFIG["SECRET"],
    }
    watcher = XHSMonitor(**credentials)

    # Monitoring settings are looked up only after the monitor exists.
    target_user = MONITOR_CONFIG["USER_ID"]
    poll_seconds = MONITOR_CONFIG["CHECK_INTERVAL"]
    watcher.monitor_user(user_id=target_user, interval=poll_seconds)
# Start monitoring only when this file is run as a script, not when it is imported.
if __name__ == "__main__": main()
|