Beware the TinkerPop my son!
Neo4j is killer, but it isn't the only option if you want to use GraphDBs. Furthermore, if you want to take advantage of modern distributed processing techniques with graphs, you need a different toolkit. In this class, we will discuss:
We'll also briefly introduce Python's new asyncio module, available as part of the standard library starting with Python 3.4.
Gremlin is a graph traversal language. The flavor used in the Gremlin Console and the Gremlin Server is built on top of the Groovy programming language and provides an API designed specifically for graph traversals. Groovy is a language that runs on the Java Virtual Machine, like Java or Scala, but it is geared toward scripting. In some ways, Gremlin's syntax is similar to Python's.
Gremlin uses method chaining to compose graph traversals. Here is an example from the docs:
$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
gremlin> g = TinkerFactory.createModern()
==>tinkergraph[vertices:6 edges:6]
gremlin> g.V().has('name','marko').out('knows').values('name')
==>vadas
==>josh
The above can be broken down into the following:
gremlin> marko = g.V().has('name','marko').next()
==>v[1]
gremlin> marko.out('knows')
==>v[2]
==>v[4]
gremlin> marko.out('knows').values('name')
==>vadas
==>josh
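To see why method chaining composes traversals so naturally, here is a toy Python analogue of the traversal above. It is only a sketch: the `Traversal` class, its method names, and the dictionary-based graph are hypothetical and mimic a small slice of TinkerPop's "modern" toy graph, not any real Gremlin API.

```python
# A toy, in-memory imitation of Gremlin-style method chaining.
# Traversal and its methods are hypothetical; they only mimic the shape of
# g.V().has(...).out(...).values(...) over a plain-dict graph.

# A slice of the "modern" toy graph: vertex id -> properties.
VERTICES = {
    1: {"name": "marko", "age": 29},
    2: {"name": "vadas", "age": 27},
    3: {"name": "lop", "lang": "java"},
    4: {"name": "josh", "age": 32},
}
# Outgoing edges as (source id, label, target id).
EDGES = [(1, "knows", 2), (1, "knows", 4), (1, "created", 3), (4, "created", 3)]


class Traversal:
    def __init__(self, objects):
        self.objects = list(objects)

    def has(self, key, value):
        # Keep only vertex ids whose property `key` equals `value`.
        return Traversal(v for v in self.objects if VERTICES[v].get(key) == value)

    def out(self, label):
        # Follow outgoing edges carrying the given label.
        return Traversal(dst for v in self.objects
                         for (src, lbl, dst) in EDGES
                         if src == v and lbl == label)

    def values(self, key):
        # Replace each vertex id with one of its property values.
        return Traversal(VERTICES[v][key] for v in self.objects if key in VERTICES[v])

    def to_list(self):
        return list(self.objects)


def V():
    # Start a traversal at every vertex.
    return Traversal(VERTICES)


names = V().has("name", "marko").out("knows").values("name").to_list()
print(names)  # ['vadas', 'josh']
```

Each method returns a new `Traversal`, which is what lets the calls chain. Note that real Gremlin traversals are evaluated lazily, whereas this sketch evaluates eagerly at every step for simplicity.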
The Gremlin graph traversal API is huge, with all kinds of steps (methods), but all of the steps are composed from the five basic steps of the ...
map
: transform the incoming traverser’s object to another object (S → E).
flatMap
: transform the incoming traverser’s object to an iterator of other objects (S → E*).
filter
: allow or disallow the traverser from proceeding to the next step (S → S ∪ ∅).
sideEffect
: allow the traverser to proceed unchanged, but yield some computational sideEffect in the process (S ↬ S).
branch
: split the traverser and send each to an arbitrary location in the traversal (S ⇒ S1, S2, …, Sn).
Let's take some time to think about this...
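One way to build intuition for the five step types is to model each one as a Python generator function that consumes an iterator of objects and yields new ones. This is an analogy under heavy assumptions, not TinkerPop's implementation: traversers are reduced to plain vertex ids, `branch_step` is a simple union-style branch, and the graph is a hypothetical dictionary holding the "modern" toy graph's outgoing edges.

```python
from itertools import chain

# Hypothetical mini-graph: vertex id -> name, and vertex id -> (label, target id).
NAMES = {1: "marko", 2: "vadas", 3: "lop", 4: "josh", 5: "ripple", 6: "peter"}
OUT = {1: [("knows", 2), ("knows", 4), ("created", 3)],
       4: [("created", 5), ("created", 3)],
       6: [("created", 3)]}


def map_step(fn, objs):          # S -> E
    for o in objs:
        yield fn(o)


def flat_map_step(fn, objs):     # S -> E*
    for o in objs:
        yield from fn(o)


def filter_step(pred, objs):     # S -> S or nothing
    for o in objs:
        if pred(o):
            yield o


def side_effect_step(fn, objs):  # S ↬ S: pass objects through, run fn for its effect
    for o in objs:
        fn(o)
        yield o


def branch_step(routes, objs):   # S => S1, S2, ..., Sn
    # Send every object down each route and merge the results (a union-style branch).
    objs = list(objs)
    return chain.from_iterable(route(iter(objs)) for route in routes)


def out(label):
    # Return a function mapping a vertex id to its out-neighbours along `label`.
    return lambda v: [dst for lbl, dst in OUT.get(v, []) if lbl == label]


# g.V().has('name','marko').out('knows').values('name'), built from the primitives.
visited = []
step1 = filter_step(lambda v: NAMES[v] == "marko", NAMES)  # filter
step2 = flat_map_step(out("knows"), step1)                 # flatMap
step3 = side_effect_step(visited.append, step2)            # sideEffect
step4 = map_step(NAMES.get, step3)                         # map
names = list(step4)
print(names)    # ['vadas', 'josh']
print(visited)  # [2, 4]

# branch: follow both 'knows' and 'created' edges out of marko.
branched = list(branch_step([lambda objs: flat_map_step(out("knows"), objs),
                             lambda objs: flat_map_step(out("created"), objs)],
                            [1]))
print([NAMES[v] for v in branched])  # ['vadas', 'josh', 'lop']
```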
Here is the example from the docs that uses the five basic steps to compose the traversal shown above:
gremlin> g.V().
filter{it.get().value('name') == 'marko'}.
flatMap{it.get().out('knows')}.
map{it.get().value('name')}
==>vadas
==>josh
And here is a crafty (academic) example using only flatMap:
gremlin> g.V().
flatMap{it.get().value('name') == 'marko' ? [it.get()].iterator() : [].iterator()}.
flatMap{it.get().out('knows')}.
flatMap{[it.get().value('name')].iterator()}
==>vadas
==>josh
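The same trick carries over to Python: any map or filter can be rewritten as a flatMap that emits zero or one results. A minimal sketch, where `flat_map`, `map_as_flat_map`, and `filter_as_flat_map` are hypothetical helper names chosen for illustration:

```python
def flat_map(fn, objs):
    # S -> E*: fn returns an iterable of zero or more results per input.
    for o in objs:
        yield from fn(o)


def map_as_flat_map(fn):
    # A map is a flatMap that always emits exactly one result.
    return lambda o: [fn(o)]


def filter_as_flat_map(pred):
    # A filter is a flatMap that emits the input itself or nothing at all.
    return lambda o: [o] if pred(o) else []


numbers = range(10)
evens = flat_map(filter_as_flat_map(lambda n: n % 2 == 0), numbers)
evens_squared = list(flat_map(map_as_flat_map(lambda n: n * n), evens))
print(evens_squared)  # [0, 4, 16, 36, 64]
```

As in the Gremlin version, this is an academic exercise: it shows that flatMap is the most general of the step types, not that you should write traversals this way.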
The Gremlin Server allows you to remotely execute Gremlin scripts against any Gremlin-enabled graph hosted in the server. You can execute scripts using the Gremlin Console, as shown above, or use a driver written in another language to submit scripts to the server through a WebSocket connection. Drivers are available in a variety of languages; we'll be using Python.
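Under the hood, a driver's main job is to serialize a script and its parameters into a request message and send it over the socket. The sketch below builds such a message as JSON. The field names (`requestId`, `op`, `processor`, `args`, `gremlin`, `bindings`, `language`) follow the request format documented for TinkerPop 3.0 as we understand it; treat them as an assumption, since the milestone release used in this class may differ. `build_eval_request` is a hypothetical helper, not part of gizmo.

```python
import json
import uuid


def build_eval_request(script, bindings=None, language="gremlin-groovy"):
    """Build a JSON request asking the server to evaluate a Gremlin script.

    Field names are an assumption based on the TinkerPop 3.0 docs; check the
    documentation for your server version before relying on them.
    """
    message = {
        "requestId": str(uuid.uuid4()),  # lets the client match response chunks to this request
        "op": "eval",                     # evaluate a script
        "processor": "",                  # default (sessionless) processor
        "args": {
            "gremlin": script,
            "bindings": bindings or {},   # parameters referenced by name in the script
            "language": language,
        },
    }
    return json.dumps(message)


payload = build_eval_request("g.V(x).out()", bindings={"x": 1})
print(payload)
```

Passing values through bindings, rather than pasting them into the script string, keeps the script text constant across calls, which the Gremlin Server docs recommend so the server can reuse compiled scripts.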
First we need the Gremlin Server. Let's download and unpack:
$ wget http://tinkerpop.com/downloads/3.0.0.M7/gremlin-server-3.0.0.M7.zip
$ unzip gremlin-server-3.0.0.M7.zip
$ rm gremlin-server-3.0.0.M7.zip
$ mv gremlin-server-3.0.0.M7 gremlin-server
The server comes with a variety of basic config templates, which live in the conf/ directory:
$ cd gremlin-server/conf
$ ls
gremlin-server-classic.yaml gremlin-server.yaml
gremlin-server-min.yaml log4j-server.properties
gremlin-server-modern.yaml neo4j-empty.properties
gremlin-server-neo4j.yaml tinkergraph-empty.properties
gremlin-server-rest-modern.yaml
As you can see, there is a REST server option, but we'll be using gizmo, a Python library that provides bindings for the Gremlin Server's WebSocket communication. For this example, we'll use the base gremlin-server.yaml config file. The launch script lives in bin/, so start the server from the gremlin-server directory:
$ cd ..
$ ./bin/gremlin-server.sh conf/gremlin-server.yaml
Since Python 3.4 is not the default version on many systems, it's nice to create a virtualenv that uses Python 3.4 by default. Then use pip to install gizmo. Using virtualenvwrapper on Ubuntu 14.04:
$ mkvirtualenv -p /usr/bin/python3.4 gizmo
$ pip install gizmo
Fire up the Gremlin-Server.
$ ./bin/gremlin-server.sh conf/gremlin-server-modern.yaml
The AsyncGremlinClient uses asyncio and websockets to communicate asynchronously with the Gremlin Server. The API is built around asyncio.Task objects, which wrap a coroutine in a future and schedule its execution on the event loop. With the async client, you must start the transport layer for socket communication yourself, using one of the client's methods that run the event loop. Observe:
>>> import asyncio
>>> from gizmo import AsyncGremlinClient
# Create a websocket connection.
>>> gc = AsyncGremlinClient('ws://localhost:8182/')
# Create a task by passing a coroutine and its parameters as args and kwargs.
# In this case, submit is a method that submits a gremlin script to the
# server and stores the responses on the client object in a message queue.
>>> task = gc.task(gc.submit, "g.V(x).out()", bindings={"x":1})
# Run the event loop until the task is complete.
>>> gc.run_until_complete(task)
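The Task mechanics are plain asyncio, so they can be seen without a server. This sketch uses only the standard library; `add` is a hypothetical stand-in for a network call such as `submit`. It uses the `async def`/`await` syntax (Python 3.5+) rather than `@asyncio.coroutine`, which was deprecated and then removed in Python 3.11.

```python
import asyncio


async def add(a, b):
    # Stand-in for a network call such as submitting a script.
    await asyncio.sleep(0)
    return a + b


loop = asyncio.new_event_loop()
try:
    # create_task wraps the coroutine in a Task (a Future subclass) and schedules it.
    task = loop.create_task(add(2, 2))
    print(isinstance(task, asyncio.Future))  # True
    print(task.done())                       # False: nothing runs until the loop does
    result = loop.run_until_complete(task)
    print(result)                            # 4
finally:
    loop.close()
```

Creating the task only schedules it; the coroutine body runs once `run_until_complete` drives the loop, which is why the client exposes its own loop-running methods.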
At its most basic, the AsyncGremlinClient allows you to send and receive messages through a socket. The Gremlin Server sends responses in chunks, so it is important to keep receiving messages until AsyncGremlinClient.recv returns None. Observe:
@asyncio.coroutine
def recv_coro(gc):
    yield from gc.send("g.V().has(n, val).values(n)",
                       bindings={"n": "name", "val": "gremlin"})
    # Keep receiving chunks until recv returns None.
    while True:
        f = yield from gc.recv()
        if f is None:
            break
        assert f[0] == "gremlin"
>>> gc.run_until_complete(recv_coro(gc))
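The chunked receive loop is easy to simulate without a server. In this sketch, `FakeConnection` is a hypothetical stand-in for the websocket: it hands back a few list-shaped chunks and then `None`, mirroring the end-of-stream convention that `recv` follows above. It uses `async def`/`await` instead of `@asyncio.coroutine`, which newer Pythons no longer provide.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a socket: returns queued chunks, then None."""

    def __init__(self, chunks):
        self._queue = asyncio.Queue()
        for chunk in chunks:
            self._queue.put_nowait(chunk)
        self._queue.put_nowait(None)  # end-of-stream marker

    async def recv(self):
        return await self._queue.get()


async def receive_all(conn):
    # Keep reading until the end-of-stream marker, collecting every chunk.
    results = []
    while True:
        chunk = await conn.recv()
        if chunk is None:
            break
        results.extend(chunk)
    return results


async def main():
    conn = FakeConnection([["marko", "vadas"], ["lop", "josh"], ["ripple", "peter"]])
    return await receive_all(conn)


names = asyncio.run(main())
print(names)  # ['marko', 'vadas', 'lop', 'josh', 'ripple', 'peter']
```

Stopping after the first chunk would silently drop the rest of the result set, which is why the loop only exits on the marker.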
Sometimes you'll want to store the server results for later use. To do so, AsyncGremlinClient provides the submit method, which submits a script to the server for evaluation and lets you modify the responses on the fly, as they come from the server, before they are stored in AsyncGremlinClient.messages, an asyncio.Queue. You can read messages off the queue using the read method. Observe:
# This is applied to each message as it is received from the server.
consumer = lambda x: x[0] ** 2
@asyncio.coroutine
def message_queue_coro(gc):
    yield from gc.submit("2 + 2", consumer=consumer)
    while True:
        f = yield from gc.read()
        if f is None:
            break
        assert f == 16
>>> gc.run_until_complete(message_queue_coro(gc))
# A consumer could also be a coroutine.
@asyncio.coroutine
def consumer_coro(x):
    yield from asyncio.sleep(0)
    return x[0] ** 2
@asyncio.coroutine
def coroutine_consumer_coro(gc):
    yield from gc.submit("2 + 2", consumer=consumer_coro)
    # Access the messages queue directly.
    while not gc.messages.empty():
        f = yield from gc.read()
        assert f == 16
>>> gc.run_until_complete(coroutine_consumer_coro(gc))
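Here is one way a client could support both plain-function and coroutine consumers before queueing a message. `MessageStore` is a hypothetical sketch, not gizmo's implementation: it calls the consumer and, if the result is awaitable, awaits it. It is written with `async def`/`await` rather than `@asyncio.coroutine`.

```python
import asyncio
import inspect


class MessageStore:
    """Hypothetical sketch of a client-side message queue with consumers."""

    def __init__(self):
        self.messages = asyncio.Queue()

    async def store(self, message, consumer=None):
        # Transform the message (if a consumer is given) before queueing it.
        if consumer is not None:
            message = consumer(message)
            if inspect.isawaitable(message):  # coroutine consumers return awaitables
                message = await message
        await self.messages.put(message)


async def square_first(x):
    # A coroutine consumer, analogous to consumer_coro above.
    await asyncio.sleep(0)
    return x[0] ** 2


async def main():
    store = MessageStore()
    # Pretend the server answered "2 + 2" with the chunk [4], twice.
    await store.store([4], consumer=lambda x: x[0] ** 2)
    await store.store([4], consumer=square_first)
    return [store.messages.get_nowait() for _ in range(store.messages.qsize())]


results = asyncio.run(main())
print(results)  # [16, 16]
```

Checking the consumer's return value with `inspect.isawaitable`, instead of inspecting the consumer itself, also handles callables such as `functools.partial` objects that wrap a coroutine function.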
It's easy to run a bunch of tasks in "parallel": just add them to the client and run them. Warning: this is an asynchronous technique, so it does not guarantee the order in which tasks will complete. Observe:
# Our "slow" function. This will always take longer than other tasks.
@asyncio.coroutine
def sleepy(gc):
    yield from asyncio.sleep(0.25)
    yield from gc.task(gc.submit, "g.V().has(n, val).values(n)",
                       bindings={"n": "name", "val": "gremlin"})
# Submit the slow task first.
>>> gc.add_task(sleepy, gc)
# Now the fast task.
>>> gc.add_task(gc.submit, "g.V().values(n)", bindings={"n": "name"},
...             consumer=lambda x: x)
# This runs all the declared tasks.
>>> gc.run_tasks()
# The task results will be stored on the message queue.
# You can retrieve them using the asyncio.Queue API, or you can just iterate.
>>> for x in gc:
...     print(x)
# ['marko', 'vadas', 'lop', 'josh', 'ripple', 'peter']
# ['gremlin']
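The reordering above is ordinary asyncio behavior. This stdlib-only sketch (hypothetical `job` coroutine, `async def` syntax) starts a slow job before a fast one and records the order in which they finish. It also shows a distinction worth knowing: `asyncio.gather` returns results in argument order, while anything that records results as they arrive, like a message queue, sees completion order.

```python
import asyncio


async def job(name, delay, finished):
    # Simulate work of varying length, then record when it completed.
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def main():
    finished = []
    # Start the slow job first; gather runs both concurrently.
    results = await asyncio.gather(job("slow", 0.05, finished),
                                   job("fast", 0.0, finished))
    return results, finished


results, finished = asyncio.run(main())
print(results)   # ['slow', 'fast']  (gather keeps argument order)
print(finished)  # ['fast', 'slow']  (completion order differs)
```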
gizmo does only minimal parsing of the Gremlin Server response message, wrapping it in a GremlinResponse object. This object inherits from list, so the content of the response is available through all the normal list methods, iteration, etc. The GremlinResponse also exposes the metadata contained in the server response as properties.
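The "list with metadata attached" pattern is simple to reproduce. The class below is a hypothetical illustration of the idea, not gizmo's GremlinResponse; the attribute names `status_code` and `request_id` are assumptions chosen for the example.

```python
class ResponseChunk(list):
    """Hypothetical list subclass carrying response metadata as attributes.

    Not gizmo's GremlinResponse; the attribute names are assumptions.
    """

    def __init__(self, data, status_code=200, request_id=None):
        super().__init__(data)          # the payload behaves like a normal list
        self.status_code = status_code  # metadata rides along as attributes
        self.request_id = request_id


chunk = ResponseChunk(["marko", "vadas"], status_code=200, request_id="abc")
print(chunk[0], len(chunk), chunk.status_code)  # marko 2 200
print(chunk == ["marko", "vadas"])              # True: compares like a plain list
```

Because equality and iteration come from `list`, code that only cares about the data can ignore the metadata entirely.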
As the above examples demonstrate, AsyncGremlinClient is designed to interoperate with asyncio. Here is an example that uses asyncio to communicate with the Gremlin Server sequentially, one request after another.
# Define a coroutine that sequentially executes instructions.
@asyncio.coroutine
def client(gc):
    yield from sleepy(gc)
    yield from gc.task(gc.submit, "g.V().values(n)",
                       bindings={"n": "name"})
    # Response messages sent by the server are stored in an asyncio.Queue.
    while True:
        f = yield from gc.messages.get()
        if f is None:
            break
        print(f)
>>> gc = AsyncGremlinClient('ws://localhost:8182/')
>>> gc.run_until_complete(client(gc))
# ['gremlin']
# ['marko', 'vadas', 'lop', 'josh', 'ripple', 'peter']
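Stripped of the client, this example is sequential simply because each `yield from` (or `await`) suspends the caller until the awaited coroutine finishes. A stdlib-only sketch with hypothetical stand-in queries, written with `async def`/`await`:

```python
import asyncio


async def slow_query():
    # Stand-in for the "sleepy" request.
    await asyncio.sleep(0.05)
    return ["gremlin"]


async def fast_query():
    # Stand-in for a quick request.
    return ["marko", "vadas", "lop", "josh", "ripple", "peter"]


async def client():
    # Awaiting one coroutine after another runs them strictly in sequence,
    # so the fast query cannot start until the slow one has finished.
    order = []
    order.append(await slow_query())
    order.append(await fast_query())
    return order


responses = asyncio.run(client())
print(responses)
# [['gremlin'], ['marko', 'vadas', 'lop', 'josh', 'ripple', 'peter']]
```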
You can use the AsyncGremlinClient task queue to enqueue and dequeue tasks. Tasks are executed as they are dequeued.
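A task queue of this kind can be modelled as an asyncio.Queue of deferred calls. The `TaskQueue` class below is a hypothetical sketch whose method names mirror the ones used in this class; it is not gizmo's implementation. Each entry stores a coroutine function plus its arguments, and nothing runs until the entry is dequeued, so execution follows first-in, first-out order.

```python
import asyncio


class TaskQueue:
    """Hypothetical FIFO queue of deferred coroutine calls."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def enqueue_task(self, coro_func, *args, **kwargs):
        # Store the coroutine function and its arguments; nothing runs yet.
        await self._queue.put((coro_func, args, kwargs))

    async def dequeue_task(self):
        # Take the oldest entry and run it to completion.
        coro_func, args, kwargs = await self._queue.get()
        return await coro_func(*args, **kwargs)

    async def dequeue_all(self):
        # Run every queued call, strictly in the order it was enqueued.
        results = []
        while not self._queue.empty():
            results.append(await self.dequeue_task())
        return results


async def query(name, delay):
    # Stand-in for a request that takes `delay` seconds.
    await asyncio.sleep(delay)
    return name


async def main():
    tq = TaskQueue()
    await tq.enqueue_task(query, "gremlin", delay=0.05)    # slow, enqueued first
    await tq.enqueue_task(query, "blueprints", delay=0.0)  # fast, enqueued second
    return await tq.dequeue_all()


results = asyncio.run(main())
print(results)  # ['gremlin', 'blueprints']
```

Even though the second call is faster, it cannot overtake the first, because each dequeued call is awaited before the next one is taken off the queue.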
Let's set up the example graph used in the TP3 docs.
Fire up the Gremlin-Server.
$ ./bin/gremlin-server.sh conf/gremlin-server.yaml
@asyncio.coroutine
def graph_create_coro(gc):
# Open a TinkerGraph.
yield from gc.task(gc.submit, "g = TinkerGraph.open()",
consumer=lambda x: x)
f = yield from gc.messages.get()
assert(f.status_code == 200)
# Clear the graph.
yield from gc.task(gc.submit, "g.V().remove(); g.E().remove();",
collect=False)
# Add nodes, edge, add/remove properties.
yield from gc.task(gc.submit,
("gremlin = g.addVertex(label,'software','name','gremlin');" +
"gremlin.property('created', 2009);" +
"blueprints = g.addVertex(label,'software','name','blueprints');" +
"gremlin.addEdge('dependsOn',blueprints);" +
"blueprints.property('created',2010);" +
"blueprints.property('created').remove()"), collect=False)
# Confirm node creation.
yield from gc.task(gc.submit, "g.V().count()", consumer=lambda x: x)
f = yield from gc.messages.get()
assert(f[0] == 2)
# Confirm edge creation.
yield from gc.task(gc.submit, "g.E().count()", consumer=lambda x: x)
f = yield from gc.messages.get()
assert(f[0] == 1)
>>> gc = AsyncGremlinClient("ws://localhost:8182/")
>>> gc.run_until_complete(graph_create_coro(gc))
Ok, now use the task queue to interact with the graph.
# Enqueue two tasks, the first sleepy, the second fast. Then dequeue and
# execute them one by one.
@asyncio.coroutine
def enqueue_dequeue_coro(gc):
    # Enqueues. sleepy takes the client as its argument.
    yield from gc.enqueue_task(sleepy, gc)
    yield from gc.enqueue_task(gc.submit, "g.V().has(n, val).values(n)",
                               bindings={"n": "name", "val": "blueprints"},
                               consumer=lambda x: x[0])
    # Dequeues.
    yield from gc.dequeue_task()
    yield from gc.dequeue_task()
    # Test that first in, first out was maintained.
    mssg1 = yield from gc.messages.get()
    assert mssg1 == "gremlin"
    print("Successfully dequeued slow operation: gremlin")
    mssg2 = yield from gc.messages.get()
    assert mssg2 == "blueprints"
    print("Successfully dequeued fast operation: blueprints")
>>> gc.run_until_complete(enqueue_dequeue_coro(gc))
Use dequeue_all to dequeue and execute all tasks in the order in which they were enqueued.
@asyncio.coroutine
def client(gc):
    yield from gc.enqueue_task(sleepy, gc)
    yield from gc.enqueue_task(gc.submit, "g.V().values(name)",
                               bindings={"name": "name"})
    yield from gc.dequeue_all()
>>> gc.run_until_complete(client(gc))
A more advanced use of the task queue is async_dequeue_all, which requires you to define a coroutine that takes the task queue as a parameter and uses the asyncio.Queue.get method to retrieve a coroutine along with its args and kwargs. Behind the scenes, this creates a coroutine for each item in the queue and then executes them all asynchronously. Observe:
# Get a coro from the queue and create a task.
@asyncio.coroutine
def async_dequeue_consumer(q):
    coro, args, kwargs = yield from q.get()
    task = gc.task(coro, *args, **kwargs)
    yield from task
@asyncio.coroutine
def enqueue_all_coro(gc):
    # sleepy takes the client as its only argument.
    yield from gc.enqueue_task(sleepy, gc)
    yield from gc.enqueue_task(gc.submit, "g.V().has(n, val).values(n)",
                               bindings={"n": "name", "val": "blueprints"},
                               consumer=lambda x: x[0])
# Add all items to the task queue.
>>> gc.run_until_complete(enqueue_all_coro(gc))
# Map the consumer coroutines to the task queue.
>>> gc.async_dequeue_all(async_dequeue_consumer)
# This coroutine just tests the results of the above pattern.
@asyncio.coroutine
def test_messages_coro():
    mssg1 = yield from gc.messages.get()
    mssg2 = yield from gc.messages.get()
    assert mssg1 == "blueprints"
    print("Async returned fast first: blueprints")
    assert mssg2 == "gremlin"
    print("Async returned slow second: gremlin")
>>> gc.run_until_complete(test_messages_coro())
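Here is the same idea reduced to the standard library, assuming the queue holds `(coroutine_function, args, kwargs)` tuples as described above. `async_dequeue_all` and `dequeue_consumer` are hypothetical names for this sketch, not gizmo's internals. One consumer is started per queued item, and because they run concurrently, the fast call finishes first even though it was enqueued second.

```python
import asyncio


async def query(name, delay):
    # Stand-in for a request that takes `delay` seconds.
    await asyncio.sleep(delay)
    return name


async def dequeue_consumer(queue, finished):
    # Pull one deferred call off the queue, run it, and record its result.
    coro_func, args, kwargs = await queue.get()
    finished.append(await coro_func(*args, **kwargs))


async def async_dequeue_all(queue, consumer):
    # Start one consumer per queued item and run them all concurrently.
    finished = []
    await asyncio.gather(*(consumer(queue, finished) for _ in range(queue.qsize())))
    return finished


async def main():
    queue = asyncio.Queue()
    await queue.put((query, ("gremlin", 0.05), {}))    # slow, enqueued first
    await queue.put((query, ("blueprints", 0.0), {}))  # fast, enqueued second
    return await async_dequeue_all(queue, dequeue_consumer)


finished = asyncio.run(main())
print(finished)  # ['blueprints', 'gremlin']
```

Compare this with sequential dequeuing: the queue still hands out items in FIFO order, but concurrency means results are recorded in completion order.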
Now it's up to you to explore Gremlin and the different ways you can use asyncio and gizmo to interact with the Gremlin Server :D!
import asyncio
import json
from functools import partial
from gizmo import AsyncGremlinClient
def on_chunk(chunk, f):
    # Write each response chunk to the file as one line of JSON.
    f.write(json.dumps(chunk) + '\n')
@asyncio.coroutine
def slowjson(gc):
    yield from asyncio.sleep(5)
    yield from gc.send("g.V().values(n)", bindings={"n": "name"})
@asyncio.coroutine
def client():
    gc = AsyncGremlinClient('ws://localhost:8182/')
    with open("testfile.txt", "w") as f:
        p = partial(on_chunk, f=f)
        yield from slowjson(gc)
        yield from gc.receive(consumer=p, collect=False)  # Don't collect messages.
        yield from gc.send("g.V(x).out()", bindings={"x": 1})
        yield from gc.receive(consumer=p, collect=False)  # Don't collect messages.
    if gc.messages.empty():
        print("No messages collected.")
>>> asyncio.get_event_loop().run_until_complete(client())
Here's the output:
>>> with open("testfile.txt") as f:
...     for line in f:
...         print(line, end="")
["marko", "vadas", "lop", "josh", "ripple", "peter"]
[{"label": "software", "name": ["lop"], "type": "vertex", "lang": ["java"], "id": 3}, {"label": "person", "name": ["vadas"], "age": [27], "type": "vertex", "id": 2}, {"label": "person", "name": ["josh"], "age": [32], "type": "vertex", "id": 4}]
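Because each chunk is written as one JSON document per line, the file can be turned back into Python objects line by line. This self-contained sketch replays the same `on_chunk` pattern against sample chunks copied from the output above, with no server involved, and writes to a temporary directory instead of the working directory.

```python
import json
import os
import tempfile
from functools import partial


def on_chunk(chunk, f):
    # Same idea as the handler above: one JSON document per line.
    f.write(json.dumps(chunk) + "\n")


# Sample chunks taken from the output shown above (trimmed).
chunks = [["marko", "vadas", "lop", "josh", "ripple", "peter"],
          [{"label": "person", "name": ["vadas"], "age": [27], "type": "vertex", "id": 2}]]

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "testfile.txt")
    with open(path, "w") as f:
        write = partial(on_chunk, f=f)  # bind the file, leaving only `chunk` free
        for chunk in chunks:
            write(chunk)

    # Read the file back, decoding one JSON document per line.
    with open(path) as f:
        decoded = [json.loads(line) for line in f]

print(decoded == chunks)  # True
```

`partial` is what lets a two-argument function serve as a one-argument consumer: the file is fixed up front and each chunk is supplied as it arrives.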
Use gizmo with Tornado:
import asyncio
import json
from tornado import gen
from tornado.web import RequestHandler, Application, url
from tornado.platform.asyncio import AsyncIOMainLoop
from gizmo import AsyncGremlinClient
class GremlinHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        gc = AsyncGremlinClient(uri='ws://localhost:8182/')
        # Could define custom receive here, but for brevity...
        yield from gc.task(gc.submit, "g.V().values(n)",
                           bindings={"n": "name"})
        while not gc.messages.empty():
            message = yield from gc.messages.get()
            message = json.dumps(message)
            self.write(message)
def make_app():
    return Application([
        url(r"/", GremlinHandler),
    ])
def main():
    app = make_app()
    # Must create IOLoop before calling app.listen.
    AsyncIOMainLoop().install()
    app.listen(8888)
    asyncio.get_event_loop().run_forever()
if __name__ == '__main__':
    print("Starting server at http://localhost:8888/")
    main()
The beauty of the Gremlin Server lies in its adaptability.
Coming soon in Python...