Python with custom endpoint
from google.cloud import storage
from google.oauth2 import service_account
class GCSConfig(CloudConfig):
"""Google Cloud Storage configuration."""
def __init__(self, config_dict: Dict[str, Any]):
super().__init__(config_dict)
required_fields = ['bucket_name', 'custom_endpoint', 'client_email', 'secret_key']
missing = [field for field in required_fields if field not in config_dict]
if missing:
raise ValueError(f"Missing required GCS configuration fields: {', '.join(missing)}")
def gcs_client(self, gcs_config: GCSConfig):
"""Create a GCS client using the provided configuration."""
# Log the configuration
logger.debug(f"Creating GCS client with endpoint: {gcs_config.custom_endpoint}")
logger.debug(f"Token URI: {gcs_config.custom_endpoint + '/token'}")
# Create service account credentials
credentials_info = {
'type': 'service_account',
'private_key': gcs_config.secret_key,
'client_email': gcs_config.client_email,
'token_uri': gcs_config.custom_endpoint + '/token'
}
logger.debug(f"Credentials info size: {len(str(credentials_info))} bytes")
credentials = service_account.Credentials.from_service_account_info(credentials_info)
# Create client with custom endpoint
client_kwargs = {
'credentials': credentials,
'client_options': {
'api_endpoint': gcs_config.custom_endpoint
}
}
# Add project_id only if it exists
if hasattr(gcs_config, 'project_id'):
client_kwargs['project'] = gcs_config.project_id
client = storage.Client(**client_kwargs)
logger.debug(f"Created GCS client with options: {client_kwargs}")
return client
def test_bucket_exists(self, gcs_client, gcs_config):
"""Test if the configured bucket exists."""
try:
bucket = gcs_client.get_bucket(gcs_config.bucket_name)
assert bucket.name == gcs_config.bucket_name
logger.debug(f"Successfully found bucket: {bucket.name}")
except Exception as e:
write_trace(e)
pytest.fail(f"Bucket {gcs_config.bucket_name} does not exist or is not accessible: {str(e)}")
Last updated
Was this helpful?