- Published on: (date not specified in source)
- Title: Sync rawdata to S3
- Authors:
  - Shelton Ma
- Module: sync_rawdata_to_s3
import os
import time
import datetime
import logging
import traceback
import sys
import subprocess


def sync_rawdata_to_s3(modified_second=3600):
    """Sync recently modified ``.json.gz`` files from ``/rawdata`` to ``s3://raw-data/``.

    Walks ``/rawdata`` and collects every directory containing at least one
    ``.json.gz`` file modified within the last ``modified_second`` seconds,
    then runs ``aws s3 sync`` on each such directory, mirroring its relative
    path under the bucket and excluding plain ``*.json`` files.

    Args:
        modified_second (int, optional): Time window in seconds for file
            modification. Defaults to 3600 (1 hour). The sync is expected to
            run every 10 minutes, so each file gets up to 6 upload
            opportunities, minimizing the risk of a transient failure.

    Returns:
        None. All failures are logged rather than raised, so a scheduler
        that calls this function keeps running.
    """
    try:
        prefix_path = set()
        for dirpath, dirs, filenames in os.walk('/rawdata'):
            for filename in filenames:
                file_path = os.path.join(dirpath, filename)
                # endswith() instead of substring match: avoids false positives
                # when a *directory* name merely contains ".json.gz".
                if filename.endswith('.json.gz') and (time.time() - os.path.getmtime(file_path)) < modified_second:
                    prefix_path.add(dirpath)
        for prefix in prefix_path:
            # Argument list with shell=False: immune to shell injection and
            # word-splitting if a path ever contains spaces or metacharacters.
            cmd = [
                '/usr/local/bin/aws', 's3', 'sync',
                prefix,
                f's3://raw-data/{prefix.replace("/rawdata/", "")}',
                '--exclude', '*.json',
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                # Failures are logged at ERROR level (was logging.info), using the
                # function's literal name instead of the private sys._getframe().
                logging.error(
                    f"sync_rawdata_to_s3 sync failed. Please review. "
                    f"Command: {cmd}, Output: {result.stdout}, Error: {result.stderr}"
                )
            else:
                logging.info(f"cmd: {cmd}, output: {result.stdout}, err: {result.stderr}")
    except Exception:
        # Catch Exception, not a bare except, so KeyboardInterrupt/SystemExit
        # still propagate; log the traceback instead of crashing the caller.
        logging.error(traceback.format_exc())