---
apiVersion: v1
kind: List
items:
- apiVersion: v1
  kind: ConfigMap
  metadata:
    labels:
      funktion.fabric8.io/kind: Connector
      provider: fabric8
      project: connector-cql
      version: 1.1.9
      group: io.fabric8.funktion.connector
    name: cql
  data:
    deployment.yml: |
      ---
      apiVersion: extensions/v1beta1
      kind: Deployment
      metadata:
        labels:
          funktion.fabric8.io/kind: Subscription
          connector: cql
      spec:
        replicas: 1
        template:
          metadata:
            labels:
              funktion.fabric8.io/kind: Subscription
              connector: cql
          spec:
            containers:
            - image: fabric8/connector-cql:1.1.9
              name: connector
    schema.yml: |
      ---
      component:
        kind: component
        scheme: cql
        syntax: cql:beanRef:hosts:port/keyspace
        title: Cassandra CQL
        description: The cql component aims at integrating Cassandra 2.0+ using the CQL3 API (not the Thrift API).
        label: database,nosql
        deprecated: false
        async: false
        javaType: org.apache.camel.component.cassandra.CassandraComponent
        groupId: org.apache.camel
        artifactId: camel-cassandraql
        version: 2.18.1
      componentProperties: {}
      properties:
        beanRef:
          kind: path
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: beanRef is defined using bean:id
        hosts:
          kind: path
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: Hostname(s) of the Cassandra server(s). Multiple hosts can be separated by commas.
        port:
          kind: path
          group: common
          type: integer
          javaType: java.lang.Integer
          deprecated: false
          secret: false
          description: Port number of the Cassandra server(s)
        keyspace:
          kind: path
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: Keyspace to use
        cluster:
          kind: parameter
          group: common
          type: object
          javaType: com.datastax.driver.core.Cluster
          deprecated: false
          secret: false
          description: To use the Cluster instance (you would normally not use this option)
        clusterName:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: Cluster name
        consistencyLevel:
          kind: parameter
          group: common
          type: string
          javaType: com.datastax.driver.core.ConsistencyLevel
          enum:
          - ANY
          - ONE
          - TWO
          - THREE
          - QUORUM
          - ALL
          - LOCAL_QUORUM
          - EACH_QUORUM
          - SERIAL
          - LOCAL_SERIAL
          - LOCAL_ONE
          deprecated: false
          secret: false
          description: Consistency level to use
        cql:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: CQL query to perform. Can be overridden with the message header with key CamelCqlQuery.
        loadBalancingPolicy:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: To use a specific LoadBalancingPolicy
        password:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: Password for session authentication
        prepareStatements:
          kind: parameter
          group: common
          type: boolean
          javaType: boolean
          deprecated: false
          secret: false
          defaultValue: true
          description: Whether to use PreparedStatements or regular Statements
        resultSetConversionStrategy:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: To use a custom class that implements logic for converting the ResultSet into the message body. Built-in values include ALL, ONE, LIMIT_10, LIMIT_100...
        session:
          kind: parameter
          group: common
          type: object
          javaType: com.datastax.driver.core.Session
          deprecated: false
          secret: false
          description: To use the Session instance (you would normally not use this option)
        username:
          kind: parameter
          group: common
          type: string
          javaType: java.lang.String
          deprecated: false
          secret: false
          description: Username for session authentication
        bridgeErrorHandler:
          kind: parameter
          group: consumer
          label: consumer
          type: boolean
          javaType: boolean
          optionalPrefix: consumer.
          deprecated: false
          secret: false
          defaultValue: false
          description: Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN/ERROR level and ignored.
        exceptionHandler:
          kind: parameter
          group: consumer (advanced)
          label: consumer,advanced
          type: object
          javaType: org.apache.camel.spi.ExceptionHandler
          optionalPrefix: consumer.
          deprecated: false
          secret: false
          description: To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled, this option is not used. By default the consumer will deal with exceptions, which will be logged at WARN/ERROR level and ignored.
        exchangePattern:
          kind: parameter
          group: consumer (advanced)
          label: consumer,advanced
          type: string
          javaType: org.apache.camel.ExchangePattern
          enum:
          - InOnly
          - RobustInOnly
          - InOut
          - InOptionalOut
          - OutOnly
          - RobustOutOnly
          - OutIn
          - OutOptionalIn
          deprecated: false
          secret: false
          description: Sets the exchange pattern when the consumer creates an exchange.
        synchronous:
          kind: parameter
          group: advanced
          label: advanced
          type: boolean
          javaType: boolean
          deprecated: false
          secret: false
          defaultValue: false
          description: Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
    documentation.adoc: |
      [[Cassandra-CamelCassandraComponent]]
      Camel Cassandra Component
      -------------------------

      *Available as of Camel 2.15*

      http://cassandra.apache.org[Apache Cassandra] is an open source NoSQL
      database designed to handle large amounts of data on commodity hardware.
      Like Amazon's DynamoDB, Cassandra has a peer-to-peer and master-less
      architecture to avoid a single point of failure and guarantee high
      availability. Like Google's BigTable, Cassandra data is structured using
      column families which can be accessed through the Thrift RPC API or a
      SQL-like API called CQL.

      This component aims at integrating Cassandra 2.0+ using the CQL3 API
      (not the Thrift API). It is based on the
      https://github.com/datastax/java-driver[Cassandra Java Driver]
      provided by DataStax.

      Maven users will need to add the following dependency to their
      `pom.xml`:

      *pom.xml*

      [source,xml]
      ------------------------------------------------------------
      <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-cassandraql</artifactId>
          <version>x.y.z</version>
          <!-- use the same version as your Camel core version -->
      </dependency>
      ------------------------------------------------------------

      [[Cassandra-URIformat]]
      URI format
      ~~~~~~~~~~

      The endpoint can initiate the Cassandra connection or use an existing
      one.

      [cols="<,<",options="header",]
      |======================================================================
      |URI |Description
      |`cql:localhost/keyspace` |Single host, default port, usual for testing
      |`cql:host1,host2/keyspace` |Multi host, default port
      |`cql:host1,host2:9042/keyspace` |Multi host, custom port
      |`cql:host1,host2` |Default port and keyspace
      |`cql:bean:sessionRef` |Provided Session reference
      |`cql:bean:clusterRef/keyspace` |Provided Cluster reference
      |======================================================================

      To fine-tune the Cassandra connection (SSL options, pooling options,
      load balancing policy, retry policy, reconnection policy...), create
      your own Cluster instance and give it to the Camel endpoint.

      [[Cassandra-Options]]
      Cassandra Options
      ~~~~~~~~~~~~~~~~~

      // component options: START
      The Cassandra CQL component has no options.
      // component options: END

      // endpoint options: START
      The Cassandra CQL component supports 18 endpoint options which are listed below:

      {% raw %}
      [width="100%",cols="2,1,1m,1m,5",options="header"]
      |=======================================================================
      | Name | Group | Default | Java Type | Description
      | beanRef | common |  | String | beanRef is defined using bean:id
      | hosts | common |  | String | Hostname(s) of the Cassandra server(s). Multiple hosts can be separated by commas.
      | port | common |  | Integer | Port number of the Cassandra server(s)
      | keyspace | common |  | String | Keyspace to use
      | cluster | common |  | Cluster | To use the Cluster instance (you would normally not use this option)
      | clusterName | common |  | String | Cluster name
      | consistencyLevel | common |  | ConsistencyLevel | Consistency level to use
      | cql | common |  | String | CQL query to perform. Can be overridden with the message header with key CamelCqlQuery.
      | loadBalancingPolicy | common |  | String | To use a specific LoadBalancingPolicy
      | password | common |  | String | Password for session authentication
      | prepareStatements | common | true | boolean | Whether to use PreparedStatements or regular Statements
      | resultSetConversionStrategy | common |  | String | To use a custom class that implements logic for converting the ResultSet into the message body. Built-in values include ALL, ONE, LIMIT_10, LIMIT_100...
      | session | common |  | Session | To use the Session instance (you would normally not use this option)
      | username | common |  | String | Username for session authentication
      | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN/ERROR level and ignored.
      | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled, this option is not used. By default the consumer will deal with exceptions, which will be logged at WARN/ERROR level and ignored.
      | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
      | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
      |=======================================================================
      {% endraw %}
      // endpoint options: END

      [[Cassandra-Messages]]
      Messages
      ~~~~~~~~

      [[Cassandra-IncomingMessage]]
      Incoming Message
      ^^^^^^^^^^^^^^^^

      The Camel Cassandra endpoint expects a set of simple objects (`Object`,
      `Object[]` or `Collection`) which will be bound to the CQL statement as
      query parameters. If the message body is null or empty, the CQL query is
      executed without binding parameters.

      Headers:

      * `CamelCqlQuery` (optional, `String` or `RegularStatement`): CQL query
      either as a plain String or built using the `QueryBuilder`.

      [[Cassandra-OutgoingMessage]]
      Outgoing Message
      ^^^^^^^^^^^^^^^^

      The Camel Cassandra endpoint produces one or more Cassandra `Row`
      objects depending on the `resultSetConversionStrategy`:

      * `List<Row>` if `resultSetConversionStrategy` is `ALL` or
      `LIMIT_[0-9]+`
      * A single `Row` if `resultSetConversionStrategy` is `ONE`
      * Anything else, if `resultSetConversionStrategy` is a custom
      implementation of the `ResultSetConversionStrategy`

      [[Cassandra-Repositories]]
      Repositories
      ~~~~~~~~~~~~

      Cassandra can be used to store message keys or messages for the
      idempotent and aggregation EIPs.

      However, Cassandra might not be the best tool for queuing use cases yet; see
      http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets[Cassandra anti-patterns queues and queue like datasets].
      It is advised to use LeveledCompaction and a small GC grace setting for
      these tables so that tombstoned rows are removed quickly.

      [[Cassandra-Idempotentrepository]]
      Idempotent repository
      ^^^^^^^^^^^^^^^^^^^^^

      The `NamedCassandraIdempotentRepository` stores message keys in a
      Cassandra table like this:

      *CAMEL_IDEMPOTENT.cql*

      [source,sql]
      ---------------------------------------------------------
      CREATE TABLE CAMEL_IDEMPOTENT (
        NAME varchar,   -- Repository name
        KEY varchar,    -- Message key
        PRIMARY KEY (NAME, KEY)
      ) WITH compaction = {'class':'LeveledCompactionStrategy'}
        AND gc_grace_seconds = 86400;
      ---------------------------------------------------------

      This repository implementation uses lightweight transactions (also
      known as Compare and Set) and requires Cassandra 2.0.7+.

      Alternatively, the `CassandraIdempotentRepository` does not have a
      `NAME` column and can be extended to use a different data model.

      [width="100%",cols="<34%,<33%,<33%",options="header",]
      |=======================================================================
      |Option |Default |Description

      |`table` |`CAMEL_IDEMPOTENT` |Table name
      |`pkColumns` |`NAME`,`KEY` |Primary key columns
      |`name` | | Repository name, value used for `NAME` column
      |`ttl` | | Key time to live
      |`writeConsistencyLevel` | | Consistency level used to insert/delete key: `ANY`, `ONE`, `TWO`, `QUORUM`, `LOCAL_QUORUM`...
      |`readConsistencyLevel` | | Consistency level used to read/check key: `ONE`, `TWO`, `QUORUM`, `LOCAL_QUORUM`...
      |=======================================================================

      [[Cassandra-Aggregationrepository]]
      Aggregation repository
      ^^^^^^^^^^^^^^^^^^^^^^

      The `NamedCassandraAggregationRepository` stores exchanges by
      correlation key in a Cassandra table like this:

      *CAMEL_AGGREGATION.cql*

      [source,sql]
      ---------------------------------------------------------
      CREATE TABLE CAMEL_AGGREGATION (
        NAME varchar,        -- Repository name
        KEY varchar,         -- Correlation id
        EXCHANGE_ID varchar, -- Exchange id
        EXCHANGE blob,       -- Serialized exchange
        PRIMARY KEY (NAME, KEY)
      ) WITH compaction = {'class':'LeveledCompactionStrategy'}
        AND gc_grace_seconds = 86400;
      ---------------------------------------------------------

      Alternatively, the `CassandraAggregationRepository` does not have a
      `NAME` column and can be extended to use a different data model.

      [width="100%",cols="<34%,<33%,<33%",options="header",]
      |=======================================================================
      |Option |Default |Description

      |`table` |`CAMEL_AGGREGATION` |Table name
      |`pkColumns` |`NAME`,`KEY` |Primary key columns
      |`exchangeIdColumn` |`EXCHANGE_ID` |Exchange Id column
      |`exchangeColumn` |`EXCHANGE` |Exchange content column
      |`name` | | Repository name, value used for `NAME` column
      |`ttl` | | Exchange time to live
      |`writeConsistencyLevel` | | Consistency level used to insert/delete exchange: `ANY`, `ONE`, `TWO`, `QUORUM`, `LOCAL_QUORUM`...
      |`readConsistencyLevel` | | Consistency level used to read/check exchange: `ONE`, `TWO`, `QUORUM`, `LOCAL_QUORUM`...
      |=======================================================================
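
      The kind of statements these two repositories rely on can be sketched in
      CQL as follows. This is only an illustrative sketch built from the table
      layouts and the `ttl` option described above, not the exact statements
      issued by the component; the `?` bind markers and the 3600 second TTL
      are assumed placeholder values.

      [source,sql]
      ---------------------------------------------------------
      -- Idempotent repository: a lightweight transaction (compare and set)
      -- inserts the key only if it is not already present.
      -- The TTL value is an assumed example for the `ttl` option.
      INSERT INTO CAMEL_IDEMPOTENT (NAME, KEY)
      VALUES (?, ?)
      IF NOT EXISTS
      USING TTL 3600;

      -- Aggregation repository: store the serialized exchange under its
      -- correlation key, here with an assumed time to live.
      INSERT INTO CAMEL_AGGREGATION (NAME, KEY, EXCHANGE_ID, EXCHANGE)
      VALUES (?, ?, ?, ?)
      USING TTL 3600;

      -- Read the exchange back by repository name and correlation key.
      SELECT EXCHANGE_ID, EXCHANGE
      FROM CAMEL_AGGREGATION
      WHERE NAME = ? AND KEY = ?;
      ---------------------------------------------------------

      The `IF NOT EXISTS` clause is what makes the idempotent check atomic.
      Both tables are queried by their full primary key (`NAME`, `KEY`).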