Hello,
I'm pulling data into Power BI using the Elastic Stack REST API, but I'm running into a problem when trying to pull more than 10k records.
In order to do so, I need to use the scroll function: https://www.elastic.co/guide/en/elasticsearch/reference/master/search-request-scroll.html
The first call:
GET /_search?scroll=1m { "query": ... }
Will return the first set of results plus a scroll id:
DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4...
Which is used for subsequent calls:
GET /_search { "scroll_id": "DXF1Z..." }
Each call returns a set of records plus a new scroll id, which is then passed on to the next call, and so on.
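The loop described above can be sketched as follows. This is an illustrative Python sketch of the scroll protocol only, not Power Query; the fake_search and fake_scroll helpers are hypothetical stand-ins for the real HTTP calls:

```python
def fake_search(scroll="1m"):
    # First call: returns the first page of hits plus a scroll id.
    return {"_scroll_id": "scroll-1", "hits": {"hits": [1, 2, 3]}}

def fake_scroll(scroll_id):
    # Subsequent calls: pass the previous scroll id, get the next page
    # plus a (possibly new) scroll id. An empty page means we are done.
    pages = {"scroll-1": ("scroll-2", [4, 5]), "scroll-2": ("scroll-3", [])}
    next_id, hits = pages[scroll_id]
    return {"_scroll_id": next_id, "hits": {"hits": hits}}

def scroll_all():
    # Kick off the search, then keep feeding each response's scroll id
    # back into the scroll endpoint until a page comes back empty.
    response = fake_search()
    results = list(response["hits"]["hits"])
    while True:
        response = fake_scroll(response["_scroll_id"])
        hits = response["hits"]["hits"]
        if not hits:
            break
        results.extend(hits)
    return results
```

The Power Query solutions later in this thread implement the same loop via recursion, since M has no mutable loop state.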
For performance reasons, I don't want to increase the number of records that can be returned by the REST API.
Is it possible to solve this using Power BI?
Thanks,
Ken
There have been a few different threads like this, but I believe the solution lies in creating a Power Query function that you call recursively. Here is an example of functions and recursion on a completely unrelated topic that might still get you there:
Thanks for pointing me in the right direction.
For anyone else that runs into this problem, here's the first iteration of a working function:
let
    RecursiveElasticFetch = (queryUrl, scrollUrl, scrollId, counter) =>
        let
            Counter = if (counter = null) then 0 else counter,
            Results = if (scrollId = null) then
                Json.Document(Web.Contents(queryUrl))
            else
                Json.Document(Web.Contents(scrollUrl & scrollId, [Headers=[MyHeader=Text.From(Counter)]])),
            ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
            Return = if (Counter < 10) then
                ParsedResults & RecursiveElasticFetch(queryUrl, scrollUrl, Results[_scroll_id], Counter + 1)
            else
                ParsedResults
        in
            Return
in
    RecursiveElasticFetch
For the parameters:
queryUrl - The query URL that kicks off the search: http://elasticsrch-dev:9200/intranet*/_search?scroll=1m&source={....}
scrollUrl - The URL used for subsequent searches (the scroll id is appended at the end): http://elasticsrch-dev:9200/_search/scroll?scroll=1m&scroll_id=
scrollId - Used to pass the scroll_id to subsequent calls. Leave it blank for the first call
counter - Used to limit the number of iterations. Leave it blank for the first call.
Good to know, thanks!
I replicated the problem in our environment and I believe I have a fix.
Power BI caches results from repeated calls to the same URL - this is why I added the custom header. Adding the same snippet to your call seems to fix the problem for me.
Json.Document(Web.Contents(scrollURL,[Content = Text.ToBinary(scrollBody&scrollID&scrollEND), Headers=[MyHeader=Text.From(Counter)]])),
Let me know if this works for you.
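The caching behavior can be pictured with a toy cache keyed on URL plus headers. This is a Python sketch of the idea only, not Power BI's actual cache implementation: because the counter header differs on every iteration, each scroll request is treated as distinct and actually hits the network:

```python
calls = []

def fetch(url, headers):
    # Pretend network call; records how often it actually runs.
    calls.append((url, tuple(sorted(headers.items()))))
    return "response"

cache = {}

def cached_fetch(url, headers):
    # Cache key includes the headers, so a varying header busts the cache.
    key = (url, tuple(sorted(headers.items())))
    if key not in cache:
        cache[key] = fetch(url, headers)
    return cache[key]

# Same URL, same headers: the second call is served from cache.
cached_fetch("http://es:9200/_search/scroll?scroll_id=abc", {})
cached_fetch("http://es:9200/_search/scroll?scroll_id=abc", {})
# Same URL, but a counter header that changes: every call hits the network.
cached_fetch("http://es:9200/_search/scroll?scroll_id=abc", {"MyHeader": "1"})
cached_fetch("http://es:9200/_search/scroll?scroll_id=abc", {"MyHeader": "2"})
```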
Hello,
I believe I am having the cached result issue. Adding this to the function ends up with an error since scrollBody and scrollEND are not defined anywhere in the code.
Where would I add these variables?
Please help!
Thanks!
Nic
Just want to confirm: Are you using the code snippet that I posted in this forum or that wendt_1 posted in the Elastic forum?
Sorry emorquecho, that question was directed at nicpenning.
In your case, I'm not sure how to help or what the issue might be. Your suggestion sounds reasonable, but I haven't done a lot with Elastic data in Power BI since I originally posted on this.
I didn't have the right code! I was using a different code block.
The following is the function I used that worked:
let
    RecursiveElasticFetch = (url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND) =>
        let
            Counter = if (counter = null) then 0 else counter,
            Results = if (scrollID = null) then
                Json.Document(Web.Contents(url, [Content = Text.ToBinary(urlBody)]))
            else
                Json.Document(Web.Contents(scrollURL, [Content = Text.ToBinary(scrollBody & scrollID & scrollEND)])),
            ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
            Return = if (Counter < 10) then
                ParsedResults & RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, Results[_scroll_id], Counter + 1, scrollEND)
            else
                ParsedResults
        in
            Return
in
    RecursiveElasticFetch
And this is a snippet of the query:
let
    url = "http://[elasticinstance]:9200/index*/_search?scroll=1m",
    scrollURL = "http://[elasticinstance]:9200/_search/scroll",
    urlBody = "{""size"":10000,""query"":{""match_all"":{}}}",
    scrollBody = "{""scroll"":""10m"",""scroll_id"":""",
    scrollID = null,
    counter = null,
    scrollEND = """}",
    Source = RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND)
in
    Source
(I chopped out all of my steps, but I believe this should work.)
I can query all the data, but it is a bit slow. It takes a few minutes for about 120K rows.
I have the following code, but for some reason it's not working.
Can anyone help me correct it? I'm getting a "Token comma expected" error and can't seem to solve it.
let
    url = "http://Redacted 'URL edited by Admin':82/the_hive*/_search?scroll=1m",
    scrollURL = "http://Redacted 'URL edited by Admin':82/_search/scroll",
    urlBody = "{""size"":10000,""query"":{""match_all"":{}}}",
    scrollBody = "{""scroll"":""10m"",""scroll_id"":""",
    scrollID = null,
    counter = null,
    scrollEND = """}",
    Source = RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND)
let
    RecursiveElasticFetch = (url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND) =>
        let
            Counter = if (counter = null) then 0 else counter,
            Results = if (scrollID = null) then
                Json.Document(Web.Contents(url, [Content = Text.ToBinary(urlBody)]))
            else
                Json.Document(Web.Contents(scrollURL, [Content = Text.ToBinary(scrollBody & scrollID & scrollEND)])),
            ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
            Return = if (Counter < 10) then
                ParsedResults & RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, Results[_scroll_id], Counter + 1, scrollEND)
            else
                ParsedResults
        in
            Return
in
    RecursiveElasticFetch
in
    Source
@nicpenning how do you actually combine the 2 queries to get the Elastic scroll effect?
Create your query with this code:
let
    url = "http://[elasticinstance]:9200/index*/_search?scroll=1m",
    scrollURL = "http://[elasticinstance]:9200/_search/scroll",
    urlBody = "{""size"":10000,""query"":{""match_all"":{}}}",
    scrollBody = "{""scroll"":""10m"",""scroll_id"":""",
    scrollID = null,
    counter = null,
    scrollEND = """}",
    Source = RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND)
in
    Source
You should create a function (Right-Click your Query and click new function) with this code:
let
    RecursiveElasticFetch = (url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND) =>
        let
            Counter = if (counter = null) then 0 else counter,
            Results = if (scrollID = null) then
                Json.Document(Web.Contents(url, [Content = Text.ToBinary(urlBody)]))
            else
                Json.Document(Web.Contents(scrollURL, [Content = Text.ToBinary(scrollBody & scrollID & scrollEND)])),
            ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
            Return = if (Counter < 10) then
                ParsedResults & RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, Results[_scroll_id], Counter + 1, scrollEND)
            else
                ParsedResults
        in
            Return
in
    RecursiveElasticFetch
Revisiting this old post. This works well on an unauthenticated cluster.
Anyone know how to add an API Key to perform this type of query?
I understand I can add:
let
    RecursiveElasticFetch = (url, scrollURL, urlBody, scrollBody, scrollID, counter, scrollEND) =>
        let
            Counter = if (counter = null) then 0 else counter,
            Results = if (scrollID = null) then
                Json.Document(Web.Contents(url, [Content = Text.ToBinary(urlBody), Headers=[#"Authorization"="ApiKey redacted"]]))
            else
                Json.Document(Web.Contents(scrollURL, [Content = Text.ToBinary(scrollBody & scrollID & scrollEND)])),
            ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
            Return = if (Counter < 10) then
                ParsedResults & RecursiveElasticFetch(url, scrollURL, urlBody, scrollBody, Results[_scroll_id], Counter + 1, scrollEND)
            else
                ParsedResults
        in
            Return
in
    RecursiveElasticFetch
I believe the (406): Not Acceptable issue was because I was using a self-signed certificate.
I replaced the self-signed certificate with a much higher level signed certificate and that seemed to work locally and at the gateway level.
Otherwise, you have to tell Power BI to trust self-signed certs:
Update the file C:\Program Files\Microsoft Power BI Desktop\bin\Microsoft.Mashup.Container.NetFX45.exe.config to:
<configuration>
  <system.net>
    <settings>
      <servicePointManager checkCertificateName="false" checkCertificateRevocationList="false" />
    </settings>
  </system.net>
</configuration>
Restart Power BI.
I added some other functionality to this script, so try this new function and query. I added API key support and a dynamically calculated counter based on the total number of hits.
Create this function called RecursiveElasticFetch:
let
RecursiveElasticFetch = (baseUrl, relativepa, relativepa2, batchSize, urlBody, scrollBody, scrollID, counter, scrollEND, apiKey) =>
let
Results = if (scrollID = null) then
//Initial Query
Json.Document(Web.Contents(baseUrl, [RelativePath=relativepa, Headers=[#"Authorization"=Text.Combine({"ApiKey ",apiKey,""}), #"Content-Type"="application/json"], Content = Text.ToBinary(urlBody)]))
else
//All other queries execute this to gather results from the scroll api even if the scroll id changes.
Json.Document(Web.Contents(baseUrl, [RelativePath=relativepa2, Headers=[#"Authorization"=Text.Combine({"ApiKey ",apiKey,""}), #"Content-Type"="application/json", MyHeader=Text.From(counter)], Content = Text.ToBinary(scrollBody&scrollID&scrollEND)])),
//If this is the first time the function runs, the counter should be null, so this will dynamically calculate how many times the function needs to run.
counter = if (counter = null) then
//Dynamically get the counter - Note: You can uncomment the next line for testing and then comment out the Number.RoundUp
//7
Number.RoundUp(Results[hits][total][value]/Number.FromText(batchSize))
else
counter,
//Store the hits from the ElasticSearch query into ParsedResults and if results already exist, append more results to generate the full table of events.
ParsedResults = Table.FromList(Results[hits][hits], Splitter.SplitByNothing(), null, null, ExtraValues.Error),
Return = if (counter > 0) then
ParsedResults & RecursiveElasticFetch(baseUrl, relativepa, relativepa2, batchSize, urlBody, scrollBody, Results[_scroll_id], counter-1, scrollEND, apiKey)
else
ParsedResults
in
Return
in
RecursiveElasticFetch
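For reference, the dynamic counter in the function above boils down to a ceiling division: the number of scroll calls needed is the total hit count divided by the batch size, rounded up. A quick check of that arithmetic (Python here purely to illustrate the Number.RoundUp logic in the M code):

```python
import math

def pages_needed(total_hits, batch_size):
    # Mirrors Number.RoundUp(Results[hits][total][value] / batchSize):
    # the number of additional scroll calls required to cover every hit.
    return math.ceil(total_hits / batch_size)
```

So a 120,000-hit result set with a batch size of 10,000 needs 12 pages, and 10,001 hits still needs 2, since the last partial page counts.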
Then create this query:
let
baseUrl = "https://127.0.0.1:9200",
url = Text.Combine({baseUrl, "/stats-*/_search?scroll=1m"}),
scrollURL = Text.Combine({baseUrl, "/_search/scroll"}),
relativepa = "/stats-transform-filebeat-cisco-*/_search?scroll=1m",
relativepa2 = "/_search/scroll",
batchSize = "10000",
urlBody = Text.Combine({"{""size"":",batchSize,",""query"":{""match_all"":{}}}"}),
scrollBody = "{""scroll"":""1m"",""scroll_id"":""",
scrollID = null,
counter = null,
scrollEND = """}",
apiKey = "Your API KEY",
Source = RecursiveElasticFetch(baseUrl, relativepa, relativepa2, batchSize, urlBody, scrollBody, scrollID, counter, scrollEND, apiKey),
#"Renamed Columns" = Table.RenameColumns(Source,{{"Column1", "test"}}),
#"Expanded test" = Table.ExpandRecordColumn(#"Renamed Columns", "test", {"_index", "_source"}, {"_index", "_source"})
in
#"Expanded test"
If this doesn't work, try looking at your Elasticsearch logs in the logs directory of Elasticsearch (the file is usually called your-cluster-name.log) to see why Elasticsearch might not like your query.
Hi @nicpenning
Your solution is working now, but I am not able to refresh the data when it is published to the Power BI service:
This dataset includes a dynamic data source. Since dynamic data sources aren't refreshed in the Power BI service, this dataset won't be refreshed. Learn more: https://aka.ms/dynamic-data-sources.
You may have to create a new Power BI file and use Web.Contents, entering your URL for Elasticsearch: http://127.0.0.1:9200, or https://127.0.0.1:9200 if you have SSL enabled.
Get Data -> Other -> Web
Connect
URL: http://127.0.0.1:9200
Then you can create the function and queries, and you should be good to go after changing the API key and URL parameters.
If you don't have security enabled then you might not need the api key.
If that is the case you will have to remove the API parts of the function and the query since I don't know what it will do if you give it an API key that is not used.
Do you have security in your cluster?
This is how to create an API key for a specific index.
https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
Hi @nicpenning
No, I'm in the starting stage. I didn't configure security in the cluster, and I'm also using a localhost server.
This might be a long shot, but would there be any reason why this query doesn't quite pull absolutely everything? I see that I am missing some of the most recently indexed data, but I haven't figured out how much yet. If anybody else is seeing this as well, please let me know.
Also, is there a good method to only query anything newer than what has already been queried? It doesn't seem efficient to query everything every time there is a change in the data.
Thanks in advance!
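One common Elasticsearch pattern for "only fetch what's new" is a range filter on a timestamp field: keep the maximum timestamp from the previous refresh, then query only documents after it. This is a hedged sketch, not a tested answer for this thread; the field name @timestamp and the helper below are assumptions you would adapt to your own mapping (Python here just to build the JSON body; the equivalent string would go into urlBody in the M query):

```python
import json

def newer_than_query(last_seen, size=10000, field="@timestamp"):
    # Build an Elasticsearch query body matching only documents whose
    # timestamp is strictly greater than `last_seen` (the max timestamp
    # from the previous refresh). Field name is an assumption.
    return json.dumps({
        "size": size,
        "query": {"range": {field: {"gt": last_seen}}},
        "sort": [{field: "asc"}],
    })
```

Note that the Power BI service restriction on dynamic data sources mentioned later in this thread still applies; the filter only reduces how much each refresh pulls.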
Hi nicpenning,
Your solution worked perfectly, but when you publish to the Power BI service, it does not allow you to refresh the data because it says that the query is dynamic.
Do you know how to resolve this?
You. Are. A lifesaver. I had no idea that Power BI cached results, and your solution worked like a charm. Thank you for the time you put in to help me with this issue!