# ansible-collection-infomaniak/plugins/module_utils/infomaniak_api_client.py
#
# Listing metadata from the original source viewer: 53 lines, 1.8 KiB, Python.

import requests
import json
# Root of the Infomaniak HTTP API; the API version is appended per client.
INFOMANIAK_API_URL = "https://api.infomaniak.com"


class InfomaniakAPIClient:
    """Thin JSON client for the Infomaniak HTTP API.

    Every request is sent to ``INFOMANIAK_API_URL/<api_version><endpoint>``
    with a bearer-token ``Authorization`` header and JSON content
    negotiation headers. All failures are reported as plain ``Exception``
    instances carrying a descriptive message.
    """

    def __init__(self, api_version, api_token, timeout=10):
        """Store the base URL, request headers and timeout.

        Args:
            api_version: Path segment appended to ``INFOMANIAK_API_URL``.
            api_token: Token sent as ``Authorization: Bearer <token>``.
            timeout: Value passed to ``requests`` as the request timeout
                (defaults to 10, the value previously hard-coded).
        """
        self.base_url = f"{INFOMANIAK_API_URL}/{api_version}"
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.timeout = timeout

    def _request(self, method, endpoint, data=None, params=None):
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended verbatim to ``self.base_url``.
            data: Optional object sent as the JSON request body.
            params: Optional query-string parameters.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            Exception: On an HTTP error status (message includes the
                response body), on any other ``requests`` failure, on any
                unexpected error while sending, or when the response body
                is not valid JSON. The original error is chained as
                ``__cause__``.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=self.headers,
                json=data,
                params=params,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as http_err:
            # Read the response from the exception and compare against None:
            # a Response object is falsy for 4xx/5xx statuses, so a plain
            # truthiness test would always discard the server's error body.
            failed = http_err.response
            error_content = failed.text if failed is not None else "No response"
            raise Exception(
                f"HTTP error occurred: {http_err}, Response content: {error_content}"
            ) from http_err
        except requests.exceptions.RequestException as req_err:
            raise Exception(f"Request error occurred: {req_err}") from req_err
        except Exception as err:
            raise Exception(f"An error occurred: {err}") from err

        # Decode outside the network try-block so a JSON failure is reported
        # once instead of being re-wrapped by the catch-all above.
        try:
            return response.json()
        except ValueError as json_err:
            # ValueError is the common base of every JSON decode error that
            # response.json() can raise across requests versions.
            raise Exception(f"Invalid JSON response: {response.text}") from json_err

    def get(self, endpoint, params=None):
        """Issue a GET to ``endpoint`` with optional query ``params``."""
        return self._request("GET", endpoint, params=params)

    def post(self, endpoint, data=None):
        """Issue a POST to ``endpoint`` with ``data`` as the JSON body."""
        return self._request("POST", endpoint, data=data)

    def put(self, endpoint, data=None):
        """Issue a PUT to ``endpoint`` with ``data`` as the JSON body."""
        return self._request("PUT", endpoint, data=data)

    def delete(self, endpoint):
        """Issue a DELETE to ``endpoint``."""
        return self._request("DELETE", endpoint)