A .flake8 => .flake8 +7 -0
@@ 0,0 1,7 @@
+[flake8]
+max-line-length = 88
+extend-ignore =
+ # See https://github.com/PyCQA/pycodestyle/issues/373
+per-file-ignores =
+ # ignoring since the connection is being held to cause failure in the sub process.
+ test_postgres.py:F841
M LICENSE => LICENSE +1 -1
@@ 186,7 186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2019 Alexander Hagerman
+ Copyright 2019 Elastic
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
M README.md => README.md +1 -1
@@ 20,7 20,7 @@ from postgres import AdvisoryLock, DatabaseConfig
dbconfig = DatabaseConfig()
-async with AdvisoryLock(dbconfig, "gold_leader") as connection:
+async with AdvisoryLock("gold_leader", dbconfig) as connection:
# application code
```
M postgres.py => postgres.py +63 -22
@@ 1,7 1,13 @@
import os
import uuid
+import random
+import asyncio
+import logging
import asyncpg
from dataclasses import dataclass
+from typing import Optional
+
+logging.getLogger(__name__).addHandler(logging.NullHandler())
class AdvisoryLockException(Exception):
@@ 9,6 15,7 @@ class AdvisoryLockException(Exception):
pass
+
@dataclass
class DatabaseConfig:
db_user: str = os.environ["DATABASE_USER"]
@@ 19,12 26,12 @@ class DatabaseConfig:
class AdvisoryLock:
"""
- Setup an advisory lock in Postgres to make sure only one
+ Setup an advisory lock in postgres to make sure only one
instance of the application is processing to maintain read/write
safety.
Intended usage:
- >>> async with AdvisoryLock(config, "gold_leader") as connection:
+ >>> async with AdvisoryLock("gold_leader") as connection:
>>> async with connection.transaction():
>>> async for record in connection.cursor(query):
>>> # do something
@@ 35,59 42,93 @@ class AdvisoryLock:
- `self.connection_app_name` - the name of the connection as seen by postgres
- `self.got_lock` - `True` if the lock was acquired successfully.
+ `self.retries` is available—defaulting to 0—to be used in cases when retrying
+ acquisition of a lock makes sense. When set, that amount of retries will
+ be made to acquire the lock, using a randomly increasing amount of sleep
+ time in between attempts, before either returning the lock or raising
+ AdvisoryLockException.
+
:return: connection for which the lock is taken. Use this connection to interact
with the DB.
.. seealso::
-
https://www.postgresql.org/docs/9.4/static/explicit-locking.html#ADVISORY-LOCKS
"""
- def __init__(self, config, lock_name: str):
- self.config = config
+ def __init__(
+ self,
+ lock_name: str,
+ config: Optional[DatabaseConfig] = None,
+ retries: Optional[int] = 0,
+ ):
+
+ self.config = config or DatabaseConfig()
self.lock_name = lock_name
+ self.retries = retries
self.connection_app_name = f"{uuid.uuid4()}-{self.lock_name}-lock"
self.got_lock = False
self.locked_connection = None
+ self._transaction = None
async def _release(self):
"""
- Release the advisory lock when we leave the scope of the
- context manager.
+ Release the advisory lock when we leave the scope of the AdvisoryLock.
"""
self.got_lock = False
await self.locked_connection.execute("SELECT pg_advisory_unlock_all()")
await self.locked_connection.close()
self.locked_connection = None
- async def _set_lock(self):
+ async def _get_lock(self) -> bool:
"""
Make use of stored procedure in Postgres to acquire an application
lock managed in Postgres so that we can have multiple instances of
- the billing framework running but only one billing to prevent
- duplicated billing and tracking.
+ running but only one with control.
+
+ This coroutine will attempt to retry the lock acquisition up to
+ the amount set in `self.retries`, by sleeping a randomized amount
+ between 0 and 1 seconds between attempts.
.. seealso::
https://www.postgresql.org/docs/9.4/explicit-locking.html#ADVISORY-LOCKS
"""
- self.got_lock = await self.locked_connection.fetchval("""SELECT pg_try_advisory_lock( ('x'||substr(md5($1),1,16))::bit(64)::bigint );""", self.lock_name)
+ attempts = 0
+ max_attempts = 1 + self.retries
+ sleep_time = 0
+
+ got_lock = False
+ while attempts <= max_attempts:
+ got_lock: bool = await self.locked_connection.fetchval(
+ """SELECT pg_try_advisory_lock( ('x'||substr(md5($1),1,16))::bit(64)::bigint );""", # noqa
+ self.lock_name,
+ )
+ if got_lock:
+ return True
+ elif self.retries == 0:
+ return False
+ else:
+ attempts += 1
+ logging.debug(
+ "Retrying AdvisoryLock", name=self.lock_name, attempt=attempts
+ )
+ # If we failed to get the lock, sleep a randomly increasing
+ # amount of time by fractions of one second, and then retry.
+ sleep_time += random.random()
+ await asyncio.sleep(sleep_time)
+ return False
async def __aenter__(self) -> asyncpg.connection.Connection:
"""
Acquire connection from the pool, start transactions
and manage committing or rolling back the transactions
depending on the success or failure of operations.
-
- All transactions with PostgresAdvisoryLock are autocommit
+ All transactions with AdvisoryLock are autocommit
just like asyncpg. To manage transactions inside of
- PostgresAdvisoryLock manually use:
-
- >>> async with AdvisoryLock(config, "my_lock") as connection:
+ AdvisoryLock manually use:
+ >>> async with AdvisoryLock(config, "gold_leader") as connection:
>>> async with connection.transaction():
-
or for full control:
-
- >>> async with AdvisoryLock(config, "my_lock") as connection:
+ >>> async with AdvisoryLock(config, "gold_leader") as connection:
>>> local_transaction = connection.transaction()
>>> await local_transaction.start()
>>> try:
@@ 97,7 138,6 @@ class AdvisoryLock:
>>> raise
>>> else:
>>> await local_transaction.commit()
-
... seealso::
https://magicstack.github.io/asyncpg/current/api/index.html#transactions
"""
@@ 108,8 148,9 @@ class AdvisoryLock:
password=self.config.db_pass,
server_settings={"application_name": self.connection_app_name},
)
- await self._set_lock()
- if self.got_lock:
+
+ if await self._get_lock():
+ self.got_lock = True
return self.locked_connection
else:
if self.locked_connection:
M test_postgres.py => test_postgres.py +43 -25
@@ 9,7 9,9 @@ class Test_postgres_advisory_lock(asynctest.TestCase):
self.dbconfig = DatabaseConfig()
async def test_get_results_with_lock(self):
- async with AdvisoryLock(self.dbconfig, "gold_leader") as connection:
+ async with AdvisoryLock(
+ lock_name="gold_leader", config=self.dbconfig
+ ) as connection:
val = await connection.fetchrow("SELECT 1;")
self.assertEqual(val[0], 1)
@@ 17,11 19,11 @@ class Test_postgres_advisory_lock(asynctest.TestCase):
async def test_lock_prevents_second_lock(self):
with self.assertRaises(AdvisoryLockException):
async with AdvisoryLock(
- self.dbconfig, "gold_leader"
+ lock_name="gold_leader", config=self.dbconfig
) as connection:
await connection.fetchrow("SELECT 1;")
async with AdvisoryLock(
- self.dbconfig, "gold_leader"
+ lock_name="gold_leader", config=self.dbconfig
) as second_connection:
await second_connection.fetchrow("SELECT 1;")
@@ 30,30 32,25 @@ class Test_postgres_advisory_lock(asynctest.TestCase):
# and attempts to acquire a Postgres advisory lock.
executable = """
- import asyncio
- from postgres import PostgresAdvisoryLock, DatabaseConfig
-
-
- async def main():
- dbconfig = DatabaseConfig()
-
- async with PostgresAdvisoryLock(dbconfig, "gold_leader") as connection:
- while True:
- await asyncio.sleep(0.1)
-
-
- if __name__ == "__main__":
- asyncio.run(main())
- """
-
+import asyncio
+import postgres
+async def main():
+ dbconfig = postgres.DatabaseConfig()
+ async with postgres.AdvisoryLock(
+ lock_name="gold_leader", config=dbconfig
+ ) as connection:
+ while True:
+ await asyncio.sleep(0.1)
+if __name__ == "__main__":
+ asyncio.run(main())
+"""
with self.assertRaises(AdvisoryLockException):
- async with AdvisoryLock(self.dbconfig, "gold_leader") as connection:
+ async with AdvisoryLock(
+ lock_name="gold_leader", config=self.dbconfig
+ ):
proc = await asyncio.subprocess.create_subprocess_exec(
- sys.executable,
- "-c",
- executable,
- stderr=asyncio.subprocess.PIPE,
+ sys.executable, "-c", executable, stderr=asyncio.subprocess.PIPE
)
err = await proc.stderr.read()
@@ 62,5 59,26 @@ class Test_postgres_advisory_lock(asynctest.TestCase):
# wait the subprocess so that transports are cleaned up.
await proc.wait()
- if "PostgresAdvisoryLockException" in err:
+ if "AdvisoryLockException" in err:
raise AdvisoryLockException
+
+ async def test_lock_retries(self):
+ lock_name = "test_retries"
+
+ async def _fn():
+ async with AdvisoryLock(lock_name=lock_name, config=self.dbconfig):
+ await asyncio.sleep(2)
+
+ test_task = self.loop.create_task(_fn())
+ # Sleep briefly to let _fn become scheduled and acquire the lock first.
+ await asyncio.sleep(0.2)
+
+ # Retry a lot of times (too many, really) so we can be sure we get the lock.
+ async with AdvisoryLock(
+ lock_name=lock_name, config=self.dbconfig, retries=20
+ ):
+ # If we don't get the lock, AdvisoryLockException will fail this.
+ pass
+
+ await test_task
+