Skip to content

Commit

Permalink
Merge branch 'jeffthibault-main'
Browse files Browse the repository at this point in the history
  • Loading branch information
callebtc committed Feb 2, 2023
2 parents c5a050f + 9520aab commit fb3bbca
Show file tree
Hide file tree
Showing 18 changed files with 893 additions and 86 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
venv/
nostr/__pycache__/
__pycache__/
nostr.egg-info/
dist/
nostr/_version.py
.DS_Store
67 changes: 44 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ time.sleep(1.25) # allow the connections to open
private_key = PrivateKey()

event = Event(private_key.public_key.hex(), "Hello Nostr")
event.sign(private_key.hex())
private_key.sign_event(event)

message = json.dumps([ClientMessageType.EVENT, event.to_json_object()])
relay_manager.publish_message(message)
relay_manager.publish_event(event)
time.sleep(1) # allow the messages to send

relay_manager.close_connections()
Expand Down Expand Up @@ -90,34 +89,56 @@ while relay_manager.message_pool.has_events():
relay_manager.close_connections()
```

## Installation
1. Clone repository
```bash
git clone https://github.com/jeffthibault/python-nostr.git
```
2. Install dependencies in repo
```bash
python -m venv venv
pip install -r requirements.txt
**NIP-26 delegation**
```python
from nostr.delegation import Delegation
from nostr.event import EventKind, Event
from nostr.key import PrivateKey

# Load your "identity" PK that you'd like to keep safely offline
identity_pk = PrivateKey.from_nsec("nsec1...")

# Create a new, disposable PK as the "delegatee" that can be "hot" in a Nostr client
delegatee_pk = PrivateKey()

# the "identity" PK will authorize "delegatee" to sign TEXT_NOTEs on its behalf for the next month
delegation = Delegation(
delegator_pubkey=identity_pk.public_key.hex(),
delegatee_pubkey=delegatee_pk.public_key.hex(),
event_kind=EventKind.TEXT_NOTE,
duration_secs=30*24*60*60
)

identity_pk.sign_delegation(delegation)

event = Event(
delegatee_pk.public_key.hex(),
"Hello, NIP-26!",
tags=[delegation.get_tag()],
)
delegatee_pk.sign_event(event)

# ...normal broadcast steps...
```

Note: If the pip install fails, you might need to install ```wheel```. Try the following:
The resulting delegation tag can be stored as plaintext and reused as-is by the "delegatee" PK until the delegation token expires. There is no way to revoke a signed delegation, so current best practice is to keep the expiration time relatively short.

Hopefully clients will include an optional field to store the delegation tag. That would allow the "delegatee" PK to seamlessly post messages on the "identity" key's behalf, while the "identity" key stays safely offline in cold storage.


## Installation
```bash
pip install wheel
pip install -r requirements.txt
pip install nostr
```

## Dependencies
- [websocket-client](https://github.com/websocket-client/websocket-client) for websocket operations
- [secp256k1](https://github.com/rustyrussell/secp256k1-py) for key generation, signing, and verifying
- [cryptography](https://github.com/pyca/cryptography) for encrypting and decrypting direct messages

Note: I wrote this with Python 3.9.5.

## Test Suite
See the [Test Suite README](test/README.md)

## Disclaimer
- This library is in very early development and still a WIP.
- This library is in very early development.
- It might have some bugs.
- I need to add tests.
- I will try to publish this as a [PyPI](https://pypi.org/) package at some point.
- I need to add more tests.

Please feel free to add issues, add PRs, or provide any feedback!
32 changes: 32 additions & 0 deletions nostr/delegation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import time
from dataclasses import dataclass


@dataclass
class Delegation:
delegator_pubkey: str
delegatee_pubkey: str
event_kind: int
duration_secs: int = 30*24*60 # default to 30 days
signature: str = None # set in PrivateKey.sign_delegation

@property
def expires(self) -> int:
return int(time.time()) + self.duration_secs

@property
def conditions(self) -> str:
return f"kind={self.event_kind}&created_at<{self.expires}"

@property
def delegation_token(self) -> str:
return f"nostr:delegation:{self.delegatee_pubkey}:{self.conditions}"

def get_tag(self) -> list[str]:
""" Called by Event """
return [
"delegation",
self.delegator_pubkey,
self.conditions,
self.signature,
]
42 changes: 23 additions & 19 deletions nostr/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from secp256k1 import PrivateKey, PublicKey
from hashlib import sha256

from nostr.message_type import ClientMessageType


class EventKind(IntEnum):
SET_METADATA = 0
TEXT_NOTE = 1
Expand All @@ -12,26 +15,27 @@ class EventKind(IntEnum):
ENCRYPTED_DIRECT_MESSAGE = 4
DELETE = 5


class Event():
def __init__(
self,
public_key: str,
content: str,
created_at: int=int(time.time()),
created_at: int = None,
kind: int=EventKind.TEXT_NOTE,
tags: "list[list[str]]"=[],
id: str=None,
signature: str=None) -> None:
if not isinstance(content, str):
raise TypeError("Argument 'content' must be of type str")

self.id = id if not id is None else Event.compute_id(public_key, created_at, kind, tags, content)

self.public_key = public_key
self.content = content
self.created_at = created_at
self.created_at = created_at or int(time.time())
self.kind = kind
self.tags = tags
self.signature = signature
self.id = id or Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)

@staticmethod
def serialize(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> bytes:
Expand All @@ -43,23 +47,23 @@ def serialize(public_key: str, created_at: int, kind: int, tags: "list[list[str]
def compute_id(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> str:
return sha256(Event.serialize(public_key, created_at, kind, tags, content)).hexdigest()

def sign(self, private_key_hex: str) -> None:
sk = PrivateKey(bytes.fromhex(private_key_hex))
sig = sk.schnorr_sign(bytes.fromhex(self.id), None, raw=True)
self.signature = sig.hex()

def verify(self) -> bool:
pub_key = PublicKey(bytes.fromhex("02" + self.public_key), True) # add 02 for schnorr (bip340)
event_id = Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)
return pub_key.schnorr_verify(bytes.fromhex(event_id), bytes.fromhex(self.signature), None, raw=True)

def to_json_object(self) -> dict:
return {
"id": self.id,
"pubkey": self.public_key,
"created_at": self.created_at,
"kind": self.kind,
"tags": self.tags,
"content": self.content,
"sig": self.signature
}
def to_message(self) -> str:
return json.dumps(
[
ClientMessageType.EVENT,
{
"id": self.id,
"pubkey": self.public_key,
"created_at": self.created_at,
"kind": self.kind,
"tags": self.tags,
"content": self.content,
"sig": self.signature
}
]
)
119 changes: 103 additions & 16 deletions nostr/filter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
from collections import UserList
from .event import Event
from typing import List

from .event import Event, EventKind




class Filter:
"""
NIP-01 filtering.
Explicitly supports "#e" and "#p" tag filters via `event_refs` and `pubkey_refs`.
Arbitrary NIP-12 single-letter tag filters are also supported via `add_arbitrary_tag`.
If a particular single-letter tag gains prominence, explicit support should be
added. For example:
# arbitrary tag
filter.add_arbitrary_tag('t', [hashtags])
# promoted to explicit support
Filter(hashtag_refs=[hashtags])
"""
def __init__(
<<<<<<< HEAD
self,
ids: "list[str]" = None,
kinds: "list[int]" = None,
Expand All @@ -14,58 +33,122 @@ def __init__(
limit: int = None,
) -> None:
self.IDs = ids
=======
self,
event_ids: List[str] = None,
kinds: List[EventKind] = None,
authors: List[str] = None,
since: int = None,
until: int = None,
event_refs: List[str] = None, # the "#e" attr; list of event ids referenced in an "e" tag
pubkey_refs: List[str] = None, # The "#p" attr; list of pubkeys referenced in a "p" tag
limit: int = None) -> None:
self.event_ids = event_ids
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
self.kinds = kinds
self.authors = authors
self.since = since
self.until = until
self.tags = tags
self.event_refs = event_refs
self.pubkey_refs = pubkey_refs
self.limit = limit

self.tags = {}
if self.event_refs:
self.add_arbitrary_tag('e', self.event_refs)
if self.pubkey_refs:
self.add_arbitrary_tag('p', self.pubkey_refs)


def add_arbitrary_tag(self, tag: str, values: list):
"""
Filter on any arbitrary tag with explicit handling for NIP-01 and NIP-12
single-letter tags.
"""
# NIP-01 'e' and 'p' tags and any NIP-12 single-letter tags must be prefixed with "#"
tag_key = tag if len(tag) > 1 else f"#{tag}"
self.tags[tag_key] = values


def matches(self, event: Event) -> bool:
if self.IDs != None and event.id not in self.IDs:
if self.event_ids is not None and event.id not in self.event_ids:
return False
if self.kinds != None and event.kind not in self.kinds:
if self.kinds is not None and event.kind not in self.kinds:
return False
if self.authors != None and event.public_key not in self.authors:
if self.authors is not None and event.public_key not in self.authors:
return False
if self.since != None and event.created_at < self.since:
if self.since is not None and event.created_at < self.since:
return False
if self.until != None and event.created_at > self.until:
if self.until is not None and event.created_at > self.until:
return False
if self.tags != None and len(event.tags) == 0:
if (self.event_refs is not None or self.pubkey_refs is not None) and len(event.tags) == 0:
return False
<<<<<<< HEAD
if self.tags != None:
e_tag_identifiers = [e_tag[0] for e_tag in event.tags]
=======

if self.tags:
e_tag_identifiers = set([e_tag[0] for e_tag in event.tags])
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
for f_tag, f_tag_values in self.tags.items():
if f_tag[1:] not in e_tag_identifiers:
# Omit any NIP-01 or NIP-12 "#" chars on single-letter tags
f_tag = f_tag.replace("#", "")

if f_tag not in e_tag_identifiers:
# Event is missing a tag type that we're looking for
return False

# Multiple values within f_tag_values are treated as OR search; an Event
# needs to match only one.
# Note: an Event could have multiple entries of the same tag type
# (e.g. a reply to multiple people) so we have to check all of them.
match_found = False
for e_tag in event.tags:
<<<<<<< HEAD
if e_tag[1] not in f_tag_values:
return False
=======
if e_tag[0] == f_tag and e_tag[1] in f_tag_values:
match_found = True
break
if not match_found:
return False
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633

return True


def to_json_object(self) -> dict:
res = {}
<<<<<<< HEAD
if self.IDs != None:
res["ids"] = self.IDs
if self.kinds != None:
=======
if self.event_ids is not None:
res["ids"] = self.event_ids
if self.kinds is not None:
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
res["kinds"] = self.kinds
if self.authors != None:
if self.authors is not None:
res["authors"] = self.authors
if self.since != None:
if self.since is not None:
res["since"] = self.since
if self.until != None:
if self.until is not None:
res["until"] = self.until
if self.tags != None:
for tag, values in self.tags.items():
res[tag] = values
if self.limit != None:
if self.limit is not None:
res["limit"] = self.limit
if self.tags:
res.update(self.tags)

return res


<<<<<<< HEAD
=======

>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
class Filters(UserList):
def __init__(self, initlist: "list[Filter]" = []) -> None:
super().__init__(initlist)
Expand All @@ -78,4 +161,8 @@ def match(self, event: Event):
return False

def to_json_array(self) -> list:
<<<<<<< HEAD
return [filter.to_json_object() for filter in self.data]
=======
return [filter.to_json_object() for filter in self.data]
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
Loading

0 comments on commit fb3bbca

Please sign in to comment.