diff --git a/META.json b/META.json index ec4fbe7..b92e95d 100644 --- a/META.json +++ b/META.json @@ -1,10 +1,10 @@ { "name": "pg_amqp", "abstract": "AMQP protocol support for PostgreSQL", - "version": "0.4.1", + "version": "0.4.2", "description": "The pg_amqp package provides the ability for postgres statements to directly publish messages to an AMQP broker.", "maintainer": - [ "Theo Schlossnagle ", "Keith Fiske ", "Keith Fiske " ] "license": [ "bsd", "mozilla_1_0" ], "generated_by": "Keith Fiske", "status": "stable", @@ -17,9 +17,9 @@ }, "provides": { "amqp": { - "file": "sql/amqp--0.4.1.sql", + "file": "sql/amqp--0.4.2.sql", "docfile": "doc/amqp.md", - "version": "0.4.1", + "version": "0.4.2", "abstract": "AMQP protocol support for PostgreSQL" } }, @@ -43,4 +43,4 @@ "rabbitmq", "queue" ] -} +} \ No newline at end of file diff --git a/README.md b/README.md index df42975..065cc64 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ pg_amqp -============= The pg_amqp package provides the ability for postgres statements to directly -publish messages to an [AMQP](http://www.amqp.org/) broker. +publish messages to an [AMQP](http://www.amqp.org/) broker. Version 0.4.2 also provides an implementation +for secured connections using open ssl. To build v0.4.2 openssl dev package is required. All bug reports, feature requests and general questions can be directed to the Issues section on Github. - http://github.com/omniti-labs/pg_amqp diff --git a/amqp.control b/amqp.control index e1d3665..67cb8dd 100644 --- a/amqp.control +++ b/amqp.control @@ -1,6 +1,6 @@ # amqp extension comment = 'AMQP protocol support for PostgreSQL' -default_version = '0.4.1' +default_version = '0.4.2' module_pathname = '$libdir/pg_amqp' relocatable = false schema = amqp diff --git a/sql/amqp--0.4.2.sql b/sql/amqp--0.4.2.sql new file mode 100644 index 0000000..a13b8b2 --- /dev/null +++ b/sql/amqp--0.4.2.sql @@ -0,0 +1,91 @@ +CREATE TABLE @extschema@.broker ( + broker_id serial NOT NULL, + host text NOT NULL, + port integer NOT NULL DEFAULT 5672, + vhost text, + username text, + password text, + requiressl boolean DEFAULT false, + verify_cert boolean DEFAULT true, + verify_cn boolean DEFAULT true, + cert text, + key text, + key_password character varying, + ca text, + PRIMARY KEY (broker_id, host, port) +); + + +CREATE FUNCTION @extschema@.autonomous_publish( + broker_id integer + , exchange varchar + , routing_key varchar + , message varchar + , delivery_mode integer default null + , content_type varchar default null + , reply_to varchar default null + , correlation_id varchar default null +) + +RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_autonomous_publish' +LANGUAGE C IMMUTABLE; + +COMMENT ON FUNCTION @extschema@.autonomous_publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar) IS +'Works as amqp.publish does, but the message is published immediately irrespective of the +current transaction state. PostgreSQL commit and rollback at a later point will have no +effect on this message being sent to AMQP.'; + + +CREATE FUNCTION amqp.disconnect(broker_id integer) +RETURNS void AS 'pg_amqp.so', 'pg_amqp_disconnect' +LANGUAGE C IMMUTABLE STRICT; + +COMMENT ON FUNCTION amqp.disconnect(integer) IS +'Explicitly disconnect the specified (broker_id) if it is current connected. Broker +connections, once established, live until the PostgreSQL backend terminated. This +allows for more precise control over that. +select amqp.disconnect(broker_id) from amqp.broker +will disconnect any brokers that may be connected.'; + + +CREATE FUNCTION amqp.exchange_declare( + broker_id integer + , exchange varchar + , exchange_type varchar + , passive boolean + , durable boolean + , auto_delete boolean DEFAULT false +) +RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_exchange_declare' +LANGUAGE C IMMUTABLE; + +COMMENT ON FUNCTION amqp.exchange_declare(integer, varchar, varchar, boolean, boolean, boolean) IS +'Declares a exchange (broker_id, exchange_name, exchange_type, passive, durable, auto_delete) +auto_delete should be set to false (default) as unexpected errors can cause disconnect/reconnect which +would trigger the auto deletion of the exchange.'; + + +CREATE FUNCTION @extschema@.publish( + broker_id integer + , exchange varchar + , routing_key varchar + , message varchar + , delivery_mode integer default null + , content_type varchar default null + , reply_to varchar default null + , correlation_id varchar default null + , headers varchar[][2] default null + , properties varchar[][2] default null +) +RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish' +LANGUAGE C IMMUTABLE; + +COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS +'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties). +The message will only be published if the containing PostgreSQL transaction successfully commits. +Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. +The last four parameters are optional and set the following message properties: +delivery_mode (either 1 or 2), content_type, reply_to and correlation_id. + +Publish returns a boolean indicating if the publish command was successful. Note that as +AMQP publish is asynchronous, you may find out later it was unsuccessful.'; diff --git a/sql/functions/functions.sql b/sql/functions/functions.sql index a3339b5..9aabbcf 100644 --- a/sql/functions/functions.sql +++ b/sql/functions/functions.sql @@ -56,12 +56,14 @@ CREATE FUNCTION @extschema@.publish( , content_type varchar default null , reply_to varchar default null , correlation_id varchar default null + , headers varchar[][2] default null + , properties varchar[][2] default null ) RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish' LANGUAGE C IMMUTABLE; -COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar) IS -'Publishes a message (broker_id, exchange, routing_key, message). +COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS +'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties). The message will only be published if the containing PostgreSQL transaction successfully commits. Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. The last four parameters are optional and set the following message properties: @@ -69,4 +71,3 @@ delivery_mode (either 1 or 2), content_type, reply_to and correlation_id. Publish returns a boolean indicating if the publish command was successful. Note that as AMQP publish is asynchronous, you may find out later it was unsuccessful.'; - diff --git a/sql/tables/tables.sql b/sql/tables/tables.sql index 5edf6ff..32aa795 100644 --- a/sql/tables/tables.sql +++ b/sql/tables/tables.sql @@ -5,6 +5,13 @@ CREATE TABLE @extschema@.broker ( vhost text, username text, password text, + requiressl boolean DEFAULT false, + verify_cert boolean DEFAULT true, + verify_cn boolean DEFAULT true, + cert text, + key text, + key_password character varying, + ca text, PRIMARY KEY (broker_id, host, port) ); diff --git a/src/librabbitmq/amqp.h b/src/librabbitmq/amqp.h index 5cc43a8..d8fc01a 100644 --- a/src/librabbitmq/amqp.h +++ b/src/librabbitmq/amqp.h @@ -98,6 +98,11 @@ typedef enum amqp_response_type_enum_ { AMQP_RESPONSE_SERVER_EXCEPTION } amqp_response_type_enum; +typedef enum amqp_ssl_flags_type_enum_ { + AMQP_SSL_FLAG_VERIFY = 1 << 0, + AMQP_SSL_FLAG_CHECK_CN = 1 << 2, +} amqp_ssl_flags_type_enum; + typedef struct amqp_rpc_reply_t_ { amqp_response_type_enum reply_type; amqp_method_t reply; @@ -113,8 +118,7 @@ typedef enum amqp_sasl_method_enum_ { typedef struct amqp_basic_return_t_ amqp_basic_return_t; typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count); -typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *, - void *); +typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *, void *); /* Opaque struct. */ @@ -140,6 +144,7 @@ extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); } \ }) +extern amqp_connection_state_t amqp_new_ssl_connection(const char *certificate, const char *key, const char *password, const char *ca, unsigned short flags); extern amqp_connection_state_t amqp_new_connection(void); extern int amqp_get_sockfd(amqp_connection_state_t state); extern void amqp_set_sockfd(amqp_connection_state_t state, @@ -173,8 +178,7 @@ extern int amqp_send_frame_to(amqp_connection_state_t state, extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); -extern int amqp_open_socket(char const *hostname, int portnumber, - struct timeval *timeout); +extern int amqp_connect(amqp_connection_state_t state, char const *hostname, int portnumber, struct timeval *timeout); extern int amqp_send_header(amqp_connection_state_t state); extern int amqp_send_header_to(amqp_connection_state_t state, @@ -343,6 +347,7 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); */ extern amqp_rpc_reply_t *amqp_get_rpc_reply(void); + #ifdef __cplusplus } #endif diff --git a/src/librabbitmq/amqp_api.c b/src/librabbitmq/amqp_api.c index 033f84a..28124e5 100644 --- a/src/librabbitmq/amqp_api.c +++ b/src/librabbitmq/amqp_api.c @@ -4,6 +4,8 @@ #include #include +#define DISABLE_THREADS + #ifndef DISABLE_THREADS #include #endif @@ -103,14 +105,9 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { - char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, - amqp_connection_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + return state->close_connection(state, code); } amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, diff --git a/src/librabbitmq/amqp_connection.c b/src/librabbitmq/amqp_connection.c index a1ec1cd..8bcc8aa 100644 --- a/src/librabbitmq/amqp_connection.c +++ b/src/librabbitmq/amqp_connection.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -13,6 +14,7 @@ #include + #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 #define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 @@ -27,13 +29,279 @@ _check_state->state); \ } +BIO *bio_err = NULL; + + +static int ssl_password_cb(char *buf, int num, int rwflag, void *data) { + if (num < strlen((char *) data) + 1){ + return 0; + } + return (strlen(strcpy(buf, (char *) data))); +} + +static amqp_rpc_reply_t _amqp_connection_close(amqp_connection_state_t state, int code) +{ + amqp_rpc_reply_t result; + char codestr[13]; + snprintf(codestr, sizeof(codestr), "%d", code); + + /* close socket */ + if(state->sockfd >= 0){ + result = AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, amqp_connection_close_t, code, amqp_cstring_bytes(codestr), 0, 0); + close(state->sockfd); + state->sockfd = -1; + } + fprintf(stderr, "_amqp_connection_closed"); + return result; +} + +static amqp_rpc_reply_t _amqp_ssl_connection_close(amqp_connection_state_t state, int code) +{ + + amqp_rpc_reply_t result = _amqp_connection_close(state, code); + + if(state->ssl){ + SSL_shutdown(state->ssl); + SSL_free(state->ssl); + state->ssl = NULL; + } + + return result; +} + +int amqp_connect(amqp_connection_state_t state, char const *hostname, int portnumber, struct timeval *timeout){ + return state->connect(state, hostname, portnumber, timeout); +} + +/** + * crate native connection + * \remark also used by ssl implementation + * \return 1 if successful done, otherwise 0 + */ +static int _amqp_connect(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout) { + int sockfd = -1; + + sockfd = amqp_open_socket(host, port, timeout); + amqp_set_sockfd(state, sockfd); + + return (sockfd >= 0) ? 1 : 0; +} + +/** + * initialize ssl connection and verify certificate using ssl_flags + * \return 1 if successful done, otherwise 0 + */ +static int _amqp_ssl_connect(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout) { + int success = -1; + int rv = 0; + int verified = -1; + char cert_cn[256] = "\0"; + char error[256] = "\0"; + X509 *peerCert = NULL; + + /* create socket */ + success = _amqp_connect(state, host, port, timeout); + if(!success) + return 0; + + + state->ssl = SSL_new(state->ctx); + state->bio = BIO_new_socket(state->sockfd, BIO_NOCLOSE); + SSL_set_bio(state->ssl, state->bio, state->bio); + + + + + do{ + + /* do connect */ + if ((rv = SSL_connect(state->ssl)) <= 0) { + sprintf(error, "Error during setup SSL context, rv=%d", rv); + break; + } + + /* verify the server certificate */ + if ( (state->ssl_flags & AMQP_SSL_FLAG_VERIFY) && (verified = SSL_get_verify_result(state->ssl)) != X509_V_OK) { + sprintf(error, "SSL certificate presented by peer cannot be verified: %d\n",verified); + break; + } + + /* verify common name */ + if ( state->ssl_flags & AMQP_SSL_FLAG_CHECK_CN ) { + + /* get peer certificate */ + if (!(peerCert = SSL_get_peer_certificate(state->ssl))) { + sprintf(error, "No SSL certificate was presented by peer"); + break; + } + + X509_NAME_get_text_by_NID(X509_get_subject_name(peerCert), NID_commonName, cert_cn, sizeof(cert_cn)); + X509_free(peerCert); + + //TODO add wildcard support + if (strcasecmp(cert_cn, host)) { + sprintf(error, "common name '%s' doesn't match host name '%s'", cert_cn, host); + break; + } + } + }while(0); + + /* if an error occured, close connection */ + if(strlen(error)>0){ + fprintf(stderr, "%s\n", error); + _amqp_ssl_connection_close(state, AMQP_REPLY_SUCCESS); + return 0; + } + + return 1; +} + +static void _amqp_destroy_connection(amqp_connection_state_t state){ + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); +} + +/** + * destroy ssl connection and free ssl context + */ +static void _amqp_destroy_ssl_connection(amqp_connection_state_t state){ + _amqp_destroy_connection(state); + + if(state->ssl_key_password) + free(state->ssl_key_password); + + /* destroy ssl context */ + if(state->ctx){ + SSL_CTX_free(state->ctx); + state->ctx = NULL; + } +} + +amqp_connection_state_t amqp_new_ssl_connection(const char *certificate, const char *key, const char *password, const char *ca, unsigned short flags) { + amqp_connection_state_t state; + X509 *cert = NULL, *cacert = NULL; + RSA *rsa = NULL; + BIO *cbio, *kbio; + X509_STORE *store = NULL; + char error[254] = "\0"; + + + /** initialize connection_state */ + state = amqp_new_connection(); + if(!state){ + return NULL; + } + + /* reset callbacks to ssl */ + state->connect = _amqp_ssl_connect; + state->write = amqp_ssl_write; + state->read = amqp_ssl_read; + state->close_connection = _amqp_ssl_connection_close; + state->destroy_connection = _amqp_destroy_ssl_connection; + + /* save flags for ssl */ + state->ssl_flags = flags; + + /**S initialize ssl */ + if(!bio_err){ + SSL_library_init(); + OpenSSL_add_all_algorithms(); +// SSL_load_error_strings(); + + /* An error write context */ + bio_err = BIO_new_fp(stderr, BIO_NOCLOSE); + } + + + do{ + if (! (state->ctx = SSL_CTX_new(SSLv23_method()))) { + sprintf(error, "Error during setup SSL context"); + break; + } + + /* read certificate */ + if(!SSL_CTX_use_certificate_chain_file(state->ctx, certificate)){ + cbio = BIO_new_mem_buf((void*)certificate, -1); + PEM_read_bio_X509(cbio, &cert, 0, NULL); + BIO_free(cbio); + + if (!SSL_CTX_use_certificate(state->ctx, cert)) { + sprintf(error, "Can't read certificate file"); + X509_free(cert); + break; + } + X509_free(cert); + } + + + if (password != NULL) { + state->ssl_key_password = strdup(password); + SSL_CTX_set_default_passwd_cb_userdata(state->ctx, (void *) state->ssl_key_password); + SSL_CTX_set_default_passwd_cb(state->ctx, ssl_password_cb); + } + + /* try to read as file */ + if (!SSL_CTX_use_PrivateKey_file(state->ctx, key, SSL_FILETYPE_PEM)) { + + /* try reading from memory */ + kbio = BIO_new_mem_buf((void*)key, -1); + PEM_read_bio_RSAPrivateKey(kbio, &rsa, (password) ? ssl_password_cb : NULL, state->ssl_key_password); + BIO_free(kbio); + if (!SSL_CTX_use_RSAPrivateKey(state->ctx, rsa)) { + sprintf(error, "Can't read key file"); + RSA_free(rsa); + break; + } + RSA_free(rsa); + + } + + /* load ca certificate */ + if(ca){ + if (!SSL_CTX_load_verify_locations(state->ctx, ca, 0)) { + /* ca is not a ca file, try reading mem buffer */ + cbio = BIO_new_mem_buf((void*)ca, -1); + PEM_read_bio_X509(cbio, &cacert, 0, NULL); + BIO_free(cbio); + store = SSL_CTX_get_cert_store(state->ctx); + if(!X509_STORE_add_cert(store, cacert)){ + fprintf(stderr, "Can't add ca file to ca store\n"); + } + X509_free(cacert); + } + } + + /* everything was fine, return state */ + return state; + }while(0); + + /* because we reached this point, an error muss be occured */ + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state); + fprintf(stderr, "%s\n", error); + return NULL; + +} + +/** + * initialize amqp connection_state + */ amqp_connection_state_t amqp_new_connection(void) { - amqp_connection_state_t state = - (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); + amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); if (state == NULL) { return NULL; } + + /* initialize with default connect method */ + state->connect = _amqp_connect; + state->write = amqp_write; + state->read = amqp_read; + state->close_connection = _amqp_connection_close; + state->destroy_connection = _amqp_destroy_connection; init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); @@ -69,13 +337,13 @@ amqp_connection_state_t amqp_new_connection(void) { return state; } + + int amqp_get_sockfd(amqp_connection_state_t state) { return state->sockfd; } -void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) -{ +void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { state->sockfd = sockfd; } @@ -112,10 +380,7 @@ int amqp_get_channel_max(amqp_connection_state_t state) { } void amqp_destroy_connection(amqp_connection_state_t state) { - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); + state->destroy_connection(state); free(state); } @@ -126,8 +391,7 @@ static void return_to_idle(amqp_connection_state_t state) { state->state = CONNECTION_STATE_IDLE; } -void amqp_set_basic_return_cb(amqp_connection_state_t state, - amqp_basic_return_fn_t f, void *data) { +void amqp_set_basic_return_cb(amqp_connection_state_t state, amqp_basic_return_fn_t f, void *data) { state->basic_return_callback = f; state->basic_return_callback_data = data; } @@ -238,8 +502,7 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->frame_type = AMQP_FRAME_BODY; decoded_frame->payload.body_fragment.len = fragment_len; - decoded_frame->payload.body_fragment.bytes = - D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); + decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); break; } @@ -256,10 +519,10 @@ int amqp_handle_input(amqp_connection_state_t state, if(decoded_frame->frame_type == AMQP_FRAME_METHOD && decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) { + amqp_basic_return_t *m = decoded_frame->payload.method.decoded; if(state->basic_return_callback) - state->basic_return_callback(decoded_frame->channel, m, - state->basic_return_callback_data); + state->basic_return_callback(decoded_frame->channel, m, state->basic_return_callback_data); } return total_bytes_consumed; @@ -268,8 +531,7 @@ int amqp_handle_input(amqp_connection_state_t state, case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL; - amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', - "Invalid protocol header received"); + amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', "Invalid protocol header received"); decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4); decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5); decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6); @@ -381,18 +643,16 @@ int amqp_send_frame(amqp_connection_state_t state, separate_body = inner_send_frame(state, frame, &encoded, &payload_len); switch (separate_body) { case 0: - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); + AMQP_CHECK_RESULT(state->write(state, state->outbound_buffer.bytes, payload_len + (HEADER_SIZE + FOOTER_SIZE))); return 0; case 1: - AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE)); - AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len)); + AMQP_CHECK_RESULT(state->write(state, state->outbound_buffer.bytes, HEADER_SIZE)); + AMQP_CHECK_RESULT(state->write(state, encoded.bytes, payload_len)); { unsigned char frame_end_byte = AMQP_FRAME_END; assert(FOOTER_SIZE == 1); - AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE)); + AMQP_CHECK_RESULT(state->write(state, &frame_end_byte, FOOTER_SIZE)); } return 0; @@ -432,3 +692,69 @@ int amqp_send_frame_to(amqp_connection_state_t state, return separate_body; } } + +int amqp_write(amqp_connection_state_t state, void *buffer, size_t size) { + return write(state->sockfd, buffer, size); +} +int amqp_ssl_write(amqp_connection_state_t state, void *buffer, size_t size) { + int err; + int len; + + len = SSL_write(state->ssl, buffer, size); + + if (! len) { + switch ((err = SSL_get_error(state->ssl, len))) { + case SSL_ERROR_SYSCALL: + if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) { + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_READ: + errno = EWOULDBLOCK; + return (0); + } + + case SSL_ERROR_SSL: + if (errno == EAGAIN) { + return 0; + } + + default: + return 0; + } + } + + + return len; +} +int amqp_read(amqp_connection_state_t state, void *buffer, size_t size) { + return read(state->sockfd, buffer, size); +} + +int amqp_ssl_read(amqp_connection_state_t state, void *buffer, size_t size) { + int err; + int len; + + len = SSL_read(state->ssl, buffer, size); + + if (!len) { + switch ((err = SSL_get_error(state->ssl, len))) { + case SSL_ERROR_SYSCALL: + if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR)) { + case SSL_ERROR_WANT_READ: + errno = EWOULDBLOCK; + return 0; + } + + case SSL_ERROR_SSL: + if (errno == EAGAIN) { + return 0; + } + + default: + return 0; + } + } + + + return len; +} +// kate: indent-width 2; replace-tabs off; indent-mode cstyle; auto-insert-doxygen on; line-numbers on; tab-indents on; keep-extra-spaces off; auto-brackets on; diff --git a/src/librabbitmq/amqp_private.h b/src/librabbitmq/amqp_private.h index ce12f9d..3d8b1fd 100644 --- a/src/librabbitmq/amqp_private.h +++ b/src/librabbitmq/amqp_private.h @@ -6,6 +6,8 @@ extern "C" { #endif #include /* ntohl, htonl, ntohs, htons */ +#include +#include /* * Connection states: @@ -48,6 +50,12 @@ typedef struct amqp_link_t_ { void *data; } amqp_link_t; +typedef int (amqp_connect_fn_t)(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout); +typedef int (amqp_write_fn_t)(amqp_connection_state_t state, void *buffer, size_t nb); +typedef int (amqp_read_fn_t)(amqp_connection_state_t state, void *buffer, size_t nb); +typedef amqp_rpc_reply_t (amqp_close_connection_fn_t)(amqp_connection_state_t state, int code); +typedef void (amqp_destroy_connection_fn_t)(amqp_connection_state_t state); + struct amqp_connection_state_t_ { amqp_pool_t frame_pool; amqp_pool_t decoding_pool; @@ -65,6 +73,20 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; int sockfd; + + /** ssl support */ + SSL_CTX *ctx; + SSL *ssl; + BIO *bio; + unsigned short ssl_flags; + amqp_connect_fn_t *connect; + amqp_write_fn_t *write; + amqp_read_fn_t *read; + char *ssl_key_password; + amqp_close_connection_fn_t *close_connection; + amqp_destroy_connection_fn_t *destroy_connection; + /** */ + amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; size_t sock_inbound_limit; @@ -74,6 +96,8 @@ struct amqp_connection_state_t_ { amqp_basic_return_fn_t basic_return_callback; void *basic_return_callback_data; + + }; #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) @@ -109,6 +133,12 @@ extern int amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, int *offsetptr); +extern int amqp_open_socket(char const *hostname, int portnumber, struct timeval *timeout); +int amqp_write(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_ssl_write(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_read(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_ssl_read(amqp_connection_state_t state, void* buffer, size_t size); + #define amqp_assert(condition, ...) \ ({ \ if (!(condition)) { \ diff --git a/src/librabbitmq/amqp_socket.c b/src/librabbitmq/amqp_socket.c index a9aa8fc..d79a55c 100644 --- a/src/librabbitmq/amqp_socket.c +++ b/src/librabbitmq/amqp_socket.c @@ -20,8 +20,7 @@ #include -int amqp_open_socket(char const *hostname, - int portnumber, struct timeval *timeout) +int amqp_open_socket(char const *hostname, int portnumber, struct timeval *timeout) { int result = -1; int sockfd; @@ -97,7 +96,7 @@ static char *header() { } int amqp_send_header(amqp_connection_state_t state) { - return write(state->sockfd, header(), 8); + return state->write(state, header(), 8); } int amqp_send_header_to(amqp_connection_state_t state, @@ -176,9 +175,7 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(result != 0); } - result = read(state->sockfd, - state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len); + result = state->read(state, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len); if (result < 0) { return -errno; } diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 1e091ac..0522bee 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -37,6 +37,7 @@ #include #include #include +#include #include "postgres.h" #include "funcapi.h" @@ -50,6 +51,8 @@ #include "access/xact.h" #include "utils/memutils.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/array.h" #include "librabbitmq/amqp.h" #include "librabbitmq/amqp_framing.h" @@ -61,19 +64,65 @@ } \ } while(0) +#define safe_free(x) if (x){ free(x); x = NULL; } + +#define _BYTES_T 0 +#define _UINT64_T 1 +#define _UINT8_T 2 + #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif + +typedef struct { + FmgrInfo proc; + int elems; + Oid typelem; + ArrayIterator iter; +} pg_array_foo; + +typedef struct { + char* key; + char* value; +} pg_array_elem; + +typedef struct { + char* name; + size_t offset; + int type; + int flag; +} amqp_field; + +/* TODO: make this a trie or some other optimised datastructure */ +static amqp_field amqp_fields[] = { { "content_type", offsetof(amqp_basic_properties_t,content_type), _BYTES_T, AMQP_BASIC_CONTENT_TYPE_FLAG}, + { "content_encoding", offsetof(amqp_basic_properties_t,content_encoding), _BYTES_T, AMQP_BASIC_CONTENT_ENCODING_FLAG}, + { "delivery_mode", offsetof(amqp_basic_properties_t,delivery_mode), _UINT8_T, AMQP_BASIC_DELIVERY_MODE_FLAG}, + { "priority", offsetof(amqp_basic_properties_t,priority), _UINT8_T, AMQP_BASIC_PRIORITY_FLAG}, + { "correlation_id", offsetof(amqp_basic_properties_t,correlation_id), _BYTES_T, AMQP_BASIC_CORRELATION_ID_FLAG}, + { "reply_to", offsetof(amqp_basic_properties_t,reply_to), _BYTES_T, AMQP_BASIC_REPLY_TO_FLAG}, + { "expiration", offsetof(amqp_basic_properties_t,expiration), _BYTES_T, AMQP_BASIC_EXPIRATION_FLAG}, + { "message_id", offsetof(amqp_basic_properties_t,message_id), _BYTES_T, AMQP_BASIC_MESSAGE_ID_FLAG}, + { "timestamp", offsetof(amqp_basic_properties_t,timestamp), _UINT64_T, AMQP_BASIC_TIMESTAMP_FLAG}, + { "type", offsetof(amqp_basic_properties_t,type), _BYTES_T, AMQP_BASIC_TYPE_FLAG}, + { "user_id", offsetof(amqp_basic_properties_t,user_id), _BYTES_T, AMQP_BASIC_USER_ID_FLAG}, + { "app_id", offsetof(amqp_basic_properties_t,app_id), _BYTES_T, AMQP_BASIC_APP_ID_FLAG}, + { "cluster_id", offsetof(amqp_basic_properties_t,cluster_id), _BYTES_T, AMQP_BASIC_CLUSTER_ID_FLAG}, + { NULL, 0, 0, 0} +}; + void _PG_init(void); Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS); Datum pg_amqp_publish(PG_FUNCTION_ARGS); Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS); Datum pg_amqp_disconnect(PG_FUNCTION_ARGS); +int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* array_foo); +int pg_array_get_elem(pg_array_foo* array_foo, pg_array_elem* elem); +int amqp_set_property(amqp_basic_properties_t* properties, char* key, char* value); + struct brokerstate { int broker_id; amqp_connection_state_t conn; - int sockfd; int uncommitted; int inerror; int idx; @@ -86,9 +135,14 @@ static void local_amqp_disconnect_bs(struct brokerstate *bs) { if(bs && bs->conn) { int errorstate = bs->inerror; - amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS); - if(bs->sockfd >= 0) close(bs->sockfd); - amqp_destroy_connection(bs->conn); + + if(bs->conn) { + amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS); + } + + if(bs->conn) { + amqp_destroy_connection(bs->conn); + } memset(bs, 0, sizeof(*bs)); bs->inerror = errorstate; } @@ -99,7 +153,7 @@ static void amqp_local_phase2(XactEvent event, void *arg) { switch(event) { case XACT_EVENT_COMMIT: for(bs = HEAD_BS; bs; bs = bs->next) { - if(bs->inerror) local_amqp_disconnect_bs(bs); + if(bs->inerror) local_amqp_disconnect_bs(bs); bs->inerror = 0; if(!bs->uncommitted) continue; if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE); @@ -107,8 +161,8 @@ static void amqp_local_phase2(XactEvent event, void *arg) { if(reply->reply_type != AMQP_RESPONSE_NORMAL) { elog(WARNING, "amqp could not commit tx mode on broker %d, reply_type=%d, library_errno=%d", bs->broker_id, reply->reply_type, reply->library_errno); local_amqp_disconnect_bs(bs); - } - bs->uncommitted = 0; + } + bs->uncommitted = 0; } break; case XACT_EVENT_ABORT: @@ -121,10 +175,12 @@ static void amqp_local_phase2(XactEvent event, void *arg) { if(reply->reply_type != AMQP_RESPONSE_NORMAL) { elog(WARNING, "amqp could not rollback tx mode on broker %d, reply_type=%d, library_errno=%d", bs->broker_id, reply->reply_type, reply->library_errno); local_amqp_disconnect_bs(bs); - } - bs->uncommitted = 0; + } + bs->uncommitted = 0; } break; + case XACT_EVENT_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: case XACT_EVENT_PREPARE: /* nothin' */ return; @@ -136,8 +192,7 @@ void _PG_init() { RegisterXactCallback(amqp_local_phase2, NULL); } -static struct brokerstate * -local_amqp_get_a_bs(broker_id) { +static struct brokerstate *local_amqp_get_a_bs(broker_id) { struct brokerstate *bs; for(bs = HEAD_BS; bs; bs = bs->next) { if(bs->broker_id == broker_id) return bs; @@ -148,15 +203,14 @@ local_amqp_get_a_bs(broker_id) { HEAD_BS = bs; return bs; } -static struct brokerstate * -local_amqp_get_bs(broker_id) { +static struct brokerstate *local_amqp_get_bs(broker_id) { char sql[1024]; char host_copy[300] = ""; int tries = 0; struct brokerstate *bs = local_amqp_get_a_bs(broker_id); if(bs->conn) return bs; if(SPI_connect() == SPI_ERROR_CONNECT) return NULL; - snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password " + snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password, requiressl, verify_cert, verify_cn, cert, key, key_password, ca " " FROM amqp.broker " " WHERE broker_id = %d " " ORDER BY host DESC, port", broker_id); @@ -167,56 +221,117 @@ local_amqp_get_bs(broker_id) { if(SPI_processed > 0) { struct timeval hb = { .tv_sec = 2UL, .tv_usec = 0UL }; amqp_rpc_reply_t *reply, s_reply; - char *host, *vhost, *user, *pass; - Datum port_datum; + char *host, *vhost, *user, *pass, *cert = NULL, *key = NULL, *ca = NULL, *key_pass = NULL; + Datum port_datum, requiressl_datum; bool is_null; + bool requiressl= false, verifyCert = false, verifyCN = false; int port = 5672; + int ssl_flags = 0; + int success = 0; + bs->idx = (bs->idx + 1) % SPI_processed; + host = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 1); - if(!host) host = "localhost"; + if(!host) + host = "localhost"; + port_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 2, &is_null); - if(!is_null) port = DatumGetInt32(port_datum); + if(!is_null) + port = DatumGetInt32(port_datum); + vhost = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 3); - if(!vhost) vhost = "/"; + if(!vhost) + vhost = "/"; + user = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 4); - if(!user) user = "guest"; + if(!user) + user = "guest"; + pass = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 5); - if(!pass) pass = "guest"; + if(!pass) + pass = "guest"; + + /* ssl data */ + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 6, &is_null); + if(!is_null) + requiressl = DatumGetBool(requiressl_datum); + + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 7, &is_null); + if(!is_null) + verifyCert = DatumGetBool(requiressl_datum); + + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 8, &is_null); + if(!is_null) + verifyCN = DatumGetBool(requiressl_datum); + + /* get ssl values */ + cert = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 9); + key = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 10); + key_pass = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 11); + ca = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 12); + + /* set ssl flags */ + if(verifyCert) + ssl_flags |= AMQP_SSL_FLAG_VERIFY; + + if(verifyCN) + ssl_flags |= AMQP_SSL_FLAG_CHECK_CN; + + snprintf(host_copy, sizeof(host_copy), "%s:%d", host, port); - bs->conn = amqp_new_connection(); - if(!bs->conn) { SPI_finish(); return NULL; } - bs->sockfd = amqp_open_socket(host, port, &hb); - if(bs->sockfd < 0) { - elog(WARNING, "amqp[%s] login socket/connect failed: %s", - host_copy, strerror(-bs->sockfd)); - goto busted; + do{ + bs->conn = amqp_new_ssl_connection(cert, key, key_pass, ca, ssl_flags); + if( NULL != bs->conn) { + elog(INFO, "amqp[%s] successfully created ssl connection for broker %d", host_copy, broker_id); + break; + } + + /* ssl is not required, try create unsecure connection */ + if( !requiressl ) { + bs->conn = amqp_new_connection(); + } else { + elog(WARNING, "amqp[%s] unable to create ssl connetion, but ssl is required", host_copy); + } + + + if(!bs->conn) { + SPI_finish(); + return NULL; + } + }while(0); + +// bs->sockfd = amqp_open_socket(host, port, &hb); + success = amqp_connect(bs->conn, host, port, &hb); + if(!success) { +// elog(WARNING, "amqp[%s] login socket/connect failed: %s", host_copy, strerror(-bs->sockfd)); + elog(WARNING, "amqp[%s] login socket/connect failed!", host_copy); + goto busted; } - amqp_set_sockfd(bs->conn, bs->sockfd); - s_reply = amqp_login(bs->conn, vhost, 0, 131072, - 0, AMQP_SASL_METHOD_PLAIN, +// amqp_set_sockfd(bs->conn, bs->sockfd); + s_reply = amqp_login(bs->conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, pass); if(s_reply.reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] login failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] login failed on broker %d", host_copy, broker_id); + goto busted; } amqp_channel_open(bs->conn, 1); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); + goto busted; } amqp_channel_open(bs->conn, 2); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); + goto busted; } amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] could not start tx mode on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] could not start tx mode on broker %d", host_copy, broker_id); + goto busted; } } else { elog(WARNING, "amqp can't find broker %d", broker_id); @@ -235,15 +350,14 @@ local_amqp_get_bs(broker_id) { local_amqp_disconnect_bs(bs); return bs; } -static void -local_amqp_disconnect(broker_id) { + +static void local_amqp_disconnect(broker_id) { struct brokerstate *bs = local_amqp_get_a_bs(broker_id); local_amqp_disconnect_bs(bs); } PG_FUNCTION_INFO_V1(pg_amqp_exchange_declare); -Datum -pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { +Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { struct brokerstate *bs; if(!PG_ARGISNULL(0)) { int broker_id; @@ -273,12 +387,95 @@ pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { } PG_RETURN_BOOL(0 != 0); } + + +int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* array_foo){ + int *dims, ndims; + Oid element_type; + int typlen; + bool typbyval; + char typalign; + char typdelim; + Oid typiofunc; + + ndims = ARR_NDIM(v); + dims = ARR_DIMS(v); + + if (ndims != 2 || dims[1] != 2){ + elog(ERROR, "headers must be n x 2 dimensional array"); + return 1; + } + + if (array_contains_nulls(v)){ + elog(ERROR, "headers may not contain NULLs"); + return 1; + } + + element_type = ARR_ELEMTYPE(v); + + get_type_io_data(element_type, IOFunc_output, + (int16*)&typlen, &typbyval, + &typalign, &typdelim, + &array_foo->typelem, &typiofunc); + + fmgr_info_cxt(typiofunc, &array_foo->proc, fcinfo->flinfo->fn_mcxt); + array_foo->elems = dims[0]; + array_foo->iter = array_create_iterator(v, 0); + return 0; +} + +int pg_array_get_elem(pg_array_foo* array_foo, pg_array_elem* elem){ + bool isnull; + Datum value; + if (array_iterate(array_foo->iter, &value, &isnull)){ + elem->key = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); + if (!array_iterate(array_foo->iter, &value, &isnull)){ + elog(ERROR, "array seriously fucked"); + return 2; + } + elem->value = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); + } else { + elem->key = NULL; + elem->value = NULL; + return 1; + } + return 0; +} + +int amqp_set_property(amqp_basic_properties_t* properties, char* key, char* value){ + int i; + void* field; + for (i = 0; amqp_fields[i].name; i++) + if (!strcmp(key,amqp_fields[i].name)) + break; + if (!amqp_fields[i].name) + return 1; + + field = (void*)((size_t)&properties->headers + amqp_fields[i].offset); + + switch (amqp_fields[i].type) { + case _BYTES_T: + ((amqp_bytes_t*)field)->len = strlen(value); + ((amqp_bytes_t*)field)->bytes = value; + break; + case _UINT64_T: + *(uint64_t*)field = (uint64_t)atoi(value); + break; + case _UINT8_T: + *(uint8_t*)field = (uint8_t)atoi(value); + break; + } + properties->_flags |= amqp_fields[i].flag; + return 0; +} + static Datum pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { struct brokerstate *bs; if(!PG_ARGISNULL(0)) { int broker_id; amqp_basic_properties_t properties; + memset(&properties, 0, sizeof(properties)); int once_more = 1; broker_id = PG_GETARG_INT32(0); @@ -294,6 +491,11 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { amqp_bytes_t body_b = amqp_cstring_bytes(""); properties._flags = 0; + + set_bytes_from_text(exchange_b,1); + set_bytes_from_text(routing_key_b,2); + set_bytes_from_text(body_b,3); + /* Sets delivery_mode */ if (!PG_ARGISNULL(4)) { if (PG_GETARG_INT32(4) == 1 || PG_GETARG_INT32(4) == 2) { @@ -322,11 +524,65 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { properties._flags |= AMQP_BASIC_CORRELATION_ID_FLAG; set_bytes_from_text(properties.correlation_id, 7); } - - set_bytes_from_text(exchange_b,1); - set_bytes_from_text(routing_key_b,2); - set_bytes_from_text(body_b,3); + /* headers */ + if (!PG_ARGISNULL(8)) { + ArrayType *v; + pg_array_foo array_headers; + pg_array_elem header; + int i; + int ret; + + v = PG_GETARG_ARRAYTYPE_P(8); + if (pg_process_array(v, fcinfo, &array_headers)){ + PG_RETURN_BOOL(0 != 0); + } + + properties.headers.num_entries = array_headers.elems; + properties._flags |= AMQP_BASIC_HEADERS_FLAG; + properties.headers.entries = malloc(array_headers.elems * sizeof(amqp_table_entry_t)); + if (!properties.headers.entries){ + elog(ERROR, "out of memory"); + PG_RETURN_BOOL(0 != 0); + } + + i = 0; + while (!(ret = pg_array_get_elem(&array_headers, &header))){ + properties.headers.entries[i].key = amqp_cstring_bytes(header.key); + properties.headers.entries[i].kind = 'S'; + properties.headers.entries[i].value.bytes = amqp_cstring_bytes(header.value); + i++; + } + + array_free_iterator(array_headers.iter); + if (ret == 2) + PG_RETURN_BOOL(0 != 0); + + + } + + /* generic properties */ + if (!PG_ARGISNULL(9)) { + ArrayType *v; + pg_array_foo array_properties; + pg_array_elem array_elem; + int ret; + + v = PG_GETARG_ARRAYTYPE_P(9); + if (pg_process_array(v, fcinfo, &array_properties)) + PG_RETURN_BOOL(0 != 0); + + while (!(ret = pg_array_get_elem(&array_properties, &array_elem))){ + if(amqp_set_property(&properties, array_elem.key, array_elem.value)){ + elog(WARNING, "Unknow property name '%s', ignore value", array_elem.key); + } + } + array_free_iterator(array_properties.iter); + if (ret == 2) + PG_RETURN_BOOL(0 != 0); + } + + //rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b); if (properties._flags == 0) { rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, mandatory, immediate, NULL, body_b); @@ -336,6 +592,9 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { } reply = amqp_get_rpc_reply(); + if(properties.headers.entries) { + safe_free(properties.headers.entries); + } if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) { if(once_more && (channel == 1 || bs->uncommitted == 0)) { once_more = 0; @@ -354,20 +613,17 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { } PG_FUNCTION_INFO_V1(pg_amqp_publish); -Datum -pg_amqp_publish(PG_FUNCTION_ARGS) { +Datum pg_amqp_publish(PG_FUNCTION_ARGS) { return pg_amqp_publish_opt(fcinfo, 2); } PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish); -Datum -pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) { +Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) { return pg_amqp_publish_opt(fcinfo, 1); } PG_FUNCTION_INFO_V1(pg_amqp_disconnect); -Datum -pg_amqp_disconnect(PG_FUNCTION_ARGS) { +Datum pg_amqp_disconnect(PG_FUNCTION_ARGS) { if(!PG_ARGISNULL(0)) { int broker_id; broker_id = PG_GETARG_INT32(0); diff --git a/updates/amqp--0.4.1--0.4.2.sql b/updates/amqp--0.4.1--0.4.2.sql new file mode 100644 index 0000000..2829b60 --- /dev/null +++ b/updates/amqp--0.4.1--0.4.2.sql @@ -0,0 +1,8 @@ +ALTER TABLE @extschema@.broker +ADD COLUMN requiressl boolean DEFAULT false, +ADD COLUMN verify_cert boolean DEFAULT true, +ADD COLUMN verify_cn boolean DEFAULT true, +ADD COLUMN cert text, +ADD COLUMN key text, +ADD COLUMN key_password character varying, +ADD COLUMN ca text;