red_panda.aws package

Submodules

red_panda.aws.athena module

class red_panda.aws.athena.AthenaUtils(aws_config: dict, s3_staging_dir: str, work_group: str = None, region_name: str = None)

Bases: red_panda.aws.AWSUtils

AWS Athena operations.

Parameters:
  • aws_config – AWS configuration.
  • s3_staging_dir – Full S3 folder URI, e.g. s3://athena-query/results.
  • work_group (optional) – Athena workgroup to run queries in.
  • region_name (optional) – AWS region name.
aws_config

AWS configuration.

s3_staging_dir

Full S3 folder URI, e.g. s3://athena-query/results.

region_name

AWS region name.

run_query(sql: str, as_df: bool = False, use_cache: bool = False) → Union[list, pandas.core.frame.DataFrame]

Run query on Athena.

Parameters:
  • sql – SQL query.
  • as_df (optional) – Whether to return the result as a DataFrame.
  • use_cache (optional) – Whether to reuse cached query results.
Returns:

Query result as DataFrame or List[Tuple].
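Because run_query can return either a list of row tuples or a DataFrame, downstream code may need to normalize the raw form. A minimal, hypothetical sketch (the rows and column names below are stand-ins, not real Athena output):

```python
# Hypothetical sketch: turning the raw List[Tuple] form of run_query output
# into a list of dicts keyed by column name. Rows and column names are
# stand-ins for illustration, not real Athena results.
def rows_to_records(rows, columns):
    """Pair each row tuple with the corresponding column names."""
    return [dict(zip(columns, row)) for row in rows]

rows = [(1, "alpha"), (2, "beta")]
records = rows_to_records(rows, ["id", "label"])
```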

red_panda.aws.redshift module

class red_panda.aws.redshift.RedshiftUtils(redshift_config: dict, dryrun: bool = False)

Bases: object

Base class for Redshift operations.

Parameters:
  • redshift_config – Redshift configuration.
  • dryrun (optional) – If True, queries will be printed instead of executed.
redshift_conf

Redshift configuration.

Type: dict
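The dryrun flag means queries are printed instead of executed. A toy illustration of that pattern (not the library's actual implementation):

```python
# Toy illustration of the dryrun pattern described above (not the library's
# actual code): print the SQL instead of executing it.
def run(sql: str, dryrun: bool = False):
    if dryrun:
        print(sql)  # show what would be executed
        return None
    # ... a real implementation would execute against Redshift here ...
    raise NotImplementedError("execution path omitted in this sketch")

run("SELECT 1;", dryrun=True)
```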
cancel_query(pid: Union[str, int], transaction: bool = False)

Cancel a running query given its pid.

Parameters:
  • pid – PID of a running query in Redshift.
  • transaction (optional) – Whether the running query is a transaction.
create_table(table_name: str, column_definition: dict, temp: bool = False, if_not_exists: bool = False, backup: str = 'YES', unique: List[str] = None, primary_key: str = None, foreign_key: List[str] = None, references: List[str] = None, diststyle: str = 'EVEN', distkey: str = None, sortstyle: str = 'COMPOUND', sortkey: List[str] = None, drop_first: bool = False)

Utility for creating a table in Redshift.

Parameters:
  • table_name – Name of table to be created.
  • column_definition

    default:

    {
        "col1": {
            "data_type": "varchar(256)", # str
            "default": None, # Any
            "identity": None, # tuple(int, int)
            "encode": None, # str
            "distkey": False,
            "sortkey": False,
            "nullable": True,
            "unique": False,
            "primary_key": False,
            "foreign_key": False,
            "references": None, # str
            "like": None, # str
        },
        "col2": {
            ...
        },
        ...
    }
    
  • temp (optional) – Corresponding argument of create table in Redshift.
  • if_not_exists (optional) – Corresponding argument of create table in Redshift.
  • backup (optional) – Corresponding argument of create table in Redshift.
  • unique (optional) – Corresponding argument of create table in Redshift.
  • primary_key (optional) – Corresponding argument of create table in Redshift.
  • foreign_key (optional) – Corresponding argument of create table in Redshift. Must match references.
  • references (optional) – Corresponding argument of create table in Redshift. Must match foreign_key.
  • diststyle (optional) – Corresponding argument of create table in Redshift.
  • distkey (optional) – Corresponding argument of create table in Redshift.
  • sortstyle (optional) – Corresponding argument of create table in Redshift.
  • sortkey (optional) – Corresponding argument of create table in Redshift.
  • drop_first (optional) – Corresponding argument of create table in Redshift.
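A concrete column_definition following the documented default schema might look like the following (the table and column names are made up for illustration; keys not listed fall back to the defaults shown above):

```python
# Hypothetical column_definition following the documented default schema;
# table and column names are made up for illustration.
column_definition = {
    "id": {
        "data_type": "int",
        "nullable": False,
        "primary_key": True,
        "distkey": True,
        "sortkey": True,
    },
    "name": {
        "data_type": "varchar(256)",
        "nullable": True,
    },
}

# With a RedshiftUtils instance `ru`, the call would then look like:
# ru.create_table("my_table", column_definition, if_not_exists=True)
```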
get_load_error(as_df: bool = True) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility to get load errors in the cluster.

Parameters: as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
Returns: Load error information.
get_lock_info(as_df: bool = True) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility to get lock information in the cluster.

Parameters: as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
Returns: Lock information.
get_num_slices() → Optional[int]

Get number of slices of a Redshift cluster.

Returns: Number of slices of the connected cluster.
Return type: int
Raises: IndexError – When Redshift returns an invalid number of slices.
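The slice count is mainly useful when preparing a parallel load: splitting input into as many pieces as there are slices lets a COPY use every slice. A stdlib-only sketch of that split (the data is a stand-in):

```python
# Sketch: split rows round-robin into one chunk per cluster slice — the usual
# reason to ask for the slice count before a parallel COPY load.
def split_for_slices(rows, num_slices):
    return [rows[i::num_slices] for i in range(num_slices)]

chunks = split_for_slices(list(range(10)), 4)
```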
get_running_info(as_df: bool = True) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility to get information on running queries in the cluster.

Parameters: as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
Returns: Running query information.
get_table_info(as_df: bool = True, simple: bool = False) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility to get table information in the cluster.

Parameters:
  • as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
  • simple (optional) – Whether to get the basic table information only.
Returns:

Table information.

get_transaction_info(as_df: bool = True) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility to get transaction information in the cluster.

Parameters: as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
Returns: Transaction information.
kill_session(pid: Union[str, int])

Kill a session given its pid.

Parameters: pid – PID of a running query in Redshift.
redshift_to_df(sql: str) → pandas.core.frame.DataFrame

Redshift query result to a Pandas DataFrame.

Parameters: sql – SQL query.
Returns: A DataFrame of the query result.
Return type: pandas.DataFrame
redshift_to_file(sql: str, file_name: str, **kwargs)

Redshift query result to a file.

Parameters:
  • sql – SQL query.
  • file_name – File name of the saved file.
  • **kwargs – to_csv keyword arguments.
run_query(sql: str, fetch: bool = False) → Tuple[Optional[dict], Optional[list]]

Run a SQL query.

Parameters:
  • sql – SQL string.
  • fetch (optional) – Whether to return data from the query.
Returns:

(data, columns) where data is a json/dict representation of the data and columns is a list of column names.

Return type:

Tuple[dict, list]
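To get a DataFrame from the raw (data, columns) tuple, something like the following works (the records below are stand-ins for real query output, and the exact shape of data should be verified against an actual result):

```python
import pandas as pd

# Stand-ins for the (data, columns) tuple run_query returns with fetch=True;
# verify the exact shape of `data` against a real result before relying on it.
data = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
columns = ["id", "name"]

df = pd.DataFrame(data, columns=columns)
```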

run_sql_from_file(file_name: str)

Run a .sql file.

Parameters: file_name – SQL file to be run.
run_template(sql: str, as_df: bool = True) → Union[Tuple[Optional[dict], Optional[list]], pandas.core.frame.DataFrame]

Utility method to run a predefined SQL template.

Parameters:
  • sql – SQL string.
  • as_df (optional) – Whether or not to return the result as a pandas.DataFrame.
Returns:

Either a DataFrame of the template query result or the raw form described in run_query.

Return type:

Union[pandas.DataFrame, Tuple[dict, list]]

red_panda.aws.redshift.create_column_definition(d: dict) → str

Create full column definition string for Redshift.

Parameters: d – A dict of single-column definitions, where the keys are column names.
Returns: Full column definition for Redshift.
Return type: str
red_panda.aws.redshift.create_column_definition_single(d: dict) → str

Create the column definition for a single column.

Parameters: d

A dict of values to compose a single column definition, defaults:

{
    "data_type": "varchar(256)", # str
    "default": None, # Any
    "identity": None, # tuple
    "encode": None, # str
    "distkey": False,
    "sortkey": False,
    "nullable": True,
    "unique": False,
    "primary_key": False,
    "foreign_key": False,
    "references": None, # str
    "like": None, # str
}
Returns: Single column definition for Redshift.
Return type: str
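How the default dict above turns into a DDL fragment can be pictured with a simplified sketch. This is not the library's actual implementation (clause ordering and the keys handled are illustrative), only the merge-defaults-then-compose idea:

```python
# Simplified sketch (NOT the library's actual code) of composing a single
# Redshift column definition from the documented default dict.
DEFAULTS = {
    "data_type": "varchar(256)",
    "encode": None,
    "distkey": False,
    "sortkey": False,
    "nullable": True,
    "unique": False,
    "primary_key": False,
}

def column_definition_single(name, opts):
    d = {**DEFAULTS, **opts}          # fill unspecified keys with defaults
    parts = [name, d["data_type"]]
    if not d["nullable"]:
        parts.append("NOT NULL")
    if d["unique"]:
        parts.append("UNIQUE")
    if d["primary_key"]:
        parts.append("PRIMARY KEY")
    if d["encode"]:
        parts.append(f"ENCODE {d['encode']}")
    if d["distkey"]:
        parts.append("DISTKEY")
    if d["sortkey"]:
        parts.append("SORTKEY")
    return " ".join(parts)
```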

red_panda.aws.s3 module

class red_panda.aws.s3.S3Utils(aws_config: dict)

Bases: red_panda.aws.AWSUtils

AWS S3 operations.

Parameters: aws_config – AWS configuration.
aws_config

AWS configuration.

Type: dict
create_bucket(bucket: str, error: str = 'warn', **kwargs)

Check and create bucket.

Parameters:
  • bucket – S3 bucket name.
  • error (optional) – One of warn, raise, or silent: how to handle the case where the bucket already exists. Default is warn.
  • **kwargs – Additional keyword arguments for creating bucket.
Returns:

The response from boto3.create_bucket.
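The three error modes follow a common pattern. An illustrative stdlib-only sketch (not the library's actual code) of how a "bucket already exists" condition might be handled under each mode:

```python
import warnings

# Illustrative sketch of the warn/raise/silent choice that create_bucket's
# `error` argument describes; not the library's actual implementation.
def handle_existing_bucket(bucket: str, error: str = "warn"):
    msg = f"Bucket {bucket} already exists"
    if error == "raise":
        raise ValueError(msg)
    if error == "warn":
        warnings.warn(msg)
    # error == "silent": do nothing
```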

delete_bucket(bucket: str)

Empty and delete bucket.

Parameters:bucket – S3 bucket name.
delete_from_s3(bucket: str, key: str)

Delete object from S3.

Parameters:
  • bucket – S3 bucket name.
  • key – S3 key.
df_to_s3(df: pandas.core.frame.DataFrame, bucket: str, key: str, **kwargs)

Put DataFrame to S3.

Parameters:
  • df – Source dataframe.
  • bucket – S3 bucket name.
  • key – S3 key.
  • **kwargs – kwargs for boto3.Bucket.put_object and pandas.DataFrame.to_csv.
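Conceptually this amounts to serializing the frame to an in-memory buffer and uploading the bytes. A stdlib-only sketch of the serialization half (stand-in rows, csv module in place of to_csv):

```python
import csv
import io

# Stdlib sketch of the serialize-to-buffer step behind df_to_s3: write rows
# to an in-memory CSV and take the bytes that would go to put_object.
rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["id", "name"])
writer.writeheader()
writer.writerows(rows)

payload = buf.getvalue().encode("utf-8")  # bytes ready for upload
```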
file_to_s3(file_name: str, bucket: str, key: str, **kwargs)

Put a file to S3.

Parameters:
  • file_name – Local file name.
  • bucket – S3 bucket name.
  • key – S3 key.
  • **kwargs – ExtraArgs for boto3.client.upload_file.
get_s3_client()

Return a boto3 S3 client.

get_s3_resource()

Return a boto3 S3 resource.

list_buckets() → list

List all buckets.

Returns: All S3 buckets for the account.
list_object_keys(bucket: str, prefix: str = '') → list

List all object keys.

Parameters:
  • bucket – Bucket name.
  • prefix – Any prefix for the object.
Returns:

A list of all object keys in the bucket matching the given prefix.
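S3 applies the prefix filter server-side, but its effect is equivalent to the following local filter (the key names are made up):

```python
# Local illustration of what a prefix does when listing keys; S3 applies this
# filter server-side. Key names are made up for illustration.
keys = ["logs/2021/a.csv", "logs/2022/b.csv", "exports/c.csv"]
logs_only = [k for k in keys if k.startswith("logs/")]
```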

s3_folder_to_df(bucket: str, folder: str, prefix: str = None, **kwargs)

Read all files in an S3 folder with the given prefix into a DataFrame.

Parameters:
  • bucket – S3 bucket name.
  • folder – S3 folder.
  • prefix – File prefix.
Returns:

A DataFrame.

s3_to_df(bucket: str, key: str, **kwargs)

Read an S3 object into memory as a DataFrame.

Only delimited files are supported; the default is tab-delimited.

Parameters:
  • bucket – S3 bucket name.
  • key – S3 key.
  • **kwargs – kwargs for pandas.read_table and boto3.client.get_object.
Returns:

A DataFrame.

s3_to_file(bucket: str, key: str, file_name: str, **kwargs)

Download S3 object as local file.

Parameters:
  • bucket – S3 bucket name.
  • key – S3 key.
  • file_name – Local file name.
  • **kwargs – kwargs for boto3.client.download_file.
s3_to_obj(bucket: str, key: str, **kwargs) → _io.BytesIO

Read S3 object into memory as BytesIO.

Parameters:
  • bucket – S3 bucket name.
  • key – S3 key.
  • **kwargs – kwargs for boto3.client.get_object.
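Once the object is in memory as BytesIO, it can be decoded and parsed like any file. A stdlib sketch with stand-in tab-delimited bytes (mirroring the tab-delimited default noted under s3_to_df):

```python
import csv
import io

# Parse an in-memory S3 object as tab-delimited text; the bytes below are a
# stand-in for what s3_to_obj would return.
buf = io.BytesIO(b"id\tname\n1\ta\n2\tb\n")
text = io.TextIOWrapper(buf, encoding="utf-8")
rows = list(csv.reader(text, delimiter="\t"))
```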

Module contents

class red_panda.aws.AWSUtils(aws_config: dict)

Bases: object

Base class for AWS operations.

Parameters: aws_config – AWS configuration.
aws_config

AWS configuration.

Type: dict