Usage

Using aioserf is reasonably simple: you open a long-lived connection and send requests to it.

Let’s consider a basic example:

import anyio
from aioserf import serf_client, UTF8Codec

async def main():
    # Open a long-lived connection to the local Serf agent.
    async with serf_client(codec=UTF8Codec()) as client:
        await client.event("Hello", payload="I am an example")

        # Subscribe to all events and answer every query.
        async with client.stream('*') as stream:
            async for resp in stream:
                if resp.event == 'query':
                    await resp.respond('For %s, with %s' % (resp.name, resp.payload))
                else:
                    print("I got me a %s" % (resp,))

if __name__ == "__main__":
    anyio.run(main)

This sample is slightly asocial because it indiscriminately responds to all queries (and requests all other events even if it doesn’t do anything with them), but it’s immediately obvious what this code is doing.

Multiple concurrent requests

Async code as you know it heavily leans towards throwing requests over your interface’s wall and then using callbacks to process any replies. AioSerf doesn’t support that. Instead, you start a new task and do your work there. The aioserf.AioSerf class has an AioSerf.spawn() helper method so that you can start your task within AioSerf’s task group.

await AioSerf.spawn(proc, *args, **kw)

Run a task within this object’s task group.

Returns: a cancel scope you can use to stop the task.

Thus, let’s extend our example with a keepalive transmitter:

async def keepalive(client):
    # Announce ourselves to the cluster once a minute.
    while True:
        await anyio.sleep(60)
        await client.event("keepalive", payload="My example")

async def main():
    async with serf_client(codec=UTF8Codec()) as client:
        await client.event("Hello", payload="I am an example")
        # spawn() passes extra positional arguments on to the task
        await client.spawn(keepalive, client)

        ## async with … – continue as above

Any complex system should include code to shut itself down cleanly, so let’s add that too:

keeper = await client.spawn(keepalive, client)
async with client.stream('*') as stream:
    async for resp in stream:
        if resp.event == 'query':
            await resp.respond('For %s, with %s' % (resp.name, resp.payload))
        elif resp.event == 'user' and resp.name == 'shutdown':
            break
        elif keeper is not None and resp.event == 'user' and resp.name == 'quiet':
            await keeper.cancel()
            keeper = None
        else:
            print("I got me a %s" % (resp,))
await client.cancel()

though in a real, complex system you probably want to open multiple, more selective event streams.

await AioSerf.cancel()

Cancel our internal task group. This should cleanly shut down everything.

Supported RPC methods

AioSerf aims to support all methods exported by Serf’s RPC interface.

Streaming

You’ve already seen an example for receiving an event stream:

AioSerf.stream(event_types='*')

Open an event stream.

Possible event types:

  • * – all events
  • user – all user events
  • user:TYPE – all user events of type TYPE
  • query – all queries
  • query:TYPE – all queries of type TYPE
  • member-join
  • member-leave

This method returns a SerfStream object, which works as an async context manager and as an async iterator; iterating over it yields SerfEvent objects.

Events triggered by a query have a respond method which you may use to send responses to that query.

Example Usage:

>>> async with client.stream("query") as stream:
>>>     async for event in stream:
>>>         msg = await dispatch(event.type)(event.payload)
>>>         await event.respond(msg)

though you might want to process messages in a task group:

>>> async def in_tg(event):
>>>     msg = await dispatch(event.type)(event.payload)
>>>     await event.respond(msg)
>>> async with anyio.create_task_group() as tg:
>>>     async with client.stream("query") as stream:
>>>         async for event in stream:
>>>             await tg.spawn(in_tg, event)

A query is also implemented as a stream because there may be any number of replies:

async def ask_query(self, client):
    acks = 0
    reps = 0
    # `timeout` is given in seconds (see the parameter list below)
    async with client.query("foo", payload="baz", request_ack=True,
            timeout=5) as q:
        async for r in q:
            if not hasattr(r, 'type'):
                break
            if r.type == "ack":
                acks += 1
            elif r.type == "response":
                reps += 1
                # process `r.payload`
                pass
            else:
                assert False, r
    # at this point the query is finished
    pass

The done type (which you’ll find in Serf’s RPC documentation) is internally translated to a StopAsyncIteration exception, so you don’t have to handle it yourself.

AioSerf.query(name, payload=None, *, nodes=None, tags=None, request_ack=False, timeout=0)

Send a query.

Parameters:
  • name – The query name. Mandatory.
  • payload – Your payload. Will be passed through this client’s codec.
  • nodes – The list of nodes to pass this query to. Default: no restriction
  • tags – A dict of tags used to filter nodes. Values are regexps which a node’s corresponding tag value must match.
  • request_ack – A flag specifying whether the query result shall include an acknowledgment message from every Serf node that matches the request. The default is False.
  • timeout – Time (in seconds) after which the query will be concluded.
Returns: an aioserf.stream.SerfQuery object.

Note that the query will not be started until you enter its context. You should then iterate over the results:

async with client.query("example") as stream:
    async for response in stream:
        # "from" is a Python keyword, so this attribute
        # needs to be read with getattr()
        sender = getattr(response, "from")
        if response.type == "ack":
            print("Response arrived at %s" % (sender,))
        elif response.type == "response":
            print("Node %s answered %s" %
                    (sender, repr(response.payload)))

You can also receive a logging stream. Remember that Serf starts by sending a batch of old log entries, which may block the RPC socket until you have read them all.

AioSerf.monitor(log_level='info')

Ask the server to stream (some of) its log entries to you.

Args:
log_level: The log level.

Possible values are “trace”, “debug”, “info”, “warn”, and “err”. The default is “info”.
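A monitor stream can be consumed like any other stream. This sketch assumes that monitor() returns the same kind of async context manager as stream(), and that each entry exposes the log line as a lower-cased log attribute, following the SerfEvent attribute convention described below:

async with client.monitor(log_level="debug") as logs:
    async for entry in logs:
        # "log" is assumed to be the lower-cased form of the
        # "Log" field in Serf's RPC reply
        print(entry.log)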

Requests

AioSerf contains methods for all remaining RPC methods that Serf offers.

See Serf’s RPC documentation <https://www.serf.io/docs/agent/rpc.html> for details, esp. regarding the values in replies.

AioSerf.event(name, payload=None, coalesce=True)

Send a user-specified event to the cluster. Can take an optional payload, which will be sent as translated by the client’s codec.

Parameters:
  • name – The name of the user event.
  • payload – The payload, as acceptable to the codec’s encode method.
  • coalesce – A flag specifying whether multiple events with the same name may be coalesced, i.e. replaced by the most recent of them instead of being delivered one by one. Defaults to True. See the example below.
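For instance, to broadcast a notice that must not be merged with other events of the same name (the event name and payload are made up for this example):

await client.event("deploy", payload="release 1.2", coalesce=False)
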
AioSerf.respond(seq, payload)

Respond to a query.

You should probably call this via aioserf.stream.SerfEvent.respond() rather than directly.

AioSerf.members(name=None, status=None, tags=None)

Lists the members of a Serf cluster, optionally restricted by one or more filters:

Parameters:
  • name – a string with a regex that matches on node names.
  • status – a string with a regex matching on node status.
  • tags – a dict of tags used to filter nodes. Values are regexps which a node’s corresponding tag value must match.

All arguments must match for a node to be returned.
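
For example, to find the live nodes whose role tag marks them as web servers (the tag name and value are invented for this example; the reply format is described in Serf’s RPC documentation):

members = await client.members(status="alive", tags={"role": "web"})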

AioSerf.tags(**tags)

Set this node’s tags.

Keyword arguments are used to name tags to be set or deleted.

You can delete a tag by passing None as its value.

Tags that are not mentioned are not changed.
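
For example, to set a role tag and delete an obsolete one (the tag names are made up for this example):

await client.tags(role="web", old_role=None)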

AioSerf.join(location, replay=False)

Ask Serf to join a cluster, by providing a list of possible ip:port locations.

Parameters:
  • location – A list of addresses.
  • replay – a flag indicating whether to replay old user events to new nodes; defaults to False.
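
For example, with placeholder addresses:

await client.join(["10.0.0.11:7946", "10.0.0.12:7946"])
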
AioSerf.force_leave(name)

Force a node to leave the cluster.

Parameters: name – the name of the node that should leave.
AioSerf.stats()

Obtain operator debugging information about the running Serf agent.
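
A minimal sketch of retrieving and printing those statistics; the exact keys of the reply are documented in Serf’s RPC documentation:

stats = await client.stats()
# the reply is a nested structure; see Serf's RPC
# documentation for the available keys
print(stats)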

Codecs

Serf’s RPC protocol can transport user-specific payloads, which must be binary strings. As these are inconvenient and rarely what you need, AioSerf transparently encodes and decodes them with a user-supplied codec.

The codec needs to have encode and decode methods which return and accept bytes, respectively. The default is to do nothing.
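
For instance, a JSON codec only needs to honor that contract. This is a sketch, not part of AioSerf:

import json

class JSONCodec:
    # A hypothetical codec that serializes payloads as JSON.

    def encode(self, obj):
        # must return bytes
        return json.dumps(obj).encode("utf-8")

    def decode(self, data):
        # receives bytes
        return json.loads(data.decode("utf-8"))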

class aioserf.codec.NoopCodec

A codec that does nothing.

Your payload needs to consist of bytes.

class aioserf.codec.UTF8Codec

A codec that translates to UTF-8 strings.

Your payload needs to be a single string.

This codec will not stringify other data types for you.

class aioserf.codec.MsgPackCodec(use_bin_type=True, use_list=False)

A codec that encodes to a “msgpack”-encoded bytestring.

Your payload must consist of whatever “msgpack” accepts.

Parameters:
  • use_bin_type – Bytestrings are encoded as byte arrays. Defaults to True. If False, bytes are transmitted as string types and input strings are not UTF8-decoded. Use this if your network contains clients which use non-UTF8-encoded strings (this violates the MsgPack specification).
  • use_list – if True, lists and tuples are returned as lists (i.e. modifiable). Defaults to False, which uses immutable tuples (this is faster).
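
With this codec you can send structured payloads directly; the event name and payload are invented for this example:

from aioserf import serf_client
from aioserf.codec import MsgPackCodec

async with serf_client(codec=MsgPackCodec()) as client:
    # any msgpack-serializable object works as a payload
    await client.event("config", payload={"retries": 3, "hosts": ["a", "b"]})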

Helper classes

class aioserf.stream.SerfStream(client, stream)

An object of this class is returned by aioserf.AioSerf.stream(). It represents the stream of messages which the server sends in reply to a streaming request.

All you should do with this object is iterate over it with an async context:

async with client.stream(...) as stream:
    assert isinstance(stream, SerfStream)
    async for reply in stream:
        assert isinstance(reply, SerfEvent)
        pass

Note that the actual query is not started until you enter the context.

class aioserf.stream.SerfQuery(client, stream)

An object of this class is returned by aioserf.AioSerf.query().

All you should do with this object is iterate over it with an async context:

async with client.query(...) as query:
    assert isinstance(query, SerfQuery)
    async for reply in query:
        assert isinstance(reply, SerfEvent)
        pass

This is a derivative class of SerfStream.

Cancelling a SerfQuery has no effect – the query only terminates when its timeout expires.

class aioserf.stream.SerfEvent(client)

Encapsulates one event returned by a SerfStream or SerfQuery.

The event’s data are represented by (lower-cased) attributes of this object.

The payload (if any) will have been decoded by the client’s codec. Non-decodable payloads trigger a fatal error. To avoid that, use the aioserf.codec.NoopCodec codec and decode manually.

await respond(payload=None)

This method only works for “query” requests, as returned by iterating over a SerfStream.

Parameters: payload – the payload, as accepted by the client codec’s encoder.
class aioserf.util.ValueEvent(scope=None)

A waitable value useful for inter-task synchronization, inspired by threading.Event.

An event object manages an internal value, which is initially unset, and a task can wait for it to become True.

Parameters: scope – A cancellation scope that will be cancelled if/when this ValueEvent is. Used for clean cancel propagation.

Note that the value can only be read once.
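
A minimal usage sketch, assuming set() and get() methods in the spirit of threading.Event (the method names are assumptions; check aioserf.util for the exact API):

import anyio
from aioserf.util import ValueEvent

async def reader(val):
    # blocks until another task supplies the value
    result = await val.get()  # assumed API
    print("got", result)

async def main():
    val = ValueEvent()
    async with anyio.create_task_group() as tg:
        await tg.spawn(reader, val)
        await val.set(42)  # assumed API; wakes the waiting reader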