mirror of
https://github.com/LAX18/afip_last_stand.git
synced 2026-06-03 15:57:48 -09:00
539 lines
21 KiB
Python
539 lines
21 KiB
Python
import math
|
|
import requests
|
|
import json
|
|
import time
|
|
import sys, os
|
|
import random
|
|
from io import BytesIO
|
|
from http import HTTPStatus
|
|
from websocket import create_connection # type: ignore
|
|
from websocket._exceptions import WebSocketConnectionClosedException # type: ignore
|
|
from PIL import Image
|
|
|
|
from loguru import logger
|
|
from bs4 import BeautifulSoup
|
|
|
|
from src.mappings import ColorMapper
|
|
from src import utils
|
|
|
|
class PlaceClient:
|
|
def __init__(self, config_path):
|
|
self.logger = logger
|
|
|
|
# Data
|
|
self.json_data = utils.get_json_data(self, config_path)
|
|
logger.debug("{}", self.json_data)
|
|
self.pixel_x_start: int = self.json_data["image_start_coords"][0]
|
|
self.pixel_y_start: int = self.json_data["image_start_coords"][1]
|
|
|
|
self.rgb_colors_array = ColorMapper.generate_rgb_colors_array()
|
|
self.legacy_transparency = True
|
|
|
|
self.access_token = None
|
|
self.access_token_expiry_timestamp = None
|
|
|
|
# Image information
|
|
self.pix = None
|
|
self.image_size = None
|
|
self.image_path = self.json_data.get("image_path", "images/image.png")
|
|
utils.load_image(self)
|
|
|
|
|
|
def set_pixel_and_check_ratelimit(
|
|
self,
|
|
access_token,
|
|
pixel_x_start,
|
|
pixel_y_start,
|
|
name,
|
|
color_index_in=18,
|
|
canvas_index=0,
|
|
):
|
|
# canvas structure:
|
|
# 0 | 1 | 2
|
|
# 3 | 4 | 5
|
|
logger.warning(
|
|
"Attempting to place {} pixel at {}, {}",
|
|
ColorMapper.color_id_to_name(color_index_in),
|
|
pixel_x_start + (1000 * (canvas_index % 3)),
|
|
pixel_y_start + (1000 * (canvas_index // 3)),
|
|
)
|
|
|
|
url = "https://gql-realtime-2.reddit.com/query"
|
|
|
|
payload = json.dumps(
|
|
{
|
|
"operationName": "setPixel",
|
|
"variables": {
|
|
"input": {
|
|
"actionName": "r/replace:set_pixel",
|
|
"PixelMessageData": {
|
|
"coordinate": {"x": pixel_x_start, "y": pixel_y_start},
|
|
"colorIndex": color_index_in,
|
|
"canvasIndex": canvas_index,
|
|
},
|
|
}
|
|
},
|
|
"query": "mutation setPixel($input: ActInput!) {\n act(input: $input) {\n data {\n ... on BasicMessage {\n id\n data {\n ... on GetUserCooldownResponseMessageData {\n nextAvailablePixelTimestamp\n __typename\n }\n ... on SetPixelResponseMessageData {\n timestamp\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n}\n",
|
|
}
|
|
)
|
|
headers = {
|
|
"origin": "https://garlic-bread.reddit.com",
|
|
"referer": "https://garlic-bread.reddit.com/",
|
|
"apollographql-client-name": "mona-lisa",
|
|
"Authorization": "Bearer " + access_token,
|
|
"Content-Type": "application/json",
|
|
}
|
|
response = requests.post(
|
|
url,
|
|
headers=headers,
|
|
data=payload,
|
|
)
|
|
logger.debug(
|
|
"Received response: {}", response.text
|
|
)
|
|
|
|
# There are 2 different JSON keys for responses to get the next timestamp.
|
|
# If we don't get data, it means we've been rate limited.
|
|
# If we do, a pixel has been successfully placed.
|
|
if response.json()["data"] is None:
|
|
logger.debug(response.json().get("errors"))
|
|
|
|
try:
|
|
waitTime = math.floor(
|
|
response.json()["errors"][0]["extensions"]["nextAvailablePixelTs"]
|
|
)
|
|
except KeyError as e:
|
|
print("Received bad response. Trying again.")
|
|
return 0
|
|
|
|
logger.error(
|
|
"Failed placing pixel: rate limited",
|
|
)
|
|
else:
|
|
waitTime = math.floor(
|
|
response.json()["data"]["act"]["data"][0]["data"][
|
|
"nextAvailablePixelTimestamp"
|
|
]
|
|
)
|
|
logger.success(
|
|
"Succeeded placing pixel at {}, {}",
|
|
(pixel_x_start + (1000 * (canvas_index % 3))-1500), (pixel_y_start + (1000 * (canvas_index // 3))-1000)
|
|
)
|
|
|
|
# Reddit returns time in ms and we need seconds, so divide by 1000
|
|
return waitTime / 1000
|
|
|
|
def get_board(self, access_token):
|
|
self.update_image()
|
|
utils.load_image(self)
|
|
logger.debug("Connecting and obtaining board images")
|
|
while True:
|
|
try:
|
|
ws = create_connection(
|
|
"wss://gql-realtime-2.reddit.com/query",
|
|
origin="https://garlic-bread.reddit.com",
|
|
)
|
|
break
|
|
except Exception:
|
|
logger.error(
|
|
"Failed to connect to websocket, trying again in 30 seconds..."
|
|
)
|
|
time.sleep(30)
|
|
|
|
ws.send(
|
|
json.dumps(
|
|
{
|
|
"type": "connection_init",
|
|
"payload": {"Authorization": "Bearer " + access_token},
|
|
}
|
|
)
|
|
)
|
|
while True:
|
|
try:
|
|
msg = ws.recv()
|
|
except WebSocketConnectionClosedException as e:
|
|
logger.error(e)
|
|
continue
|
|
if msg is None:
|
|
logger.error("Reddit failed to acknowledge connection_init")
|
|
exit()
|
|
if msg.startswith('{"type":"connection_ack"}'):
|
|
logger.debug("Connected to WebSocket server")
|
|
break
|
|
logger.debug("Obtaining Canvas information")
|
|
ws.send(
|
|
json.dumps(
|
|
{
|
|
"id": "1",
|
|
"type": "start",
|
|
"payload": {
|
|
"variables": {
|
|
"input": {
|
|
"channel": {
|
|
"teamOwner": "GARLICBREAD",
|
|
"category": "CONFIG",
|
|
}
|
|
}
|
|
},
|
|
"extensions": {},
|
|
"operationName": "configuration",
|
|
"query": "subscription configuration($input: SubscribeInput!) {\n subscribe(input: $input) {\n id\n ... on BasicMessage {\n data {\n __typename\n ... on ConfigurationMessageData {\n colorPalette {\n colors {\n hex\n index\n __typename\n }\n __typename\n }\n canvasConfigurations {\n index\n dx\n dy\n __typename\n }\n canvasWidth\n canvasHeight\n __typename\n }\n }\n __typename\n }\n __typename\n }\n}\n",
|
|
},
|
|
}
|
|
)
|
|
)
|
|
|
|
while True:
|
|
canvas_payload = json.loads(ws.recv())
|
|
if canvas_payload["type"] == "data":
|
|
canvas_details = canvas_payload["payload"]["data"]["subscribe"]["data"]
|
|
logger.debug("Canvas config: {}", canvas_payload)
|
|
break
|
|
|
|
canvas_sockets = []
|
|
|
|
canvas_count = len(canvas_details["canvasConfigurations"])
|
|
|
|
for i in range(0, canvas_count):
|
|
canvas_sockets.append(2 + i)
|
|
logger.debug("Creating canvas socket {}", canvas_sockets[i])
|
|
|
|
ws.send(
|
|
json.dumps(
|
|
{
|
|
"id": str(2 + i),
|
|
"type": "start",
|
|
"payload": {
|
|
"variables": {
|
|
"input": {
|
|
"channel": {
|
|
"teamOwner": "GARLICBREAD",
|
|
"category": "CANVAS",
|
|
"tag": str(i),
|
|
}
|
|
}
|
|
},
|
|
"extensions": {},
|
|
"operationName": "replace",
|
|
"query": "subscription replace($input: SubscribeInput!) {\n subscribe(input: $input) {\n id\n ... on BasicMessage {\n data {\n __typename\n ... on FullFrameMessageData {\n __typename\n name\n timestamp\n }\n ... on DiffFrameMessageData {\n __typename\n name\n currentTimestamp\n previousTimestamp\n }\n }\n __typename\n }\n __typename\n }\n}\n",
|
|
},
|
|
}
|
|
)
|
|
)
|
|
|
|
imgs = []
|
|
logger.debug("A total of {} canvas sockets opened", len(canvas_sockets))
|
|
|
|
while len(canvas_sockets) > 0:
|
|
temp = json.loads(ws.recv())
|
|
logger.debug("Waiting for WebSocket message")
|
|
|
|
if temp["type"] == "data":
|
|
logger.debug("Received WebSocket data type message")
|
|
msg = temp["payload"]["data"]["subscribe"]
|
|
|
|
if msg["data"]["__typename"] == "FullFrameMessageData":
|
|
logger.debug("Received full frame message")
|
|
img_id = int(temp["id"])
|
|
logger.debug("Image ID: {}", img_id)
|
|
|
|
if img_id in canvas_sockets:
|
|
logger.debug("Getting image: {}", msg["data"]["name"])
|
|
imgs.append(
|
|
[
|
|
img_id,
|
|
Image.open(
|
|
BytesIO(
|
|
requests.get(
|
|
msg["data"]["name"],
|
|
stream=True,
|
|
).content
|
|
)
|
|
),
|
|
]
|
|
)
|
|
canvas_sockets.remove(img_id)
|
|
logger.debug(
|
|
"Canvas sockets remaining: {}", len(canvas_sockets)
|
|
)
|
|
|
|
for i in range(0, canvas_count - 1):
|
|
ws.send(json.dumps({"id": str(2 + i), "type": "stop"}))
|
|
|
|
ws.close()
|
|
|
|
new_img_width = (
|
|
max(map(lambda x: x["dx"], canvas_details["canvasConfigurations"]))
|
|
+ canvas_details["canvasWidth"]
|
|
)
|
|
logger.debug("New image width: {}", new_img_width)
|
|
|
|
new_img_height = (
|
|
max(map(lambda x: x["dy"], canvas_details["canvasConfigurations"]))
|
|
+ canvas_details["canvasHeight"]
|
|
)
|
|
logger.debug("New image height: {}", new_img_height)
|
|
|
|
new_img = Image.new("RGB", (new_img_width, new_img_height))
|
|
|
|
for idx, img in enumerate(sorted(imgs, key=lambda x: x[0])):
|
|
logger.debug("Adding image (ID {}): {}", img[0], img[1])
|
|
dx_offset = int(canvas_details["canvasConfigurations"][idx]["dx"])
|
|
dy_offset = int(canvas_details["canvasConfigurations"][idx]["dy"])
|
|
new_img.paste(img[1], (dx_offset, dy_offset))
|
|
|
|
return new_img
|
|
|
|
def get_unset_pixel(self):
|
|
originalX = x = random.randint(0, self.image_size[0]-1)
|
|
originalY = y = random.randint(0, self.image_size[1]-1)
|
|
loopedOnce = False
|
|
imgOutdated = True
|
|
|
|
while True:
|
|
if x >= self.image_size[0]:
|
|
y += 1
|
|
x = 0
|
|
|
|
if y >= self.image_size[1]:
|
|
y = 0
|
|
|
|
if x == originalX and y == originalY and loopedOnce:
|
|
logger.info(
|
|
"All pixels correct, trying again in 10 seconds... ",
|
|
)
|
|
time.sleep(10)
|
|
imgOutdated = True
|
|
|
|
|
|
if imgOutdated:
|
|
boarding = self.get_board(self.access_token)
|
|
pix2 = boarding.convert("RGB").load()
|
|
imgOutdated = False
|
|
|
|
logger.debug("{}, {}", x + self.pixel_x_start, y + self.pixel_y_start)
|
|
logger.debug(
|
|
"{}, {}, boarding, {}, {}", x, y, self.image_size[0], self.image_size[1]
|
|
)
|
|
|
|
target_rgb = self.pix[x, y]
|
|
|
|
new_rgb = ColorMapper.closest_color(
|
|
target_rgb, self.rgb_colors_array, self.legacy_transparency
|
|
)
|
|
|
|
if pix2[x + self.pixel_x_start, y + self.pixel_y_start] != new_rgb:
|
|
logger.debug(
|
|
"{}, {}, {}, {}",
|
|
pix2[x + self.pixel_x_start, y + self.pixel_y_start],
|
|
new_rgb,
|
|
new_rgb != (69, 42, 0),
|
|
pix2[x, y] != new_rgb,
|
|
)
|
|
|
|
# (69, 42, 0) is a special color reserved for transparency.
|
|
if new_rgb != (69, 42, 0):
|
|
logger.debug(
|
|
"Replacing {} pixel at: {},{} with {} color",
|
|
pix2[x + self.pixel_x_start, y + self.pixel_y_start],
|
|
x + self.pixel_x_start,
|
|
y + self.pixel_y_start,
|
|
new_rgb,
|
|
)
|
|
break
|
|
else:
|
|
logger.info(
|
|
"Transparent Pixel at {}, {} skipped",
|
|
x + self.pixel_x_start,
|
|
y + self.pixel_y_start,
|
|
)
|
|
x += 1
|
|
loopedOnce = True
|
|
return x, y, new_rgb
|
|
|
|
|
|
def task(self, name, passw):
|
|
self.name = name
|
|
self.passw = passw
|
|
repeat_forever = True
|
|
while True:
|
|
# Timing shit
|
|
pixel_place_frequency = 315 + random.random()*60
|
|
|
|
current_time = math.floor(time.time())
|
|
next_placement_time = current_time + pixel_place_frequency
|
|
if self.access_token is None:
|
|
next_placement_time = current_time
|
|
|
|
while True:
|
|
current_timestamp = math.floor(time.time())
|
|
if self.access_token_expiry_timestamp is None or current_timestamp >= self.access_token_expiry_timestamp:
|
|
logger.info(
|
|
"User {}: Refreshing access token", name
|
|
)
|
|
|
|
username = name
|
|
password = passw
|
|
while True:
|
|
try:
|
|
client = requests.Session()
|
|
|
|
client.headers.update(
|
|
{
|
|
"User-Agent": f"{utils.select_user_agent(self)}",
|
|
"Origin": "https://www.reddit.com/",
|
|
"Sec-Fetch-Dest": "empty",
|
|
"Sec-Fetch-Mode": "cors",
|
|
"Sec-Fetch-Site": "same-origin"
|
|
}
|
|
)
|
|
|
|
r = client.get(
|
|
"https://www.reddit.com/login",
|
|
)
|
|
login_get_soup = BeautifulSoup(r.content, "html.parser")
|
|
csrf_token = login_get_soup.find(
|
|
"input", {"name": "csrf_token"}
|
|
)["value"]
|
|
data = {
|
|
"username": username,
|
|
"password": password,
|
|
"dest": "https://new.reddit.com/",
|
|
"csrf_token": csrf_token,
|
|
}
|
|
|
|
r = client.post(
|
|
"https://www.reddit.com/login",
|
|
data=data,
|
|
)
|
|
break
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error(
|
|
"Failed to connect to websocket, trying again in 30 seconds..."
|
|
)
|
|
time.sleep(30)
|
|
if r.status_code != HTTPStatus.OK.value:
|
|
# password is probably invalid
|
|
logger.exception("{} - Authorization failed!", username)
|
|
logger.debug("response: {} - {}", r.status_code, r.text)
|
|
return
|
|
else:
|
|
logger.success("{} - Authorization successful!", username)
|
|
logger.info("Obtaining access token...")
|
|
r = client.get(
|
|
"https://new.reddit.com/",
|
|
)
|
|
data_str = (
|
|
BeautifulSoup(r.content, features="html.parser")
|
|
.find("script", {"id": "data"})
|
|
.contents[0][len("window.__r = ") : -1]
|
|
)
|
|
data = json.loads(data_str)
|
|
response_data = data["user"]["session"]
|
|
|
|
if "error" in response_data:
|
|
logger.info(
|
|
"An error occured. Make sure you have the correct credentials. Response data: {}",
|
|
response_data,
|
|
)
|
|
exit()
|
|
|
|
self.access_token = response_data["accessToken"]
|
|
access_token_expires_in_seconds = response_data[
|
|
"expiresIn"
|
|
] # this is usually "3600"
|
|
|
|
self.access_token_expiry_timestamp = current_timestamp + int(access_token_expires_in_seconds)
|
|
logger.info(
|
|
"Received new access token: {}************",
|
|
self.access_token,
|
|
)
|
|
|
|
if self.access_token is not None and (
|
|
current_timestamp >= next_placement_time
|
|
):
|
|
current_x, current_y, new_rgb = self.get_unset_pixel()
|
|
new_rgb_hex = ColorMapper.rgb_to_hex(new_rgb)
|
|
pixel_color_index = ColorMapper.COLOR_MAP[new_rgb_hex]
|
|
|
|
canvas = 0
|
|
pixel_x_start = self.pixel_x_start + current_x
|
|
pixel_y_start = self.pixel_y_start + current_y
|
|
while pixel_x_start > 999:
|
|
pixel_x_start -= 1000
|
|
canvas += 1
|
|
while pixel_y_start > 999:
|
|
pixel_y_start -= 1000
|
|
canvas += 3
|
|
|
|
# draw the pixel onto r/place
|
|
next_placement_time = self.set_pixel_and_check_ratelimit(
|
|
self.access_token,
|
|
pixel_x_start,
|
|
pixel_y_start,
|
|
name,
|
|
pixel_color_index,
|
|
canvas,
|
|
)
|
|
|
|
time_until_next_draw = next_placement_time - current_timestamp
|
|
|
|
# If next_pixel_placement_time (returned by place_pixel_and_check_ratelimit)
|
|
# is too large, user is likely permabanned
|
|
if time_until_next_draw > 10000:
|
|
logger.warning(
|
|
"CANCELLED :: Rate-Limit Banned"
|
|
)
|
|
repeat_forever = False
|
|
break
|
|
|
|
print(f"Time until next place: {time_until_next_draw}", end='\r', flush=True)
|
|
|
|
|
|
if not repeat_forever:
|
|
break
|
|
|
|
def update_image(self):
|
|
json_response = requests.get("https://us0.co/config.json")
|
|
if json_response.status_code == 200: #CHECK STATUS OF URL
|
|
try:
|
|
self.json_data = json_response.json() #PARSE JSON RESPONSE
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
print("Error decoding JSON response.")
|
|
else:
|
|
print("Failed to fetch data. Status code:", json_response.status_code)
|
|
|
|
image_response = requests.get("https://us0.co/image.png")
|
|
if image_response.status_code == 200:
|
|
filename = 'image.png'
|
|
image_save_path = os.path.join("./", filename) #SET SAVE PATH
|
|
|
|
#SAVE IMAGE TO PATH
|
|
with open(image_save_path, 'wb') as f:
|
|
f.write(image_response.content)
|
|
|
|
print(f"Image saved successfully to {image_save_path}")
|
|
else:
|
|
print(f"Failed to download image. Status code: {image_response.status_code}")
|
|
|
|
|
|
def main():
|
|
logger.remove()
|
|
logger.add(sys.stderr, level="INFO")
|
|
client = PlaceClient('config.json')
|
|
if ('AFIP_USER' in os.environ):
|
|
user = os.environ['AFIP_USER']
|
|
else:
|
|
user = input("Reddit username: ")
|
|
if ('AFIP_PASS' in os.environ):
|
|
passw = os.environ['AFIP_PASS']
|
|
else:
|
|
passw = input("Password: ")
|
|
client.task(user, passw)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|