#!/usr/bin/env python """ This utility extracts media urls from tweet jsonl.gz and save them as warc records. Warcio (https://github.com/webrecorder/warcio) is a dependency and before you can use it you need to: % pip install warcio You run it like this: % python media2warc.py /mnt/tweets/ferguson/tweets-0001.jsonl.gz /mnt/tweets/ferguson/tweets-0001.warc.gz The input file will be checked for duplicate urls to avoid duplicates within the input file. Subsequent runs will be deduplicated using a sqlite db. If an identical-payload-digest is found a revist record is created. The script is able to fetch media resources in multiple threads (maximum 2) by passing --threads (default to a single thread). Please be careful modifying this script to use more than two threads since it can be interpreted as a DoS-attack. """ import os import gzip import json import time import queue import hashlib import logging import sqlite3 import argparse import requests import threading from datetime import timedelta from warcio.warcwriter import WARCWriter from warcio.statusandheaders import StatusAndHeaders q = queue.Queue() out_queue = queue.Queue() BLOCK_SIZE = 25600 class GetResource(threading.Thread): def __init__(self, q): threading.Thread.__init__(self) self.q = q self.rlock = threading.Lock() self.out_queue = out_queue self.d = Dedup() def run(self): while True: host = self.q.get() try: r = requests.get( host, headers={"Accept-Encoding": "identity"}, stream=True ) data = [r.raw.headers.items(), r.raw, host, r.status_code, r.reason] print(data[2]) self.out_queue.put(data) self.q.task_done() except requests.exceptions.RequestException as e: logging.error("%s for %s", e, data[2]) print(e) self.q.task_done() continue class WriteWarc(threading.Thread): def __init__(self, out_queue, warcfile): threading.Thread.__init__(self) self.out_queue = out_queue self.lock = threading.Lock() self.warcfile = warcfile self.dedup = Dedup() def run(self): with open(self.warcfile, "ab") as output: while True: self.lock.acquire() data = self.out_queue.get() writer = WARCWriter(output, gzip=False) headers_list = data[0] http_headers = StatusAndHeaders( "{} {}".format(data[3], data[4]), headers_list, protocol="HTTP/1.0" ) record = writer.create_warc_record( data[2], "response", payload=data[1], http_headers=http_headers ) h = hashlib.sha1() h.update(record.raw_stream.read(BLOCK_SIZE)) if self.dedup.lookup(h.hexdigest()): record = writer.create_warc_record( data[2], "revisit", http_headers=http_headers ) writer.write_record(record) self.out_queue.task_done() self.lock.release() else: self.dedup.save(h.hexdigest(), data[2]) record.raw_stream.seek(0) writer.write_record(record) self.out_queue.task_done() self.lock.release() class Dedup: """ Stolen from warcprox https://github.com/internetarchive/warcprox/blob/master/warcprox/dedup.py """ def __init__(self): self.file = os.path.join(args.archive_dir, "dedup.db") def start(self): conn = sqlite3.connect(self.file) conn.execute( "create table if not exists dedup (" " key varchar(300) primary key," " value varchar(4000)" ");" ) conn.commit() conn.close() def save(self, digest_key, url): conn = sqlite3.connect(self.file) conn.execute( "insert or replace into dedup (key, value) values (?, ?)", (digest_key, url) ) conn.commit() conn.close() def lookup(self, digest_key, url=None): result = False conn = sqlite3.connect(self.file) cursor = conn.execute("select value from dedup where key = ?", (digest_key,)) result_tuple = cursor.fetchone() conn.close() if result_tuple: result = True return result def parse_extended_entities(extended_entities_dict): """Parse media file URL:s form tweet data :extended_entities_dict: :returns: list of media file urls """ urls = [] if "media" in extended_entities_dict.keys(): for item in extended_entities_dict["media"]: # add static image urls.append(item["media_url_https"]) # add best quality video file if "video_info" in item.keys(): max_bitrate = -1 # handle twitters occasional bitrate=0 video_url = None for video in item["video_info"]["variants"]: if "bitrate" in video.keys() and "content_type" in video.keys(): if video["content_type"] == "video/mp4": if int(video["bitrate"]) > max_bitrate: max_bitrate = int(video["bitrate"]) video_url = video["url"] if not video_url: print("Error: No bitrate / content_type") print(item["video_info"]) else: urls.append(video_url) return urls def parse_binlinks_from_tweet(tweetdict): """Parse binary file url:s from a single tweet. :tweetdict: json data dict for tweet :returns: list of urls for media files """ urls = [] if "user" in tweetdict.keys(): urls.append(tweetdict["user"]["profile_image_url_https"]) urls.append(tweetdict["user"]["profile_background_image_url_https"]) if "extended_entities" in tweetdict.keys(): urls.extend(parse_extended_entities(tweetdict["extended_entities"])) return urls def main(): start = time.time() if not os.path.isdir(args.archive_dir): os.mkdir(args.archive_dir) logging.basicConfig( filename=os.path.join(args.archive_dir, "media_harvest.log"), level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) logging.getLogger(__name__) logging.info("Logging media harvest for %s", args.tweet_file) urls = [] d = Dedup() d.start() uniqueUrlCount = 0 duplicateUrlCount = 0 if args.tweet_file.endswith(".gz"): tweetfile = gzip.open(args.tweet_file, "r") else: tweetfile = open(args.tweet_file, "r") logging.info("Checking for duplicate urls") for line in tweetfile: tweet = json.loads(line) tweet_urls = parse_binlinks_from_tweet(tweet) for url in tweet_urls: if not url in urls: urls.append(url) q.put(url) uniqueUrlCount += 1 else: duplicateUrlCount += 1 logging.info( "Found %s total media urls %s unique and %s duplicates", uniqueUrlCount + duplicateUrlCount, uniqueUrlCount, duplicateUrlCount, ) threads = int(args.threads) if threads > 2: threads = 2 for i in range(threads): t = GetResource(q) t.daemon = True t.start() wt = WriteWarc(out_queue, os.path.join(args.archive_dir, "warc.warc")) wt.daemon = True wt.start() q.join() out_queue.join() logging.info( "Finished media harvest in %s", str(timedelta(seconds=(time.time() - start))) ) if __name__ == "__main__": parser = argparse.ArgumentParser("archive") parser.add_argument( "tweet_file", action="store", help="a twitter jsonl.gz input file" ) parser.add_argument( "archive_dir", action="store", help="a directory where the resulting warc is stored", ) parser.add_argument( "--threads", action="store", default=1, help="Number of threads that fetches media resources", ) args = parser.parse_args() main()