Skip to content

Commit

Permalink
Prepare prefect-redis for 0.2.0 release (#15409)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Sep 19, 2024
1 parent 6b8cc51 commit 3e58710
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 202 deletions.
34 changes: 34 additions & 0 deletions docs/integrations/prefect-redis/index.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
title: prefect-redis
---

Integrations to extend Prefect's functionality with Redis.

## Getting started

### Install `prefect-redis`

The following command will install a version of `prefect-redis` compatible with your installed version of `prefect`.
If you don't already have `prefect` installed, it will install the newest version of `prefect` as well.

```bash
pip install "prefect[redis]"
```

Upgrade to the latest versions of `prefect` and `prefect-redis`:

```bash
pip install -U "prefect[redis]"
```

### Register newly installed block types

Register the block types in the `prefect-redis` module to make them available for use.

```bash
prefect block register -m prefect_redis
```

## Resources

Refer to the [SDK documentation](https://prefect-python-sdk-docs.netlify.app/prefect_redis/) to explore all the capabilities of `prefect-redis`.
4 changes: 4 additions & 0 deletions docs/integrations/prefect-redis/sdk.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
title: "SDK docs"
url: "https://prefect-python-sdk-docs.netlify.app/prefect_redis/"
---
7 changes: 7 additions & 0 deletions docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@
"integrations/prefect-ray/sdk"
]
},
{
"group": "Redis",
"pages": [
"integrations/prefect-redis/index",
"integrations/prefect-redis/sdk"
]
},
{
"group": "Shell",
"pages": [
Expand Down
3 changes: 1 addition & 2 deletions scripts/generate_sdk_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def packages() -> Generator[tuple[str, Path], None, None]:
yield "prefect", Path("./src")

for path in sorted(Path("./src/integrations").iterdir()):
if path.is_dir() and not path.name.endswith("redis"):
yield path.name, path
yield path.name, path


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"dbt": "prefect-dbt>=0.6.0rc1",
"snowflake": "prefect-snowflake>=0.28.0rc1",
"sqlalchemy": "prefect-sqlalchemy>=0.5.0rc1",
"redis": "redis>=5.0.1",
"redis": "prefect-redis>=0.2.0",
# Monitoring extras
"email": "prefect-email>=0.4.0rc1",
"slack": "prefect-slack>=0.3.0rc1",
Expand Down
5 changes: 3 additions & 2 deletions src/integrations/prefect-redis/prefect_redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from .credentials import RedisCredentials
from .redis import (
from .blocks import RedisDatabase
from .tasks import (
redis_set,
redis_get,
redis_set_binary,
redis_get_binary,
redis_execute,
)
from .locking import RedisLockManager
from . import _version

__version__ = _version.__version__
177 changes: 177 additions & 0 deletions src/integrations/prefect-redis/prefect_redis/blocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""Redis credentials handling"""

from typing import Any, Dict, Optional, Union

import redis
import redis.asyncio
from pydantic import Field
from pydantic.types import SecretStr
from redis.asyncio.connection import parse_url

from prefect.filesystems import WritableFileSystem

DEFAULT_PORT = 6379


class RedisDatabase(WritableFileSystem):
"""
Block used to manage authentication with a Redis database
Attributes:
host: The host of the Redis server
port: The port the Redis server is running on
db: The database to write to and read from
username: The username to use when connecting to the Redis server
password: The password to use when connecting to the Redis server
ssl: Whether to use SSL when connecting to the Redis server
Example:
Create a new block from hostname, username and password:
```python
from prefect_redis import RedisDatabase
block = RedisDatabase(
host="myredishost.com", username="redis", password="SuperSecret")
block.save("BLOCK_NAME")
```
Create a new block from a connection string
```python
from prefect_redis import RedisBlock
block = RedisBlock.from_url(""redis://redis:[email protected]:6379")
block.save("BLOCK_NAME")
```
Get Redis client in order to interact directly with Redis
```python
from prefect_redis import RedisBlock
block = RedisBlock.load("BLOCK_NAME")
redis_client = block.get_client()
```
"""

_logo_url = "https://stprododpcmscdnendpoint.azureedge.net/assets/icons/redis.png"
_block_type_name = "Redis Database"

host: str = Field(default="localhost", description="Redis hostname")
port: int = Field(default=DEFAULT_PORT, description="Redis port")
db: int = Field(default=0, description="Redis DB index")
username: Optional[SecretStr] = Field(default=None, description="Redis username")
password: Optional[SecretStr] = Field(default=None, description="Redis password")
ssl: bool = Field(default=False, description="Whether to use SSL")

def block_initialization(self) -> None:
"""Validate parameters"""

if not self.host:
raise ValueError("Missing hostname")
if self.username and not self.password:
raise ValueError("Missing password")

async def read_path(self, path: str) -> bytes:
"""Read a redis key
Args:
path: Redis key to read from
Returns:
Contents at key as bytes
"""
client = self.get_client()
ret = await client.get(path)

await client.close()
return ret

async def write_path(self, path: str, content: bytes) -> None:
"""Write to a redis key
Args:
path: Redis key to write to
content: Binary object to write
"""
client = self.get_client()
ret = await client.set(path, content)

await client.close()
return ret

def get_client(self) -> redis.Redis:
"""Get Redis Client
Returns:
An initialized Redis async client
"""
return redis.Redis(
host=self.host,
port=self.port,
username=self.username.get_secret_value() if self.username else None,
password=self.password.get_secret_value() if self.password else None,
db=self.db,
ssl=self.ssl,
)

def get_async_client(self) -> redis.asyncio.Redis:
"""Get Redis Client
Returns:
An initialized Redis async client
"""
return redis.asyncio.Redis(
host=self.host,
port=self.port,
username=self.username.get_secret_value() if self.username else None,
password=self.password.get_secret_value() if self.password else None,
db=self.db,
ssl=self.ssl,
)

@classmethod
def from_connection_string(
cls, connection_string: Union[str, SecretStr]
) -> "RedisDatabase":
"""Create block from a Redis connection string
Supports the following URL schemes:
- `redis://` creates a TCP socket connection
- `rediss://` creates a SSL wrapped TCP socket connection
Args:
connection_string: Redis connection string
Returns:
`RedisCredentials` instance
"""
connection_kwargs = parse_url(
connection_string
if isinstance(connection_string, str)
else connection_string.get_secret_value()
)
ssl = connection_kwargs.get("connection_class") == redis.asyncio.SSLConnection
return cls(
host=connection_kwargs.get("host", "localhost"),
port=connection_kwargs.get("port", DEFAULT_PORT),
db=connection_kwargs.get("db", 0),
username=connection_kwargs.get("username"),
password=connection_kwargs.get("password"),
ssl=ssl,
)

def as_connection_params(self) -> Dict[str, Any]:
"""
Return a dictionary suitable for unpacking
"""
data = self.model_dump()
data.pop("block_type_slug", None)
# Unwrap SecretStr fields
if self.username is not None:
data["username"] = self.username.get_secret_value()
else:
data.pop("username", None)

if self.password is not None:
data["password"] = self.password.get_secret_value()
else:
data.pop("password", None)

return data
Loading

0 comments on commit 3e58710

Please sign in to comment.