FFmpeg Audio Out of Sync on Raspberry Pi Live Streaming

I have written code that live streams a nature camera from my Raspberry Pi, using a Pi Camera v2, to Facebook Live/YouTube.

I have run into a couple of problems with the code that I can’t solve and need some expert help.

This is the code:

raspivid -o - -t 0 -w 1440 -h 1080 -fps 30 -b 8000000 -g 50 | ffmpeg -r 30 -ar 44100 -ac 2 -acodec pcm_s16le -f s16le -ac 2 -i /dev/zero -f h264 -r 30 -i - -vcodec copy -acodec aac -ab 128k -g 10 -f flv rtmp://192.168.1.1/nestbox1/**streamkey**  

The code streams fine but there are two problems:

Firstly, the audio is delayed by around 4 seconds. I have tried to solve this with -itsoffset, but that makes the audio track silent. I have also tried a negative value, e.g. -itsoffset -4, but that shifts the audio in the wrong direction, which is the opposite of what I am trying to achieve.

-itsoffset 4 
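For reference, this is where I understand -itsoffset would have to sit, since it only applies to the input that follows it. Shifting the video input forward by the roughly 4 seconds that the audio lags is my best guess at the right direction, but I have not confirmed that this is correct:

raspivid -o - -t 0 -w 1440 -h 1080 -fps 30 -b 8000000 -g 50 | ffmpeg -ar 44100 -ac 2 -acodec pcm_s16le -f s16le -i /dev/zero -itsoffset 4 -f h264 -r 30 -i - -vcodec copy -acodec aac -ab 128k -f flv rtmp://192.168.1.1/nestbox1/**streamkey**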

Secondly, the audio drops out for a very short time every 5 seconds and then starts again. I suspect this is the native AAC encoder included with FFmpeg. I have tried replacing -acodec aac with pcm_s16le and that works fine; however, Facebook Live requires AAC audio, so I need to make it work somehow. I have also looked into the FDK AAC codec and it behaves the same way.

-acodec pcm_s16le 
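One variant I have been considering (untested) is to feed the AAC encoder a properly timed silent source using FFmpeg’s anullsrc filter instead of raw samples from /dev/zero, on the theory that the untimed raw pipe is what makes the AAC output stutter. Everything else below is unchanged from my original command:

raspivid -o - -t 0 -w 1440 -h 1080 -fps 30 -b 8000000 -g 50 | ffmpeg -f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 -f h264 -r 30 -i - -vcodec copy -acodec aac -ab 128k -f flv rtmp://192.168.1.1/nestbox1/**streamkey**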

I know there is some issue with raspivid not creating timestamps and therefore not passing them to ffmpeg, and this might be part of the problem. I am continually getting a timestamp warning from ffmpeg, but this has not caused any problems with live streaming before.
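In case it matters, the change I have seen suggested for the missing timestamps (but have not verified myself) is to ask ffmpeg to generate presentation timestamps, or to stamp the piped video with wall-clock time, on the video input:

raspivid -o - -t 0 -w 1440 -h 1080 -fps 30 -b 8000000 -g 50 | ffmpeg -ar 44100 -ac 2 -acodec pcm_s16le -f s16le -i /dev/zero -fflags +genpts -use_wallclock_as_timestamps 1 -f h264 -r 30 -i - -vcodec copy -acodec aac -ab 128k -f flv rtmp://192.168.1.1/nestbox1/**streamkey**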

Spark Structured Streaming throwing java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame

I’m getting started with Spark Structured Streaming with a Kafka source and was following a simple tutorial. My Kafka server is plain open-source (OSS) Kafka. My producer source code is as follows:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class LogGenerator {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.load(ClassLoader.getSystemResourceAsStream("kafka-producer.properties"));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i = 0; i < 10000; i++) {
            System.out.printf("%d ", i);
            producer.send(new ProducerRecord<>("my-log-topic", Integer.toString(i), Integer.toString(i)));
        }
    }
}

The producer just writes the key/value pairs (0,0) through (9999,9999).

The Structured Streaming code that reads from this topic is as follows:

import org.apache.spark.sql.SparkSession

object DimaStreamer {
  // initialize configs
  val spark: SparkSession = SparkSession.builder().master("local").appName("StreamingData").getOrCreate()

  def main(args: Array[String]): Unit = {
    val dfRefundStream = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "0.0.0.0:9091,0.0.0.0:9092")
      .option("subscribe", "my-log-topic")
      .load()

    import org.apache.spark.sql.functions._
    dfRefundStream.printSchema()
    dfRefundStream.select(col("value")).writeStream
      .format("console")
      .queryName("inittest")
      .start()
      .awaitTermination()
  }
}

It’s a Maven project. The job is submitted as follows, with the --jars option used to pass the dependency jars (comma-separated).

spark-submit --class com.apple.arcadia.solr.batch.DimaStreamer --jars $JARSPATH target/mainproject-1.0-SNAPSHOT.jar
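One thing I am considering, in case the problem is a wrong or duplicate Kafka connector jar inside $JARSPATH: letting spark-submit resolve the connector itself with --packages instead of shipping it by hand. The coordinates below simply mirror the versions in my POM; I have not confirmed this fixes the error:

spark-submit --class com.apple.arcadia.solr.batch.DimaStreamer --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 target/mainproject-1.0-SNAPSHOT.jar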

The job throws the following exception

19/06/13 15:10:05 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [my-log-topic-1, my-log-topic-0] for group spark-kafka-source-0ce55060-09ad-430a-9916-e0063d8103fe--48657453-driver-0
19/06/13 15:10:06 INFO kafka010.KafkaSource: Initial offsets: {"my-log-topic":{"1":9878,"0":10122}}
19/06/13 15:10:06 INFO streaming.StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560463806021,Map(spark.sql.shuffle.partitions -> 200))
19/06/13 15:10:06 INFO kafka010.KafkaSource: GetBatch called with start = None, end = {"my-log-topic":{"1":9878,"0":10122}}
19/06/13 15:10:06 INFO kafka010.KafkaSource: Partitions added: Map()
19/06/13 15:10:06 INFO kafka010.KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(my-log-topic-0,10122,10122,None), KafkaSourceRDDOffsetRange(my-log-topic-1,9878,9878,None)
19/06/13 15:10:06 ERROR streaming.StreamExecution: Query inittest [id = 305e10aa-a446-4f42-a4e9-8d2372250fa8, runId = 2218049f-08a4-40bf-9b46-b7e898321b85] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/types/StructType;Z)Lorg/apache/spark/sql/Dataset;
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:301)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:610)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:610)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:609)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Exception in thread "stream execution thread for inittest [id = 305e10aa-a446-4f42-a4e9-8d2372250fa8, runId = 2218049f-08a4-40bf-9b46-b7e898321b85]" java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/types/StructType;Z)Lorg/apache/spark/sql/Dataset;
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:301)
    ... (identical stack trace to the one above)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/types/StructType;Z)Lorg/apache/spark/sql/Dataset;
=== Streaming Query ===
Identifier: inittest [id = 305e10aa-a446-4f42-a4e9-8d2372250fa8, runId = 2218049f-08a4-40bf-9b46-b7e898321b85]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[my-log-topic]]: {"my-log-topic":{"1":9878,"0":10122}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [value#1]
+- StreamingExecutionRelation KafkaSource[Subscribe[my-log-topic]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/types/StructType;Z)Lorg/apache/spark/sql/Dataset;
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:301)
    ... (identical stack trace to the one above)
    ... 1 more

Based on the exception, it seems that the method SQLContext.internalCreateDataFrame(rdd, schema) that kafka010.KafkaSource calls cannot be found at runtime.

Upon checking the source code here: https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

I see that the method exists. Has anyone seen such an issue and resolved it? If so, could you please help me understand what the issue was?

Finally, the relevant dependencies in the POM are:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.1</version>
  <classifier>tests</classifier>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
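Since a NoSuchMethodError on an internal Spark API usually points to mixed Spark versions on the classpath (for example, a spark-sql-kafka connector built for a different Spark release than the one actually running the job), these are the checks I intend to run; nothing below is specific to my setup beyond the org.apache.spark filter:

# list every Spark artifact (and version) Maven puts on the classpath
mvn dependency:tree -Dincludes=org.apache.spark
# confirm the Spark version of the runtime that spark-submit actually uses
spark-submit --version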

Looking for advice on setting up a streaming site

I’m the sysadmin of a small barber college. I was charged with building a web site to host training videos for our students. I need server specs or a setup for the following:

1- 200 students with a max of 100 concurrent streams.
2- 200 videos ranging from 5 minutes to 1 hour each.
3- Ubuntu WordPress site running nginx or OpenLiteSpeed.
4- Expecting 10 TB of bandwidth/month.

I’m looking at buying a dedicated 1U server with an 8-core AMD EPYC CPU, 16 GB of RAM, a 1 GB NVMe OS drive, and 2 × 4 TB disks in RAID, and setting up the following VMs:

1- 1 core / 1 GB RAM – Dev/Staging
2- 3 cores / 7 GB RAM – MySQL server
3- 4 cores / 8 GB RAM – Web server
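As a rough sanity check on my own numbers (the ~2 Mbit/s average per-stream bitrate and 10 viewing hours per student per month are assumptions, not measurements), this is how I have been estimating peak egress and monthly transfer:

# assumed average bitrate per stream (Mbit/s) and viewing hours per student per month
BITRATE=2
HOURS=10
echo "Peak egress with 100 concurrent streams: $((100 * BITRATE)) Mbit/s"
echo "Monthly transfer for 200 students: $((200 * HOURS * 3600 * BITRATE / 8 / 1000)) GB"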

My questions are: what type of VM should I use, what are the colocation needs for my server, and is this a proper setup? I’m planning to use Cloudflare’s free CDN; is there a better and cheaper CDN to use?

Any help or ideas on how to build a proper video streaming site are appreciated.

Thank you in advance for your help.

H.264 not streaming over dynamic DNS

I’m trying to view a network camera remotely, so I registered with a dynamic DNS service. I can access the camera fine on the DNS-associated URL and stream using MPEG, but when I switch to H.264, the stream fails.

What’s odd is that I can stream H.264 if I use the local IP address. So the failure only occurs when streaming H.264 over the dynamic DNS address rather than the local IP.
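For what it’s worth, the quick reachability test I can run from outside the LAN looks like this; port 554 (RTSP, which most IP cameras use for H.264) and both addresses are placeholders rather than my real details:

# is the camera's RTSP/H.264 port reachable through the dynamic DNS name?
nc -zv mycamera.example-ddns.net 554
# same check against the local address, which is known to work
nc -zv 192.168.0.50 554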

I can’t imagine why this would be. Would anyone have any insight?