"""Build IPTV playlists (``iptv.txt`` and ``iptv.m3u8``) from the EPG API.

The script:

1. downloads the full channel catalogue,
2. obtains an access token from the EPG server,
3. asks the EPG server for the stream URLs of every catalogued channel,
4. saves the raw answer to ``iptv.json``,
5. writes ``iptv.txt`` (contents of ``base.txt`` followed by the grouped
   channels) and ``iptv.m3u8``.

All files are read from / written to the directory containing this script.
On any failure an error message is printed and the exit status is 1.
"""

import json
import os
import sys

# Endpoint returning the catalogue of all channels.
CHANNEL_LIST_URL = "http://120.87.12.38:8083/epg/api/custom/getAllChannel.json"

# Host of the EPG server.  The token and batch endpoints must live on the
# same host, so both URLs below are derived from this single constant.
EPG_BASE_URL = "http://120.87.11.11:33200"

# NOTE(review): the device credentials are hard-coded in the query string;
# consider loading them from the environment or a config file instead.
TOKEN_URL = (
    f"{EPG_BASE_URL}/EPG/oauth/v2/token"
    "?grant_type=EncryToken"
    "&client_id=shuma2"
    "&authinfo=52A57E9F6583C26C2F2E0C6E3FC7452F164A4C84BDEE623FF387E69D25C4C1A57031D44BBAA6DEF42155B7524C4DED856EE61F320E7CFEDEA491B8AFDF90FF4B4830CEE38BAAEC1E88165B70BC52666557400555BED665177880021B8C65ECBF"
    "&UserID=0175205891367"
    "&DeviceType=TR100-G9"
    "&DeviceVersion=219.470.107"
    "&userdomain=1"
    "&datadomain=1"
    "&accountType=1"
)
BATCH_URL = f"{EPG_BASE_URL}/EPG/interEpg/channellist/batch"

USER_AGENT = 'okhttp/3.10.0'

# Seconds to wait for each HTTP request; requests has no default timeout,
# so without this a stalled server would hang the script forever.
REQUEST_TIMEOUT = 15

# Substrings of a channel title that put it into the "广东" group.
GUANGDONG_KEYWORDS = ("广东", "广州", "大湾区", "南方", "岭南")


def fail(message):
    """Print *message* and terminate the script with exit status 1."""
    print(message)
    sys.exit(1)


def extract_channel_codes(channels):
    """Return the ``hwcode`` of every catalogue entry that has one.

    Entries without a ``params`` mapping or without an ``hwcode`` key are
    ignored.  Order and duplicates are preserved.
    """
    codes = []
    for channel in channels:
        params = channel.get('params')
        if isinstance(params, dict) and 'hwcode' in params:
            codes.append(params['hwcode'])
    return codes


def index_channels_by_hwcode(channels):
    """Map each ``hwcode`` to its catalogue entry.

    Building the index once replaces a linear scan of the catalogue for
    every playlist item.  Entries without ``params``/``hwcode`` are skipped
    rather than raising ``KeyError``; when several entries share a code the
    first one wins.
    """
    index = {}
    for channel in channels:
        params = channel.get('params')
        if isinstance(params, dict) and 'hwcode' in params:
            index.setdefault(params['hwcode'], channel)
    return index


def classify_channel(name):
    """Return the playlist group for the channel title *name*."""
    if "CCTV" in name or "CGTN" in name:
        return "央视"
    if any(keyword in name for keyword in GUANGDONG_KEYWORDS):
        return "广东"
    if "卫视" in name:
        return "卫视"
    return "其它"


def group_channels(channel_list, channel_index):
    """Group the EPG answer into ``{group: [(title, url), ...]}``.

    *channel_list* is the ``channellist`` array of the batch answer and
    *channel_index* the result of :func:`index_channels_by_hwcode`.  Items
    whose code is not in the catalogue, or that lack a title or a
    ``timeshifturl``, are skipped.  Groups appear in order of first use.
    """
    groups = {}
    for item in channel_list:
        channel = channel_index.get(item.get('channelcode'))
        if channel is None:
            continue
        name = channel.get('title')
        url = item.get('timeshifturl')
        if name is None or url is None:
            continue
        groups.setdefault(classify_channel(name), []).append((name, url))
    return groups


def render_playlists(base_content, groups):
    """Return the text of ``iptv.txt`` and ``iptv.m3u8`` as a 2-tuple.

    ``iptv.txt`` is *base_content* followed by one ``<group>,#genre#``
    header per group and one ``<title>,<url>`` line per channel.
    ``iptv.m3u8`` is an ``#EXTM3U`` header followed by an ``#EXTINF`` entry
    per channel.
    """
    txt_parts = [base_content]
    # Without this, a base file lacking a trailing newline would have the
    # first group header glued onto its last line.
    if groups and base_content and not base_content.endswith("\n"):
        txt_parts.append("\n")
    m3u8_parts = ["#EXTM3U\n"]
    for group, entries in groups.items():
        txt_parts.append(f"{group},#genre#\n")
        for name, url in entries:
            txt_parts.append(f"{name},{url}\n")
            m3u8_parts.append(f"#EXTINF:-1,{name}\n{url}\n")
    return "".join(txt_parts), "".join(m3u8_parts)


def main():
    """Fetch the channel data and write the playlist files."""
    # Imported here rather than at module level so the pure helpers above
    # can be imported (e.g. by tests) without the third-party dependency.
    import requests

    # Work in the directory that contains this script.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    session = requests.Session()
    session.headers['User-Agent'] = USER_AGENT

    def request_json(send, url, label, **kwargs):
        """Call *send* on *url* and return the decoded JSON body.

        *label* names the request in the error message; the URL itself is
        not printed because the token URL embeds credentials.
        """
        try:
            response = send(url, timeout=REQUEST_TIMEOUT, **kwargs)
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as exc:
            fail(f"错误:请求{label}失败:{exc}")

    # Fetch the catalogue of all channels.
    all_channels_data = request_json(session.get, CHANNEL_LIST_URL, "频道列表")
    if (not isinstance(all_channels_data, dict)
            or all_channels_data.get('status') != '200'
            or 'channels' not in all_channels_data):
        fail("错误:无法获取频道列表或频道列表格式错误。")
    catalogue = all_channels_data['channels']

    # Collect the channel codes to query.
    channel_codes_str = ','.join(extract_channel_codes(catalogue))

    # Obtain the access token.
    token_data = request_json(session.get, TOKEN_URL, "access token")
    access_token = (
        token_data.get('access_token') if isinstance(token_data, dict) else None
    )
    if not access_token:
        fail("错误:未能获取有效的 access token。")

    # Request the stream data with the access token.
    headers = {
        'Authorization': access_token,
        'User-Agent': USER_AGENT,
        'Connection': 'Keep-Alive',
        'Content-Type': 'application/json;charset=utf-8',
    }
    payload = json.dumps({"channelcodes": channel_codes_str})
    data = request_json(
        session.post, BATCH_URL, "节目数据", headers=headers, data=payload
    )
    if not data:
        fail("错误:返回的数据为空。")

    # Keep the raw answer on disk.
    with open('iptv.json', 'w', encoding='UTF-8') as f:
        json.dump(data, f)

    channel_list = data.get('channellist') if isinstance(data, dict) else None
    if not channel_list:
        fail("错误:'channellist' 数据为空。")

    groups = group_channels(channel_list, index_channels_by_hwcode(catalogue))

    # iptv.txt starts with the contents of base.txt.  The outputs are only
    # opened once everything has been validated, so a failed run does not
    # truncate the playlists produced by an earlier successful one.
    with open('base.txt', 'r', encoding='UTF-8') as f_base:
        base_content = f_base.read()
    txt_content, m3u8_content = render_playlists(base_content, groups)
    with open('iptv.txt', 'w', encoding='UTF-8') as f_iptv:
        f_iptv.write(txt_content)
    with open('iptv.m3u8', 'w', encoding='UTF-8') as f_m3u8:
        f_m3u8.write(m3u8_content)


if __name__ == "__main__":
    main()