Published on

Sync rawdata to S3

Authors
  • avatar
    Name
    Shelton Ma
    Twitter
  1. sync_rawdata_to_s3

    import os
    import time
    import datetime
    import logging
    import traceback
    import sys
    import subprocess
    
    def sync_rawdata_to_s3(modified_second=3600):
        """
        Sync directories under `/rawdata` that contain fresh `.json.gz` files to S3.

        The local tree is walked once. Every directory holding at least one
        file whose name ends in `.json.gz` and whose mtime is less than
        `modified_second` seconds old is synced with `aws s3 sync` to the
        matching key prefix under `s3://raw-data/`. Note that the *whole
        directory* is handed to `aws s3 sync` (only `*.json` is excluded),
        not just the recently modified files.

        Args:
            modified_second (int, optional): Size of the modification window
                in seconds. Defaults to 3600 (1 hour). Per the original
                author's notes the job is meant to be scheduled every
                10 minutes, which gives each file roughly 6 upload attempts
                with the default window (the schedule itself is configured
                outside this function).

        Returns:
            None. This is a best-effort job: failures are logged and never
            raised to the caller.
        """
        root = '/rawdata'
        bucket = 's3://raw-data/'
        aws = '/usr/local/bin/aws'
        try:
            now = time.time()
            recent_dirs = set()
            for dirpath, _dirs, filenames in os.walk(root):
                for filename in filenames:
                    # Match on the file name suffix, not a substring of the
                    # full path, so directory names cannot cause a match.
                    if not filename.endswith('.json.gz'):
                        continue
                    try:
                        age = now - os.path.getmtime(os.path.join(dirpath, filename))
                    except OSError:
                        # File vanished (or became unreadable) between the
                        # walk and the stat; skip it instead of aborting.
                        continue
                    if age < modified_second:
                        recent_dirs.add(dirpath)
                        # One fresh file is enough to schedule the directory.
                        break

            for prefix in sorted(recent_dirs):
                # Strip only the leading root. This also handles files that
                # live directly in `/rawdata`, which map to the bucket root.
                relative = prefix[len(root):].lstrip('/')
                # Argument list + no shell: paths with spaces or shell
                # metacharacters are passed through verbatim.
                cmd = [aws, 's3', 'sync', prefix, bucket + relative,
                       '--exclude', '*.json']
                cmd_text = ' '.join(cmd)
                try:
                    proc = subprocess.run(
                        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                except OSError:
                    # e.g. the aws binary is missing; keep going so the
                    # failure is reported for every pending directory.
                    logging.error(
                        "sync_rawdata_to_s3 could not start command: %s\n%s",
                        cmd_text, traceback.format_exc())
                    continue

                # Never let undecodable CLI output break the job.
                output = proc.stdout.decode('utf-8', errors='replace')
                err = proc.stderr.decode('utf-8', errors='replace')
                if proc.returncode != 0:
                    logging.error(
                        "sync_rawdata_to_s3 sync failed. Please review. "
                        "Command: %s, Output: %s, Error: %s",
                        cmd_text, output, err)
                else:
                    logging.info(
                        "cmd: %s, output: %s, err: %s", cmd_text, output, err)
        except Exception:
            # Top-level boundary of a scheduled job: log and return, but let
            # KeyboardInterrupt / SystemExit propagate.
            logging.error(traceback.format_exc())