Python with custom endpoint

from google.cloud import storage
from google.oauth2 import service_account

class GCSConfig(CloudConfig):
    """Google Cloud Storage configuration."""
    def __init__(self, config_dict: Dict[str, Any]):
        super().__init__(config_dict)
        required_fields = ['bucket_name', 'custom_endpoint', 'client_email', 'secret_key']
        missing = [field for field in required_fields if field not in config_dict]
        if missing:
            raise ValueError(f"Missing required GCS configuration fields: {', '.join(missing)}")

def gcs_client(self, gcs_config: GCSConfig):
    """Create a GCS client using the provided configuration."""
    # Log the configuration
    logger.debug(f"Creating GCS client with endpoint: {gcs_config.custom_endpoint}")
    logger.debug(f"Token URI: {gcs_config.custom_endpoint + '/token'}")
    
    # Create service account credentials
    credentials_info = {
        'type': 'service_account',
        'private_key': gcs_config.secret_key,
        'client_email': gcs_config.client_email,
        'token_uri': gcs_config.custom_endpoint + '/token'
    }
    
    logger.debug(f"Credentials info size: {len(str(credentials_info))} bytes")
    
    credentials = service_account.Credentials.from_service_account_info(credentials_info)
    
    # Create client with custom endpoint
    client_kwargs = {
        'credentials': credentials,
        'client_options': {
            'api_endpoint': gcs_config.custom_endpoint
        }
    }
    
    # Add project_id only if it exists
    if hasattr(gcs_config, 'project_id'):
        client_kwargs['project'] = gcs_config.project_id

    client = storage.Client(**client_kwargs)
    logger.debug(f"Created GCS client with options: {client_kwargs}")
    return client

def test_bucket_exists(self, gcs_client, gcs_config):
    """Test if the configured bucket exists."""
    try:
        bucket = gcs_client.get_bucket(gcs_config.bucket_name)
        assert bucket.name == gcs_config.bucket_name
        logger.debug(f"Successfully found bucket: {bucket.name}")
    except Exception as e:
        write_trace(e)
        pytest.fail(f"Bucket {gcs_config.bucket_name} does not exist or is not accessible: {str(e)}")

Last updated

Was this helpful?