Usage¶
Using aioserf
is reasonably simple: You open a long-lived connection
and send requests to it.
Let’s consider a basic example:
import anyio
from aioserf import serf_client, UTF8Codec
async def main():
async with serf_client(codec = UTF8Codec()) as client:
await client.event("Hello", payload="I am an example")
async with client.stream('*') as stream:
async for resp in stream:
if resp.event == 'query':
await resp.respond('For %s, with %s' % (resp.name, resp.payload))
else:
print("I got me a %s" % (resp,))
if __name__ == "__main__":
anyio.run(main)
This sample is slightly asocial because it indiscriminately responds to all queries (and requests all other events even if it doesn’t do anythign with them), but it’s immediately obvious what this code is doing.
— Multiple concurrent requests —
Async code as you know it heavily leans towards throwing requests over your
interface’s wall and then using callbacks to process any replies. AioSerf
doesn’t support that. Instead, you start a new task and do your work there.
The aioserf.AioSerf class has a AioSerf.spawn()
helper method so that you can start your task within AioSerf’s task group.
-
await
AioSerf.
spawn
(proc, *args, **kw)¶ Run a task within this object’s task group.
Returns: a cancel scope you can use to stop the task.
Thus, let’s extend our example with a keepalive transmitter:
async def keepalive(client):
while True:
await anyio.sleep(60)
await client.event("keepalive", payload="My example")
async def main():
async with serf_client(codec = UTF8Codec()) as client:
await client.event("Hello", payload="I am an example")
await client.spawn(keepalive)
## async with … – continue as above
Any complex system should include code to shut itself down cleanly, so let’s add that too:
keeper = await client.spawn(keepalive)
async with client.stream('*') as stream:
async for resp in stream:
if resp.event == 'query':
await resp.respond('For %s, with %s' % (resp.name, resp.payload))
elif resp.event == 'user' and resp.name == 'shutdown':
break
elif keeper is not None and resp.event == 'user' and resp.name == 'quiet':
await keeper.cancel()
keeper = None
else:
print("I got me a %s" % (resp,))
await client.cancel()
though in a real, complex system you probably want to open multiple, more selective event streams.
-
await
AioSerf.
cancel
()¶ Cancel our internal task group. This should cleanly shut down everything.
Supported RPC methods¶
AioSerf aims to support all methods exported by Serf’s RPF interface.
Streaming¶
You’ve already seen an example for receiving an event stream:
-
AioSerf.
stream
(event_types='*')¶ Open an event stream.
Possible event types:
*
– all eventsuser
– all user eventsuser:TYPE
– all user events of type TYPEquery
– all queriesquery:TYPE
– all queries of type TYPEmember-join
member-leave
This method returns a SerfStream object which affords an async context manager plus async iterator, which will give you SerfEvent objects, which you can use to return a reply (if triggered by a “query” event).
Replies have a “respond” method which you may use to send responses to queries.
Example Usage:
>>> async with client.stream("user") as stream: >>> async for event in stream: >>> msg = await dispatch(event.type)(event.payload) >>> await event.respond(msg)
though you might want to process messages in a task group:
>>> async def in_tg(event): >>> msg = await dispatch(event.type)(event.payload) >>> await event.respond(msg) >>> async with anyio.create_task_group() as tg: >>> async with client.stream("query") as stream: >>> async for event in stream: >>> await tg.spawn(in_tg, event)
A query is also implemented as a stream because there may be any number of replies:
NS=10**9
async def ask_query(self, client):
acks = 0
reps = 0
async with client.query("foo", payload="baz", request_ack=True,
timeout=5*NS) as q:
async for r in q:
if not hasattr(r,'type'):
break
if r.type == "ack":
acks += 1
elif r.type == "response":
reps += 1
# process `r.payload`
pass
else:
assert False, r
# at this point the query is finished
pass
The done
type (which you’ll find in Serf’s RPC documentation) is
internally translated to a StopAsyncIteration
exception, so you don’t
have to handle it yourself.
-
AioSerf.
query
(name, payload=None, *, nodes=None, tags=None, request_ack=False, timeout=0)¶ Send a query.
Parameters: - name – The query name. Mandatory.
- payload – Your payload. Will be passed through this client’s codec.
- nodes – The list of nodes to pass this query to. Default: no restriction
- tags – A dict of tags used to filter nodes. Values are regexps which a node’s corresponding tag value must match.
- request_ack – A flag whether the query result shall include
messages that a Serf node matches the request.
The default is
False
. - timeout – Time (in seconds) after which the query will be concluded.
Returns: a
aioserf.stream.SerfQuery
object.Note that the query will not be started until you enter its context. You should then iterate over the results:
async with client.query("example") as stream: async for response in stream: if response.type == "ack": print("Response arrived at %s" % (response.from,)) elif response.type == "response": print("Node %s answered %s" % (response.from, repr(response.payload)))
You can also receive a logging stream. Remember that Serf sends a bunch of old log entries which may deadlock the RPC socket until you read them all.
-
AioSerf.
monitor
(log_level='info')¶ Ask the server to stream (some of) its log entries to you.
Args: log_level
: The debug level.Possible values are “trace”, “debug”, “info”, “warn”, and “err”. The default is “info”.
Requests¶
AioSerf contains methods for all remaining RPC methods that Serf offers.
See Serf’s RPC documentation <https://www.serf.io/docs/agent/rpc.html> for details, esp. regarding the values in replies.
-
AioSerf.
event
(name, payload=None, coalesce=True)¶ Send a user-specified event to the cluster. Can take an optional payload, which will be sent as translated by the client’s codec.
Parameters: - name – The name of the user event.
- payload – The payload, as acceptable to the codec’s
encode
method. - coalesce – A flag specifying whether multiple events with the same name should be replaced by
-
AioSerf.
respond
(seq, payload)¶ Respond to a query.
You should probably call this via
aioserf.stream.SerfEvent
.
-
AioSerf.
members
(name=None, status=None, tags=None)¶ Lists members of a Serf cluster, optionally filtered by one or more filters:
Parameters: - name – a string with a regex that matches on node names.
- status – a string with a regex matching on node status.
- tags – a dict of tags used to filter nodes. Values are regexps which a node’s corresponding tag value must match.
All arguments must match for a node to be returned.
Set this node’s tags.
Keyword arguments are used to name tags to be set or deleted.
You can delete a tag by passing
None
as its value.Tags that are not mentioned are not changed.
-
AioSerf.
join
(location, replay=False)¶ Ask Serf to join a cluster, by providing a list of possible ip:port locations.
Parameters: - location – A list of addresses.
- replay – a flag indicating whether to replay old user events
to new nodes; defaults to
False
.
-
AioSerf.
force_leave
(name)¶ Force a node to leave the cluster.
Parameters: name – the name of the node that should leave.
-
AioSerf.
stats
()¶ Obtain operator debugging information about the running Serf agent.
Codecs¶
Serf’s RPC protocol can transport user-specific payloads, which must be binary strings. As these are inconvenient and rarely what you need, AioSerf transparently encodes and decodes them with a user-supplied codec.
The codec needs to have encode
and decode
methods which return /
accept bytes
. The default is to do nothing.
-
class
aioserf.codec.
NoopCodec
¶ A codec that does nothing.
Your payload needs to consist of bytes.
-
class
aioserf.codec.
UTF8Codec
¶ A codec that translates to UTF-8 strings.
Your payload needs to be a single string.
This codec will not stringify other data types for you.
-
class
aioserf.codec.
MsgPackCodec
(use_bin_type=True, use_list=False)¶ A codec that encodes to “msgpack”-encoded bytestring.
Your payload must consist of whatever “msgpack” accepts.
Parameters: - use_bin_type – Bytestrings are encoded as byte arrays.
Defaults to
True
. IfFalse
, bytes are transmitted as string types and input strings are not UTF8-decoded. Use this if your network contains clients which use non-UTF8-encoded strings (this violates the MsgPack specification). - use_list – if
True
, lists and tuples are returned as lists (i.e. modifyable). Defaults toFalse
, which uses immutable tuples (this is faster).
- use_bin_type – Bytestrings are encoded as byte arrays.
Defaults to
Helper classes¶
-
class
aioserf.stream.
SerfStream
(client, stream)¶ An object of this class is returned by
aioserf.AioSerf.stream()
. It represents the message stream that’s returned by a query which returns more than one reply.All you should do with this object is iterate over it with an async context:
async with client.stream(...) as stream: assert isinstance(stream, SerfStream) async for reply in stream: assert isinstance(reply, SerfEvent) pass
Note that the actual query is not started until you enter the context.
-
class
aioserf.stream.
SerfQuery
(client, stream)¶ An object of this class is returned by
aioserf.AioSerf.query()
.All you should do with this object is iterate over it with an async context:
async with client.query(...) as query: assert isinstance(query, SerfQuery) async for reply in query: assert isinstance(reply, SerfEvent) pass
This is a derivative class of
SerfStream
.Cancelling a
SerfQuery
has no effect – the query only terminates when its timeout expires.
-
class
aioserf.stream.
SerfEvent
(client)¶ Encapsulates one event returned by a
SerfStream
orSerfQuery
.The event’s data are represented by (lower-cased) attributes of this object.
The payload (if any) will have been decoded by the client’s codec. Non-decodable payloads trigger a fatal error. To avoid that, use the
aioserf.codec.NoopCodec
codec and decode manually.-
await
respond
(payload=None)¶ This method only works for “query” requests, as returned by iterating over a
SerfStream
.Parameters: payload – the payload, as accepted by the client codec’s encoder.
-
await
-
class
aioserf.util.
ValueEvent
(scope=None)¶ A waitable value useful for inter-task synchronization, inspired by
threading.Event
.An event object manages an internal value, which is initially unset, and a task can wait for it to become True.
Parameters: scope – A cancelation scope that will be cancelled if/when this ValueEvent is. Used for clean cancel propagation. Note that the value can only be read once.