2024-03-23, update for ticket-plus

master
CHUN YU YAO 2024-04-06 22:53:42 +08:00
parent 3b4074dbca
commit 124c8dc492
6 changed files with 843 additions and 543 deletions

View File

@ -142,11 +142,11 @@ def get_config_dict(args):
if not args.homepage is None:
if len(args.homepage) > 0:
config_dict["homepage"] = args.homepage
if not args.ticket_number is None:
if args.ticket_number > 0:
config_dict["ticket_number"] = args.ticket_number
if not args.browser is None:
if len(args.browser) > 0:
config_dict["browser"] = args.browser
@ -154,7 +154,7 @@ def get_config_dict(args):
if not args.tixcraft_sid is None:
if len(args.tixcraft_sid) > 0:
config_dict["advanced"]["tixcraft_sid"] = args.tixcraft_sid
if not args.ibonqware is None:
if len(args.ibonqware) > 0:
config_dict["advanced"]["ibonqware"] = args.ibonqware
@ -165,7 +165,7 @@ def get_config_dict(args):
if not args.kktix_password is None:
if len(args.kktix_password) > 0:
config_dict["advanced"]["kktix_password_plaintext"] = args.kktix_password
if not args.proxy_server is None:
if len(args.proxy_server) > 2:
config_dict["advanced"]["proxy_server_port"] = args.proxy_server
@ -590,7 +590,7 @@ def close_browser_tabs(driver):
def get_driver_by_config(config_dict):
driver = None
# read config.
homepage = config_dict["homepage"]
@ -771,9 +771,6 @@ def get_driver_by_config(config_dict):
if len(config_dict["advanced"]["fami_account"])>0:
homepage = CONST_FAMI_SIGN_IN_URL
if 'ibon.com' in homepage:
pass
if 'kham.com' in homepage:
if len(config_dict["advanced"]["kham_account"])>0:
homepage = CONST_KHAM_SIGN_IN_URL
@ -794,9 +791,6 @@ def get_driver_by_config(config_dict):
if len(config_dict["advanced"]["hkticketing_account"])>0:
homepage = CONST_HKTICKETING_SIGN_IN_URL
if 'galaxymacau.com' in homepage:
pass
if 'ticketplus.com.tw' in homepage:
if len(config_dict["advanced"]["ticketplus_account"]) > 1:
homepage = "https://ticketplus.com.tw/"
@ -816,14 +810,14 @@ def get_driver_by_config(config_dict):
tixcraft_family = True
if tixcraft_family:
if len(config_dict["advanced"]["tixcraft_sid"]) > 1:
tixcraft_sid = config_dict["advanced"]["tixcraft_sid"]
tixcraft_sid = config_dict["advanced"]["tixcraft_sid"]
if len(tixcraft_sid) > 1:
driver.delete_cookie("SID")
driver.add_cookie({"name":"SID", "value": tixcraft_sid, "path" : "/", "secure":True})
if 'ibon.com' in homepage:
if len(config_dict["advanced"]["ibonqware"]) > 1:
ibonqware = config_dict["advanced"]["ibonqware"]
ibonqware = config_dict["advanced"]["ibonqware"]
if len(ibonqware) > 1:
driver.delete_cookie("ibonqware")
driver.add_cookie({"name":"ibonqware", "value": ibonqware, "domain" : "ibon.com.tw", "secure":True})
@ -926,66 +920,32 @@ def force_press_button(driver, select_by, select_query, force_submit=True):
return ret
# close some div on home url.
def tixcraft_home_close_window(driver, config_dict):
show_debug_message = True # debug.
show_debug_message = False # online
def tixcraft_home_close_window(driver):
accept_all_cookies_btn = None
try:
accept_all_cookies_btn = driver.find_element(By.CSS_SELECTOR, '#onetrust-accept-btn-handler')
if accept_all_cookies_btn:
accept_all_cookies_btn.click()
except Exception as exc:
#print(exc)
if show_debug_message:
print("find accept_all_cookies_btn fail")
pass
if not accept_all_cookies_btn is None:
is_visible = False
try:
if accept_all_cookies_btn.is_enabled() and accept_all_cookies_btn.is_displayed():
is_visible = True
except Exception as exc:
#print(exc)
pass
if is_visible:
if show_debug_message:
print("accept_all_cookies_btn visible. start to press.")
try:
accept_all_cookies_btn.click()
except Exception as exc:
#print(exc)
print("try to click accept_all_cookies_btn fail, force click by js.")
try:
driver.execute_script("arguments[0].click();", accept_all_cookies_btn)
except Exception as exc:
pass
else:
if show_debug_message:
print("accept_all_cookies_btn invisible.")
# from detail to game
def tixcraft_redirect(driver, url):
ret = False
game_name = ""
# get game_name from url
url_split = url.split("/")
if len(url_split) >= 6:
game_name = url_split[5]
if "/activity/detail/%s" % (game_name,) in url:
# to support teamear
entry_url = url.replace("/activity/detail/","/activity/game/")
print("redirec to new url:", entry_url)
try:
driver.get(entry_url)
except Exception as exec1:
pass
ret = True
if len(game_name) > 0:
if "/activity/detail/%s" % (game_name,) in url:
entry_url = url.replace("/activity/detail/","/activity/game/")
print("redirec to new url:", entry_url)
try:
driver.get(entry_url)
ret = True
except Exception as exec1:
pass
return ret
@ -1728,25 +1688,25 @@ def ticket_number_select_fill(driver, select_obj, ticket_number):
return is_ticket_number_assigned
def get_div_text_by_selector(driver, my_css_selector):
def get_text_by_selector(driver, my_css_selector, attribute='innerHTML'):
div_element = None
try:
div_element = driver.find_element(By.CSS_SELECTOR, my_css_selector)
except Exception as exc:
print("find verify textbox fail")
#print("find element fail")
pass
question_text = ""
row_text = ""
if not div_element is None:
try:
question_text = div_element.text
if attribute=='innerText':
row_html = div_element.get_attribute('innerHTML')
row_text = util.remove_html_tags(row_html)
else:
row_text = div_element.get_attribute(attribute)
except Exception as exc:
print("get text fail")
if question_text is None:
question_text = ""
return question_text
print("get text fail:", my_css_selector)
return row_text
def fill_common_verify_form(driver, config_dict, inferred_answer_string, fail_list, input_text_css, next_step_button_css, submit_by_enter, check_input_interval):
@ -1910,7 +1870,7 @@ def tixcraft_input_check_code(driver, config_dict, fail_list, question_selector)
answer_list = []
question_text = get_div_text_by_selector(driver, question_selector)
question_text = get_text_by_selector(driver, question_selector, 'innerText')
if len(question_text) > 0:
write_question_to_file(question_text)
@ -2804,7 +2764,7 @@ def kktix_check_agree_checkbox(driver, config_dict):
if is_dom_ready:
is_finish_checkbox_click = check_checkbox(driver, By.CSS_SELECTOR, '#person_agree_terms')
#print("status:", is_dom_ready, is_finish_checkbox_click)
return is_dom_ready, is_finish_checkbox_click
@ -2858,30 +2818,6 @@ def force_check_checkbox(driver, agree_checkbox):
return is_finish_checkbox_click
def kktix_reg_captcha_question_text(captcha_inner_div):
captcha_text_div = None
try:
captcha_text_div = captcha_inner_div.find_element(By.TAG_NAME, "p")
except Exception as exc:
pass
print("find p tag(captcha_text_div) fail")
print(exc)
question_text = None
if not captcha_text_div is None:
try:
question_text = captcha_text_div.text
except Exception as exc:
pass
if question_text is None:
question_text = ""
return question_text
# PS: no double check, NOW.
def kktix_double_check_all_text_value(driver, ticket_number):
is_do_press_next_button = False
@ -2916,19 +2852,6 @@ def kktix_double_check_all_text_value(driver, ticket_number):
return is_do_press_next_button
# 本票券需要符合以下任一資格才可以購買
def get_kktix_control_label_text(driver):
question_text = ""
captcha_inner_div = None
try:
captcha_inner_div = driver.find_element(By.CSS_SELECTOR, 'div > div.code-input > div.control-group > label.control-label')
if not captcha_inner_div is None:
question_text = util.remove_html_tags(captcha_inner_div.get_attribute('innerHTML'))
except Exception as exc:
pass
return question_text
def set_kktix_control_label_text(driver, config_dict):
fail_list = []
answer_list = util.get_answer_list_from_user_guess_string(config_dict, CONST_MAXBOT_ANSWER_ONLINE_FILE)
@ -2943,18 +2866,6 @@ def set_kktix_control_label_text(driver, config_dict):
check_input_interval = 0.2
is_answer_sent, fail_list = fill_common_verify_form(driver, config_dict, inferred_answer_string, fail_list, input_text_css, next_step_button_css, submit_by_enter, check_input_interval)
def get_kktix_question_text(driver):
question_text = ""
captcha_inner_div = None
try:
captcha_inner_div = driver.find_element(By.CSS_SELECTOR, 'div.custom-captcha-inner')
except Exception as exc:
pass
if not captcha_inner_div is None:
question_text = kktix_reg_captcha_question_text(captcha_inner_div)
return question_text
def kktix_reg_captcha(driver, config_dict, fail_list, registrationsNewApp_div):
show_debug_message = True # debug.
@ -2966,7 +2877,7 @@ def kktix_reg_captcha(driver, config_dict, fail_list, registrationsNewApp_div):
answer_list = []
is_question_popup = False
question_text = get_kktix_question_text(driver)
question_text = get_text_by_selector(driver, 'div.custom-captcha-inner p', 'innerText')
if len(question_text) > 0:
is_question_popup = True
write_question_to_file(question_text)
@ -3067,14 +2978,14 @@ def kktix_reg_new_main(driver, config_dict, fail_list, played_sound_ticket):
# whole event question.
fail_list, is_question_popup = kktix_reg_captcha(driver, config_dict, fail_list, registrationsNewApp_div)
# single option question
if not is_question_popup:
# no captcha text popup, goto next page.
control_text = get_kktix_control_label_text(driver)
control_text = get_text_by_selector(driver, 'div > div.code-input > div.control-group > label.control-label', 'innerText')
if show_debug_message:
print("control_text:", control_text)
if len(control_text) == 0:
click_ret = kktix_press_next_button(driver)
else:
@ -3086,7 +2997,7 @@ def kktix_reg_new_main(driver, config_dict, fail_list, played_sound_ticket):
else:
if not config_dict["advanced"]["chrome_extension"]:
is_fill_at_webdriver = True
# TODO: not implement in extension, so force to fill in webdriver.
is_fill_at_webdriver = True
if is_fill_at_webdriver:
@ -6014,7 +5925,7 @@ def ticketmaster_captcha(driver, config_dict, ocr, Captcha_Browser, domain_name)
break
def tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser):
tixcraft_home_close_window(driver, config_dict)
tixcraft_home_close_window(driver)
home_url_list = ['https://tixcraft.com/'
,'https://indievox.com/'
@ -6030,7 +5941,7 @@ def tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
pass
break
# special case for same event re-open.
# special case for same event re-open, redirect to user's homepage.
if 'https://tixcraft.com/' == url or 'https://tixcraft.com/activity' == url:
if "/ticket/area/" in config_dict["homepage"]:
if len(config_dict["homepage"].split('/'))==7:
@ -6104,6 +6015,7 @@ def tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
if '/ticket/order' in url:
tixcraft_dict["done_time"] = time.time()
is_quit_bot = False
if '/ticket/checkout' in url:
if not tixcraft_dict["start_time"] is None:
if not tixcraft_dict["done_time"] is None:
@ -6119,9 +6031,8 @@ def tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
tixcraft_dict["is_popup_checkout"] = True
driver.quit()
sys.exit()
is_quit_bot = True
if config_dict["advanced"]["play_sound"]["order"]:
if not tixcraft_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
@ -6130,7 +6041,7 @@ def tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
tixcraft_dict["is_popup_checkout"] = False
tixcraft_dict["played_sound_order"] = False
return tixcraft_dict
return tixcraft_dict, is_quit_bot
def kktix_paused_main(driver, url, config_dict, kktix_dict):
is_url_contain_sign_in = False
@ -6143,7 +6054,7 @@ def kktix_paused_main(driver, url, config_dict, kktix_dict):
if len(kktix_account) > 4:
kktix_login(driver, kktix_account, kktix_password)
is_url_contain_sign_in = True
# PS: after test, this still not popup reCaptcha.
if not is_url_contain_sign_in:
if '/registrations/new' in url:
@ -6230,57 +6141,55 @@ def kktix_main(driver, url, config_dict, kktix_dict):
# break loop.
is_kktix_got_ticket = False
is_quit_bot = False
if is_kktix_got_ticket:
if not kktix_dict["start_time"] is None:
if not kktix_dict["done_time"] is None:
bot_elapsed_time = kktix_dict["done_time"] - kktix_dict["start_time"]
if kktix_dict["elapsed_time"] != bot_elapsed_time:
print("bot elapsed time:", "{:.3f}".format(bot_elapsed_time))
kktix_dict["elapsed_time"] = bot_elapsed_time
if config_dict["advanced"]["play_sound"]["order"]:
if not kktix_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
kktix_dict["played_sound_order"] = True
if not kktix_dict["start_time"] is None:
if not kktix_dict["done_time"] is None:
bot_elapsed_time = kktix_dict["done_time"] - kktix_dict["start_time"]
if kktix_dict["elapsed_time"] != bot_elapsed_time:
print("bot elapsed time:", "{:.3f}".format(bot_elapsed_time))
kktix_dict["elapsed_time"] = bot_elapsed_time
if config_dict["advanced"]["headless"]:
if not kktix_dict["is_popup_checkout"]:
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
print("基本資料(或實名制)網址:", url)
if len(kktix_account) > 0:
print("搶票成功, 帳號:", kktix_account)
if config_dict["advanced"]["play_sound"]["order"]:
if not kktix_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
script_name = "chrome_tixcraft"
if config_dict["advanced"]["webdriver_type"] == CONST_WEBDRIVER_TYPE_NODRIVER:
script_name = "nodriver_tixcraft"
threading.Thread(target=util.launch_maxbot, args=(script_name,"", url, kktix_account, kktix_password,"","false",)).start()
#driver.quit()
#sys.exit()
kktix_dict["played_sound_order"] = True
is_event_page = False
if len(url.split('/'))>=7:
is_event_page = True
if is_event_page:
confirm_clicked = kktix_confirm_order_button(driver)
if confirm_clicked:
domain_name = url.split('/')[2]
checkout_url = "https://%s/account/orders" % (domain_name)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
kktix_dict["is_popup_checkout"] = True
driver.quit()
sys.exit()
if config_dict["advanced"]["headless"]:
if not kktix_dict["is_popup_checkout"]:
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
print("基本資料(或實名制)網址:", url)
if len(kktix_account) > 0:
print("搶票成功, 帳號:", kktix_account)
script_name = "chrome_tixcraft"
if config_dict["advanced"]["webdriver_type"] == CONST_WEBDRIVER_TYPE_NODRIVER:
script_name = "nodriver_tixcraft"
threading.Thread(target=util.launch_maxbot, args=(script_name,"", url, kktix_account, kktix_password,"","false",)).start()
is_event_page = False
if len(url.split('/'))>=7:
is_event_page = True
if is_event_page:
confirm_clicked = kktix_confirm_order_button(driver)
if confirm_clicked:
domain_name = url.split('/')[2]
checkout_url = "https://%s/account/orders" % (domain_name)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
kktix_dict["is_popup_checkout"] = True
is_quit_bot = True
else:
kktix_dict["is_popup_checkout"] = False
kktix_dict["played_sound_order"] = False
return kktix_dict
return kktix_dict, is_quit_bot
def fami_login(driver, account, password):
is_email_sent = assign_text(driver, By.CSS_SELECTOR, '#usr_act', account)
@ -6892,7 +6801,7 @@ def cityline_main(driver, url, config_dict):
if len(url.split('/')) == 6:
fail_list = []
fail_list = cityline_input_code(driver, config_dict, fail_list)
def get_ibon_question_text(driver):
question_div = None
@ -10024,7 +9933,7 @@ def ticketplus_order_exclusive_code(driver, config_dict, fail_list):
answer_list = []
question_selector = ".exclusive-code > form > div"
question_text = get_div_text_by_selector(driver, question_selector)
question_text = get_text_by_selector(driver, question_selector, 'innerText')
is_answer_sent = False
is_question_popup = False
if len(question_text) > 0:
@ -10509,7 +10418,6 @@ def ticketplus_account_auto_fill(driver, config_dict):
# auto fill account info.
if len(config_dict["advanced"]["ticketplus_account"]) > 0:
try:
all_cookies=list_all_cookies(driver)
if 'user' in all_cookies:
@ -10523,16 +10431,63 @@ def ticketplus_account_auto_fill(driver, config_dict):
#print("is_user_signin:", is_user_signin)
if not is_user_signin:
sign_in_btn = None
is_sign_in_btn_pressed = False
try:
my_css_selector = 'button.v-btn > span.v-btn__content > i.mdi-account'
sign_in_btn = driver.find_element(By.CSS_SELECTOR, my_css_selector)
if not sign_in_btn is None:
sign_in_btn.click()
is_sign_in_btn_pressed = True
time.sleep(0.2)
except Exception as exc:
#print(exc)
# RWD mode not show sign in icon.
pass
#print("is_sign_in_btn_pressed", is_sign_in_btn_pressed)
if not is_sign_in_btn_pressed:
#print("rwd mode")
action_btns = None
try:
my_css_selector = 'div.px-4.py-3.drawerItem.cursor-pointer'
action_btns = driver.find_elements(By.CSS_SELECTOR, my_css_selector)
except Exception as exc:
#print(exc)
pass
if action_btns:
#print("len:", len(action_btns))
if len(action_btns) >= 4:
el_pass = None
try:
my_css_selector = 'input[type="password"]'
el_pass = driver.find_element(By.CSS_SELECTOR, my_css_selector)
except Exception as exc:
#print(exc)
pass
is_need_popup_modal = False
if el_pass is None:
is_need_popup_modal = True
else:
try:
if not el_pass.is_displayed():
is_need_popup_modal = True
except Exception as exc:
#print(exc)
pass
print("is_need_popup_modal", is_need_popup_modal)
if is_need_popup_modal:
print("show sign in modal")
#action_btns[3].click()
try:
driver.set_script_timeout(1)
driver.execute_script("arguments[0].click();", action_btns[3])
except Exception as exc:
#print(exc)
pass
is_account_sent, is_password_sent = ticketplus_account_sign_in(driver, config_dict)
return is_user_signin
@ -10947,6 +10902,7 @@ def main(args):
pass
maxbot_last_reset_time = time.time()
is_quit_bot = False
while True:
time.sleep(0.05)
@ -10955,8 +10911,15 @@ def main(args):
print("web driver not accessible!")
break
url, is_quit_bot = get_current_url(driver)
if not is_quit_bot:
url, is_quit_bot = get_current_url(driver)
if is_quit_bot:
try:
driver.quit()
driver = None
except Exception as e:
pass
break
if url is None:
@ -11001,7 +10964,7 @@ def main(args):
tixcraft_family = True
if tixcraft_family:
tixcraft_dict = tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
tixcraft_dict, is_quit_bot = tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
# for kktix.cc and kktix.com
if 'kktix.c' in url:
@ -11054,6 +11017,7 @@ def main(args):
facebook_password = util.decryptMe(config_dict["advanced"]["facebook_password"])
if len(facebook_account) > 4:
facebook_login(driver, facebook_account, facebook_password)
time.sleep(2)
def cli():
parser = argparse.ArgumentParser(

View File

@ -1,184 +0,0 @@
#!/usr/bin/env python
#encoding=utf-8
import argparse
import asyncio
import base64
import json
import os
import sys
import time
import nodriver as uc
import requests
import util
CONST_APP_VERSION = "MaxBot (2024.03.23)"
CONST_MAXBOT_CONFIG_FILE = "settings.json"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
PROFILE_URL = "https://kktix.com/users/edit"
SIGNIN_URL = "https://kktix.com/users/sign_in"
def load_json():
app_root = util.get_app_root()
config_filepath = os.path.join(app_root, CONST_MAXBOT_CONFIG_FILE)
config_dict = None
if os.path.isfile(config_filepath):
with open(config_filepath) as json_data:
config_dict = json.load(json_data)
else:
config_dict = get_default_config()
return config_filepath, config_dict
def kktix_signin_requests(kktix_account, kktix_password):
import urllib.parse
headers = {
"accept-language": "zh-TW;q=0.7",
"cache-control": "max-age=0",
"origin": "https://kktix.com",
'User-Agent': USER_AGENT
}
authenticity_token = ""
utf8 = ""
try:
response = requests.get(profile_url , headers=headers, timeout=0.7, allow_redirects=True)
status_code = response.status_code
#print("status_code:",status_code)
#if status_code == 200:
#print(response.content)
response = requests.get(signin_url , headers=headers, timeout=0.7, allow_redirects=False)
s = requests.session()
#print(response.cookies)
for key, value in response.cookies.items():
print(key + '=' + value)
if key=="XSRF-TOKEN":
#authenticity_token = urllib.parse.unquote(value)
authenticity_token = value
login_data={
"utf8": utf8,
"authenticity_token": authenticity_token,
"user[login]": kktix_account,
"user[password]": kktix_password,
"user[remember_me]": 0
}
#print("login_data", login_data)
login_data_string = 'utf8=%E2%9C%93&authenticity_token='+ authenticity_token +'&user%5Blogin%5D='+ kktix_account +'&user%5Bpassword%5D='+ kktix_password +'&user%5Bremember_me%5D=0'
#print("login_data_string", login_data_string)
response=s.post(signin_url, data=login_data)
#response=s.post(signin_url, data=login_data_string)
status_code = response.status_code
#print("status_code:",status_code)
#if status_code == 200:
#print (response.status_code)
#print (response.content)
response = s.get(profile_url , headers=headers, timeout=0.7, allow_redirects=False)
status_code = response.status_code
#print("status_code:",status_code)
#if status_code == 200:
#print(response.content)
except Exception as exc:
print(exc)
async def kktix_signin_nodriver(tab, kktix_account, kktix_password):
while True:
try:
#html = await tab.get_content()
#await tab.sleep(0.1)
#print(html)
x_window = await tab.js_dumps('window')
#print(x_window["location"]["href"])
if x_window["location"]["href"]==SIGNIN_URL:
account = await tab.select("#user_login")
await account.send_keys(kktix_account)
#await tab.sleep(0.1)
password = await tab.select("#user_password")
await password.send_keys(kktix_password)
#await tab.sleep(0.1)
submit = await tab.select("div.form-actions a.btn-primary")
await submit.click()
await tab.sleep(0.5)
#tab = await tab.get(SIGNOUT_URL)
signout = await tab.select("a[href='/users/sign_out']")
await signout.click()
await tab.sleep(0.5)
tab = await tab.get(SIGNIN_URL)
except Exception as e:
print(e)
if str(e)=="coroutine raised StopIteration":
break
pass
#def kktix_signout_main(config_dict):
async def kktix_signout_main(config_dict):
conf = util.get_extension_config()
driver = await uc.start(conf)
tab = await driver.get(SIGNIN_URL)
#if not config_dict["advanced"]["headless"]:
if len(config_dict["advanced"]["window_size"]) > 0:
if "," in config_dict["advanced"]["window_size"]:
target_array = config_dict["advanced"]["window_size"].split(",")
await tab.set_window_size(left=20, top=20, width=int(target_array[0]), height=int(target_array[1]))
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
print("kktix_account:", kktix_account)
#print("kktix_password:", kktix_password)
#kktix_signin_requests(kktix_account, kktix_password)
await kktix_signin_nodriver(tab, kktix_account, kktix_password)
#def main(args):
async def main(args):
config_filepath, config_dict = load_json()
if not args.kktix_account is None:
if len(args.kktix_account) > 0:
config_dict["advanced"]["kktix_account"] = args.kktix_account
if not args.kktix_password is None:
if len(args.kktix_password) > 0:
config_dict["advanced"]["kktix_password_plaintext"] = args.kktix_password
if len(config_dict["advanced"]["kktix_account"]) > 0:
#kktix_account_loop(config_dict)
await kktix_signout_main(config_dict)
else:
print("請輸入 kktix_account")
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="MaxBot Aggument Parser")
parser.add_argument("--kktix_account",
help="overwrite kktix_account field",
type=str)
parser.add_argument("--kktix_password",
help="overwrite kktix_password field",
type=str)
args = parser.parse_args()
#main(args)
uc.loop().run_until_complete(main(args))

View File

@ -125,11 +125,11 @@ def get_config_dict(args):
if not args.homepage is None:
if len(args.homepage) > 0:
config_dict["homepage"] = args.homepage
if not args.ticket_number is None:
if args.ticket_number > 0:
config_dict["ticket_number"] = args.ticket_number
if not args.browser is None:
if len(args.browser) > 0:
config_dict["browser"] = args.browser
@ -147,7 +147,7 @@ def get_config_dict(args):
if not args.kktix_password is None:
if len(args.kktix_password) > 0:
config_dict["advanced"]["kktix_password_plaintext"] = args.kktix_password
if not args.proxy_server is None:
if len(args.proxy_server) > 2:
config_dict["advanced"]["proxy_server_port"] = args.proxy_server
@ -156,7 +156,6 @@ def get_config_dict(args):
if len(args.window_size) > 2:
config_dict["advanced"]["window_size"] = args.window_size
# special case for headless to enable away from keyboard mode.
is_headless_enable_ocr = False
if config_dict["advanced"]["headless"]:
@ -192,23 +191,48 @@ def play_sound_while_ordering(config_dict):
captcha_sound_filename = os.path.join(app_root, config_dict["advanced"]["play_sound"]["filename"].strip())
util.play_mp3_async(captcha_sound_filename)
async def nodriver_facebook_login(tab, facebook_account, facebook_password):
if tab:
try:
account = await tab.query_selector("#email")
if account:
await account.send_keys(facebook_account)
else:
print("account not found")
password = await tab.query_selector("#pass")
if password:
await password.send_keys(facebook_password)
await tab.send(cdp.input_.dispatch_key_event("keyDown", code="Enter", key="Enter", text="\r", windows_virtual_key_code=13))
await tab.send(cdp.input_.dispatch_key_event("keyUp", code="Enter", key="Enter", text="\r", windows_virtual_key_code=13))
time.sleep(2)
else:
print("password not found")
except Exception as e:
print("send_keys fail.")
print(e)
pass
async def nodriver_kktix_signin(tab, url, config_dict):
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
if len(kktix_account) > 4:
account = await tab.select("#user_login")
await account.send_keys(kktix_account)
#await tab.sleep(0.1)
try:
account = await tab.query_selector("#user_login")
await account.send_keys(kktix_account)
password = await tab.select("#user_password")
await password.send_keys(kktix_password)
#await tab.sleep(0.1)
password = await tab.query_selector("#user_password")
await password.send_keys(kktix_password)
submit = await tab.select("input[type='submit'][name]")
await submit.click()
await tab.sleep(0.2)
submit = await tab.query_selector("input[type='submit'][name]")
await submit.click()
time.sleep(0.2)
except Exception as e:
print(e)
pass
async def nodriver_kktix_paused_main(tab, url, config_dict, kktix_dict):
is_url_contain_sign_in = False
@ -225,7 +249,69 @@ async def nodriver_goto_homepage(driver, config_dict):
if len(config_dict["advanced"]["kktix_account"])>0:
if not 'https://kktix.com/users/sign_in?' in homepage:
homepage = CONST_KKTIX_SIGN_IN_URL % (homepage)
return await driver.get(homepage)
if 'famiticket.com' in homepage:
if len(config_dict["advanced"]["fami_account"])>0:
homepage = CONST_FAMI_SIGN_IN_URL
if 'kham.com' in homepage:
if len(config_dict["advanced"]["kham_account"])>0:
homepage = CONST_KHAM_SIGN_IN_URL
if 'ticket.com.tw' in homepage:
if len(config_dict["advanced"]["ticket_account"])>0:
homepage = CONST_TICKET_SIGN_IN_URL
if 'urbtix.hk' in homepage:
if len(config_dict["advanced"]["urbtix_account"])>0:
homepage = CONST_URBTIX_SIGN_IN_URL
if 'cityline.com' in homepage:
if len(config_dict["advanced"]["cityline_account"])>0:
homepage = CONST_CITYLINE_SIGN_IN_URL
if 'hkticketing.com' in homepage:
if len(config_dict["advanced"]["hkticketing_account"])>0:
homepage = CONST_HKTICKETING_SIGN_IN_URL
if 'ticketplus.com.tw' in homepage:
if len(config_dict["advanced"]["ticketplus_account"]) > 1:
homepage = "https://ticketplus.com.tw/"
tab = await driver.get(homepage)
time.sleep(1)
tixcraft_family = False
if 'tixcraft.com' in homepage:
tixcraft_family = True
if 'indievox.com' in homepage:
tixcraft_family = True
if 'ticketmaster.' in homepage:
tixcraft_family = True
if tixcraft_family:
tixcraft_sid = config_dict["advanced"]["tixcraft_sid"]
if len(tixcraft_sid) > 1:
cookies = await driver.cookies.get_all()
for cookie in cookies:
if cookie.name=='SID':
cookie.value=tixcraft_sid
break
await driver.cookies.set_all(cookies)
if 'ibon.com' in homepage:
ibonqware = config_dict["advanced"]["ibonqware"]
if len(ibonqware) > 1:
cookies = await driver.cookies.get_all()
for cookie in cookies:
if cookie.name=='ibonqware':
cookie.value=ibonqware
break
await driver.cookies.set_all(cookies)
return tab
async def nodriver_kktix_travel_price_list(tab, config_dict, kktix_area_auto_select_mode, kktix_area_keyword):
show_debug_message = True # debug.
@ -412,7 +498,6 @@ async def nodriver_kktix_travel_price_list(tab, config_dict, kktix_area_auto_sel
if kktix_area_auto_select_mode == CONST_FROM_TOP_TO_BOTTOM:
break
if not is_dom_ready:
# not sure to break or continue..., maybe break better.
break
@ -483,36 +568,8 @@ async def nodriver_kktix_assign_ticket_number(tab, config_dict, kktix_area_keywo
# already assigned.
is_ticket_number_assigned = True
return is_dom_ready, is_ticket_number_assigned, is_need_refresh
async def nodriver_get_kktix_question_text(tab):
question_text = ""
captcha_inner_div = None
try:
captcha_inner_div = await tab.select('div.custom-captcha-inner p')
if not captcha_inner_div is None:
js_attr = await captcha_inner_div.get_js_attributes()
question_text = js_attr["innerText"].strip()
except Exception as exc:
print(exc)
pass
return question_text
# 本票券需要符合以下任一資格才可以購買
async def nodriver_get_kktix_control_label_text(tab):
question_text = ""
captcha_inner_div = None
try:
captcha_inner_div = await tab.select('div > div.code-input > div.control-group > label.control-label')
if not captcha_inner_div is None:
js_attr = await captcha_inner_div.get_js_attributes()
question_text = js_attr["innerText"]
except Exception as exc:
pass
return question_text
async def nodriver_kktix_reg_captcha(tab, config_dict, fail_list, registrationsNewApp_div):
show_debug_message = True # debug.
@ -524,7 +581,7 @@ async def nodriver_kktix_reg_captcha(tab, config_dict, fail_list, registrationsN
answer_list = []
is_question_popup = False
question_text = await nodriver_get_kktix_question_text(tab)
question_text = await nodriver_get_text_by_selector(tab, 'div.custom-captcha-inner p', 'innerText')
if len(question_text) > 0:
is_question_popup = True
write_question_to_file(question_text)
@ -555,7 +612,7 @@ async def nodriver_kktix_reg_captcha(tab, config_dict, fail_list, registrationsN
check_input_interval = 0.2
#is_answer_sent, fail_list = fill_common_verify_form(tab, config_dict, inferred_answer_string, fail_list, input_text_css, next_step_button_css, submit_by_enter, check_input_interval)
if len(answer_list) > 0:
input_text = await tab.select(input_text_css)
input_text = await tab.query_selector(input_text_css)
if not input_text is None:
await input_text.send_keys(answer_list[0])
@ -604,7 +661,7 @@ async def nodriver_kktix_reg_new_main(tab, config_dict, fail_list, played_sound_
# part 1: check div.
registrationsNewApp_div = None
try:
registrationsNewApp_div = await tab.select('#registrationsNewApp')
registrationsNewApp_div = await tab.query_selector('#registrationsNewApp')
except Exception as exc:
pass
#print("find input fail:", exc)
@ -661,14 +718,14 @@ async def nodriver_kktix_reg_new_main(tab, config_dict, fail_list, played_sound_
# whole event question.
fail_list, is_question_popup = await nodriver_kktix_reg_captcha(tab, config_dict, fail_list, registrationsNewApp_div)
# single option question
if not is_question_popup:
# no captcha text popup, goto next page.
control_text = await nodriver_get_kktix_control_label_text(tab)
control_text = await nodriver_get_text_by_selector(tab, 'div > div.code-input > div.control-group > label.control-label', 'innerText')
if show_debug_message:
print("control_text:", control_text)
if len(control_text) == 0:
click_ret = await nodriver_kktix_press_next_button(tab)
else:
@ -680,7 +737,7 @@ async def nodriver_kktix_reg_new_main(tab, config_dict, fail_list, played_sound_
else:
if not config_dict["advanced"]["chrome_extension"]:
is_fill_at_webdriver = True
# TODO: not implement in extension, so force to fill in webdriver.
is_fill_at_webdriver = True
if is_fill_at_webdriver:
@ -773,66 +830,494 @@ async def nodriver_kktix_main(tab, url, config_dict, kktix_dict):
# break loop.
is_kktix_got_ticket = False
is_quit_bot = False
if is_kktix_got_ticket:
if not kktix_dict["start_time"] is None:
if not kktix_dict["done_time"] is None:
bot_elapsed_time = kktix_dict["done_time"] - kktix_dict["start_time"]
if kktix_dict["elapsed_time"] != bot_elapsed_time:
print("bot elapsed time:", "{:.3f}".format(bot_elapsed_time))
kktix_dict["elapsed_time"] = bot_elapsed_time
if config_dict["advanced"]["play_sound"]["order"]:
if not kktix_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
kktix_dict["played_sound_order"] = True
if not kktix_dict["start_time"] is None:
if not kktix_dict["done_time"] is None:
bot_elapsed_time = kktix_dict["done_time"] - kktix_dict["start_time"]
if kktix_dict["elapsed_time"] != bot_elapsed_time:
print("bot elapsed time:", "{:.3f}".format(bot_elapsed_time))
kktix_dict["elapsed_time"] = bot_elapsed_time
if config_dict["advanced"]["headless"]:
if not kktix_dict["is_popup_checkout"]:
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
print("基本資料(或實名制)網址:", url)
if len(kktix_account) > 0:
print("搶票成功, 帳號:", kktix_account)
script_name = "chrome_tixcraft"
if config_dict["advanced"]["webdriver_type"] == CONST_WEBDRIVER_TYPE_NODRIVER:
script_name = "nodriver_tixcraft"
if config_dict["advanced"]["play_sound"]["order"]:
if not kktix_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
threading.Thread(target=util.launch_maxbot, args=(script_name,"", url, kktix_account, kktix_password,"","false",)).start()
#driver.quit()
#sys.exit()
kktix_dict["played_sound_order"] = True
is_event_page = False
if len(url.split('/'))>=7:
is_event_page = True
if is_event_page:
confirm_clicked = False
try:
submit = await tab.select("div.form-actions a.btn-primary")
await submit.click()
confirm_clicked = True
except Exception as exc:
print(exc)
if config_dict["advanced"]["headless"]:
if not kktix_dict["is_popup_checkout"]:
kktix_account = config_dict["advanced"]["kktix_account"]
kktix_password = config_dict["advanced"]["kktix_password_plaintext"].strip()
if kktix_password == "":
kktix_password = util.decryptMe(config_dict["advanced"]["kktix_password"])
if confirm_clicked:
domain_name = url.split('/')[2]
checkout_url = "https://%s/account/orders" % (domain_name)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
kktix_dict["is_popup_checkout"] = True
driver.quit()
sys.exit()
print("基本資料(或實名制)網址:", url)
if len(kktix_account) > 0:
print("搶票成功, 帳號:", kktix_account)
script_name = "chrome_tixcraft"
if config_dict["advanced"]["webdriver_type"] == CONST_WEBDRIVER_TYPE_NODRIVER:
script_name = "nodriver_tixcraft"
threading.Thread(target=util.launch_maxbot, args=(script_name,"", url, kktix_account, kktix_password,"","false",)).start()
#driver.quit()
#sys.exit()
is_event_page = False
if len(url.split('/'))>=7:
is_event_page = True
if is_event_page:
confirm_clicked = False
try:
submit = await tab.query_selector("div.form-actions a.btn-primary")
await submit.click()
confirm_clicked = True
except Exception as exc:
print(exc)
if confirm_clicked:
domain_name = url.split('/')[2]
checkout_url = "https://%s/account/orders" % (domain_name)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
kktix_dict["is_popup_checkout"] = True
is_quit_bot = True
else:
kktix_dict["is_popup_checkout"] = False
kktix_dict["played_sound_order"] = False
return kktix_dict
return kktix_dict, is_quit_bot
async def nodriver_tixcraft_home_close_window(tab):
accept_all_cookies_btn = None
try:
accept_all_cookies_btn = await tab.query_selector('#onetrust-accept-btn-handler')
if accept_all_cookies_btn:
accept_all_cookies_btn.click()
except Exception as exc:
#print(exc)
pass
async def nodriver_get_text_by_selector(tab, my_css_selector, attribute='innerHTML'):
div_text = ""
try:
div_element = await tab.query_selector(my_css_selector)
if div_element:
js_attr = await div_element.get_js_attributes()
if js_attr:
div_text = js_attr[attribute]
except Exception as exc:
print("find verify textbox fail")
pass
return div_text
async def nodriver_tixcraft_redirect(tab, url):
ret = False
game_name = ""
url_split = url.split("/")
if len(url_split) >= 6:
game_name = url_split[5]
if len(game_name) > 0:
if "/activity/detail/%s" % (game_name,) in url:
entry_url = url.replace("/activity/detail/","/activity/game/")
print("redirec to new url:", entry_url)
try:
await tab.get(entry_url)
ret = True
except Exception as exec1:
pass
return ret
async def nodriver_ticketmaster_promo(tab, config_dict, fail_list):
question_selector = '#promoBox'
return nodriver_tixcraft_input_check_code(tab, config_dict, fail_list, question_selector)
async def nodriver_tixcraft_verify(tab, config_dict, fail_list):
question_selector = '.zone-verify'
return nodriver_tixcraft_input_check_code(tab, config_dict, fail_list, question_selector)
async def nodriver_tixcraft_input_check_code(tab, config_dict, fail_list, question_selector):
show_debug_message = True # debug.
show_debug_message = False # online
if config_dict["advanced"]["verbose"]:
show_debug_message = True
answer_list = []
question_text = await nodriver_get_text_by_selector(tab, question_selector, 'innerText')
if len(question_text) > 0:
write_question_to_file(question_text)
answer_list = util.get_answer_list_from_user_guess_string(config_dict, CONST_MAXBOT_ANSWER_ONLINE_FILE)
if len(answer_list)==0:
if config_dict["advanced"]["auto_guess_options"]:
answer_list = util.guess_tixcraft_question(driver, question_text)
inferred_answer_string = ""
for answer_item in answer_list:
if not answer_item in fail_list:
inferred_answer_string = answer_item
break
if show_debug_message:
print("inferred_answer_string:", inferred_answer_string)
print("answer_list:", answer_list)
# PS: auto-focus() when empty inferred_answer_string with empty inputed text value.
input_text_css = "input[name='checkCode']"
next_step_button_css = ""
submit_by_enter = True
check_input_interval = 0.2
is_answer_sent, fail_list = fill_common_verify_form(driver, config_dict, inferred_answer_string, fail_list, input_text_css, next_step_button_css, submit_by_enter, check_input_interval)
return fail_list
async def nodriver_tixcraft_main(tab, url, config_dict, tixcraft_dict, ocr, Captcha_Browser):
await nodriver_tixcraft_home_close_window(tab)
# special case for same event re-open, redirect to user's homepage.
if 'https://tixcraft.com/' == url or 'https://tixcraft.com/activity' == url:
if "/ticket/area/" in config_dict["homepage"]:
if len(config_dict["homepage"].split('/'))==7:
try:
await tab.get(config_dict["homepage"])
except Exception as e:
pass
if "/activity/detail/" in url:
tixcraft_dict["start_time"] = time.time()
is_redirected = await nodriver_tixcraft_redirect(tab, url)
is_date_selected = False
if "/activity/game/" in url:
tixcraft_dict["start_time"] = time.time()
if config_dict["date_auto_select"]["enable"]:
domain_name = url.split('/')[2]
# TODO:
#is_date_selected = tixcraft_date_auto_select(driver, url, config_dict, domain_name)
pass
if '/artist/' in url and 'ticketmaster.com' in url:
tixcraft_dict["start_time"] = time.time()
if len(url.split('/'))==6:
if config_dict["date_auto_select"]["enable"]:
domain_name = url.split('/')[2]
# TODO:
#is_date_selected = ticketmaster_date_auto_select(driver, url, config_dict, domain_name)
pass
# choose area
if '/ticket/area/' in url:
domain_name = url.split('/')[2]
if config_dict["area_auto_select"]["enable"]:
if not 'ticketmaster' in domain_name:
# for tixcraft
# TODO:
#tixcraft_area_auto_select(driver, url, config_dict)
pass
tixcraft_dict["area_retry_count"]+=1
#print("count:", tixcraft_dict["area_retry_count"])
if tixcraft_dict["area_retry_count"] >= (60 * 15):
# Cool-down
tixcraft_dict["area_retry_count"] = 0
time.sleep(5)
else:
# area auto select is too difficult, skip in this version.
# TODO:
#tixcraft_dict["fail_promo_list"] = ticketmaster_promo(driver, config_dict, tixcraft_dict["fail_promo_list"])
#ticketmaster_assign_ticket_number(driver, config_dict)
pass
else:
tixcraft_dict["fail_promo_list"] = []
tixcraft_dict["area_retry_count"]=0
# https://ticketmaster.sg/ticket/check-captcha/23_blackpink/954/5/75
if '/ticket/check-captcha/' in url:
domain_name = url.split('/')[2]
# TODO:
#ticketmaster_captcha(driver, config_dict, ocr, Captcha_Browser, domain_name)
pass
if '/ticket/verify/' in url:
# TODO:
#tixcraft_dict["fail_list"] = tixcraft_verify(driver, config_dict, tixcraft_dict["fail_list"])
pass
else:
tixcraft_dict["fail_list"] = []
# main app, to select ticket number.
if '/ticket/ticket/' in url:
domain_name = url.split('/')[2]
# TODO:
#tixcraft_ticket_main(driver, config_dict, ocr, Captcha_Browser, domain_name)
tixcraft_dict["done_time"] = time.time()
if config_dict["advanced"]["play_sound"]["ticket"]:
if not tixcraft_dict["played_sound_ticket"]:
play_sound_while_ordering(config_dict)
tixcraft_dict["played_sound_ticket"] = True
else:
tixcraft_dict["played_sound_ticket"] = False
if '/ticket/order' in url:
tixcraft_dict["done_time"] = time.time()
is_quit_bot = False
if '/ticket/checkout' in url:
if not tixcraft_dict["start_time"] is None:
if not tixcraft_dict["done_time"] is None:
bot_elapsed_time = tixcraft_dict["done_time"] - tixcraft_dict["start_time"]
if tixcraft_dict["elapsed_time"] != bot_elapsed_time:
print("bot elapsed time:", "{:.3f}".format(bot_elapsed_time))
tixcraft_dict["elapsed_time"] = bot_elapsed_time
if config_dict["advanced"]["headless"]:
if not tixcraft_dict["is_popup_checkout"]:
domain_name = url.split('/')[2]
checkout_url = "https://%s/ticket/checkout" % (domain_name)
print("搶票成功, 請前往該帳號訂單查看: %s" % (checkout_url))
webbrowser.open_new(checkout_url)
tixcraft_dict["is_popup_checkout"] = True
is_quit_bot = True
if config_dict["advanced"]["play_sound"]["order"]:
if not tixcraft_dict["played_sound_order"]:
play_sound_while_ordering(config_dict)
tixcraft_dict["played_sound_order"] = True
else:
tixcraft_dict["is_popup_checkout"] = False
tixcraft_dict["played_sound_order"] = False
return tixcraft_dict, is_quit_bot
async def nodriver_ticketplus_account_sign_in(tab, config_dict):
print("nodriver_ticketplus_account_sign_in")
is_filled_form = False
is_submited = False
ticketplus_account = config_dict["advanced"]["ticketplus_account"]
ticketplus_password = config_dict["advanced"]["ticketplus_password_plaintext"].strip()
if ticketplus_password == "":
ticketplus_password = util.decryptMe(config_dict["advanced"]["ticketplus_password"])
# manually keyin verify code.
country_code = ""
try:
my_css_selector = 'input[placeholder="區碼"]'
el_country = await tab.query_selector(my_css_selector)
if el_country:
country_code = await el_country.apply('function (element) { return element.value; } ')
print("country_code", country_code)
except Exception as exc:
print(exc)
is_account_assigned = False
try:
my_css_selector = 'input[placeholder="手機號碼 *"]'
el_account = await tab.query_selector(my_css_selector)
if el_account:
await el_account.click()
await el_account.apply('function (element) {element.value = ""; } ')
await el_account.send_keys(ticketplus_account);
is_account_assigned = True
except Exception as exc:
print(exc)
if is_account_assigned:
try:
my_css_selector = 'input[type="password"]'
el_password = await tab.query_selector(my_css_selector)
if el_password:
print("ticketplus_password:", ticketplus_password)
await el_password.click()
await el_password.apply('function (element) {element.value = ""; } ')
await el_password.send_keys(ticketplus_password);
time.sleep(0.1)
is_filled_form = True
if country_code=="+886":
# only this case to auto sumbmit.
print("press enter")
await tab.send(cdp.input_.dispatch_key_event("keyDown", code="Enter", key="Enter", text="\r", windows_virtual_key_code=13))
await tab.send(cdp.input_.dispatch_key_event("keyUp", code="Enter", key="Enter", text="\r", windows_virtual_key_code=13))
time.sleep(1)
# PS: ticketplus country field may not located at your target country.
is_submited = True
except Exception as exc:
print(exc)
pass
return is_filled_form, is_submited
async def nodriver_ticketplus_is_signin(tab):
is_user_signin = False
try:
cookies = await tab.browser.cookies.get_all()
for cookie in cookies:
if cookie.name=='user':
if '%22account%22:%22' in cookie.value:
is_user_signin = True
cookies = None
except Exception as exc:
print(exc)
pass
return is_user_signin
async def nodriver_ticketplus_account_auto_fill(tab, config_dict):
global is_filled_ticketplus_singin_form
if not 'is_filled_ticketplus_singin_form' in globals():
is_filled_ticketplus_singin_form = False
# auto fill account info.
is_user_signin = False
if len(config_dict["advanced"]["ticketplus_account"]) > 0:
is_user_signin = await nodriver_ticketplus_is_signin(tab)
print("is_user_signin:", is_user_signin)
if not is_user_signin:
time.sleep(0.1)
if not is_filled_ticketplus_singin_form:
is_sign_in_btn_pressed = False
try:
# full screen mode.
my_css_selector = 'button.v-btn > span.v-btn__content > i.mdi-account'
sign_in_btn = await tab.query_selector(my_css_selector)
if sign_in_btn:
await sign_in_btn.click()
is_sign_in_btn_pressed = True
time.sleep(0.2)
except Exception as exc:
print(exc)
pass
print("is_sign_in_btn_pressed", is_sign_in_btn_pressed)
if not is_sign_in_btn_pressed:
#print("rwd mode")
action_btns = None
try:
my_css_selector = 'div.px-4.py-3.drawerItem.cursor-pointer'
action_btns = await tab.query_selector_all(my_css_selector)
except Exception as exc:
print(exc)
pass
if action_btns:
print("len:", len(action_btns))
if len(action_btns) >= 4:
try:
await action_btns[3].click()
except Exception as exc:
print(exc)
pass
is_filled_form, is_submited = await nodriver_ticketplus_account_sign_in(tab, config_dict)
if is_filled_form:
is_filled_ticketplus_singin_form = True
return is_user_signin
async def nodriver_ticketplus_main(tab, url, config_dict, ocr, Captcha_Browser, ticketplus_dict):
home_url = 'https://ticketplus.com.tw/'
is_user_signin = False
if home_url == url.lower():
if config_dict["ocr_captcha"]["enable"]:
domain_name = url.split('/')[2]
if not Captcha_Browser is None:
# TODO:
#Captcha_Browser.Set_cookies(driver.get_cookies())
Captcha_Browser.Set_Domain(domain_name)
is_user_signin = await nodriver_ticketplus_account_auto_fill(tab, config_dict)
if is_user_signin:
# only sign in on homepage.
if url != config_dict["homepage"]:
try:
await tab.get(config_dict["homepage"])
except Exception as e:
pass
# https://ticketplus.com.tw/activity/XXX
if '/activity/' in url.lower():
is_event_page = False
if len(url.split('/'))==5:
is_event_page = True
if is_event_page:
# TODO:
#is_button_pressed = ticketplus_accept_realname_card(driver)
#print("is accept button pressed:", is_button_pressed)
# TODO:
#is_button_pressed = ticketplus_accept_other_activity(driver)
#print("is accept button pressed:", is_button_pressed)
if config_dict["date_auto_select"]["enable"]:
# TODO:
#ticketplus_date_auto_select(driver, config_dict)
pass
#https://ticketplus.com.tw/order/XXX/OOO
if '/order/' in url.lower():
is_event_page = False
if len(url.split('/'))==6:
is_event_page = True
if is_event_page:
# TODO:
#is_button_pressed = ticketplus_accept_realname_card(driver)
#is_button_pressed = ticketplus_accept_order_fail(driver)
is_reloading = False
is_reload_at_webdriver = False
if not config_dict["browser"] in CONST_CHROME_FAMILY:
is_reload_at_webdriver = True
else:
if not config_dict["advanced"]["chrome_extension"]:
is_reload_at_webdriver = True
if is_reload_at_webdriver:
# move below code to chrome extension.
# TODO:
#is_reloading = ticketplus_order_auto_reload_coming_soon(driver)
pass
if not is_reloading:
# TODO:
# is_captcha_sent, ticketplus_dict = ticketplus_order(driver, config_dict, ocr, Captcha_Browser, ticketplus_dict)
pass
else:
ticketplus_dict["fail_list"]=[]
#https://ticketplus.com.tw/confirm/xx/oo
if '/confirm/' in url.lower() or '/confirmseat/' in url.lower():
is_event_page = False
if len(url.split('/'))==6:
is_event_page = True
if is_event_page:
#print("is_popup_confirm",ticketplus_dict["is_popup_confirm"])
if not ticketplus_dict["is_popup_confirm"]:
ticketplus_dict["is_popup_confirm"] = True
play_sound_while_ordering(config_dict)
# TODO:
#ticketplus_confirm(driver, config_dict)
else:
ticketplus_dict["is_popup_confirm"] = False
else:
ticketplus_dict["is_popup_confirm"] = False
return ticketplus_dict
def get_nodriver_browser_args():
@ -891,7 +1376,7 @@ def get_maxbot_plus_extension_path():
extension_path = "webdriver/Maxbotplus_1.0.0/"
if platform.system() == 'Windows':
extension_path = extension_path.replace("/","\\")
app_root = util.get_app_root()
config_filepath = os.path.join(app_root, extension_path)
#print("config_filepath:", config_filepath)
@ -911,7 +1396,8 @@ def get_extension_config(config_dict):
default_lang = "zh-TW"
no_sandbox=True
conf = Config(browser_args=get_nodriver_browser_args(), lang=default_lang, no_sandbox=no_sandbox, headless=config_dict["advanced"]["headless"])
conf.add_extension(get_maxbot_plus_extension_path())
if config_dict["advanced"]["chrome_extension"]:
conf.add_extension(get_maxbot_plus_extension_path())
return conf
async def nodrver_block_urls(tab, config_dict):
@ -937,8 +1423,7 @@ async def nodrver_block_urls(tab, config_dict):
,'*img.uniicreative.com/*'
,'*e2elog.fetnet.net*']
if True:
#if config_dict["advanced"]["hide_some_image"]:
if config_dict["advanced"]["hide_some_image"]:
NETWORK_BLOCKED_URLS.append('*.woff')
NETWORK_BLOCKED_URLS.append('*.woff2')
NETWORK_BLOCKED_URLS.append('*.ttf')
@ -959,8 +1444,7 @@ async def nodrver_block_urls(tab, config_dict):
NETWORK_BLOCKED_URLS.append('https://t.kfs.io/assets/icon-*.png')
NETWORK_BLOCKED_URLS.append('https://t.kfs.io/upload_images/*.jpg')
if False:
#if config_dict["advanced"]["block_facebook_network"]:
if config_dict["advanced"]["block_facebook_network"]:
NETWORK_BLOCKED_URLS.append('*facebook.com/*')
NETWORK_BLOCKED_URLS.append('*.fbcdn.net/*')
@ -981,6 +1465,12 @@ async def nodriver_resize_window(tab, config_dict):
await tab.set_window_size(left=position_left, top=30, width=int(size_array[0]), height=int(size_array[1]))
async def nodriver_current_url(tab):
is_quit_bot = False
exit_bot_error_strings = [
"server rejected WebSocket connection: HTTP 500",
"[Errno 61] Connect call failed ('127.0.0.1',",
]
url = ""
if tab:
url_dict = {}
@ -988,16 +1478,25 @@ async def nodriver_current_url(tab):
url_dict = await tab.js_dumps('window.location.href')
except Exception as exc:
print(exc)
pass
str_exc = ""
try:
str_exc = str(exc)
except Exception as exc2:
pass
if len(str_exc) > 0:
for each_error_string in exit_bot_error_strings:
if each_error_string in str_exc:
#print('quit bot by error:', each_error_string, driver)
is_quit_bot = True
url_array = []
if url_dict:
for k in url_dict:
for k in url_dict:
if k.isnumeric():
if "0" in url_dict[k]:
url_array.append(url_dict[k]["0"])
url = ''.join(url_array)
return url
return url, is_quit_bot
def nodriver_overwrite_prefs(conf, prefs_dict={}):
#print(conf.user_data_dir)
@ -1104,12 +1603,14 @@ async def main(args):
ocr = ddddocr.DdddOcr(show_ad=False, beta=config_dict["ocr_captcha"]["beta"])
Captcha_Browser = NonBrowser()
if len(config_dict["advanced"]["tixcraft_sid"]) > 1:
set_non_browser_cookies(driver, config_dict["homepage"], Captcha_Browser)
#set_non_browser_cookies(driver, config_dict["homepage"], Captcha_Browser)
pass
except Exception as exc:
print(exc)
pass
maxbot_last_reset_time = time.time()
is_quit_bot = False
while True:
time.sleep(0.05)
@ -1118,7 +1619,17 @@ async def main(args):
print("nodriver not accessible!")
break
url = await nodriver_current_url(tab)
if not is_quit_bot:
url, is_quit_bot = await nodriver_current_url(tab)
if is_quit_bot:
try:
await driver.stop()
driver = None
except Exception as e:
pass
break
if url is None:
continue
else:
@ -1146,7 +1657,7 @@ async def main(args):
# for kktix.cc and kktix.com
if 'kktix.c' in url:
kktix_dict = await nodriver_kktix_main(tab, url, config_dict, kktix_dict)
kktix_dict, is_quit_bot = await nodriver_kktix_main(tab, url, config_dict, kktix_dict)
pass
tixcraft_family = False
@ -1160,7 +1671,7 @@ async def main(args):
tixcraft_family = True
if tixcraft_family:
#tixcraft_dict = tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
tixcraft_dict, is_quit_bot = await nodriver_tixcraft_main(driver, url, config_dict, tixcraft_dict, ocr, Captcha_Browser)
pass
if 'famiticket.com' in url:
@ -1186,7 +1697,7 @@ async def main(args):
pass
if 'ticketplus.com' in url:
#ticketplus_dict = ticketplus_main(driver, url, config_dict, ocr, Captcha_Browser, ticketplus_dict)
ticketplus_dict = await nodriver_ticketplus_main(tab, url, config_dict, ocr, Captcha_Browser, ticketplus_dict)
pass
if 'urbtix.hk' in url:
@ -1216,8 +1727,7 @@ async def main(args):
if facebook_password == "":
facebook_password = util.decryptMe(config_dict["advanced"]["facebook_password"])
if len(facebook_account) > 4:
#facebook_login(driver, facebook_account, facebook_password)
pass
await nodriver_facebook_login(tab, facebook_account, facebook_password)
def cli():
@ -1276,3 +1786,4 @@ def cli():
if __name__ == "__main__":
cli()

View File

@ -160,7 +160,7 @@ chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
console.log(typeof crypto_decrypt);
let answer="";
if(typeof crypto_decrypt === 'function') {
answer=crypto_decrypt(request_json.data.text,request_json.data.KEY,request_json.data.IV);
answer=crypto_decrypt(request_json.data.text,request_json.data.key,request_json.data.iv);
}
result_json={"answer": answer};
sendResponse(result_json);
@ -26532,7 +26532,7 @@ crypto_decrypt = function decrypt(text, KEY, IV) {
let decipher = crypto.createDecipheriv('aes-128-cbc', KEY, IV);
let decrypted = decipher.update(encryptedText);
decrypted = decipher.final();
return decrypted.toString()
return decrypted.toString();
};
}).call(this)}).call(this,require("buffer").Buffer)

View File

@ -17,24 +17,24 @@ async function ajax_return_done(data, event_id)
}
async function wait_function_ready() {
const currentUrl = window.location.href;
const currentUrl = window.location.href;
const event_id = currentUrl.split('/')[4];
if(event_id){
let api_url = "https://apis.ticketplus.com.tw/config/api/v1/getS3?path=event/"+event_id+"/sessions.json";
//console.log("calling api:" + api_url);
$.get( api_url, function() {
//alert( "success" );
})
.done(function(data) {
//alert( "second success" );
ajax_return_done(data, event_id);
})
.fail(function() {
//alert( "error" );
})
.always(function() {
//alert( "finished" );
});
})
.done(function(data) {
//alert( "second success" );
ajax_return_done(data, event_id);
})
.fail(function() {
//alert( "error" );
})
.always(function() {
//alert( "finished" );
});
}
}

View File

@ -18,6 +18,7 @@ function get_event_status_entry(real_event_id, real_session_id) {
function ajax_return_done(data, real_event_id, real_session_id) {
//console.log("ajax return done")
let reload=false;
//console.log(data.result);
if(data.result.session[0].status=="pending" || data.result.session[0].status=="soldout" || data.result.session[0].status=="unavailable") {
reload=true;
}
@ -28,88 +29,96 @@ function ajax_return_done(data, real_event_id, real_session_id) {
if(settings) {
auto_reload_page_interval = settings.advanced.auto_reload_page_interval;
}
const rootElement = document.documentElement;
rootElement.remove();
// PS: ticketplus not able to release memory soon.
if (auto_reload_page_interval < 0.7) {
auto_reload_page_interval = 0.7;
}
//const rootElement = document.documentElement;
//rootElement.remove();
if(auto_reload_page_interval == 0) {
//console.log('Start to reload now.');
location.reload();
} else {
console.log('We are going to reload after few seconeds.');
setTimeout(function () {
location.reload();
//location.reload();
location.href=location.href;
}, auto_reload_page_interval * 1000);
}
}
if(data.result.session[0].status=="onsale") {
//console.log("bingo ^_^, onsale")
//$(function() {
// console.log("hello ^_^, onsale");
//let $captcha = $("input[required='required']");
//console.log($captcha.length);
//});
} else {
if(data.result.session[0].status=="onsale") {
//console.log("bingo ^_^, onsale")
//$(function() {
// console.log("hello ^_^, onsale");
//let $captcha = $("input[required='required']");
//console.log($captcha.length);
//});
}
}
}
function get_event_status_core(real_event_id, real_session_id) {
let timestamp = new Date().getTime();
let timestamp = new Date().getTime();
timestamp = (timestamp/1000).toFixed()*1000;
//console.log(timestamp);
let api_url = "https://apis.ticketplus.com.tw/config/api/v1/get?eventId="+ real_event_id +"&sessionId="+real_session_id+"&_="+timestamp;
//console.log("calling api:" + api_url);
$.get( api_url, function() {
//alert( "success" );
})
.done(function(data) {
//alert( "second success" );
ajax_return_done(data, real_event_id, real_session_id);
})
.fail(function() {
//alert( "error" );
})
.always(function() {
//alert( "finished" );
});
//alert( "success" );
})
.done(function(data) {
//alert( "second success" );
ajax_return_done(data, real_event_id, real_session_id);
})
.fail(function() {
//alert( "error" );
})
.always(function() {
//alert( "finished" );
});
}
async function decrypt_text(event_id, session_id) {
//console.log("start to decrypt_text");
const KEY = 'ILOVEFETIXFETIX!';
let KEY = 'ILOVEFETIXFETIX!';
const IV = '!@#$FETIXEVENTiv';
let bundle = {
action: 'decrypt',
data: {
'KEY':KEY,
'IV':IV,
'key': KEY,
'iv': IV,
'text': event_id
}
};
let bundle_string = JSON.stringify(bundle);
const event_answer = await chrome.runtime.sendMessage(bundle);
//console.log(event_answer);
const real_event_id = event_answer.answer;
//console.log(real_event_id);
bundle = {
action: 'decrypt',
data: {
'KEY':KEY,
'IV':IV,
'text': session_id
}
};
let session_answer = await chrome.runtime.sendMessage(bundle);
let real_session_id = session_answer.answer;
//console.log(real_session_id);
get_event_status_entry(real_event_id, real_session_id);
if(event_answer) {
const real_event_id = event_answer.answer;
//console.log(real_event_id);
bundle = {
action: 'decrypt',
data: {
'key': KEY,
'iv': IV,
'text': session_id
}
};
let session_answer = await chrome.runtime.sendMessage(bundle);
let real_session_id = session_answer.answer;
//console.log(real_session_id);
get_event_status_entry(real_event_id, real_session_id);
} else {
console.log("decrypt fail");
}
}
async function wait_function_ready() {
const currentUrl = window.location.href;
const currentUrl = window.location.href;
const event_id = currentUrl.split('/')[4];
const session_id = currentUrl.split('/')[5];
//console.log(event_id);