SPLAY manual

Introduction

SPLAY provides a language, a set of libraries, a deployment system and experiment control/monitoring to easily develop and evaluate distributed applications and protocols.

This manual presents the basics of using the SPLAY language and libraries. Note that the SPLAY libraries can also be used outside of the SPLAY execution environment, e.g., for testing, for local runs or in a production system.

For any questions, comments or suggestions of improvements concerning SPLAY or this manual, please contact us at info@splay-project.org

  • First, get the latest SPLAYd and SPLAY libraries and install them.
  • SPLAY uses Lua as its base language. Lua is a simple, well-documented language that is easy to learn. This manual briefly recaps the Lua syntax, but we recommend having a look at the free online Lua manual. You can also print a Lua quick-reference sheet.
  • If you need to use TCP and UDP sockets directly, refer to the LuaSocket documentation. Note that SPLAY's RPC mechanism is more convenient and concise in most cases.

Lua basics

We strongly recommend that users consult the Lua manual. Nonetheless, this part goes over the most important aspects of Lua syntax and data structures.

  • Single line comment: -- this is a comment
  • Multiple lines comment: --[[ this is a comment ]]
  • Lua "natural" array indexes begin with 1, not 0.
  • Not equal is ~=
  • Dynamic typing
  • Functions can return more than one value
  • By convention, when there is an error, a function returns the two values nil and a string with the error message
  • Tables are the primary data structure (see below)
  • Functions are first-class objects: a function can be passed to another function as a parameter, or redefined
  • If not declared as "local", a variable is global
  • Error handling with error() and pcall()
  • Variable number of arguments (advanced topic, see documentation)
  • Meta-tables (advanced topic, see documentation)
  • Upvalues and closures (advanced topic, see documentation)
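
The following standalone snippet uses only standard Lua (5.1 or later, nothing SPLAY-specific), so it runs with a plain interpreter. It exercises several of the points above: comments, 1-based indexes, ~=, global versus local variables, and raising and catching errors with error() and pcall(). The function must_be_positive is a hypothetical example.

```lua
-- Single-line comment
--[[ Multi-line
     comment ]]

local t = {"first", "second"}
print(t[1])          -- arrays start at index 1: prints first
print(t[0])          -- index 0 is just an unused key: prints nil

print(1 ~= 2)        -- "not equal" operator: prints true

x = 10               -- global variable (no "local")
local y = 20         -- local to this chunk

local function scope_demo()
  local x = 1        -- shadows the global x inside this function only
  return x + y       -- y is visible here as an upvalue
end
print(scope_demo(), x)  -- prints 21 and 10

-- Error handling: error() raises an error, pcall() catches it
local function must_be_positive(n)
  if n <= 0 then
    error("expected a positive number, got " .. tostring(n))
  end
  return n
end

local ok, err = pcall(must_be_positive, -3)
print(ok, err)       -- false, then the message (prefixed with position info)
```
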

Tables

In Lua, tables are used both for numerically indexed arrays and hash-indexed maps (and can be a mix of both).

a = {} -- we create a table
a[1] = "one"
a["two"] = 2

-- short syntax
b = {[1] = "one", two = 2}
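
A table can mix both kinds of keys, so standard Lua provides two iterators. ipairs() walks the numerical indexes 1, 2, ... in order and stops at the first nil. pairs() visits every key in an unspecified order. A small sketch in plain Lua:

```lua
local b = {[1] = "one", two = 2, [2] = "deux"}

-- ipairs: numerical indexes 1, 2, ... in order, stops at the first nil
local seq = {}
for i, v in ipairs(b) do
  seq[#seq + 1] = i .. "=" .. tostring(v)
end
print(table.concat(seq, ", "))   -- 1=one, 2=deux

-- pairs: every key (numerical or not), in no guaranteed order
local count = 0
for k, v in pairs(b) do
  print(k, v)
  count = count + 1
end
print(count)                     -- 3
```
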

Some functions rely on numerical indexing and others do not.
For example, the length operator (#) is only well defined for sequences, i.e., tables whose numerical keys run from 1 to n without any gap. If there is a hole (a missing numerical index), # may return the index just before any hole or the highest index. The result depends on the implementation and should not be relied upon.

a = {}
a[1] = "a"
a[2] = "b"
a[3] = "c"
a[4] = "d"
print(#a) -- => 4

b = {}
b[1] = 1
b[3] = 1
print(#b) -- => 1 (and not 2!); with a hole, 3 would also be a valid answer

a[3] = nil
print(#a) -- => 4 with the reference Lua, even though a[3] is now a hole
a[3] = "e"
print(#a) -- => 4

table.insert(a, "f")
print(#a) -- => 5
table.remove(a, 3) -- (shifts the following elements down: no hole, correct size)
print(#a) -- => 4

The Lua functions table.insert() and table.remove() manipulate numerically indexed tables without creating any holes. Using only these functions is a safe way to keep the size of an array correctly reported.

a = {}
-- a table-based FIFO: insert at the end, remove from the front
table.insert(a, 1)
table.insert(a, 2)
table.insert(a, 3)
print(table.remove(a, 1)) -- => 1
print(table.remove(a, 1)) -- => 2
print(table.remove(a, 1)) -- => 3

Two syntaxes exist for accessing a table by a key.

a = {}

a["hello"] = "world"
-- or
a.hello = "world"

Tables can contain any value, including functions.

a = {}

a.hello = function()
	print("world")
end

a.hello() -- => "world"

Tables can also be used for an object-oriented programming style, although Lua is closer to prototype-based programming than to class-based OOP. When a table function is called with ":" instead of ".", the table itself is passed as the first parameter (named self by convention).

For advanced OOP/prototype-style programming and inheritance, one needs to use meta-tables (see the Lua manual).

a = {}

a.c = 1 -- internal counter

a.count = function(self)
	self.c = self.c + 1
	return self.c
end

print(a:count()) -- => 2
print(a:count()) -- => 3
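
Inheritance relies on meta-tables, as noted above. The sketch below uses only standard Lua (setmetatable and the __index field). An object that lacks a key falls back to its prototype, and a "derived" prototype falls back to its parent. The names Counter and StepCounter are purely illustrative.

```lua
-- Base prototype: instances look up missing keys (methods) in Counter
local Counter = {}
Counter.__index = Counter

function Counter.new(start)
  return setmetatable({c = start or 0}, Counter)
end

function Counter:count()
  self.c = self.c + 1
  return self.c
end

function Counter:reset()
  self.c = 0
end

-- Derived prototype: anything it does not define is looked up in Counter
local StepCounter = setmetatable({}, {__index = Counter})
StepCounter.__index = StepCounter

function StepCounter.new(start, step)
  local o = Counter.new(start)
  o.step = step
  return setmetatable(o, StepCounter)  -- switch o's prototype to StepCounter
end

-- Override count(); reset() is inherited from Counter
function StepCounter:count()
  self.c = self.c + self.step
  return self.c
end

local a = Counter.new(1)
local first = a:count()        -- 2
local second = a:count()       -- 3

local s = StepCounter.new(0, 10)
local s1 = s:count()           -- 10
local s2 = s:count()           -- 20
s:reset()                      -- found in Counter through the metatable chain
local after_reset = s:count()  -- 10
print(first, second, s1, s2, after_reset)
```
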

Multiple return values

Functions in Lua can return multiple values. This is used, in particular, for error handling.

function test(a, b)
	return a + b, a * b
end

print(test(2, 3)) -- => 5, 6

-- create a table from the set of returned values
a = {test(2, 3)} -- a = {5, 6}
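
The nil-plus-error-message convention depends on multiple return values: the caller captures both values with a multiple assignment. Here is a minimal sketch in which safe_div is a hypothetical example function and io.open is the standard Lua function, which follows the same convention:

```lua
-- Returns the quotient, or nil and an error message on failure
local function safe_div(a, b)
  if b == 0 then
    return nil, "division by zero"
  end
  return a / b
end

local q, err = safe_div(10, 2)
print(q, err)               -- 5 (5.0 on Lua 5.3+), then nil

q, err = safe_div(1, 0)
if not q then
  print("error: " .. err)   -- error: division by zero
end

-- Standard library functions follow the same convention
local f, open_err = io.open("/nonexistent/dir/file.txt")
print(f, open_err)          -- nil, then a system error message
```
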

Functions as first class objects

Functions are first-class values in Lua. One can program in a functional style, as functions can return other functions and can be passed as parameters just like any other value.

function apply(func, a)
	return func(a)
end

apply(print, "hello world")
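
A function returned by another function keeps access to the local variables of its enclosing function (its upvalues). This combination is a closure. A sketch in plain Lua with a hypothetical make_counter helper:

```lua
-- Returns a new function that increments its own private counter
local function make_counter(step)
  local n = 0              -- upvalue captured by the returned function
  return function()
    n = n + step
    return n
  end
end

local c1 = make_counter(1)
local c10 = make_counter(10)
c1(); c1()
print(c1(), c10())         -- 3 and 10: each closure has its own n

-- Functions as arguments, combined with an anonymous function
local function apply(func, a)
  return func(a)
end
print(apply(function(s) return s:upper() end, "hello"))  -- HELLO
```
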

SPLAY API

This section covers the libraries and language support offered by SPLAY.

Nodes

As a convention, a node is a Lua table with (at least) two keys: "ip" and "port".

node = {ip = "127.0.0.1", port = 20000}

All network-related functions that take an IP and/or port parameter directly accept such a structure (including LuaSocket when used through SPLAY). In most applications, IPs and ports can be completely hidden using this convention.

When using the bootstrap list feature of SPLAY, an application node receives its list of other peers in the form of a list of node structures.

Conventions

In this documentation, a parameter written <IP, port> means that 'IP' and 'port' can be given either as two separate arguments or together as a single node structure. A parameter written <port> can be either a single port number or a node structure.

If a function parameter is followed by a '*', that parameter can be a single element or an array containing several elements.
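
The sketch below shows how a function could honor these two conventions. It accepts either <ip, port> or a node table, and either a single element or an array. The helpers to_ip_port and to_list are hypothetical illustrations of the conventions, not part of the SPLAY API.

```lua
-- Hypothetical helper: accept (ip, port) or a node table {ip = ..., port = ...}
local function to_ip_port(ip, port)
  if type(ip) == "table" then
    return ip.ip, ip.port
  end
  return ip, port
end

-- Hypothetical helper: accept a single element or an array of elements ("*")
local function to_list(e)
  if type(e) == "table" and e[1] ~= nil then
    return e          -- already an array
  end
  return {e}          -- single element (a node table has no [1] key)
end

local node = {ip = "127.0.0.1", port = 20000}
print(to_ip_port(node))                        -- 127.0.0.1  20000
print(to_ip_port("127.0.0.1", 20000))          -- 127.0.0.1  20000
print(#to_list(node), #to_list({node, node}))  -- 1  2
```
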

splay.base

splay.base contains the minimum set of libraries required by all SPLAY applications. Each SPLAY application begins with:

require("splay.base")

Loading splay.base sets the following global variables:

  • misc
  • log
  • socket
  • events

These four libraries can be used directly through these shortcuts (e.g., misc.func() instead of splay.misc.func()). Loading these libraries again has no effect other than creating a new alias.

require("splay.base")

print(misc) -- "pointer" to splay.misc

misc2 = require("splay.misc")

print(misc2) -- still a pointer to splay.misc

-- 3 ways of calling the same function
misc.gen_string(3)
misc2.gen_string(3)
splay.misc.gen_string(3)

-- "misc", "log", "socket" and "events" are just predefined "shortcuts"

splay.events

The event system is the core of the SPLAY runtime environment. It is used by the threading system, network I/O, etc. It basically acts as a global scheduler. Times are expressed in seconds.

A SPLAY application finishes only when there are no more running or runnable threads. All server functions (rpc.server(), net.server(), net.udp_helper(), events.periodic()) are based on a never-ending thread. When using one of them, the application must be terminated explicitly by calling events.exit().

Important note: all the operations of the events.* library listed below, with the exception of events.run(), must be called inside a thread (e.g., inside the code embedded in events.run(), events.thread() or events.periodic()). Calling them outside a thread results in the following Lua error message: "attempt to yield across metamethod/C-call boundary".

run([func*])

Starts the main application loop.

The function passed as argument to run() is automatically embedded in a new thread. By convention, this (anonymous) function is considered the "main" function of the application.

Note that a call to run() is required to activate the triggering of events and the scheduler, which are required for events.wait(), events.thread(), events.periodic() and the code therein.
This function accepts the same parameters as events.thread().

require("splay.base")

-- "main"
events.run(function()
	print("hello world")
end)

exit()

Terminates the application.

The call to events.exit() includes a small wait in order to let pending RPCs terminate before quitting the application (and avoid timeouts at the caller nodes).

thread(func*)

Creates a new thread embedding the function given as a parameter.

Multiple functions given in an array result in one thread per function. Returns a reference to the created thread (or an array of references with multiple functions). If the function to be embedded requires a parameter, it needs to be wrapped in an enclosing anonymous function.

require("splay.base")

function hello()
	print("world")
end

function my_print(s)
	print(s)
end

events.run(function()
	events.thread(hello)
								
	-- anonymous function to embed a function with a parameter
	events.thread(function() my_print("test") end)

	-- array of functions with one function pointer and one anonymous function
	events.thread({hello, function() print("SPLAY") end})
end)

kill(thread_reference*) Kills the thread, or the set of threads, given as a parameter. The parameter can be a Lua table storing references to active threads, which makes it possible to kill several threads with a single call.

periodic(time, func*[, force])

Periodically (every "time" seconds) calls function "func". The parameter can also be an array of functions.

If a single function is given and the 'force' option is not set or is set to false, the next call is only made if the previous one has terminated. If the 'force' option is set, the function is re-run regardless of the calls from previous periods.

This function returns a thread reference to the thread that periodically creates the new threads embedding the periodically executed function(s).

require("splay.base")

function slow(a)
	print("slow", a)
	events.sleep(7)
end

events.run(function()
	local t = events.periodic(1, function() print("hello") end)
	events.periodic(3, function() slow("one") end)
	events.periodic(3, function() slow("two") end, true)
	events.sleep(5)
	events.kill(t)
end)

dead(thread_reference) Returns true if the thread given as parameter is dead.
status(thread_reference) Returns the thread status among: "normal", "running", "suspended", "dead".
sleep(time) Sleeps for "time" seconds.
fire(name[, arg]*) Fires a new event called "name" (any string can be used), with optional additional arguments for the thread awakened by the event. Returns true if the event was fired successfully, or the two values false and an error message otherwise.

wait(name[, timeout])

Waits for an event named "name", with an optional timeout.

  • If no timeout value is defined:
    Returns optional arguments passed by fire (empty return by default)
  • If a timeout value is defined:
    • If the wait has not timed out:
      Returns true and optional arguments passed by fire.
    • If wait has timed out:
      Returns false and a "timeout" error message.

require("splay.base")

events.run(function()

	-- WITHOUT TIMEOUT PARAMETER

	-- 1 fire() without parameters
	
	events.thread(function() print(events.wait("ev")) end)
	-- will print an empty line (no values returned)
	print(events.fire("ev")) -- return: true
	events.sleep(1)

	-- 2 fire() with one parameter
	
	events.thread(function() print(events.wait("ev")) end)
	-- will print: "hello"
	print(events.fire("ev", "hello")) -- return: true
	events.sleep(1)

	-- 3 Two fires:
	-- The first with multiple parameters.
	-- The second one has no effect, because the slot ("ev") is still used by
	-- the first event (wait() has not consumed it yet)

	events.thread(function() print(events.wait("ev")) end)
	-- will print: "hello, world"
	print(events.fire("ev", "hello", "world")) -- return: true
	print(events.fire("ev", "no effect")) -- return: false
	events.sleep(1)

	-- WITH TIMEOUT PARAMETER
	
	-- 1 fire() without parameters (not timed out)
	
	events.thread(function() print(events.wait("ev", 2)) end)
	-- will print: "true"
	events.fire("ev")
	events.sleep(1)

	-- 1.1 Without parameters (timed out)
	
	events.thread(function() print(events.wait("ev", 0.01)) end)
	-- will print: "false, timeout"
	events.sleep(0.1)
	events.fire("ev")
	events.sleep(1)

	-- 2 fire() with multiple parameters (not timed out)
	
	events.thread(function() print(events.wait("ev", 2)) end)
	-- will print: "true, hello, world"
	events.fire("ev", "hello", "world")
	events.sleep(1)

end)

yield()

Thread support in Lua is non-preemptive. It relies instead on a lightweight cooperative approach based on Lua's coroutines. This simplifies the design and implementation of many algorithms. However, long computations without I/O may consume all CPU resources without ever being descheduled in favor of other threads.

Calls to the network I/O libraries and to synchronization operations automatically give the scheduler an opportunity to switch to another thread. If a function has to perform a long computation without I/O, it must periodically give the scheduler the opportunity to select another runnable thread for execution. This is done by explicitly calling events.yield().

Always returns true.

The following operations contain an implicit yield:

  • events.sleep()
  • events.wait()
  • Network operations (RPCs, ...)
  • Waiting to acquire a lock.

require("splay.base")

-- dangerous thread: infinite loop, never yields,
-- no IO operations inside the loop:
-- the application will stall
function hello_bad()
	local d = c
	c = c + 1
	while true do
		print("world", d)
	end
end

c = 1
function hello_good()
	local d = c
	c = c + 1
	while events.yield() do
		print("world", d)
	end
end

events.run({hello_good, hello_good, hello_good})
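
The cooperative model can be reproduced in plain Lua with coroutines. The sketch below is a toy round-robin scheduler, not SPLAY's events implementation, and spawn, run and worker are hypothetical names. Each "thread" is a coroutine that must call coroutine.yield() to let the others run, much as events.yield() does in SPLAY. A thread that never yields, like hello_bad, would keep run() stuck inside coroutine.resume() forever.

```lua
-- Toy round-robin scheduler over coroutines (illustration only)
local ready = {}   -- queue of runnable coroutines
local trace = {}   -- records the execution order for inspection

local function spawn(f)
  table.insert(ready, coroutine.create(f))
end

local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok, err = coroutine.resume(co)
    if not ok then error(err) end
    if coroutine.status(co) ~= "dead" then
      table.insert(ready, co)   -- it yielded: back to the end of the queue
    end
  end
end

local function worker(name, steps)
  return function()
    for i = 1, steps do
      table.insert(trace, name .. i)
      coroutine.yield()         -- give the other threads a chance to run
    end
  end
end

spawn(worker("A", 3))
spawn(worker("B", 2))
run()
print(table.concat(trace, " "))  -- A1 B1 A2 B2 A3
```
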

lock([secure])

Returns a new mutual exclusion lock object. The lock is acquired and released using the :lock() and :unlock() methods, respectively.

The :lock() call is blocking and implicitly yields.

The secure parameter determines the behavior of the lock in case of a failure (see below).

require("splay.base")

l = events.lock()

function test()
	l:lock()
	print("in")
	events.sleep(2)
	l:unlock()
	print("out")
end

events.run({test, test, test})

semaphore(size[, secure])

Similar to a lock, but up to "size" threads can be inside the enclosed region at the same time.

The secure parameter determines the behavior of the semaphore in case of a failure (see below).

require("splay.base")

l = events.semaphore(2)

function test()
	l:lock()
	print("in")
	events.sleep(2)
	l:unlock()
	print("out")
end

events.run({test, test, test, test, test})
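
The toy sketch below implements a counting semaphore on top of plain coroutines to make the semantics concrete. It is not SPLAY's implementation: the scheduler, sem_new, acquire and release are hypothetical, and events.sleep() is simulated by yielding a few times. Because threads only switch at yields, there is no race between testing and decrementing the counter. With a size of 2, at most two workers are ever inside the region.

```lua
-- Minimal round-robin scheduler (illustration only)
local ready = {}
local function spawn(f) table.insert(ready, coroutine.create(f)) end
local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    assert(coroutine.resume(co))        -- re-raise any error from the thread
    if coroutine.status(co) ~= "dead" then table.insert(ready, co) end
  end
end

-- Counting semaphore: acquire() yields until a permit is free
local function sem_new(size)
  local sem = {free = size}
  function sem:acquire()
    while self.free == 0 do coroutine.yield() end
    self.free = self.free - 1
  end
  function sem:release()
    self.free = self.free + 1
  end
  return sem
end

local sem = sem_new(2)
local inside, max_inside = 0, 0

local function test()
  sem:acquire()
  inside = inside + 1
  max_inside = math.max(max_inside, inside)
  for _ = 1, 3 do coroutine.yield() end  -- simulated events.sleep()
  inside = inside - 1
  sem:release()
end

for _ = 1, 5 do spawn(test) end
run()
print(max_inside)  -- 2
```
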

synchronize(func[, timeout]) Synchronizes access to the function "func". A lock is acquired before the function is called, ensuring that only one thread can be inside that function at any given time (provided all calls to that function go through synchronize()). Returns the function's return value.
stats() Returns a string containing some statistics about the scheduler activity.

Secure locks

lock() and semaphore() can take an additional 'secure' parameter. Its default value is 1 (secure): if a thread holding a lock dies due to an error, the lock(s) it holds are automatically released.

A value of false for the secure parameter gives a classic lock that is never automatically released.

A value of 2 means that if a thread holding a lock dies, with or without an error, the lock(s) it holds are automatically released.

Note: a thread dying on an error is certainly not acceptable behavior. It means that error checking or protection is missing when calling some functions. Secure locks avoid some deadlocks and often facilitate debugging, but they only help highlight problems, not solve them.

splay.rpc, splay.urpc and splay.rpcq

RPC (Remote Procedure Call) makes it easy to call a function on a remote host and get its result locally. RPCs are the mechanism of choice for communication in SPLAY applications, as they lead to clean, concise and readable code.

SPLAY provides three types of RPC: using TCP with connections on demand (splay.rpc), using UDP (splay.urpc) and using TCP with persistent connections (splay.rpcq).

TCP RPC (splay.rpc) characteristics:

  • Functions can accept arguments of unlimited size and can receive replies of unlimited size.
  • Establishing a connection is slower than UDP.
  • Each ongoing RPC uses one socket, which is closed immediately when the RPC returns.

UDP RPC (splay.urpc) characteristics:

  • Parameters and returned values must be smaller than 8kB.
  • No connection establishment => faster than TCP-based RPC.
  • Timeout and retry (configurable) in case of packet loss.
  • Only 1 socket used for both receiving and sending.

TCP RPC queue (splay.rpcq) characteristics (BETA):

  • Functions can accept arguments of unlimited size and can receive replies of unlimited size.
  • Establishing a connection is slower than UDP.
  • Keeps connections open as long as possible. If no more ports are available or if limits are reached, the least-recently used sockets are closed first (LRU).
  • Queues messages and sends them using a window, to reduce delays.
  • Reconnects whenever a connection is lost if there are still messages in the queue for that connection.
  • Uses more sockets than splay.rpc, but is recommended when the same nodes communicate often.

The API is the same for all types of RPC:

require("splay.base")
rpc = require("splay.rpc")
urpc = require("splay.urpc")
rpcq = require("splay.rpcq")

me = {ip = "127.0.0.1", port = 45678}
-- another port for rpcq
me2 = {ip = "127.0.0.1", port = 45679}

rpc.server(me)
urpc.server(me)
rpcq.server(me2)

function call_me(a)
	return "You called me with "..a
end

events.run(function()
		print(rpc.call(me, {"call_me", "rpc"}))
		print(urpc.call(me, {"call_me", "urpc"}))
		print(rpcq.call(me2, {"call_me", "rpcq"}))
		events.exit() -- rpc.server() is still running...
end)

RPCs can be used not only to call remote functions, but also to read the remote value of a variable, of a field in a table, etc. RPCs cannot, however, be used to retrieve a remote function value (function pointer).

require("splay.base")
rpc = require("splay.rpc")

me = {ip = "127.0.0.1", port = 45678}

var = 5
var2 = {1, 2}
var3 = {
	key = "value",
	func = function(a)
		return "func_call "..tostring(a)
	end
}

events.run(function()
	rpc.server(me)

	-- VARIABLES

	print(rpc.call(me, "no_var")) -- return nil
	print(rpc.call(me, "var"))
	print(rpc.call(me, "var2"))
	print(rpc.call(me, "var2[2]"))
	print(rpc.call(me, "var3['key']"))
	print(rpc.call(me, "var3.key"))

	-- FUNCTION CALLS

	-- call without parameters
	print(rpc.call(me, {"var3.func"}))
	print(rpc.call(me, "var3.func")) -- shortcut if no parameters

	-- call with parameter "hello"
	print(rpc.call(me, {"var3.func", "hello"}))
	events.exit() -- rpc.server() is still running...
end)

If an RPC calls a nonexistent function or reads a nonexistent variable, the system considers that the variable exists and is equal to nil. Calling a nonexistent function with parameters results in the following error message: "calling a variable with parameters".

rpc.server() runs a never-ending thread. Hence, the termination of the main function (in the events.run() call) will not properly terminate the program, as one thread remains active. It is necessary to call events.exit() to kill the RPC server before leaving the main function.

server(<port> [, max])

Runs a new RPC server on port "port". For TCP only, "max" limits the number of sockets open at the same time (default: limited by system resources, or by resource usage enforcement if set up by the administrator).

Returns true, or nil and an error message if there is a problem (e.g., the port is already bound).

stop_server(<port>) Stops the server listening on the specified port.
acall(<ip, port>, func_a, timeout)

acall is the standard RPC call. It returns either the pair true, return values or, in case of failure, the pair false, error message. This makes it possible to distinguish a successful call from a failed one.

"func_a" is an array that contains: (1) the name of the function to be called or variable to read (first parameter); (2) parameters for the function being called (subsequent parameters). Calling a function without parameters, or retrieving a variable, is possible by using its name directly and not in an array.

If the RPC is successful, it returns: true, numerical array with function return values

If the RPC has failed, it returns: false, error message

require("splay.base")
rpc = require("splay.rpc")

me = {ip = "127.0.0.1", port = 45678}

function concat(a, b)
	return a..b
end

events.run(function()
	rpc.server(me)
	local ok, r = rpc.acall(me, {"concat", 'one', 'two'})
	if ok then
		print("result", r[1])
	else
		print("failed", r) -- r contains the error message
	end
	events.exit() -- rpc.server() is still running...
end)

call(<ip, port>, func_a, timeout) Same as acall() but directly returns the function return values (similar to calling a local function). With call(), it may be difficult to distinguish a successful reply from an error, so this call should be used only in non-faulty environments.
ecall(<ip, port>, func_a, timeout) Same as acall() but in the case of a network error, throws an exception (error()) that should be caught with pcall() (if the exception is not caught, the thread dies).

ping(<ip, port>[, timeout])

RPC level ping and RTT measurements.

Checks the availability of a host and measures the round-trip time (RTT) between two hosts.

Returns RTT delay if the ping succeeds, nil and an error message otherwise.

proxy(<ip, port>) Creates a new RPC proxy for the host, so that remote calls can be written even more naturally (node:func(...)). Raises an error on failure (as ecall() does).

require("splay.base")
rpc = require("splay.rpc")

me = {ip = "127.0.0.1", port = 45678}

function concat(a, b)
	return a..b
end

events.run(function()
	rpc.server(me)
	local node = rpc.proxy(me)
	local ok, err = pcall(function()
		local r = node:concat("one", "two")
		print("result", r)
	end)
	if not ok then print("failed", err) end
	events.exit() -- rpc.server() is still running...
end)

mode [variable] Informational variable that contains 'rpc', 'urpc' or 'rpcq', depending on the type of RPC used.
settings = { max = nil (all), default_timeout = 60 (urpc = 40) }

max: sets the maximum number of simultaneous outgoing RPCs (default nil = unlimited).

default_timeout: sets the maximum total time of an RPC (60s for rpc and rpcq, 40s for urpc).

RPCs are the base mechanism for exchanging information between peers. RPC calls generally have a timeout, but the called function can itself make another RPC call, and so on recursively. DHT routing protocols are an example: they locate a particular key in a data structure using multiple hops between nodes.

A good practice is for a function called by RPC to reply as soon as possible. It should not wait for a potentially long chain of RPCs, which can set off chain reactions when a timeout occurs along the chain. Generally, this means creating a thread that continues the processing after the call has returned. If the caller needs to block on the initial RPC call, this can be implemented with events (events.wait() and events.fire()) and an RPC-based notification from the callee.

-- practical example of callback usage
require("splay.base")
rpc = require("splay.rpc")

me = {ip = "127.0.0.1", port = 30000}

function callback(node, count)
	print("result after "..count.." nodes")
	events.fire("node", node)
end

function recursive_find(m)
	-- Creating a new thread, the function will return immediately
	events.thread(function()
		if math.random(10) == 1 then
			-- I am the destination
			rpc.call(m.origin, {'callback', me, m.count})
		else
			m.count = m.count + 1
			local next_node = me -- no budget for more nodes
			rpc.call(next_node, {'recursive_find', m})
		end
	end)
end

function find()
	local m = {origin = me, count = 0}
	rpc.call(me, {'recursive_find', m})
	return events.wait("node")
end

events.run(function()
	rpc.server(me)
	local dest = find()
	print(dest.ip, dest.port)
	events.exit()
end)

splay.net

Collection of methods to ease the usage of TCP and UDP.

As for the RPC servers, when net.server() or net.udp_helper() are running, the program will not finish even if the "main" function (events.run()) finishes: the server thread is still running and waiting for incoming connections. Calling events.exit() will terminate the program.

client(<ip, port>, handlers) Counterpart of server() for the connecting side: connects to a peer and runs the given handlers (same callback system as server()) on the new socket.
server(<port>, handlers[, max])

Starts a new TCP server listening on port "port". Every time a new client connects, the handler function is called with the socket as parameter. The server allows at most "max" connections (sockets) at the same time (default = unlimited).

-- SYNCHRONOUS PROTOCOL
-- a functional multi-client echo server
-- test it with "telnet localhost 20000"

require("splay.base")
net = require("splay.net")

function echo_server(s)
	local r = s:receive("*l")
	while r do
		-- receive("*l") strips the end of line, so add it back
		s:send(r.."\n")
		r = s:receive("*l")
	end
end

net.server(20000, echo_server)

events.run()

When the handler returns, some cleanup is done (such as closing the socket and decrementing the number of current connections). Therefore, the handler must not return before it is done with the socket. This becomes an issue with asynchronous protocols, where two threads (send and receive) use the socket: the handler has to wait until they are finished.

-- ASYNCHRONOUS PROTOCOL
-- test it with "telnet localhost 20000"

require("splay.base")
net = require("splay.net")

function send(s)
	while events.sleep(1) do
		if not s:send("hello\n") then break end
	end
end

function receive(s)
	s:receive("*l")
	-- will make send() fail
	s:close()
	-- wake up hello_server() so that it can return
	events.fire("finish_"..tostring(s))
end

function hello_server(s)
	events.thread(function() send(s) end)
	events.thread(function() receive(s) end)
	events.wait("finish_"..tostring(s))
	print("finished")
end

events.run(function()
	net.server(20000, hello_server)
end)

Callback system

Four callback functions can be defined:

  • receive
  • send
  • initialize
  • finalize

All of them are optional. Every callback is called with the corresponding socket as its first argument, and true for its second argument if the current node is the initiator of the connection.

When the socket is initially connected, initialize() is called. If it does not return false, the send() and receive() threads are launched. If one of them finishes, the other one is killed immediately. At the end, just before the socket is closed, finalize() is called.

The callback table can be indexed by position (1 = receive, 2 = send, 3 = initialize, 4 = finalize) or with hash names: {initialize = init, send = send, receive = receive, finalize = finalize}

-- ASYNCHRONOUS PROTOCOL
-- test it with "telnet localhost 20000"

-- Using new callback system

require("splay.base")
net = require("splay.net")

function send(s)
	while events.sleep(1) do
		if not s:send("hello\n") then break end
	end
end

function receive(s)
	s:receive("*l")
	-- if receive() ends, send() will be killed
end

events.run(function()
	net.server(20000, {receive, send})
end)

-- ASYNCHRONOUS PROTOCOL

-- A more complete example with all callbacks
-- and a client

require("splay.base")
net = require("splay.net")

function send(s)
  while events.sleep(1) do
    if not s:send("hello\n") then break end
  end
end

function receive(s)
  s:receive("*l")
end

function init(s, connect)
  -- if this function returns false,
  -- the connection will be closed immediately
  local ip, port = s:getpeername()
  if connect then
    print("connection to: "..ip..":"..port)
  else
    print("connection from: "..ip..":"..port)
  end
end

function final(s)
  local ip, port = s:getpeername()
  print("closing: "..ip..":"..port)
end

function c_send(s)
  events.sleep(10)
  s:send("bye\n")
end

function c_receive(s)
  while true do
    local m = s:receive("*l")
    if not m then break end
    print(m)
  end
end

events.run(function()
  net.server(20000, {receive, send, init, final})
  net.client({ip = "localhost", port = 20000},
    {send = c_send, receive = c_receive,
    finalize = final, initialize = init})
end)

stop_server(<port>) Stops the TCP server listening on port "port".

udp_helper(<port>, func)

Returns a new "UDP object" and starts a server listening on port "port" and calling the function "func" when we receive an incoming UDP packet.

If the handler value is nil, for each packet, a new event is fired with the name "udp:<port>".

The returned object exposes the server's UDP socket (u.s in the example below), which can be reused, e.g., for sending packets.

require("splay.base")
net = require("splay.net")

function print_server(data, ip, port)
  print(">>>", data, ip, port)
end
u = net.udp_helper(20000, print_server)

events.run(function()
  u.s:sendto("hello", "127.0.0.1", 20000)
  u.s:sendto("world", "127.0.0.1", 20000)
  events.exit() -- net.udp_helper() is still running...
end)

splay.log (and splay.out)

SPLAY's log system allows logging messages with different levels of severity and through different outputs. splay.out contains I/O calls for some common outputs, such as disk and network.

Log levels:

  • (1) debug
  • (2) notice
  • (3) warning
  • (4) error
  • (5) print

global_level (variable) Sets the log level of the default log instance 'log'.
new([level [, prefix]]) Creates a new log object that logs with level 'level' and prepends 'prefix' to its output. Default level: 3 (warning).
log_object.level [variable] From 1 (debug) to 5 (print), sets the minimum log level.
log:debug(msg)
log:notice(msg)
log:warn(msg)
log:error(msg)
log:print(msg)

require("splay.base")

events.run(function()

	-- standard log
	log:debug("debug") -- will not be displayed
	log:print("print")
	log.global_level = 1
	log:debug("debug again")

	-- log object
	local l_o = log.new(2, "[my_log]") -- default level set to 'notice'
	l_o:debug("debug") -- will not be displayed
	l_o.level = 1
	l_o:debug("debug again")
end)
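
The level mechanism amounts to a threshold test. The plain-Lua sketch below reproduces it under the level numbering listed above (1 = debug ... 5 = print). new_log and the lines field are hypothetical, and SPLAY's log objects additionally route output through their write/out functions, as the next example shows.

```lua
-- Hypothetical sketch of level filtering (not SPLAY's log implementation)
local LEVELS = {debug = 1, notice = 2, warn = 3, error = 4, print = 5}

local function new_log(level, prefix)
  local l = {level = level or 3, prefix = prefix, lines = {}}
  for name, value in pairs(LEVELS) do
    l[name] = function(self, msg)
      -- Only messages at or above the object's level are emitted
      if value >= self.level then
        local line = (self.prefix and (self.prefix .. " ") or "") .. msg
        table.insert(self.lines, line)  -- kept for inspection
        print(line)
      end
    end
  end
  return l
end

local l_o = new_log(2, "[my_log]")
l_o:debug("debug")        -- filtered out: 1 < 2
l_o:notice("notice")      -- [my_log] notice
l_o.level = 1
l_o:debug("debug again")  -- [my_log] debug again
```
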

out.print() Returns a function that takes a parameter and prints it to the standard output (flushing the output automatically). By default, this function is used by the log system.
out.file(file_name) Returns a function that takes a parameter and appends its value to the file file_name.
out.network(ip, port) Returns a function that takes a parameter and sends it through a TCP socket connected to ip and port.
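
Each out.* function is a factory: it returns a function that is then called with the text to emit. The standalone sketch below applies the same idea to a file output, using only the standard io library. make_file_out is a hypothetical name, and appending a newline after each message is an assumption: SPLAY's own out.file() may differ, e.g., in buffering or line endings.

```lua
-- Returns a function that appends each message, plus a newline, to file_name
local function make_file_out(file_name)
  return function(msg)
    local f = assert(io.open(file_name, "a"))
    f:write(tostring(msg), "\n")
    f:close()
  end
end

local path = os.tmpname()          -- temporary file for the demonstration
local out_fn = make_file_out(path)
out_fn("first line")
out_fn("second line")

local f = assert(io.open(path, "r"))
local content = f:read("*a")
f:close()
os.remove(path)
print(content)
```
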

The SPLAY log system has been designed to allow a complete and easy customization. Every function can be overridden to implement a custom behavior. The example below shows how to redirect logs.

require("splay.base")
out = require("splay.out")

events.run(function()

	l_o = log.new(2, "[mylog]")

	l_o:info("test") -- => [mylog] test

	-- we customize write to append current thread name and time
	-- the 3 dots (...) are an Lua construct for variable arguments
	l_o.write = function(level, ...)
		-- Still call the global_write to add the level 'labels'
		return log.global_write(
				level, coroutine.running(), string.format("%.4f", misc.time()), ...)
	end

	l_o:info("test") -- => [mylog] N: thread: 0x8b784d8  1233181778.3707  test

	-- we override the out function to log to a file
	l_o.out = out.file("my_log.txt")
	l_o:info("test")

	-- launch: nc -l -p 20000
	-- we override the out function to do network logging
	l_o.out = out.network("localhost", 20000)
	l_o:info("test")
end)

Most SPLAY libraries contain an integrated log object (with default level set to warning), called "l_o". It can be customized to change its default parameters (level, outputs, ...) or completely replaced by another method.

require("splay.base")
rpc = require("splay.rpc")

rpc.l_o.level = 2

splay.misc

A container for various useful functions.

dup(e) Recursively duplicates an element (generally a table).

a = {s = "hello"}
b = a
b.s = "world"
print(a.s) -- => "world"

a = {s = "hello"}
b = misc.dup(a)
b.s = "world"
print(a.s) -- => "hello"
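
A recursive duplication of this kind can be sketched in plain Lua. deep_copy is a hypothetical stand-in, not misc.dup() itself. It keeps a table of already-copied tables so that shared sub-tables stay shared and cycles do not recurse forever, and it does not copy metatables. The real misc.dup() may behave differently on these points.

```lua
-- Recursively copies tables; 'seen' maps each original table to its copy
local function deep_copy(e, seen)
  if type(e) ~= "table" then return e end
  seen = seen or {}
  if seen[e] then return seen[e] end
  local copy = {}
  seen[e] = copy
  for k, v in pairs(e) do
    copy[deep_copy(k, seen)] = deep_copy(v, seen)
  end
  return copy
end

local a = {s = "hello", inner = {1, 2}}
a.self = a                      -- a cycle
local b = deep_copy(a)
b.s = "world"
b.inner[1] = 100
print(a.s, a.inner[1])          -- hello  1
print(b.self == b)              -- true: the cycle points to the copy
```
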

equal(o1, o2) Recursively compares two "objects".
split(s, sep) Splits a string 's' based on the separator 'sep'. Returns a table.
size(t) Returns the size of a table, counting all elements (see the isize() example below).
isize(t) Returns the size of a table, considering only the highest numerical index.

a = {"a", "b", "c"}

print(#a) -- => 3
-- a = {[1] = "a", [2] = "b", [3] = "c"}

a[2] = nil

print(#a) -- => 3 or 1 (not well defined)
-- a = {[1] = "a", [3] = "c"}
-- # is only reliable for sequences without holes (a nil is a hole)

print(misc.isize(a)) -- => 3
-- because isize() returns the highest numerical index

print(misc.size(a)) -- => 2
-- size() counts all elements (nil values are not elements)

a["key"] = "val"
a["key2"] = "val2"
-- a = {[1] = "a", [3] = "c", key = "val", key2 = "val2"}

print(#a) -- => 3 or 1 (not well defined)
print(misc.isize(a)) -- => 3
print(misc.size(a)) -- => 4
random_pick(t[, n]) Picks 'n' random elements from table 't'. If 'n' is not given, picks a single element and returns it. If 'n' is given, returns an array of 'n' picked elements without duplicates.
shuffle(a) Returns a new array containing the same elements as "a" but randomly shuffled.
time() Returns the UNIX time (millisecond precision).
between_c(i, a, b) Circular between (clockwise). Returns true if 'i' is between 'a' and 'b' on a circular ring. Useful for DHT overlays.
gen_string(mult[, string]) Generates a string made of 'string' concatenated 'mult' times. If no 'string' parameter is given, generates a string of length 'mult' containing only the 'a' character.
merge(t1, t2[, t3, .., tn]) Tries to merge n tables (2 by 2, from the left). If both tables are arrays, merge concatenates the elements of the first array, then those of the second. If one or more tables are mixed or hash maps, merge does a key-based merge and gives priority to the first one in case of collisions. Returns a new table.
hash_ascii_to_byte(s) Transforms a hexadecimal ASCII string into bytes (a string half as long).
assert_object(object) Returns a new object that calls assert() on all function calls on the wrapped object.
assert_function(func) Returns a new function that calls assert() before calling "func".
convert_base(input, b1, b2) Converts "input" (string or number) from base "b1" (default 10) to base "b2" (default 16). Returns a string. Accepted bases: 2 to 26.
table_keyset(t) Returns the key set from a table (all keys that map to a non-nil value).
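To make the difference between #, size() and isize() concrete without the SPLAY libraries, here is a plain Lua sketch with two hypothetical helpers, table_size() and table_isize(). They are written from the descriptions above, not from the misc source code.

```lua
-- Hypothetical helpers mirroring the documented behavior of misc.size() and misc.isize()
local function table_size(t)
	local n = 0
	for _ in pairs(t) do n = n + 1 end -- pairs() never visits nil values
	return n
end

local function table_isize(t)
	local max = 0
	for k in pairs(t) do
		if type(k) == "number" and k > max then max = k end
	end
	return max
end

local a = {"a", "b", "c"}
a[2] = nil
a.key, a.key2 = "val", "val2"
print(table_size(a), table_isize(a)) -- 4   3
```

Unlike #, both helpers give a well-defined answer when the array part has holes, at the cost of a full traversal of the table.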

splay.llenc

Block encoding for network transfers.

wrap(socket) After wrapping, the socket can send and receive full blocks of any kind of data.
socket:send(block) Sends a block (or a numerical array of blocks, in which case one send is done per block).
socket:receive(max_length) Receives a block (or nil on failure); fails with an "error" if the block is longer than max_length.
socket:receive_array(number, max_length) Same as receive() but waits for "number" blocks and returns a table containing them.
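Block encoding is needed because a TCP stream carries bytes, not messages: the receiver must be told where each block ends. The sketch below illustrates the idea with an assumed length-prefix format (decimal length, newline, payload) and two hypothetical functions, encode_block() and decode_blocks(); it is not necessarily the wire format used by llenc.

```lua
-- Assumed framing (hypothetical, not necessarily llenc's format): "<length>\n<payload>"
local function encode_block(block)
	return #block .. "\n" .. block
end

-- Extracts every complete block from a byte buffer.
-- Returns the blocks and the unconsumed tail, or nil and an error message.
local function decode_blocks(buffer, max_length)
	local blocks, pos = {}, 1
	while true do
		local nl = buffer:find("\n", pos, true)
		if not nl then break end -- length header not complete yet
		local len = tonumber(buffer:sub(pos, nl - 1))
		if not len then return nil, "bad header" end
		if max_length and len > max_length then return nil, "block too long" end
		if #buffer < nl + len then break end -- payload not complete yet
		blocks[#blocks + 1] = buffer:sub(nl + 1, nl + len)
		pos = nl + len + 1
	end
	return blocks, buffer:sub(pos)
end

local stream = encode_block("hello") .. encode_block("") .. encode_block("a\nb")
-- simulate a partially received fourth block at the end of the buffer
local blocks, rest = decode_blocks(stream .. "12\npart", 100)
```

Because the payload length is read first, a newline inside a block (as in "a\nb") is never mistaken for a header, and the incomplete tail is kept for the next read.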

splay.json

Data encoding for network transfers.

wrap(socket) After wrapping, the socket can send and receive any kind of Lua data (that can be serialized).
socket:send(data) Sends a Lua data structure.
socket:receive() Receives a Lua data structure or nil if there was an error.

splay.benc

BitTorrent encoding (bencoding) library, extended to encode all Lua data while remaining compatible with other BitTorrent clients and swarms. This encoding is very fast and is used by default by RPCs.

wrap(socket) After wrapping, the socket can send and receive any kind of Lua data (that can be serialized). This wrapper also uses an llenc wrapper.
socket:send(block) Sends a Lua data structure.
socket:receive() Receives a Lua data structure or nil if there is an error.
encode(data) Encodes data with bencoding.
decode(data) Decodes bencoded data.
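For reference, standard bencoding (as specified for BitTorrent) encodes strings as "<length>:<bytes>", integers as "i<n>e", lists as "l...e" and dictionaries as "d...e" with sorted string keys. The sketch below is a minimal encoder for these four types only. It does not implement SPLAY's extensions for other Lua data, and encoding an empty table as a list is an arbitrary choice.

```lua
-- Minimal encoder for standard bencoding (strings, integers, lists, dictionaries).
-- It does NOT reproduce the SPLAY extensions for other Lua data (booleans, floats, ...).
local function bencode(v)
	local t = type(v)
	if t == "string" then
		return #v .. ":" .. v
	elseif t == "number" then
		assert(v == math.floor(v), "bencoding only supports integers")
		return string.format("i%de", v)
	elseif t == "table" then
		-- Treat the table as a list if its keys are exactly 1..#v
		local count = 0
		for _ in pairs(v) do count = count + 1 end
		local parts = {}
		if count == #v then
			for i = 1, #v do parts[i] = bencode(v[i]) end
			return "l" .. table.concat(parts) .. "e"
		end
		-- Dictionary: keys must be strings, emitted in sorted order
		local keys = {}
		for k in pairs(v) do
			assert(type(k) == "string", "dictionary keys must be strings")
			keys[#keys + 1] = k
		end
		table.sort(keys)
		for _, k in ipairs(keys) do
			parts[#parts + 1] = bencode(k) .. bencode(v[k])
		end
		return "d" .. table.concat(parts) .. "e"
	end
	error("cannot bencode a " .. t)
end
```

Sorting the keys makes the output deterministic, so two peers encoding the same dictionary produce identical bytes.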

splay.bits

Represents bits as a high-level structure (a table), with conversions to and from binary strings. The table contains true and false to represent 1 and 0.

init(bits, size) Initializes a bit table of size "size". All values are false (0).
ascii_to_bits(s[, max_length]) Converts a binary string (8bits/char) to a bit table.
bits_to_ascii(bits) Returns a binary string (8bits/char) from a bit table.
show_bits(bits) Returns a human-readable string representation of the bit table (or of a binary string): "00110000 11001111...".
size(bits) Returns the size of the bit table (or a binary string).
count(bits) Returns the number of '1's in the bit table (or a binary string).
is_set(bits, bit) Returns whether bit number "bit" is set. The first parameter can be a bit table or a binary string.
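The conversion between a binary string and a bit table can be sketched in plain Lua as follows. The helpers are hypothetical re-implementations based on the descriptions above; in particular, the most-significant-bit-first order is an assumption (it matches reading "00110000" as the character "0", 0x30).

```lua
-- Hypothetical helpers following the description of splay.bits (MSB-first order assumed)
local function ascii_to_bits(s)
	local bits = {}
	for i = 1, #s do
		local byte = s:byte(i)
		for b = 7, 0, -1 do
			-- test bit b of the byte with plain arithmetic (works on Lua 5.1+)
			bits[#bits + 1] = math.floor(byte / 2 ^ b) % 2 == 1
		end
	end
	return bits
end

local function show_bits(bits)
	local out = {}
	for i, bit in ipairs(bits) do -- ipairs() stops at nil, not at false
		out[#out + 1] = bit and "1" or "0"
		if i % 8 == 0 and i < #bits then out[#out + 1] = " " end
	end
	return table.concat(out)
end

local function count(bits)
	local n = 0
	for _, bit in ipairs(bits) do
		if bit then n = n + 1 end
	end
	return n
end

local bits = ascii_to_bits("0O") -- "0" is 0x30, "O" is 0x4F
print(show_bits(bits), count(bits)) -- 00110000 01001111   7
```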

splay.utils

Collection of functions for local testing, when the application runs outside the SPLAY environment and thus does not have access to runtime support such as the list of alive nodes.

generate_job(position, [nb_nodes, [first_port, [list_size, [random]]]])

Generates a fake job variable for local testing.

require("splay.base")

if not job then
	-- can NOT be required in SPLAY deployments !
	local utils = require("splay.utils")
	if #arg < 2 then
		print("lua "..arg[0].." my_position nb_nodes")
		os.exit()
	else
		local pos, total = tonumber(arg[1]), tonumber(arg[2])
		job = utils.generate_job(pos, total, 20001)
	end
end

-- now you can use "job" as in real deployment

Several local instances of the application can then be launched with a script such as:

#!/bin/bash

max=50

# stop the instances of a previous run
killall lua
sleep 1

for (( n=1; n<=max; n++ )); do
	rm -f $n.log
	lua your_app.lua $n $max > $n.log 2>&1 &
done

splay.restricted_sockets

When a SPLAY application is deployed, it is sandboxed. restricted_sockets is the network sandbox: while keeping LuaSocket compatibility, it provides additional functions.

These additional functions are only available when sockets are restricted (when running as part of a SPLAY deployment, instantiated by a SPLAY daemon). They can be used to monitor the actual network usage of the protocols, or to adapt to the live behavior of the application.

socket.infos() Returns a human-readable string describing the state of the limits and the current usage. To get numerical values, use limits() and stats().
socket.limits() Returns max_send, max_receive, max_sockets, start_port, end_port, local_ip
socket.stats() Returns total_sent, total_received, total_tcp_sockets, total_udp_sockets
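limits() and stats() return several values, so they are captured with a multiple assignment. Since these functions only exist under restricted sockets, the sketch below also checks for their presence before calling them. limits_report() is a hypothetical helper, and the units of the values are not specified here.

```lua
-- Sketch: summarize the limits when running under restricted sockets.
-- "sock" is the socket module; outside a deployment limits() does not exist.
local function limits_report(sock)
	if not (sock and sock.limits) then
		return nil, "not running with restricted sockets"
	end
	local max_send, max_receive, max_sockets, start_port, end_port, local_ip = sock.limits()
	return string.format("ip %s, ports %s-%s, max sockets %s, max send %s, max receive %s",
		tostring(local_ip), tostring(start_port), tostring(end_port),
		tostring(max_sockets), tostring(max_send), tostring(max_receive))
end

-- In a SPLAY job this could be called as: print(limits_report(socket))
```

Returning nil plus a message when the function is missing follows the usual Lua error convention, so the same code runs unchanged in a local test.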

splay.restricted_io

When a SPLAY application is deployed on a testbed where SPLAY daemons have been set up, it is sandboxed. This section describes how sandboxing is done for the Lua IO library, particularly for file system accesses. There is no sandboxing when running an application locally.

Similarly to restricted_sockets, which wraps LuaSocket, restricted_io wraps the original IO functions behind a security layer that applies additional restrictions, as set up by the administrator or by the user when deploying.

First, the library needs to be initialized via the init() function. This function sets the required restrictions and, in particular, the path of the folder that will be the root of the virtual restricted file system.

When a file is opened through RIO (Restricted IO), its path is mapped to a flat file using an MD5 hash: all virtual files appear in the same directory, under different hash names. If set up by the administrator, additional restrictions apply, such as:

  • number of file descriptors
  • maximum number of files
  • maximum disk space

The disk space actually used can be slightly larger than the configured maximum, as the underlying file system generally allocates space by blocks. In the worst case, the real size on disk is: block_size * max_files + max_size.
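As a worked example of this bound, assume a file system block size of 4096 bytes (a common value, not a SPLAY setting) together with the max_files and max_size values used in the init() example below:

```lua
-- Worst-case disk usage: block_size * max_files + max_size
local block_size = 4096              -- assumption: typical file system block size
local max_files = 1024               -- as in the init() example
local max_size = 128 * 1024 * 1024   -- 128 MiB, as in the init() example

local worst_case = block_size * max_files + max_size
print(worst_case) -- 138412032 bytes, i.e. 132 MiB
```

The block overhead here is 4 MiB (one partially used block per file), on top of the 128 MiB quota.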

init(settings) Initializes RIO; RIO does not work correctly unless it is initialized with this function.
require("splay.base")

-- Force restricted_io for local tests 
io = require("splay.restricted_io")

-- All parameters except directory are optional
io.init({
	directory = "tmpfs", -- mandatory
	no_std = true, -- disable stdin, stdout and stderr
	keep_popen = false, -- disable dangerous popen() function
	max_file_descriptors = 128,
	max_size = 128 * 1024 * 1024, -- total file system size
	max_files = 1024,
	clean = true -- delete all files in the directory
})

-- use IO as usual
All Lua IO functions: same API as the standard Lua IO library (see the Lua IO API).

Local execution vs. deployment over a testbed

This section describes how to code applications that can run both locally and in the deployment execution environment (instantiated by a SPLAY daemon) without modifications.

When the application is deployed on a SPLAY testbed, each node is given a variable (a table) named "job" that contains various information useful for the execution. In a local run, this "job" table must be recreated so that the rest of the code can remain unchanged. The type of deployment is detected by checking whether the "job" variable exists: if it does not, the current run is a local one.

"job" is a table and contains:

  • job.me: the node running this instance of the application
  • job.position: the node's absolute position in the job list (the list of all nodes in the system, including currently offline ones in a churn-replay context)
  • job.nodes(): a function that returns the list of alive nodes, given by the splayd, at the time the node is put online. Note that job.me == job.nodes()[job.position] only if the list type is "head" (not "random") and the list size is >= job.position, which means that with a random list a node never has itself in its view. Previous versions of the SPLAY library had a different API (i.e., job.nodes was a static table).
  • job.list_type: type of the list: "head" (first x nodes in the job list) or "random"
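For a local run, a table with the same fields can also be built by hand. The sketch below uses a hypothetical make_fake_job() helper; it is not the implementation of splay.utils.generate_job(), and the 127.0.0.1 address, the consecutive port numbering and the "head" list type are assumptions. With a "head" list that contains every node, job.nodes()[job.position] is job.me, as noted above.

```lua
-- Hypothetical fake "job" table with the fields listed above, for a local run.
local function make_fake_job(position, nb_nodes, first_port)
	local all = {}
	for i = 1, nb_nodes do
		-- assumption: every node runs on localhost, on consecutive ports
		all[i] = {ip = "127.0.0.1", port = first_port + i - 1}
	end
	return {
		me = all[position],
		position = position,
		list_type = "head",
		-- nodes() is a function in the current API (formerly a static table)
		nodes = function() return all end,
	}
end

local job = make_fake_job(3, 5, 20001)
local nodes = job.nodes()
print(job.me.ip, job.me.port) -- 127.0.0.1   20003
```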

For more information about what a "node" is, see the API section.

By checking for the presence of the "job" variable, an instance of the application can know whether it runs locally or in a deployment. In the former case, the application has probably been launched directly with the Lua interpreter, and additional command line arguments can be used.

The job information can also be used to specialize the behavior of the application. For example, if the instance receives a "head" type list, the rendez-vous node can be the first one in the list; if the list type is "random", the rendez-vous node can be an external, hard-coded node.

require("splay.base")

if job then -- SPLAY deployment
	me = job.me
else -- local
	if arg[1] and arg[2] then
		me = {ip = arg[1], port = tonumber(arg[2])}
	else
		print(arg[0].." <ip> <port>")
		os.exit()
	end
end

...

Also see splay.utils.generate_job() to auto generate a fake "job" variable for local testing.

LuaSocket

SPLAY libraries keep the whole LuaSocket API unchanged, adding our restriction and event layers on top of it. Where possible, calls have been extended to also accept nodes in the SPLAY format.

Visit the LuaSocket homepage for more information.

Restrictions

SPLAY applications run in a sandbox. The disabled functions are:

  • load()
  • loadfile()
  • dofile()
  • os.execute()
  • os.getenv()
  • os.setlocale()
  • io.popen()
  • debug.*

These functions are disabled because they introduce security issues or allow an application to obtain information about the host running the SPLAY daemon.

All other Lua functions and standard libraries are available. Most IO functions have been wrapped to work in a secure virtual file system but the API is kept the same.

If such a function is really needed for local runs, check that it exists before calling it (or base the decision on the existence of the "job" variable).

if loadfile then
	loadfile("filename")
end

Error handling in Lua/SPLAY

In Lua, a convention for most functions is to return the result when there are no errors and to return nil, "error message" if something goes wrong. SPLAY follows this convention.

function divide(a, b)
	if b == 0 then
		return nil, "divide by 0"
	else
		return a / b
	end
end

print(divide(5, 0))
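A caller following this convention either tests the first return value or passes both values to assert(), which raises an error carrying the message when the first value is nil or false. A short sketch (divide() is repeated so the block runs on its own):

```lua
local function divide(a, b)
	if b == 0 then
		return nil, "divide by 0"
	end
	return a / b
end

-- 1. Check the first return value explicitly
local res, err = divide(5, 0)
if not res then
	print("error: " .. err) -- error: divide by 0
end

-- 2. Let assert() turn a (nil, message) pair into an error
local q = assert(divide(6, 3)) -- q == 2
-- assert(divide(5, 0)) would raise an error with the message "divide by 0"
```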

Lua has another mechanism for handling errors, based on the two functions error() and pcall(). A protected call, pcall(), runs a function in "protected" mode and catches errors raised by error().

If there are no errors, pcall() returns true and the return values of the embedded function. If there is an error, pcall() returns false and the error description.

An uncaught error kills the current thread.

function divide(a, b)
	if b == 0 then
		error("divide by 0")
	end
	return a / b
end

print(pcall(divide, 5, 0)) -- => false, followed by the error message
print(pcall(divide, 6, 3)) -- => true, followed by the result

LuaSocket provides a different way to define "protected functions" and to run finalizer code if a problem occurs. This article explains how it works. To use it, add:

require("splay.base")
protect, try, newtry = socket.protect, socket.try, socket.newtry

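SPLAY defines another error-handling method, closer to the classical exception handling of common languages. It provides a try() function that takes the function to execute as its first parameter and, as its second parameter, a table of "catch" functions indexed by exception name; the matching function is executed if that exception is thrown with throw(). An exception that is not caught by an inner try() propagates to the enclosing one, as with "runtime" in the example below.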

require("splay.base")
try, throw = misc.try, misc.throw

try(function()
	try(function()
			print("start of my function")
			--throw("div_by_0")
			throw("runtime", "division by 0")
			print("end of my function")
		end,
		{
			div_by_0 = function(value)
				print("Division by 0")
			end
		}
	)
	end,
	{
		runtime = function(value)
			print("*runtime*", value)
		end,
		another_exception = function()
			print("not used")
		end
	}
)
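To show how such an exception mechanism can be layered on error() and pcall(), here is a hypothetical plain Lua sketch. It is not the implementation of misc.try() and misc.throw(): the exception is assumed to be raised as a table carrying its name and value, and an exception without a matching catch function is re-raised to the enclosing try(), which reproduces the behavior of the nested example above.

```lua
-- Hypothetical sketch of try/throw built on error() and pcall();
-- it is not the code of misc.try/misc.throw.
local function throw(name, value)
	-- raise a table so that try() can find the exception name
	error({exception = name, value = value}, 0)
end

local function try(f, catches)
	catches = catches or {}
	local ok, err = pcall(f)
	if ok then return end
	if type(err) == "table" and err.exception and catches[err.exception] then
		return catches[err.exception](err.value)
	end
	error(err, 0) -- not handled here: propagate to the enclosing try()
end

local handled = {}
try(function()
	try(function()
		throw("runtime", "division by 0")
	end, {
		div_by_0 = function() handled[#handled + 1] = "div_by_0" end,
	})
end, {
	runtime = function(value) handled[#handled + 1] = "runtime: " .. value end,
})
print(handled[1]) -- runtime: division by 0
```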

Extend SPLAY

SPLAY can be extended with Lua modules (libraries) and with C modules.

Additional modules can be loaded as usual (see the Lua documentation). To allow a new module in a deployment, edit the file jobd.lua and add the module name to the sandbox white list. The official SPLAYd library set only includes modules that do not present any security or resource issues.