Discovering Shopify Domains: A Journey Through Common Crawl Data (2024)

Overview

In this article, I explain how I downloaded 4,800 files totaling over 3 terabytes of data from Common Crawl, containing over 45 billion URLs. Using Kaggle for processing and self-hosted MinIO for storage, I parsed this data to find all domains and subdomains and then resolved them to IP addresses using Google’s and Cloudflare’s DNS-over-HTTPS services. To make full use of the hardware, I used the aiohttp and multiprocessing libraries in Python. Ultimately, I discovered over 465,000 Shopify domains. You can get the code and the domain list from the links below.

Notebook Code: https://www.kaggle.com/code/alighafoori/shopify2024

Download Shopify’s domains: https://docs.google.com/spreadsheets/d/1dykrF5EKpQliD4uPNywyYQMzbqMT_iwVOo_bUMbDHB0/edit?usp=sharing

Why I Used Object Storage Instead of a Database

I used Google Colab, Kaggle, and Saturn Cloud to download and process the data, and I have a VPS on Hetzner with a substantial amount of free disk space. Using MinIO on that VPS for object storage streamlined the workflow and reduced the amount of code needed. Because of network latency, it was more efficient to load all of the data into RAM for processing and then write the results to object storage for later use.
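Throughout the code in this article, object-storage access goes through small helper functions such as `get_list_objects`, `upload_dict`, and `download_dict`. Those helpers are not included in the excerpts below, so here is a minimal, hypothetical sketch of what they might look like with the MinIO Python client; the endpoint and credentials are placeholders, and I am assuming dictionaries are stored as gzip-compressed pickles.

```python
import gzip
import io
import pickle

from minio import Minio

# Hypothetical client setup; endpoint and credentials are placeholders
client = Minio("minio.example.com:9000", access_key="ACCESS_KEY",
               secret_key="SECRET_KEY", secure=True)

def get_list_objects(bucket_name):
    """Return the names of all objects in a bucket, for fast skip checks."""
    return {obj.object_name for obj in client.list_objects(bucket_name, recursive=True)}

def upload_dict(data, bucket_name, object_name):
    """Pickle a dictionary, gzip it in memory, and upload it to object storage."""
    payload = gzip.compress(pickle.dumps(data))
    client.put_object(bucket_name, object_name, io.BytesIO(payload), length=len(payload))

def download_dict(bucket_name, object_name, check_exist=True):
    """Download a gzip-compressed pickled dictionary; optionally fall back to {} if missing."""
    try:
        response = client.get_object(bucket_name, object_name)
        return pickle.loads(gzip.decompress(response.read()))
    except Exception:
        if check_exist:
            return {}
        raise
```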

Download All URL Index Files

I aimed to process all data from 2022 to 2024, which comprises 14 archives. Each archive of Common Crawl data contains 300 URL index files, and each file includes at least 10 million lines similar to the text below.

net,slideshare,es)/shellyzediyo 20240529233815 {"url": "https://es.slideshare.net/ShellyZediyo", "mime": "text/html", "mime-detected": "text/html", "status": "200", "digest": "PF673LETY7C5454OZXLPRJ6YSUTVN67O", "length": "20654", "offset": "202849866", "filename": "crawl-data/CC-MAIN-2024-22/segments/1715971059412.27/warc/CC-MAIN-20240529230852-20240530020852-00717.warc.gz", "charset": "UTF-8", "languages": "spa"}
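Each line starts with the host in SURT form (reversed and comma-separated, e.g. `net,slideshare,es)`), followed by a timestamp and a JSON payload. The `get_before_space` helper and the host-reversal logic used later boil down to something like the sketch below; `get_before_space` mirrors the name used in the notebook, while `surt_to_host` is my own illustrative helper.

```python
def get_before_space(text):
    """Return everything before the first space, i.e. the SURT key of the index line."""
    return text.split(' ', 1)[0]

def surt_to_host(surt_key):
    """Convert a SURT key like 'net,slideshare,es)/shellyzediyo' into a normal hostname."""
    surt_host = surt_key.split(')')[0]   # 'net,slideshare,es'
    parts = surt_host.split(',')         # ['net', 'slideshare', 'es']
    parts.reverse()                      # ['es', 'slideshare', 'net']
    return '.'.join(parts)               # 'es.slideshare.net'

line = 'net,slideshare,es)/shellyzediyo 20240529233815 {"url": "https://es.slideshare.net/ShellyZediyo"}'
print(surt_to_host(get_before_space(line)))  # es.slideshare.net
```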

The `crawl_index_paths` function identifies the URLs of all index files listed in each archive's paths file and then uses multiprocessing to process those index files in parallel.

```python
def crawl_index_paths(start_urls, bucket_name):
    """
    Crawls a list of Common Crawl archives.

    Args:
        start_urls (list): A list of URLs pointing to Common Crawl index paths files.
        bucket_name (str): The name of the S3 bucket (or MinIO equivalent) containing previously processed files.
    """
    # Get a list of previously processed files to skip
    lst = get_list_objects(bucket_name)
    # Store start time to calculate total processing time
    start_time = time.time()
    # Determine the number of cores available on the system
    num_cores = multiprocessing.cpu_count()
    print(f"number cores: {num_cores}")
    # List to store URLs of all index files found within archive files
    urls = []
    for url in start_urls:
        # Download the index paths file
        response = requests.get(url)
        print(f'get index file {url} len {len(response.content)}')
        # Open the gzip file and read its content line by line
        with gzip.GzipFile(fileobj=io.BytesIO(response.content), mode='rb') as gz_file:
            for line in gz_file:
                l = line.decode('utf-8').strip()
                # Add the file to the processing list if it ends with '.gz' and is not in the skip list
                if l.endswith('.gz'):
                    name = get_object_name(l)
                    if name not in lst:
                        urls.append(l)
    print(f"count of path files: {len(urls)}")
    # Create a pool for parallel processing
    pool = multiprocessing.Pool(processes=num_cores)
    # Distribute the URL list across the worker processes
    pool.starmap(process_index_file, [(url, bucket_name) for url in urls])
    # Close the pool and wait for all tasks to finish
    pool.close()
    pool.join()
    # Record the end time and calculate the total execution time
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Total execution time: {execution_time}")
```
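For reference, each archive exposes its list of index files through a `cc-index.paths.gz` file, so the call site looks roughly like the snippet below. The crawl IDs and bucket name here are placeholders; substitute the crawls you actually want to process.

```python
# Hypothetical call site: build the cc-index.paths.gz URLs for the crawls to process
archives = ['CC-MAIN-2024-22', 'CC-MAIN-2024-18', 'CC-MAIN-2024-10']  # placeholder crawl IDs
start_urls = [
    f'https://data.commoncrawl.org/crawl-data/{archive}/cc-index.paths.gz'
    for archive in archives
]
crawl_index_paths(start_urls, 'cc-hostnames')  # bucket name is a placeholder
```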

Within the `process_index_file` function:

1. Read the file line by line to extract host names.

2. For each unique host name encountered, add it to the dictionary.

3. Once processing is complete, serialize the dictionary to JSON format.

4. Compress the JSON data and upload it to object storage.

```python
def process_index_file(path, bucket_name):
    try:
        # Extract the object name from the path
        name = get_object_name(path)
        print(f"start processing {path} {bucket_name}\n")
        # Store start time to calculate total processing time
        start_time = time.time()
        # Counter for tracking the number of lines processed
        a = 0
        url = 'https://data.commoncrawl.org/' + path
        response = requests.get(url)
        print(f"download complete {path}")
        # Dictionary to efficiently store unique host names encountered
        dic = dict()
        # Open the gzip file and read its content line by line
        with gzip.GzipFile(fileobj=io.BytesIO(response.content), mode='rb') as gz_file:
            for line in gz_file:
                a += 1
                try:
                    # Extract the SURT-formatted host name from the line
                    sp_host = get_before_space(line.decode('utf-8').strip()).split(')')
                    # Add the host name to the dictionary if it's not already present
                    if sp_host[0] not in dic:
                        dic[sp_host[0]] = ''
                    # Print progress every 1 million lines
                    if a % 1000000 == 0:
                        print(f'process line {a} in {path}')
                except Exception as e:
                    print(a, 'error')
                    print(e)
                    return
        result = dict()
        # Split the SURT host, reverse the order of its elements, and join them back
        # to form the normal host name, then add it to the result dict
        for key in dic.keys():
            host_array = key.split(',')
            host_array.reverse()
            host = '.'.join(host_array)
            result[host] = ''
        # Write the result dictionary to a JSON file
        file_name = f'{name}.txt'
        with open(file_name, 'w') as json_file:
            json.dump(result, json_file)
        # Compress the JSON file and upload it to object storage
        compress_and_upload_to_minio(file_name, bucket_name, name)
        # Record the end time and calculate the total execution time
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Execution time {path}:", execution_time)
        print(f"domains count: {len(result)}")
    except Exception as e:
        print(f'error in path {path} \n {e}')
```

Processing the 300 files of a single archive takes around 1.5 hours on average on Kaggle.

Merging and Splitting Dictionaries

Now I had thousands of files, each containing hundreds of thousands of unique hostnames, and some hostnames appeared in multiple files. My goal was to combine everything into one giant list of unique hostnames. Putting everything together took up 17 gigabytes of memory on Kaggle, and I found over 140 million unique website hostnames!

```python
def download_and_merge_objects(bucket_name):
    # Get a list of processed files containing hostnames
    processed_files = get_list_objects(bucket_name)
    # Get a dictionary of already merged dictionaries
    merged_files = download_dict('domains', 'finish.gz')
    # Get the dictionary that contains all hostnames
    merged_data = download_dict('domains', 'list.gz')
    # Counter for tracking the number of merged dictionaries
    merge_counter = 0
    print(f'files: {len(processed_files)} finish: {len(merged_files)} domains: {len(merged_data)}')
    for key in processed_files:
        # If the dictionary has already been merged, skip it
        if key in merged_files:
            print(f'skip {key}')
            continue
        merge_counter += 1
        # Download the dictionary
        data = download_json_dict(bucket_name, key, False)
        # Merge its hostnames into the combined dictionary
        for item in data.keys():
            if item not in merged_data:
                merged_data[item] = data[item]
        # Free memory
        del data
        # Add the current dictionary to the list of merged dictionaries
        merged_files[key] = ''
        print(f'domains count: {len(merged_data):,} counter: {merge_counter}')
        # Every 500 iterations, upload a checkpoint to object storage
        if merge_counter % 500 == 0:
            upload_dict(merged_data, 'domains', 'list.gz')
            upload_dict(merged_files, 'domains', 'finish.gz')
            # Collect garbage to free memory
            gc.collect()
    # Final step: upload the merged data
    upload_dict(merged_data, 'domains', 'list.gz')
    upload_dict(merged_files, 'domains', 'finish.gz')
    return merged_data
```

Now it’s time to split the giant dictionary into chunks, each containing up to 1 million hostnames.

```python
def split_dictionary():
    # Download the dictionary containing over 145 million domains
    domain_dict = download_dict('domains', 'list.gz')
    # Counter to track progress
    counter = 0
    # Part ID and part dictionary for splitting
    part_id = 1
    part = {}
    for key in domain_dict:
        part[key] = ''
        counter += 1
        # Once the counter hits one million records, upload the part dictionary to object storage
        if counter % 1000000 == 0:
            upload_dict(part, 'domains-parts', f'part-{part_id}.pkl.gz')
            part_id += 1
            part = {}
    # Upload the remaining records
    upload_dict(part, 'domains-parts', f'part-{part_id}.pkl.gz')
```

Resolve Hostnames

Now I harness parallel processing to use the full capacity of the hardware. First, I compile a list of the dictionary parts and shuffle it. The shuffle matters because, on later runs, parts near the start of the list may already be resolved and get skipped; without shuffling, the remaining work would pile up at the end of the list and could land on a single worker, which I want to avoid.

```python
def start_resolve():
    # Get the list of parts
    parts_list = get_list_objects('domains-parts')
    # Build the task list of parts and shuffle it
    parts_to_process = [('domains-parts', key) for key in parts_list]
    random.shuffle(parts_to_process)
    # Determine the number of available CPU cores
    num_cores = multiprocessing.cpu_count()
    # Create a pool of worker processes and distribute the list across them
    with multiprocessing.Pool(processes=num_cores) as pool:
        pool.starmap(resolve_task, parts_to_process)
        pool.close()
        pool.join()
```
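`resolve_task` itself is not shown in the notebook excerpt above. A minimal sketch of what each worker process would do, assuming it simply creates a fresh event loop and runs `resolve_dns` (defined below) for its assigned part:

```python
def resolve_task(bucket_name, object_name):
    # Each worker process gets its own event loop and resolves one part end to end
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(resolve_dns(bucket_name, object_name, loop))
    finally:
        loop.close()
```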

We use aiohttp for asynchronous HTTP requests. First, if the part has already been resolved, the function downloads the existing results and skips every domain that already has an IP. Otherwise, it downloads the part and creates a session with a maximum of 200 connections, allowing up to 100 connections per host, which keeps the concurrency efficient and controlled. Whenever a domain resolves, the result is saved. This approach maximizes the use of available connections while avoiding overloading any single host.

```python
async def resolve_dns(bucket_name, object_name, loop):
    # A dictionary to store hostnames with their corresponding IPs
    domains = {}
    # If the part has already been DNS resolved, download it for further checks
    if check_file_existence('domain-resolved', object_name):
        print(f'Another check resolve {object_name}')
        domains = download_dict('domain-resolved', object_name, check_exist=False)
    else:
        # Otherwise, download the part that still needs DNS resolution
        print(f'Download dict from bucket {object_name}')
        domains = download_dict(bucket_name, object_name, check_exist=False)

    # Capture the start time for performance tracking
    start_time = time.time()
    # A dictionary to store futures
    futures = {}
    # HTTP request header
    headers = {"accept": "application/dns-json"}
    # Counters for resolved and skipped entries
    resolved = 0
    skip = 0
    # TCPConnector for the aiohttp session: at most 200 connections, 100 per host
    connector = aiohttp.TCPConnector(limit=200, limit_per_host=100, loop=loop)
    session = ClientSession(connector=connector)
    # A variable for tracking progress
    count = 0
    # Semaphore with a maximum of 200 concurrent requests
    semaphore = asyncio.Semaphore(200)
    # Regex pattern for matching IPv4 addresses
    ip_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'

    # Fetch function: send the DNS-over-HTTPS request, store the hostname with its IPs,
    # and release the semaphore when done
    async def fetch(url, id):
        nonlocal resolved
        try:
            async with session.get(url, headers=headers) as response:
                r = await response.json(content_type=None)
                out = []
                if 'Answer' in r:
                    for ans in r['Answer']:
                        if re.match(ip_pattern, ans['data']):
                            out.append(ans['data'])
                del futures[id]
                domains[id] = out
                if len(out) > 0:
                    resolved += 1
        except Exception as e:
            print(f"Error fetching DNS for {id}: {e}")
        finally:
            # Always release the semaphore so the main loop is not starved by failed requests
            semaphore.release()

    print(f'Domains count: {len(domains)} object_name: {object_name}')
    # Alternate between Google and Cloudflare for DNS resolution
    use_cloudflare = True
    # Iterate over the hostnames in the dictionary
    for key in domains.keys():
        count += 1
        # Skip entries that already have a result
        if domains[key] != '':
            skip += 1
            continue
        # Wait for semaphore availability
        await semaphore.acquire()
        # Build the URL for Cloudflare or Google
        if use_cloudflare:
            url = f'https://1.1.1.1/dns-query?name={key}'
        else:
            url = f'https://dns.google.com/resolve?type=1&name={key}'
        # Toggle the DNS service for the next request
        use_cloudflare = not use_cloudflare
        # Create and store the future so the result can be awaited later
        future = asyncio.ensure_future(fetch(url, key))
        futures[key] = future
        # Log every 10000 steps
        if count % 10000 == 0:
            print(f'Processed {count} domains. Object name: {object_name} Resolved: {resolved} Skipped: {skip}')

    try:
        # Wait for all remaining futures to complete, then close the session
        await asyncio.gather(*list(futures.values()))
        await session.close()
    except Exception as e:
        print(f'Error in final step: {e}')

    # Upload resolved domains to object storage if more than 100 entries were resolved
    if resolved > 100:
        upload_dict(domains, 'domain-resolved', object_name)
    else:
        print(f'Cancel upload due to lack of changes: {object_name}')

    # Calculate and log total execution time
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Total execution time: {execution_time} seconds. Object name: {object_name} Resolved: {resolved} Skipped: {skip}")
    return domains
```
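Both services return the standard DNS JSON format, where A records appear under `Answer` with the IP in the `data` field. As a quick sanity check outside the async pipeline, you can hit the same Google endpoint with plain `requests` (illustrative only):

```python
import requests

# Quick synchronous check of the DNS-over-HTTPS JSON response shape
resp = requests.get(
    'https://dns.google.com/resolve',
    params={'type': 1, 'name': 'example.com'},
    headers={'accept': 'application/dns-json'},
)
data = resp.json()
# A typical response looks roughly like:
# {"Status": 0, ..., "Answer": [{"name": "example.com.", "type": 1, "TTL": 300, "data": "<ip address>"}]}
for answer in data.get('Answer', []):
    if answer.get('type') == 1:  # type 1 = A record
        print(answer['data'])
```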

Last Phase: Locating Shopify Domains

In this section, we will examine each domain along with its IP addresses. If the IP address 23.227.38.65, which is the main IP of Shopify, is found among them, we will add the domain to our results.

```python
def find_shopify_domains():
    # Get the list of all dictionaries containing hostnames and their IPs
    dictionaries_list = get_list_objects('domain-resolved')
    # A counter for tracking progress
    counter = 0
    # A dictionary to collect Shopify domains
    shopify_domains = {}
    # Download each dictionary and check whether the Shopify IP is among a domain's IPs
    for object_name in dictionaries_list:
        domains = download_dict('domain-resolved', object_name, check_exist=False)
        for hostname, ips in domains.items():
            counter += 1
            for ip in ips:
                if ip == '23.227.38.65':
                    shopify_domains[hostname] = ''
            # Print progress every 10,000 domains
            if counter % 10000 == 0:
                print(f'Processed: {counter} domains, Found Shopify domains: {len(shopify_domains)}')
    # Print the total number of Shopify domains found and upload the result to object storage
    print(f'Final count of Shopify domains: {len(shopify_domains)}')
    upload_dict(shopify_domains, 'shopify-list', 'list2.pkl.gz')
```
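To turn the final dictionary into something shareable (like the spreadsheet linked above), a short export step is enough. This is an illustrative sketch, assuming the `shopify-list` bucket and object name used in the function above:

```python
import csv

# Illustrative export of the final result to a CSV file
shopify_domains = download_dict('shopify-list', 'list2.pkl.gz', check_exist=False)
with open('shopify_domains.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['domain'])
    for domain in sorted(shopify_domains):
        writer.writerow([domain])
print(f'Wrote {len(shopify_domains)} domains to shopify_domains.csv')
```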

Appreciation for Your Attention

Thank you for taking the time to read through to the end. If you have any questions or feedback, feel free to reach out to me on LinkedIn.
