patroni.api module

Implement Patroni’s REST API.

Exposes a REST API for Patroni operations, such as status, performance and management, to web clients.

Much of what can be achieved with the command line tool patronictl can be done via the API. The Patroni CLI and daemon use the API to perform these functions.

class patroni.api.RestApiHandler(request: Any, client_address: Any, server: Union[RestApiServer, http.server.HTTPServer])

Bases: http.server.BaseHTTPRequestHandler

Define how to handle each of the requests that are made against the REST API server.

do_DELETE_restart(*args, **kwargs) → None
do_DELETE_switchover(*args, **kwargs) → None
do_GET(write_status_code_only: bool = False) → None

Process all GET requests which cannot be routed to other methods.

Used for handling all health-check requests, e.g. “GET /(primary|replica|sync|async|etc…)”.

The (optional) query parameters and the HTTP response status depend on the requested path:

  • /, primary, or read-write:

    • HTTP status 200: if a primary with the leader lock.
  • /standby-leader:

    • HTTP status 200: if holds the leader lock in a standby cluster.
  • /leader:

    • HTTP status 200: if holds the leader lock.
  • /replica:

    • Query parameters:

      • lag: only accept replication lag up to lag. Accepts either an int, which
        represents lag in bytes, or a str representing lag in human-readable format (e.g. 10MB).
      • Any custom parameter: will attempt to match them against node tags.
    • HTTP status 200: if up and running as a standby and without noloadbalance tag.

  • /read-only:

    • HTTP status 200: if up and running and without noloadbalance tag.
  • /synchronous or /sync:

    • HTTP status 200: if up and running as a synchronous standby.
  • /read-only-sync:

    • HTTP status 200: if up and running as a synchronous standby or primary.
  • /asynchronous:

    • Query parameters:

      • lag: only accept replication lag up to lag. Accepts either an int, which
        represents lag in bytes, or a str representing lag in human-readable format (e.g. 10MB).
    • HTTP status 200: if up and running as an asynchronous standby.

  • /health:

    • HTTP status 200: if up and running.

Note

If not able to honor the query parameter, or not able to match the condition described for HTTP status 200 in each path above, then HTTP status will be 503.

Note

Independently of the requested path, if write_status_code_only is False, then it always writes an HTTP response through _write_status_response(), with the node status.

Parameters:write_status_code_only – indicates that instead of a normal HTTP response we should send only the HTTP Status Code and close the connection. Useful when health-checks are executed by HAProxy.
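
As a rough illustration of how a client might call the health-check paths listed above, the sketch below builds a request URL with the optional query parameters and maps the response status to a boolean. The helper names health_check_url and is_healthy, the host name and port 8008 are assumptions made for this example; they are not part of patroni.api.

```python
from typing import Dict, Optional
from urllib.parse import urlencode
from urllib.request import urlopen


def health_check_url(base_url: str, path: str, lag: Optional[str] = None,
                     tags: Optional[Dict[str, str]] = None) -> str:
    """Build a health-check URL such as http://host:8008/replica?lag=10MB."""
    params: Dict[str, str] = {}
    if lag is not None:
        params['lag'] = lag       # bytes as an int, or human-readable (e.g. 10MB)
    params.update(tags or {})     # custom parameters are matched against node tags
    query = '?' + urlencode(params) if params else ''
    return base_url.rstrip('/') + '/' + path.lstrip('/') + query


def is_healthy(url: str, timeout: float = 2.0) -> bool:
    """Return True on HTTP 200; False on 503 or when the node is unreachable."""
    try:
        with urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # urllib raises HTTPError (an OSError subclass) for non-2xx statuses
        # such as 503, and URLError or a timeout when the node is unreachable.
        return False


# Hypothetical node address, for illustration only.
print(health_check_url('http://node1:8008', '/replica', lag='10MB'))
```

A load balancer check would typically call is_healthy() on the URL and route traffic only to nodes for which it returns True.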
do_GET_cluster() → None

Handle a GET request to /cluster path.

Write an HTTP response with JSON content based on the output of cluster_as_json(), with HTTP status 200 and the JSON representation of the cluster topology.

do_GET_config() → None

Handle a GET request to /config path.

Write an HTTP response with a JSON content representing the Patroni configuration that is stored in the DCS, with HTTP status 200.

If the cluster information is not available in the DCS, then it will respond with no body and HTTP status 502 instead.

do_GET_failsafe() → None

Handle a GET request to /failsafe path.

Writes a response with a JSON string body containing all nodes that are known to Patroni at a given point in time, with HTTP status 200. The JSON contains a dictionary, each key is the name of the Patroni node, and the corresponding value is the URI to access /patroni path of its REST API.

Note

If failsafe_mode is not enabled, then write a response with HTTP status 502.

do_GET_history() → None

Handle a GET request to /history path.

Write an HTTP response with a JSON content representing the history of events in the cluster, with HTTP status 200.

The response contains a list of failover/switchover events. Each item is a list with the following items:

  • Timeline when the event occurred (int);
  • LSN at which the event occurred (int);
  • The reason for the event (str);
  • Timestamp when the new timeline was created (str);
  • Name of the involved Patroni node (str).
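
To make the shape of the /history payload concrete, here is a minimal sketch that turns the list of lists into named records. The HistoryEvent type, the parse_history helper and the sample payload are made up for illustration, and the sketch assumes every entry carries all five items described above.

```python
import json
from typing import List, NamedTuple


class HistoryEvent(NamedTuple):
    """One failover/switchover entry, in the order listed above."""
    timeline: int
    lsn: int
    reason: str
    timestamp: str
    node: str


def parse_history(body: str) -> List[HistoryEvent]:
    """Convert the JSON list of lists into HistoryEvent records."""
    return [HistoryEvent(*item) for item in json.loads(body)]


# Made-up sample payload; real values depend on the cluster.
sample = ('[[1, 25623960, "no recovery target specified", '
          '"2023-04-14T20:27:00+00:00", "node1"]]')
events = parse_history(sample)
print(events[0].timeline, events[0].node)  # prints: 1 node1
```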
do_GET_liveness() → None

Handle a GET request to /liveness path.

Write a simple HTTP response with HTTP status:

  • 200:

    • If the cluster is in maintenance mode; or
    • If Patroni heartbeat loop is properly running;
  • 503:

    • if Patroni heartbeat loop last run was more than ttl setting ago on the primary (or twice the
      value of ttl on a replica).
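
The liveness rule above can be sketched as a small pure function. This is an illustration of the documented conditions only, not the actual implementation; the function name and its parameters are hypothetical.

```python
def liveness_status(paused: bool, seconds_since_last_run: float,
                    ttl: int, is_primary: bool) -> int:
    """Return the HTTP status that the rules above would produce."""
    if paused:
        # Maintenance mode always reports the node as alive.
        return 200
    # A primary must have run within ttl; a replica is allowed twice that.
    limit = ttl if is_primary else 2 * ttl
    return 200 if seconds_since_last_run <= limit else 503


print(liveness_status(paused=False, seconds_since_last_run=31, ttl=30, is_primary=True))   # prints: 503
print(liveness_status(paused=False, seconds_since_last_run=31, ttl=30, is_primary=False))  # prints: 200
```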
do_GET_metrics() → None

Handle a GET request to /metrics path.

Write an HTTP response with plain text content in the format used by Prometheus, with HTTP status 200.

The response contains the following items:

  • patroni_version: Patroni version without periods, e.g. 030002 for Patroni 3.0.2;
  • patroni_postgres_running: 1 if PostgreSQL is running, else 0;
  • patroni_postmaster_start_time: epoch timestamp of when the postmaster was started;
  • patroni_master: 1 if this node holds the leader lock, else 0;
  • patroni_primary: same as patroni_master;
  • patroni_xlog_location: pg_wal_lsn_diff(pg_current_wal_flush_lsn(), '0/0') if leader, else 0;
  • patroni_standby_leader: 1 if standby leader node, else 0;
  • patroni_replica: 1 if a replica, else 0;
  • patroni_sync_standby: 1 if a sync replica, else 0;
  • patroni_xlog_received_location: pg_wal_lsn_diff(pg_last_wal_receive_lsn(), '0/0');
  • patroni_xlog_replayed_location: pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0');
  • patroni_xlog_replayed_timestamp: pg_last_xact_replay_timestamp();
  • patroni_xlog_paused: pg_is_wal_replay_paused();
  • patroni_postgres_server_version: Postgres version without periods, e.g. 150002 for Postgres 15.2;
  • patroni_cluster_unlocked: 1 if no one holds the leader lock, else 0;
  • patroni_failsafe_mode_is_active: 1 if failsafe_mode is currently active, else 0;
  • patroni_postgres_timeline: PostgreSQL timeline based on current WAL file name;
  • patroni_dcs_last_seen: epoch timestamp when DCS was last contacted successfully;
  • patroni_pending_restart: 1 if this PostgreSQL node is pending a restart, else 0;
  • patroni_is_paused: 1 if Patroni is in maintenance mode, else 0.

For PostgreSQL v9.6+ the response will also have the following:

  • patroni_postgres_streaming: 1 if Postgres is streaming from another node, else 0;
  • patroni_postgres_in_archive_recovery: 1 if Postgres isn’t streaming and there is restore_command available, else 0.
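
For readers who want to consume /metrics without a Prometheus client, the following sketch parses the plain text format into a dictionary. The parse_metrics helper and the sample text (including the scope and name labels) are assumptions for illustration; the sketch also assumes that lines carry no trailing timestamp.

```python
from typing import Dict


def parse_metrics(text: str) -> Dict[str, float]:
    """Parse Prometheus text exposition lines into {metric_name: value}."""
    metrics: Dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip blank lines and HELP/TYPE comments
        name_and_labels, value = line.rsplit(' ', 1)
        name = name_and_labels.split('{', 1)[0]  # drop any {label="..."} part
        metrics[name] = float(value)
    return metrics


# Made-up sample, shaped like the items listed above.
sample = '''
# HELP patroni_postgres_running Value is 1 if Postgres is running, 0 otherwise.
# TYPE patroni_postgres_running gauge
patroni_postgres_running{scope="demo",name="node1"} 1
patroni_primary{scope="demo",name="node1"} 0
patroni_postgres_server_version{scope="demo",name="node1"} 150002
'''
metrics = parse_metrics(sample)
print(metrics['patroni_postgres_running'])  # prints: 1.0
```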
do_GET_patroni() → None

Handle a GET request to /patroni path.

Write an HTTP response through _write_status_response(), with HTTP status 200 and the status of Postgres.

do_GET_readiness() → None

Handle a GET request to /readiness path.

Write a simple HTTP response whose HTTP status can be:

  • 200:

    • If this Patroni node holds the DCS leader lock; or
    • If this PostgreSQL instance is up and running;
  • 503: if none of the previous conditions apply.

do_HEAD() → None

Handle a HEAD request.

Write a simple HTTP response that represents the current PostgreSQL status. Send only 200 OK or 503 Service Unavailable as a response and nothing more, particularly no headers.

do_OPTIONS() → None

Handle an OPTIONS request.

Write a simple HTTP response that represents the current PostgreSQL status. Send only 200 OK or 503 Service Unavailable as a response and nothing more, particularly no headers.

do_PATCH_config(*args, **kwargs) → None
do_POST_citus(*args, **kwargs) → None
do_POST_failover(*args, **kwargs) → None
do_POST_failsafe(*args, **kwargs) → None
do_POST_reinitialize(*args, **kwargs) → None
do_POST_reload(*args, **kwargs) → None
do_POST_restart(*args, **kwargs) → None
do_POST_sigterm(*args, **kwargs) → None
do_POST_switchover() → None

Handle a POST request to /switchover path.

Calls do_POST_failover() with switchover option.

do_PUT_config(*args, **kwargs) → None
get_postgresql_status(retry: bool = False) → Dict[str, Any]

Builds an object representing a status of “postgres”.

Some of the values are collected by executing a query and others are taken from the state stored in memory.

Parameters:retry – whether the query should be retried if failed or give up immediately
Returns:a dict with the status of Postgres/Patroni. The keys are:
  • state: Postgres state among stopping, stopped, stop failed, crashed, running, starting, start failed, restarting, restart failed, initializing new cluster, initdb failed, running custom bootstrap script, custom bootstrap failed, creating replica, or unknown;
  • postmaster_start_time: pg_postmaster_start_time();
  • role: replica or master based on pg_is_in_recovery() output;
  • server_version: Postgres version without periods, e.g. 150002 for Postgres 15.2;
  • xlog: dictionary. Its structure depends on role:
    • If master:
      • location: pg_current_wal_flush_lsn()
    • If replica:
      • received_location: pg_wal_lsn_diff(pg_last_wal_receive_lsn(), '0/0');
      • replayed_location: pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0');
      • replayed_timestamp: pg_last_xact_replay_timestamp();
      • paused: pg_is_wal_replay_paused();
  • sync_standby: True if replication mode is synchronous and this is a sync standby;
  • timeline: PostgreSQL primary node timeline;
  • replication: list of dict entries, one for each replication connection. Each entry
    contains the following keys:
    • application_name: pg_stat_activity.application_name;
    • client_addr: pg_stat_activity.client_addr;
    • state: pg_stat_replication.state;
    • sync_priority: pg_stat_replication.sync_priority;
    • sync_state: pg_stat_replication.sync_state;
    • usename: pg_stat_activity.usename.
  • pause: True if cluster is in maintenance mode;
  • cluster_unlocked: True if cluster has no node holding the leader lock;
  • failsafe_mode_is_active: True if DCS failsafe mode is currently active;
  • dcs_last_seen: epoch timestamp DCS was last reached by Patroni.
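
Because the structure of xlog depends on the role, code that reads the status dictionary has to branch on which keys are present. The sketch below shows one way to do that; the wal_position helper and the sample dictionary are made up for illustration and only mirror the keys described above.

```python
from typing import Any, Dict, Optional


def wal_position(status: Dict[str, Any]) -> Optional[Any]:
    """Return the WAL position reported in xlog, whichever role the node has."""
    xlog = status.get('xlog') or {}
    if 'location' in xlog:
        return xlog['location']            # primary: current flush position
    return xlog.get('replayed_location')   # replica: last replayed position


# Made-up sample shaped like the dictionary described above.
replica_status = {
    'state': 'running',
    'role': 'replica',
    'server_version': 150002,
    'xlog': {'received_location': 67108960, 'replayed_location': 67108960,
             'replayed_timestamp': None, 'paused': False},
    'timeline': 1,
}
print(wal_position(replica_status))  # prints: 67108960
```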
handle_one_request() → None

Parse and dispatch a request to the appropriate do_* method.

Note

This is only used to keep track of latency when logging messages through log_message().

is_failover_possible(cluster: patroni.dcs.Cluster, leader: Optional[str], candidate: Optional[str], action: str) → Optional[str]

Checks whether there are nodes that could take over after demoting the primary.

Parameters:
  • cluster – the Patroni cluster.
  • leader – name of the current Patroni leader.
  • candidate – name of the Patroni node to be promoted.
  • action – the action to be performed (switchover or failover).
Returns:

a string with the error message or None if good nodes are found.

log_message(format: str, *args) → None

Log a custom debug message.

In addition to format, the log entry contains the client IP address and the current latency of the request.

Parameters:
  • format – printf-style format string message to be logged.
  • args – arguments to be applied as inputs to format.
parse_request() → bool

Override parse_request() to enrich basic functionality of BaseHTTPRequestHandler.

The original class can only invoke do_GET(), do_POST(), do_PUT(), etc. method implementations if they are defined.

But we would like to have at least some simple routing mechanism, i.e.:

  • GET /uri1/part2 request should invoke do_GET_uri1()
  • POST /other should invoke do_POST_other()

If the do_ method does not exist we’ll fall back to original behavior.

Returns:True for success, False for failure; on failure, any relevant error response has already been sent back.
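
The routing idea described above can be sketched as a name lookup: try the method-plus-first-path-segment name, then fall back to the plain method handler. This is a simplified illustration under that assumption, not the code used by RestApiHandler; resolve_handler_name and DemoHandler are hypothetical.

```python
from typing import Optional


def resolve_handler_name(handler: object, command: str, path: str) -> Optional[str]:
    """Pick 'do_<COMMAND>_<first path segment>' if defined, else 'do_<COMMAND>'."""
    # Ignore any query string and keep the first segment: '/uri1/part2' -> 'uri1'.
    segment = path.split('?', 1)[0].lstrip('/').split('/', 1)[0]
    if segment:
        specific = 'do_{0}_{1}'.format(command, segment)
        if hasattr(handler, specific):
            return specific
    generic = 'do_' + command
    return generic if hasattr(handler, generic) else None


class DemoHandler:
    """Hypothetical handler used only to exercise the lookup."""

    def do_GET(self) -> None: ...
    def do_GET_uri1(self) -> None: ...
    def do_POST_other(self) -> None: ...


print(resolve_handler_name(DemoHandler(), 'GET', '/uri1/part2'))  # prints: do_GET_uri1
print(resolve_handler_name(DemoHandler(), 'GET', '/replica'))     # prints: do_GET
```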
static parse_schedule(schedule: str, action: str) → Tuple[Optional[int], Optional[str], Optional[datetime.datetime]]

Parse the given schedule and validate it.

Parameters:
  • schedule – a string representing a timestamp, e.g. 2023-04-14T20:27:00+00:00.
  • action – the action to be scheduled (restart, switchover, or failover).
Returns:

a tuple composed of 3 items:

  • Suggested HTTP status code for a response:

    • None: if no issue was faced while parsing, leaving it up to the caller to decide the status; or
    • 400: if no timezone information could be found in schedule; or
    • 422: if schedule is invalid – in the past or not parsable.
  • An error message, if any error is faced, otherwise None;

  • Parsed schedule, if able to parse, otherwise None.
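
The three-item return value can be illustrated with a standard-library sketch. It uses datetime.fromisoformat() as a stand-in parser, which is an assumption and may accept a narrower set of formats than the real method; the function name, the now parameter and the error messages are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def parse_schedule_sketch(
    schedule: str, action: str, now: Optional[datetime] = None
) -> Tuple[Optional[int], Optional[str], Optional[datetime]]:
    """Return (suggested status, error message, parsed schedule)."""
    now = now or datetime.now(timezone.utc)  # must be timezone-aware
    try:
        scheduled_at = datetime.fromisoformat(schedule)
    except ValueError:
        return 422, 'Unable to parse schedule', None
    if scheduled_at.tzinfo is None:
        return 400, 'Timezone information is required to schedule a {0}'.format(action), None
    if scheduled_at < now:
        return 422, 'Cannot schedule a {0} in the past'.format(action), None
    return None, None, scheduled_at


reference = datetime(2023, 1, 1, tzinfo=timezone.utc)
print(parse_schedule_sketch('2023-04-14T20:27:00', 'restart', now=reference)[0])        # prints: 400
print(parse_schedule_sketch('2022-04-14T20:27:00+00:00', 'restart', now=reference)[0])  # prints: 422
```

Passing now explicitly keeps the example deterministic; a caller would normally omit it.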

poll_failover_result(leader: Optional[str], candidate: Optional[str], action: str) → Tuple[int, str]

Poll failover/switchover operation until it finishes or times out.

Parameters:
  • leader – name of the current Patroni leader.
  • candidate – name of the Patroni node to be promoted.
  • action – the action that is ongoing (switchover or failover).
Returns:

a tuple composed of 2 items:

  • Response HTTP status codes:

    • 200: if the operation succeeded; or
    • 503: if the operation failed or timed out.
  • A status message about the operation.

query(sql: str, *params, **kwargs) → List[Tuple[Any, ...]]

Execute sql query with params.

Parameters:
  • sql – the SQL statement to be run.
  • params – positional arguments to call RestApiServer.query() with.
  • kwargs – can contain the key retry. If the key is present its value should be a bool which indicates whether the query should be retried upon failure or given up immediately.
Returns:

a list of rows that were fetched from the database.

write_response(status_code: int, body: str, content_type: str = 'text/html', headers: Optional[Dict[str, str]] = None) → None

Write an HTTP response.

Note

Besides Content-Type header, and the HTTP headers passed through headers, this function will also write the HTTP headers defined through restapi.http_extra_headers and restapi.https_extra_headers from Patroni configuration.

Parameters:
  • status_code – response HTTP status code.
  • body – response body.
  • content_type – value for Content-Type HTTP header.
  • headers – dictionary of additional HTTP headers to set for the response. Each key is the header name, and the corresponding value is the value for the header in the response.
class patroni.api.RestApiServer(patroni: patroni.__main__.Patroni, config: Dict[str, Any])

Bases: socketserver.ThreadingMixIn, http.server.HTTPServer, threading.Thread

Patroni REST API server.

An asynchronous thread-based HTTP server.

check_access(rh: patroni.api.RestApiHandler) → Optional[bool]

Ensure client has enough privileges to perform a given request.

Write a response back to the client if any issue is observed, and the HTTP status may be:

  • 401: if Authorization header is missing or contains an invalid password;

  • 403: if:

    • restapi.allowlist was configured, but client IP is not in the allowed list; or
    • restapi.allowlist_include_members is enabled, but client IP is not in the members list; or
    • a client certificate is expected by the server, but is missing in the request.
Parameters:rh – the request whose access should be checked.
Returns:True if client access verification succeeded, otherwise None.
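
As an illustration of the allowlist condition behind the 403 response, the sketch below tests a client IP against a list of addresses and networks. It assumes every entry is an IP address or a CIDR network and does not resolve host names; ip_in_allowlist is a hypothetical helper, not part of RestApiServer.

```python
import ipaddress
from typing import Iterable


def ip_in_allowlist(client_ip: str, allowlist: Iterable[str]) -> bool:
    """Return True if client_ip falls inside any allowlist address or network."""
    address = ipaddress.ip_address(client_ip)
    # strict=False lets an entry such as '10.0.0.5/24' be read as its network.
    return any(address in ipaddress.ip_network(entry, strict=False)
               for entry in allowlist)


print(ip_in_allowlist('10.0.0.5', ['10.0.0.0/24']))                  # prints: True
print(ip_in_allowlist('192.168.1.1', ['10.0.0.0/24', '127.0.0.1']))  # prints: False
```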
check_auth_header(auth_header: Optional[str]) → Optional[str]

Validate HTTP Basic authorization header, if present.

Parameters:auth_header – value of Authorization HTTP header, if present, else None.
Returns:an error message if any issue is found, None otherwise.
check_basic_auth_key(key: str) → bool

Check if key matches the password configured for the REST API.

Parameters:key – the password received through the Basic authorization header of an HTTP request.
Returns:True if key matches the password configured for the REST API.
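
A Basic authorization check of the kind described by check_auth_header() and check_basic_auth_key() might look like the sketch below. The function name, its parameters and the error strings are assumptions for illustration; the only techniques relied on are standard Basic encoding (base64 of "username:password") and hmac.compare_digest(), which is designed to resist timing attacks.

```python
import base64
import hmac
from typing import Optional


def check_auth_header_sketch(auth_header: Optional[str],
                             username: str, password: str) -> Optional[str]:
    """Return an error message if the Basic credentials do not match, else None."""
    if auth_header is None:
        return 'no auth header received'
    scheme, _, key = auth_header.partition(' ')
    if scheme != 'Basic':
        return 'not authenticated'
    expected = base64.b64encode('{0}:{1}'.format(username, password).encode('utf-8'))
    # Compare the encoded credentials without short-circuiting on the first mismatch.
    if not hmac.compare_digest(expected, key.encode('utf-8')):
        return 'not authenticated'
    return None


header = 'Basic ' + base64.b64encode(b'admin:secret').decode('ascii')
print(check_auth_header_sketch(header, 'admin', 'secret'))  # prints: None
```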
daemon_threads = True
get_certificate_serial_number() → Optional[str]

Get serial number of the certificate used by the REST API.

Returns:serial number of the certificate configured through restapi.certfile setting.
handle_error(request: Union[socket.socket, Tuple[bytes, socket.socket]], client_address: Tuple[str, int]) → None

Handle any exception that is thrown while handling a request to the REST API.

Logs WARNING messages with the client information, and the stack trace of the faced exception.

Parameters:
  • request – the request that faced an exception.
  • client_address – a tuple composed of the IP and port of the client connection.
process_request_thread(request: Union[socket.socket, Tuple[bytes, socket.socket]], client_address: Tuple[str, int]) → None

Process a request to the REST API.

Wrapper for socketserver.ThreadingMixIn.process_request_thread() that additionally:

  • Enable TCP keepalive
  • Perform SSL handshake (if an SSL socket).
Parameters:
  • request – socket to handle the client request.
  • client_address – tuple containing the client IP and port.
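
Enabling TCP keepalive on a client socket can be as small as the sketch below. It only switches the SO_KEEPALIVE option on; whether the server also tunes idle time, probe interval or probe count is not stated here, so none of that is shown. enable_keepalive is a hypothetical helper name.

```python
import socket


def enable_keepalive(sock: socket.socket) -> None:
    """Ask the OS to send TCP keepalive probes on an otherwise idle connection."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)


# Demonstrate on a fresh, unconnected TCP socket.
demo_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(demo_socket)
keepalive_enabled = demo_socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
demo_socket.close()
print(keepalive_enabled)
```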
query(sql: str, *params) → List[Tuple[Any, ...]]

Execute sql query with params.

Parameters:
  • sql – the SQL statement to be run.
  • params – positional arguments to be used as parameters for sql.
Returns:

a list of rows that were fetched from the database.

Raises:

  • psycopg.Error: if there were issues while executing sql;
  • PostgresConnectionException: if there were issues while connecting to the database.

reload_config(config: Dict[str, Any]) → None

Reload REST API configuration.

Parameters:config – dictionary representing values under the restapi configuration section.
Raises:ValueError: if listen key is not present in config.
reload_local_certificate() → Optional[bool]

Reload the SSL certificate used by the REST API.

Returns:True if a different certificate has been configured through restapi.certfile setting, None otherwise.
shutdown_request(request: Union[socket.socket, Tuple[bytes, socket.socket]]) → None

Shut down a request to the REST API.

Wrapper for http.server.HTTPServer.shutdown_request() that additionally:

  • Perform SSL shutdown handshake (if an SSL socket).
Parameters:request – socket to handle the client request.
patroni.api.check_access(func: Callable[[RestApiHandler], None]) → Callable[[...], None]

Check the source ip, authorization header, or client certificates.

Note

The actual logic to check access is implemented through RestApiServer.check_access().

Parameters:

func – function to be decorated.

Returns:

a decorator that executes func only if RestApiServer.check_access() returns True.

Example:
>>> from patroni.api import check_access
>>> class FooServer:
...   def check_access(self, *args, **kwargs):
...     print(f'In FooServer: {args[0].__class__.__name__}')
...     return True
...
>>> class Foo:
...   server = FooServer()
...   @check_access
...   def do_PUT_foo(self):
...     print('In do_PUT_foo')
...
>>> f = Foo()
>>> f.do_PUT_foo()
In FooServer: Foo
In do_PUT_foo