Update generated code from fix in zproto

This commit is contained in:
Oran Juice 2015-03-23 19:59:32 +05:30
parent 77c6e85cbb
commit 322900b374
No known key found for this signature in database
GPG Key ID: 71C5AF46CCB28124
8 changed files with 215 additions and 94 deletions

View File

@ -16,8 +16,10 @@
========================================================================= =========================================================================
*/ */
#ifndef __WAP_CLIENT_H_INCLUDED__ #ifndef WAP_CLIENT_H_INCLUDED
#define __WAP_CLIENT_H_INCLUDED__ #define WAP_CLIENT_H_INCLUDED
#include <czmq.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -30,14 +32,12 @@ typedef struct _wap_client_t wap_client_t;
#endif #endif
// @interface // @interface
// Create a new wap_client // Create a new wap_client, return the reference if successful, or NULL
// Connect to server endpoint, with specified timeout in msecs (zero means wait // if construction failed due to lack of available memory.
// forever). Constructor succeeds if connection is successful. The caller may
// specify its address.
WAP_EXPORT wap_client_t * WAP_EXPORT wap_client_t *
wap_client_new (const char *endpoint, uint32_t timeout, const char *identity); wap_client_new (void);
// Destroy the wap_client // Destroy the wap_client and free all memory used by the object.
WAP_EXPORT void WAP_EXPORT void
wap_client_destroy (wap_client_t **self_p); wap_client_destroy (wap_client_t **self_p);
@ -54,6 +54,13 @@ WAP_EXPORT zactor_t *
WAP_EXPORT zsock_t * WAP_EXPORT zsock_t *
wap_client_msgpipe (wap_client_t *self); wap_client_msgpipe (wap_client_t *self);
// Connect to server endpoint, with specified timeout in msecs (zero means wait
// forever). Constructor succeeds if connection is successful. The caller may
// specify its address.
// Returns >= 0 if successful, -1 if interrupted.
WAP_EXPORT int
wap_client_connect (wap_client_t *self, const char *endpoint, uint32_t timeout, const char *identity);
// Request a set of blocks from the server. // Request a set of blocks from the server.
// Returns >= 0 if successful, -1 if interrupted. // Returns >= 0 if successful, -1 if interrupted.
WAP_EXPORT int WAP_EXPORT int
@ -120,7 +127,7 @@ WAP_EXPORT zframe_t *
// Self test of this class // Self test of this class
WAP_EXPORT void WAP_EXPORT void
wap_client_test (bool verbose); wap_client_test (bool verbose);
// To enable verbose tracing (animation) of wap_client instances, set // To enable verbose tracing (animation) of wap_client instances, set
// this to true. This lets you trace from and including construction. // this to true. This lets you trace from and including construction.
WAP_EXPORT extern volatile int WAP_EXPORT extern volatile int

View File

@ -39,7 +39,7 @@ typedef enum {
typedef enum { typedef enum {
NULL_event = 0, NULL_event = 0,
constructor_event = 1, connect_event = 1,
bad_endpoint_event = 2, bad_endpoint_event = 2,
open_ok_event = 3, open_ok_event = 3,
expired_event = 4, expired_event = 4,
@ -88,7 +88,7 @@ s_state_name [] = {
static char * static char *
s_event_name [] = { s_event_name [] = {
"(NONE)", "(NONE)",
"constructor", "connect",
"bad_endpoint", "bad_endpoint",
"OPEN_OK", "OPEN_OK",
"expired", "expired",
@ -113,7 +113,7 @@ s_event_name [] = {
"command_invalid", "command_invalid",
"other" "other"
}; };
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Context for the client. This embeds the application-level client context // Context for the client. This embeds the application-level client context
@ -142,6 +142,7 @@ typedef struct {
wap_proto_t *message; // Message received or sent wap_proto_t *message; // Message received or sent
client_args_t args; // Method arguments structure client_args_t args; // Method arguments structure
bool terminated; // True if client is shutdown bool terminated; // True if client is shutdown
bool fsm_stopped; // "terminate" action called
size_t timeout; // inactivity timeout, msecs size_t timeout; // inactivity timeout, msecs
state_t state; // Current state state_t state; // Current state
event_t event; // Current event event_t event; // Current event
@ -216,7 +217,7 @@ static void
// Global tracing/animation indicator; we can't use a client method as // Global tracing/animation indicator; we can't use a client method as
// that only works after construction (which we often want to trace). // that only works after construction (which we often want to trace).
volatile int wap_client_verbose = false; volatile int wap_client_verbose = false;
// Create a new client connection // Create a new client connection
static s_client_t * static s_client_t *
@ -441,7 +442,7 @@ s_protocol_event (s_client_t *self, wap_proto_t *message)
// Execute state machine as long as we have events; if event is NULL_event, // Execute state machine as long as we have events; if event is NULL_event,
// or state machine is terminated, do nothing. // or state machine is stopped, do nothing.
static void static void
s_client_execute (s_client_t *self, event_t event) s_client_execute (s_client_t *self, event_t event)
@ -452,7 +453,9 @@ s_client_execute (s_client_t *self, event_t event)
zloop_timer_end (self->loop, self->wakeup_timer); zloop_timer_end (self->loop, self->wakeup_timer);
self->wakeup_timer = 0; self->wakeup_timer = 0;
} }
while (!self->terminated && self->next_event != NULL_event) { while (!self->terminated // Actor is dying
&& !self->fsm_stopped // FSM has finished
&& self->next_event != NULL_event) {
self->event = self->next_event; self->event = self->next_event;
self->next_event = NULL_event; self->next_event = NULL_event;
self->exception = NULL_event; self->exception = NULL_event;
@ -462,7 +465,7 @@ s_client_execute (s_client_t *self, event_t event)
} }
switch (self->state) { switch (self->state) {
case start_state: case start_state:
if (self->event == constructor_event) { if (self->event == connect_event) {
if (!self->exception) { if (!self->exception) {
// connect to server endpoint // connect to server endpoint
if (wap_client_verbose) if (wap_client_verbose)
@ -503,7 +506,7 @@ s_client_execute (s_client_t *self, event_t event)
// terminate // terminate
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: $ terminate"); zsys_debug ("wap_client: $ terminate");
self->terminated = true; self->fsm_stopped = true;
} }
} }
else { else {
@ -543,11 +546,14 @@ s_client_execute (s_client_t *self, event_t event)
// terminate // terminate
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: $ terminate"); zsys_debug ("wap_client: $ terminate");
self->terminated = true; self->fsm_stopped = true;
} }
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -562,6 +568,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -709,6 +718,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -723,6 +735,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -739,6 +754,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -753,6 +771,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -769,6 +790,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -783,6 +807,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -799,6 +826,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -813,6 +843,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -829,6 +862,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -843,6 +879,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -859,6 +898,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -873,6 +915,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -889,6 +934,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -903,6 +951,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -919,6 +970,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -933,6 +987,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -948,7 +1005,7 @@ s_client_execute (s_client_t *self, event_t event)
// terminate // terminate
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: $ terminate"); zsys_debug ("wap_client: $ terminate");
self->terminated = true; self->fsm_stopped = true;
} }
} }
else else
@ -963,11 +1020,14 @@ s_client_execute (s_client_t *self, event_t event)
// terminate // terminate
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: $ terminate"); zsys_debug ("wap_client: $ terminate");
self->terminated = true; self->fsm_stopped = true;
} }
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -982,11 +1042,17 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
case defaults_state: case defaults_state:
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -1001,6 +1067,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
@ -1034,7 +1103,7 @@ s_client_execute (s_client_t *self, event_t event)
// terminate // terminate
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: $ terminate"); zsys_debug ("wap_client: $ terminate");
self->terminated = true; self->fsm_stopped = true;
} }
} }
else { else {
@ -1058,6 +1127,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else else
if (self->event == ping_ok_event) { if (self->event == ping_ok_event) {
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ ping_ok");
} }
else else
if (self->event == error_event) { if (self->event == error_event) {
@ -1072,6 +1144,9 @@ s_client_execute (s_client_t *self, event_t event)
} }
else { else {
// Handle unexpected protocol events // Handle unexpected protocol events
// No action - just logging
if (wap_client_verbose)
zsys_debug ("wap_client: $ *");
} }
break; break;
} }
@ -1123,11 +1198,11 @@ s_client_handle_cmdpipe (zloop_t *loop, zsock_t *reader, void *argument)
if (streq (method, "$TERM")) if (streq (method, "$TERM"))
self->terminated = true; // Shutdown the engine self->terminated = true; // Shutdown the engine
else else
if (streq (method, "CONSTRUCTOR")) { if (streq (method, "CONNECT")) {
zstr_free (&self->args.endpoint); zstr_free (&self->args.endpoint);
zstr_free (&self->args.identity); zstr_free (&self->args.identity);
zsock_recv (self->cmdpipe, "sis", &self->args.endpoint, &self->args.timeout, &self->args.identity); zsock_recv (self->cmdpipe, "s4s", &self->args.endpoint, &self->args.timeout, &self->args.identity);
s_client_execute (self, constructor_event); s_client_execute (self, connect_event);
} }
else else
if (streq (method, "DESTRUCTOR")) { if (streq (method, "DESTRUCTOR")) {
@ -1136,7 +1211,7 @@ s_client_handle_cmdpipe (zloop_t *loop, zsock_t *reader, void *argument)
else else
if (streq (method, "BLOCKS")) { if (streq (method, "BLOCKS")) {
zlist_destroy (&self->args.block_ids); zlist_destroy (&self->args.block_ids);
zsock_recv (self->cmdpipe, "pi", &self->args.block_ids, &self->args.start_height); zsock_recv (self->cmdpipe, "p8", &self->args.block_ids, &self->args.start_height);
s_client_execute (self, blocks_event); s_client_execute (self, blocks_event);
} }
else else
@ -1164,7 +1239,7 @@ s_client_handle_cmdpipe (zloop_t *loop, zsock_t *reader, void *argument)
else else
if (streq (method, "START")) { if (streq (method, "START")) {
zstr_free (&self->args.address); zstr_free (&self->args.address);
zsock_recv (self->cmdpipe, "si", &self->args.address, &self->args.thread_count); zsock_recv (self->cmdpipe, "s8", &self->args.address, &self->args.thread_count);
s_client_execute (self, start_event); s_client_execute (self, start_event);
} }
else else
@ -1199,8 +1274,11 @@ s_client_handle_msgpipe (zloop_t *loop, zsock_t *reader, void *argument)
if (wap_client_verbose) if (wap_client_verbose)
zsys_debug ("wap_client: API message=%s", method); zsys_debug ("wap_client: API message=%s", method);
// Front-end shuts down msgpipe before cmdpipe // Front-end shuts down msgpipe before cmdpipe, this little
if (streq (method, "$TERM")) // handshake just ensures all traffic on the msgpipe has been
// flushed before the calling thread continues with destroying
// the actor.
if (streq (method, "$FLUSH"))
zsock_signal (self->cmdpipe, 0); zsock_signal (self->cmdpipe, 0);
// Cleanup pipe if any argument frames are still waiting to be eaten // Cleanup pipe if any argument frames are still waiting to be eaten
if (zsock_rcvmore (self->msgpipe)) { if (zsock_rcvmore (self->msgpipe)) {
@ -1256,7 +1334,7 @@ wap_client (zsock_t *cmdpipe, void *msgpipe)
s_client_t *self = s_client_new (cmdpipe, (zsock_t *) msgpipe); s_client_t *self = s_client_new (cmdpipe, (zsock_t *) msgpipe);
if (self) { if (self) {
zsock_signal (cmdpipe, 0); zsock_signal (cmdpipe, 0);
// Set up handler for the sockets the client uses // Set up handler for the sockets the client uses
engine_handle_socket ((client_t *) self, self->cmdpipe, s_client_handle_cmdpipe); engine_handle_socket ((client_t *) self, self->cmdpipe, s_client_handle_cmdpipe);
engine_handle_socket ((client_t *) self, self->msgpipe, s_client_handle_msgpipe); engine_handle_socket ((client_t *) self, self->msgpipe, s_client_handle_msgpipe);
@ -1291,25 +1369,16 @@ struct _wap_client_t {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Create a new wap_client // Create a new wap_client
// Connect to server endpoint, with specified timeout in msecs (zero means wait
// forever). Constructor succeeds if connection is successful. The caller may
// specify its address.
static int
wap_client_constructor (wap_client_t *self, const char *endpoint, uint32_t timeout, const char *identity);
WAP_EXPORT wap_client_t * WAP_EXPORT wap_client_t *
wap_client_new (const char *endpoint, uint32_t timeout, const char *identity) wap_client_new (void)
{ {
wap_client_t *self = (wap_client_t *) zmalloc (sizeof (wap_client_t)); wap_client_t *self = (wap_client_t *) zmalloc (sizeof (wap_client_t));
if (self) { if (self) {
zsock_t *backend; zsock_t *backend;
self->msgpipe = zsys_create_pipe (&backend); self->msgpipe = zsys_create_pipe (&backend);
self->actor = zactor_new (wap_client, backend); if (self->msgpipe)
if (self->actor) self->actor = zactor_new (wap_client, backend);
self->status = wap_client_constructor (self, endpoint, timeout, identity);
if (self->status == -1)
zactor_destroy (&self->actor);
if (!self->actor) if (!self->actor)
wap_client_destroy (&self); wap_client_destroy (&self);
} }
@ -1332,10 +1401,12 @@ wap_client_destroy (wap_client_t **self_p)
if (*self_p) { if (*self_p) {
wap_client_t *self = *self_p; wap_client_t *self = *self_p;
if (self->actor && !zsys_interrupted) { if (self->actor && !zsys_interrupted) {
// Shut down msgpipe first so that client can do clean shutdown, // Before destroying the actor we have to flush any pending
// sending any pending messages and handshaking goodbye to server // traffic on the msgpipe, otherwise it gets lost in a fire and
zstr_send (self->msgpipe, "$TERM"); // forget scenario. We do this by sending $FLUSH to the msgpipe
zsock_wait (self->actor); // and waiting for a signal back on the cmdpipe.
if (zstr_send (self->msgpipe, "$FLUSH") == 0)
zsock_wait (self->actor);
wap_client_destructor (self); wap_client_destructor (self);
} }
zactor_destroy (&self->actor); zactor_destroy (&self->actor);
@ -1391,7 +1462,7 @@ s_accept_reply (wap_client_t *self, ...)
char *reply = zstr_recv (self->actor); char *reply = zstr_recv (self->actor);
if (!reply) if (!reply)
break; // Interrupted or timed-out break; // Interrupted or timed-out
va_list args; va_list args;
va_start (args, self); va_start (args, self);
char *filter = va_arg (args, char *); char *filter = va_arg (args, char *);
@ -1408,11 +1479,11 @@ s_accept_reply (wap_client_t *self, ...)
else else
if (streq (reply, "BLOCKS OK")) { if (streq (reply, "BLOCKS OK")) {
zmsg_destroy (&self->block_data); zmsg_destroy (&self->block_data);
zsock_recv (self->actor, "iiip", &self->status, &self->start_height, &self->curr_height, &self->block_data); zsock_recv (self->actor, "888p", &self->status, &self->start_height, &self->curr_height, &self->block_data);
} }
else else
if (streq (reply, "PUT OK")) { if (streq (reply, "PUT OK")) {
zsock_recv (self->actor, "i", &self->status); zsock_recv (self->actor, "8", &self->status);
} }
else else
if (streq (reply, "GET OK")) { if (streq (reply, "GET OK")) {
@ -1426,11 +1497,11 @@ s_accept_reply (wap_client_t *self, ...)
else else
if (streq (reply, "OUTPUT INDEXES OK")) { if (streq (reply, "OUTPUT INDEXES OK")) {
zframe_destroy (&self->o_indexes); zframe_destroy (&self->o_indexes);
zsock_recv (self->actor, "ip", &self->status, &self->o_indexes); zsock_recv (self->actor, "8p", &self->status, &self->o_indexes);
} }
else else
if (streq (reply, "START OK")) { if (streq (reply, "START OK")) {
zsock_recv (self->actor, "i", &self->status); zsock_recv (self->actor, "8", &self->status);
} }
else else
if (streq (reply, "STOP OK")) { if (streq (reply, "STOP OK")) {
@ -1458,11 +1529,12 @@ s_accept_reply (wap_client_t *self, ...)
// specify its address. // specify its address.
// Returns >= 0 if successful, -1 if interrupted. // Returns >= 0 if successful, -1 if interrupted.
static int int
wap_client_constructor (wap_client_t *self, const char *endpoint, uint32_t timeout, const char *identity) wap_client_connect (wap_client_t *self, const char *endpoint, uint32_t timeout, const char *identity)
{ {
assert (self); assert (self);
zsock_send (self->actor, "ssis", "CONSTRUCTOR", endpoint, timeout, identity);
zsock_send (self->actor, "ss4s", "CONNECT", endpoint, timeout, identity);
if (s_accept_reply (self, "SUCCESS", "FAILURE", NULL)) if (s_accept_reply (self, "SUCCESS", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
return self->status; return self->status;
@ -1478,6 +1550,7 @@ int
wap_client_destructor (wap_client_t *self) wap_client_destructor (wap_client_t *self)
{ {
assert (self); assert (self);
zsock_send (self->actor, "s", "DESTRUCTOR"); zsock_send (self->actor, "s", "DESTRUCTOR");
if (s_accept_reply (self, "SUCCESS", "FAILURE", NULL)) if (s_accept_reply (self, "SUCCESS", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
@ -1493,7 +1566,8 @@ int
wap_client_blocks (wap_client_t *self, zlist_t **block_ids_p, uint64_t start_height) wap_client_blocks (wap_client_t *self, zlist_t **block_ids_p, uint64_t start_height)
{ {
assert (self); assert (self);
zsock_send (self->actor, "spi", "BLOCKS", *block_ids_p, start_height);
zsock_send (self->actor, "sp8", "BLOCKS", *block_ids_p, start_height);
*block_ids_p = NULL; // Take ownership of block_ids *block_ids_p = NULL; // Take ownership of block_ids
if (s_accept_reply (self, "BLOCKS OK", "FAILURE", NULL)) if (s_accept_reply (self, "BLOCKS OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
@ -1509,6 +1583,7 @@ int
wap_client_put (wap_client_t *self, zchunk_t **tx_data_p) wap_client_put (wap_client_t *self, zchunk_t **tx_data_p)
{ {
assert (self); assert (self);
zsock_send (self->actor, "sp", "PUT", *tx_data_p); zsock_send (self->actor, "sp", "PUT", *tx_data_p);
*tx_data_p = NULL; // Take ownership of tx_data *tx_data_p = NULL; // Take ownership of tx_data
if (s_accept_reply (self, "PUT OK", "FAILURE", NULL)) if (s_accept_reply (self, "PUT OK", "FAILURE", NULL))
@ -1525,6 +1600,7 @@ int
wap_client_get (wap_client_t *self, const char *tx_id) wap_client_get (wap_client_t *self, const char *tx_id)
{ {
assert (self); assert (self);
zsock_send (self->actor, "ss", "GET", tx_id); zsock_send (self->actor, "ss", "GET", tx_id);
if (s_accept_reply (self, "GET OK", "FAILURE", NULL)) if (s_accept_reply (self, "GET OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
@ -1540,6 +1616,7 @@ int
wap_client_save (wap_client_t *self) wap_client_save (wap_client_t *self)
{ {
assert (self); assert (self);
zsock_send (self->actor, "s", "SAVE"); zsock_send (self->actor, "s", "SAVE");
if (s_accept_reply (self, "SAVE OK", "FAILURE", NULL)) if (s_accept_reply (self, "SAVE OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
@ -1555,6 +1632,7 @@ int
wap_client_output_indexes (wap_client_t *self, const char *tx_id) wap_client_output_indexes (wap_client_t *self, const char *tx_id)
{ {
assert (self); assert (self);
zsock_send (self->actor, "ss", "OUTPUT INDEXES", tx_id); zsock_send (self->actor, "ss", "OUTPUT INDEXES", tx_id);
if (s_accept_reply (self, "OUTPUT INDEXES OK", "FAILURE", NULL)) if (s_accept_reply (self, "OUTPUT INDEXES OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
@ -1570,7 +1648,8 @@ int
wap_client_start (wap_client_t *self, const char *address, uint64_t thread_count) wap_client_start (wap_client_t *self, const char *address, uint64_t thread_count)
{ {
assert (self); assert (self);
zsock_send (self->actor, "ssi", "START", address, thread_count);
zsock_send (self->actor, "ss8", "START", address, thread_count);
if (s_accept_reply (self, "START OK", "FAILURE", NULL)) if (s_accept_reply (self, "START OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out
return self->status; return self->status;
@ -1585,6 +1664,7 @@ int
wap_client_stop (wap_client_t *self) wap_client_stop (wap_client_t *self)
{ {
assert (self); assert (self);
zsock_send (self->actor, "s", "STOP"); zsock_send (self->actor, "s", "STOP");
if (s_accept_reply (self, "STOP OK", "FAILURE", NULL)) if (s_accept_reply (self, "STOP OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out return -1; // Interrupted or timed-out

View File

@ -18,8 +18,8 @@
========================================================================= =========================================================================
*/ */
#ifndef __WAP_PROTO_H_INCLUDED__ #ifndef WAP_PROTO_H_INCLUDED
#define __WAP_PROTO_H_INCLUDED__ #define WAP_PROTO_H_INCLUDED
/* These are the wap_proto messages: /* These are the wap_proto messages:

View File

@ -16,8 +16,10 @@
========================================================================= =========================================================================
*/ */
#ifndef __WAP_SERVER_H_INCLUDED__ #ifndef WAP_SERVER_H_INCLUDED
#define __WAP_SERVER_H_INCLUDED__ #define WAP_SERVER_H_INCLUDED
#include <czmq.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -29,11 +31,11 @@ extern "C" {
// Create new wap_server instance, passing logging prefix: // Create new wap_server instance, passing logging prefix:
// //
// zactor_t *wap_server = zactor_new (wap_server, "myname"); // zactor_t *wap_server = zactor_new (wap_server, "myname");
// //
// Destroy wap_server instance // Destroy wap_server instance
// //
// zactor_destroy (&wap_server); // zactor_destroy (&wap_server);
// //
// Enable verbose logging of commands and activity: // Enable verbose logging of commands and activity:
// //
// zstr_send (wap_server, "VERBOSE"); // zstr_send (wap_server, "VERBOSE");
@ -54,12 +56,16 @@ extern "C" {
// Specify configuration file to load, overwriting any previous loaded // Specify configuration file to load, overwriting any previous loaded
// configuration file or options: // configuration file or options:
// //
// zstr_sendx (wap_server, "CONFIGURE", filename, NULL); // zstr_sendx (wap_server, "LOAD", filename, NULL);
// //
// Set configuration path value: // Set configuration path value:
// //
// zstr_sendx (wap_server, "SET", path, value, NULL); // zstr_sendx (wap_server, "SET", path, value, NULL);
// //
// Save configuration data to config file on disk:
//
// zstr_sendx (wap_server, "SAVE", filename, NULL);
//
// Send zmsg_t instance to wap_server: // Send zmsg_t instance to wap_server:
// //
// zactor_send (wap_server, &msg); // zactor_send (wap_server, &msg);

View File

@ -378,7 +378,7 @@ s_client_new (s_server_t *server, zframe_t *routing_id)
s_client_t *self = (s_client_t *) zmalloc (sizeof (s_client_t)); s_client_t *self = (s_client_t *) zmalloc (sizeof (s_client_t));
assert (self); assert (self);
assert ((s_client_t *) &self->client == self); assert ((s_client_t *) &self->client == self);
self->server = server; self->server = server;
self->hashkey = zframe_strhex (routing_id); self->hashkey = zframe_strhex (routing_id);
self->routing_id = zframe_dup (routing_id); self->routing_id = zframe_dup (routing_id);
@ -1002,17 +1002,17 @@ static void
s_server_config_global (s_server_t *self) s_server_config_global (s_server_t *self)
{ {
// Built-in server configuration options // Built-in server configuration options
// //
// If we didn't already set verbose, check if the config tree wants it // If we didn't already set verbose, check if the config tree wants it
if (!self->verbose if (!self->verbose
&& atoi (zconfig_resolve (self->config, "server/verbose", "0"))) && atoi (zconfig_resolve (self->config, "server/verbose", "0")))
self->verbose = true; self->verbose = true;
// Default client timeout is 60 seconds // Default client timeout is 60 seconds
self->timeout = atoi ( self->timeout = atoi (
zconfig_resolve (self->config, "server/timeout", "60000")); zconfig_resolve (self->config, "server/timeout", "60000"));
zloop_set_ticket_delay (self->loop, self->timeout); zloop_set_ticket_delay (self->loop, self->timeout);
// Do we want to run server in the background? // Do we want to run server in the background?
int background = atoi ( int background = atoi (
zconfig_resolve (self->config, "server/background", "0")); zconfig_resolve (self->config, "server/background", "0"));
@ -1097,12 +1097,15 @@ s_server_config_service (s_server_t *self)
char *mechanism = zconfig_resolve (section, "mechanism", "null"); char *mechanism = zconfig_resolve (section, "mechanism", "null");
char *domain = zconfig_resolve (section, "domain", NULL); char *domain = zconfig_resolve (section, "domain", NULL);
if (streq (mechanism, "null")) { if (streq (mechanism, "null")) {
zsys_notice ("server is using NULL security");
if (domain) if (domain)
zsock_set_zap_domain (self->router, NULL); zsock_set_zap_domain (self->router, NULL);
} }
else else
if (streq (mechanism, "plain")) if (streq (mechanism, "plain")) {
zsys_notice ("server is using PLAIN security");
zsock_set_plain_server (self->router, 1); zsock_set_plain_server (self->router, 1);
}
else else
zsys_warning ("mechanism=%s is not supported", mechanism); zsys_warning ("mechanism=%s is not supported", mechanism);
} }
@ -1148,20 +1151,20 @@ s_server_handle_pipe (zloop_t *loop, zsock_t *reader, void *argument)
zstr_sendm (self->pipe, "PORT"); zstr_sendm (self->pipe, "PORT");
zstr_sendf (self->pipe, "%d", self->port); zstr_sendf (self->pipe, "%d", self->port);
} }
else else // Deprecated method name
if (streq (method, "CONFIGURE")) { if (streq (method, "LOAD") || streq (method, "CONFIGURE")) {
char *config_file = zmsg_popstr (msg); char *filename = zmsg_popstr (msg);
zconfig_destroy (&self->config); zconfig_destroy (&self->config);
self->config = zconfig_load (config_file); self->config = zconfig_load (filename);
if (self->config) { if (self->config) {
s_server_config_service (self); s_server_config_service (self);
self->server.config = self->config; self->server.config = self->config;
} }
else { else {
zsys_warning ("cannot load config file '%s'\n", config_file); zsys_warning ("cannot load config file '%s'", filename);
self->config = zconfig_new ("root", NULL); self->config = zconfig_new ("root", NULL);
} }
free (config_file); free (filename);
} }
else else
if (streq (method, "SET")) { if (streq (method, "SET")) {
@ -1176,6 +1179,13 @@ s_server_handle_pipe (zloop_t *loop, zsock_t *reader, void *argument)
free (path); free (path);
free (value); free (value);
} }
else
if (streq (method, "SAVE")) {
char *filename = zmsg_popstr (msg);
if (zconfig_save (self->config, filename))
zsys_warning ("cannot save config file '%s'", filename);
free (filename);
}
else { else {
// Execute custom method // Execute custom method
zmsg_t *reply = server_method (&self->server, method, msg); zmsg_t *reply = server_method (&self->server, method, msg);
@ -1211,7 +1221,7 @@ s_server_handle_protocol (zloop_t *loop, zsock_t *reader, void *argument)
// Any input from client counts as activity // Any input from client counts as activity
if (client->ticket) if (client->ticket)
zloop_ticket_reset (self->loop, client->ticket); zloop_ticket_reset (self->loop, client->ticket);
// Pass to client state machine // Pass to client state machine
s_client_execute (client, s_protocol_event (self->message)); s_client_execute (client, s_protocol_event (self->message));
} }

View File

@ -138,7 +138,7 @@ prepare_get_output_indexes_command (client_t *self)
static void static void
signal_have_blocks_ok (client_t *self) signal_have_blocks_ok (client_t *self)
{ {
zsock_send (self->cmdpipe, "siiip", "BLOCKS OK", wap_proto_status(self->message), zsock_send (self->cmdpipe, "s888p", "BLOCKS OK", wap_proto_status(self->message),
wap_proto_start_height (self->message), wap_proto_start_height (self->message),
wap_proto_curr_height (self->message), wap_proto_curr_height (self->message),
wap_proto_get_block_data (self->message)); wap_proto_get_block_data (self->message));
@ -174,7 +174,7 @@ prepare_put_command (client_t *self)
static void static void
signal_have_put_ok (client_t *self) signal_have_put_ok (client_t *self)
{ {
zsock_send (self->cmdpipe, "sis", "PUT OK", 0, zsock_send (self->cmdpipe, "s8s", "PUT OK", 0,
wap_proto_tx_id (self->message)); wap_proto_tx_id (self->message));
} }
@ -197,7 +197,7 @@ prepare_get_command (client_t *self)
static void static void
signal_have_get_ok (client_t *self) signal_have_get_ok (client_t *self)
{ {
zsock_send (self->cmdpipe, "sip", "GET OK", 0, zsock_send (self->cmdpipe, "s8p", "GET OK", 0,
wap_proto_get_tx_data (self->message)); wap_proto_get_tx_data (self->message));
} }
@ -220,7 +220,7 @@ prepare_save_command (client_t *self)
static void static void
signal_have_save_ok (client_t *self) signal_have_save_ok (client_t *self)
{ {
zsock_send (self->cmdpipe, "si", "SAVE OK", 0); zsock_send (self->cmdpipe, "s8", "SAVE OK", 0);
} }
@ -231,7 +231,7 @@ signal_have_save_ok (client_t *self)
static void static void
signal_have_start_ok (client_t *self) signal_have_start_ok (client_t *self)
{ {
zsock_send(self->cmdpipe, "si", "START OK", wap_proto_status(self->message)); zsock_send(self->cmdpipe, "s8", "START OK", wap_proto_status(self->message));
} }
@ -242,7 +242,7 @@ signal_have_start_ok (client_t *self)
static void static void
signal_have_stop_ok (client_t *self) signal_have_stop_ok (client_t *self)
{ {
zsock_send (self->cmdpipe, "si", "STOP OK", 0); zsock_send (self->cmdpipe, "s8", "STOP OK", 0);
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -34,18 +34,30 @@ struct _wap_proto_t {
int id; // wap_proto message ID int id; // wap_proto message ID
byte *needle; // Read/write pointer for serialization byte *needle; // Read/write pointer for serialization
byte *ceiling; // Valid upper limit for read pointer byte *ceiling; // Valid upper limit for read pointer
char identity [256]; // Wallet identity /* Wallet identity */
zlist_t *block_ids; // char identity [256];
uint64_t start_height; // /* */
uint64_t status; // zlist_t *block_ids;
uint64_t curr_height; // /* */
zmsg_t *block_data; // Frames of block data uint64_t start_height;
zchunk_t *tx_data; // Transaction data /* */
char tx_id [256]; // Transaction ID uint64_t status;
zframe_t *o_indexes; // Output Indexes /* */
char address [256]; // uint64_t curr_height;
uint64_t thread_count; // /* Frames of block data */
char reason [256]; // Printable explanation zmsg_t *block_data;
/* Transaction data */
zchunk_t *tx_data;
/* Transaction ID */
char tx_id [256];
/* Output Indexes */
zframe_t *o_indexes;
/* */
char address [256];
/* */
uint64_t thread_count;
/* Printable explanation */
char reason [256];
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
@ -333,6 +345,7 @@ wap_proto_recv (wap_proto_t *self, zsock_t *input)
zsys_warning ("wap_proto: tx_data is missing data"); zsys_warning ("wap_proto: tx_data is missing data");
goto malformed; goto malformed;
} }
zchunk_destroy (&self->tx_data);
self->tx_data = zchunk_new (self->needle, chunk_size); self->tx_data = zchunk_new (self->needle, chunk_size);
self->needle += chunk_size; self->needle += chunk_size;
} }
@ -369,6 +382,7 @@ wap_proto_recv (wap_proto_t *self, zsock_t *input)
zsys_warning ("wap_proto: tx_data is missing data"); zsys_warning ("wap_proto: tx_data is missing data");
goto malformed; goto malformed;
} }
zchunk_destroy (&self->tx_data);
self->tx_data = zchunk_new (self->needle, chunk_size); self->tx_data = zchunk_new (self->needle, chunk_size);
self->needle += chunk_size; self->needle += chunk_size;
} }
@ -1179,6 +1193,9 @@ wap_proto_test (bool verbose)
{ {
printf (" * wap_proto: "); printf (" * wap_proto: ");
// Silence an "unused" warning by "using" the verbose variable
if (verbose) {;}
// @selftest // @selftest
// Simple create/destroy test // Simple create/destroy test
wap_proto_t *self = wap_proto_new (); wap_proto_t *self = wap_proto_new ();

View File

@ -84,7 +84,8 @@ namespace tools
wallet2(const wallet2&) : m_run(true), m_callback(0), m_testnet(false) {}; wallet2(const wallet2&) : m_run(true), m_callback(0), m_testnet(false) {};
public: public:
wallet2(bool testnet = false, bool restricted = false) : m_run(true), m_callback(0), m_testnet(testnet) { wallet2(bool testnet = false, bool restricted = false) : m_run(true), m_callback(0), m_testnet(testnet) {
client = wap_client_new ("ipc://@/monero", 200, "wallet identity"); client = wap_client_new ();
wap_client_connect (client, "ipc://@/monero", 200, "wallet identity");
if (!client) { if (!client) {
// TODO: Daemon not up. // TODO: Daemon not up.
} }