#!/usr/bin/env python3
import requests
import urllib.request
import os
import sys
import argparse
import time
import configparser
from multiprocessing import Pool
import xdg

API_URL = "https://ifunny.co/api/v1/feeds/tags/"


class Video:
    def __init__(self, url, name):
        self.url = url
        self.name = name


def download(video, name):
    # "video" here is the media URL string; args and tags come from the
    # module scope and are inherited by worker processes when the pool forks.
    if args.verbose:
        print("Now running for " + str(video))
    urlsplit = video.split('.')
    if args.verbose:
        print("url split = " + str(urlsplit))
    if len(name) > 200:
        if args.verbose:
            print("shortening name because it's too long")
        name = name[0:200]
    name = name.replace('"', "")
    name = name.replace('/', "")
    # Keep the original file extension: the last dot-separated chunk of the URL.
    outputPath = args.output + tags + " - " + name + "." + urlsplit[-1]
    if args.verbose:
        print("name read as: " + name)
    if os.path.exists(outputPath):
        print(name + " already exists!")
    else:
        print("saving " + video + " as " + name)
        if args.type.lower() in ("video", "videos"):
            os.system("ffmpeg -y -i \"" + video + "\" \"" + outputPath + "\"")
        elif args.type.lower() in ("gif", "gifs"):
            urllib.request.urlretrieve(video, outputPath)
        if args.verbose:
            print("saving complete!")


config = configparser.ConfigParser()
configdir = str(xdg.XDG_CONFIG_HOME)
config.read(configdir + "/comedyGenerator")

parser = argparse.ArgumentParser(add_help=True,
                                 formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--verbose', '-v',
                    default=False,
                    action='store_true',
                    dest='verbose',
                    help='Enables verbose output'
                    )
parser.add_argument('--amount', '-a',
                    default=int(config['DEFAULT']['amount']),
                    dest='amount',
                    action="store",
                    help="the amount of funnies you'd like to download per tag",
                    type=int
                    )
parser.add_argument('--output', '-o',
                    default=None,
                    dest='output',
                    action="store",
                    help="the output directory of the funnies",
                    type=str
                    )
parser.add_argument('--log', '-l',
                    default=config['DEFAULT']['log'],
                    dest='log',
                    action="store",
                    help="the output directory of the log file for your funnies.\n"
                         "STDOUT if you wish for standard output.",
                    type=str
                    )
parser.add_argument('--jobs', '-j',
                    default=int(config['DEFAULT']['jobs']),
                    dest='jobs',
                    action="store",
                    help="how many jobs you'd like to use while downloading",
                    type=int
                    )
parser.add_argument('--no-download', '--nd',
                    dest='noDownload',
                    action="store_true",
                    help="skip actually downloading the funnies",
                    )
parser.add_argument('--type', '-t',
                    default=str(config['DEFAULT']['type']),
                    dest='type',
                    action="store",
                    help='''
The type of content that will be extracted
The following arguments are valid (case insensitive):
    Video
    Gifs
''',
                    type=str
                    )
parser.add_argument('tags',
                    nargs='+',
                    type=str,
                    help='Provides tags to be checked for funny downloading'
                    )
args = parser.parse_args()

contentFilter = "video"
if args.output is None:
    if args.verbose:
        print("Recalibrating output location")
    if args.type.lower() in ("video", "videos"):
        args.output = config['Videos']['output']
    elif args.type.lower() in ("gif", "gifs"):
        args.output = config['Gifs']['output']
if args.type.lower() in ("video", "videos"):
    contentFilter = "video"
elif args.type.lower() in ("gif", "gifs"):
    contentFilter = "gif"
if args.verbose:
    print("Current output location: " + str(args.output))
    print("Current filter: " + contentFilter)

# string comparison so I don't need two vars; bloat kills bloat
logging = False
if not args.log == "False":
    logging = True
    if args.log == "STDOUT":
        logFile = sys.stdout
    else:
        logFile = open(args.log, "a")
    if args.verbose:
        print("We're going to be logging!")
        print("Logging Path: " + args.log)

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "cross-site",
    "Cache-Control": "max-age=0"
}

for tags in args.tags:
    videos = []
    print("Downloading Tag: " + tags)

    # Fetch the front page first to pick up the CSRF token and session cookies,
    # backing off a little longer on each failed attempt.
    for tries in range(100):
        try:
            master = requests.get('https://ifunny.co/', headers=headers)
        except Exception as error:
            if tries < 100 - 1:
                print("Rate Limited!\nSleeping for " + str(tries * 1.5) + " seconds!")
                time.sleep(tries * 1.5)
                print(error)
                continue
        break

    combineHeader = (dict(master.headers) | headers)
    requestHeader = {
        # "User-Agent": combineHeader['User-Agent'],
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:124.0) Gecko/20100101 Firefox/124.0",
        "Content-Type": combineHeader['Content-Type'],
        "x-requested-with": "fetch",
        "x-csrf-token": combineHeader['Set-Cookie'].split(';')[0].split('=')[1],
        "set-cookies": combineHeader['Set-Cookie'],
        "access-control-allow-headers": combineHeader['access-control-allow-headers']
    }
    requestCookies = {
        "CID": combineHeader['Set-Cookie'].split(';')[3].split('=')[2],
        "sound": "off",
        "viewMode": "list",
        "x-csrf-token": combineHeader['Set-Cookie'].split(';')[0].split('=')[1]
    }

    for tries in range(100):
        try:
            tagPage = requests.get(API_URL + tags + "?filter=" + contentFilter,
                                   headers=requestHeader,
                                   cookies=requestCookies)
            if args.verbose:
                print("Got Webpage!")
        except Exception:
            if tries < 100 - 1:
                print("Rate Limited! Sleeping for " + str(tries * 1.5) + " seconds!")
                time.sleep(tries * 1.5)
                continue
        break

    JSONDump = tagPage.json()
    # Keep paging through the feed until we have enough items or it runs dry.
    while len(videos) < args.amount and len(JSONDump['items']) > 0:
        print("Currently have " + str(len(videos)) + " " + contentFilter
              + " out of " + str(args.amount)
              + " (" + str((len(videos) / args.amount) * 100) + "%)")
        for item in range(len(JSONDump['items'])):
            videos.append(
                Video(JSONDump['items'][item]['url'],
                      JSONDump['items'][item]['title'])
            )
        for tries in range(100):
            try:
                tagPage = requests.get(API_URL + tags + "?filter=" + contentFilter
                                       + "&next=" + JSONDump['pagination']['next'],
                                       headers=requestHeader,
                                       cookies=requestCookies)
                JSONDump = tagPage.json()
                if args.verbose:
                    print("Got New Tag Page!")
                break
            except Exception:
                if tries < 100 - 1:
                    print("Rate Limited! Sleeping for " + str(tries * 1.5) + " seconds!")
                    time.sleep(tries * 1.5)
                    continue

    if len(videos) > args.amount:
        videos = videos[:args.amount]
        if args.verbose:
            print("Videos list truncated! It's now: " + str(len(videos)) + " units long")

    if args.jobs > 1 and not args.noDownload:
        if args.verbose:
            print("Creating multiprocessing pool...")
        pool = Pool(args.jobs)
        for video in videos:
            if args.verbose:
                print("Sending " + video.name + " to multiprocessing pool")
            pool.apply_async(download, (video.url, video.name,))
        if args.verbose:
            print("closing the multiprocessing pool")
        pool.close()
        pool.join()
    elif not args.noDownload:
        for video in videos:
            download(video.url, video.name)

    if logging and not args.noDownload:
        for video in videos:
            logName = video.url
            logPath = args.output + logName
            if args.verbose:
                print("Writing " + logPath + " to log file")
            logFile.write(logPath + "\n")
    elif logging:
        for video in videos:
            logName = video.url
            if args.verbose:
                print("Writing " + logName + " to log file")
            logFile.write(logName + "\n")

if logging and logFile is not sys.stdout:
    logFile.close()

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
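
# ----------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the program): the script
# reads its defaults from $XDG_CONFIG_HOME/comedyGenerator. The section
# and key names below match the lookups in the code above; the values are
# example placeholders. Note that the output paths are concatenated
# directly with the filename, so they should end in a trailing slash, and
# log = False (the literal string) disables logging.
#
#   [DEFAULT]
#   amount = 100
#   jobs = 4
#   type = Video
#   log = False
#
#   [Videos]
#   output = /home/user/Videos/funnies/
#
#   [Gifs]
#   output = /home/user/Pictures/gifs/
#
# With that config in place, a typical invocation might look like:
#
#   ./comedyGenerator -v -a 25 -t gifs dank memes
#
# which downloads up to 25 gifs for each of the tags "dank" and "memes",
# with verbose output. Adding --no-download together with -l urls.txt
# would collect the media URLs into urls.txt without downloading them.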