I am encountering this error while running a Storm topology in local mode. I have a simple program that checks whether a number is prime. I am using KafkaSpout as the source and Storm for processing. Versions: Kafka 2.10-0.8.2.1, Storm 0.9.4, ZooKeeper 3.4.6.

Below is my bolt, which checks whether a number is prime:

public class PrimeNumberBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;



    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    public void execute( Tuple tuple ) 
    {
        //System.out.println(tuple.getFields());
        //System.out.println(tuple.getString(0));
        String num = tuple.getString(0);
        //int number = tuple.getInteger( 0 );
        int number = Integer.parseInt(num);
        //System.out.println("IN Primenumber bolt = "+number);

        if( isPrime( number) )
        {  
            System.out.println( number );

        }
        collector.ack( tuple );
    }

    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   

    private boolean isPrime( int n ) 
    {
        if( n < 2 ) // 0, 1 and negative numbers are not prime
        {
            return false;
        }
        if( n == 2 || n == 3 )
        {
            return true;
        }

        // Is n an even number?
        if( n % 2 == 0 )
        {
            return false;
        }

        //if not, then just check the odds
        for( int i=3; i*i<=n; i+=2 ) 
        {
            if( n % i == 0)
            {
                return false;
            }
        }
        return true;
    }
}

Error:

18156 [Thread-11-prime] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4]
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
    ... 6 common frames omitted
18158 [Thread-11-prime] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4]
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
    ... 6 common frames omitted
18375 [Thread-11-prime] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]

Please suggest what modifications I need to make to the code. Thanks in advance!

What does tuple.getString(0) return? – Yogendra Sharma May 27 '15 at 5:26
It returns a string. In my case, the data in the Kafka cluster is stored as strings (like 232, 12, etc.), so I am reading it and parsing it into an int. – user-4870385 May 27 '15 at 5:30
Accepted answer (score: 4)

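It seems your Kafka spout is reading the data as a byte array. The "[B" in the ClassCastException is the JVM's name for byte[], so tuple.getString(0) fails because the field holds raw bytes rather than a String.

Try using StringScheme by setting the scheme on your spout config, like below.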

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
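
If changing the scheme is not an option, another approach is to decode the bytes inside the bolt. The following is only a sketch, not part of the original answer: the class name TupleValueDecoder and the method parseNumber are hypothetical, and it assumes the Kafka messages are UTF-8 encoded decimal strings such as "232". In the bolt, the value would be read with tuple.getValue(0) instead of tuple.getString(0) and passed to the helper.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: turns a tuple field into an int whether the spout
// emitted it as raw bytes (byte[]) or as an already decoded String.
class TupleValueDecoder {

    static int parseNumber(Object value) {
        final String text;
        if (value instanceof byte[]) {
            // Assumption: messages were written to Kafka as UTF-8 text.
            text = new String((byte[]) value, StandardCharsets.UTF_8);
        } else if (value instanceof String) {
            text = (String) value;
        } else {
            throw new IllegalArgumentException("Unsupported tuple value: " + value);
        }
        // Throws NumberFormatException if the message is not a valid integer.
        return Integer.parseInt(text.trim());
    }

    public static void main(String[] args) {
        byte[] raw = "232".getBytes(StandardCharsets.UTF_8);
        System.out.println(parseNumber(raw));   // prints 232
        System.out.println(parseNumber("12"));  // prints 12
    }
}
```

Setting StringScheme on the spout is still the simpler fix, because the decoding then happens once in the spout and the bolt code can stay unchanged.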
This is my Kafka spout:

    private static KafkaSpout buildKafkaSentenceSpout() {
        String zkHostPort = "localhost:2181";
        String topic = "cust3";
        String zkRoot = "/prime";
        String zkSpoutId = "prime-spout";
        ZkHosts zkHosts = new ZkHosts(zkHostPort);
        SpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
        return kafkaSpout;
    }

– user-4870385 May 27 '15 at 5:34
I have added it, but it throws the error "Cannot make a static reference to the non-static field SpoutConfig.scheme". – user-4870385 May 27 '15 at 5:36
In your case, I think you can set it last, on the instance rather than on the class: spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme()); – 이승진 May 27 '15 at 5:39
Thanks a lot, man! Works perfectly. – user-4870385 May 27 '15 at 5:44
    
Glad that it helped :) – 이승진 May 27 '15 at 6:31
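
The static-reference error from the comments comes from assigning the scheme through the class name before any config object exists. The sketch below reproduces that shape with hypothetical stand-in classes (FakeSpoutConfig and FakeScheme are not storm-kafka types), so it compiles without Storm on the classpath. It only illustrates the ordering: create the instance first, then set its field.

```java
// Stand-in classes only; they mimic the shape of a config object with a
// public, non-static "scheme" field. They are NOT the storm-kafka API.
class SchemeAssignmentSketch {

    static class FakeScheme {
        final String name;
        FakeScheme(String name) { this.name = name; }
    }

    static class FakeSpoutConfig {
        // Instance field: each config object carries its own scheme.
        FakeScheme scheme = new FakeScheme("raw");
        final String topic;
        FakeSpoutConfig(String topic) { this.topic = topic; }
    }

    static FakeSpoutConfig buildConfig(String topic) {
        // FakeSpoutConfig.scheme = new FakeScheme("string");
        // The line above would not compile: scheme is not a static field.
        FakeSpoutConfig cfg = new FakeSpoutConfig(topic);
        cfg.scheme = new FakeScheme("string"); // set on the instance instead
        return cfg;
    }

    public static void main(String[] args) {
        FakeSpoutConfig cfg = buildConfig("cust3");
        System.out.println(cfg.topic + " -> " + cfg.scheme.name); // prints cust3 -> string
    }
}
```

Applied to the spout from the comments, this means constructing spoutCfg first, assigning spoutCfg.scheme next, and only then passing spoutCfg to the KafkaSpout constructor.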
