# afip_last_stand/main.py (539 lines, 21 KiB, Python)

import math
import requests
import json
import time
import sys, os
import random
from io import BytesIO
from http import HTTPStatus
from websocket import create_connection # type: ignore
from websocket._exceptions import WebSocketConnectionClosedException # type: ignore
from PIL import Image
from loguru import logger
from bs4 import BeautifulSoup
from src.mappings import ColorMapper
from src import utils
class PlaceClient:
    """Single-account r/place client that repaints a template image.

    The client logs in to Reddit, downloads the current canvas through the
    realtime GraphQL websocket, compares it with the template image and
    places one differing pixel whenever the account is allowed to.
    """

    # Seconds to wait for an HTTP response before treating the call as failed.
    REQUEST_TIMEOUT_SECONDS = 30

    # Message logged whenever the realtime websocket has to be re-opened.
    _WEBSOCKET_RETRY_MESSAGE = (
        "Failed to connect to websocket, trying again in 30 seconds..."
    )

    # GraphQL documents sent verbatim to the realtime endpoint.
    _SET_PIXEL_QUERY = "mutation setPixel($input: ActInput!) {\n act(input: $input) {\n data {\n ... on BasicMessage {\n id\n data {\n ... on GetUserCooldownResponseMessageData {\n nextAvailablePixelTimestamp\n __typename\n }\n ... on SetPixelResponseMessageData {\n timestamp\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n}\n"
    _CONFIG_SUBSCRIPTION_QUERY = "subscription configuration($input: SubscribeInput!) {\n subscribe(input: $input) {\n id\n ... on BasicMessage {\n data {\n __typename\n ... on ConfigurationMessageData {\n colorPalette {\n colors {\n hex\n index\n __typename\n }\n __typename\n }\n canvasConfigurations {\n index\n dx\n dy\n __typename\n }\n canvasWidth\n canvasHeight\n __typename\n }\n }\n __typename\n }\n __typename\n }\n}\n"
    _CANVAS_SUBSCRIPTION_QUERY = "subscription replace($input: SubscribeInput!) {\n subscribe(input: $input) {\n id\n ... on BasicMessage {\n data {\n __typename\n ... on FullFrameMessageData {\n __typename\n name\n timestamp\n }\n ... on DiffFrameMessageData {\n __typename\n name\n currentTimestamp\n previousTimestamp\n }\n }\n __typename\n }\n __typename\n }\n}\n"

    def __init__(self, config_path):
        """Load the configuration and the template image.

        Args:
            config_path: Path handed to ``utils.get_json_data``.
        """
        self.logger = logger

        # Data
        self.json_data = utils.get_json_data(self, config_path)
        logger.debug("{}", self.json_data)
        self.pixel_x_start: int = self.json_data["image_start_coords"][0]
        self.pixel_y_start: int = self.json_data["image_start_coords"][1]
        self.rgb_colors_array = ColorMapper.generate_rgb_colors_array()
        self.legacy_transparency = True
        self.access_token = None
        self.access_token_expiry_timestamp = None

        # Image information; ``pix`` and ``image_size`` are read by
        # get_unset_pixel and are presumably filled in by utils.load_image.
        self.pix = None
        self.image_size = None
        self.image_path = self.json_data.get("image_path", "images/image.png")
        utils.load_image(self)

    @staticmethod
    def _split_canvas_coordinates(x, y):
        """Convert board coordinates into per-canvas coordinates.

        Canvases are 1000x1000 tiles laid out three per row:

            0 | 1 | 2
            3 | 4 | 5

        Args:
            x: Horizontal board coordinate.
            y: Vertical board coordinate.

        Returns:
            ``(local_x, local_y, canvas_index)``. Coordinates below 1000
            (including negative ones) are returned unchanged on canvas 0 for
            that axis.
        """
        column = x // 1000 if x > 999 else 0
        row = y // 1000 if y > 999 else 0
        return x - 1000 * column, y - 1000 * row, column + 3 * row

    @staticmethod
    def _parse_set_pixel_response(body):
        """Extract the next-placement timestamp from a setPixel response.

        Args:
            body: Decoded JSON body of the GraphQL response.

        Returns:
            ``(placed, timestamp_ms)``. ``placed`` is True when the response
            carries a non-null ``data`` member. ``timestamp_ms`` is None when
            the expected timestamp field is missing or not a number.
        """
        if not isinstance(body, dict):
            return False, None
        placed = body.get("data") is not None
        try:
            if placed:
                timestamp = body["data"]["act"]["data"][0]["data"][
                    "nextAvailablePixelTimestamp"
                ]
            else:
                timestamp = body["errors"][0]["extensions"]["nextAvailablePixelTs"]
        except (KeyError, IndexError, TypeError):
            return placed, None
        if isinstance(timestamp, bool) or not isinstance(timestamp, (int, float)):
            return placed, None
        return placed, timestamp

    def set_pixel_and_check_ratelimit(
        self,
        access_token,
        pixel_x_start,
        pixel_y_start,
        name,
        color_index_in=18,
        canvas_index=0,
    ):
        """Try to place one pixel and report when the next one is allowed.

        Args:
            access_token: Bearer token sent in the ``Authorization`` header.
            pixel_x_start: X coordinate inside canvas ``canvas_index``.
            pixel_y_start: Y coordinate inside canvas ``canvas_index``.
            name: Account name; not used by this method.
            color_index_in: Palette index of the colour to place.
            canvas_index: Index of the canvas tile (three tiles per row).

        Returns:
            The next-available timestamp from the response converted from
            milliseconds to seconds, or ``0`` when the request failed or the
            response could not be interpreted (the caller then retries).
        """
        # canvas structure:
        # 0 | 1 | 2
        # 3 | 4 | 5
        absolute_x = pixel_x_start + 1000 * (canvas_index % 3)
        absolute_y = pixel_y_start + 1000 * (canvas_index // 3)
        logger.warning(
            "Attempting to place {} pixel at {}, {}",
            ColorMapper.color_id_to_name(color_index_in),
            absolute_x,
            absolute_y,
        )

        url = "https://gql-realtime-2.reddit.com/query"
        payload = json.dumps(
            {
                "operationName": "setPixel",
                "variables": {
                    "input": {
                        "actionName": "r/replace:set_pixel",
                        "PixelMessageData": {
                            "coordinate": {"x": pixel_x_start, "y": pixel_y_start},
                            "colorIndex": color_index_in,
                            "canvasIndex": canvas_index,
                        },
                    }
                },
                "query": self._SET_PIXEL_QUERY,
            }
        )
        headers = {
            "origin": "https://garlic-bread.reddit.com",
            "referer": "https://garlic-bread.reddit.com/",
            "apollographql-client-name": "mona-lisa",
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json",
        }

        try:
            response = requests.post(
                url,
                headers=headers,
                data=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            # A network failure is handled like an unreadable response.
            logger.error("Failed placing pixel: {}", exc)
            return 0
        logger.debug("Received response: {}", response.text)

        # Decode the body once; a non-JSON body raises a ValueError subclass.
        try:
            body = response.json()
        except ValueError:
            print("Received bad response. Trying again.")
            return 0

        # There are 2 different JSON keys for responses to get the next timestamp.
        # If we don't get data, it means we've been rate limited.
        # If we do, a pixel has been successfully placed.
        placed, next_timestamp_ms = self._parse_set_pixel_response(body)
        if isinstance(body, dict) and not placed:
            logger.debug("{}", body.get("errors"))
        if next_timestamp_ms is None:
            print("Received bad response. Trying again.")
            return 0

        if placed:
            # The success message subtracts a fixed (1500, 1000) offset from
            # the absolute board coordinates.
            logger.success(
                "Succeeded placing pixel at {}, {}",
                absolute_x - 1500,
                absolute_y - 1000,
            )
        else:
            logger.error(
                "Failed placing pixel: rate limited",
            )

        # Reddit returns time in ms and we need seconds, so divide by 1000
        return math.floor(next_timestamp_ms) / 1000

    def _open_board_socket(self, access_token):
        """Open the realtime websocket and wait for ``connection_ack``.

        Connection failures, and connections closed before the
        acknowledgement arrives, are retried every 30 seconds.

        Args:
            access_token: Bearer token sent in the ``connection_init`` payload.

        Returns:
            The connected websocket.
        """
        while True:
            try:
                ws = create_connection(
                    "wss://gql-realtime-2.reddit.com/query",
                    origin="https://garlic-bread.reddit.com",
                )
            except Exception:
                logger.error(self._WEBSOCKET_RETRY_MESSAGE)
                time.sleep(30)
                continue

            try:
                ws.send(
                    json.dumps(
                        {
                            "type": "connection_init",
                            "payload": {"Authorization": "Bearer " + access_token},
                        }
                    )
                )
                while True:
                    msg = ws.recv()
                    if msg is None:
                        logger.error("Reddit failed to acknowledge connection_init")
                        sys.exit()
                    if msg.startswith('{"type":"connection_ack"}'):
                        logger.debug("Connected to WebSocket server")
                        return ws
            except WebSocketConnectionClosedException as e:
                # Receiving again on a closed socket can only fail again, so
                # drop this connection and open a new one.
                logger.error(e)
                ws.close()
                logger.error(self._WEBSOCKET_RETRY_MESSAGE)
                time.sleep(30)

    def _request_canvas_config(self, ws):
        """Subscribe to the CONFIG channel and return its first data message."""
        logger.debug("Obtaining Canvas information")
        ws.send(
            json.dumps(
                {
                    "id": "1",
                    "type": "start",
                    "payload": {
                        "variables": {
                            "input": {
                                "channel": {
                                    "teamOwner": "GARLICBREAD",
                                    "category": "CONFIG",
                                }
                            }
                        },
                        "extensions": {},
                        "operationName": "configuration",
                        "query": self._CONFIG_SUBSCRIPTION_QUERY,
                    },
                }
            )
        )
        while True:
            canvas_payload = json.loads(ws.recv())
            if canvas_payload["type"] == "data":
                canvas_details = canvas_payload["payload"]["data"]["subscribe"]["data"]
                logger.debug("Canvas config: {}", canvas_payload)
                return canvas_details

    def _request_canvas_frames(self, ws, canvas_count):
        """Subscribe to every canvas and download one full frame for each.

        Args:
            ws: Connected websocket.
            canvas_count: Number of canvases to subscribe to.

        Returns:
            A list of ``[subscription_id, image]`` pairs, where the
            subscription id is ``2 + canvas position``.
        """
        pending = []
        for i in range(canvas_count):
            subscription_id = 2 + i
            pending.append(subscription_id)
            logger.debug("Creating canvas socket {}", subscription_id)
            ws.send(
                json.dumps(
                    {
                        "id": str(subscription_id),
                        "type": "start",
                        "payload": {
                            "variables": {
                                "input": {
                                    "channel": {
                                        "teamOwner": "GARLICBREAD",
                                        "category": "CANVAS",
                                        "tag": str(i),
                                    }
                                }
                            },
                            "extensions": {},
                            "operationName": "replace",
                            "query": self._CANVAS_SUBSCRIPTION_QUERY,
                        },
                    }
                )
            )

        frames = []
        logger.debug("A total of {} canvas sockets opened", len(pending))
        while pending:
            logger.debug("Waiting for WebSocket message")
            temp = json.loads(ws.recv())
            if temp["type"] != "data":
                continue
            logger.debug("Received WebSocket data type message")
            msg = temp["payload"]["data"]["subscribe"]
            if msg["data"]["__typename"] != "FullFrameMessageData":
                continue
            logger.debug("Received full frame message")
            img_id = int(temp["id"])
            logger.debug("Image ID: {}", img_id)
            if img_id not in pending:
                continue
            logger.debug("Getting image: {}", msg["data"]["name"])
            frame_bytes = requests.get(
                msg["data"]["name"],
                stream=True,
            ).content
            frames.append([img_id, Image.open(BytesIO(frame_bytes))])
            pending.remove(img_id)
            logger.debug("Canvas sockets remaining: {}", len(pending))

        # Stop every canvas subscription, including the last one.
        for i in range(canvas_count):
            ws.send(json.dumps({"id": str(2 + i), "type": "stop"}))
        return frames

    def get_board(self, access_token):
        """Download the current canvas and return it as one stitched image.

        The template is refreshed first (``update_image`` followed by
        ``utils.load_image``); then every canvas tile is fetched over the
        realtime websocket and pasted at the offset given by its
        configuration.

        Args:
            access_token: Bearer token used to authenticate the websocket.

        Returns:
            A new RGB image covering all canvases.
        """
        self.update_image()
        utils.load_image(self)

        logger.debug("Connecting and obtaining board images")
        ws = self._open_board_socket(access_token)
        try:
            canvas_details = self._request_canvas_config(ws)
            configurations = canvas_details["canvasConfigurations"]
            frames = self._request_canvas_frames(ws, len(configurations))
        finally:
            # Release the socket even when a receive or parse step raises.
            ws.close()

        new_img_width = (
            max(config["dx"] for config in configurations)
            + canvas_details["canvasWidth"]
        )
        logger.debug("New image width: {}", new_img_width)
        new_img_height = (
            max(config["dy"] for config in configurations)
            + canvas_details["canvasHeight"]
        )
        logger.debug("New image height: {}", new_img_height)

        new_img = Image.new("RGB", (new_img_width, new_img_height))
        for img_id, frame in sorted(frames, key=lambda pair: pair[0]):
            logger.debug("Adding image (ID {}): {}", img_id, frame)
            # Subscription ids start at 2, so id - 2 is the canvas position.
            configuration = configurations[img_id - 2]
            new_img.paste(frame, (int(configuration["dx"]), int(configuration["dy"])))
        return new_img

    def get_unset_pixel(self):
        """Find a template pixel whose colour on the board is wrong.

        The scan starts at a random template position and walks row by row,
        wrapping around. After a full pass without a mismatch it waits 10
        seconds, downloads the board again and keeps scanning.

        Returns:
            ``(x, y, new_rgb)``: template-relative coordinates and the palette
            colour that should be placed there.
        """
        start_x = x = random.randint(0, self.image_size[0] - 1)
        start_y = y = random.randint(0, self.image_size[1] - 1)
        looped_once = False
        board_outdated = True
        board_pixels = None

        while True:
            # Read the size on every pass: get_board calls utils.load_image,
            # which may load a template of a different size.
            width, height = self.image_size
            if x >= width:
                y += 1
                x = 0
            if y >= height:
                y = 0

            if x == start_x and y == start_y and looped_once:
                logger.info(
                    "All pixels correct, trying again in 10 seconds... ",
                )
                time.sleep(10)
                board_outdated = True

            if board_outdated:
                board = self.get_board(self.access_token)
                board_pixels = board.convert("RGB").load()
                board_outdated = False
                width, height = self.image_size
                if x >= width or y >= height:
                    # The scan position fell outside the reloaded template:
                    # start a new scan.
                    start_x = x = random.randint(0, width - 1)
                    start_y = y = random.randint(0, height - 1)
                    looped_once = False

            board_x = x + self.pixel_x_start
            board_y = y + self.pixel_y_start
            logger.debug("{}, {}", board_x, board_y)
            logger.debug("{}, {}, boarding, {}, {}", x, y, width, height)

            target_rgb = self.pix[x, y]
            new_rgb = ColorMapper.closest_color(
                target_rgb, self.rgb_colors_array, self.legacy_transparency
            )
            current_rgb = board_pixels[board_x, board_y]
            if current_rgb != new_rgb:
                logger.debug(
                    "{}, {}, {}, {}",
                    current_rgb,
                    new_rgb,
                    new_rgb != (69, 42, 0),
                    current_rgb != new_rgb,
                )
                # (69, 42, 0) is a special color reserved for transparency.
                if new_rgb != (69, 42, 0):
                    logger.debug(
                        "Replacing {} pixel at: {},{} with {} color",
                        current_rgb,
                        board_x,
                        board_y,
                        new_rgb,
                    )
                    break
                logger.info(
                    "Transparent Pixel at {}, {} skipped",
                    board_x,
                    board_y,
                )
            x += 1
            looped_once = True
        return x, y, new_rgb

    def _refresh_access_token(self, name, passw, current_timestamp):
        """Log in to Reddit and store a fresh access token on the instance.

        The login request is retried every 30 seconds while it raises. The
        process exits when the session data reports an error.

        Args:
            name: Reddit username.
            passw: Reddit password.
            current_timestamp: Current time in seconds; the token expiry is
                computed relative to it.

        Returns:
            True when a token was stored, False when the login request was
            answered with a status other than 200.
        """
        logger.info("User {}: Refreshing access token", name)
        while True:
            try:
                client = requests.Session()
                client.headers.update(
                    {
                        "User-Agent": str(utils.select_user_agent(self)),
                        "Origin": "https://www.reddit.com/",
                        "Sec-Fetch-Dest": "empty",
                        "Sec-Fetch-Mode": "cors",
                        "Sec-Fetch-Site": "same-origin",
                    }
                )
                r = client.get(
                    "https://www.reddit.com/login",
                )
                login_get_soup = BeautifulSoup(r.content, "html.parser")
                csrf_token = login_get_soup.find(
                    "input", {"name": "csrf_token"}
                )["value"]
                data = {
                    "username": name,
                    "password": passw,
                    "dest": "https://new.reddit.com/",
                    "csrf_token": csrf_token,
                }
                r = client.post(
                    "https://www.reddit.com/login",
                    data=data,
                )
                break
            except Exception as e:
                logger.error(e)
                logger.error(
                    "Failed to log in to Reddit, trying again in 30 seconds..."
                )
                time.sleep(30)

        if r.status_code != HTTPStatus.OK.value:
            # password is probably invalid
            # No exception is active here, so log a plain error.
            logger.error("{} - Authorization failed!", name)
            logger.debug("response: {} - {}", r.status_code, r.text)
            return False
        logger.success("{} - Authorization successful!", name)

        logger.info("Obtaining access token...")
        r = client.get(
            "https://new.reddit.com/",
        )
        # The session data is embedded in the page as `window.__r = {...};`.
        data_str = (
            BeautifulSoup(r.content, features="html.parser")
            .find("script", {"id": "data"})
            .contents[0][len("window.__r = ") : -1]
        )
        response_data = json.loads(data_str)["user"]["session"]
        if "error" in response_data:
            logger.info(
                "An error occured. Make sure you have the correct credentials. Response data: {}",
                response_data,
            )
            sys.exit()

        self.access_token = response_data["accessToken"]
        access_token_expires_in_seconds = response_data[
            "expiresIn"
        ]  # this is usually "3600"
        self.access_token_expiry_timestamp = current_timestamp + int(
            access_token_expires_in_seconds
        )
        # Only log a short prefix so the full bearer token never reaches the logs.
        logger.info(
            "Received new access token: {}************",
            self.access_token[:5],
        )
        return True

    def task(self, name, passw):
        """Run the placement loop for one account.

        The loop refreshes the access token when it is missing or expired and
        places a pixel whenever the next placement time has been reached. It
        returns when authorization fails or when the wait reported by Reddit
        exceeds 10000 seconds, which is treated as a ban.

        Args:
            name: Reddit username.
            passw: Reddit password.
        """
        self.name = name
        self.passw = passw

        # Without a token the first placement is attempted right away;
        # otherwise wait a randomised 315-375 seconds first.
        pixel_place_frequency = 315 + random.random() * 60
        current_time = math.floor(time.time())
        if self.access_token is None:
            next_placement_time = current_time
        else:
            next_placement_time = current_time + pixel_place_frequency

        while True:
            current_timestamp = math.floor(time.time())

            token_expired = (
                self.access_token_expiry_timestamp is None
                or current_timestamp >= self.access_token_expiry_timestamp
            )
            if token_expired and not self._refresh_access_token(
                name, passw, current_timestamp
            ):
                return

            if self.access_token is not None and (
                current_timestamp >= next_placement_time
            ):
                current_x, current_y, new_rgb = self.get_unset_pixel()
                new_rgb_hex = ColorMapper.rgb_to_hex(new_rgb)
                pixel_color_index = ColorMapper.COLOR_MAP[new_rgb_hex]
                pixel_x_start, pixel_y_start, canvas = self._split_canvas_coordinates(
                    self.pixel_x_start + current_x,
                    self.pixel_y_start + current_y,
                )
                # draw the pixel onto r/place
                next_placement_time = self.set_pixel_and_check_ratelimit(
                    self.access_token,
                    pixel_x_start,
                    pixel_y_start,
                    name,
                    pixel_color_index,
                    canvas,
                )

            time_until_next_draw = next_placement_time - current_timestamp
            # If the next placement time returned by
            # set_pixel_and_check_ratelimit is too far away, the user is
            # likely permabanned.
            if time_until_next_draw > 10000:
                logger.warning(
                    "CANCELLED :: Rate-Limit Banned"
                )
                return
            print(f"Time until next place: {time_until_next_draw}", end='\r', flush=True)
            # Poll once per second instead of spinning at full CPU.
            time.sleep(1)

    def update_image(self):
        """Download the remote configuration and template image (best effort).

        Failures are printed and otherwise ignored so the caller can continue
        with the data it already has.
        """
        # NOTE(review): only self.json_data is replaced here; attributes
        # derived from it in __init__ (pixel_x_start, pixel_y_start,
        # image_path) keep their old values, and the image is always written
        # to ./image.png regardless of self.image_path. Confirm this is
        # intended.
        try:
            json_response = requests.get(
                "https://us0.co/config.json",
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            print(f"Failed to fetch data: {exc}")
        else:
            if json_response.status_code == 200:  # CHECK STATUS OF URL
                try:
                    self.json_data = json_response.json()  # PARSE JSON RESPONSE
                except ValueError:
                    # ValueError covers both the stdlib and the simplejson
                    # decode errors that requests may raise.
                    print("Error decoding JSON response.")
            else:
                print("Failed to fetch data. Status code:", json_response.status_code)

        try:
            image_response = requests.get(
                "https://us0.co/image.png",
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            print(f"Failed to download image: {exc}")
            return
        if image_response.status_code == 200:
            filename = 'image.png'
            image_save_path = os.path.join("./", filename)  # SET SAVE PATH
            # SAVE IMAGE TO PATH
            with open(image_save_path, 'wb') as f:
                f.write(image_response.content)
            print(f"Image saved successfully to {image_save_path}")
        else:
            print(f"Failed to download image. Status code: {image_response.status_code}")
def main():
    """Configure logging, collect credentials and run the placement task.

    The username and password are taken from the ``AFIP_USER`` and
    ``AFIP_PASS`` environment variables when they are set; otherwise they are
    prompted for on the terminal.
    """
    # Imported here because only this entry point needs it.
    import getpass

    logger.remove()
    logger.add(sys.stderr, level="INFO")
    client = PlaceClient('config.json')

    user = os.environ.get('AFIP_USER')
    if user is None:
        user = input("Reddit username: ")

    passw = os.environ.get('AFIP_PASS')
    if passw is None:
        # getpass keeps the password from being echoed to the terminal.
        passw = getpass.getpass("Password: ")

    client.task(user, passw)


if __name__ == "__main__":
    main()