Skip to content

azure_sql_connection

Azure SQL Connection is used to create a database connection.

AzureSqlConnection

Source code in physical_operations_utils/azure_utils/azure_sql_connection.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class AzureSqlConnection:
    """Wraps a SQLAlchemy engine built from an entry in the keys YAML configuration."""

    def __init__(
        self,
        database: str = "azure_sql_database",
        username: str = None,
        password: str = None,
    ):
        """
        Initializes an Azure SQL connection by fetching credentials and setting up the SQLAlchemy engine.

        Args:
            database (str): The database identifier from the configuration file.
            username (str, optional): The username for authentication. When None,
                the ``username`` entry of the configuration is used.
            password (str, optional): The password for authentication. When None,
                it is fetched with ``get_secret`` using the configured ``secret`` name.

        Raises:
            KeyError: If the provided database key is not found in the configuration.
            Exception: If the database connection fails.

        Example:
            ```python
            from physical_operations_utils.azure_utils.azure_sql_connection import AzureSqlConnection

            conn = AzureSqlConnection(database="my_db")
            engine = conn.engine  # SQLAlchemy engine
            ```
        """
        setup_environment()

        db_info = get_keys_yaml_file()[database]
        server = db_info["server"]

        if username is None:
            username = db_info["username"]
        if password is None:
            password = get_secret(db_info["secret"])

        # From here on `database` is the real database name, not the config key.
        database = db_info["database"]
        driver = "{ODBC Driver 18 for SQL Server}"

        # The password is passed as a brace-quoted ODBC value. Inside braces a
        # literal "}" must be doubled, otherwise it ends the value early and
        # the rest of the connection string is misparsed.
        escaped_password = str(password).replace("}", "}}")

        # NOTE(review): "Encrypt=no" stops the client from requesting an
        # encrypted channel - confirm this is intentional for this deployment.
        odbc_parts = [
            f"Driver={driver}",
            f"Server=tcp:{server},1433",
            f"Database={database}",
            f"Uid={username}",
            "Pwd={" + escaped_password + "}",
            "Encrypt=no",
            "TrustServerCertificate=no",
            "Connection Timeout=180",
        ]
        # Every keyword/value pair, including the last one, ends with ";".
        params = urllib.parse.quote_plus(";".join(odbc_parts) + ";")

        conn_str = "mssql+pyodbc:///?odbc_connect=" + params
        self.engine = self._create_engine(conn_str)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(5),
        before=before_log(logger, logging.DEBUG),
        after=after_log(logger, logging.ERROR),
    )
    def _create_engine(self, conn_str: str):
        """
        Creates a SQLAlchemy engine with retry mechanisms.

        The call is retried up to 5 times with a fixed 2 second wait; each
        attempt is logged at DEBUG before it runs and at ERROR afterwards.

        NOTE(review): ``create_engine`` normally does not open a connection
        until first use, so these retries presumably only cover errors raised
        while constructing the engine - confirm that is the intent.

        Args:
            conn_str (str): The connection string for the database.

        Returns:
            sqlalchemy.engine.Engine: The SQLAlchemy engine object.
        """
        return create_engine(
            conn_str, fast_executemany=True, pool_recycle=1800, pool_pre_ping=True
        )

__init__(database='azure_sql_database', username=None, password=None)

Initializes an Azure SQL connection by fetching credentials and setting up the SQLAlchemy engine.

Parameters:

Name Type Description Default
database str

The database identifier from the configuration file.

'azure_sql_database'
username str

The username for authentication. Defaults to None.

None
password str

The password for authentication. Defaults to None.

None

Raises:

Type Description
KeyError

If the provided database key is not found in the configuration.

Exception

If the database connection fails.

Example
from physical_operations_utils.azure_utils.azure_sql_connection import AzureSqlConnection

conn = AzureSqlConnection(database="my_db")
engine = conn.engine  # SQLAlchemy engine
Source code in physical_operations_utils/azure_utils/azure_sql_connection.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    database: str = "azure_sql_database",
    username: str = None,
    password: str = None,
):
    """
    Initializes an Azure SQL connection by fetching credentials and setting up the SQLAlchemy engine.

    Args:
        database (str): The database identifier from the configuration file.
        username (str, optional): The username for authentication. When None,
            the ``username`` entry of the configuration is used.
        password (str, optional): The password for authentication. When None,
            it is fetched with ``get_secret`` using the configured ``secret`` name.

    Raises:
        KeyError: If the provided database key is not found in the configuration.
        Exception: If the database connection fails.

    Example:
        ```python
        from physical_operations_utils.azure_utils.azure_sql_connection import AzureSqlConnection

        conn = AzureSqlConnection(database="my_db")
        engine = conn.engine  # SQLAlchemy engine
        ```
    """
    setup_environment()

    db_info = get_keys_yaml_file()[database]
    server = db_info["server"]

    if username is None:
        username = db_info["username"]
    if password is None:
        password = get_secret(db_info["secret"])

    # From here on `database` is the real database name, not the config key.
    database = db_info["database"]
    driver = "{ODBC Driver 18 for SQL Server}"

    # The password is passed as a brace-quoted ODBC value. Inside braces a
    # literal "}" must be doubled, otherwise it ends the value early and the
    # rest of the connection string is misparsed.
    escaped_password = str(password).replace("}", "}}")

    # NOTE(review): "Encrypt=no" stops the client from requesting an
    # encrypted channel - confirm this is intentional for this deployment.
    odbc_parts = [
        f"Driver={driver}",
        f"Server=tcp:{server},1433",
        f"Database={database}",
        f"Uid={username}",
        "Pwd={" + escaped_password + "}",
        "Encrypt=no",
        "TrustServerCertificate=no",
        "Connection Timeout=180",
    ]
    # Every keyword/value pair, including the last one, ends with ";".
    params = urllib.parse.quote_plus(";".join(odbc_parts) + ";")

    conn_str = "mssql+pyodbc:///?odbc_connect=" + params
    self.engine = self._create_engine(conn_str)

get_db_connection(db_connection_id)

Retrieves or creates a cached database connection.

Parameters:

Name Type Description Default
db_connection_id str

The unique identifier of the database.

required

Returns:

Name Type Description
AzureSqlConnection AzureSqlConnection

An instance of the database connection.

Example
from physical_operations_utils.azure_utils.azure_sql_connection import get_db_connection

conn = get_db_connection("my_database")
engine = conn.engine
Source code in physical_operations_utils/azure_utils/azure_sql_connection.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def get_db_connection(db_connection_id: str) -> AzureSqlConnection:
    """
    Returns the cached connection for an identifier, creating it on first use.

    Connections are kept in the module-level ``_DB_CONNECTIONS`` mapping, keyed
    by ``db_connection_id``. The first request for an identifier builds an
    ``AzureSqlConnection`` from it and stores it; later requests return the
    stored instance.

    Args:
        db_connection_id (str): The unique identifier of the database.

    Returns:
        AzureSqlConnection: An instance of the database connection.

    Example:
        ```python
        from physical_operations_utils.azure_utils.azure_sql_connection import get_db_connection

        conn = get_db_connection("my_database")
        engine = conn.engine
        ```
    """
    # Cache hit: hand back the instance created earlier.
    if db_connection_id in _DB_CONNECTIONS:
        return _DB_CONNECTIONS[db_connection_id]

    # Cache miss: build the connection once and remember it.
    connection = AzureSqlConnection(db_connection_id)
    _DB_CONNECTIONS[db_connection_id] = connection
    return connection